Skip to content

Commit

Permalink
test: Get tests passing
Browse files Browse the repository at this point in the history
  • Loading branch information
jpmckinney committed Apr 24, 2024
1 parent 069e437 commit f5f7241
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 24 deletions.
2 changes: 1 addition & 1 deletion data_registry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ def initiate(self):
self.status = Job.Status.RUNNING
self.save()

def complete(self, result):
def complete(self):
self.end = Now()
self.status = Job.Status.COMPLETED
self.save()
Expand Down
2 changes: 1 addition & 1 deletion data_registry/process_manager/task/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ def get_status(self):
case TaskStatus.COMPLETED:
return Task.Status.COMPLETED

def wipe(self):
def do_wipe(self):
publish({"job_id": self.job.id}, "wiper_init")
2 changes: 1 addition & 1 deletion data_registry/process_manager/task/flattener.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ def get_status(self):
case TaskStatus.COMPLETED:
return Task.Status.COMPLETED

def wipe(self):
def do_wipe(self):
publish({"job_id": self.job.id}, "wiper_init")
9 changes: 3 additions & 6 deletions data_registry/process_manager/task/pelican.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,11 @@ def get_pelican_id(self):

return data.get("id")

def wipe(self):
if "pelican_dataset_name" not in self.job.context: # for example, if this task never started
logger.warning("%s: Unable to wipe dataset (dataset name is not set)", self)
return

def do_wipe(self):
pelican_id = self.job.context.get("pelican_id") # set in get_status()

if not pelican_id: # for example, if a previous task failed, or if wiped after run() but before get_status()
# If wiped after run() but before get_status(), or before processed by Pelican...
if not pelican_id:
pelican_id = self.get_pelican_id()
if not pelican_id:
logger.warning("%s: Unable to wipe dataset (dataset ID is not set)", self)
Expand Down
6 changes: 1 addition & 5 deletions data_registry/process_manager/task/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,7 @@ def get_status(self):

return Task.Status.COMPLETED

def wipe(self):
if "process_id" not in self.job.context: # for example, if Collect task failed
logger.warning("%s: Unable to wipe collection (collection ID is not set)", self)
return

def do_wipe(self):
process_id = self.job.context["process_id"] # set in Collect.get_status()

logger.info("%s: Wiping data for collection %s", self, process_id)
Expand Down
28 changes: 22 additions & 6 deletions data_registry/process_manager/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
import requests
from requests.exceptions import RequestException

from data_registry import models
from data_registry.exceptions import RecoverableException
from data_registry.models import Task

logger = logging.getLogger(__name__)

Expand All @@ -21,22 +21,22 @@ def __init__(self, task):
self.task = task

@property
def job(self):
def job(self) -> models.Job:
"""
The job of which the task is a part.
"""
return self.task.job

@property
def collection(self):
def collection(self) -> models.Collection:
"""
The publication on which the task is performed.
"""
return self.task.job.collection

@property
@abstractmethod
def final_output(self):
def final_output(self) -> bool:
"""
Whether the task produces a final output, like a bulk download. If not, its intermediate outputs are wiped if
the job is complete and isn't configured to preserve temporary data.
Expand Down Expand Up @@ -67,7 +67,7 @@ def run(self) -> None:
"""

@abstractmethod
def get_status(self) -> Task.Status:
def get_status(self) -> models.Task.Status:
"""
Return the status of the task.
Expand All @@ -79,13 +79,29 @@ def get_status(self) -> Task.Status:
:raises RecoverableException:
"""

@abstractmethod
def wipe(self) -> None:
"""
Delete any side effects of (for example, data written by) the task.
This method must be idempotent. It is retried if any task failed to be wiped.
Implement :meth:`~data_registry.process_manager.util.TaskManager.do_wipe` in subclasses.
:raises RecoverableException:
"""
if not self.task.start:
logger.debug("%s has nothing to wipe (task didn't start)", self)
return

self.do_wipe()

@abstractmethod
def do_wipe(self) -> None:
"""
Delete any side effects of the task.
This method can assume that the task had started.
:raises RecoverableException:
"""

Expand Down
8 changes: 4 additions & 4 deletions tests/data_registry/process_manager/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def run(self):
def get_status(self):
return Task.Status.COMPLETED

def wipe(self):
def do_wipe(self):
pass


Expand All @@ -24,9 +24,9 @@ class ProcessTests(TransactionTestCase):
def test(self):
collection = Collection.objects.get(pk=1)

with patch("data_registry.process_manager.process.get_runner") as mock_get_runner:
# get_runner returns only TestTask
mock_get_runner.return_value = TestTask()
with patch("data_registry.process_manager.process.get_task_manager") as mock_get_task_manager:
# get_task_manager returns only TestTask
mock_get_task_manager.return_value = TestTask()

settings.JOB_TASKS_PLAN = ["test"]

Expand Down

0 comments on commit f5f7241

Please sign in to comment.