From b01502873fcc4aa3bcf02ad527a60c46103aa7ca Mon Sep 17 00:00:00 2001 From: Eric Hanson Date: Tue, 14 Jan 2025 13:25:51 -0500 Subject: [PATCH] In 1104 deposit command (#68) * Add deposit CLI command Why these changes are being introduced: * A CLI command is needed for running the workflows's deposit functionality How this addresses that need: * Rename BaseWorkflow > Workflow across repo * Add s3-bucket, output-queue, and email-recipients CLI params * Add deposit CLI command * Add DemoWorkflow for testing purposes * Refactor Workflow class to use init defaults instead of some class attributes * Refactor Workflow.load method to optionally override class attributes with CLI params * Add _get_subclasses method for collecting all subclasses * Update _build_bitstream_dict method to ignore non-file values and the metadata.csv file * Add deposit and reconcile_discrepancies_logged CLI tests * Add metadata mapping for DemoWorkflow Side effects of this change: * None Relevant ticket(s): * https://mitlibraries.atlassian.net/browse/IN-1104 * Update Pipfile.lock * Updates based on discussion in PR #68 * Update click option docstrings * Make email-recipients required and update type hint to indicate it is a comma-delimited string * Refactor Workflow.run method to include try/except block and return a dict summarizing the results of the submission * Add new unit tests for Workflow.run method to account for code changes * Further updates based on discussion in PR #68 * Refactor Workflow __init__ methods to better utilize defaults and named args * Refactor Workflow.run method's return object to provide a summary of the submission results * Remove Workflow.load method as it is now unnecessary and update CLI to call Workflow.get_workflow method * Update type hinting and processing for email-recipients param --- Pipfile.lock | 128 +++++++++--------- dsc/cli.py | 48 ++++++- dsc/workflows/__init__.py | 8 +- dsc/workflows/base/__init__.py | 105 ++++++++------ dsc/workflows/base/simple_csv.py | 4 +- dsc/workflows/demo.py | 8 ++ tests/conftest.py | 12 +- tests/fixtures/demo_metadata_mapping.json | 62 +++++++++ tests/test_cli.py | 69 +++++++++- ...test_base_workflow.py => test_workflow.py} | 87 ++++++++++-- 10 files changed, 399 insertions(+), 132 deletions(-) create mode 100644 dsc/workflows/demo.py create mode 100644 tests/fixtures/demo_metadata_mapping.json rename tests/{test_base_workflow.py => test_workflow.py} (63%) diff --git a/Pipfile.lock b/Pipfile.lock index ee62001..8ce4b94 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -18,12 +18,12 @@ "default": { "boto3": { "hashes": [ - "sha256:786930d5f1cd13d03db59ff2abbb2b7ffc173fd66646d5d8bee07f316a5f16ca", - "sha256:f7851cb320dcb2a53fc73b4075187ec9b05d51291539601fa238623fdc0e8cd3" + "sha256:7d398f66a11e67777c189d1f58c0a75d9d60f98d0ee51b8817e828930bf19e4e", + "sha256:8e49416216a6e3a62c2a0c44fba4dd2852c85472e7b702516605b1363867d220" ], "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.97" }, "boto3-stubs": { "extras": [ @@ -31,27 +31,27 @@ "ses" ], "hashes": [ - "sha256:8d23b03ab9ca88bedc432adb08fd179bf1efd178128704cad80c138062b8f8a3", - "sha256:f2af463889d37fbab23c7cd08fb1b035f123ad67e4b3efc46f7714f9abee5e57" + "sha256:4182f9f18f279969fbcb697200f9a89a6b07a95e45f7db276ab90dcdf65a72ba", + "sha256:da33f2a540c942505d761bcc59bc16d607a9adb815198967d66b38515a4a60e8" ], "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.97" }, "botocore": { "hashes": [ - 
"sha256:caa7d5d857fed5b3d694b89c45f82b9f938f840e90a4eb7bf50aa65da2ba8f82", - "sha256:f94ae1e056a675bd67c8af98a6858d06e3927d974d6c712ed6e27bb1d11bee1d" + "sha256:88f2fab29192ffe2f2115d5bafbbd823ff4b6eb2774296e03ec8b5b0fe074f61", + "sha256:fed4f156b1a9b8ece53738f702ba5851b8c6216b4952de326547f349cc494f14" ], "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.97" }, "botocore-stubs": { "hashes": [ - "sha256:c02ae70588e20d15a8100b34c1a1ebfa5f08e856f60570db0d16b128dc4c5c24", - "sha256:e116a2b84f67b84bbaa9a00577256664907d7c6a517fa0b1a3be7903fd6d0040" + "sha256:699fd80064e5ae2c25e566e6ebdf235b523aab0f8dbbcb5ae80a7247cbfbce38", + "sha256:a417dcdcf127da644c975963bc7403b188d2ae5a8c1bb907849d4b8681e4fb90" ], "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.96" }, "certifi": { "hashes": [ @@ -80,59 +80,59 @@ }, "mypy-boto3-cloudformation": { "hashes": [ - "sha256:aba213f3411a65096a8d95633c36e0c57a775ac6ac9ccf1e6fd9bea4002073bc", - "sha256:d1a1500df811ac8ebd459640f5b31c14daac784d8a00fc4f67bc6eb391e7b5a8" + "sha256:4111913cb2c9fd9099ecd616212923312fde0c126ee41f5821759ae9df4272b9", + "sha256:57dc112ff3e2ddc1e9e621e428490b904c0da8c1532d30e9fa2a19aefde9f719" ], - "version": "==1.35.64" + "version": "==1.35.93" }, "mypy-boto3-dynamodb": { "hashes": [ - "sha256:a815d044b8f5f4ba308ea3114916565fbd932fcaf218f8d0288b2840415f9c46", - "sha256:b693b459abb1910cbb28f3a478ced8c6e6515f1bf136b45aca1a76b6146b5adb" + "sha256:187c12a968dcc459bab3bb958becbfc22ddd4eca29ba69f9f4e00661b9dad86e", + "sha256:9128bc9dfa574f1f6fe3991ec8c33b34626d26a767b961973a95f7610d8e98c1" ], - "version": "==1.35.74" + "version": "==1.35.94" }, "mypy-boto3-ec2": { "hashes": [ - "sha256:03047a2615752468608e1de466a91d455cbf3ca1a9f96b2035e6528c81fec4a3", - "sha256:fb1a47261395ac5153f4ec17ed7ddb49f9d9ed06824adbf24bd7395f12843067" + "sha256:04fb2f9d029926f72737b3d52ea505db3a566799f894682ef176411cd9f19880", + "sha256:a1caaf39d31e6809fa57c51ff86624fbd0b6024fb2c7dfb42f1d7c5c99e16fd0" ], - "version": "==1.35.82" + "version": "==1.35.93" }, "mypy-boto3-lambda": { "hashes": [ - "sha256:00499898236fe423c9292f77644102d4bd6699b3c16b8c4062eb759c022447f5", - "sha256:577a9465ac63ac564efc2755a7e72c28a9d2f496747c1faf242cb13d5017b262" + "sha256:6bcd623c827724cde0b21b30c328515811b178763b75f0701a641cc7aa3aa414", + "sha256:c11b047743c7635ea8385abffaf97788a108b71479612e9b5e7d0bb19029d7a4" ], - "version": "==1.35.68" + "version": "==1.35.93" }, "mypy-boto3-rds": { "hashes": [ - "sha256:b982945ac24e8ea0a996de05eafa50c7de2c201df4acde77bed1baaf16b9e72f", - "sha256:d577f9723801bab7078b0be011009dbf7472f6cc6118ba04d83217c1cc890a0c" + "sha256:ad1ba47483c8ae976fe12f7e1089dd043a766f8ba858ff5c44857dab54702fd3", + "sha256:d9a24105bce442cd0fd8cfa3caf8077431ee70fa5b0121170a458f581ff7b89d" ], - "version": "==1.35.89" + "version": "==1.35.95" }, "mypy-boto3-s3": { "hashes": [ - "sha256:9ac88dc0f6d87892344ed99b1e5a2884215503afff3859417b6010b31de7cef6", - "sha256:ce302a635da78e1925d8ff4809184ba55618cd7e3707156bea405cde7fdcf67a" + "sha256:4cd3f1718fa0d8a54212c495cdff493bdcc6a8ae419d95428c60fb6bc7db7980", + "sha256:b4529e57a8d5f21d4c61fe650fa6764fee2ba7ab524a455a34ba2698ef6d27a8" ], - "version": "==1.35.92" + "version": "==1.35.93" }, "mypy-boto3-ses": { "hashes": [ - "sha256:5706a6802da50419d5bf077cd0f00ad3d98018d81f8e8cab38ff9871cc65004c", - "sha256:5a3b239ef05ff9c6b7988f33b77f5f63eaf44e1615b3269b0d394cdd665f4ef1" + "sha256:7bd814836695bc9ca9f8e31d30cf14e8091e2bb35c1d91893429974ffdabcadf", + 
"sha256:e957e4bc90e458d6988a391e175bea7427c927d80b87de26ce38b4f4bdbab91b" ], - "version": "==1.35.68" + "version": "==1.35.93" }, "mypy-boto3-sqs": { "hashes": [ - "sha256:012de7b44e0137083e2f5cb49b63c05091d141c32b569b469dfeaa5ea23d1ccd", - "sha256:346a87bc0a447bb4c005b04d3efa0008bfa0ddd498cadd97e0e53a58752f84e9" + "sha256:341974f77e66851b9a4190d0014481e6baabae82d32f9ee559faa823b693609b", + "sha256:8ea7f63e0878544705c31996ae4c064095fbb4f780f8323a84f7a75281d643fe" ], - "version": "==1.35.91" + "version": "==1.35.93" }, "python-dateutil": { "hashes": [ @@ -306,20 +306,20 @@ }, "boto3": { "hashes": [ - "sha256:786930d5f1cd13d03db59ff2abbb2b7ffc173fd66646d5d8bee07f316a5f16ca", - "sha256:f7851cb320dcb2a53fc73b4075187ec9b05d51291539601fa238623fdc0e8cd3" + "sha256:7d398f66a11e67777c189d1f58c0a75d9d60f98d0ee51b8817e828930bf19e4e", + "sha256:8e49416216a6e3a62c2a0c44fba4dd2852c85472e7b702516605b1363867d220" ], "index": "pypi", "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.97" }, "botocore": { "hashes": [ - "sha256:caa7d5d857fed5b3d694b89c45f82b9f938f840e90a4eb7bf50aa65da2ba8f82", - "sha256:f94ae1e056a675bd67c8af98a6858d06e3927d974d6c712ed6e27bb1d11bee1d" + "sha256:88f2fab29192ffe2f2115d5bafbbd823ff4b6eb2774296e03ec8b5b0fe074f61", + "sha256:fed4f156b1a9b8ece53738f702ba5851b8c6216b4952de326547f349cc494f14" ], "markers": "python_version >= '3.8'", - "version": "==1.35.92" + "version": "==1.35.97" }, "certifi": { "hashes": [ @@ -963,36 +963,36 @@ }, "responses": { "hashes": [ - "sha256:521efcbc82081ab8daa588e08f7e8a64ce79b91c39f6e62199b19159bea7dbcb", - "sha256:617b9247abd9ae28313d57a75880422d55ec63c29d33d629697590a034358dba" + "sha256:b3e1ae252f69301b84146ff615a869a4182fbe17e8b606f1ac54142515dad5eb", + "sha256:e53991613f76d17ba293c1e3cce8af107c5c7a6a513edf25195aafd89a870dd3" ], "markers": "python_version >= '3.8'", - "version": "==0.25.3" + "version": "==0.25.5" }, "ruff": { "hashes": [ - "sha256:0509e8da430228236a18a677fcdb0c1f102dd26d5520f71f79b094963322ed25", - "sha256:0c000a471d519b3e6cfc9c6680025d923b4ca140ce3e4612d1a2ef58e11f11fe", - "sha256:248b1fb3f739d01d528cc50b35ee9c4812aa58cc5935998e776bf8ed5b251e75", - "sha256:45a56f61b24682f6f6709636949ae8cc82ae229d8d773b4c76c09ec83964a95a", - "sha256:496dd38a53aa173481a7d8866bcd6451bd934d06976a2505028a50583e001b76", - "sha256:52d587092ab8df308635762386f45f4638badb0866355b2b86760f6d3c076188", - "sha256:54799ca3d67ae5e0b7a7ac234baa657a9c1784b48ec954a094da7c206e0365b1", - "sha256:61323159cf21bc3897674e5adb27cd9e7700bab6b84de40d7be28c3d46dc67cf", - "sha256:7ae4478b1471fc0c44ed52a6fb787e641a2ac58b1c1f91763bafbc2faddc5117", - "sha256:7d7fc2377a04b6e04ffe588caad613d0c460eb2ecba4c0ccbbfe2bc973cbc162", - "sha256:91a7ddb221779871cf226100e677b5ea38c2d54e9e2c8ed847450ebbdf99b32d", - "sha256:9257aa841e9e8d9b727423086f0fa9a86b6b420fbf4bf9e1465d1250ce8e4d8d", - "sha256:bc3c083c50390cf69e7e1b5a5a7303898966be973664ec0c4a4acea82c1d4315", - "sha256:dcad24b81b62650b0eb8814f576fc65cfee8674772a6e24c9b747911801eeaa5", - "sha256:defed167955d42c68b407e8f2e6f56ba52520e790aba4ca707a9c88619e580e3", - "sha256:e169ea1b9eae61c99b257dc83b9ee6c76f89042752cb2d83486a7d6e48e8f764", - "sha256:e88b8f6d901477c41559ba540beeb5a671e14cd29ebd5683903572f4b40a9807", - "sha256:f1d70bef3d16fdc897ee290d7d20da3cbe4e26349f62e8a0274e7a3f4ce7a905" + "sha256:186c2313de946f2c22bdf5954b8dd083e124bcfb685732cfb0beae0c47233d9b", + "sha256:1cd76c7f9c679e6e8f2af8f778367dca82b95009bc7b1a85a47f1521ae524fa7", + 
"sha256:2f312c86fb40c5c02b44a29a750ee3b21002bd813b5233facdaf63a51d9a85e1", + "sha256:342a824b46ddbcdddd3abfbb332fa7fcaac5488bf18073e841236aadf4ad5c19", + "sha256:39d0174ccc45c439093971cc06ed3ac4dc545f5e8bdacf9f067adf879544d969", + "sha256:3cae39ba5d137054b0e5b472aee3b78a7c884e61591b100aeb544bcd1fc38d4f", + "sha256:3f94942a3bb767675d9a051867c036655fe9f6c8a491539156a6f7e6b5f31831", + "sha256:46ebf5cc106cf7e7378ca3c28ce4293b61b449cd121b98699be727d40b79ba72", + "sha256:50c647ff96f4ba288db0ad87048257753733763b409b2faf2ea78b45c8bb7fcb", + "sha256:5dc40a378a0e21b4cfe2b8a0f1812a6572fc7b230ef12cd9fac9161aa91d807f", + "sha256:69572926c0f0c9912288915214ca9b2809525ea263603370b9e00bed2ba56dbd", + "sha256:728d791b769cc28c05f12c280f99e8896932e9833fef1dd8756a6af2261fd1ab", + "sha256:84330dda7abcc270e6055551aca93fdde1b0685fc4fd358f26410f9349cf1743", + "sha256:937267afce0c9170d6d29f01fcd1f4378172dec6760a9f4dface48cdabf9610a", + "sha256:ae017c3a29bee341ba584f3823f805abbe5fe9cd97f87ed07ecbf533c4c88366", + "sha256:beb3298604540c884d8b282fe7625651378e1986c25df51dec5b2f60cafc31ce", + "sha256:f0c8b149e9c7353cace7d698e1656ffcf1e36e50f8ea3b5d5f7f87ff9986a7ca", + "sha256:fd2b25ecaf907d6458fa842675382c8597b3c746a2dde6717fe3415425df0c17" ], "index": "pypi", "markers": "python_version >= '3.7'", - "version": "==0.8.6" + "version": "==0.9.1" }, "s3transfer": { "hashes": [ diff --git a/dsc/cli.py b/dsc/cli.py index 1126e7b..650de71 100644 --- a/dsc/cli.py +++ b/dsc/cli.py @@ -6,7 +6,7 @@ import click from dsc.config import Config -from dsc.workflows.base import BaseWorkflow +from dsc.workflows.base import Workflow logger = logging.getLogger(__name__) CONFIG = Config() @@ -29,9 +29,28 @@ @click.option( "-b", "--batch-id", - help="The S3 prefix for the batch of DSpace submissions", + help="The S3 prefix for the batch of DSpace submission files", required=True, ) +@click.option( + "-e", + "--email-recipients", + help="The recipients of the submission results email as a comma-delimited string, " + "this will override the workflow class's default value for this attribute", + required=True, +) +@click.option( + "-s", + "--s3-bucket", + help="The S3 bucket containing the DSpace submission files, " + "this will override the workflow class's default value for this attribute", +) +@click.option( + "-o", + "--output-queue", + help="The SQS output queue for the DSS result messages, " + "this will override the workflow class's default value for this attribute", +) @click.option( "-v", "--verbose", is_flag=True, help="Pass to log at debug level instead of info" ) @@ -40,12 +59,22 @@ def main( workflow_name: str, collection_handle: str, batch_id: str, + email_recipients: str, + s3_bucket: str | None, + output_queue: str | None, verbose: bool, # noqa: FBT001 ) -> None: ctx.ensure_object(dict) ctx.obj["start_time"] = perf_counter() - - ctx.obj["workflow"] = BaseWorkflow.load(workflow_name, collection_handle, batch_id) + workflow_class = Workflow.get_workflow(workflow_name) + workflow = workflow_class( + collection_handle=collection_handle, + batch_id=batch_id, + email_recipients=tuple(email_recipients.split(",")), + s3_bucket=s3_bucket, + output_queue=output_queue, + ) + ctx.obj["workflow"] = workflow stream = StringIO() root_logger = logging.getLogger() @@ -86,3 +115,14 @@ def reconcile(ctx: click.Context) -> None: logger.error( f"No item identifiers found for these bitstreams: {no_item_identifiers}" ) + else: + logger.info("All item identifiers and bitstreams successfully matched") + + +@main.command() +@click.pass_context +def 
deposit(ctx: click.Context) -> None: + """Send a batch of item submissions to the DSpace Submission Service (DSS).""" + workflow = ctx.obj["workflow"] + logger.debug(f"Beginning submission of batch ID: {workflow.batch_id}") + workflow.run() diff --git a/dsc/workflows/__init__.py b/dsc/workflows/__init__.py index 9b5c9ff..a085a0c 100644 --- a/dsc/workflows/__init__.py +++ b/dsc/workflows/__init__.py @@ -3,8 +3,8 @@ All primary functions used by CLI are importable from here. """ -from dsc.workflows.base import BaseWorkflow +from dsc.workflows.base import Workflow +from dsc.workflows.base.simple_csv import SimpleCSV +from dsc.workflows.demo import DemoWorkflow -__all__ = [ - "BaseWorkflow", -] +__all__ = ["DemoWorkflow", "SimpleCSV", "Workflow"] diff --git a/dsc/workflows/base/__init__.py b/dsc/workflows/base/__init__.py index 566db5a..03d27d3 100644 --- a/dsc/workflows/base/__init__.py +++ b/dsc/workflows/base/__init__.py @@ -4,6 +4,7 @@ import logging from abc import ABC, abstractmethod from collections import defaultdict +from http import HTTPStatus from typing import TYPE_CHECKING, Any, final from dsc.exceptions import ( @@ -18,25 +19,25 @@ from _collections_abc import dict_keys from collections.abc import Iterator - from mypy_boto3_sqs.type_defs import SendMessageResultTypeDef - logger = logging.getLogger(__name__) -class BaseWorkflow(ABC): +class Workflow(ABC): """A base workflow class from which other workflow classes are derived.""" workflow_name: str = "base" submission_system: str = "DSpace@MIT" - email_recipients: tuple[str] = ("None",) metadata_mapping_path: str = "" - s3_bucket: str = "" - output_queue: str = "" + s3_bucket: str = "dsc" + output_queue: str = "dsc-unhandled" def __init__( self, collection_handle: str, batch_id: str, + email_recipients: tuple[str, ...], + s3_bucket: str | None = None, + output_queue: str | None = None, ) -> None: """Initialize base instance. @@ -47,9 +48,17 @@ def __init__( to the name of a subfolder in the workflow directory of the S3 bucket. This subfolder is where the S3 client will search for bitstream and metadata files. + email_recipients: The recipients of the submission results email. + s3_bucket: The S3 bucket containing the DSpace submission files. + output_queue: The SQS output queue for the DSS result messages. """ - self.batch_id = batch_id self.collection_handle = collection_handle + self.batch_id = batch_id + self.email_recipients = email_recipients + if s3_bucket: + self.s3_bucket = s3_bucket + if output_queue: + self.output_queue = output_queue @property def batch_path(self) -> str: @@ -62,38 +71,24 @@ def metadata_mapping(self) -> dict: @final @classmethod - def load( - cls, workflow_name: str, collection_handle: str, batch_id: str - ) -> BaseWorkflow: - """Return configured workflow class instance. - - Args: - workflow_name: The label of the workflow. Must match a key from - config.WORKFLOWS. - collection_handle: The handle of the DSpace collection to which the batch will - be submitted. - batch_id: The S3 prefix for the batch of DSpace submissions. - """ - workflow_class = cls.get_workflow(workflow_name) - return workflow_class( - collection_handle=collection_handle, - batch_id=batch_id, - ) - - @final - @classmethod - def get_workflow(cls, workflow_name: str) -> type[BaseWorkflow]: + def get_workflow(cls, workflow_name: str) -> type[Workflow]: """Return workflow class. Args: workflow_name: The label of the workflow. Must match a workflow_name attribute - from BaseWorkflow subclass. + from Workflow subclass. 
""" - for workflow_class in BaseWorkflow.__subclasses__(): + for workflow_class in cls._get_subclasses(): if workflow_name == workflow_class.workflow_name: return workflow_class raise InvalidWorkflowNameError(f"Invalid workflow name: {workflow_name} ") + @classmethod + def _get_subclasses(cls) -> Iterator[type[Workflow]]: + for subclass in cls.__subclasses__(): + yield from subclass._get_subclasses() # noqa: SLF001 + yield subclass + def reconcile_bitstreams_and_metadata(self) -> tuple[set[str], set[str]]: """Reconcile bitstreams against metadata. @@ -145,8 +140,11 @@ def _build_bitstream_dict(self) -> dict: bitstream_dict: dict[str, list[str]] = defaultdict(list) for bitstream in bitstreams: file_name = bitstream.split("/")[-1] - item_identifier = file_name.split("_")[0] if "_" in file_name else file_name - bitstream_dict[item_identifier].append(bitstream) + if file_name and file_name != "metadata.csv": + item_identifier = ( + file_name.split("_")[0] if "_" in file_name else file_name + ) + bitstream_dict[item_identifier].append(bitstream) return bitstream_dict def _match_bitstreams_to_item_identifiers( @@ -184,17 +182,40 @@ def _match_item_identifiers_to_bitstreams( ] @final - def run(self) -> Iterator[SendMessageResultTypeDef]: - """Run workflow to submit items to the DSpace Submission Service.""" + def run(self) -> dict: + """Run workflow to submit items to the DSpace Submission Service. + + Returns a dict with the submission results organized into succeeded and failed + items. + """ + items: dict[str, Any] = {"succeeded": {}, "failed": {}} for item_submission in self.item_submissions_iter(): - item_submission.upload_dspace_metadata(self.s3_bucket, self.batch_path) - response = item_submission.send_submission_message( - self.workflow_name, - self.output_queue, - self.submission_system, - self.collection_handle, - ) - yield response + item_id = item_submission.item_identifier + try: + item_submission.upload_dspace_metadata(self.s3_bucket, self.batch_path) + response = item_submission.send_submission_message( + self.workflow_name, + self.output_queue, + self.submission_system, + self.collection_handle, + ) + except Exception as exception: + logger.exception(f"Error processing submission: {item_id}") + items["failed"][item_id] = exception + continue + status_code = response["ResponseMetadata"]["HTTPStatusCode"] + if status_code != HTTPStatus.OK: + items["failed"][item_id] = RuntimeError("Non OK HTTPStatus") + continue + items["succeeded"][item_id] = response["MessageId"] + + results = { + "success": not items["failed"], + "items_count": len(items["succeeded"]) + len(items["failed"]), + "items": items, + } + logger.info(results) + return results @final def item_submissions_iter(self) -> Iterator[ItemSubmission]: diff --git a/dsc/workflows/base/simple_csv.py b/dsc/workflows/base/simple_csv.py index 4f9f0d8..4576920 100644 --- a/dsc/workflows/base/simple_csv.py +++ b/dsc/workflows/base/simple_csv.py @@ -6,12 +6,12 @@ import smart_open from dsc.utilities.aws import S3Client -from dsc.workflows.base import BaseWorkflow +from dsc.workflows.base import Workflow logger = logging.getLogger(__name__) -class SimpleCSV(BaseWorkflow): +class SimpleCSV(Workflow): """Base workflow for deposits that rely on a metadata CSV file. 
The metadata CSV file must be stored in a designated path for the diff --git a/dsc/workflows/demo.py b/dsc/workflows/demo.py new file mode 100644 index 0000000..0ca82cb --- /dev/null +++ b/dsc/workflows/demo.py @@ -0,0 +1,8 @@ +from dsc.workflows.base.simple_csv import SimpleCSV + + +class DemoWorkflow(SimpleCSV): + + workflow_name: str = "demo" + submission_system: str = "DSpace@MIT" + metadata_mapping_path: str = "tests/fixtures/demo_metadata_mapping.json" diff --git a/tests/conftest.py b/tests/conftest.py index c4e3257..a4a58e4 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -12,18 +12,16 @@ from dsc.utilities.aws.s3 import S3Client from dsc.utilities.aws.ses import SESClient from dsc.utilities.aws.sqs import SQSClient -from dsc.workflows.base import BaseWorkflow +from dsc.workflows.base import Workflow from dsc.workflows.base.simple_csv import SimpleCSV -class TestBaseWorkflow(BaseWorkflow): +class TestWorkflow(Workflow): workflow_name: str = "test" submission_system: str = "Test@MIT" email_recipients: tuple[str] = ("test@test.test",) metadata_mapping_path: str = "tests/fixtures/test_metadata_mapping.json" - s3_bucket: str = "dsc" - output_queue: str = "mock-output_queue" def item_metadata_iter(self): yield from [ @@ -56,7 +54,7 @@ def process_deposit_results(self): class TestSimpleCSV(SimpleCSV): - workflow_name: str = "simple_csv" + workflow_name = "simple_csv" submission_system: str = "Test@MIT" email_recipients: tuple[str] = ("test@test.test",) metadata_mapping_path: str = "tests/fixtures/test_metadata_mapping.json" @@ -77,9 +75,10 @@ def _test_env(monkeypatch): @pytest.fixture def base_workflow_instance(item_metadata, metadata_mapping, mocked_s3): - return TestBaseWorkflow( + return TestWorkflow( collection_handle="123.4/5678", batch_id="batch-aaa", + email_recipients=["test@test.test"], ) @@ -88,6 +87,7 @@ def simple_csv_workflow_instance(metadata_mapping): return TestSimpleCSV( collection_handle="123.4/5678", batch_id="batch-aaa", + email_recipients=["test@test.test"], ) diff --git a/tests/fixtures/demo_metadata_mapping.json b/tests/fixtures/demo_metadata_mapping.json new file mode 100644 index 0000000..6b2f173 --- /dev/null +++ b/tests/fixtures/demo_metadata_mapping.json @@ -0,0 +1,62 @@ +{ + "item_identifier": { + "source_field_name": "item_identifier", + "language": null, + "delimiter": "" + }, + "dc.publisher": { + "source_field_name": "dc.publisher", + "language": "en_US", + "delimiter": "" + }, + "dc.eprint.version": { + "source_field_name": "dc.eprint.version", + "language": "en_US", + "delimiter": "" + }, + "dc.type": { + "source_field_name": "dc.type", + "language": "en_US", + "delimiter": "" + }, + "dc.source": { + "source_field_name": "dc.source", + "language": "en_US", + "delimiter": "" + }, + "dc.contributor.author": { + "source_field_name": "dc.contributor.author", + "language": "en_US", + "delimiter": "|" + }, + "dc.relation.isversionof": { + "source_field_name": "dc.relation.isversionof", + "language": "", + "delimiter": "" + }, + "dc.title": { + "source_field_name": "dc.title", + "language": "en_US", + "delimiter": "" + }, + "dc.relation.journal": { + "source_field_name": "dc.relation.journal", + "language": "", + "delimiter": "" + }, + "dc.identifier.issn": { + "source_field_name": "dc.identifier.issn", + "language": "", + "delimiter": "" + }, + "dc.date.issued": { + "source_field_name": "dc.date.issued", + "language": "", + "delimiter": "" + }, + "dc.rights.uri": { + "source_field_name": "dc.rights.uri", + "language": "", + "delimiter": "" + } +} \ 
No newline at end of file diff --git a/tests/test_cli.py b/tests/test_cli.py index 42a577b..5aa9420 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -2,6 +2,35 @@ def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3_client): + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/789_01.pdf") + result = runner.invoke( + main, + [ + "--workflow-name", + "test", + "--collection-handle", + "123.4/5678", + "--batch-id", + "batch-aaa", + "--email-recipients", + "test@test.edu", + "reconcile", + ], + ) + assert result.exit_code == 0 + assert ( + "Item identifiers from batch metadata with matching bitstreams: ['123', '789']" + in caplog.text + ) + assert "All item identifiers and bitstreams successfully matched" in caplog.text + assert "Total time elapsed" in caplog.text + + +def test_reconcile_discrepancies_logged( + caplog, runner, mocked_s3, base_workflow_instance, s3_client +): s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") @@ -14,10 +43,11 @@ def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3 "123.4/5678", "--batch-id", "batch-aaa", + "--email-recipients", + "test@test.edu", "reconcile", ], ) - assert result.output == "" assert result.exit_code == 0 assert ( "Item identifiers from batch metadata with matching bitstreams: ['123']" @@ -26,3 +56,40 @@ def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3 assert "No bitstreams found for these item identifiers: {'789'}" in caplog.text assert "No item identifiers found for these bitstreams: {'456'}" in caplog.text assert "Total time elapsed" in caplog.text + + +def test_deposit_success( + caplog, runner, mocked_s3, mocked_sqs_input, base_workflow_instance, s3_client +): + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_01.pdf") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/123_02.jpg") + s3_client.put_file(file_content="", bucket="dsc", key="test/batch-aaa/456_01.pdf") + caplog.set_level("DEBUG") + result = runner.invoke( + main, + [ + "--verbose", + "--workflow-name", + "test", + "--collection-handle", + "123.4/5678", + "--batch-id", + "batch-aaa", + "--email-recipients", + "test@test.edu", + "deposit", + ], + ) + assert result.exit_code == 0 + assert "Beginning submission of batch ID: batch-aaa" in caplog.text + assert "Processing submission for '123'" in caplog.text + assert ( + "Metadata uploaded to S3: s3://dsc/test/batch-aaa/123_metadata.json" + in caplog.text + ) + assert "Processing submission for '789'" in caplog.text + assert ( + "Metadata uploaded to S3: s3://dsc/test/batch-aaa/789_metadata.json" + in caplog.text + ) + assert "Total time elapsed" in caplog.text diff --git a/tests/test_base_workflow.py b/tests/test_workflow.py similarity index 63% rename from tests/test_base_workflow.py rename to tests/test_workflow.py index 5ba28a1..a3d57b5 100644 --- a/tests/test_base_workflow.py +++ b/tests/test_workflow.py @@ -1,4 +1,4 @@ -from http import HTTPStatus +from unittest.mock import patch import pytest @@ -8,29 +8,53 @@ ItemMetadatMissingRequiredFieldError, ) from dsc.item_submission import ItemSubmission +from 
dsc.workflows.base import Workflow -def test_base_workflow_load_success(base_workflow_instance): - workflow_instance = base_workflow_instance.load( - workflow_name="test", +def test_base_workflow_init_with_defaults_success(): + workflow_class = Workflow.get_workflow(workflow_name="test") + workflow_instance = workflow_class( collection_handle="123.4/5678", batch_id="batch-aaa", + email_recipients=("test@test.test",), ) assert workflow_instance.workflow_name == "test" assert workflow_instance.submission_system == "Test@MIT" - assert workflow_instance.email_recipients == ("test@test.test",) assert ( workflow_instance.metadata_mapping_path == "tests/fixtures/test_metadata_mapping.json" ) assert workflow_instance.s3_bucket == "dsc" + assert workflow_instance.output_queue == "dsc-unhandled" + assert workflow_instance.collection_handle == "123.4/5678" + assert workflow_instance.batch_id == "batch-aaa" + assert workflow_instance.email_recipients == ("test@test.test",) + + +def test_base_workflow_init_with_optional_params_success(): + workflow_class = Workflow.get_workflow(workflow_name="test") + workflow_instance = workflow_class( + collection_handle="123.4/5678", + batch_id="batch-aaa", + email_recipients=("test@test.test",), + s3_bucket="updated-bucket", + output_queue="mock-output_queue", + ) + assert workflow_instance.workflow_name == "test" + assert workflow_instance.submission_system == "Test@MIT" + assert ( + workflow_instance.metadata_mapping_path + == "tests/fixtures/test_metadata_mapping.json" + ) + assert workflow_instance.s3_bucket == "updated-bucket" assert workflow_instance.output_queue == "mock-output_queue" assert workflow_instance.collection_handle == "123.4/5678" assert workflow_instance.batch_id == "batch-aaa" + assert workflow_instance.email_recipients == ("test@test.test",) -def test_base_workflow_get_workflow_success(base_workflow_instance): - workflow_class = base_workflow_instance.get_workflow("test") +def test_base_workflow_get_workflow_success(): + workflow_class = Workflow.get_workflow("test") assert workflow_class.workflow_name == "test" @@ -97,13 +121,58 @@ def test_base_workflow_run_success( caplog, base_workflow_instance, mocked_s3, mocked_sqs_input, mocked_sqs_output ): caplog.set_level("DEBUG") - response = next(base_workflow_instance.run()) + submission_results = base_workflow_instance.run() assert "Processing submission for '123'" in caplog.text assert ( "Metadata uploaded to S3: s3://dsc/test/batch-aaa/123_metadata.json" in caplog.text ) - assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK + assert submission_results["success"] is True + assert submission_results["items_count"] == 2 # noqa: PLR2004 + assert len(submission_results["items"]["succeeded"]) == 2 # noqa: PLR2004 + assert not submission_results["items"]["failed"] + + +@patch("dsc.item_submission.ItemSubmission.send_submission_message") +def test_base_workflow_run_exceptions_handled( + mocked_method, + caplog, + base_workflow_instance, + mocked_s3, + mocked_sqs_input, + mocked_sqs_output, +): + side_effect = [ + {"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}}, + Exception, + ] + mocked_method.side_effect = side_effect + submission_results = base_workflow_instance.run() + assert submission_results["success"] is False + assert submission_results["items_count"] == 2 # noqa: PLR2004 + assert submission_results["items"]["succeeded"] == {"123": "abcd"} + assert isinstance(submission_results["items"]["failed"]["789"], Exception) + + 
+@patch("dsc.item_submission.ItemSubmission.send_submission_message") +def test_base_workflow_run_invalid_status_codes_handled( + mocked_method, + caplog, + base_workflow_instance, + mocked_s3, + mocked_sqs_input, + mocked_sqs_output, +): + side_effect = [ + {"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}}, + {"ResponseMetadata": {"HTTPStatusCode": 400}}, + ] + mocked_method.side_effect = side_effect + submission_results = base_workflow_instance.run() + assert submission_results["success"] is False + assert submission_results["items_count"] == 2 # noqa: PLR2004 + assert submission_results["items"]["succeeded"] == {"123": "abcd"} + assert isinstance(submission_results["items"]["failed"]["789"], RuntimeError) def test_base_workflow_item_submission_iter_success(base_workflow_instance):