Merge pull request #84 from qld-gov-au/develop
Develop to master
ThrawnCA authored Feb 2, 2024
2 parents afc45b7 + 967615e commit 497048d
Showing 19 changed files with 203 additions and 83 deletions.
5 changes: 5 additions & 0 deletions README.rst
@@ -191,6 +191,11 @@ Configuration:

See the extension's `config_declaration.yaml <ckanext/xloader/config_declaration.yaml>`_ file.

This plugin also supports the ``ckan.download_proxy`` setting to use a proxy server when downloading files.
This setting is shared with other plugins that download resource files, such as ckanext-archiver. For example:

ckan.download_proxy = http://my-proxy:1234/

You may also wish to configure the database to use your preferred date input style on COPY.
For example, to make `PostgreSQL <https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT>`_
expect European (day-first) dates, you could adjust ``postgresql.conf``.
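
A minimal illustration (the actual line is truncated from this hunk):

    datestyle = ISO, DMY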
12 changes: 9 additions & 3 deletions ckanext/xloader/action.py
@@ -46,13 +46,13 @@ def xloader_submit(context, data_dict):
:rtype: bool
'''
p.toolkit.check_access('xloader_submit', context, data_dict)
custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME)
schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema())
data_dict, errors = _validate(data_dict, schema, context)
if errors:
raise p.toolkit.ValidationError(errors)

p.toolkit.check_access('xloader_submit', context, data_dict)

res_id = data_dict['resource_id']
try:
resource_dict = p.toolkit.get_action('resource_show')(context, {
@@ -152,6 +152,10 @@ def xloader_submit(context, data_dict):
'original_url': resource_dict.get('url'),
}
}
if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
# Don't automatically retry if it's a custom run
data['metadata']['tries'] = jobs.MAX_RETRIES

# Expand timeout for resources that have to be type-guessed
timeout = config.get(
'ckanext.xloader.job_timeout',
@@ -160,7 +164,9 @@

try:
job = enqueue_job(
jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout)
jobs.xloader_data_into_datastore, [data], queue=custom_queue,
title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id),
rq_kwargs=dict(timeout=timeout)
)
except Exception:
log.exception('Unable to enqueue xloader res_id=%s', res_id)
7 changes: 7 additions & 0 deletions ckanext/xloader/auth.py
@@ -1,7 +1,14 @@
from ckan import authz
from ckan.lib import jobs as rq_jobs

import ckanext.datastore.logic.auth as auth


def xloader_submit(context, data_dict):
# only sysadmins can specify a custom processing queue
custom_queue = data_dict.get('queue')
if custom_queue and custom_queue != rq_jobs.DEFAULT_QUEUE_NAME:
return authz.is_authorized('config_option_update', context, data_dict)
return auth.datastore_auth(context, data_dict)


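As an illustration of the new queue parameter (the user, resource id and queue name here are hypothetical, not from this diff), a non-default queue can be requested through the action API, which per the auth check above requires sysadmin rights:

    import ckan.plugins.toolkit as toolkit

    # 'bulk' is a hypothetical custom queue name; the 'queue' key is only
    # honoured for users who pass the 'config_option_update' authorization.
    toolkit.get_action('xloader_submit')(
        {'user': 'a-sysadmin'},
        {'resource_id': '<resource-id>', 'queue': 'bulk'},
    )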
11 changes: 7 additions & 4 deletions ckanext/xloader/cli.py
@@ -26,23 +26,26 @@ def status():
@click.argument(u'dataset-spec')
@click.option('-y', is_flag=True, default=False, help='Always answer yes to questions')
@click.option('--dry-run', is_flag=True, default=False, help='Don\'t actually submit any resources')
def submit(dataset_spec, y, dry_run):
@click.option('--queue', help='Queue name for asynchronous processing, unused if executing immediately')
@click.option('--sync', is_flag=True, default=False,
help='Execute immediately instead of enqueueing for asynchronous processing')
def submit(dataset_spec, y, dry_run, queue, sync):
"""
xloader submit [options] <dataset-spec>
"""
cmd = XloaderCmd(dry_run)

if dataset_spec == 'all':
cmd._setup_xloader_logger()
cmd._submit_all()
cmd._submit_all(sync=sync, queue=queue)
elif dataset_spec == 'all-existing':
_confirm_or_abort(y, dry_run)
cmd._setup_xloader_logger()
cmd._submit_all_existing()
cmd._submit_all_existing(sync=sync, queue=queue)
else:
pkg_name_or_id = dataset_spec
cmd._setup_xloader_logger()
cmd._submit_package(pkg_name_or_id)
cmd._submit_package(pkg_name_or_id, sync=sync, queue=queue)

if cmd.error_occured:
print('Finished but saw errors - see above for details')
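Illustrative CLI invocations of the new options (the config path, dataset and queue names are placeholders):

    ckan -c /etc/ckan/default/ckan.ini xloader submit my-dataset --sync
    ckan -c /etc/ckan/default/ckan.ini xloader submit all --queue bulk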
50 changes: 32 additions & 18 deletions ckanext/xloader/command.py
@@ -3,6 +3,8 @@
import sys
import logging
import ckan.plugins.toolkit as tk

from ckanext.xloader.jobs import xloader_data_into_datastore_
from ckanext.xloader.utils import XLoaderFormats


@@ -23,7 +25,7 @@ def _setup_xloader_logger(self):
logger.setLevel(logging.DEBUG)
logger.propagate = False # in case the config

def _submit_all_existing(self):
def _submit_all_existing(self, sync=False, queue=None):
from ckanext.datastore.backend \
import get_all_resources_ids_in_datastore
resource_ids = get_all_resources_ids_in_datastore()
@@ -38,9 +40,9 @@ def _submit_all_existing(self):
print(' Skipping resource {} found in datastore but not in '
'metadata'.format(resource_id))
continue
self._submit_resource(resource_dict, user, indent=2)
self._submit_resource(resource_dict, user, indent=2, sync=sync, queue=queue)

def _submit_all(self):
def _submit_all(self, sync=False, queue=None):
# submit every package
# for each package in the package list,
# submit each resource w/ _submit_package
@@ -51,9 +53,9 @@ def _submit_all(self):
user = tk.get_action('get_site_user')(
{'ignore_auth': True}, {})
for p_id in package_list:
self._submit_package(p_id, user, indent=2)
self._submit_package(p_id, user, indent=2, sync=sync, queue=queue)

def _submit_package(self, pkg_id, user=None, indent=0):
def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None):
indentation = ' ' * indent
if not user:
user = tk.get_action('get_site_user')(
@@ -73,15 +75,15 @@ def _submit_package(self, pkg_id, user=None, indent=0):
for resource in pkg['resources']:
try:
resource['package_name'] = pkg['name'] # for debug output
self._submit_resource(resource, user, indent=indent + 2)
self._submit_resource(resource, user, indent=indent + 2, sync=sync, queue=queue)
except Exception as e:
self.error_occured = True
print(e)
print(str(e))
print(indentation + 'ERROR submitting resource "{}" '.format(
resource['id']))
continue

def _submit_resource(self, resource, user, indent=0):
def _submit_resource(self, resource, user, indent=0, sync=False, queue=None):
'''resource: resource dictionary
'''
indentation = ' ' * indent
@@ -99,23 +101,35 @@ def _submit_resource(self, resource, user, indent=0):
r=resource))
return
dataset_ref = resource.get('package_name', resource['package_id'])
print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n'
print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n'
'{indent} url={r[url]}\n'
'{indent} format={r[format]}'
.format(dataset=dataset_ref, r=resource, indent=indentation))
.format(sync_style='Processing' if sync else 'Submitting',
dataset=dataset_ref, r=resource, indent=indentation))
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
data_dict = {
'resource_id': resource['id'],
'ignore_hash': True,
}
if self.dry_run:
print(indentation + '(not submitted - dry-run)')
return
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
if sync:
data_dict['ckan_url'] = tk.config.get('ckan.site_url')
input_dict = {
'metadata': data_dict,
'api_key': 'TODO'
}
logger = logging.getLogger('ckanext.xloader.cli')
xloader_data_into_datastore_(input_dict, None, logger)
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True
if queue:
data_dict['queue'] = queue
success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict)
if success:
print(indentation + '...ok')
else:
print(indentation + 'ERROR submitting resource')
self.error_occured = True

def print_status(self):
import ckan.lib.jobs as rq_jobs
9 changes: 9 additions & 0 deletions ckanext/xloader/config_declaration.yaml
@@ -46,6 +46,15 @@ groups:
type: bool
required: false
legacy_key: ckanext.xloader.just_load_with_messytables
- key: ckanext.xloader.strict_type_guessing
default: True
example: False
description: |
Use with ckanext.xloader.use_type_guessing to enable or disable strict type guessing.
Strict means that a column's type will not be guessed if parsing fails for even a
single cell in that column; if set to False, such columns fall back to string type.
type: bool
- key: ckanext.xloader.max_type_guessing_length
default: 0
example: 100000
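For illustration, the two related options might be combined in the CKAN config file like this (values are examples):

    ckanext.xloader.use_type_guessing = True
    ckanext.xloader.strict_type_guessing = False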
6 changes: 2 additions & 4 deletions ckanext/xloader/helpers.py
@@ -31,10 +31,8 @@ def xloader_status_description(status):
def is_resource_supported_by_xloader(res_dict, check_access=True):
is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format'))
is_datastore_active = res_dict.get('datastore_active', False)
if check_access:
user_has_access = toolkit.h.check_access('package_update', {'id': res_dict.get('package_id')})
else:
user_has_access = True
user_has_access = not check_access or toolkit.h.check_access('package_update',
{'id': res_dict.get('package_id')})
url_type = res_dict.get('url_type')
if url_type:
try:
66 changes: 36 additions & 30 deletions ckanext/xloader/jobs.py
@@ -42,6 +42,7 @@
CHUNK_SIZE = 16 * 1024 # 16kb
DOWNLOAD_TIMEOUT = 30

MAX_RETRIES = 1
RETRYABLE_ERRORS = (
errors.DeadlockDetected,
errors.LockNotAvailable,
@@ -86,26 +87,50 @@ def xloader_data_into_datastore(input):

job_id = get_current_job().id
errored = False

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

db.init(config)
try:
xloader_data_into_datastore_(input, job_dict)
# Store details of the job in the db
db.add_pending_job(job_id, **input)
xloader_data_into_datastore_(input, job_dict, logger)
job_dict['status'] = 'complete'
db.mark_job_as_completed(job_id, job_dict)
except sa.exc.IntegrityError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: job_id %s already exists', job_id)
errored = True
except JobError as e:
db.mark_job_as_errored(job_id, str(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
except Exception as e:
if isinstance(e, RETRYABLE_ERRORS):
tries = job_dict['metadata'].get('tries', 0)
if tries == 0:
if tries < MAX_RETRIES:
tries = tries + 1
log.info("Job %s failed due to temporary error [%s], retrying", job_id, e)
job_dict['status'] = 'pending'
job_dict['metadata']['tries'] = tries + 1
job_dict['metadata']['tries'] = tries
enqueue_job(
xloader_data_into_datastore,
[input],
title="retry xloader_data_into_datastore: resource: {} attempt {}".format(
job_dict['metadata']['resource_id'], tries),
rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT)
)
return None
@@ -114,7 +139,7 @@ def xloader_data_into_datastore(input):
job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e))
job_dict['status'] = 'error'
job_dict['error'] = str(e)
log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc()))
log.error('xloader error: %s, %s', e, traceback.format_exc())
errored = True
finally:
# job_dict is defined in xloader_hook's docstring
@@ -125,7 +150,7 @@ def xloader_data_into_datastore(input):
return 'error' if errored else None


def xloader_data_into_datastore_(input, job_dict):
def xloader_data_into_datastore_(input, job_dict, logger):
'''This function:
* downloads the resource (metadata) from CKAN
* downloads the data
@@ -134,26 +159,6 @@ def xloader_data_into_datastore_(input, job_dict):
(datapusher called this function 'push_to_datastore')
'''
job_id = get_current_job().id
db.init(config)

# Store details of the job in the db
try:
db.add_pending_job(job_id, **input)
except sa.exc.IntegrityError:
raise JobError('job_id {} already exists'.format(job_id))

# Set-up logging to the db
handler = StoringHandler(job_id, input)
level = logging.DEBUG
handler.setLevel(level)
logger = logging.getLogger(job_id)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
# also show logs on stderr
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.DEBUG)

validate_input(input)

data = input['metadata']
@@ -197,10 +202,11 @@ def direct_load():
loader.calculate_record_count(
resource_id=resource['id'], logger=logger)
set_datastore_active(data, resource, logger)
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
if 'result_url' in input:
job_dict['status'] = 'running_but_viewable'
callback_xloader_hook(result_url=input['result_url'],
api_key=api_key,
job_dict=job_dict)
logger.info('Data now available to users: %s', resource_ckan_url)
loader.create_column_indexes(
fields=fields,
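To summarise the retry behaviour introduced in this file, a minimal sketch (not the actual helper; it mirrors the constants and checks above):

    # Re-enqueue a failed job only when the error is one of the transient
    # RETRYABLE_ERRORS database conditions and the job has been tried fewer
    # than MAX_RETRIES times. Custom-queue submissions start with
    # tries == MAX_RETRIES (see action.py above), so they never auto-retry.
    MAX_RETRIES = 1

    def should_retry(exc, tries, retryable_errors):
        return isinstance(exc, retryable_errors) and tries < MAX_RETRIES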
(Diff truncated; the remaining 11 changed files are not shown.)
