diff --git a/data_registry/models.py b/data_registry/models.py
index 6e4cf40..74bac95 100644
--- a/data_registry/models.py
+++ b/data_registry/models.py
@@ -92,7 +92,7 @@ def initiate(self):
         self.status = Job.Status.RUNNING
         self.save()
 
-    def complete(self, result):
+    def complete(self):
         self.end = Now()
         self.status = Job.Status.COMPLETED
         self.save()
diff --git a/data_registry/process_manager/task/collect.py b/data_registry/process_manager/task/collect.py
index e8361c7..2f6851c 100644
--- a/data_registry/process_manager/task/collect.py
+++ b/data_registry/process_manager/task/collect.py
@@ -100,7 +100,7 @@ def get_status(self):
         if any(j["id"] == scrapyd_job_id for j in data.get("finished", [])):
             # If the collection ID was irretrievable, the job can't continue.
             if "process_id" not in self.job.context:
-                raise Exception("Unablel to retrieve collection ID from Scrapy log")
+                raise Exception("Unable to retrieve collection ID from Scrapy log")
 
             return Task.Status.COMPLETED
 
diff --git a/data_registry/process_manager/task/exporter.py b/data_registry/process_manager/task/exporter.py
index 172e3cc..8b35587 100644
--- a/data_registry/process_manager/task/exporter.py
+++ b/data_registry/process_manager/task/exporter.py
@@ -20,5 +20,5 @@ def get_status(self):
             case TaskStatus.COMPLETED:
                 return Task.Status.COMPLETED
 
-    def wipe(self):
+    def do_wipe(self):
         publish({"job_id": self.job.id}, "wiper_init")
diff --git a/data_registry/process_manager/task/flattener.py b/data_registry/process_manager/task/flattener.py
index f9cb177..fc31a10 100644
--- a/data_registry/process_manager/task/flattener.py
+++ b/data_registry/process_manager/task/flattener.py
@@ -20,5 +20,5 @@ def get_status(self):
             case TaskStatus.COMPLETED:
                 return Task.Status.COMPLETED
 
-    def wipe(self):
+    def do_wipe(self):
         publish({"job_id": self.job.id}, "wiper_init")
diff --git a/data_registry/process_manager/task/pelican.py b/data_registry/process_manager/task/pelican.py
index 59a75cb..c416f34 100644
--- a/data_registry/process_manager/task/pelican.py
+++ b/data_registry/process_manager/task/pelican.py
@@ -102,14 +102,11 @@ def get_pelican_id(self):
         return data.get("id")
 
-    def wipe(self):
-        if "pelican_dataset_name" not in self.job.context:  # for example, if this task never started
-            logger.warning("%s: Unable to wipe dataset (dataset name is not set)", self)
-            return
-
+    def do_wipe(self):
         pelican_id = self.job.context.get("pelican_id")  # set in get_status()
 
-        if not pelican_id:  # for example, if a previous task failed, or if wiped after run() but before get_status()
+        # If wiped after run() but before get_status(), or before processed by Pelican...
+        if not pelican_id:
             pelican_id = self.get_pelican_id()
 
         if not pelican_id:
             logger.warning("%s: Unable to wipe dataset (dataset ID is not set)", self)
diff --git a/data_registry/process_manager/task/process.py b/data_registry/process_manager/task/process.py
index f31238e..7a07de5 100644
--- a/data_registry/process_manager/task/process.py
+++ b/data_registry/process_manager/task/process.py
@@ -62,11 +62,7 @@ def get_status(self):
         return Task.Status.COMPLETED
 
-    def wipe(self):
-        if "process_id" not in self.job.context:  # for example, if Collect task failed
-            logger.warning("%s: Unable to wipe collection (collection ID is not set)", self)
-            return
-
+    def do_wipe(self):
         process_id = self.job.context["process_id"]  # set in Collect.get_status()
 
         logger.info("%s: Wiping data for collection %s", self, process_id)
 
diff --git a/data_registry/process_manager/util.py b/data_registry/process_manager/util.py
index a826337..23cb41c 100644
--- a/data_registry/process_manager/util.py
+++ b/data_registry/process_manager/util.py
@@ -4,8 +4,8 @@
 import requests
 from requests.exceptions import RequestException
 
+from data_registry import models
 from data_registry.exceptions import RecoverableException
-from data_registry.models import Task
 
 logger = logging.getLogger(__name__)
 
@@ -21,14 +21,14 @@ def __init__(self, task):
         self.task = task
 
     @property
-    def job(self):
+    def job(self) -> models.Job:
         """
         The job of which the task is a part.
         """
         return self.task.job
 
     @property
-    def collection(self):
+    def collection(self) -> models.Collection:
         """
         The publication on which the task is performed.
         """
@@ -36,7 +36,7 @@ def collection(self):
     @property
     @abstractmethod
-    def final_output(self):
+    def final_output(self) -> bool:
         """
         Whether the task produces a final output, like a bulk download.
 
         If not, its intermediate outputs are wiped if the job is complete and isn't configured to preserve temporary data.
@@ -67,7 +67,7 @@ def run(self) -> None:
         """
 
     @abstractmethod
-    def get_status(self) -> Task.Status:
+    def get_status(self) -> models.Task.Status:
         """
         Return the status of the task.
 
@@ -79,13 +79,29 @@ def get_status(self) -> Task.Status:
         """
 
-    @abstractmethod
     def wipe(self) -> None:
         """
         Delete any side effects of (for example, data written by) the task.
 
         This method must be idempotent. It is retried if any task failed to be wiped.
 
+        Implement :meth:`~data_registry.process_manager.util.TaskManager.do_wipe` in subclasses.
+
+        :raises RecoverableException:
+        """
+        if not self.task.start:
+            logger.debug("%s has nothing to wipe (task didn't start)", self)
+            return
+
+        self.do_wipe()
+
+    @abstractmethod
+    def do_wipe(self) -> None:
+        """
+        Delete any side effects of the task.
+
+        This method can assume that the task had started.
+
         :raises RecoverableException:
         """
 
 
diff --git a/tests/data_registry/process_manager/test_process.py b/tests/data_registry/process_manager/test_process.py
index 4482045..c6f6296 100644
--- a/tests/data_registry/process_manager/test_process.py
+++ b/tests/data_registry/process_manager/test_process.py
@@ -14,7 +14,7 @@ def run(self):
     def get_status(self):
         return Task.Status.COMPLETED
 
-    def wipe(self):
+    def do_wipe(self):
         pass
 
 
@@ -24,9 +24,9 @@ class ProcessTests(TransactionTestCase):
 
     def test(self):
         collection = Collection.objects.get(pk=1)
 
-        with patch("data_registry.process_manager.process.get_runner") as mock_get_runner:
-            # get_runner returns only TestTask
-            mock_get_runner.return_value = TestTask()
+        with patch("data_registry.process_manager.process.get_task_manager") as mock_get_task_manager:
+            # get_task_manager returns only TestTask
+            mock_get_task_manager.return_value = TestTask()
 
             settings.JOB_TASKS_PLAN = ["test"]