diff --git a/README.rst b/README.rst index f5a56de7..95c3015e 100644 --- a/README.rst +++ b/README.rst @@ -191,6 +191,11 @@ Configuration: See the extension's `config_declaration.yaml `_ file. +This plugin also supports the `ckan.download_proxy` setting, to use a proxy server when downloading files. +This setting is shared with other plugins that download resource files, such as ckanext-archiver. Eg: + + ckan.download_proxy = http://my-proxy:1234/ + You may also wish to configure the database to use your preferred date input style on COPY. For example, to make [PostgreSQL](https://www.postgresql.org/docs/current/runtime-config-client.html#RUNTIME-CONFIG-CLIENT-FORMAT) expect European (day-first) dates, you could add to ``postgresql.conf``: diff --git a/ckanext/xloader/action.py b/ckanext/xloader/action.py index e45394a9..c0f3f84f 100644 --- a/ckanext/xloader/action.py +++ b/ckanext/xloader/action.py @@ -46,13 +46,13 @@ def xloader_submit(context, data_dict): :rtype: bool ''' + p.toolkit.check_access('xloader_submit', context, data_dict) + custom_queue = data_dict.pop('queue', rq_jobs.DEFAULT_QUEUE_NAME) schema = context.get('schema', ckanext.xloader.schema.xloader_submit_schema()) data_dict, errors = _validate(data_dict, schema, context) if errors: raise p.toolkit.ValidationError(errors) - p.toolkit.check_access('xloader_submit', context, data_dict) - res_id = data_dict['resource_id'] try: resource_dict = p.toolkit.get_action('resource_show')(context, { @@ -152,6 +152,10 @@ def xloader_submit(context, data_dict): 'original_url': resource_dict.get('url'), } } + if custom_queue != rq_jobs.DEFAULT_QUEUE_NAME: + # Don't automatically retry if it's a custom run + data['metadata']['tries'] = jobs.MAX_RETRIES + # Expand timeout for resources that have to be type-guessed timeout = config.get( 'ckanext.xloader.job_timeout', @@ -160,7 +164,9 @@ def xloader_submit(context, data_dict): try: job = enqueue_job( - jobs.xloader_data_into_datastore, [data], rq_kwargs=dict(timeout=timeout) + jobs.xloader_data_into_datastore, [data], queue=custom_queue, + title="xloader_submit: package: {} resource: {}".format(resource_dict.get('package_id'), res_id), + rq_kwargs=dict(timeout=timeout) ) except Exception: log.exception('Unable to enqueued xloader res_id=%s', res_id) diff --git a/ckanext/xloader/auth.py b/ckanext/xloader/auth.py index c40127a0..2547db6d 100644 --- a/ckanext/xloader/auth.py +++ b/ckanext/xloader/auth.py @@ -1,7 +1,14 @@ +from ckan import authz +from ckan.lib import jobs as rq_jobs + import ckanext.datastore.logic.auth as auth def xloader_submit(context, data_dict): + # only sysadmins can specify a custom processing queue + custom_queue = data_dict.get('queue') + if custom_queue and custom_queue != rq_jobs.DEFAULT_QUEUE_NAME: + return authz.is_authorized('config_option_update', context, data_dict) return auth.datastore_auth(context, data_dict) diff --git a/ckanext/xloader/cli.py b/ckanext/xloader/cli.py index 6b54482a..916db577 100644 --- a/ckanext/xloader/cli.py +++ b/ckanext/xloader/cli.py @@ -26,7 +26,10 @@ def status(): @click.argument(u'dataset-spec') @click.option('-y', is_flag=True, default=False, help='Always answer yes to questions') @click.option('--dry-run', is_flag=True, default=False, help='Don\'t actually submit any resources') -def submit(dataset_spec, y, dry_run): +@click.option('--queue', help='Queue name for asynchronous processing, unused if executing immediately') +@click.option('--sync', is_flag=True, default=False, + help='Execute immediately instead of enqueueing for asynchronous processing') +def submit(dataset_spec, y, dry_run, queue, sync): """ xloader submit [options] """ @@ -34,15 +37,15 @@ def submit(dataset_spec, y, dry_run): if dataset_spec == 'all': cmd._setup_xloader_logger() - cmd._submit_all() + cmd._submit_all(sync=sync, queue=queue) elif dataset_spec == 'all-existing': _confirm_or_abort(y, dry_run) cmd._setup_xloader_logger() - cmd._submit_all_existing() + cmd._submit_all_existing(sync=sync, queue=queue) else: pkg_name_or_id = dataset_spec cmd._setup_xloader_logger() - cmd._submit_package(pkg_name_or_id) + cmd._submit_package(pkg_name_or_id, sync=sync, queue=queue) if cmd.error_occured: print('Finished but saw errors - see above for details') diff --git a/ckanext/xloader/command.py b/ckanext/xloader/command.py index 64b79754..4c0f2d2f 100644 --- a/ckanext/xloader/command.py +++ b/ckanext/xloader/command.py @@ -3,6 +3,8 @@ import sys import logging import ckan.plugins.toolkit as tk + +from ckanext.xloader.jobs import xloader_data_into_datastore_ from ckanext.xloader.utils import XLoaderFormats @@ -23,7 +25,7 @@ def _setup_xloader_logger(self): logger.setLevel(logging.DEBUG) logger.propagate = False # in case the config - def _submit_all_existing(self): + def _submit_all_existing(self, sync=False, queue=None): from ckanext.datastore.backend \ import get_all_resources_ids_in_datastore resource_ids = get_all_resources_ids_in_datastore() @@ -38,9 +40,9 @@ def _submit_all_existing(self): print(' Skipping resource {} found in datastore but not in ' 'metadata'.format(resource_id)) continue - self._submit_resource(resource_dict, user, indent=2) + self._submit_resource(resource_dict, user, indent=2, sync=sync, queue=queue) - def _submit_all(self): + def _submit_all(self, sync=False, queue=None): # submit every package # for each package in the package list, # submit each resource w/ _submit_package @@ -51,9 +53,9 @@ def _submit_all(self): user = tk.get_action('get_site_user')( {'ignore_auth': True}, {}) for p_id in package_list: - self._submit_package(p_id, user, indent=2) + self._submit_package(p_id, user, indent=2, sync=sync, queue=queue) - def _submit_package(self, pkg_id, user=None, indent=0): + def _submit_package(self, pkg_id, user=None, indent=0, sync=False, queue=None): indentation = ' ' * indent if not user: user = tk.get_action('get_site_user')( @@ -73,15 +75,15 @@ def _submit_package(self, pkg_id, user=None, indent=0): for resource in pkg['resources']: try: resource['package_name'] = pkg['name'] # for debug output - self._submit_resource(resource, user, indent=indent + 2) + self._submit_resource(resource, user, indent=indent + 2, sync=sync, queue=queue) except Exception as e: self.error_occured = True - print(e) + print(str(e)) print(indentation + 'ERROR submitting resource "{}" '.format( resource['id'])) continue - def _submit_resource(self, resource, user, indent=0): + def _submit_resource(self, resource, user, indent=0, sync=False, queue=None): '''resource: resource dictionary ''' indentation = ' ' * indent @@ -99,23 +101,35 @@ def _submit_resource(self, resource, user, indent=0): r=resource)) return dataset_ref = resource.get('package_name', resource['package_id']) - print('{indent}Submitting /dataset/{dataset}/resource/{r[id]}\n' + print('{indent}{sync_style} /dataset/{dataset}/resource/{r[id]}\n' '{indent} url={r[url]}\n' '{indent} format={r[format]}' - .format(dataset=dataset_ref, r=resource, indent=indentation)) + .format(sync_style='Processing' if sync else 'Submitting', + dataset=dataset_ref, r=resource, indent=indentation)) + if self.dry_run: + print(indentation + '(not submitted - dry-run)') + return data_dict = { 'resource_id': resource['id'], 'ignore_hash': True, } - if self.dry_run: - print(indentation + '(not submitted - dry-run)') - return - success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict) - if success: - print(indentation + '...ok') + if sync: + data_dict['ckan_url'] = tk.config.get('ckan.site_url') + input_dict = { + 'metadata': data_dict, + 'api_key': 'TODO' + } + logger = logging.getLogger('ckanext.xloader.cli') + xloader_data_into_datastore_(input_dict, None, logger) else: - print(indentation + 'ERROR submitting resource') - self.error_occured = True + if queue: + data_dict['queue'] = queue + success = tk.get_action('xloader_submit')({'user': user['name']}, data_dict) + if success: + print(indentation + '...ok') + else: + print(indentation + 'ERROR submitting resource') + self.error_occured = True def print_status(self): import ckan.lib.jobs as rq_jobs diff --git a/ckanext/xloader/config_declaration.yaml b/ckanext/xloader/config_declaration.yaml index feb1cc9c..66888050 100644 --- a/ckanext/xloader/config_declaration.yaml +++ b/ckanext/xloader/config_declaration.yaml @@ -46,6 +46,15 @@ groups: type: bool required: false legacy_key: ckanext.xloader.just_load_with_messytables + - key: ckanext.xloader.strict_type_guessing + default: True + example: False + description: | + Use with ckanext.xloader.use_type_guessing to set strict true or false + for type guessing. If set to False, the types will always fallback to string type. + + Strict means that a type will not be guessed if parsing fails for a single cell in the column. + type: bool - key: ckanext.xloader.max_type_guessing_length default: 0 example: 100000 diff --git a/ckanext/xloader/helpers.py b/ckanext/xloader/helpers.py index 8b9dee8f..90c70933 100644 --- a/ckanext/xloader/helpers.py +++ b/ckanext/xloader/helpers.py @@ -31,10 +31,8 @@ def xloader_status_description(status): def is_resource_supported_by_xloader(res_dict, check_access=True): is_supported_format = XLoaderFormats.is_it_an_xloader_format(res_dict.get('format')) is_datastore_active = res_dict.get('datastore_active', False) - if check_access: - user_has_access = toolkit.h.check_access('package_update', {'id': res_dict.get('package_id')}) - else: - user_has_access = True + user_has_access = not check_access or toolkit.h.check_access('package_update', + {'id':res_dict.get('package_id')}) url_type = res_dict.get('url_type') if url_type: try: diff --git a/ckanext/xloader/jobs.py b/ckanext/xloader/jobs.py index f1d7467f..3ac8ebba 100644 --- a/ckanext/xloader/jobs.py +++ b/ckanext/xloader/jobs.py @@ -42,6 +42,7 @@ CHUNK_SIZE = 16 * 1024 # 16kb DOWNLOAD_TIMEOUT = 30 +MAX_RETRIES = 1 RETRYABLE_ERRORS = ( errors.DeadlockDetected, errors.LockNotAvailable, @@ -86,26 +87,50 @@ def xloader_data_into_datastore(input): job_id = get_current_job().id errored = False + + # Set-up logging to the db + handler = StoringHandler(job_id, input) + level = logging.DEBUG + handler.setLevel(level) + logger = logging.getLogger(job_id) + handler.setFormatter(logging.Formatter('%(message)s')) + logger.addHandler(handler) + # also show logs on stderr + logger.addHandler(logging.StreamHandler()) + logger.setLevel(logging.DEBUG) + + db.init(config) try: - xloader_data_into_datastore_(input, job_dict) + # Store details of the job in the db + db.add_pending_job(job_id, **input) + xloader_data_into_datastore_(input, job_dict, logger) job_dict['status'] = 'complete' db.mark_job_as_completed(job_id, job_dict) + except sa.exc.IntegrityError as e: + db.mark_job_as_errored(job_id, str(e)) + job_dict['status'] = 'error' + job_dict['error'] = str(e) + log.error('xloader error: job_id %s already exists', job_id) + errored = True except JobError as e: db.mark_job_as_errored(job_id, str(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) + log.error('xloader error: %s, %s', e, traceback.format_exc()) errored = True except Exception as e: if isinstance(e, RETRYABLE_ERRORS): tries = job_dict['metadata'].get('tries', 0) - if tries == 0: + if tries < MAX_RETRIES: + tries = tries + 1 log.info("Job %s failed due to temporary error [%s], retrying", job_id, e) job_dict['status'] = 'pending' - job_dict['metadata']['tries'] = tries + 1 + job_dict['metadata']['tries'] = tries enqueue_job( xloader_data_into_datastore, [input], + title="retry xloader_data_into_datastore: resource: {} attempt {}".format( + job_dict['metadata']['resource_id'], tries), rq_kwargs=dict(timeout=RETRIED_JOB_TIMEOUT) ) return None @@ -114,7 +139,7 @@ def xloader_data_into_datastore(input): job_id, traceback.format_tb(sys.exc_info()[2])[-1] + repr(e)) job_dict['status'] = 'error' job_dict['error'] = str(e) - log.error('xloader error: {0}, {1}'.format(e, traceback.format_exc())) + log.error('xloader error: %s, %s', e, traceback.format_exc()) errored = True finally: # job_dict is defined in xloader_hook's docstring @@ -125,7 +150,7 @@ def xloader_data_into_datastore(input): return 'error' if errored else None -def xloader_data_into_datastore_(input, job_dict): +def xloader_data_into_datastore_(input, job_dict, logger): '''This function: * downloads the resource (metadata) from CKAN * downloads the data @@ -134,26 +159,6 @@ def xloader_data_into_datastore_(input, job_dict): (datapusher called this function 'push_to_datastore') ''' - job_id = get_current_job().id - db.init(config) - - # Store details of the job in the db - try: - db.add_pending_job(job_id, **input) - except sa.exc.IntegrityError: - raise JobError('job_id {} already exists'.format(job_id)) - - # Set-up logging to the db - handler = StoringHandler(job_id, input) - level = logging.DEBUG - handler.setLevel(level) - logger = logging.getLogger(job_id) - handler.setFormatter(logging.Formatter('%(message)s')) - logger.addHandler(handler) - # also show logs on stderr - logger.addHandler(logging.StreamHandler()) - logger.setLevel(logging.DEBUG) - validate_input(input) data = input['metadata'] @@ -197,10 +202,11 @@ def direct_load(): loader.calculate_record_count( resource_id=resource['id'], logger=logger) set_datastore_active(data, resource, logger) - job_dict['status'] = 'running_but_viewable' - callback_xloader_hook(result_url=input['result_url'], - api_key=api_key, - job_dict=job_dict) + if 'result_url' in input: + job_dict['status'] = 'running_but_viewable' + callback_xloader_hook(result_url=input['result_url'], + api_key=api_key, + job_dict=job_dict) logger.info('Data now available to users: %s', resource_ckan_url) loader.create_column_indexes( fields=fields, diff --git a/ckanext/xloader/loader.py b/ckanext/xloader/loader.py index ec7bf249..f0fc71be 100644 --- a/ckanext/xloader/loader.py +++ b/ckanext/xloader/loader.py @@ -3,6 +3,7 @@ import datetime import itertools +from six import text_type as str, binary_type import os import os.path import tempfile @@ -388,7 +389,9 @@ def load_table(table_filepath, resource_id, mimetype='text/csv', logger=None): skip_rows = list(range(1, header_offset + 2)) TYPES, TYPE_MAPPING = get_types() - types = type_guess(stream.sample[1:], types=TYPES, strict=True) + strict_guessing = p.toolkit.asbool( + config.get('ckanext.xloader.strict_type_guessing', True)) + types = type_guess(stream.sample[1:], types=TYPES, strict=strict_guessing) # override with types user requested if existing_info: @@ -469,12 +472,17 @@ def row_iterator(): _TYPE_MAPPING = { + "": 'text', "": 'text', + "": 'text', "": 'text', "": 'numeric', "": 'numeric', "": 'numeric', + "": 'timestamp', "": 'text', + "": 'text', + "": 'text', "": 'text', "": 'numeric', "": 'numeric', @@ -483,7 +491,7 @@ def row_iterator(): def get_types(): - _TYPES = [int, bool, str, datetime.datetime, float, Decimal] + _TYPES = [int, bool, str, binary_type, datetime.datetime, float, Decimal] TYPE_MAPPING = config.get('TYPE_MAPPING', _TYPE_MAPPING) return _TYPES, TYPE_MAPPING diff --git a/ckanext/xloader/parser.py b/ckanext/xloader/parser.py index 812ccd1f..11e756cd 100644 --- a/ckanext/xloader/parser.py +++ b/ckanext/xloader/parser.py @@ -36,7 +36,9 @@ def convert_types(self, extended_rows): cell_type = self.types[cell_index] if self.types else None if cell_type in [Decimal, None]: converted_value = to_number(cell_value) - if converted_value: + # Can't do a simple truthiness check, + # because 0 is a valid numeric result. + if converted_value is not None: row[cell_index] = converted_value continue if cell_type in [datetime.datetime, None]: diff --git a/ckanext/xloader/templates/package/resource_read.html b/ckanext/xloader/templates/package/resource_read.html index 56bf0266..b227a58f 100644 --- a/ckanext/xloader/templates/package/resource_read.html +++ b/ckanext/xloader/templates/package/resource_read.html @@ -6,3 +6,10 @@
  • {% link_for _('DataStore'), named_route='xloader.resource_data', id=pkg.name, resource_id=res.id, class_='btn btn-light', icon='cloud-upload' %}
  • {% endif %} {% endblock %} + +{% block resource_actions_inner %} + {% if h.check_ckan_version(max_version='2.10') and h.is_resource_supported_by_xloader(res) %} +
  • {% link_for _('DataStore'), named_route='xloader.resource_data', id=pkg.name, resource_id=res.id, class_='btn btn-light', icon='cloud-upload' %}
  • + {% endif %} + {{ super() }} +{% endblock %} diff --git a/ckanext/xloader/templates/package/snippets/resource_item.html b/ckanext/xloader/templates/package/snippets/resource_item.html index 37ed457c..70bf99c4 100644 --- a/ckanext/xloader/templates/package/snippets/resource_item.html +++ b/ckanext/xloader/templates/package/snippets/resource_item.html @@ -6,3 +6,10 @@
  • {% link_for _('DataStore'), named_route='xloader.resource_data', id=pkg.name, resource_id=res.id, class_='dropdown-item', icon='cloud-upload' %}
  • {% endif %} {% endblock %} + +{% block resource_item_explore_links %} + {% if h.check_ckan_version(max_version='2.10') and h.is_resource_supported_by_xloader(res) %} +
  • {% link_for _('DataStore'), named_route='xloader.resource_data', id=pkg.name, resource_id=res.id, class_='dropdown-item', icon='cloud-upload' %}
  • + {% endif %} + {{ super() }} +{% endblock %} diff --git a/ckanext/xloader/templates/xloader/resource_data.html b/ckanext/xloader/templates/xloader/resource_data.html index 73b452d8..98027508 100644 --- a/ckanext/xloader/templates/xloader/resource_data.html +++ b/ckanext/xloader/templates/xloader/resource_data.html @@ -6,31 +6,34 @@ {% set show_table = true %} + {% block upload_ds_button %} + {% set action = h.url_for('xloader.resource_data', id=pkg.name, resource_id=res.id) %} +
    + {{ h.csrf_input() if 'csrf_input' in h }} + +
    + {% endblock %} + +
    + {% block delete_ds_button %} {% if res.datastore_active %} {% set delete_action = h.url_for('xloader.delete_datastore_table', id=pkg.id, resource_id=res.id) %} -
    + {{ h.csrf_input() if 'csrf_input' in h }} {% block delete_datastore_button_text %} {{ _('Delete from DataStore') }}{% endblock %} + >{% block delete_datastore_button_text %} {{ _('Delete from DataStore') }}{% endblock %}
    {% endif %} {% endblock %} - {% block upload_ds_button %} -
    - {{ h.csrf_input() if 'csrf_input' in h }} - -
    - {% endblock %} - {% if status.error and status.error.message %} {% set show_table = false %}
    diff --git a/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv b/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv index 9d076602..7f59686c 100644 --- a/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv +++ b/ckanext/xloader/tests/samples/mixed_numeric_string_sample.csv @@ -1,3 +1,3 @@ -Funding agency,Program title,Maximum (indicative) grant amount -DTIS,Accessible Tourism Infrastructure Grants,Five hundred thousand dollars -DTIS,Boosting Accessible Tourism Experiences Grants,5000 +Funding agency,Program title,Maximum (indicative) grant amount +DTIS,Accessible Tourism Infrastructure Grants,Five hundred thousand dollars +DTIS,Boosting Accessible Tourism Experiences Grants,5000 diff --git a/ckanext/xloader/tests/samples/sample_with_blanks.csv b/ckanext/xloader/tests/samples/sample_with_blanks.csv index b53b25db..2b7c415c 100644 --- a/ckanext/xloader/tests/samples/sample_with_blanks.csv +++ b/ckanext/xloader/tests/samples/sample_with_blanks.csv @@ -1,4 +1,4 @@ -Funding agency,Program title,Opening date,Service ID -DTIS,Visitor First Experiences Fund,23/03/2023,63039 -DTIS,First Nations Sport and Recreation Program Round 2,22/03/2023,63040 -,,,63041 +Funding agency,Program title,Opening date,Service ID +DTIS,Visitor First Experiences Fund,23/03/2023,63039 +DTIS,First Nations Sport and Recreation Program Round 2,22/03/2023,63040 +,,,63041 diff --git a/ckanext/xloader/tests/samples/simple-large.csv b/ckanext/xloader/tests/samples/simple-large.csv index 53d3fb24..46c6c3b9 100644 --- a/ckanext/xloader/tests/samples/simple-large.csv +++ b/ckanext/xloader/tests/samples/simple-large.csv @@ -1,4 +1,5 @@ id,text +0,- 1,a 2,b 3,c @@ -49997,4 +49998,4 @@ id,text 49996,x 49997,y 49998,z -49999,a \ No newline at end of file +49999,a diff --git a/ckanext/xloader/tests/test_action.py b/ckanext/xloader/tests/test_action.py index 71f4ad01..8b0e2729 100644 --- a/ckanext/xloader/tests/test_action.py +++ b/ckanext/xloader/tests/test_action.py @@ -4,6 +4,7 @@ except ImportError: import mock +from ckan.plugins.toolkit import NotAuthorized from ckan.tests import helpers, factories from ckanext.xloader.utils import get_xloader_user_apitoken @@ -30,6 +31,25 @@ def test_submit(self): ) assert 1 == enqueue_mock.call_count + def test_submit_to_custom_queue_without_auth(self): + # check that xloader_submit doesn't allow regular users to change queues + user = factories.User() + with pytest.raises(NotAuthorized): + helpers.call_auth( + "xloader_submit", + context=dict(user=user["name"], model=None), + queue='foo', + ) + + def test_submit_to_custom_queue_as_sysadmin(self): + # check that xloader_submit allows sysadmins to change queues + user = factories.Sysadmin() + assert helpers.call_auth( + "xloader_submit", + context=dict(user=user["name"], model=None), + queue='foo', + ) is True + def test_duplicated_submits(self): def submit(res, user): return helpers.call_action( diff --git a/ckanext/xloader/tests/test_loader.py b/ckanext/xloader/tests/test_loader.py index e024b315..e2bf8d7d 100644 --- a/ckanext/xloader/tests/test_loader.py +++ b/ckanext/xloader/tests/test_loader.py @@ -958,6 +958,23 @@ def test_simple(self, Session): assert fields[1].get("info", {}).get("type_override", "") == "numeric" assert fields[2].get("info", {}).get("type_override", "") == "" + def test_simple_large_file(self, Session): + csv_filepath = get_sample_filepath("simple-large.csv") + resource = factories.Resource() + resource_id = resource['id'] + loader.load_table( + csv_filepath, + resource_id=resource_id, + mimetype="text/csv", + logger=logger, + ) + assert self._get_column_types(Session, resource_id) == [ + u"int4", + u"tsvector", + u"numeric", + u"text", + ] + # test disabled by default to avoid adding large file to repo and slow test @pytest.mark.skip def test_boston_311_complete(self): diff --git a/ckanext/xloader/utils.py b/ckanext/xloader/utils.py index a51a7f0a..e7e79984 100644 --- a/ckanext/xloader/utils.py +++ b/ckanext/xloader/utils.py @@ -3,6 +3,8 @@ import json import datetime +from six import text_type as str, binary_type + from ckan import model from ckan.lib import search from collections import defaultdict @@ -11,6 +13,8 @@ import ckan.plugins as p from ckan.plugins.toolkit import config +from .job_exceptions import JobError + # resource.formats accepted by ckanext-xloader. Must be lowercase here. DEFAULT_FORMATS = [ "csv", @@ -33,6 +37,7 @@ def is_it_an_xloader_format(cls, format_): if cls.formats is None: cls._formats = config.get("ckanext.xloader.formats") if cls._formats is not None: + # use config value. preserves empty list as well. cls._formats = cls._formats.lower().split() else: cls._formats = DEFAULT_FORMATS @@ -183,7 +188,7 @@ def headers_guess(rows, tolerance=1): return 0, [] -TYPES = [int, bool, str, datetime.datetime, float, Decimal] +TYPES = [int, bool, str, binary_type, datetime.datetime, float, Decimal] def type_guess(rows, types=TYPES, strict=False): @@ -244,6 +249,8 @@ def type_guess(rows, types=TYPES, strict=False): # element in case of a tie # See: http://stackoverflow.com/a/6783101/214950 guesses_tuples = [(t, guess[t]) for t in types if t in guess] + if not guesses_tuples: + raise JobError('Failed to guess types') _columns.append(max(guesses_tuples, key=lambda t_n: t_n[1])[0]) return _columns