diff --git a/ckanext/dcat/profiles.py b/ckanext/dcat/profiles.py
deleted file mode 100644
index c1ced34f..00000000
--- a/ckanext/dcat/profiles.py
+++ /dev/null
@@ -1,2005 +0,0 @@
-import datetime
-import json
-
-from urllib.parse import quote
-
-from dateutil.parser import parse as parse_date
-
-from ckantoolkit import config
-from ckantoolkit import url_for
-
-import rdflib
-from rdflib import URIRef, BNode, Literal
-from rdflib.namespace import Namespace, RDF, XSD, SKOS, RDFS
-
-from geomet import wkt, InvalidGeoJSONException
-
-from ckan.model.license import LicenseRegister
-from ckan.plugins import toolkit
-from ckan.lib.munge import munge_tag
-from ckan.lib.helpers import resource_formats
-from ckanext.dcat.utils import resource_uri, publisher_uri_organization_fallback, DCAT_EXPOSE_SUBCATALOGS, DCAT_CLEAN_TAGS
-
-DCT = Namespace("http://purl.org/dc/terms/")
-DCAT = Namespace("http://www.w3.org/ns/dcat#")
-DCATAP = Namespace("http://data.europa.eu/r5r/")
-ADMS = Namespace("http://www.w3.org/ns/adms#")
-VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
-FOAF = Namespace("http://xmlns.com/foaf/0.1/")
-SCHEMA = Namespace('http://schema.org/')
-TIME = Namespace('http://www.w3.org/2006/time')
-LOCN = Namespace('http://www.w3.org/ns/locn#')
-GSP = Namespace('http://www.opengis.net/ont/geosparql#')
-OWL = Namespace('http://www.w3.org/2002/07/owl#')
-SPDX = Namespace('http://spdx.org/rdf/terms#')
-
-GEOJSON_IMT = 'https://www.iana.org/assignments/media-types/application/vnd.geo+json'
-
-namespaces = {
- 'dct': DCT,
- 'dcat': DCAT,
- 'dcatap': DCATAP,
- 'adms': ADMS,
- 'vcard': VCARD,
- 'foaf': FOAF,
- 'schema': SCHEMA,
- 'time': TIME,
- 'skos': SKOS,
- 'locn': LOCN,
- 'gsp': GSP,
- 'owl': OWL,
- 'spdx': SPDX,
-}
-
-PREFIX_MAILTO = u'mailto:'
-
-DISTRIBUTION_LICENSE_FALLBACK_CONFIG = 'ckanext.dcat.resource.inherit.license'
-
-
-class URIRefOrLiteral(object):
- '''Helper which creates a URIRef if the value appears to be an http URL,
- or a Literal otherwise. URIRefs are also cleaned using CleanedURIRef.
-
- Like CleanedURIRef, this is a factory class.
- '''
- def __new__(cls, value):
- try:
- stripped_value = value.strip() if isinstance(value, str) else value
- if (isinstance(value, str) and (stripped_value.startswith("http://")
- or stripped_value.startswith("https://"))):
- uri_obj = CleanedURIRef(value)
- # although all invalid chars checked by rdflib should have been quoted, try to serialize
- # the object. If it breaks, use Literal instead.
- uri_obj.n3()
- # URI is fine, return the object
- return uri_obj
- else:
- return Literal(value)
- except Exception:
- # In case something goes wrong: use Literal
- return Literal(value)
-
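-# Illustrative sketch of URIRefOrLiteral (assumed behaviour, doctest-style;
-# not part of the original module):
-#
-# >>> URIRefOrLiteral('http://example.org/dataset/1')
-# rdflib.term.URIRef('http://example.org/dataset/1')
-# >>> URIRefOrLiteral('Just a plain text value')
-# rdflib.term.Literal('Just a plain text value')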
-
-class CleanedURIRef(object):
- '''Performs some basic URL encoding on value before creating a URIRef object.
-
- This is a factory for URIRef objects, which allows usage as type in graph.add()
- without affecting the resulting node types. That is,
- g.add(..., URIRef) and g.add(..., CleanedURIRef) will result in the exact same node type.
- '''
- @staticmethod
- def _careful_quote(value):
- # only encode this limited subset of characters to avoid more complex URL parsing
- # (e.g. valid ? in query string vs. ? as value).
- # can be applied multiple times, as encoded %xy is left untouched. Therefore, no
- # unquote is necessary beforehand.
- quotechars = ' !"$\'()*,;<>[]{|}\\^`'
- for c in quotechars:
- value = value.replace(c, quote(c))
- return value
-
- def __new__(cls, value):
- if isinstance(value, str):
- value = CleanedURIRef._careful_quote(value.strip())
- return URIRef(value)
-
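-# Minimal sketch of the careful quoting (assumed output, doctest-style):
-#
-# >>> CleanedURIRef('http://example.org/some path?q=a b')
-# rdflib.term.URIRef('http://example.org/some%20path?q=a%20b')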
-
-class RDFProfile(object):
- '''Base class with helper methods for implementing RDF parsing profiles
-
- This class should not be used directly, but rather extended to create
- custom profiles
- '''
-
- def __init__(self, graph, compatibility_mode=False):
- '''Class constructor
-
- Graph is an rdflib.Graph instance.
-
- In compatibility mode, some fields are modified to maintain
- compatibility with previous versions of the ckanext-dcat parsers
- (eg adding the `dcat_` prefix or storing comma separated lists instead
- of JSON dumps).
- '''
-
- self.g = graph
-
- self.compatibility_mode = compatibility_mode
-
- # Cache for mappings of licenses URL/title to ID built when needed in
- # _license().
- self._licenceregister_cache = None
-
- def _datasets(self):
- '''
- Generator that returns all DCAT datasets on the graph
-
- Yields rdflib.term.URIRef objects that can be used on graph lookups
- and queries
- '''
- for dataset in self.g.subjects(RDF.type, DCAT.Dataset):
- yield dataset
-
- def _distributions(self, dataset):
- '''
- Generator that returns all DCAT distributions on a particular dataset
-
- Yields rdflib.term.URIRef objects that can be used on graph lookups
- and queries
- '''
- for distribution in self.g.objects(dataset, DCAT.distribution):
- yield distribution
-
- def _keywords(self, dataset_ref):
- '''
- Returns all DCAT keywords on a particular dataset
- '''
- keywords = self._object_value_list(dataset_ref, DCAT.keyword) or []
- # Split keywords with commas
- keywords_with_commas = [k for k in keywords if ',' in k]
- for keyword in keywords_with_commas:
- keywords.remove(keyword)
- keywords.extend([k.strip() for k in keyword.split(',')])
- return keywords
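- # Worked example (assumed): keywords ['economy', 'health,transport'] are
- # returned as ['economy', 'health', 'transport'] after the comma split.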
-
- def _object(self, subject, predicate):
- '''
- Helper for returning the first object for this subject and predicate
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- Returns an rdflib reference (URIRef or BNode) or None if not found
- '''
- for _object in self.g.objects(subject, predicate):
- return _object
- return None
-
- def _object_value(self, subject, predicate):
- '''
- Given a subject and a predicate, returns the value of the object
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- If found, the string representation is returned, else an empty string
- '''
- default_lang = config.get('ckan.locale_default', 'en')
- fallback = ''
- for o in self.g.objects(subject, predicate):
- if isinstance(o, Literal):
- if o.language and o.language == default_lang:
- return str(o)
- # Use first object as fallback if no object with the default language is available
- elif fallback == '':
- fallback = str(o)
- else:
- return str(o)
- return fallback
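- # Sketch of the language fallback (assumed): with ckan.locale_default 'en',
- # objects [Literal('Titel', lang='de'), Literal('Title', lang='en')] yield
- # 'Title'; if only the German literal is present, 'Titel' is the fallback.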
-
- def _object_value_multiple_predicate(self, subject, predicates):
- '''
- Given a subject and a list of predicates, returns the value of the first
- object found, trying the predicates in the order in which they are given.
-
- Both subject and predicates must be rdflib URIRef or BNode objects
-
- If found, the string representation is returned, else an empty string
- '''
- object_value = ''
- for predicate in predicates:
- object_value = self._object_value(subject, predicate)
- if object_value:
- break
-
- return object_value
-
- def _object_value_int(self, subject, predicate):
- '''
- Given a subject and a predicate, returns the value of the object as an
- integer
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- If the value cannot be parsed as an integer, returns None
- '''
- object_value = self._object_value(subject, predicate)
- if object_value:
- try:
- return int(float(object_value))
- except ValueError:
- pass
- return None
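- # e.g. (assumed): an object value of '3.0' is returned as the integer 3;
- # parsing via float() first also accepts decimal strings like '3.5' (-> 3).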
-
- def _object_value_int_list(self, subject, predicate):
- '''
- Given a subject and a predicate, returns the value of the object as a
- list of integers
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- If the value cannot be parsed as an integer, returns an empty list
- '''
- object_values = []
- for object in self.g.objects(subject, predicate):
- if object:
- try:
- object_values.append(int(float(object)))
- except ValueError:
- pass
- return object_values
-
- def _object_value_list(self, subject, predicate):
- '''
- Given a subject and a predicate, returns a list with all the values of
- the objects
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- If no values are found, returns an empty list
- '''
- return [str(o) for o in self.g.objects(subject, predicate)]
-
- def _get_vcard_property_value(self, subject, predicate, predicate_string_property=None):
- '''
- Given a subject, a predicate and an optional predicate for the simple
- string property, returns the value of the object, trying to read the
- value in the following order:
- * predicate_string_property
- * predicate
-
- All subject, predicate and predicate_string_property must be rdflib URIRef or BNode objects
-
- If no value is found, returns an empty string
- '''
-
- result = ''
- if predicate_string_property:
- result = self._object_value(subject, predicate_string_property)
-
- if not result:
- obj = self._object(subject, predicate)
- if isinstance(obj, BNode):
- result = self._object_value(obj, VCARD.hasValue)
- else:
- result = self._object_value(subject, predicate)
-
- return result
-
- def _time_interval(self, subject, predicate, dcat_ap_version=1):
- '''
- Returns the start and end date for a time interval object
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- It checks for time intervals defined with DCAT, W3C Time hasBeginning & hasEnd
- and schema.org startDate & endDate.
-
- Note that partial dates will be expanded to the first month / day
- value, eg '1904' -> '1904-01-01'.
-
- Returns a tuple with the start and end date values, both of which
- can be None if not found
- '''
-
- start_date = end_date = None
-
- if dcat_ap_version == 1:
- start_date, end_date = self._read_time_interval_schema_org(subject, predicate)
- if start_date or end_date:
- return start_date, end_date
- return self._read_time_interval_time(subject, predicate)
- elif dcat_ap_version == 2:
- start_date, end_date = self._read_time_interval_dcat(subject, predicate)
- if start_date or end_date:
- return start_date, end_date
- start_date, end_date = self._read_time_interval_time(subject, predicate)
- if start_date or end_date:
- return start_date, end_date
- return self._read_time_interval_schema_org(subject, predicate)
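- # Example precedence (assumed): with dcat_ap_version=2, an interval that
- # carries both dcat:startDate and schema:startDate yields the dcat:startDate
- # value, since the DCAT reading is attempted first.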
-
- def _read_time_interval_schema_org(self, subject, predicate):
- start_date = end_date = None
-
- for interval in self.g.objects(subject, predicate):
- start_date = self._object_value(interval, SCHEMA.startDate)
- end_date = self._object_value(interval, SCHEMA.endDate)
-
- if start_date or end_date:
- return start_date, end_date
-
- return start_date, end_date
-
- def _read_time_interval_dcat(self, subject, predicate):
- start_date = end_date = None
-
- for interval in self.g.objects(subject, predicate):
- start_date = self._object_value(interval, DCAT.startDate)
- end_date = self._object_value(interval, DCAT.endDate)
-
- if start_date or end_date:
- return start_date, end_date
-
- return start_date, end_date
-
- def _read_time_interval_time(self, subject, predicate):
- start_date = end_date = None
-
- for interval in self.g.objects(subject, predicate):
- start_nodes = [t for t in self.g.objects(interval,
- TIME.hasBeginning)]
- end_nodes = [t for t in self.g.objects(interval,
- TIME.hasEnd)]
- if start_nodes:
- start_date = self._object_value_multiple_predicate(start_nodes[0],
- [TIME.inXSDDateTimeStamp, TIME.inXSDDateTime, TIME.inXSDDate])
- if end_nodes:
- end_date = self._object_value_multiple_predicate(end_nodes[0],
- [TIME.inXSDDateTimeStamp, TIME.inXSDDateTime, TIME.inXSDDate])
-
- if start_date or end_date:
- return start_date, end_date
-
- return start_date, end_date
-
- def _insert_or_update_temporal(self, dataset_dict, key, value):
- temporal = next((item for item in dataset_dict['extras'] if(item['key'] == key)), None)
- if temporal:
- temporal['value'] = value
- else:
- dataset_dict['extras'].append({'key': key, 'value': value})
-
- def _publisher(self, subject, predicate):
- '''
- Returns a dict with details about a dct:publisher entity, a foaf:Agent
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- Examples:
-
- <dct:publisher>
-     <foaf:Organization rdf:about="http://orgs.vocab.org/some-org">
-         <foaf:name>Publishing Organization for dataset 1</foaf:name>
-         <foaf:mbox>contact@some.org</foaf:mbox>
-         <foaf:homepage>http://some.org</foaf:homepage>
-         <dct:type rdf:resource="http://purl.org/adms/publishertype/NonProfitOrganisation"/>
-     </foaf:Organization>
- </dct:publisher>
-
- {
-     'uri': 'http://orgs.vocab.org/some-org',
-     'name': 'Publishing Organization for dataset 1',
-     'email': 'contact@some.org',
-     'url': 'http://some.org',
-     'type': 'http://purl.org/adms/publishertype/NonProfitOrganisation',
- }
-
- <dct:publisher rdf:resource="http://publications.europa.eu/resource/authority/corporate-body/EURCOU" />
-
- {
-     'uri': 'http://publications.europa.eu/resource/authority/corporate-body/EURCOU'
- }
-
- Returns keys for uri, name, email, url and type with the values set to
- an empty string if they could not be found
- '''
-
- publisher = {}
-
- for agent in self.g.objects(subject, predicate):
-
- publisher['uri'] = (str(agent) if isinstance(agent,
- rdflib.term.URIRef) else '')
-
- publisher['name'] = self._object_value(agent, FOAF.name)
-
- publisher['email'] = self._object_value(agent, FOAF.mbox)
-
- publisher['url'] = self._object_value(agent, FOAF.homepage)
-
- publisher['type'] = self._object_value(agent, DCT.type)
-
- return publisher
-
- def _contact_details(self, subject, predicate):
- '''
- Returns a dict with details about a vcard expression
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- Returns keys for uri, name and email with the values set to
- an empty string if they could not be found
- '''
-
- contact = {}
-
- for agent in self.g.objects(subject, predicate):
-
- contact['uri'] = (str(agent) if isinstance(agent,
- rdflib.term.URIRef) else '')
-
- contact['name'] = self._get_vcard_property_value(agent, VCARD.hasFN, VCARD.fn)
-
- contact['email'] = self._without_mailto(self._get_vcard_property_value(agent, VCARD.hasEmail))
-
- return contact
-
- def _parse_geodata(self, spatial, datatype, cur_value):
- '''
- Extracts geodata with the given datatype from the spatial data and checks
- whether it contains a valid GeoJSON or WKT geometry.
-
- Returns the geometry as a GeoJSON string, or the unchanged cur_value if
- no valid GeoJSON or WKT geometry is found.
- '''
- for geometry in self.g.objects(spatial, datatype):
- if (geometry.datatype == URIRef(GEOJSON_IMT) or
- not geometry.datatype):
- try:
- json.loads(str(geometry))
- cur_value = str(geometry)
- except (ValueError, TypeError):
- pass
- if not cur_value and geometry.datatype == GSP.wktLiteral:
- try:
- cur_value = json.dumps(wkt.loads(str(geometry)))
- except (ValueError, TypeError):
- pass
- return cur_value
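- # Worked example (assumed): a gsp:wktLiteral 'POINT (1 2)' is converted via
- # geomet to the GeoJSON string '{"type": "Point", "coordinates": [1.0, 2.0]}'.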
-
-
- def _spatial(self, subject, predicate):
- '''
- Returns a dict with details about the spatial location
-
- Both subject and predicate must be rdflib URIRef or BNode objects
-
- Returns keys for uri, text, geom, bbox and centroid with the values set
- to None if they could not be found.
-
- Geometries are always returned in GeoJSON. If only WKT is provided,
- it will be transformed to GeoJSON.
-
- Check the notes on the README for the supported formats:
-
- https://github.com/ckan/ckanext-dcat/#rdf-dcat-to-ckan-dataset-mapping
- '''
-
- uri = None
- text = None
- geom = None
- bbox = None
- cent = None
-
- for spatial in self.g.objects(subject, predicate):
-
- if isinstance(spatial, URIRef):
- uri = str(spatial)
-
- if isinstance(spatial, Literal):
- text = str(spatial)
-
- if (spatial, RDF.type, DCT.Location) in self.g:
- geom = self._parse_geodata(spatial, LOCN.geometry, geom)
- bbox = self._parse_geodata(spatial, DCAT.bbox, bbox)
- cent = self._parse_geodata(spatial, DCAT.centroid, cent)
- for label in self.g.objects(spatial, SKOS.prefLabel):
- text = str(label)
- for label in self.g.objects(spatial, RDFS.label):
- text = str(label)
-
- return {
- 'uri': uri,
- 'text': text,
- 'geom': geom,
- 'bbox': bbox,
- 'centroid': cent,
- }
-
- def _license(self, dataset_ref):
- '''
- Returns a license identifier if one of the distributions' licenses is
- found in the CKAN license registry. If no distribution's license
- matches, an empty string is returned.
-
- The first distribution with a license found in the registry is used, so
- if distributions have different licenses we'll only get the first one.
- '''
- if self._licenceregister_cache is not None:
- license_uri2id, license_title2id = self._licenceregister_cache
- else:
- license_uri2id = {}
- license_title2id = {}
- for license_id, license in list(LicenseRegister().items()):
- license_uri2id[license.url] = license_id
- license_title2id[license.title] = license_id
- self._licenceregister_cache = license_uri2id, license_title2id
-
- for distribution in self._distributions(dataset_ref):
- # If distribution has a license, attach it to the dataset
- license = self._object(distribution, DCT.license)
- if license:
- # Try to find a matching license comparing URIs, then titles
- license_id = license_uri2id.get(license.toPython())
- if not license_id:
- license_id = license_title2id.get(
- self._object_value(license, DCT.title))
- if license_id:
- return license_id
- return ''
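- # e.g. (assumed): a distribution whose dct:license points to
- # http://www.opendefinition.org/licenses/cc-by returns 'cc-by', the id
- # registered for that URL in CKAN's default license register.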
-
- def _access_rights(self, subject, predicate):
- '''
- Returns the rights statement, or an empty string if none is found.
- '''
-
- result = ''
- obj = self._object(subject, predicate)
- if obj:
- if isinstance(obj, BNode) and self._object(obj, RDF.type) == DCT.RightsStatement:
- result = self._object_value(obj, RDFS.label)
- elif isinstance(obj, Literal) or isinstance(obj, URIRef):
- # unicode_safe does not cover Literal or URIRef, so convert explicitly
- result = str(obj)
- return result
-
- def _distribution_format(self, distribution, normalize_ckan_format=True):
- '''
- Returns the Internet Media Type and format label for a distribution
-
- Given a reference (URIRef or BNode) to a dcat:Distribution, it will
- try to extract the media type (previously known as MIME type), eg
- `text/csv`, and the format label, eg `CSV`
-
- Values for the media type will be checked in the following order:
-
- 1. literal value of dcat:mediaType
- 2. literal value of dct:format if it contains a '/' character
- 3. value of dct:format if it is an instance of dct:IMT, eg:
-
-    <dct:format>
-        <dct:IMT rdf:value="text/html" rdfs:label="HTML"/>
-    </dct:format>
-
- 4. value of dct:format if it is a URIRef and appears to be an IANA type
-
- Values for the label will be checked in the following order:
-
- 1. literal value of dct:format if it does not contain a '/' character
- 2. label of dct:format if it is an instance of dct:IMT (see above)
- 3. value of dct:format if it is a URIRef and doesn't look like an IANA type
-
- If `normalize_ckan_format` is True, the label will be matched against
- the standard list of formats that is included with CKAN core
- (https://github.com/ckan/ckan/blob/master/ckan/config/resource_formats.json).
- This allows, for instance, populating the CKAN resource format field
- with a format that view plugins, etc. will understand (`csv`, `xml`,
- etc.)
-
- Returns a tuple with the media type and the label, both set to None if
- they couldn't be found.
- '''
-
- imt = None
- label = None
-
- imt = self._object_value(distribution, DCAT.mediaType)
-
- _format = self._object(distribution, DCT['format'])
- if isinstance(_format, Literal):
- if not imt and '/' in _format:
- imt = str(_format)
- else:
- label = str(_format)
- elif isinstance(_format, (BNode, URIRef)):
- if self._object(_format, RDF.type) == DCT.IMT:
- if not imt:
- imt = str(self.g.value(_format, default=None))
- label = self._object_value(_format, RDFS.label)
- elif isinstance(_format, URIRef):
- # If the URIRef does not reference a BNode, it could reference an IANA type.
- # Otherwise, use it as label.
- format_uri = str(_format)
- if 'iana.org/assignments/media-types' in format_uri and not imt:
- imt = format_uri
- else:
- label = format_uri
-
- if ((imt or label) and normalize_ckan_format):
-
- format_registry = resource_formats()
-
- if imt in format_registry:
- label = format_registry[imt][1]
- elif label in format_registry:
- label = format_registry[label][1]
-
- return imt, label
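- # Worked example (assumed): a distribution with only a dct:format literal
- # of 'text/csv' returns ('text/csv', 'CSV') when normalize_ckan_format is
- # True, since 'text/csv' maps to the 'CSV' label in resource_formats().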
-
- def _get_dict_value(self, _dict, key, default=None):
- '''
- Returns the value for the given key on a CKAN dict
-
- By default a key on the root level is checked. If not found, extras
- are checked, both with the key provided and with `dcat_` prepended to
- support legacy fields.
-
- If not found, returns the default value, which defaults to None
- '''
-
- if key in _dict:
- return _dict[key]
-
- for extra in _dict.get('extras', []):
- if extra['key'] == key or extra['key'] == 'dcat_' + key:
- return extra['value']
-
- return default
-
- def _read_list_value(self, value):
- items = []
- # List of values
- if isinstance(value, list):
- items = value
- elif isinstance(value, str):
- try:
- items = json.loads(value)
- if isinstance(items, (int, float, complex)):
- items = [items] # wrap a scalar JSON value in a list
- except ValueError:
- if ',' in value:
- # Comma-separated list
- items = value.split(',')
- else:
- items = [value] # Normal text value
- return items
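- # Assumed examples: '["en", "de"]' -> ['en', 'de'] (JSON list), 'en,de' ->
- # ['en', 'de'] (comma split) and 'en' -> ['en'] (plain text value).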
-
- def _add_spatial_value_to_graph(self, spatial_ref, predicate, value):
- '''
- Adds spatial triples to the graph.
- '''
- # GeoJSON
- self.g.add((spatial_ref,
- predicate,
- Literal(value, datatype=GEOJSON_IMT)))
- # WKT, because GeoDCAT-AP says so
- try:
- self.g.add((spatial_ref,
- predicate,
- Literal(wkt.dumps(json.loads(value),
- decimals=4),
- datatype=GSP.wktLiteral)))
- except (TypeError, ValueError, InvalidGeoJSONException):
- pass
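- # e.g. (assumed): '{"type": "Point", "coordinates": [1, 2]}' is added both
- # as a GeoJSON literal and as the gsp:wktLiteral 'POINT (1.0000 2.0000)'.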
-
- def _add_spatial_to_dict(self, dataset_dict, key, spatial):
- if spatial.get(key):
- dataset_dict['extras'].append(
- {'key': 'spatial_{0}'.format(key) if key != 'geom' else 'spatial',
- 'value': spatial.get(key)})
-
- def _get_dataset_value(self, dataset_dict, key, default=None):
- '''
- Returns the value for the given key on a CKAN dict
-
- Check `_get_dict_value` for details
- '''
- return self._get_dict_value(dataset_dict, key, default)
-
- def _get_resource_value(self, resource_dict, key, default=None):
- '''
- Returns the value for the given key on a CKAN dict
-
- Check `_get_dict_value` for details
- '''
- return self._get_dict_value(resource_dict, key, default)
-
- def _add_date_triples_from_dict(self, _dict, subject, items):
- self._add_triples_from_dict(_dict, subject, items,
- date_value=True)
-
- def _add_list_triples_from_dict(self, _dict, subject, items):
- self._add_triples_from_dict(_dict, subject, items,
- list_value=True)
-
- def _add_triples_from_dict(self, _dict, subject, items,
- list_value=False,
- date_value=False):
- for item in items:
- key, predicate, fallbacks, _type = item
- self._add_triple_from_dict(_dict, subject, predicate, key,
- fallbacks=fallbacks,
- list_value=list_value,
- date_value=date_value,
- _type=_type)
-
- def _add_triple_from_dict(self, _dict, subject, predicate, key,
- fallbacks=None,
- list_value=False,
- date_value=False,
- _type=Literal,
- _datatype=None,
- value_modifier=None):
- '''
- Adds a new triple to the graph with the provided parameters
-
- The subject and predicate of the triple are passed as the relevant
- RDFLib objects (URIRef or BNode). As default, the object is a
- literal value, which is extracted from the dict using the provided key
- (see `_get_dict_value`). If the value for the key is not found, then
- additional fallback keys are checked.
- Using `value_modifier`, a function taking the extracted value and
- returning a modified value can be passed.
- If a value was found, the modifier is applied before adding the value.
-
- If `list_value` or `date_value` are True, then the value is treated as
- a list or a date respectively (see `_add_list_triple` and
- `_add_date_triple` for details).
- '''
- value = self._get_dict_value(_dict, key)
- if not value and fallbacks:
- for fallback in fallbacks:
- value = self._get_dict_value(_dict, fallback)
- if value:
- break
-
- # if a modifying function was given, apply it to the value
- if value and callable(value_modifier):
- value = value_modifier(value)
-
- if value and list_value:
- self._add_list_triple(subject, predicate, value, _type, _datatype)
- elif value and date_value:
- self._add_date_triple(subject, predicate, value, _type)
- elif value:
- # Normal text value
- # ensure URIRef items are preprocessed (space removal/url encoding)
- if _type == URIRef:
- _type = CleanedURIRef
- if _datatype:
- object = _type(value, datatype=_datatype)
- else:
- object = _type(value)
- self.g.add((subject, predicate, object))
-
- def _add_list_triple(self, subject, predicate, value, _type=Literal, _datatype=None):
- '''
- Adds as many triples to the graph as there are values
-
- Values are literal strings; if `value` is a list, one triple is added
- per item. If `value` is a string, there is an attempt to split it on
- commas, to support legacy fields.
- '''
- items = self._read_list_value(value)
-
- for item in items:
- # ensure URIRef items are preprocessed (space removal/url encoding)
- if _type == URIRef:
- _type = CleanedURIRef
- if _datatype:
- object = _type(item, datatype=_datatype)
- else:
- object = _type(item)
- self.g.add((subject, predicate, object))
-
- def _add_date_triple(self, subject, predicate, value, _type=Literal):
- '''
- Adds a new triple with a date object
-
- Dates are parsed using dateutil, and if the date obtained is correct,
- added to the graph as an XSD.dateTime value.
-
- If there are parsing errors, the literal string value is added.
- '''
- if not value:
- return
- try:
- default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
- _date = parse_date(value, default=default_datetime)
-
- self.g.add((subject, predicate, _type(_date.isoformat(),
- datatype=XSD.dateTime)))
- except ValueError:
- self.g.add((subject, predicate, _type(value)))
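- # Worked example (assumed): the partial date '1904' is expanded against the
- # default datetime and stored as '1904-01-01T00:00:00' (xsd:dateTime).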
-
- def _last_catalog_modification(self):
- '''
- Returns the date and time the catalog was last modified
-
- To be more precise, the most recent value for `metadata_modified` on a
- dataset.
-
- Returns a dateTime string in ISO format, or None if it could not be
- found.
- '''
- context = {
- 'ignore_auth': True
- }
- result = toolkit.get_action('package_search')(context, {
- 'sort': 'metadata_modified desc',
- 'rows': 1,
- })
- if result and result.get('results'):
- return result['results'][0]['metadata_modified']
- return None
-
- def _add_mailto(self, mail_addr):
- '''
- Ensures that the mail address has a URIRef-compatible mailto: prefix.
- Can be used as modifier function for `_add_triple_from_dict`.
- '''
- if mail_addr:
- return PREFIX_MAILTO + self._without_mailto(mail_addr)
- else:
- return mail_addr
-
- def _without_mailto(self, mail_addr):
- '''
- Ensures that the mail address string has no mailto: prefix.
- '''
- if mail_addr:
- return str(mail_addr).replace(PREFIX_MAILTO, u'')
- else:
- return mail_addr
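- # e.g. (assumed): _add_mailto('info@example.org') returns
- # 'mailto:info@example.org'; _without_mailto reverses this.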
-
- def _get_source_catalog(self, dataset_ref):
- '''
- Returns the catalog reference that is the source for this dataset.
-
- The catalog referenced in dct:hasPart is returned if the dataset is
- linked there; otherwise the main catalog is returned.
-
- This will not be used if ckanext.dcat.expose_subcatalogs
- configuration option is set to False.
- '''
- if not toolkit.asbool(config.get(DCAT_EXPOSE_SUBCATALOGS, False)):
- return
- catalogs = set(self.g.subjects(DCAT.dataset, dataset_ref))
- root = self._get_root_catalog_ref()
- try:
- catalogs.remove(root)
- except KeyError:
- pass
- assert len(catalogs) in (0, 1), "len %s" % catalogs
- if catalogs:
- return catalogs.pop()
- return root
-
- def _get_root_catalog_ref(self):
- roots = list(self.g.subjects(DCT.hasPart))
- if not roots:
- roots = list(self.g.subjects(RDF.type, DCAT.Catalog))
- return roots[0]
-
- def _get_or_create_spatial_ref(self, dataset_dict, dataset_ref):
- for spatial_ref in self.g.objects(dataset_ref, DCT.spatial):
- if spatial_ref:
- return spatial_ref
-
- # Create new spatial_ref
- spatial_uri = self._get_dataset_value(dataset_dict, 'spatial_uri')
- if spatial_uri:
- spatial_ref = CleanedURIRef(spatial_uri)
- else:
- spatial_ref = BNode()
- self.g.add((spatial_ref, RDF.type, DCT.Location))
- self.g.add((dataset_ref, DCT.spatial, spatial_ref))
- return spatial_ref
-
- # Public methods for profiles to implement
-
- def parse_dataset(self, dataset_dict, dataset_ref):
- '''
- Creates a CKAN dataset dict from the RDF graph
-
- The `dataset_dict` is passed to all the loaded profiles before being
- yielded, so it can be further modified by each one of them.
- `dataset_ref` is an rdflib URIRef object
- that can be used to reference the dataset when querying the graph.
-
- Returns a dataset dict that can be passed to eg `package_create`
- or `package_update`
- '''
- return dataset_dict
-
- def _extract_catalog_dict(self, catalog_ref):
- '''
- Returns a list of key/value dictionaries with the catalog details
- '''
-
- out = []
- sources = (('source_catalog_title', DCT.title,),
- ('source_catalog_description', DCT.description,),
- ('source_catalog_homepage', FOAF.homepage,),
- ('source_catalog_language', DCT.language,),
- ('source_catalog_modified', DCT.modified,),)
-
- for key, predicate in sources:
- val = self._object_value(catalog_ref, predicate)
- if val:
- out.append({'key': key, 'value': val})
-
- out.append({'key': 'source_catalog_publisher', 'value': json.dumps(self._publisher(catalog_ref, DCT.publisher))})
- return out
-
- def graph_from_catalog(self, catalog_dict, catalog_ref):
- '''
- Creates an RDF graph for the whole catalog (site)
-
- The class's RDFLib graph (accessible via `self.g`) should be updated in
- this method
-
- `catalog_dict` is a dict that can contain literal values for the
- dcat:Catalog class like `title`, `homepage`, etc. `catalog_ref` is an
- rdflib URIRef object that must be used to reference the catalog when
- working with the graph.
- '''
- pass
-
- def graph_from_dataset(self, dataset_dict, dataset_ref):
- '''
- Given a CKAN dataset dict, creates an RDF graph
-
- The class's RDFLib graph (accessible via `self.g`) should be updated in
- this method
-
- `dataset_dict` is a dict with the dataset metadata like the one
- returned by `package_show`. `dataset_ref` is an rdflib URIRef object
- that must be used to reference the dataset when working with the graph.
- '''
- pass
-
-
-class EuropeanDCATAPProfile(RDFProfile):
- '''
- An RDF profile based on the DCAT-AP for data portals in Europe
-
- More information and specification:
-
- https://joinup.ec.europa.eu/asset/dcat_application_profile
-
- '''
-
- def parse_dataset(self, dataset_dict, dataset_ref):
-
- dataset_dict['extras'] = []
- dataset_dict['resources'] = []
-
- # Basic fields
- for key, predicate in (
- ('title', DCT.title),
- ('notes', DCT.description),
- ('url', DCAT.landingPage),
- ('version', OWL.versionInfo),
- ):
- value = self._object_value(dataset_ref, predicate)
- if value:
- dataset_dict[key] = value
-
- if not dataset_dict.get('version'):
- # adms:version was supported in the first version of DCAT-AP
- value = self._object_value(dataset_ref, ADMS.version)
- if value:
- dataset_dict['version'] = value
-
- # Tags
- # only clean tags with munge_tag if the DCAT_CLEAN_TAGS option is enabled
- do_clean = toolkit.asbool(config.get(DCAT_CLEAN_TAGS, False))
- tags_val = [munge_tag(tag) if do_clean else tag for tag in self._keywords(dataset_ref)]
- tags = [{'name': tag} for tag in tags_val]
- dataset_dict['tags'] = tags
-
- # Extras
-
- # Simple values
- for key, predicate in (
- ('issued', DCT.issued),
- ('modified', DCT.modified),
- ('identifier', DCT.identifier),
- ('version_notes', ADMS.versionNotes),
- ('frequency', DCT.accrualPeriodicity),
- ('provenance', DCT.provenance),
- ('dcat_type', DCT.type),
- ):
- value = self._object_value(dataset_ref, predicate)
- if value:
- dataset_dict['extras'].append({'key': key, 'value': value})
-
- # Lists
- for key, predicate, in (
- ('language', DCT.language),
- ('theme', DCAT.theme),
- ('alternate_identifier', ADMS.identifier),
- ('conforms_to', DCT.conformsTo),
- ('documentation', FOAF.page),
- ('related_resource', DCT.relation),
- ('has_version', DCT.hasVersion),
- ('is_version_of', DCT.isVersionOf),
- ('source', DCT.source),
- ('sample', ADMS.sample),
- ):
- values = self._object_value_list(dataset_ref, predicate)
- if values:
- dataset_dict['extras'].append({'key': key,
- 'value': json.dumps(values)})
-
- # Contact details
- contact = self._contact_details(dataset_ref, DCAT.contactPoint)
- if not contact:
- # adms:contactPoint was supported in the first version of DCAT-AP
- contact = self._contact_details(dataset_ref, ADMS.contactPoint)
-
- if contact:
- for key in ('uri', 'name', 'email'):
- if contact.get(key):
- dataset_dict['extras'].append(
- {'key': 'contact_{0}'.format(key),
- 'value': contact.get(key)})
-
- # Publisher
- publisher = self._publisher(dataset_ref, DCT.publisher)
- for key in ('uri', 'name', 'email', 'url', 'type'):
- if publisher.get(key):
- dataset_dict['extras'].append(
- {'key': 'publisher_{0}'.format(key),
- 'value': publisher.get(key)})
-
- # Temporal
- start, end = self._time_interval(dataset_ref, DCT.temporal)
- if start:
- dataset_dict['extras'].append(
- {'key': 'temporal_start', 'value': start})
- if end:
- dataset_dict['extras'].append(
- {'key': 'temporal_end', 'value': end})
-
- # Spatial
- spatial = self._spatial(dataset_ref, DCT.spatial)
- for key in ('uri', 'text', 'geom'):
- self._add_spatial_to_dict(dataset_dict, key, spatial)
-
- # Dataset URI (explicitly show the missing ones)
- dataset_uri = (str(dataset_ref)
- if isinstance(dataset_ref, rdflib.term.URIRef)
- else '')
- dataset_dict['extras'].append({'key': 'uri', 'value': dataset_uri})
-
- # access_rights
- access_rights = self._access_rights(dataset_ref, DCT.accessRights)
- if access_rights:
- dataset_dict['extras'].append({'key': 'access_rights', 'value': access_rights})
-
- # License
- if 'license_id' not in dataset_dict:
- dataset_dict['license_id'] = self._license(dataset_ref)
-
- # Source Catalog
- if toolkit.asbool(config.get(DCAT_EXPOSE_SUBCATALOGS, False)):
- catalog_src = self._get_source_catalog(dataset_ref)
- if catalog_src is not None:
- src_data = self._extract_catalog_dict(catalog_src)
- dataset_dict['extras'].extend(src_data)
-
- # Resources
- for distribution in self._distributions(dataset_ref):
-
- resource_dict = {}
-
- # Simple values
- for key, predicate in (
- ('name', DCT.title),
- ('description', DCT.description),
- ('access_url', DCAT.accessURL),
- ('download_url', DCAT.downloadURL),
- ('issued', DCT.issued),
- ('modified', DCT.modified),
- ('status', ADMS.status),
- ('license', DCT.license),
- ):
- value = self._object_value(distribution, predicate)
- if value:
- resource_dict[key] = value
-
- resource_dict['url'] = (self._object_value(distribution,
- DCAT.downloadURL) or
- self._object_value(distribution,
- DCAT.accessURL))
- # Lists
- for key, predicate in (
- ('language', DCT.language),
- ('documentation', FOAF.page),
- ('conforms_to', DCT.conformsTo),
- ):
- values = self._object_value_list(distribution, predicate)
- if values:
- resource_dict[key] = json.dumps(values)
-
- # rights
- rights = self._access_rights(distribution, DCT.rights)
- if rights:
- resource_dict['rights'] = rights
-
- # Format and media type
- normalize_ckan_format = toolkit.asbool(config.get(
- 'ckanext.dcat.normalize_ckan_format', True))
- imt, label = self._distribution_format(distribution,
- normalize_ckan_format)
-
- if imt:
- resource_dict['mimetype'] = imt
-
- if label:
- resource_dict['format'] = label
- elif imt:
- resource_dict['format'] = imt
-
- # Size
- size = self._object_value_int(distribution, DCAT.byteSize)
- if size is not None:
- resource_dict['size'] = size
-
- # Checksum
- for checksum in self.g.objects(distribution, SPDX.checksum):
- algorithm = self._object_value(checksum, SPDX.algorithm)
- checksum_value = self._object_value(checksum, SPDX.checksumValue)
- if algorithm:
- resource_dict['hash_algorithm'] = algorithm
- if checksum_value:
- resource_dict['hash'] = checksum_value
-
- # Distribution URI (explicitly show the missing ones)
- resource_dict['uri'] = (str(distribution)
- if isinstance(distribution,
- rdflib.term.URIRef)
- else '')
-
- # Remember the (internal) distribution reference for referencing in
- # further profiles, e.g. for adding more properties
- resource_dict['distribution_ref'] = str(distribution)
-
- dataset_dict['resources'].append(resource_dict)
-
- if self.compatibility_mode:
- # Tweak the resulting dict to make it compatible with previous
- # versions of the ckanext-dcat parsers
- for extra in dataset_dict['extras']:
- if extra['key'] in ('issued', 'modified', 'publisher_name',
- 'publisher_email',):
-
- extra['key'] = 'dcat_' + extra['key']
-
- if extra['key'] == 'language':
- extra['value'] = ','.join(
- sorted(json.loads(extra['value'])))
-
- return dataset_dict
-
- def graph_from_dataset(self, dataset_dict, dataset_ref):
-
- g = self.g
-
- for prefix, namespace in namespaces.items():
- g.bind(prefix, namespace)
-
- g.add((dataset_ref, RDF.type, DCAT.Dataset))
-
- # Basic fields
- items = [
- ('title', DCT.title, None, Literal),
- ('notes', DCT.description, None, Literal),
- ('url', DCAT.landingPage, None, URIRef),
- ('identifier', DCT.identifier, ['guid', 'id'], URIRefOrLiteral),
- ('version', OWL.versionInfo, ['dcat_version'], Literal),
- ('version_notes', ADMS.versionNotes, None, Literal),
- ('frequency', DCT.accrualPeriodicity, None, URIRefOrLiteral),
- ('access_rights', DCT.accessRights, None, URIRefOrLiteral),
- ('dcat_type', DCT.type, None, Literal),
- ('provenance', DCT.provenance, None, Literal),
- ]
- self._add_triples_from_dict(dataset_dict, dataset_ref, items)
-
- # Tags
- for tag in dataset_dict.get('tags', []):
- g.add((dataset_ref, DCAT.keyword, Literal(tag['name'])))
-
- # Dates
- items = [
- ('issued', DCT.issued, ['metadata_created'], Literal),
- ('modified', DCT.modified, ['metadata_modified'], Literal),
- ]
- self._add_date_triples_from_dict(dataset_dict, dataset_ref, items)
-
- # Lists
- items = [
- ('language', DCT.language, None, URIRefOrLiteral),
- ('theme', DCAT.theme, None, URIRef),
- ('conforms_to', DCT.conformsTo, None, Literal),
- ('alternate_identifier', ADMS.identifier, None, URIRefOrLiteral),
- ('documentation', FOAF.page, None, URIRefOrLiteral),
- ('related_resource', DCT.relation, None, URIRefOrLiteral),
- ('has_version', DCT.hasVersion, None, URIRefOrLiteral),
- ('is_version_of', DCT.isVersionOf, None, URIRefOrLiteral),
- ('source', DCT.source, None, URIRefOrLiteral),
- ('sample', ADMS.sample, None, URIRefOrLiteral),
- ]
- self._add_list_triples_from_dict(dataset_dict, dataset_ref, items)
-
- # Contact details
- if any([
- self._get_dataset_value(dataset_dict, 'contact_uri'),
- self._get_dataset_value(dataset_dict, 'contact_name'),
- self._get_dataset_value(dataset_dict, 'contact_email'),
- self._get_dataset_value(dataset_dict, 'maintainer'),
- self._get_dataset_value(dataset_dict, 'maintainer_email'),
- self._get_dataset_value(dataset_dict, 'author'),
- self._get_dataset_value(dataset_dict, 'author_email'),
- ]):
-
- contact_uri = self._get_dataset_value(dataset_dict, 'contact_uri')
- if contact_uri:
- contact_details = CleanedURIRef(contact_uri)
- else:
- contact_details = BNode()
-
- g.add((contact_details, RDF.type, VCARD.Organization))
- g.add((dataset_ref, DCAT.contactPoint, contact_details))
-
- self._add_triple_from_dict(
- dataset_dict, contact_details,
- VCARD.fn, 'contact_name', ['maintainer', 'author']
- )
- # Add mail address as URIRef, and ensure it has a mailto: prefix
- self._add_triple_from_dict(
- dataset_dict, contact_details,
- VCARD.hasEmail, 'contact_email', ['maintainer_email',
- 'author_email'],
- _type=URIRef, value_modifier=self._add_mailto
- )
-
- # Publisher
- if any([
- self._get_dataset_value(dataset_dict, 'publisher_uri'),
- self._get_dataset_value(dataset_dict, 'publisher_name'),
- dataset_dict.get('organization'),
- ]):
-
- publisher_uri = self._get_dataset_value(dataset_dict, 'publisher_uri')
- publisher_uri_fallback = publisher_uri_organization_fallback(dataset_dict)
- publisher_name = self._get_dataset_value(dataset_dict, 'publisher_name')
- if publisher_uri:
- publisher_details = CleanedURIRef(publisher_uri)
- elif not publisher_name and publisher_uri_fallback:
- # neither URI nor name is available, use organization as fallback
- publisher_details = CleanedURIRef(publisher_uri_fallback)
- else:
- # No publisher_uri
- publisher_details = BNode()
-
- g.add((publisher_details, RDF.type, FOAF.Organization))
- g.add((dataset_ref, DCT.publisher, publisher_details))
-
- # In case no name and URI are available, again fall back to organization.
- # If no name but a URI is available, the name literal remains empty to
- # avoid mixing organization and dataset values.
- if not publisher_name and not publisher_uri and dataset_dict.get('organization'):
- publisher_name = dataset_dict['organization']['title']
-
- g.add((publisher_details, FOAF.name, Literal(publisher_name)))
- # TODO: It would make sense to fallback these to organization
- # fields but they are not in the default schema and the
- # `organization` object in the dataset_dict does not include
- # custom fields
- items = [
- ('publisher_email', FOAF.mbox, None, Literal),
- ('publisher_url', FOAF.homepage, None, URIRef),
- ('publisher_type', DCT.type, None, URIRefOrLiteral),
- ]
-
- self._add_triples_from_dict(dataset_dict, publisher_details, items)
-
- # Temporal
- start = self._get_dataset_value(dataset_dict, 'temporal_start')
- end = self._get_dataset_value(dataset_dict, 'temporal_end')
- if start or end:
- temporal_extent = BNode()
-
- g.add((temporal_extent, RDF.type, DCT.PeriodOfTime))
- if start:
- self._add_date_triple(temporal_extent, SCHEMA.startDate, start)
- if end:
- self._add_date_triple(temporal_extent, SCHEMA.endDate, end)
- g.add((dataset_ref, DCT.temporal, temporal_extent))
-
- # Spatial
- spatial_text = self._get_dataset_value(dataset_dict, 'spatial_text')
- spatial_geom = self._get_dataset_value(dataset_dict, 'spatial')
-
- if spatial_text or spatial_geom:
- spatial_ref = self._get_or_create_spatial_ref(dataset_dict, dataset_ref)
-
- if spatial_text:
- g.add((spatial_ref, SKOS.prefLabel, Literal(spatial_text)))
-
- if spatial_geom:
- self._add_spatial_value_to_graph(spatial_ref, LOCN.geometry, spatial_geom)
-
- # Use fallback license if set in config
- resource_license_fallback = None
- if toolkit.asbool(config.get(DISTRIBUTION_LICENSE_FALLBACK_CONFIG, False)):
- if 'license_id' in dataset_dict and isinstance(URIRefOrLiteral(dataset_dict['license_id']), URIRef):
- resource_license_fallback = dataset_dict['license_id']
- elif 'license_url' in dataset_dict and isinstance(URIRefOrLiteral(dataset_dict['license_url']), URIRef):
- resource_license_fallback = dataset_dict['license_url']
-
- # Resources
- for resource_dict in dataset_dict.get('resources', []):
-
- distribution = CleanedURIRef(resource_uri(resource_dict))
-
- g.add((dataset_ref, DCAT.distribution, distribution))
-
- g.add((distribution, RDF.type, DCAT.Distribution))
-
- # Simple values
- items = [
- ('name', DCT.title, None, Literal),
- ('description', DCT.description, None, Literal),
- ('status', ADMS.status, None, URIRefOrLiteral),
- ('rights', DCT.rights, None, URIRefOrLiteral),
- ('license', DCT.license, None, URIRefOrLiteral),
- ('access_url', DCAT.accessURL, None, URIRef),
- ('download_url', DCAT.downloadURL, None, URIRef),
- ]
-
- self._add_triples_from_dict(resource_dict, distribution, items)
-
- # Lists
- items = [
- ('documentation', FOAF.page, None, URIRefOrLiteral),
- ('language', DCT.language, None, URIRefOrLiteral),
- ('conforms_to', DCT.conformsTo, None, Literal),
- ]
- self._add_list_triples_from_dict(resource_dict, distribution, items)
-
- # Set default license for distribution if needed and available
- if resource_license_fallback and (distribution, DCT.license, None) not in g:
- g.add((distribution, DCT.license, URIRefOrLiteral(resource_license_fallback)))
-
- # Format
- mimetype = resource_dict.get('mimetype')
- fmt = resource_dict.get('format')
-
- # IANA media types (either URI or Literal) should be mapped as mediaType.
- # In case format is available and mimetype is not set or identical to format,
- # check which type is appropriate.
- if fmt and (not mimetype or mimetype == fmt):
- if ('iana.org/assignments/media-types' in fmt
- or not fmt.startswith('http') and '/' in fmt):
- # output format value as dcat:mediaType instead of dct:format
- mimetype = fmt
- fmt = None
- else:
- # Use dct:format
- mimetype = None
-
- if mimetype:
- g.add((distribution, DCAT.mediaType,
- URIRefOrLiteral(mimetype)))
-
- if fmt:
- g.add((distribution, DCT['format'],
- URIRefOrLiteral(fmt)))
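- # e.g. (assumed): a format of 'text/csv' with no distinct mimetype is
- # output as dcat:mediaType, while a plain 'CSV' value becomes dct:format.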
-
-
- # URL fallback and old behavior
- url = resource_dict.get('url')
- download_url = resource_dict.get('download_url')
- access_url = resource_dict.get('access_url')
- # Use url as fallback for access_url if access_url is not set and url is not equal to download_url
- if url and not access_url:
- if (not download_url) or (download_url and url != download_url):
- self._add_triple_from_dict(resource_dict, distribution, DCAT.accessURL, 'url', _type=URIRef)
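- # e.g. (assumed): a resource whose url equals its download_url gets no
- # extra dcat:accessURL, since dcat:downloadURL already covers it.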
-
- # Dates
- items = [
- ('issued', DCT.issued, ['created'], Literal),
- ('modified', DCT.modified, ['metadata_modified'], Literal),
- ]
-
- self._add_date_triples_from_dict(resource_dict, distribution, items)
-
- # Numbers
- if resource_dict.get('size'):
- try:
- g.add((distribution, DCAT.byteSize,
- Literal(float(resource_dict['size']),
- datatype=XSD.decimal)))
- except (ValueError, TypeError):
- g.add((distribution, DCAT.byteSize,
- Literal(resource_dict['size'])))
- # Checksum
- if resource_dict.get('hash'):
- checksum = BNode()
- g.add((checksum, RDF.type, SPDX.Checksum))
- g.add((checksum, SPDX.checksumValue,
- Literal(resource_dict['hash'],
- datatype=XSD.hexBinary)))
-
- if resource_dict.get('hash_algorithm'):
- g.add((checksum, SPDX.algorithm,
- URIRefOrLiteral(resource_dict['hash_algorithm'])))
-
- g.add((distribution, SPDX.checksum, checksum))
-
- def graph_from_catalog(self, catalog_dict, catalog_ref):
-
- g = self.g
-
- for prefix, namespace in namespaces.items():
- g.bind(prefix, namespace)
-
- g.add((catalog_ref, RDF.type, DCAT.Catalog))
-
- # Basic fields
- items = [
- ('title', DCT.title, config.get('ckan.site_title'), Literal),
- ('description', DCT.description, config.get('ckan.site_description'), Literal),
- ('homepage', FOAF.homepage, config.get('ckan.site_url'), URIRef),
- ('language', DCT.language, config.get('ckan.locale_default', 'en'), URIRefOrLiteral),
- ]
- for item in items:
- key, predicate, fallback, _type = item
- if catalog_dict:
- value = catalog_dict.get(key, fallback)
- else:
- value = fallback
- if value:
- g.add((catalog_ref, predicate, _type(value)))
-
- # Dates
- modified = self._last_catalog_modification()
- if modified:
- self._add_date_triple(catalog_ref, DCT.modified, modified)
-
-
-class EuropeanDCATAP2Profile(EuropeanDCATAPProfile):
- '''
- An RDF profile based on the DCAT-AP 2 for data portals in Europe
-
- More information and specification:
-
- https://joinup.ec.europa.eu/asset/dcat_application_profile
-
- '''
-
- def parse_dataset(self, dataset_dict, dataset_ref):
-
- # call super method
- super(EuropeanDCATAP2Profile, self).parse_dataset(dataset_dict, dataset_ref)
-
- # Lists
- for key, predicate in (
- ('temporal_resolution', DCAT.temporalResolution),
- ('is_referenced_by', DCT.isReferencedBy),
- ('applicable_legislation', DCATAP.applicableLegislation),
- ('hvd_category', DCATAP.hvdCategory),
- ):
- values = self._object_value_list(dataset_ref, predicate)
- if values:
- dataset_dict['extras'].append({'key': key,
- 'value': json.dumps(values)})
- # Temporal
- start, end = self._time_interval(dataset_ref, DCT.temporal, dcat_ap_version=2)
- if start:
- self._insert_or_update_temporal(dataset_dict, 'temporal_start', start)
- if end:
- self._insert_or_update_temporal(dataset_dict, 'temporal_end', end)
-
- # Spatial
- spatial = self._spatial(dataset_ref, DCT.spatial)
- for key in ('bbox', 'centroid'):
- self._add_spatial_to_dict(dataset_dict, key, spatial)
-
- # Spatial resolution in meters
- spatial_resolution_in_meters = self._object_value_int_list(
- dataset_ref, DCAT.spatialResolutionInMeters)
- if spatial_resolution_in_meters:
- dataset_dict['extras'].append({'key': 'spatial_resolution_in_meters',
- 'value': json.dumps(spatial_resolution_in_meters)})
-
- # Resources
- for distribution in self._distributions(dataset_ref):
- distribution_ref = str(distribution)
- for resource_dict in dataset_dict.get('resources', []):
- # Match distribution in graph and distribution in resource dict
- if resource_dict and distribution_ref == resource_dict.get('distribution_ref'):
- # Simple values
- for key, predicate in (
- ('availability', DCATAP.availability),
- ('compress_format', DCAT.compressFormat),
- ('package_format', DCAT.packageFormat),
- ):
- value = self._object_value(distribution, predicate)
- if value:
- resource_dict[key] = value
-
- # Lists
- for key, predicate in (
- ('applicable_legislation', DCATAP.applicableLegislation),
- ):
- values = self._object_value_list(distribution, predicate)
- if values:
- resource_dict[key] = json.dumps(values)
-
- # Access services
- access_service_list = []
-
- for access_service in self.g.objects(distribution, DCAT.accessService):
- access_service_dict = {}
-
- # Simple values
- for key, predicate in (
- ('availability', DCATAP.availability),
- ('title', DCT.title),
- ('endpoint_description', DCAT.endpointDescription),
- ('license', DCT.license),
- ('access_rights', DCT.accessRights),
- ('description', DCT.description),
- ):
- value = self._object_value(access_service, predicate)
- if value:
- access_service_dict[key] = value
- # List
- for key, predicate in (
- ('endpoint_url', DCAT.endpointURL),
- ('serves_dataset', DCAT.servesDataset),
- ):
- values = self._object_value_list(access_service, predicate)
- if values:
- access_service_dict[key] = values
-
- # Access service URI (explicitly show the missing ones)
- access_service_dict['uri'] = (str(access_service)
- if isinstance(access_service, URIRef)
- else '')
-
- # Remember the (internal) access service reference for referencing in
- # further profiles, e.g. for adding more properties
- access_service_dict['access_service_ref'] = str(access_service)
-
- access_service_list.append(access_service_dict)
-
- if access_service_list:
- resource_dict['access_services'] = json.dumps(access_service_list)
-
- return dataset_dict
-
- def graph_from_dataset(self, dataset_dict, dataset_ref):
-
- # call super method
- super(EuropeanDCATAP2Profile, self).graph_from_dataset(dataset_dict, dataset_ref)
-
- # Lists
- for key, predicate, fallbacks, _type, datatype in (
- ('temporal_resolution', DCAT.temporalResolution, None, Literal, XSD.duration),
- ('is_referenced_by', DCT.isReferencedBy, None, URIRefOrLiteral, None),
- ('applicable_legislation', DCATAP.applicableLegislation, None, URIRefOrLiteral, None),
- ('hvd_category', DCATAP.hvdCategory, None, URIRefOrLiteral, None),
- ):
- self._add_triple_from_dict(dataset_dict, dataset_ref, predicate, key, list_value=True,
- fallbacks=fallbacks, _type=_type, _datatype=datatype)
-
- # Temporal
- start = self._get_dataset_value(dataset_dict, 'temporal_start')
- end = self._get_dataset_value(dataset_dict, 'temporal_end')
- if start or end:
- temporal_extent_dcat = BNode()
-
- self.g.add((temporal_extent_dcat, RDF.type, DCT.PeriodOfTime))
- if start:
- self._add_date_triple(temporal_extent_dcat, DCAT.startDate, start)
- if end:
- self._add_date_triple(temporal_extent_dcat, DCAT.endDate, end)
- self.g.add((dataset_ref, DCT.temporal, temporal_extent_dcat))
-
- # spatial
- spatial_bbox = self._get_dataset_value(dataset_dict, 'spatial_bbox')
- spatial_cent = self._get_dataset_value(dataset_dict, 'spatial_centroid')
-
- if spatial_bbox or spatial_cent:
- spatial_ref = self._get_or_create_spatial_ref(dataset_dict, dataset_ref)
-
- if spatial_bbox:
- self._add_spatial_value_to_graph(spatial_ref, DCAT.bbox, spatial_bbox)
-
- if spatial_cent:
- self._add_spatial_value_to_graph(spatial_ref, DCAT.centroid, spatial_cent)
-
- # Spatial resolution in meters
- spatial_resolution_in_meters = self._read_list_value(
- self._get_dataset_value(dataset_dict, 'spatial_resolution_in_meters'))
- if spatial_resolution_in_meters:
- for value in spatial_resolution_in_meters:
- try:
- self.g.add((dataset_ref, DCAT.spatialResolutionInMeters,
- Literal(float(value), datatype=XSD.decimal)))
- except (ValueError, TypeError):
- self.g.add((dataset_ref, DCAT.spatialResolutionInMeters, Literal(value)))
-
- # Resources
- for resource_dict in dataset_dict.get('resources', []):
-
- distribution = CleanedURIRef(resource_uri(resource_dict))
-
- # Simple values
- items = [
- ('availability', DCATAP.availability, None, URIRefOrLiteral),
- ('compress_format', DCAT.compressFormat, None, URIRefOrLiteral),
- ('package_format', DCAT.packageFormat, None, URIRefOrLiteral)
- ]
-
- self._add_triples_from_dict(resource_dict, distribution, items)
-
- # Lists
- items = [
- ('applicable_legislation', DCATAP.applicableLegislation, None, URIRefOrLiteral),
- ]
- self._add_list_triples_from_dict(resource_dict, distribution, items)
-
- try:
- access_service_list = json.loads(resource_dict.get('access_services', '[]'))
- # Access service
- for access_service_dict in access_service_list:
-
- access_service_uri = access_service_dict.get('uri')
- if access_service_uri:
- access_service_node = CleanedURIRef(access_service_uri)
- else:
- access_service_node = BNode()
- # Remember the (internal) access service reference for referencing in
- # further profiles
- access_service_dict['access_service_ref'] = str(access_service_node)
-
- self.g.add((distribution, DCAT.accessService, access_service_node))
-
- self.g.add((access_service_node, RDF.type, DCAT.DataService))
-
- # Simple values
- items = [
- ('availability', DCATAP.availability, None, URIRefOrLiteral),
- ('license', DCT.license, None, URIRefOrLiteral),
- ('access_rights', DCT.accessRights, None, URIRefOrLiteral),
- ('title', DCT.title, None, Literal),
- ('endpoint_description', DCAT.endpointDescription, None, Literal),
- ('description', DCT.description, None, Literal),
- ]
-
- self._add_triples_from_dict(access_service_dict, access_service_node, items)
-
- # Lists
- items = [
- ('endpoint_url', DCAT.endpointURL, None, URIRefOrLiteral),
- ('serves_dataset', DCAT.servesDataset, None, URIRefOrLiteral),
- ]
- self._add_list_triples_from_dict(access_service_dict, access_service_node, items)
-
- if access_service_list:
- resource_dict['access_services'] = json.dumps(access_service_list)
- except ValueError:
- pass
-
- def graph_from_catalog(self, catalog_dict, catalog_ref):
-
- # call super method
- super(EuropeanDCATAP2Profile, self).graph_from_catalog(catalog_dict, catalog_ref)
-
-
-class SchemaOrgProfile(RDFProfile):
- '''
- An RDF profile based on the schema.org Dataset
-
- More information and specification:
-
- http://schema.org/Dataset
-
- Mapping between schema.org Dataset and DCAT:
-
- https://www.w3.org/wiki/WebSchemas/Datasets
- '''
- def graph_from_dataset(self, dataset_dict, dataset_ref):
-
- g = self.g
-
- # Namespaces
- self._bind_namespaces()
-
- g.add((dataset_ref, RDF.type, SCHEMA.Dataset))
-
- # Basic fields
- self._basic_fields_graph(dataset_ref, dataset_dict)
-
- # Catalog
- self._catalog_graph(dataset_ref, dataset_dict)
-
- # Groups
- self._groups_graph(dataset_ref, dataset_dict)
-
- # Tags
- self._tags_graph(dataset_ref, dataset_dict)
-
- # Lists
- self._list_fields_graph(dataset_ref, dataset_dict)
-
- # Publisher
- self._publisher_graph(dataset_ref, dataset_dict)
-
- # Temporal
- self._temporal_graph(dataset_ref, dataset_dict)
-
- # Spatial
- self._spatial_graph(dataset_ref, dataset_dict)
-
- # Resources
- self._resources_graph(dataset_ref, dataset_dict)
-
- # Additional fields
- self.additional_fields(dataset_ref, dataset_dict)
-
- def additional_fields(self, dataset_ref, dataset_dict):
- '''
- Adds any additional fields.
-
- For a custom schema you should extend this class and
- implement this method.
- '''
- pass
-
- def _add_date_triple(self, subject, predicate, value, _type=Literal):
- '''
- Adds a new triple with a date object
-
- Dates are parsed using dateutil, and if the date obtained is correct,
- added to the graph as a SCHEMA.DateTime value.
-
- If there are parsing errors, the literal string value is added.
- '''
- if not value:
- return
- try:
- default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
- _date = parse_date(value, default=default_datetime)
-
- self.g.add((subject, predicate, _type(_date.isoformat())))
- except ValueError:
- self.g.add((subject, predicate, _type(value)))
-
- def _bind_namespaces(self):
- self.g.namespace_manager.bind('schema', namespaces['schema'], replace=True)
-
- def _basic_fields_graph(self, dataset_ref, dataset_dict):
- items = [
- ('identifier', SCHEMA.identifier, None, Literal),
- ('title', SCHEMA.name, None, Literal),
- ('notes', SCHEMA.description, None, Literal),
- ('version', SCHEMA.version, ['dcat_version'], Literal),
- ('issued', SCHEMA.datePublished, ['metadata_created'], Literal),
- ('modified', SCHEMA.dateModified, ['metadata_modified'], Literal),
- ('license', SCHEMA.license, ['license_url', 'license_title'], Literal),
- ]
- self._add_triples_from_dict(dataset_dict, dataset_ref, items)
-
- items = [
- ('issued', SCHEMA.datePublished, ['metadata_created'], Literal),
- ('modified', SCHEMA.dateModified, ['metadata_modified'], Literal),
- ]
-
- self._add_date_triples_from_dict(dataset_dict, dataset_ref, items)
-
- # Dataset URL
- dataset_url = url_for('dataset.read',
- id=dataset_dict['name'],
- _external=True)
- self.g.add((dataset_ref, SCHEMA.url, Literal(dataset_url)))
-
- def _catalog_graph(self, dataset_ref, dataset_dict):
- data_catalog = BNode()
- self.g.add((dataset_ref, SCHEMA.includedInDataCatalog, data_catalog))
- self.g.add((data_catalog, RDF.type, SCHEMA.DataCatalog))
- self.g.add((data_catalog, SCHEMA.name, Literal(config.get('ckan.site_title'))))
- self.g.add((data_catalog, SCHEMA.description, Literal(config.get('ckan.site_description'))))
- self.g.add((data_catalog, SCHEMA.url, Literal(config.get('ckan.site_url'))))
-
- def _groups_graph(self, dataset_ref, dataset_dict):
- for group in dataset_dict.get('groups', []):
- group_url = url_for(controller='group',
- action='read',
- id=group.get('id'),
- _external=True)
- about = BNode()
-
- self.g.add((about, RDF.type, SCHEMA.Thing))
-
- self.g.add((about, SCHEMA.name, Literal(group['name'])))
- self.g.add((about, SCHEMA.url, Literal(group_url)))
-
- self.g.add((dataset_ref, SCHEMA.about, about))
-
- def _tags_graph(self, dataset_ref, dataset_dict):
- for tag in dataset_dict.get('tags', []):
- self.g.add((dataset_ref, SCHEMA.keywords, Literal(tag['name'])))
-
- def _list_fields_graph(self, dataset_ref, dataset_dict):
- items = [
- ('language', SCHEMA.inLanguage, None, Literal),
- ]
- self._add_list_triples_from_dict(dataset_dict, dataset_ref, items)
-
- def _publisher_graph(self, dataset_ref, dataset_dict):
- if any([
- self._get_dataset_value(dataset_dict, 'publisher_uri'),
- self._get_dataset_value(dataset_dict, 'publisher_name'),
- dataset_dict.get('organization'),
- ]):
-
- publisher_uri = self._get_dataset_value(dataset_dict, 'publisher_uri')
- publisher_uri_fallback = publisher_uri_organization_fallback(dataset_dict)
- publisher_name = self._get_dataset_value(dataset_dict, 'publisher_name')
- if publisher_uri:
- publisher_details = CleanedURIRef(publisher_uri)
- elif not publisher_name and publisher_uri_fallback:
- # neither URI nor name are available, use organization as fallback
- publisher_details = CleanedURIRef(publisher_uri_fallback)
- else:
- # No publisher_uri
- publisher_details = BNode()
-
- self.g.add((publisher_details, RDF.type, SCHEMA.Organization))
- self.g.add((dataset_ref, SCHEMA.publisher, publisher_details))
-
- # In case no name and URI are available, again fall back to organization.
- # If no name but an URI is available, the name literal remains empty to
- # avoid mixing organization and dataset values.
- if not publisher_name and not publisher_uri and dataset_dict.get('organization'):
- publisher_name = dataset_dict['organization']['title']
- self.g.add((publisher_details, SCHEMA.name, Literal(publisher_name)))
-
- contact_point = BNode()
- self.g.add((contact_point, RDF.type, SCHEMA.ContactPoint))
- self.g.add((publisher_details, SCHEMA.contactPoint, contact_point))
-
- self.g.add((contact_point, SCHEMA.contactType, Literal('customer service')))
-
- publisher_url = self._get_dataset_value(dataset_dict, 'publisher_url')
- if not publisher_url and dataset_dict.get('organization'):
- publisher_url = dataset_dict['organization'].get('url') or config.get('ckan.site_url')
-
- self.g.add((contact_point, SCHEMA.url, Literal(publisher_url)))
- items = [
- ('publisher_email', SCHEMA.email, ['contact_email', 'maintainer_email', 'author_email'], Literal),
- ('publisher_name', SCHEMA.name, ['contact_name', 'maintainer', 'author'], Literal),
- ]
-
- self._add_triples_from_dict(dataset_dict, contact_point, items)
-
- def _temporal_graph(self, dataset_ref, dataset_dict):
- start = self._get_dataset_value(dataset_dict, 'temporal_start')
- end = self._get_dataset_value(dataset_dict, 'temporal_end')
- if start or end:
- if start and end:
- self.g.add((dataset_ref, SCHEMA.temporalCoverage, Literal('%s/%s' % (start, end))))
- elif start:
- self._add_date_triple(dataset_ref, SCHEMA.temporalCoverage, start)
- elif end:
- self._add_date_triple(dataset_ref, SCHEMA.temporalCoverage, end)
-
- def _spatial_graph(self, dataset_ref, dataset_dict):
- spatial_uri = self._get_dataset_value(dataset_dict, 'spatial_uri')
- spatial_text = self._get_dataset_value(dataset_dict, 'spatial_text')
- spatial_geom = self._get_dataset_value(dataset_dict, 'spatial')
-
- if spatial_uri or spatial_text or spatial_geom:
- if spatial_uri:
- spatial_ref = URIRef(spatial_uri)
- else:
- spatial_ref = BNode()
-
- self.g.add((spatial_ref, RDF.type, SCHEMA.Place))
- self.g.add((dataset_ref, SCHEMA.spatialCoverage, spatial_ref))
-
- if spatial_text:
- self.g.add((spatial_ref, SCHEMA.description, Literal(spatial_text)))
-
- if spatial_geom:
- geo_shape = BNode()
- self.g.add((geo_shape, RDF.type, SCHEMA.GeoShape))
- self.g.add((spatial_ref, SCHEMA.geo, geo_shape))
-
- # the spatial_geom typically contains GeoJSON
- self.g.add((geo_shape,
- SCHEMA.polygon,
- Literal(spatial_geom)))
-
- def _resources_graph(self, dataset_ref, dataset_dict):
- g = self.g
- for resource_dict in dataset_dict.get('resources', []):
- distribution = URIRef(resource_uri(resource_dict))
- g.add((dataset_ref, SCHEMA.distribution, distribution))
- g.add((distribution, RDF.type, SCHEMA.DataDownload))
-
- self._distribution_graph(distribution, resource_dict)
-
- def _distribution_graph(self, distribution, resource_dict):
- # Simple values
- self._distribution_basic_fields_graph(distribution, resource_dict)
-
- # Lists
- self._distribution_list_fields_graph(distribution, resource_dict)
-
- # Format
- self._distribution_format_graph(distribution, resource_dict)
-
- # URL
- self._distribution_url_graph(distribution, resource_dict)
-
- # Numbers
- self._distribution_numbers_graph(distribution, resource_dict)
-
- def _distribution_basic_fields_graph(self, distribution, resource_dict):
- items = [
- ('name', SCHEMA.name, None, Literal),
- ('description', SCHEMA.description, None, Literal),
- ('license', SCHEMA.license, ['rights'], Literal),
- ]
-
- self._add_triples_from_dict(resource_dict, distribution, items)
-
- items = [
- ('issued', SCHEMA.datePublished, None, Literal),
- ('modified', SCHEMA.dateModified, None, Literal),
- ]
-
- self._add_date_triples_from_dict(resource_dict, distribution, items)
-
- def _distribution_list_fields_graph(self, distribution, resource_dict):
- items = [
- ('language', SCHEMA.inLanguage, None, Literal),
- ]
- self._add_list_triples_from_dict(resource_dict, distribution, items)
-
- def _distribution_format_graph(self, distribution, resource_dict):
- if resource_dict.get('format'):
- self.g.add((distribution, SCHEMA.encodingFormat,
- Literal(resource_dict['format'])))
- elif resource_dict.get('mimetype'):
- self.g.add((distribution, SCHEMA.encodingFormat,
- Literal(resource_dict['mimetype'])))
-
- def _distribution_url_graph(self, distribution, resource_dict):
- url = resource_dict.get('url')
- download_url = resource_dict.get('download_url')
- if download_url:
- self.g.add((distribution, SCHEMA.contentUrl, Literal(download_url)))
- if (url and not download_url) or (url and url != download_url):
- self.g.add((distribution, SCHEMA.url, Literal(url)))
-
- def _distribution_numbers_graph(self, distribution, resource_dict):
- if resource_dict.get('size'):
- self.g.add((distribution, SCHEMA.contentSize, Literal(resource_dict['size'])))
diff --git a/ckanext/dcat/profiles/__init__.py b/ckanext/dcat/profiles/__init__.py
new file mode 100644
index 00000000..92266c72
--- /dev/null
+++ b/ckanext/dcat/profiles/__init__.py
@@ -0,0 +1,23 @@
+from .base import RDFProfile, CleanedURIRef
+from .base import (
+ RDF,
+ XSD,
+ SKOS,
+ RDFS,
+ DCAT,
+ DCATAP,
+ DCT,
+ ADMS,
+ VCARD,
+ FOAF,
+ SCHEMA,
+ LOCN,
+ GSP,
+ OWL,
+ SPDX,
+ GEOJSON_IMT,
+)
+
+from .euro_dcat_ap import EuropeanDCATAPProfile
+from .euro_dcat_ap_2 import EuropeanDCATAP2Profile
+from .schemaorg import SchemaOrgProfile
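The package __init__ re-exports the public names from the old single module, so existing imports keep working. A minimal sketch of both import styles (assuming the module layout in this diff):

# Legacy-style imports resolve through the package __init__:
from ckanext.dcat.profiles import RDFProfile, SchemaOrgProfile, DCAT, DCT

# The concrete profiles can also be imported from their new submodules:
from ckanext.dcat.profiles.euro_dcat_ap_2 import EuropeanDCATAP2Profile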
diff --git a/ckanext/dcat/profiles/base.py b/ckanext/dcat/profiles/base.py
new file mode 100644
index 00000000..4b652b91
--- /dev/null
+++ b/ckanext/dcat/profiles/base.py
@@ -0,0 +1,1017 @@
+import datetime
+import json
+from urllib.parse import quote
+
+from dateutil.parser import parse as parse_date
+from rdflib import term, URIRef, BNode, Literal
+from rdflib.namespace import Namespace, RDF, XSD, SKOS, RDFS
+from geomet import wkt, InvalidGeoJSONException
+
+from ckantoolkit import config, url_for, asbool, get_action
+from ckan.model.license import LicenseRegister
+from ckan.lib.helpers import resource_formats
+from ckanext.dcat.utils import DCAT_EXPOSE_SUBCATALOGS
+
+DCT = Namespace("http://purl.org/dc/terms/")
+DCAT = Namespace("http://www.w3.org/ns/dcat#")
+DCATAP = Namespace("http://data.europa.eu/r5r/")
+ADMS = Namespace("http://www.w3.org/ns/adms#")
+VCARD = Namespace("http://www.w3.org/2006/vcard/ns#")
+FOAF = Namespace("http://xmlns.com/foaf/0.1/")
+SCHEMA = Namespace("http://schema.org/")
+TIME = Namespace("http://www.w3.org/2006/time")
+LOCN = Namespace("http://www.w3.org/ns/locn#")
+GSP = Namespace("http://www.opengis.net/ont/geosparql#")
+OWL = Namespace("http://www.w3.org/2002/07/owl#")
+SPDX = Namespace("http://spdx.org/rdf/terms#")
+
+namespaces = {
+ "dct": DCT,
+ "dcat": DCAT,
+ "dcatap": DCATAP,
+ "adms": ADMS,
+ "vcard": VCARD,
+ "foaf": FOAF,
+ "schema": SCHEMA,
+ "time": TIME,
+ "skos": SKOS,
+ "locn": LOCN,
+ "gsp": GSP,
+ "owl": OWL,
+ "spdx": SPDX,
+}
+
+PREFIX_MAILTO = u"mailto:"
+
+GEOJSON_IMT = "https://www.iana.org/assignments/media-types/application/vnd.geo+json"
+
+
+class URIRefOrLiteral(object):
+ """Helper which creates an URIRef if the value appears to be an http URL,
+ or a Literal otherwise. URIRefs are also cleaned using CleanedURIRef.
+
+ Like CleanedURIRef, this is a factory class.
+ """
+
+ def __new__(cls, value):
+ try:
+ stripped_value = value.strip()
+ if isinstance(value, str) and (
+ stripped_value.startswith("http://")
+ or stripped_value.startswith("https://")
+ ):
+ uri_obj = CleanedURIRef(value)
+ # although all invalid chars checked by rdflib should have been quoted, try to serialize
+ # the object. If it breaks, use Literal instead.
+ uri_obj.n3()
+ # URI is fine, return the object
+ return uri_obj
+ else:
+ return Literal(value)
+ except Exception:
+ # In case something goes wrong: use Literal
+ return Literal(value)
+
+
+class CleanedURIRef(object):
+ """Performs some basic URL encoding on value before creating an URIRef object.
+
+ This is a factory for URIRef objects, which allows usage as type in graph.add()
+ without affecting the resulting node types. That is,
+ g.add(..., URIRef) and g.add(..., CleanedURIRef) will result in the exact same node type.
+ """
+
+ @staticmethod
+ def _careful_quote(value):
+ # only encode this limited subset of characters to avoid more complex URL parsing
+ # (e.g. valid ? in query string vs. ? as value).
+ # can be applied multiple times, as encoded %xy is left untouched. Therefore, no
+ # unquote is necessary beforehand.
+ quotechars = " !\"$'()*,;<>[]{|}\\^`"
+ for c in quotechars:
+ value = value.replace(c, quote(c))
+ return value
+
+ def __new__(cls, value):
+ if isinstance(value, str):
+ value = CleanedURIRef._careful_quote(value.strip())
+ return URIRef(value)
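A short sketch of how the two factory classes above behave (values are illustrative):

from rdflib import URIRef, Literal

# http(s) values become (cleaned) URIRefs, anything else a Literal
assert URIRefOrLiteral("https://example.org/dataset/1") == URIRef("https://example.org/dataset/1")
assert URIRefOrLiteral("Just a plain title") == Literal("Just a plain title")

# CleanedURIRef percent-encodes the characters listed in _careful_quote
assert CleanedURIRef("http://example.org/a dataset") == URIRef("http://example.org/a%20dataset")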
+
+
+class RDFProfile(object):
+ """Base class with helper methods for implementing RDF parsing profiles
+
+ This class should not be used directly, but rather extended to create
+ custom profiles
+ """
+
+ def __init__(self, graph, compatibility_mode=False):
+ """Class constructor
+
+ Graph is an rdflib.Graph instance.
+
+ In compatibility mode, some fields are modified to maintain
+ compatibility with previous versions of the ckanext-dcat parsers
+ (eg adding the `dcat_` prefix or storing comma separated lists instead
+ of JSON dumps).
+ """
+
+ self.g = graph
+
+ self.compatibility_mode = compatibility_mode
+
+ # Cache for mappings of licenses URL/title to ID built when needed in
+ # _license().
+ self._licenceregister_cache = None
+
+ def _datasets(self):
+ """
+ Generator that returns all DCAT datasets on the graph
+
+ Yields term.URIRef objects that can be used on graph lookups
+ and queries
+ """
+ for dataset in self.g.subjects(RDF.type, DCAT.Dataset):
+ yield dataset
+
+ def _distributions(self, dataset):
+ """
+ Generator that returns all DCAT distributions on a particular dataset
+
+ Yields term.URIRef objects that can be used on graph lookups
+ and queries
+ """
+ for distribution in self.g.objects(dataset, DCAT.distribution):
+ yield distribution
+
+ def _keywords(self, dataset_ref):
+ """
+ Returns all DCAT keywords on a particular dataset
+ """
+ keywords = self._object_value_list(dataset_ref, DCAT.keyword) or []
+ # Split keywords with commas
+ keywords_with_commas = [k for k in keywords if "," in k]
+ for keyword in keywords_with_commas:
+ keywords.remove(keyword)
+ keywords.extend([k.strip() for k in keyword.split(",")])
+ return keywords
+
+ def _object(self, subject, predicate):
+ """
+ Helper for returning the first object for this subject and predicate
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+ Returns an rdflib reference (URIRef or BNode) or None if not found
+ """
+ for _object in self.g.objects(subject, predicate):
+ return _object
+ return None
+
+ def _object_value(self, subject, predicate):
+ """
+ Given a subject and a predicate, returns the value of the object
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+ If found, the string representation is returned, else an empty string
+ """
+ default_lang = config.get("ckan.locale_default", "en")
+ fallback = ""
+ for o in self.g.objects(subject, predicate):
+ if isinstance(o, Literal):
+ if o.language and o.language == default_lang:
+ return str(o)
+ # Use first object as fallback if no object with the default language is available
+ elif fallback == "":
+ fallback = str(o)
+ else:
+ return str(o)
+        return fallback
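_object_value prefers a literal in the portal's default locale and otherwise falls back to the first object found. A sketch, assuming ckan.locale_default is 'en' (or unset, so the 'en' default applies):

from rdflib import Graph, URIRef, Literal

g = Graph()
ds = URIRef("http://example.org/dataset/1")
g.add((ds, DCT.title, Literal("Beispieltitel", lang="de")))
g.add((ds, DCT.title, Literal("Example title", lang="en")))

profile = RDFProfile(g)
assert profile._object_value(ds, DCT.title) == "Example title"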
+
+ def _object_value_multiple_predicate(self, subject, predicates):
+ """
+        Given a subject and a list of predicates, returns the value of the
+        first object found, trying the predicates in the given order.
+
+ Both subject and predicates must be rdflib URIRef or BNode objects
+
+ If found, the string representation is returned, else an empty string
+ """
+ object_value = ""
+ for predicate in predicates:
+ object_value = self._object_value(subject, predicate)
+ if object_value:
+ break
+
+ return object_value
+
+ def _object_value_int(self, subject, predicate):
+ """
+ Given a subject and a predicate, returns the value of the object as an
+ integer
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+        If the value cannot be parsed as an integer, returns None
+ """
+ object_value = self._object_value(subject, predicate)
+ if object_value:
+ try:
+ return int(float(object_value))
+ except ValueError:
+ pass
+ return None
+
+ def _object_value_int_list(self, subject, predicate):
+ """
+ Given a subject and a predicate, returns the value of the object as a
+ list of integers
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+        If a value cannot be parsed as an integer, it is skipped
+ """
+ object_values = []
+ for object in self.g.objects(subject, predicate):
+ if object:
+ try:
+ object_values.append(int(float(object)))
+ except ValueError:
+ pass
+ return object_values
+
+ def _object_value_list(self, subject, predicate):
+ """
+ Given a subject and a predicate, returns a list with all the values of
+ the objects
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+        If no values are found, returns an empty list
+ """
+ return [str(o) for o in self.g.objects(subject, predicate)]
+
+ def _get_vcard_property_value(
+ self, subject, predicate, predicate_string_property=None
+ ):
+ """
+        Given a subject, a predicate and, optionally, a predicate for the
+        simple string property, returns the value of the object. The value is
+        read in the following order:
+ * predicate_string_property
+ * predicate
+
+ All subject, predicate and predicate_string_property must be rdflib URIRef or BNode objects
+
+ If no value is found, returns an empty string
+ """
+
+ result = ""
+ if predicate_string_property:
+ result = self._object_value(subject, predicate_string_property)
+
+ if not result:
+ obj = self._object(subject, predicate)
+ if isinstance(obj, BNode):
+ result = self._object_value(obj, VCARD.hasValue)
+ else:
+ result = self._object_value(subject, predicate)
+
+ return result
+
+ def _time_interval(self, subject, predicate, dcat_ap_version=1):
+ """
+ Returns the start and end date for a time interval object
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+ It checks for time intervals defined with DCAT, W3C Time hasBeginning & hasEnd
+ and schema.org startDate & endDate.
+
+ Note that partial dates will be expanded to the first month / day
+ value, eg '1904' -> '1904-01-01'.
+
+ Returns a tuple with the start and end date values, both of which
+ can be None if not found
+ """
+
+ start_date = end_date = None
+
+ if dcat_ap_version == 1:
+ start_date, end_date = self._read_time_interval_schema_org(
+ subject, predicate
+ )
+ if start_date or end_date:
+ return start_date, end_date
+ return self._read_time_interval_time(subject, predicate)
+ elif dcat_ap_version == 2:
+ start_date, end_date = self._read_time_interval_dcat(subject, predicate)
+ if start_date or end_date:
+ return start_date, end_date
+ start_date, end_date = self._read_time_interval_time(subject, predicate)
+ if start_date or end_date:
+ return start_date, end_date
+ return self._read_time_interval_schema_org(subject, predicate)
+
+ def _read_time_interval_schema_org(self, subject, predicate):
+ start_date = end_date = None
+
+ for interval in self.g.objects(subject, predicate):
+ start_date = self._object_value(interval, SCHEMA.startDate)
+ end_date = self._object_value(interval, SCHEMA.endDate)
+
+ if start_date or end_date:
+ return start_date, end_date
+
+ return start_date, end_date
+
+ def _read_time_interval_dcat(self, subject, predicate):
+ start_date = end_date = None
+
+ for interval in self.g.objects(subject, predicate):
+ start_date = self._object_value(interval, DCAT.startDate)
+ end_date = self._object_value(interval, DCAT.endDate)
+
+ if start_date or end_date:
+ return start_date, end_date
+
+ return start_date, end_date
+
+ def _read_time_interval_time(self, subject, predicate):
+ start_date = end_date = None
+
+ for interval in self.g.objects(subject, predicate):
+ start_nodes = [t for t in self.g.objects(interval, TIME.hasBeginning)]
+ end_nodes = [t for t in self.g.objects(interval, TIME.hasEnd)]
+ if start_nodes:
+ start_date = self._object_value_multiple_predicate(
+ start_nodes[0],
+ [TIME.inXSDDateTimeStamp, TIME.inXSDDateTime, TIME.inXSDDate],
+ )
+ if end_nodes:
+ end_date = self._object_value_multiple_predicate(
+ end_nodes[0],
+ [TIME.inXSDDateTimeStamp, TIME.inXSDDateTime, TIME.inXSDDate],
+ )
+
+ if start_date or end_date:
+ return start_date, end_date
+
+ return start_date, end_date
+
+ def _insert_or_update_temporal(self, dataset_dict, key, value):
+ temporal = next(
+ (item for item in dataset_dict["extras"] if (item["key"] == key)), None
+ )
+ if temporal:
+ temporal["value"] = value
+ else:
+ dataset_dict["extras"].append({"key": key, "value": value})
+
+ def _publisher(self, subject, predicate):
+ """
+ Returns a dict with details about a dct:publisher entity, a foaf:Agent
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+        Examples:
+
+            <dct:publisher>
+                <foaf:Organization rdf:about="http://orgs.vocab.org/some-org">
+                    <foaf:name>Publishing Organization for dataset 1</foaf:name>
+                    <foaf:mbox>contact@some.org</foaf:mbox>
+                    <foaf:homepage>http://some.org</foaf:homepage>
+                    <dct:type rdf:resource="http://purl.org/adms/publishertype/NonProfitOrganisation"/>
+                </foaf:Organization>
+            </dct:publisher>
+
+            {
+                'uri': 'http://orgs.vocab.org/some-org',
+                'name': 'Publishing Organization for dataset 1',
+                'email': 'contact@some.org',
+                'url': 'http://some.org',
+                'type': 'http://purl.org/adms/publishertype/NonProfitOrganisation',
+            }
+
+            <dct:publisher rdf:resource="http://publications.europa.eu/resource/authority/corporate-body/EURCOU" />
+
+            {
+                'uri': 'http://publications.europa.eu/resource/authority/corporate-body/EURCOU'
+            }
+
+ Returns keys for uri, name, email, url and type with the values set to
+ an empty string if they could not be found
+ """
+
+ publisher = {}
+
+ for agent in self.g.objects(subject, predicate):
+
+ publisher["uri"] = str(agent) if isinstance(agent, term.URIRef) else ""
+
+ publisher["name"] = self._object_value(agent, FOAF.name)
+
+ publisher["email"] = self._object_value(agent, FOAF.mbox)
+
+ publisher["url"] = self._object_value(agent, FOAF.homepage)
+
+ publisher["type"] = self._object_value(agent, DCT.type)
+
+ return publisher
+
+ def _contact_details(self, subject, predicate):
+ """
+ Returns a dict with details about a vcard expression
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+ Returns keys for uri, name and email with the values set to
+ an empty string if they could not be found
+ """
+
+ contact = {}
+
+ for agent in self.g.objects(subject, predicate):
+
+ contact["uri"] = str(agent) if isinstance(agent, term.URIRef) else ""
+
+ contact["name"] = self._get_vcard_property_value(
+ agent, VCARD.hasFN, VCARD.fn
+ )
+
+ contact["email"] = self._without_mailto(
+ self._get_vcard_property_value(agent, VCARD.hasEmail)
+ )
+
+ return contact
+
+ def _parse_geodata(self, spatial, datatype, cur_value):
+ """
+        Extracts the value with the given datatype from the spatial data and
+        checks whether it contains a valid GeoJSON or WKT geometry.
+
+        Returns the string value, or None if the value is not a valid GeoJSON
+        or WKT geometry.
+ """
+ for geometry in self.g.objects(spatial, datatype):
+ if geometry.datatype == URIRef(GEOJSON_IMT) or not geometry.datatype:
+ try:
+ json.loads(str(geometry))
+ cur_value = str(geometry)
+ except (ValueError, TypeError):
+ pass
+ if not cur_value and geometry.datatype == GSP.wktLiteral:
+ try:
+ cur_value = json.dumps(wkt.loads(str(geometry)))
+ except (ValueError, TypeError):
+ pass
+ return cur_value
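The WKT branch above relies on geomet for the conversion to GeoJSON, and the same library is used in the opposite direction when serializing. A standalone sketch of that round trip:

import json
from geomet import wkt

# WKT -> GeoJSON dict, as in the GSP.wktLiteral branch above
assert wkt.loads("POINT (10.0 20.0)") == {"type": "Point", "coordinates": [10.0, 20.0]}

# GeoJSON -> WKT, as used later by _add_spatial_value_to_graph
print(wkt.dumps(json.loads('{"type": "Point", "coordinates": [10.0, 20.0]}'), decimals=4))
# -> POINT (10.0000 20.0000)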
+
+ def _spatial(self, subject, predicate):
+ """
+ Returns a dict with details about the spatial location
+
+ Both subject and predicate must be rdflib URIRef or BNode objects
+
+ Returns keys for uri, text or geom with the values set to
+ None if they could not be found.
+
+ Geometries are always returned in GeoJSON. If only WKT is provided,
+ it will be transformed to GeoJSON.
+
+ Check the notes on the README for the supported formats:
+
+ https://github.com/ckan/ckanext-dcat/#rdf-dcat-to-ckan-dataset-mapping
+ """
+
+ uri = None
+ text = None
+ geom = None
+ bbox = None
+ cent = None
+
+ for spatial in self.g.objects(subject, predicate):
+
+ if isinstance(spatial, URIRef):
+ uri = str(spatial)
+
+ if isinstance(spatial, Literal):
+ text = str(spatial)
+
+ if (spatial, RDF.type, DCT.Location) in self.g:
+ geom = self._parse_geodata(spatial, LOCN.geometry, geom)
+ bbox = self._parse_geodata(spatial, DCAT.bbox, bbox)
+ cent = self._parse_geodata(spatial, DCAT.centroid, cent)
+ for label in self.g.objects(spatial, SKOS.prefLabel):
+ text = str(label)
+ for label in self.g.objects(spatial, RDFS.label):
+ text = str(label)
+
+ return {
+ "uri": uri,
+ "text": text,
+ "geom": geom,
+ "bbox": bbox,
+ "centroid": cent,
+ }
+
+ def _license(self, dataset_ref):
+ """
+ Returns a license identifier if one of the distributions license is
+ found in CKAN license registry. If no distribution's license matches,
+ an empty string is returned.
+
+ The first distribution with a license found in the registry is used so
+ that if distributions have different licenses we'll only get the first
+ one.
+ """
+ if self._licenceregister_cache is not None:
+ license_uri2id, license_title2id = self._licenceregister_cache
+ else:
+ license_uri2id = {}
+ license_title2id = {}
+ for license_id, license in list(LicenseRegister().items()):
+ license_uri2id[license.url] = license_id
+ license_title2id[license.title] = license_id
+ self._licenceregister_cache = license_uri2id, license_title2id
+
+ for distribution in self._distributions(dataset_ref):
+ # If distribution has a license, attach it to the dataset
+ license = self._object(distribution, DCT.license)
+ if license:
+ # Try to find a matching license comparing URIs, then titles
+ license_id = license_uri2id.get(license.toPython())
+ if not license_id:
+ license_id = license_title2id.get(
+ self._object_value(license, DCT.title)
+ )
+ if license_id:
+ return license_id
+ return ""
+
+ def _access_rights(self, subject, predicate):
+ """
+        Returns the rights statement or an empty string if none is found.
+ """
+
+ result = ""
+ obj = self._object(subject, predicate)
+ if obj:
+ if (
+ isinstance(obj, BNode)
+ and self._object(obj, RDF.type) == DCT.RightsStatement
+ ):
+ result = self._object_value(obj, RDFS.label)
+ elif isinstance(obj, Literal) or isinstance(obj, URIRef):
+                # unicode_safe does not handle Literal or URIRef objects
+ result = str(obj)
+ return result
+
+ def _distribution_format(self, distribution, normalize_ckan_format=True):
+ """
+ Returns the Internet Media Type and format label for a distribution
+
+        Given a reference (URIRef or BNode) to a dcat:Distribution, it will
+        try to extract the media type (previously known as MIME type), eg
+        `text/csv`, and the format label, eg `CSV`
+
+        Values for the media type will be checked in the following order:
+
+        1. literal value of dcat:mediaType
+        2. literal value of dct:format if it contains a '/' character
+        3. value of dct:format if it is an instance of dct:IMT, eg:
+
+            <dct:format>
+                <dct:IMT rdf:value="text/html" rdfs:label="HTML"/>
+            </dct:format>
+
+        4. value of dct:format if it is an URIRef and appears to be an IANA type
+
+        Values for the label will be checked in the following order:
+
+        1. literal value of dct:format if it does not contain a '/' character
+        2. label of dct:format if it is an instance of dct:IMT (see above)
+        3. value of dct:format if it is an URIRef and doesn't look like an IANA type
+
+        If `normalize_ckan_format` is True, an attempt is made to match the
+        label against the standard list of formats that is included with CKAN
+        core
+        (https://github.com/ckan/ckan/blob/master/ckan/config/resource_formats.json).
+        This allows, for instance, populating the CKAN resource format field
+        with a format that view plugins etc. will understand (`csv`, `xml`,
+        etc.)
+
+        Returns a tuple with the media type and the label, both set to None if
+        they couldn't be found.
+        """
+
+ imt = None
+ label = None
+
+ imt = self._object_value(distribution, DCAT.mediaType)
+
+ _format = self._object(distribution, DCT["format"])
+ if isinstance(_format, Literal):
+ if not imt and "/" in _format:
+ imt = str(_format)
+ else:
+ label = str(_format)
+ elif isinstance(_format, (BNode, URIRef)):
+ if self._object(_format, RDF.type) == DCT.IMT:
+ if not imt:
+ imt = str(self.g.value(_format, default=None))
+ label = self._object_value(_format, RDFS.label)
+ elif isinstance(_format, URIRef):
+ # If the URIRef does not reference a BNode, it could reference an IANA type.
+ # Otherwise, use it as label.
+ format_uri = str(_format)
+ if "iana.org/assignments/media-types" in format_uri and not imt:
+ imt = format_uri
+ else:
+ label = format_uri
+
+ if (imt or label) and normalize_ckan_format:
+
+ format_registry = resource_formats()
+
+ if imt in format_registry:
+ label = format_registry[imt][1]
+ elif label in format_registry:
+ label = format_registry[label][1]
+
+ return imt, label
+
+ def _get_dict_value(self, _dict, key, default=None):
+ """
+ Returns the value for the given key on a CKAN dict
+
+ By default a key on the root level is checked. If not found, extras
+ are checked, both with the key provided and with `dcat_` prepended to
+ support legacy fields.
+
+ If not found, returns the default value, which defaults to None
+ """
+
+ if key in _dict:
+ return _dict[key]
+
+ for extra in _dict.get("extras", []):
+ if extra["key"] == key or extra["key"] == "dcat_" + key:
+ return extra["value"]
+
+ return default
+
+ def _read_list_value(self, value):
+ items = []
+ # List of values
+ if isinstance(value, list):
+ items = value
+ elif isinstance(value, str):
+ try:
+ items = json.loads(value)
+                if isinstance(items, (int, float, complex)):
+                    items = [items]  # single JSON value, wrap it in a list
+ except ValueError:
+ if "," in value:
+ # Comma-separated list
+ items = value.split(",")
+ else:
+ items = [value] # Normal text value
+ return items
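_read_list_value accepts the three value shapes used across CKAN versions; a sketch, assuming profile is any RDFProfile instance:

assert profile._read_list_value(["en", "de"]) == ["en", "de"]    # already a list
assert profile._read_list_value('["en", "de"]') == ["en", "de"]  # JSON dump
assert profile._read_list_value("en,de") == ["en", "de"]         # legacy comma-separated string
assert profile._read_list_value("en") == ["en"]                  # plain text value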
+
+ def _add_spatial_value_to_graph(self, spatial_ref, predicate, value):
+ """
+ Adds spatial triples to the graph.
+ """
+ # GeoJSON
+ self.g.add((spatial_ref, predicate, Literal(value, datatype=GEOJSON_IMT)))
+ # WKT, because GeoDCAT-AP says so
+ try:
+ self.g.add(
+ (
+ spatial_ref,
+ predicate,
+ Literal(
+ wkt.dumps(json.loads(value), decimals=4),
+ datatype=GSP.wktLiteral,
+ ),
+ )
+ )
+ except (TypeError, ValueError, InvalidGeoJSONException):
+ pass
+
+ def _add_spatial_to_dict(self, dataset_dict, key, spatial):
+ if spatial.get(key):
+ dataset_dict["extras"].append(
+ {
+ "key": "spatial_{0}".format(key) if key != "geom" else "spatial",
+ "value": spatial.get(key),
+ }
+ )
+
+ def _get_dataset_value(self, dataset_dict, key, default=None):
+ """
+ Returns the value for the given key on a CKAN dict
+
+ Check `_get_dict_value` for details
+ """
+ return self._get_dict_value(dataset_dict, key, default)
+
+ def _get_resource_value(self, resource_dict, key, default=None):
+ """
+ Returns the value for the given key on a CKAN dict
+
+ Check `_get_dict_value` for details
+ """
+ return self._get_dict_value(resource_dict, key, default)
+
+ def _add_date_triples_from_dict(self, _dict, subject, items):
+ self._add_triples_from_dict(_dict, subject, items, date_value=True)
+
+ def _add_list_triples_from_dict(self, _dict, subject, items):
+ self._add_triples_from_dict(_dict, subject, items, list_value=True)
+
+ def _add_triples_from_dict(
+ self, _dict, subject, items, list_value=False, date_value=False
+ ):
+ for item in items:
+ key, predicate, fallbacks, _type = item
+ self._add_triple_from_dict(
+ _dict,
+ subject,
+ predicate,
+ key,
+ fallbacks=fallbacks,
+ list_value=list_value,
+ date_value=date_value,
+ _type=_type,
+ )
+
+ def _add_triple_from_dict(
+ self,
+ _dict,
+ subject,
+ predicate,
+ key,
+ fallbacks=None,
+ list_value=False,
+ date_value=False,
+ _type=Literal,
+ _datatype=None,
+ value_modifier=None,
+ ):
+ """
+ Adds a new triple to the graph with the provided parameters
+
+ The subject and predicate of the triple are passed as the relevant
+ RDFLib objects (URIRef or BNode). As default, the object is a
+ literal value, which is extracted from the dict using the provided key
+ (see `_get_dict_value`). If the value for the key is not found, then
+ additional fallback keys are checked.
+ Using `value_modifier`, a function taking the extracted value and
+ returning a modified value can be passed.
+ If a value was found, the modifier is applied before adding the value.
+
+ If `list_value` or `date_value` are True, then the value is treated as
+ a list or a date respectively (see `_add_list_triple` and
+        `_add_date_triple` for details).
+ """
+ value = self._get_dict_value(_dict, key)
+ if not value and fallbacks:
+ for fallback in fallbacks:
+ value = self._get_dict_value(_dict, fallback)
+ if value:
+ break
+
+ # if a modifying function was given, apply it to the value
+ if value and callable(value_modifier):
+ value = value_modifier(value)
+
+ if value and list_value:
+ self._add_list_triple(subject, predicate, value, _type, _datatype)
+ elif value and date_value:
+ self._add_date_triple(subject, predicate, value, _type)
+ elif value:
+ # Normal text value
+ # ensure URIRef items are preprocessed (space removal/url encoding)
+ if _type == URIRef:
+ _type = CleanedURIRef
+ if _datatype:
+ object = _type(value, datatype=_datatype)
+ else:
+ object = _type(value)
+ self.g.add((subject, predicate, object))
+
+ def _add_list_triple(
+ self, subject, predicate, value, _type=Literal, _datatype=None
+ ):
+ """
+        Adds one triple to the graph for each value
+
+        If `value` is a list, one literal triple is added per item. If `value`
+        is a string, there is an attempt to split it using commas, to support
+        legacy fields.
+ """
+ items = self._read_list_value(value)
+
+ for item in items:
+ # ensure URIRef items are preprocessed (space removal/url encoding)
+ if _type == URIRef:
+ _type = CleanedURIRef
+ if _datatype:
+ object = _type(item, datatype=_datatype)
+ else:
+ object = _type(item)
+ self.g.add((subject, predicate, object))
+
+ def _add_date_triple(self, subject, predicate, value, _type=Literal):
+ """
+ Adds a new triple with a date object
+
+ Dates are parsed using dateutil, and if the date obtained is correct,
+ added to the graph as an XSD.dateTime value.
+
+ If there are parsing errors, the literal string value is added.
+ """
+ if not value:
+ return
+ try:
+ default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
+ _date = parse_date(value, default=default_datetime)
+
+ self.g.add(
+ (subject, predicate, _type(_date.isoformat(), datatype=XSD.dateTime))
+ )
+ except ValueError:
+ self.g.add((subject, predicate, _type(value)))
+
+ def _last_catalog_modification(self):
+ """
+ Returns the date and time the catalog was last modified
+
+ To be more precise, the most recent value for `metadata_modified` on a
+ dataset.
+
+ Returns a dateTime string in ISO format, or None if it could not be
+ found.
+ """
+ context = {"ignore_auth": True}
+ result = get_action("package_search")(
+ context,
+ {
+ "sort": "metadata_modified desc",
+ "rows": 1,
+ },
+ )
+ if result and result.get("results"):
+ return result["results"][0]["metadata_modified"]
+ return None
+
+ def _add_mailto(self, mail_addr):
+ """
+ Ensures that the mail address has an URIRef-compatible mailto: prefix.
+ Can be used as modifier function for `_add_triple_from_dict`.
+ """
+ if mail_addr:
+ return PREFIX_MAILTO + self._without_mailto(mail_addr)
+ else:
+ return mail_addr
+
+ def _without_mailto(self, mail_addr):
+ """
+ Ensures that the mail address string has no mailto: prefix.
+ """
+ if mail_addr:
+ return str(mail_addr).replace(PREFIX_MAILTO, u"")
+ else:
+ return mail_addr
+
+ def _get_source_catalog(self, dataset_ref):
+ """
+        Returns the catalog reference that is the source for this dataset.
+
+        The catalog referenced in dct:hasPart is returned if the dataset is
+        linked there; otherwise the main catalog is returned.
+
+        This is not used if the ckanext.dcat.expose_subcatalogs
+        configuration option is set to False.
+ """
+ if not asbool(config.get(DCAT_EXPOSE_SUBCATALOGS, False)):
+ return
+ catalogs = set(self.g.subjects(DCAT.dataset, dataset_ref))
+ root = self._get_root_catalog_ref()
+ try:
+ catalogs.remove(root)
+ except KeyError:
+ pass
+        assert len(catalogs) in (0, 1), "len %s" % catalogs
+ if catalogs:
+ return catalogs.pop()
+ return root
+
+ def _get_root_catalog_ref(self):
+ roots = list(self.g.subjects(DCT.hasPart))
+ if not roots:
+ roots = list(self.g.subjects(RDF.type, DCAT.Catalog))
+ return roots[0]
+
+ def _get_or_create_spatial_ref(self, dataset_dict, dataset_ref):
+ for spatial_ref in self.g.objects(dataset_ref, DCT.spatial):
+ if spatial_ref:
+ return spatial_ref
+
+ # Create new spatial_ref
+ spatial_uri = self._get_dataset_value(dataset_dict, "spatial_uri")
+ if spatial_uri:
+ spatial_ref = CleanedURIRef(spatial_uri)
+ else:
+ spatial_ref = BNode()
+ self.g.add((spatial_ref, RDF.type, DCT.Location))
+ self.g.add((dataset_ref, DCT.spatial, spatial_ref))
+ return spatial_ref
+
+ # Public methods for profiles to implement
+
+ def parse_dataset(self, dataset_dict, dataset_ref):
+ """
+ Creates a CKAN dataset dict from the RDF graph
+
+ The `dataset_dict` is passed to all the loaded profiles before being
+ yielded, so it can be further modified by each one of them.
+ `dataset_ref` is an rdflib URIRef object
+ that can be used to reference the dataset when querying the graph.
+
+ Returns a dataset dict that can be passed to eg `package_create`
+ or `package_update`
+ """
+ return dataset_dict
+
+ def _extract_catalog_dict(self, catalog_ref):
+ """
+        Returns a list of key/value dictionaries with the source catalog details
+ """
+
+ out = []
+ sources = (
+ (
+ "source_catalog_title",
+ DCT.title,
+ ),
+ (
+ "source_catalog_description",
+ DCT.description,
+ ),
+ (
+ "source_catalog_homepage",
+ FOAF.homepage,
+ ),
+ (
+ "source_catalog_language",
+ DCT.language,
+ ),
+ (
+ "source_catalog_modified",
+ DCT.modified,
+ ),
+ )
+
+ for key, predicate in sources:
+ val = self._object_value(catalog_ref, predicate)
+ if val:
+ out.append({"key": key, "value": val})
+
+ out.append(
+ {
+ "key": "source_catalog_publisher",
+ "value": json.dumps(self._publisher(catalog_ref, DCT.publisher)),
+ }
+ )
+ return out
+
+ def graph_from_catalog(self, catalog_dict, catalog_ref):
+ """
+ Creates an RDF graph for the whole catalog (site)
+
+ The class RDFLib graph (accessible via `self.g`) should be updated on
+ this method
+
+ `catalog_dict` is a dict that can contain literal values for the
+ dcat:Catalog class like `title`, `homepage`, etc. `catalog_ref` is an
+ rdflib URIRef object that must be used to reference the catalog when
+ working with the graph.
+ """
+ pass
+
+ def graph_from_dataset(self, dataset_dict, dataset_ref):
+ """
+ Given a CKAN dataset dict, creates an RDF graph
+
+ The class RDFLib graph (accessible via `self.g`) should be updated on
+ this method
+
+ `dataset_dict` is a dict with the dataset metadata like the one
+ returned by `package_show`. `dataset_ref` is an rdflib URIRef object
+ that must be used to reference the dataset when working with the graph.
+ """
+ pass
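A custom profile only needs to subclass RDFProfile and override the hooks above. A minimal sketch (the provenance mapping is just an illustration, and parse_dataset assumes it runs after a profile that initialises dataset_dict["extras"]):

from rdflib import Literal
from ckanext.dcat.profiles import RDFProfile, DCT

class MyProfile(RDFProfile):

    def parse_dataset(self, dataset_dict, dataset_ref):
        # RDF -> CKAN: read a value from the graph into an extra
        value = self._object_value(dataset_ref, DCT.provenance)
        if value:
            dataset_dict["extras"].append({"key": "provenance", "value": value})
        return dataset_dict

    def graph_from_dataset(self, dataset_dict, dataset_ref):
        # CKAN -> RDF: write the value back when serializing
        value = self._get_dataset_value(dataset_dict, "provenance")
        if value:
            self.g.add((dataset_ref, DCT.provenance, Literal(value)))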
diff --git a/ckanext/dcat/profiles/euro_dcat_ap.py b/ckanext/dcat/profiles/euro_dcat_ap.py
new file mode 100644
index 00000000..9a4c853b
--- /dev/null
+++ b/ckanext/dcat/profiles/euro_dcat_ap.py
@@ -0,0 +1,601 @@
+import json
+
+from rdflib import term, URIRef, BNode, Literal
+import ckantoolkit as toolkit
+
+from ckan.lib.munge import munge_tag
+
+from ckanext.dcat.utils import (
+ resource_uri,
+ DCAT_EXPOSE_SUBCATALOGS,
+ DCAT_CLEAN_TAGS,
+ publisher_uri_organization_fallback,
+)
+from .base import RDFProfile, URIRefOrLiteral, CleanedURIRef
+from .base import (
+ RDF,
+ XSD,
+ SKOS,
+ RDFS,
+ DCAT,
+ DCT,
+ ADMS,
+ VCARD,
+ FOAF,
+ SCHEMA,
+ LOCN,
+ GSP,
+ OWL,
+ SPDX,
+ GEOJSON_IMT,
+ namespaces,
+)
+
+config = toolkit.config
+
+
+DISTRIBUTION_LICENSE_FALLBACK_CONFIG = "ckanext.dcat.resource.inherit.license"
+
+
+class EuropeanDCATAPProfile(RDFProfile):
+ """
+ An RDF profile based on the DCAT-AP for data portals in Europe
+
+ More information and specification:
+
+ https://joinup.ec.europa.eu/asset/dcat_application_profile
+
+ """
+
+ def parse_dataset(self, dataset_dict, dataset_ref):
+
+ dataset_dict["extras"] = []
+ dataset_dict["resources"] = []
+
+ # Basic fields
+ for key, predicate in (
+ ("title", DCT.title),
+ ("notes", DCT.description),
+ ("url", DCAT.landingPage),
+ ("version", OWL.versionInfo),
+ ):
+ value = self._object_value(dataset_ref, predicate)
+ if value:
+ dataset_dict[key] = value
+
+ if not dataset_dict.get("version"):
+ # adms:version was supported on the first version of the DCAT-AP
+ value = self._object_value(dataset_ref, ADMS.version)
+ if value:
+ dataset_dict["version"] = value
+
+ # Tags
+ # replace munge_tag to noop if there's no need to clean tags
+ do_clean = toolkit.asbool(config.get(DCAT_CLEAN_TAGS, False))
+ tags_val = [
+ munge_tag(tag) if do_clean else tag for tag in self._keywords(dataset_ref)
+ ]
+ tags = [{"name": tag} for tag in tags_val]
+ dataset_dict["tags"] = tags
+
+ # Extras
+
+ # Simple values
+ for key, predicate in (
+ ("issued", DCT.issued),
+ ("modified", DCT.modified),
+ ("identifier", DCT.identifier),
+ ("version_notes", ADMS.versionNotes),
+ ("frequency", DCT.accrualPeriodicity),
+ ("provenance", DCT.provenance),
+ ("dcat_type", DCT.type),
+ ):
+ value = self._object_value(dataset_ref, predicate)
+ if value:
+ dataset_dict["extras"].append({"key": key, "value": value})
+
+ # Lists
+        for key, predicate in (
+ ("language", DCT.language),
+ ("theme", DCAT.theme),
+ ("alternate_identifier", ADMS.identifier),
+ ("conforms_to", DCT.conformsTo),
+ ("documentation", FOAF.page),
+ ("related_resource", DCT.relation),
+ ("has_version", DCT.hasVersion),
+ ("is_version_of", DCT.isVersionOf),
+ ("source", DCT.source),
+ ("sample", ADMS.sample),
+ ):
+ values = self._object_value_list(dataset_ref, predicate)
+ if values:
+ dataset_dict["extras"].append({"key": key, "value": json.dumps(values)})
+
+ # Contact details
+ contact = self._contact_details(dataset_ref, DCAT.contactPoint)
+ if not contact:
+ # adms:contactPoint was supported on the first version of DCAT-AP
+ contact = self._contact_details(dataset_ref, ADMS.contactPoint)
+
+ if contact:
+ for key in ("uri", "name", "email"):
+ if contact.get(key):
+ dataset_dict["extras"].append(
+ {"key": "contact_{0}".format(key), "value": contact.get(key)}
+ )
+
+ # Publisher
+ publisher = self._publisher(dataset_ref, DCT.publisher)
+ for key in ("uri", "name", "email", "url", "type"):
+ if publisher.get(key):
+ dataset_dict["extras"].append(
+ {"key": "publisher_{0}".format(key), "value": publisher.get(key)}
+ )
+
+ # Temporal
+ start, end = self._time_interval(dataset_ref, DCT.temporal)
+ if start:
+ dataset_dict["extras"].append({"key": "temporal_start", "value": start})
+ if end:
+ dataset_dict["extras"].append({"key": "temporal_end", "value": end})
+
+ # Spatial
+ spatial = self._spatial(dataset_ref, DCT.spatial)
+ for key in ("uri", "text", "geom"):
+ self._add_spatial_to_dict(dataset_dict, key, spatial)
+
+ # Dataset URI (explicitly show the missing ones)
+ dataset_uri = str(dataset_ref) if isinstance(dataset_ref, term.URIRef) else ""
+ dataset_dict["extras"].append({"key": "uri", "value": dataset_uri})
+
+ # access_rights
+ access_rights = self._access_rights(dataset_ref, DCT.accessRights)
+ if access_rights:
+ dataset_dict["extras"].append(
+ {"key": "access_rights", "value": access_rights}
+ )
+
+ # License
+ if "license_id" not in dataset_dict:
+ dataset_dict["license_id"] = self._license(dataset_ref)
+
+ # Source Catalog
+ if toolkit.asbool(config.get(DCAT_EXPOSE_SUBCATALOGS, False)):
+ catalog_src = self._get_source_catalog(dataset_ref)
+ if catalog_src is not None:
+ src_data = self._extract_catalog_dict(catalog_src)
+ dataset_dict["extras"].extend(src_data)
+
+ # Resources
+ for distribution in self._distributions(dataset_ref):
+
+ resource_dict = {}
+
+ # Simple values
+ for key, predicate in (
+ ("name", DCT.title),
+ ("description", DCT.description),
+ ("access_url", DCAT.accessURL),
+ ("download_url", DCAT.downloadURL),
+ ("issued", DCT.issued),
+ ("modified", DCT.modified),
+ ("status", ADMS.status),
+ ("license", DCT.license),
+ ):
+ value = self._object_value(distribution, predicate)
+ if value:
+ resource_dict[key] = value
+
+ resource_dict["url"] = self._object_value(
+ distribution, DCAT.downloadURL
+ ) or self._object_value(distribution, DCAT.accessURL)
+ # Lists
+ for key, predicate in (
+ ("language", DCT.language),
+ ("documentation", FOAF.page),
+ ("conforms_to", DCT.conformsTo),
+ ):
+ values = self._object_value_list(distribution, predicate)
+ if values:
+ resource_dict[key] = json.dumps(values)
+
+ # rights
+ rights = self._access_rights(distribution, DCT.rights)
+ if rights:
+ resource_dict["rights"] = rights
+
+ # Format and media type
+ normalize_ckan_format = toolkit.asbool(
+ config.get("ckanext.dcat.normalize_ckan_format", True)
+ )
+ imt, label = self._distribution_format(distribution, normalize_ckan_format)
+
+ if imt:
+ resource_dict["mimetype"] = imt
+
+ if label:
+ resource_dict["format"] = label
+ elif imt:
+ resource_dict["format"] = imt
+
+ # Size
+ size = self._object_value_int(distribution, DCAT.byteSize)
+ if size is not None:
+ resource_dict["size"] = size
+
+ # Checksum
+ for checksum in self.g.objects(distribution, SPDX.checksum):
+ algorithm = self._object_value(checksum, SPDX.algorithm)
+ checksum_value = self._object_value(checksum, SPDX.checksumValue)
+ if algorithm:
+ resource_dict["hash_algorithm"] = algorithm
+ if checksum_value:
+ resource_dict["hash"] = checksum_value
+
+ # Distribution URI (explicitly show the missing ones)
+ resource_dict["uri"] = (
+ str(distribution) if isinstance(distribution, term.URIRef) else ""
+ )
+
+ # Remember the (internal) distribution reference for referencing in
+ # further profiles, e.g. for adding more properties
+ resource_dict["distribution_ref"] = str(distribution)
+
+ dataset_dict["resources"].append(resource_dict)
+
+ if self.compatibility_mode:
+ # Tweak the resulting dict to make it compatible with previous
+ # versions of the ckanext-dcat parsers
+ for extra in dataset_dict["extras"]:
+ if extra["key"] in (
+ "issued",
+ "modified",
+ "publisher_name",
+ "publisher_email",
+ ):
+
+ extra["key"] = "dcat_" + extra["key"]
+
+ if extra["key"] == "language":
+ extra["value"] = ",".join(sorted(json.loads(extra["value"])))
+
+ return dataset_dict
+
+ def graph_from_dataset(self, dataset_dict, dataset_ref):
+
+ g = self.g
+
+ for prefix, namespace in namespaces.items():
+ g.bind(prefix, namespace)
+
+ g.add((dataset_ref, RDF.type, DCAT.Dataset))
+
+ # Basic fields
+ items = [
+ ("title", DCT.title, None, Literal),
+ ("notes", DCT.description, None, Literal),
+ ("url", DCAT.landingPage, None, URIRef),
+ ("identifier", DCT.identifier, ["guid", "id"], URIRefOrLiteral),
+ ("version", OWL.versionInfo, ["dcat_version"], Literal),
+ ("version_notes", ADMS.versionNotes, None, Literal),
+ ("frequency", DCT.accrualPeriodicity, None, URIRefOrLiteral),
+ ("access_rights", DCT.accessRights, None, URIRefOrLiteral),
+ ("dcat_type", DCT.type, None, Literal),
+ ("provenance", DCT.provenance, None, Literal),
+ ]
+ self._add_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ # Tags
+ for tag in dataset_dict.get("tags", []):
+ g.add((dataset_ref, DCAT.keyword, Literal(tag["name"])))
+
+ # Dates
+ items = [
+ ("issued", DCT.issued, ["metadata_created"], Literal),
+ ("modified", DCT.modified, ["metadata_modified"], Literal),
+ ]
+ self._add_date_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ # Lists
+ items = [
+ ("language", DCT.language, None, URIRefOrLiteral),
+ ("theme", DCAT.theme, None, URIRef),
+ ("conforms_to", DCT.conformsTo, None, Literal),
+ ("alternate_identifier", ADMS.identifier, None, URIRefOrLiteral),
+ ("documentation", FOAF.page, None, URIRefOrLiteral),
+ ("related_resource", DCT.relation, None, URIRefOrLiteral),
+ ("has_version", DCT.hasVersion, None, URIRefOrLiteral),
+ ("is_version_of", DCT.isVersionOf, None, URIRefOrLiteral),
+ ("source", DCT.source, None, URIRefOrLiteral),
+ ("sample", ADMS.sample, None, URIRefOrLiteral),
+ ]
+ self._add_list_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ # Contact details
+ if any(
+ [
+ self._get_dataset_value(dataset_dict, "contact_uri"),
+ self._get_dataset_value(dataset_dict, "contact_name"),
+ self._get_dataset_value(dataset_dict, "contact_email"),
+ self._get_dataset_value(dataset_dict, "maintainer"),
+ self._get_dataset_value(dataset_dict, "maintainer_email"),
+ self._get_dataset_value(dataset_dict, "author"),
+ self._get_dataset_value(dataset_dict, "author_email"),
+ ]
+ ):
+
+ contact_uri = self._get_dataset_value(dataset_dict, "contact_uri")
+ if contact_uri:
+ contact_details = CleanedURIRef(contact_uri)
+ else:
+ contact_details = BNode()
+
+ g.add((contact_details, RDF.type, VCARD.Organization))
+ g.add((dataset_ref, DCAT.contactPoint, contact_details))
+
+ self._add_triple_from_dict(
+ dataset_dict,
+ contact_details,
+ VCARD.fn,
+ "contact_name",
+ ["maintainer", "author"],
+ )
+ # Add mail address as URIRef, and ensure it has a mailto: prefix
+ self._add_triple_from_dict(
+ dataset_dict,
+ contact_details,
+ VCARD.hasEmail,
+ "contact_email",
+ ["maintainer_email", "author_email"],
+ _type=URIRef,
+ value_modifier=self._add_mailto,
+ )
+
+ # Publisher
+ if any(
+ [
+ self._get_dataset_value(dataset_dict, "publisher_uri"),
+ self._get_dataset_value(dataset_dict, "publisher_name"),
+ dataset_dict.get("organization"),
+ ]
+ ):
+
+ publisher_uri = self._get_dataset_value(dataset_dict, "publisher_uri")
+ publisher_uri_fallback = publisher_uri_organization_fallback(dataset_dict)
+ publisher_name = self._get_dataset_value(dataset_dict, "publisher_name")
+ if publisher_uri:
+ publisher_details = CleanedURIRef(publisher_uri)
+ elif not publisher_name and publisher_uri_fallback:
+ # neither URI nor name are available, use organization as fallback
+ publisher_details = CleanedURIRef(publisher_uri_fallback)
+ else:
+ # No publisher_uri
+ publisher_details = BNode()
+
+ g.add((publisher_details, RDF.type, FOAF.Organization))
+ g.add((dataset_ref, DCT.publisher, publisher_details))
+
+ # In case no name and URI are available, again fall back to organization.
+ # If no name but an URI is available, the name literal remains empty to
+ # avoid mixing organization and dataset values.
+ if (
+ not publisher_name
+ and not publisher_uri
+ and dataset_dict.get("organization")
+ ):
+ publisher_name = dataset_dict["organization"]["title"]
+
+ g.add((publisher_details, FOAF.name, Literal(publisher_name)))
+ # TODO: It would make sense to fallback these to organization
+ # fields but they are not in the default schema and the
+ # `organization` object in the dataset_dict does not include
+ # custom fields
+ items = [
+ ("publisher_email", FOAF.mbox, None, Literal),
+ ("publisher_url", FOAF.homepage, None, URIRef),
+ ("publisher_type", DCT.type, None, URIRefOrLiteral),
+ ]
+
+ self._add_triples_from_dict(dataset_dict, publisher_details, items)
+
+ # Temporal
+ start = self._get_dataset_value(dataset_dict, "temporal_start")
+ end = self._get_dataset_value(dataset_dict, "temporal_end")
+ if start or end:
+ temporal_extent = BNode()
+
+ g.add((temporal_extent, RDF.type, DCT.PeriodOfTime))
+ if start:
+ self._add_date_triple(temporal_extent, SCHEMA.startDate, start)
+ if end:
+ self._add_date_triple(temporal_extent, SCHEMA.endDate, end)
+ g.add((dataset_ref, DCT.temporal, temporal_extent))
+
+ # Spatial
+ spatial_text = self._get_dataset_value(dataset_dict, "spatial_text")
+ spatial_geom = self._get_dataset_value(dataset_dict, "spatial")
+
+ if spatial_text or spatial_geom:
+ spatial_ref = self._get_or_create_spatial_ref(dataset_dict, dataset_ref)
+
+ if spatial_text:
+ g.add((spatial_ref, SKOS.prefLabel, Literal(spatial_text)))
+
+ if spatial_geom:
+ self._add_spatial_value_to_graph(
+ spatial_ref, LOCN.geometry, spatial_geom
+ )
+
+ # Use fallback license if set in config
+ resource_license_fallback = None
+ if toolkit.asbool(config.get(DISTRIBUTION_LICENSE_FALLBACK_CONFIG, False)):
+ if "license_id" in dataset_dict and isinstance(
+ URIRefOrLiteral(dataset_dict["license_id"]), URIRef
+ ):
+ resource_license_fallback = dataset_dict["license_id"]
+ elif "license_url" in dataset_dict and isinstance(
+ URIRefOrLiteral(dataset_dict["license_url"]), URIRef
+ ):
+ resource_license_fallback = dataset_dict["license_url"]
+
+ # Resources
+ for resource_dict in dataset_dict.get("resources", []):
+
+ distribution = CleanedURIRef(resource_uri(resource_dict))
+
+ g.add((dataset_ref, DCAT.distribution, distribution))
+
+ g.add((distribution, RDF.type, DCAT.Distribution))
+
+ # Simple values
+ items = [
+ ("name", DCT.title, None, Literal),
+ ("description", DCT.description, None, Literal),
+ ("status", ADMS.status, None, URIRefOrLiteral),
+ ("rights", DCT.rights, None, URIRefOrLiteral),
+ ("license", DCT.license, None, URIRefOrLiteral),
+ ("access_url", DCAT.accessURL, None, URIRef),
+ ("download_url", DCAT.downloadURL, None, URIRef),
+ ]
+
+ self._add_triples_from_dict(resource_dict, distribution, items)
+
+ # Lists
+ items = [
+ ("documentation", FOAF.page, None, URIRefOrLiteral),
+ ("language", DCT.language, None, URIRefOrLiteral),
+ ("conforms_to", DCT.conformsTo, None, Literal),
+ ]
+ self._add_list_triples_from_dict(resource_dict, distribution, items)
+
+ # Set default license for distribution if needed and available
+            if resource_license_fallback and (distribution, DCT.license, None) not in g:
+ g.add(
+ (
+ distribution,
+ DCT.license,
+ URIRefOrLiteral(resource_license_fallback),
+ )
+ )
+
+ # Format
+ mimetype = resource_dict.get("mimetype")
+ fmt = resource_dict.get("format")
+
+ # IANA media types (either URI or Literal) should be mapped as mediaType.
+ # In case format is available and mimetype is not set or identical to format,
+ # check which type is appropriate.
+ if fmt and (not mimetype or mimetype == fmt):
+ if (
+ "iana.org/assignments/media-types" in fmt
+ or not fmt.startswith("http")
+ and "/" in fmt
+ ):
+ # output format value as dcat:mediaType instead of dct:format
+ mimetype = fmt
+ fmt = None
+ else:
+ # Use dct:format
+ mimetype = None
+
+ if mimetype:
+ g.add((distribution, DCAT.mediaType, URIRefOrLiteral(mimetype)))
+
+ if fmt:
+ g.add((distribution, DCT["format"], URIRefOrLiteral(fmt)))
+
+ # URL fallback and old behavior
+ url = resource_dict.get("url")
+ download_url = resource_dict.get("download_url")
+ access_url = resource_dict.get("access_url")
+            # Use url as a fallback for access_url when access_url is not set and url differs from download_url
+ if url and not access_url:
+ if (not download_url) or (download_url and url != download_url):
+ self._add_triple_from_dict(
+ resource_dict, distribution, DCAT.accessURL, "url", _type=URIRef
+ )
+
+ # Dates
+ items = [
+ ("issued", DCT.issued, ["created"], Literal),
+ ("modified", DCT.modified, ["metadata_modified"], Literal),
+ ]
+
+ self._add_date_triples_from_dict(resource_dict, distribution, items)
+
+ # Numbers
+ if resource_dict.get("size"):
+ try:
+ g.add(
+ (
+ distribution,
+ DCAT.byteSize,
+ Literal(float(resource_dict["size"]), datatype=XSD.decimal),
+ )
+ )
+ except (ValueError, TypeError):
+ g.add((distribution, DCAT.byteSize, Literal(resource_dict["size"])))
+ # Checksum
+ if resource_dict.get("hash"):
+ checksum = BNode()
+ g.add((checksum, RDF.type, SPDX.Checksum))
+ g.add(
+ (
+ checksum,
+ SPDX.checksumValue,
+ Literal(resource_dict["hash"], datatype=XSD.hexBinary),
+ )
+ )
+
+ if resource_dict.get("hash_algorithm"):
+ g.add(
+ (
+ checksum,
+ SPDX.algorithm,
+ URIRefOrLiteral(resource_dict["hash_algorithm"]),
+ )
+ )
+
+ g.add((distribution, SPDX.checksum, checksum))
+
+ def graph_from_catalog(self, catalog_dict, catalog_ref):
+
+ g = self.g
+
+ for prefix, namespace in namespaces.items():
+ g.bind(prefix, namespace)
+
+ g.add((catalog_ref, RDF.type, DCAT.Catalog))
+
+ # Basic fields
+ items = [
+ ("title", DCT.title, config.get("ckan.site_title"), Literal),
+ (
+ "description",
+ DCT.description,
+ config.get("ckan.site_description"),
+ Literal,
+ ),
+ ("homepage", FOAF.homepage, config.get("ckan.site_url"), URIRef),
+ (
+ "language",
+ DCT.language,
+ config.get("ckan.locale_default", "en"),
+ URIRefOrLiteral,
+ ),
+ ]
+ for item in items:
+ key, predicate, fallback, _type = item
+ if catalog_dict:
+ value = catalog_dict.get(key, fallback)
+ else:
+ value = fallback
+ if value:
+ g.add((catalog_ref, predicate, _type(value)))
+
+ # Dates
+ modified = self._last_catalog_modification()
+ if modified:
+ self._add_date_triple(catalog_ref, DCT.modified, modified)
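In practice the profile is driven through the parser and serializer classes rather than instantiated directly. A sketch, assuming the RDFParser/RDFSerializer interfaces from ckanext.dcat.processors and their profiles argument (rdf_content stands for the raw RDF payload):

from ckanext.dcat.processors import RDFParser, RDFSerializer

# RDF -> CKAN dataset dicts
parser = RDFParser(profiles=["euro_dcat_ap"])
parser.parse(rdf_content, _format="xml")
for dataset_dict in parser.datasets():
    print(dataset_dict["title"])

# CKAN dataset dict -> RDF
serializer = RDFSerializer(profiles=["euro_dcat_ap"])
print(serializer.serialize_dataset(dataset_dict, _format="turtle"))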
diff --git a/ckanext/dcat/profiles/euro_dcat_ap_2.py b/ckanext/dcat/profiles/euro_dcat_ap_2.py
new file mode 100644
index 00000000..6f40e3ab
--- /dev/null
+++ b/ckanext/dcat/profiles/euro_dcat_ap_2.py
@@ -0,0 +1,316 @@
+import json
+
+from rdflib import URIRef, BNode, Literal
+from ckanext.dcat.utils import resource_uri
+
+from .base import URIRefOrLiteral, CleanedURIRef
+from .base import (
+ RDF,
+ SKOS,
+ DCAT,
+ DCATAP,
+ DCT,
+ XSD,
+)
+
+from .euro_dcat_ap import EuropeanDCATAPProfile
+
+
+class EuropeanDCATAP2Profile(EuropeanDCATAPProfile):
+ """
+ An RDF profile based on the DCAT-AP 2 for data portals in Europe
+
+ More information and specification:
+
+ https://joinup.ec.europa.eu/asset/dcat_application_profile
+
+ """
+
+ def parse_dataset(self, dataset_dict, dataset_ref):
+
+ # call super method
+ super(EuropeanDCATAP2Profile, self).parse_dataset(dataset_dict, dataset_ref)
+
+ # Lists
+ for key, predicate in (
+ ("temporal_resolution", DCAT.temporalResolution),
+ ("is_referenced_by", DCT.isReferencedBy),
+ ("applicable_legislation", DCATAP.applicableLegislation),
+ ("hvd_category", DCATAP.hvdCategory),
+ ):
+ values = self._object_value_list(dataset_ref, predicate)
+ if values:
+ dataset_dict["extras"].append({"key": key, "value": json.dumps(values)})
+ # Temporal
+ start, end = self._time_interval(dataset_ref, DCT.temporal, dcat_ap_version=2)
+ if start:
+ self._insert_or_update_temporal(dataset_dict, "temporal_start", start)
+ if end:
+ self._insert_or_update_temporal(dataset_dict, "temporal_end", end)
+
+ # Spatial
+ spatial = self._spatial(dataset_ref, DCT.spatial)
+ for key in ("bbox", "centroid"):
+ self._add_spatial_to_dict(dataset_dict, key, spatial)
+
+ # Spatial resolution in meters
+ spatial_resolution_in_meters = self._object_value_int_list(
+ dataset_ref, DCAT.spatialResolutionInMeters
+ )
+ if spatial_resolution_in_meters:
+ dataset_dict["extras"].append(
+ {
+ "key": "spatial_resolution_in_meters",
+ "value": json.dumps(spatial_resolution_in_meters),
+ }
+ )
+
+ # Resources
+ for distribution in self._distributions(dataset_ref):
+ distribution_ref = str(distribution)
+ for resource_dict in dataset_dict.get("resources", []):
+ # Match distribution in graph and distribution in resource dict
+ if resource_dict and distribution_ref == resource_dict.get(
+ "distribution_ref"
+ ):
+ # Simple values
+ for key, predicate in (
+ ("availability", DCATAP.availability),
+ ("compress_format", DCAT.compressFormat),
+ ("package_format", DCAT.packageFormat),
+ ):
+ value = self._object_value(distribution, predicate)
+ if value:
+ resource_dict[key] = value
+
+ # Lists
+ for key, predicate in (
+ ("applicable_legislation", DCATAP.applicableLegislation),
+ ):
+ values = self._object_value_list(distribution, predicate)
+ if values:
+ resource_dict[key] = json.dumps(values)
+
+ # Access services
+ access_service_list = []
+
+ for access_service in self.g.objects(
+ distribution, DCAT.accessService
+ ):
+ access_service_dict = {}
+
+ # Simple values
+ for key, predicate in (
+ ("availability", DCATAP.availability),
+ ("title", DCT.title),
+ ("endpoint_description", DCAT.endpointDescription),
+ ("license", DCT.license),
+ ("access_rights", DCT.accessRights),
+ ("description", DCT.description),
+ ):
+ value = self._object_value(access_service, predicate)
+ if value:
+ access_service_dict[key] = value
+ # List
+ for key, predicate in (
+ ("endpoint_url", DCAT.endpointURL),
+ ("serves_dataset", DCAT.servesDataset),
+ ):
+ values = self._object_value_list(
+ access_service, predicate
+ )
+ if values:
+ access_service_dict[key] = values
+
+                    # Access service URI (an empty string explicitly marks a
+                    # missing one)
+ access_service_dict["uri"] = (
+ str(access_service)
+ if isinstance(access_service, URIRef)
+ else ""
+ )
+
+                    # Keep the (internal) access service reference so that
+                    # derived profiles can attach more properties to it
+ access_service_dict["access_service_ref"] = str(
+ access_service
+ )
+
+ access_service_list.append(access_service_dict)
+
+ if access_service_list:
+ resource_dict["access_services"] = json.dumps(
+ access_service_list
+ )
+
+ return dataset_dict
+
+ def graph_from_dataset(self, dataset_dict, dataset_ref):
+
+ # call super method
+ super(EuropeanDCATAP2Profile, self).graph_from_dataset(
+ dataset_dict, dataset_ref
+ )
+
+ # Lists
+        for key, predicate, fallbacks, _type, datatype in (
+ (
+ "temporal_resolution",
+ DCAT.temporalResolution,
+ None,
+ Literal,
+ XSD.duration,
+ ),
+ ("is_referenced_by", DCT.isReferencedBy, None, URIRefOrLiteral, None),
+ (
+ "applicable_legislation",
+ DCATAP.applicableLegislation,
+ None,
+ URIRefOrLiteral,
+ None,
+ ),
+ ("hvd_category", DCATAP.hvdCategory, None, URIRefOrLiteral, None),
+ ):
+ self._add_triple_from_dict(
+ dataset_dict,
+ dataset_ref,
+ predicate,
+ key,
+ list_value=True,
+ fallbacks=fallbacks,
+                _type=_type,
+ _datatype=datatype,
+ )
+
+ # Temporal
+ start = self._get_dataset_value(dataset_dict, "temporal_start")
+ end = self._get_dataset_value(dataset_dict, "temporal_end")
+ if start or end:
+ temporal_extent_dcat = BNode()
+
+ self.g.add((temporal_extent_dcat, RDF.type, DCT.PeriodOfTime))
+ if start:
+ self._add_date_triple(temporal_extent_dcat, DCAT.startDate, start)
+ if end:
+ self._add_date_triple(temporal_extent_dcat, DCAT.endDate, end)
+ self.g.add((dataset_ref, DCT.temporal, temporal_extent_dcat))
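+            # With illustrative dates, this yields roughly (Turtle):
+            #   <dataset> dct:temporal [
+            #       a dct:PeriodOfTime ;
+            #       dcat:startDate "2023-01-01"^^xsd:date ;
+            #       dcat:endDate "2023-12-31"^^xsd:date
+            #   ] .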
+
+ # spatial
+ spatial_bbox = self._get_dataset_value(dataset_dict, "spatial_bbox")
+ spatial_cent = self._get_dataset_value(dataset_dict, "spatial_centroid")
+
+ if spatial_bbox or spatial_cent:
+ spatial_ref = self._get_or_create_spatial_ref(dataset_dict, dataset_ref)
+
+ if spatial_bbox:
+ self._add_spatial_value_to_graph(spatial_ref, DCAT.bbox, spatial_bbox)
+
+ if spatial_cent:
+ self._add_spatial_value_to_graph(
+ spatial_ref, DCAT.centroid, spatial_cent
+ )
+
+ # Spatial resolution in meters
+ spatial_resolution_in_meters = self._read_list_value(
+ self._get_dataset_value(dataset_dict, "spatial_resolution_in_meters")
+ )
+ if spatial_resolution_in_meters:
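+            # DCAT-AP 2 types dcat:spatialResolutionInMeters as xsd:decimal;
+            # values that cannot be parsed as numbers fall back to plain
+            # literals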
+ for value in spatial_resolution_in_meters:
+ try:
+ self.g.add(
+ (
+ dataset_ref,
+ DCAT.spatialResolutionInMeters,
+ Literal(float(value), datatype=XSD.decimal),
+ )
+ )
+ except (ValueError, TypeError):
+ self.g.add(
+ (dataset_ref, DCAT.spatialResolutionInMeters, Literal(value))
+ )
+
+ # Resources
+ for resource_dict in dataset_dict.get("resources", []):
+
+ distribution = CleanedURIRef(resource_uri(resource_dict))
+
+ # Simple values
+ items = [
+ ("availability", DCATAP.availability, None, URIRefOrLiteral),
+ ("compress_format", DCAT.compressFormat, None, URIRefOrLiteral),
+ ("package_format", DCAT.packageFormat, None, URIRefOrLiteral),
+ ]
+
+ self._add_triples_from_dict(resource_dict, distribution, items)
+
+ # Lists
+ items = [
+ (
+ "applicable_legislation",
+ DCATAP.applicableLegislation,
+ None,
+ URIRefOrLiteral,
+ ),
+ ]
+ self._add_list_triples_from_dict(resource_dict, distribution, items)
+
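+            # "access_services" is expected to be a JSON-encoded list of
+            # dicts, as produced by parse_dataset() above, e.g. (illustrative
+            # values):
+            #   [{"uri": "http://example.com/data-service/1",
+            #     "title": "Example API",
+            #     "endpoint_url": ["http://example.com/api"]}]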
+ try:
+ access_service_list = json.loads(
+ resource_dict.get("access_services", "[]")
+ )
+ # Access service
+ for access_service_dict in access_service_list:
+
+ access_service_uri = access_service_dict.get("uri")
+ if access_service_uri:
+ access_service_node = CleanedURIRef(access_service_uri)
+ else:
+ access_service_node = BNode()
+                    # Keep the (internal) access service reference so that
+                    # derived profiles can attach more properties to it
+ access_service_dict["access_service_ref"] = str(
+ access_service_node
+ )
+
+ self.g.add((distribution, DCAT.accessService, access_service_node))
+
+ self.g.add((access_service_node, RDF.type, DCAT.DataService))
+
+ # Simple values
+ items = [
+ ("availability", DCATAP.availability, None, URIRefOrLiteral),
+ ("license", DCT.license, None, URIRefOrLiteral),
+ ("access_rights", DCT.accessRights, None, URIRefOrLiteral),
+ ("title", DCT.title, None, Literal),
+ (
+ "endpoint_description",
+ DCAT.endpointDescription,
+ None,
+ Literal,
+ ),
+ ("description", DCT.description, None, Literal),
+ ]
+
+ self._add_triples_from_dict(
+ access_service_dict, access_service_node, items
+ )
+
+ # Lists
+ items = [
+ ("endpoint_url", DCAT.endpointURL, None, URIRefOrLiteral),
+ ("serves_dataset", DCAT.servesDataset, None, URIRefOrLiteral),
+ ]
+ self._add_list_triples_from_dict(
+ access_service_dict, access_service_node, items
+ )
+
+ if access_service_list:
+ resource_dict["access_services"] = json.dumps(access_service_list)
+            except ValueError:
+                # ignore access_services values that are not valid JSON
+                pass
+
+ def graph_from_catalog(self, catalog_dict, catalog_ref):
+
+ # call super method
+ super(EuropeanDCATAP2Profile, self).graph_from_catalog(
+ catalog_dict, catalog_ref
+ )
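+        # The base profile already emits all catalog-level triples; DCAT-AP 2
+        # currently adds nothing on top of them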
diff --git a/ckanext/dcat/profiles/schemaorg.py b/ckanext/dcat/profiles/schemaorg.py
new file mode 100644
index 00000000..3b3ec3b0
--- /dev/null
+++ b/ckanext/dcat/profiles/schemaorg.py
@@ -0,0 +1,339 @@
+import datetime
+
+from dateutil.parser import parse as parse_date
+from rdflib import URIRef, BNode, Literal
+from ckantoolkit import url_for, config
+
+from ckanext.dcat.utils import resource_uri, publisher_uri_organization_fallback
+from .base import RDFProfile, CleanedURIRef
+from .base import (
+ RDF,
+ SCHEMA,
+)
+
+
+class SchemaOrgProfile(RDFProfile):
+ """
+    An RDF profile based on the schema.org Dataset type
+
+ More information and specification:
+
+ http://schema.org/Dataset
+
+ Mapping between schema.org Dataset and DCAT:
+
+ https://www.w3.org/wiki/WebSchemas/Datasets
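+
+    The profile is typically enabled via the ``ckanext.dcat.rdf.profiles``
+    configuration option, e.g.::
+
+        ckanext.dcat.rdf.profiles = schemaorg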
+ """
+
+ def graph_from_dataset(self, dataset_dict, dataset_ref):
+
+ g = self.g
+
+ # Namespaces
+ self._bind_namespaces()
+
+ g.add((dataset_ref, RDF.type, SCHEMA.Dataset))
+
+ # Basic fields
+ self._basic_fields_graph(dataset_ref, dataset_dict)
+
+ # Catalog
+ self._catalog_graph(dataset_ref, dataset_dict)
+
+ # Groups
+ self._groups_graph(dataset_ref, dataset_dict)
+
+ # Tags
+ self._tags_graph(dataset_ref, dataset_dict)
+
+ # Lists
+ self._list_fields_graph(dataset_ref, dataset_dict)
+
+ # Publisher
+ self._publisher_graph(dataset_ref, dataset_dict)
+
+ # Temporal
+ self._temporal_graph(dataset_ref, dataset_dict)
+
+ # Spatial
+ self._spatial_graph(dataset_ref, dataset_dict)
+
+ # Resources
+ self._resources_graph(dataset_ref, dataset_dict)
+
+ # Additional fields
+ self.additional_fields(dataset_ref, dataset_dict)
+
+ def additional_fields(self, dataset_ref, dataset_dict):
+ """
+ Adds any additional fields.
+
+ For a custom schema you should extend this class and
+ implement this method.
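+
+        A minimal sketch (the subclass and property below are illustrative,
+        not part of this module)::
+
+            class MySchemaOrgProfile(SchemaOrgProfile):
+                def additional_fields(self, dataset_ref, dataset_dict):
+                    # e.g. mark every dataset as freely accessible
+                    self.g.add(
+                        (dataset_ref, SCHEMA.isAccessibleForFree, Literal(True))
+                    )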
+ """
+ pass
+
+ def _add_date_triple(self, subject, predicate, value, _type=Literal):
+ """
+ Adds a new triple with a date object
+
+        Dates are parsed using dateutil; if parsing succeeds, the ISO 8601
+        representation is added to the graph (the form schema.org expects
+        for Date/DateTime values).
+
+ If there are parsing errors, the literal string value is added.
+ """
+ if not value:
+ return
+ try:
+ default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
+ _date = parse_date(value, default=default_datetime)
+
+ self.g.add((subject, predicate, _type(_date.isoformat())))
+ except ValueError:
+ self.g.add((subject, predicate, _type(value)))
+
+ def _bind_namespaces(self):
+ self.g.namespace_manager.bind("schema", SCHEMA, replace=True)
+
+ def _basic_fields_graph(self, dataset_ref, dataset_dict):
+ items = [
+ ("identifier", SCHEMA.identifier, None, Literal),
+ ("title", SCHEMA.name, None, Literal),
+ ("notes", SCHEMA.description, None, Literal),
+ ("version", SCHEMA.version, ["dcat_version"], Literal),
+ ("issued", SCHEMA.datePublished, ["metadata_created"], Literal),
+ ("modified", SCHEMA.dateModified, ["metadata_modified"], Literal),
+ ("license", SCHEMA.license, ["license_url", "license_title"], Literal),
+ ]
+ self._add_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ items = [
+ ("issued", SCHEMA.datePublished, ["metadata_created"], Literal),
+ ("modified", SCHEMA.dateModified, ["metadata_modified"], Literal),
+ ]
+
+ self._add_date_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ # Dataset URL
+ dataset_url = url_for("dataset.read", id=dataset_dict["name"], _external=True)
+ self.g.add((dataset_ref, SCHEMA.url, Literal(dataset_url)))
+
+ def _catalog_graph(self, dataset_ref, dataset_dict):
+ data_catalog = BNode()
+ self.g.add((dataset_ref, SCHEMA.includedInDataCatalog, data_catalog))
+ self.g.add((data_catalog, RDF.type, SCHEMA.DataCatalog))
+ self.g.add((data_catalog, SCHEMA.name, Literal(config.get("ckan.site_title"))))
+ self.g.add(
+ (
+ data_catalog,
+ SCHEMA.description,
+ Literal(config.get("ckan.site_description")),
+ )
+ )
+ self.g.add((data_catalog, SCHEMA.url, Literal(config.get("ckan.site_url"))))
+
+ def _groups_graph(self, dataset_ref, dataset_dict):
+ for group in dataset_dict.get("groups", []):
+            group_url = url_for(
+                "group.read", id=group.get("id"), _external=True
+            )
+ about = BNode()
+
+ self.g.add((about, RDF.type, SCHEMA.Thing))
+
+ self.g.add((about, SCHEMA.name, Literal(group["name"])))
+ self.g.add((about, SCHEMA.url, Literal(group_url)))
+
+ self.g.add((dataset_ref, SCHEMA.about, about))
+
+ def _tags_graph(self, dataset_ref, dataset_dict):
+ for tag in dataset_dict.get("tags", []):
+ self.g.add((dataset_ref, SCHEMA.keywords, Literal(tag["name"])))
+
+ def _list_fields_graph(self, dataset_ref, dataset_dict):
+ items = [
+ ("language", SCHEMA.inLanguage, None, Literal),
+ ]
+ self._add_list_triples_from_dict(dataset_dict, dataset_ref, items)
+
+ def _publisher_graph(self, dataset_ref, dataset_dict):
+ if any(
+ [
+ self._get_dataset_value(dataset_dict, "publisher_uri"),
+ self._get_dataset_value(dataset_dict, "publisher_name"),
+ dataset_dict.get("organization"),
+ ]
+ ):
+
+ publisher_uri = self._get_dataset_value(dataset_dict, "publisher_uri")
+ publisher_uri_fallback = publisher_uri_organization_fallback(dataset_dict)
+ publisher_name = self._get_dataset_value(dataset_dict, "publisher_name")
+ if publisher_uri:
+ publisher_details = CleanedURIRef(publisher_uri)
+ elif not publisher_name and publisher_uri_fallback:
+                # neither URI nor name is available, use the organization as
+                # a fallback
+ publisher_details = CleanedURIRef(publisher_uri_fallback)
+ else:
+ # No publisher_uri
+ publisher_details = BNode()
+
+ self.g.add((publisher_details, RDF.type, SCHEMA.Organization))
+ self.g.add((dataset_ref, SCHEMA.publisher, publisher_details))
+
+            # If neither a name nor a URI is available, fall back to the
+            # organization title. If a URI but no name is available, skip the
+            # name triple rather than mixing organization and dataset values.
+            if (
+                not publisher_name
+                and not publisher_uri
+                and dataset_dict.get("organization")
+            ):
+                publisher_name = dataset_dict["organization"]["title"]
+            if publisher_name:
+                self.g.add((publisher_details, SCHEMA.name, Literal(publisher_name)))
+
+ contact_point = BNode()
+ self.g.add((contact_point, RDF.type, SCHEMA.ContactPoint))
+ self.g.add((publisher_details, SCHEMA.contactPoint, contact_point))
+
+ self.g.add((contact_point, SCHEMA.contactType, Literal("customer service")))
+
+ publisher_url = self._get_dataset_value(dataset_dict, "publisher_url")
+ if not publisher_url and dataset_dict.get("organization"):
+ publisher_url = dataset_dict["organization"].get("url") or config.get(
+ "ckan.site_url"
+ )
+
+ self.g.add((contact_point, SCHEMA.url, Literal(publisher_url)))
+ items = [
+ (
+ "publisher_email",
+ SCHEMA.email,
+ ["contact_email", "maintainer_email", "author_email"],
+ Literal,
+ ),
+ (
+ "publisher_name",
+ SCHEMA.name,
+ ["contact_name", "maintainer", "author"],
+ Literal,
+ ),
+ ]
+
+ self._add_triples_from_dict(dataset_dict, contact_point, items)
+
+ def _temporal_graph(self, dataset_ref, dataset_dict):
+ start = self._get_dataset_value(dataset_dict, "temporal_start")
+ end = self._get_dataset_value(dataset_dict, "temporal_end")
+ if start or end:
+ if start and end:
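+                # schema.org temporalCoverage uses ISO 8601 intervals,
+                # e.g. "2012-01-01/2013-12-31" (illustrative dates)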
+ self.g.add(
+ (
+ dataset_ref,
+ SCHEMA.temporalCoverage,
+ Literal("%s/%s" % (start, end)),
+ )
+ )
+ elif start:
+ self._add_date_triple(dataset_ref, SCHEMA.temporalCoverage, start)
+ elif end:
+ self._add_date_triple(dataset_ref, SCHEMA.temporalCoverage, end)
+
+ def _spatial_graph(self, dataset_ref, dataset_dict):
+ spatial_uri = self._get_dataset_value(dataset_dict, "spatial_uri")
+ spatial_text = self._get_dataset_value(dataset_dict, "spatial_text")
+ spatial_geom = self._get_dataset_value(dataset_dict, "spatial")
+
+ if spatial_uri or spatial_text or spatial_geom:
+ if spatial_uri:
+ spatial_ref = URIRef(spatial_uri)
+ else:
+ spatial_ref = BNode()
+
+ self.g.add((spatial_ref, RDF.type, SCHEMA.Place))
+ self.g.add((dataset_ref, SCHEMA.spatialCoverage, spatial_ref))
+
+ if spatial_text:
+ self.g.add((spatial_ref, SCHEMA.description, Literal(spatial_text)))
+
+ if spatial_geom:
+ geo_shape = BNode()
+ self.g.add((geo_shape, RDF.type, SCHEMA.GeoShape))
+ self.g.add((spatial_ref, SCHEMA.geo, geo_shape))
+
+                # spatial_geom typically holds a GeoJSON geometry, which is
+                # stored verbatim in schema:polygon
+ self.g.add((geo_shape, SCHEMA.polygon, Literal(spatial_geom)))
+
+ def _resources_graph(self, dataset_ref, dataset_dict):
+ g = self.g
+ for resource_dict in dataset_dict.get("resources", []):
+ distribution = URIRef(resource_uri(resource_dict))
+ g.add((dataset_ref, SCHEMA.distribution, distribution))
+ g.add((distribution, RDF.type, SCHEMA.DataDownload))
+
+ self._distribution_graph(distribution, resource_dict)
+
+ def _distribution_graph(self, distribution, resource_dict):
+ # Simple values
+ self._distribution_basic_fields_graph(distribution, resource_dict)
+
+ # Lists
+ self._distribution_list_fields_graph(distribution, resource_dict)
+
+ # Format
+ self._distribution_format_graph(distribution, resource_dict)
+
+ # URL
+ self._distribution_url_graph(distribution, resource_dict)
+
+ # Numbers
+ self._distribution_numbers_graph(distribution, resource_dict)
+
+ def _distribution_basic_fields_graph(self, distribution, resource_dict):
+ items = [
+ ("name", SCHEMA.name, None, Literal),
+ ("description", SCHEMA.description, None, Literal),
+ ("license", SCHEMA.license, ["rights"], Literal),
+ ]
+
+ self._add_triples_from_dict(resource_dict, distribution, items)
+
+ items = [
+ ("issued", SCHEMA.datePublished, None, Literal),
+ ("modified", SCHEMA.dateModified, None, Literal),
+ ]
+
+ self._add_date_triples_from_dict(resource_dict, distribution, items)
+
+ def _distribution_list_fields_graph(self, distribution, resource_dict):
+ items = [
+ ("language", SCHEMA.inLanguage, None, Literal),
+ ]
+ self._add_list_triples_from_dict(resource_dict, distribution, items)
+
+ def _distribution_format_graph(self, distribution, resource_dict):
+ if resource_dict.get("format"):
+ self.g.add(
+ (distribution, SCHEMA.encodingFormat, Literal(resource_dict["format"]))
+ )
+ elif resource_dict.get("mimetype"):
+ self.g.add(
+ (
+ distribution,
+ SCHEMA.encodingFormat,
+ Literal(resource_dict["mimetype"]),
+ )
+ )
+
+ def _distribution_url_graph(self, distribution, resource_dict):
+ url = resource_dict.get("url")
+ download_url = resource_dict.get("download_url")
+ if download_url:
+ self.g.add((distribution, SCHEMA.contentUrl, Literal(download_url)))
+        if url and url != download_url:
+ self.g.add((distribution, SCHEMA.url, Literal(url)))
+
+ def _distribution_numbers_graph(self, distribution, resource_dict):
+ if resource_dict.get("size"):
+ self.g.add(
+ (distribution, SCHEMA.contentSize, Literal(resource_dict["size"]))
+ )
diff --git a/ckanext/dcat/tests/test_euro_dcatap_profile_serialize.py b/ckanext/dcat/tests/test_euro_dcatap_profile_serialize.py
index a389acfd..dd43ef62 100644
--- a/ckanext/dcat/tests/test_euro_dcatap_profile_serialize.py
+++ b/ckanext/dcat/tests/test_euro_dcatap_profile_serialize.py
@@ -16,9 +16,11 @@
from ckanext.dcat import utils
from ckanext.dcat.processors import RDFSerializer, HYDRA
-from ckanext.dcat.profiles import (DCAT, DCT, ADMS, XSD, VCARD, FOAF, SCHEMA,
- SKOS, LOCN, GSP, OWL, SPDX, GEOJSON_IMT,
- DISTRIBUTION_LICENSE_FALLBACK_CONFIG)
+from ckanext.dcat.profiles import (
+ DCAT, DCT, ADMS, XSD, VCARD, FOAF, SCHEMA,
+ SKOS, LOCN, GSP, OWL, SPDX, GEOJSON_IMT,
+)
+from ckanext.dcat.profiles.euro_dcat_ap import DISTRIBUTION_LICENSE_FALLBACK_CONFIG
from ckanext.dcat.utils import DCAT_EXPOSE_SUBCATALOGS
from ckanext.dcat.tests.utils import BaseSerializeTest