class MessageRetrievalProtocol(Protocol):
    """
    Contract for a service which retrieves messages and acknowledges them.

    Agate depends on this interface rather than on Varys directly, which keeps
    the explicit dependence on Varys to a minimum.
    VarysMessageRetrieval is one implementation which conforms to it.
    """

    @abstractmethod
    def receive_batch(self, exchange: str) -> List[Any]:
        """
        Retrieve a batch of messages from the provider for the named exchange
        """
        raise NotImplementedError()

    @abstractmethod
    def acknowledge_message(self, message: Any) -> None:
        """
        Confirm that a message has been received, so that the provider does not send it again
        """
        raise NotImplementedError()

    @abstractmethod
    def nack_message(self, message: Any) -> None:
        """
        Negatively acknowledge a message, meaning it is restored to the queue and will reoccur
        """
        raise NotImplementedError()
+ It monitors 3 queue families, inbound-matched, inbound-to_validate-{project} and inbound-results-{project}-{site} It learns about new projects and sites by monitoring the inbound-matched queue. It delegates the actual interpretation of the message to the IngestionUpdater class """ + def __init__(self, message_retrieval: MessageRetrievalProtocol): + self._message_retrieval: MessageRetrievalProtocol = message_retrieval - def update(self, varys_client: Varys) -> None: + def update(self) -> None: """ This method reads the queues for new messages and interprets the information """ - # self._receive(varys_client, exchange="inbound-s3", update_lists=False, exchange_key="inbound-s3") + # self._receive(exchange="inbound-s3", update_lists=False, exchange_key="inbound-s3") - self._receive(varys_client, exchange="inbound-matched", update_lists=True, exchange_key="inbound-matched") + self._receive(exchange="inbound-matched", update_lists=True, exchange_key="inbound-matched") for project in Project.objects.all(): self._receive( - varys_client, exchange=f"inbound-to_validate-{project.key}", update_lists=False, exchange_key="inbound-to-validate") for project_site in ProjectSite.objects.all(): self._receive( - varys_client, exchange=f"inbound-results-{project_site.key}", update_lists=False, exchange_key="inbound-results") - def _receive(self, varys_client: Varys, exchange: str, update_lists: bool, exchange_key: str): - messages = varys_client.receive_batch(exchange=exchange, queue_suffix="agate", timeout=1) + def _receive(self, exchange: str, update_lists: bool, exchange_key: str): + messages = self._message_retrieval.receive_batch(exchange=exchange) for m in messages: try: @@ -50,7 +53,7 @@ def _receive(self, varys_client: Varys, exchange: str, update_lists: bool, excha if update_lists: self._update_lists(m) finally: - varys_client.acknowledge_message(m) + self._message_retrieval.acknowledge_message(m) def _update_item_from_message(self, message, stage: str): """ diff --git 
a/agate/agate/scheduled_tasks.py b/agate/agate/scheduled_tasks.py index 58ee0b1..079ab96 100644 --- a/agate/agate/scheduled_tasks.py +++ b/agate/agate/scheduled_tasks.py @@ -2,32 +2,22 @@ import logging import os from .queue_reading.queue_reader import QueueReader -from varys import Varys from datetime import timedelta from django.utils import timezone from .caching import TokenCache from .models import IngestionAttempt +from core.settings import MESSAGE_RETRIEVAL _scheduler = BackgroundScheduler() -_queue_reader = QueueReader() - -open('varys_test.log', 'w').close() - -_varys_client = Varys( - config_path="varys_config.cfg", - profile="test", - logfile="varys_test.log", - log_level="DEBUG", - auto_acknowledge=False, -) +_queue_reader = QueueReader(MESSAGE_RETRIEVAL) def queue_retrieve_task(): """ Task to read the queues and update ingestion attempts accordingly """ - _queue_reader.update(_varys_client) + _queue_reader.update() def clear_old_tokens_task(): diff --git a/agate/agate/tests/test_queue_reading.py b/agate/agate/tests/test_queue_reading.py new file mode 100644 index 0000000..45e159b --- /dev/null +++ b/agate/agate/tests/test_queue_reading.py @@ -0,0 +1,154 @@ +from django.test import TestCase +from unittest.mock import Mock +from agate.queue_reading.queue_reader import QueueReader +from agate.queue_reading.tracking_models import Project, ProjectSite +from agate.models import IngestionAttempt +from agate.message_retrieval_protocol import MessageRetrievalProtocol + +inbound_matched_example = """{ + "uuid":"4d35e1bf-44f9-4901-a8c6-b4751746a722", + "site":"bham", + "raw_site":"synthscape.ukhsa", + "uploaders":[ + "bryn-synthscape-ukhsa" + ], + "match_timestamp":1727098664533980741, + "artifact":"synthscape|0|92274db4-3535-47db-b6f6-151354597142", + "run_index":"0", + "run_id":"92274db4-3535-47db-b6f6-151354597142", + "project":"synthscape", + "platform":"illumina", + "files":{ + ".1.fastq.gz":{ + 
"uri":"s3://synthscape-synthscape.ukhsa-illumina-test/synthscape.0.92274db4-3535-47db-b6f6-151354597142.1.fastq.gz", + "etag":"f667f435136e91b986254d735cee1c13", + "key":"synthscape.0.92274db4-3535-47db-b6f6-151354597142.1.fastq.gz", + "submitter":"bryn-synthscape-ukhsa", + "parsed_fname":{ + "project":"synthscape", + "run_index":"0", + "run_id":"92274db4-3535-47db-b6f6-151354597142", + "direction":"1", + "ftype":"fastq", + "gzip":"gz" + } + }, + ".2.fastq.gz":{ + "uri":"s3://synthscape-synthscape.ukhsa-illumina-test/synthscape.0.92274db4-3535-47db-b6f6-151354597142.2.fastq.gz", + "etag":"de2fed484dcad68d50ec6673df6b3670", + "key":"synthscape.0.92274db4-3535-47db-b6f6-151354597142.2.fastq.gz", + "submitter":"bryn-synthscape-ukhsa", + "parsed_fname":{ + "project":"synthscape", + "run_index":"0", + "run_id":"92274db4-3535-47db-b6f6-151354597142", + "direction":"2", + "ftype":"fastq", + "gzip":"gz" + } + }, + ".csv":{ + "uri":"s3://synthscape-synthscape.ukhsa-illumina-test/synthscape.0.92274db4-3535-47db-b6f6-151354597142.csv", + "etag":"714c18df9c62b30f70443219467fcba1", + "key":"synthscape.0.92274db4-3535-47db-b6f6-151354597142.csv", + "submitter":"bryn-synthscape-ukhsa", + "parsed_fname":{ + "project":"synthscape", + "run_index":"0", + "run_id":"92274db4-3535-47db-b6f6-151354597142", + "ftype":"csv" + } + } + }, + "test_flag":true +}""" + + +class QueueReadingTestCase(TestCase): + + def test_calling_inbound_matched(self): + + self.assertEqual(IngestionAttempt.objects.count(), 0) + + retrieval: MessageRetrievalProtocol = Mock() + retrieval.receive_batch = Mock(return_value=[]) + + q = QueueReader(retrieval) + q.update() + + retrieval.receive_batch.assert_any_call(exchange="inbound-matched") + + def test_inbound_matched_project_and_site_updates(self): + + self.assertEqual(IngestionAttempt.objects.count(), 0) + self.assertEqual(Project.objects.count(), 0) + self.assertEqual(ProjectSite.objects.count(), 0) + + message = Mock() + message.body = inbound_matched_example + 
+ retrieval: MessageRetrievalProtocol = Mock() + + def side_effect_func(exchange): + if exchange == "inbound-matched": + return [message] + else: + return [] + retrieval.receive_batch = Mock(side_effect=side_effect_func) + retrieval.acknowledge_message = Mock() + + q = QueueReader(retrieval) + q.update() + + retrieval.receive_batch.assert_any_call(exchange="inbound-matched") + retrieval.acknowledge_message.assert_any_call(message) + + self.assertEqual(IngestionAttempt.objects.count(), 1) + self.assertEqual(Project.objects.count(), 1) + self.assertEqual(ProjectSite.objects.count(), 1) + Project.objects.get(key="synthscape") + ProjectSite.objects.get(key="synthscape-bham") + + retrieval.receive_batch.assert_any_call( + exchange="inbound-to_validate-synthscape") + retrieval.receive_batch.assert_any_call( + exchange="inbound-results-synthscape-bham") + + def test_inbound_matched_injestion_attempt(self): + + self.assertEqual(IngestionAttempt.objects.count(), 0) + + message = Mock() + message.body = inbound_matched_example + + retrieval: MessageRetrievalProtocol = Mock() + + def side_effect_func(exchange): + if exchange == "inbound-matched": + return [message] + else: + return [] + retrieval.receive_batch = Mock(side_effect=side_effect_func) + retrieval.acknowledge_message = Mock() + + q = QueueReader(retrieval) + q.update() + + retrieval.receive_batch.assert_any_call(exchange="inbound-matched") + retrieval.acknowledge_message.assert_any_call(message) + + self.assertEqual(IngestionAttempt.objects.count(), 1) + attempt = IngestionAttempt.objects.get(uuid="4d35e1bf-44f9-4901-a8c6-b4751746a722") + self.assertEqual(attempt.uuid, "4d35e1bf-44f9-4901-a8c6-b4751746a722") + self.assertEqual(attempt.name, "ToBeDecided") + self.assertEqual(attempt.project, "synthscape") + self.assertEqual(attempt.platform, "illumina") + self.assertEqual(attempt.site, "bham") + self.assertEqual(attempt.run_index, "0") + self.assertEqual(attempt.run_id, "92274db4-3535-47db-b6f6-151354597142") + 
class EmptyMessageRetrieval(MessageRetrievalProtocol):
    """
    A stand-in message retriever: it never has any messages to deliver,
    and acknowledgements of either kind are discarded.
    """

    def receive_batch(self, exchange: str) -> List[Any]:
        """
        Return an empty batch, whichever exchange is requested
        """
        no_messages: List[Any] = []
        return no_messages

    def acknowledge_message(self, message: Any) -> None:
        """
        Do nothing; there is no provider to acknowledge to
        """
        return None

    def nack_message(self, message: Any) -> None:
        """
        Do nothing; there is no queue to restore the message to
        """
        return None
class VarysMessageRetrieval(MessageRetrievalProtocol):
    """
    A message retriever based on the Varys-RabbitMQ workflow.

    It wraps a Varys client and remembers the queue suffix and timeout,
    so that callers only need to name the exchange they want to read.
    """

    def __init__(self,
                 queue_suffix: str,
                 timeout: float,
                 config_path: str,
                 profile: str,
                 logfile: str = "varys.log",
                 log_level: str = "DEBUG",
                 auto_acknowledge: bool = False):
        """
        Create the underlying Varys client.

        queue_suffix and timeout are stored and passed to Varys on every
        receive_batch call. config_path, profile, logfile, log_level and
        auto_acknowledge are forwarded unchanged to the Varys constructor.

        Note that the file named by logfile is emptied (or created) here,
        before the Varys client is constructed.
        """
        # Truncate (or create) the log file; the context manager guarantees
        # the handle is closed even if something goes wrong.
        with open(logfile, 'w'):
            pass

        self._queue_suffix: str = queue_suffix
        self._timeout: float = timeout

        self._varys = Varys(
            config_path=config_path,
            profile=profile,
            logfile=logfile,
            log_level=log_level,
            auto_acknowledge=auto_acknowledge,
        )

    def receive_batch(self, exchange: str) -> List[Any]:
        """
        Retrieve a batch of messages from the given exchange, using the
        queue suffix and timeout supplied at construction
        """
        # Keyword arguments, so the call does not depend on the positional
        # order of the parameters in the Varys API.
        return self._varys.receive_batch(
            exchange=exchange,
            queue_suffix=self._queue_suffix,
            timeout=self._timeout,
        )

    def acknowledge_message(self, message: Any) -> None:
        """
        Acknowledge the message through the Varys client
        """
        return self._varys.acknowledge_message(message)

    def nack_message(self, message: Any) -> None:
        """
        Negatively acknowledge the message through the Varys client
        """
        return self._varys.nack_message(message)