Skip to content

Commit

Permalink
Clean up harvest source clear command
Browse files Browse the repository at this point in the history
Revision tables are no longer present in CKAN 2.11 (see
ckan/ckan#8390)

Removed handling of really ancient and unsupported CKAN versions

Tried to make the statements more readable and compact.
  • Loading branch information
amercader committed Oct 31, 2024
1 parent bf849f1 commit 9883b1a
Showing 1 changed file with 94 additions and 120 deletions.
214 changes: 94 additions & 120 deletions ckanext/harvest/logic/action/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,146 +92,114 @@ def harvest_source_update(context, data_dict):


def harvest_source_clear(context, data_dict):
'''
"""
Clears all datasets, jobs and objects related to a harvest source, but
keeps the source itself. This is useful to clean history of long running
harvest sources to start again fresh.
:param id: the id of the harvest source to clear
:type id: string
'''
"""

check_access('harvest_source_clear', context, data_dict)
check_access("harvest_source_clear", context, data_dict)

harvest_source_id = data_dict.get('id')
harvest_source_id = data_dict.get("id")

source = HarvestSource.get(harvest_source_id)
if not source:
log.error('Harvest source %s does not exist', harvest_source_id)
raise NotFound('Harvest source %s does not exist' % harvest_source_id)
log.error("Harvest source %s does not exist", harvest_source_id)
raise NotFound("Harvest source %s does not exist" % harvest_source_id)

harvest_source_id = source.id

# Clear all datasets from this source from the index
harvest_source_index_clear(context, data_dict)

model = context['model']

# CKAN-2.6 or above: related don't exist any more
if toolkit.check_ckan_version(max_version='2.5.99'):

sql = '''select id from related where id in (
select related_id from related_dataset where dataset_id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}'));'''.format(
harvest_source_id=harvest_source_id)
result = model.Session.execute(sql)
ids = []
for row in result:
ids.append(row[0])
related_ids = "('" + "','".join(ids) + "')"

sql = '''begin;
update package set state = 'to_delete' where id in (
select package_id from harvest_object
where harvest_source_id = '{harvest_source_id}');'''.format(
harvest_source_id=harvest_source_id)

# CKAN-2.3 or above: delete resource views, resource revisions & resources
if toolkit.check_ckan_version(min_version='2.3'):
sql += '''
delete from resource_view where resource_id in (
select id from resource where package_id in (
select id from package where state = 'to_delete'));
delete from resource_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource where package_id in (
select id from package where state = 'to_delete');
'''
# Backwards-compatibility: support ResourceGroup (pre-CKAN-2.3)
else:
sql += '''
delete from resource_revision where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource where resource_group_id in (
select id from resource_group where package_id in (
select id from package where state = 'to_delete'));
delete from resource_group_revision where package_id in (
select id from package where state = 'to_delete');
delete from resource_group where package_id in (
select id from package where state = 'to_delete');
'''
# CKAN pre-2.5: authz models were removed in migration 078
if toolkit.check_ckan_version(max_version='2.4.99'):
sql += '''
delete from package_role where package_id in (
select id from package where state = 'to_delete');
delete from user_object_role where id not in (
select user_object_role_id from package_role)
and context = 'Package';
'''

sql += '''
delete from harvest_object_error where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object_extra where harvest_object_id in (
select id from harvest_object
where harvest_source_id = '{harvest_source_id}');
delete from harvest_object where harvest_source_id = '{harvest_source_id}';
delete from harvest_gather_error where harvest_job_id in (
select id from harvest_job where source_id = '{harvest_source_id}');
delete from harvest_job where source_id = '{harvest_source_id}';
delete from package_tag_revision where package_id in (
select id from package where state = 'to_delete');
delete from member_revision where table_id in (
select id from package where state = 'to_delete');
delete from package_extra_revision where package_id in (
select id from package where state = 'to_delete');
delete from package_revision where id in (
select id from package where state = 'to_delete');
delete from package_tag where package_id in (
select id from package where state = 'to_delete');
delete from package_extra where package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship_revision where object_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where subject_package_id in (
select id from package where state = 'to_delete');
delete from package_relationship where object_package_id in (
select id from package where state = 'to_delete');
delete from member where table_id in (
select id from package where state = 'to_delete');
'''.format(
harvest_source_id=harvest_source_id)

if toolkit.check_ckan_version(max_version='2.5.99'):
sql += '''
delete from related_dataset where dataset_id in (
select id from package where state = 'to_delete');
delete from related where id in {related_ids};
delete from package where id in (
select id from package where state = 'to_delete');
'''.format(related_ids=related_ids)
else:
# CKAN-2.6 or above: related don't exist any more
sql += '''
delete from package where id in (
select id from package where state = 'to_delete');
'''

sql += '''
commit;
'''
model = context["model"]

sql = "BEGIN;"

sql += f"""
UPDATE package set state = 'to_delete' WHERE id IN (
SELECT package_id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');"""

sql += """
CREATE TEMP TABLE package_ids_to_delete AS (
SELECT id FROM package WHERE state = 'to_delete');
"""

sql += f"""
DELETE FROM harvest_object_error WHERE harvest_object_id IN (
SELECT id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object_extra WHERE harvest_object_id IN (
SELECT id FROM harvest_object
WHERE harvest_source_id = '{harvest_source_id}');
DELETE FROM harvest_object WHERE harvest_source_id = '{harvest_source_id}';
DELETE FROM harvest_gather_error WHERE harvest_job_id IN (
SELECT id FROM harvest_job WHERE source_id = '{harvest_source_id}');
DELETE FROM harvest_job WHERE source_id = '{harvest_source_id}';
"""

if toolkit.check_ckan_version(max_version="2.10.99"):
# Revision tables were dropped IN CKAN 2.10

sql += """
DELETE FROM package_tag_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete');
DELETE FROM member_revision WHERE table_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_extra_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_revision WHERE id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship_revision WHERE subject_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship_revision WHERE object_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM resource_revision WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

if toolkit.check_ckan_version(max_version="2.11.99"):
# Package_extra table were dropped IN CKAN 2.12
sql += """
DELETE FROM package_extra WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
DELETE FROM package_tag WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship WHERE subject_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM package_relationship WHERE object_package_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM member WHERE table_id IN (
SELECT id FROM package_ids_to_delete);
DELETE FROM resource_view WHERE resource_id IN (
SELECT id FROM resource WHERE package_id IN (
SELECT id FROM package_ids_to_delete));
DELETE FROM resource WHERE package_id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
DELETE FROM package WHERE id IN (
SELECT id FROM package_ids_to_delete);
"""

sql += """
COMMIT;
"""
model.Session.execute(sql)

# Refresh the index for this source to update the status object
get_action('harvest_source_reindex')(context, {'id': harvest_source_id})
get_action("harvest_source_reindex")(context, {"id": harvest_source_id})

return {'id': harvest_source_id}
return {"id": harvest_source_id}


def harvest_abort_failed_jobs(context, data_dict):
Expand Down Expand Up @@ -974,7 +942,10 @@ def harvest_source_reindex(context, data_dict):

if 'extras_as_string' in context:
del context['extras_as_string']
context.update({'ignore_auth': True})
context.update({
'ignore_auth': True,
'validate': False,
})
package_dict = logic.get_action('harvest_source_show')(
context, {'id': harvest_source_id})
log.debug('Updating search index for harvest source: %s',
Expand All @@ -991,6 +962,9 @@ def harvest_source_reindex(context, data_dict):
if key not in config:
new_dict[key] = value

# TODO: workaroud for extras exception caused by convert_from_extras
new_dict.pop("extras", None)

package_index = PackageSearchIndex()
package_index.index_package(new_dict, defer_commit=defer_commit)

Expand Down

0 comments on commit 9883b1a

Please sign in to comment.