Augment pgcleanup to allow periodically deleting old datasets.
... and restricting operations to specific object store ids.
jmchilton committed Aug 17, 2023
1 parent d36282f commit dad2783
Showing 4 changed files with 364 additions and 58 deletions.
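The intended use is a periodic (cron-style) invocation of the script; below is a minimal sketch of such a run under assumed values — the sequence name purge_old_hdas is inferred from the new PurgeOldHDAs action in the diff below, and the config path and object store id are placeholders, not values taken from this commit.

    import subprocess

    # Sketch of a periodic cleanup run (assumed values, not part of this commit):
    # purge HDAs older than 60 days that live in the "scratch" object store.
    # Assumes the working directory is the Galaxy root.
    subprocess.check_call(
        [
            "python",
            "scripts/cleanup_datasets/pgcleanup.py",
            "-c", "config/galaxy.yml",        # Galaxy config with database_connection set
            "--older-than", "60",             # only act on objects older than 60 days
            "--object-store-id", "scratch",   # new flag added by this commit (placeholder id)
            "--sequence", "purge_old_hdas",   # assumed snake_case name for the new PurgeOldHDAs action
        ]
    )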
22 changes: 22 additions & 0 deletions lib/galaxy_test/base/populators.py
@@ -1022,6 +1022,28 @@ def run_collection_creates_list(self, history_id: str, hdca_id: str) -> Response
self.wait_for_history(history_id, assert_ok=True)
return self.run_tool("collection_creates_list", inputs, history_id)

def new_error_dataset(self, history_id: str) -> str:
payload = self.run_tool_payload(
tool_id="test_data_source",
inputs={
"URL": f"file://{os.path.join(os.getcwd(), 'README.rst')}",
"URL_method": "get",
"data_type": "bed",
},
history_id=history_id,
)
create_response = self._post("tools", data=payload)
api_asserts.assert_status_code_is(create_response, 200)
create_object = create_response.json()
api_asserts.assert_has_keys(create_object, "outputs")
assert len(create_object["outputs"]) == 1
output = create_object["outputs"][0]
self.wait_for_history(history_id, assert_ok=False)
# wait=False to allow errors
output_details = self.get_history_dataset_details(history_id, dataset=output, wait=False)
assert output_details["state"] == "error", output_details
return output_details["id"]

def run_exit_code_from_file(self, history_id: str, hdca_id: str) -> dict:
exit_code_inputs = {
"input": {"batch": True, "values": [{"src": "hdca", "id": hdca_id}]},
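For orientation, here is a minimal usage sketch of the new helper (hypothetical, not part of this commit); dataset_populator is assumed to be a DatasetPopulator bound to a live Galaxy interactor, as in the integration tests further down.

    # Hypothetical usage of the helper added above.
    history_id = dataset_populator.new_history()
    error_hda_id = dataset_populator.new_error_dataset(history_id)
    # error_hda_id names an HDA left in the "error" state - the kind of object the
    # PurgeErrorHDAs (and new PurgeOldHDAs) cleanup actions operate on.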
76 changes: 70 additions & 6 deletions scripts/cleanup_datasets/pgcleanup.py
@@ -111,6 +111,10 @@ def __init__(self, app):
self._debug = app.args.debug
self._update_time = app.args.update_time
self._force_retry = app.args.force_retry
if app.args.object_store_id:
self._object_store_id_sql = f" AND dataset.object_store_id = '{app.args.object_store_id}'"
else:
self._object_store_id_sql = ""
self._epoch_time = str(int(time.time()))
self._days = app.args.days
self._config = app.config
@@ -200,6 +204,7 @@ def sql(self):
update_time_sql=self._update_time_sql,
force_retry_sql=self._force_retry_sql,
epoch_time=self._epoch_time,
object_store_id_sql=self._object_store_id_sql,
)

@property
@@ -359,6 +364,7 @@ def sql(self):
update_time_sql=self._update_time_sql,
force_retry_sql=self._force_retry_sql,
epoch_time=self._epoch_time,
object_store_id_sql=self._object_store_id_sql,
)


@@ -844,17 +850,68 @@ class PurgeDeletedHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalc
)


class PurgeOldHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalculation, Action):
"""
- Mark purged all HistoryDatasetAssociations that are older than the specified number of days.
- Mark deleted all MetadataFiles whose hda_id is purged in this step.
- Mark deleted all ImplicitlyConvertedDatasetAssociations whose hda_parent_id is purged in this
step.
- Mark purged all HistoryDatasetAssociations for which an ImplicitlyConvertedDatasetAssociation
with matching hda_id is deleted in this step.
"""

force_retry_sql = " AND NOT history_dataset_association.purged"
_action_sql = """
WITH purged_hda_ids
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
FROM dataset
WHERE history_dataset_association.dataset_id = dataset.id AND
dataset.create_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
{force_retry_sql} {object_store_id_sql}
RETURNING history_dataset_association.id,
history_id),
hda_events
AS (INSERT INTO cleanup_event_hda_association
(create_time, cleanup_event_id, hda_id)
SELECT NOW() AT TIME ZONE 'utc', %(event_id)s, id
FROM purged_hda_ids),
{purge_hda_dependencies_sql}
SELECT purged_hda_ids.id AS purged_hda_id,
history.user_id AS recalculate_disk_usage_user_id,
deleted_metadata_file_ids.id AS deleted_metadata_file_id,
deleted_metadata_file_ids.uuid AS deleted_metadata_file_uuid,
deleted_metadata_file_ids.object_store_id AS object_store_id,
deleted_icda_ids.id AS deleted_icda_id,
deleted_icda_ids.hda_id AS deleted_icda_hda_id
FROM purged_hda_ids
LEFT OUTER JOIN history
ON purged_hda_ids.history_id = history.id
LEFT OUTER JOIN deleted_metadata_file_ids
ON deleted_metadata_file_ids.hda_id = purged_hda_ids.id
LEFT OUTER JOIN deleted_icda_ids
ON deleted_icda_ids.hda_parent_id = purged_hda_ids.id
ORDER BY purged_hda_ids.id
"""
causals = (
("purged_hda_id", "deleted_metadata_file_id", "object_store_id"),
("purged_hda_id", "deleted_icda_id", "deleted_icda_hda_id"),
)


class PurgeHistorylessHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalculation, Action):
"""
- Mark purged all HistoryDatasetAssociations whose history_id is null.
"""

force_retry_sql = " AND NOT history_dataset_association.purged"
_action_sql = """
WITH purged_hda_ids
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
WHERE history_id IS NULL{force_retry_sql}
AND update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
FROM dataset
WHERE history_id IS NULL{force_retry_sql}{object_store_id_sql}
AND history_dataset_association.update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING id),
hda_events
AS (INSERT INTO cleanup_event_hda_association
@@ -893,7 +950,7 @@ class PurgeErrorHDAs(PurgesHDAs, RemovesMetadataFiles, RequiresDiskUsageRecalcul
AS ( UPDATE history_dataset_association
SET purged = true, deleted = true{update_time_sql}
FROM dataset
WHERE history_dataset_association.dataset_id = dataset.id{force_retry_sql}
WHERE history_dataset_association.dataset_id = dataset.id{force_retry_sql}{object_store_id_sql}
AND dataset.state = 'error'
AND history_dataset_association.update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING history_dataset_association.id as id,
@@ -1037,7 +1094,7 @@ class DeleteExportedHistories(Action):
SET deleted = true{update_time_sql}
FROM job_export_history_archive
WHERE job_export_history_archive.dataset_id = dataset.id
AND NOT deleted
AND NOT deleted {object_store_id_sql}
AND dataset.update_time <= (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING dataset.id),
dataset_events
@@ -1063,7 +1120,7 @@ class DeleteDatasets(Action):
WITH deleted_dataset_ids
AS ( UPDATE dataset
SET deleted = true{update_time_sql}
WHERE NOT deleted
WHERE NOT deleted {object_store_id_sql}
AND NOT EXISTS
(SELECT true
FROM library_dataset_dataset_association
@@ -1097,7 +1154,7 @@ class PurgeDatasets(RemovesDatasets, Action):
WITH purged_dataset_ids
AS ( UPDATE dataset
SET purged = true{update_time_sql}
WHERE deleted{force_retry_sql}
WHERE deleted{force_retry_sql}{object_store_id_sql}
AND update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')
RETURNING id,
uuid,
@@ -1182,6 +1239,13 @@ def __parse_args(self):
default=14,
help="Only perform action(s) on objects that have not been updated since the specified number of days",
)
parser.add_argument(
"--object-store-id",
dest="object_store_id",
type=str,
default=None,
help="Only perform action(s) on objects stored in the target object store (for dataset operations - ignored by user/history centric operations)",
)
parser.add_argument(
"-U",
"--no-update-time",
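Each action's SQL template now carries an {object_store_id_sql} placeholder filled from the new command-line flag; the following is a minimal sketch of that composition, mirroring the __init__ logic above, with a hypothetical object store id.

    # Sketch of how the new filter fragment is built and spliced into an action's
    # SQL template. "scratch" stands in for a value passed via --object-store-id.
    object_store_id = "scratch"
    object_store_id_sql = f" AND dataset.object_store_id = '{object_store_id}'" if object_store_id else ""

    action_sql_template = (
        "UPDATE dataset SET purged = true "
        "WHERE deleted{object_store_id_sql} "
        "AND update_time < (NOW() AT TIME ZONE 'utc' - interval '%(days)s days')"
    )
    print(action_sql_template.format(object_store_id_sql=object_store_id_sql))
    # -> UPDATE dataset SET purged = true WHERE deleted AND dataset.object_store_id = 'scratch' AND ...

The %(days)s placeholder is left in the string for the database driver to fill at execution time, as in the existing actions.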
88 changes: 36 additions & 52 deletions test/integration/test_scripts.py
@@ -16,7 +16,7 @@
from galaxy_test.driver import integration_util


class TestScriptsIntegration(integration_util.IntegrationTestCase):
class BaseScriptsIntegrationTestCase(integration_util.IntegrationTestCase):
def setUp(self):
super().setUp()
self.dataset_populator = DatasetPopulator(self.galaxy_interactor)
@@ -26,6 +26,39 @@ def setUp(self):
def handle_galaxy_config_kwds(cls, config):
cls._raw_config = config

def _scripts_check_argparse_help(self, script):
# Test imports and argparse response to --help with 0 exit code.
output = self._scripts_check_output(script, ["--help"])
# Test -h, --help in printed output message.
assert "-h, --help" in output

def _scripts_check_output(self, script, args):
cwd = galaxy_directory()
cmd = ["python", os.path.join(cwd, "scripts", script)] + args
clean_env = {
"PATH": os.environ.get("PATH", None),
} # Don't let testing environment variables interfere with config.
try:
return unicodify(subprocess.check_output(cmd, cwd=cwd, env=clean_env))
except Exception as e:
if isinstance(e, subprocess.CalledProcessError):
raise Exception(f"{unicodify(e)}\nOutput was:\n{unicodify(e.output)}")
raise

def write_config_file(self):
config_dir = self.config_dir
path = os.path.join(config_dir, "galaxy.yml")
self._test_driver.temp_directories.extend([config_dir])
config = self._raw_config
# Update config dict with database_connection, which might be set through env variables
config["database_connection"] = self._app.config.database_connection
with open(path, "w") as f:
yaml.dump({"galaxy": config}, f)

return path


class TestScriptsIntegration(BaseScriptsIntegrationTestCase):
def test_helper(self):
script = "helper.py"
self._scripts_check_argparse_help(script)
Expand All @@ -52,25 +85,10 @@ def test_cleanup(self):
assert history_response.status_code == 200
assert history_response.json()["purged"] is True, history_response.json()

def test_pgcleanup(self):
self._skip_unless_postgres()

script = "cleanup_datasets/pgcleanup.py"
def test_admin_cleanup_datasets(self):
script = "admin_cleanup_datasets.py"
self._scripts_check_argparse_help(script)

history_id = self.dataset_populator.new_history()
delete_response = self.dataset_populator._delete(f"histories/{history_id}")
assert delete_response.status_code == 200
assert delete_response.json()["purged"] is False
config_file = self.write_config_file()
output = self._scripts_check_output(
script, ["-c", config_file, "--older-than", "0", "--sequence", "purge_deleted_histories"]
)
print(output)
history_response = self.dataset_populator._get(f"histories/{history_id}")
assert history_response.status_code == 200
assert history_response.json()["purged"] is True, history_response.json()

def test_set_user_disk_usage(self):
script = "set_user_disk_usage.py"
self._scripts_check_argparse_help(script)
@@ -123,9 +141,6 @@ def test_grt_export(self):
export = json.load(f)
assert export["version"] == 3

def test_admin_cleanup_datasets(self):
self._scripts_check_argparse_help("cleanup_datasets/admin_cleanup_datasets.py")

def test_secret_decoder_ring(self):
script = "secret_decoder_ring.py"
self._scripts_check_argparse_help(script)
@@ -143,34 +158,3 @@ def test_galaxy_main(self):
def test_runtime_stats(self):
self._skip_unless_postgres()
self._scripts_check_argparse_help("runtime_stats.py")

def _scripts_check_argparse_help(self, script):
# Test imports and argparse response to --help with 0 exit code.
output = self._scripts_check_output(script, ["--help"])
# Test -h, --help in printed output message.
assert "-h, --help" in output

def _scripts_check_output(self, script, args):
cwd = galaxy_directory()
cmd = ["python", os.path.join(cwd, "scripts", script)] + args
clean_env = {
"PATH": os.environ.get("PATH", None),
} # Don't let testing environment variables interfere with config.
try:
return unicodify(subprocess.check_output(cmd, cwd=cwd, env=clean_env))
except Exception as e:
if isinstance(e, subprocess.CalledProcessError):
raise Exception(f"{unicodify(e)}\nOutput was:\n{unicodify(e.output)}")
raise

def write_config_file(self):
config_dir = self.config_dir
path = os.path.join(config_dir, "galaxy.yml")
self._test_driver.temp_directories.extend([config_dir])
config = self._raw_config
# Update config dict with database_connection, which might be set through env variables
config["database_connection"] = self._app.config.database_connection
with open(path, "w") as f:
yaml.dump({"galaxy": config}, f)

return path
(fourth changed file not loaded)
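Purely as an illustration (not a reconstruction of the file that did not load above), a test exercising the new flag could follow the same pattern as test_pgcleanup; the test name, object store id, and sequence name below are assumptions.

    def test_pgcleanup_object_store_id(self):  # hypothetical test, for illustration only
        self._skip_unless_postgres()
        history_id = self.dataset_populator.new_history()
        self.dataset_populator.new_error_dataset(history_id)
        config_file = self.write_config_file()
        output = self._scripts_check_output(
            "cleanup_datasets/pgcleanup.py",
            [
                "-c", config_file,
                "--older-than", "0",
                "--object-store-id", "files1",     # placeholder object store id
                "--sequence", "purge_error_hdas",  # assumed snake_case name for PurgeErrorHDAs
            ],
        )
        print(output)
        # Assertions about which datasets were (or were not) purged would follow here.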

