Skip to content

Commit

Permalink
Merge pull request #79 from qld-gov-au/develop
Browse files Browse the repository at this point in the history
Develop to master - handle lock conflicts more robustly
  • Loading branch information
ThrawnCA authored Jan 25, 2024
2 parents 3a47541 + fd1ed50 commit afc45b7
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 14 deletions.
5 changes: 5 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,11 @@ Configuration:

See the extension's `config_declaration.yaml <ckanext/xloader/config_declaration.yaml>`_ file.

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make [PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT)
expect European (day-first) dates, you could add to ``postgresql.conf``:

datestyle=ISO,DMY

------------------------
Developer installation
Expand Down
10 changes: 1 addition & 9 deletions ckanext/xloader/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,7 @@ def add_pending_job(job_id, job_type, api_key,
if not metadata:
metadata = {}

conn = ENGINE.connect()
trans = conn.begin()
try:
with ENGINE.begin() as conn:
conn.execute(JOBS_TABLE.insert().values(
job_id=job_id,
job_type=job_type,
Expand Down Expand Up @@ -225,12 +223,6 @@ def add_pending_job(job_id, job_type, api_key,
)
if inserts:
conn.execute(METADATA_TABLE.insert(), inserts)
trans.commit()
except Exception:
trans.rollback()
raise
finally:
conn.close()


class InvalidErrorObjectError(Exception):
Expand Down
30 changes: 25 additions & 5 deletions ckanext/xloader/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@
import traceback
import sys

from psycopg2 import errors
from six.moves.urllib.parse import urlsplit
import requests
from rq import get_current_job
import sqlalchemy as sa

from ckan import model
from ckan.plugins.toolkit import get_action, asbool, ObjectNotFound, config
from ckan.plugins.toolkit import get_action, asbool, enqueue_job, ObjectNotFound, config

from . import db, loader
from .job_exceptions import JobError, HTTPError, DataTooBigError, FileCouldNotBeLoadedError
Expand All @@ -41,6 +42,15 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
errors.ObjectInUse,
)
# Retries can only occur in cases where the datastore entry exists,
# so use the standard timeout
RETRIED_JOB_TIMEOUT = config.get('ckanext.xloader.job_timeout', '3600')


# input = {
# 'api_key': user['apikey'],
Expand Down Expand Up @@ -87,6 +97,19 @@ def xloader_data_into_datastore(input):
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
enqueue_job(
xloader_data_into_datastore,
[input],
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None

db.mark_job_as_errored(
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
Expand Down Expand Up @@ -541,8 +564,7 @@ def __init__(self, task_id, input):
self.input = input

def emit(self, record):
conn = db.ENGINE.connect()
try:
with db.ENGINE.connect() as conn:
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
message = str(record.getMessage())
Expand All @@ -558,8 +580,6 @@ def emit(self, record):
module=module,
funcName=funcName,
lineno=record.lineno))
finally:
conn.close()


class DatetimeJsonEncoder(json.JSONEncoder):
Expand Down
1 change: 1 addition & 0 deletions ckanext/xloader/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def set_resource_metadata(update_dict):
# better fix

q = model.Session.query(model.Resource). \
with_for_update(of=model.Resource). \
filter(model.Resource.id == update_dict['resource_id'])
resource = q.one()

Expand Down

0 comments on commit afc45b7

Please sign in to comment.