Skip to content

Commit

Permalink
Updates based on discussion in PR #68
Browse files Browse the repository at this point in the history
* Update click option docstrings
* Make email-recipients required and update type hint to indicate it is a comma-delimited string
* Refactor Workflow.run method to include try/except block and return a dict summarizing the results of the submission
* Add new unit tests for Workflow.run method to account for code changes
  • Loading branch information
ehanson8 committed Jan 13, 2025
1 parent a24639a commit 09e23ee
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 26 deletions.
26 changes: 15 additions & 11 deletions dsc/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,24 @@
help="The S3 prefix for the batch of DSpace submission files",
required=True,
)
@click.option(
"-e",
"--email-recipients",
help="The recipients of the submission results email as a comma-delimited string, "
"this will override the workflow class's default value for this attribute",
required=True,
)
@click.option(
"-s",
"--s3-bucket",
help="The S3 bucket containing the DSpace submission files",
help="The S3 bucket containing the DSpace submission files, "
"this will override the workflow class's default value for this attribute",
)
@click.option(
"-o",
"--output-queue",
help="The SQS output queue for the DSS result messages",
)
@click.option(
"-e",
"--email-recipients",
help="The recipients of the submission results email",
help="The SQS output queue for the DSS result messages, "
"this will override the workflow class's default value for this attribute",
)
@click.option(
"-v", "--verbose", is_flag=True, help="Pass to log at debug level instead of info"
Expand All @@ -55,9 +59,9 @@ def main(
workflow_name: str,
collection_handle: str,
batch_id: str,
email_recipients: str,
s3_bucket: str | None,
output_queue: str | None,
email_recipients: tuple[str] | None,
verbose: bool, # noqa: FBT001
) -> None:
ctx.ensure_object(dict)
Expand All @@ -66,9 +70,9 @@ def main(
workflow_name=workflow_name,
collection_handle=collection_handle,
batch_id=batch_id,
email_recipients=email_recipients,
s3_bucket=s3_bucket,
output_queue=output_queue,
email_recipients=email_recipients,
)
ctx.obj["workflow"] = workflow

Expand Down Expand Up @@ -121,5 +125,5 @@ def deposit(ctx: click.Context) -> None:
"""Send a batch of item submissions to the DSpace Submission Service (DSS)."""
workflow = ctx.obj["workflow"]
logger.debug(f"Beginning submission of batch ID: {workflow.batch_id}")
for response in workflow.run():
logger.debug(response)
submission_results = workflow.run()
logger.debug(f"Results of submission: {submission_results}")
57 changes: 45 additions & 12 deletions dsc/workflows/base/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from abc import ABC, abstractmethod
from collections import defaultdict
from http import HTTPStatus
from typing import TYPE_CHECKING, Any, final

from dsc.exceptions import (
Expand All @@ -18,8 +19,6 @@
from _collections_abc import dict_keys
from collections.abc import Iterator

from mypy_boto3_sqs.type_defs import SendMessageResultTypeDef

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -201,17 +200,51 @@ def _match_item_identifiers_to_bitstreams(
]

@final
def run(self) -> Iterator[SendMessageResultTypeDef]:
"""Run workflow to submit items to the DSpace Submission Service."""
def run(self) -> dict:
"""Run workflow to submit items to the DSpace Submission Service.
Returns a dict with the attempted, successful, and failed submissions as well as
the results of each item submission organized by the item identifier.
"""
submission_results: dict[str, Any] = {"success": True}
attempted_submissions = 0
successful_submissions = 0
failed_submissions = 0
for item_submission in self.item_submissions_iter():
item_submission.upload_dspace_metadata(self.s3_bucket, self.batch_path)
response = item_submission.send_submission_message(
self.workflow_name,
self.output_queue,
self.submission_system,
self.collection_handle,
)
yield response
attempted_submissions += 1
try:
item_submission.upload_dspace_metadata(self.s3_bucket, self.batch_path)
response = item_submission.send_submission_message(
self.workflow_name,
self.output_queue,
self.submission_system,
self.collection_handle,
)
status_code = response["ResponseMetadata"]["HTTPStatusCode"]
if status_code != HTTPStatus.OK:
submission_results["success"] = False
submission_results[item_submission.item_identifier] = response[
"MessageId"
]
successful_submissions += 1
except Exception as e:
submission_results["success"] = False
logger.exception(
"Error while processing submission: "
f"{item_submission.item_identifier}"
)
submission_results[item_submission.item_identifier] = e
failed_submissions += 1
continue

submission_results["attempted_submissions"] = attempted_submissions
submission_results["successful_submissions"] = successful_submissions
submission_results["failed_submissions"] = failed_submissions
logger.info(
f"Submission results, attempted: {attempted_submissions}, successful: "
f"{successful_submissions} , failed: {failed_submissions}"
)
return submission_results

@final
def item_submissions_iter(self) -> Iterator[ItemSubmission]:
Expand Down
6 changes: 6 additions & 0 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ def test_reconcile_success(caplog, runner, mocked_s3, base_workflow_instance, s3
"123.4/5678",
"--batch-id",
"batch-aaa",
"--email-recipients",
"test@test.edu",
"reconcile",
],
)
Expand Down Expand Up @@ -41,6 +43,8 @@ def test_reconcile_discrepancies_logged(
"123.4/5678",
"--batch-id",
"batch-aaa",
"--email-recipients",
"test@test.edu",
"reconcile",
],
)
Expand Down Expand Up @@ -71,6 +75,8 @@ def test_deposit_success(
"123.4/5678",
"--batch-id",
"batch-aaa",
"--email-recipients",
"test@test.edu",
"deposit",
],
)
Expand Down
59 changes: 56 additions & 3 deletions tests/test_workflow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from http import HTTPStatus
from unittest.mock import patch

import pytest

Expand Down Expand Up @@ -120,13 +120,66 @@ def test_base_workflow_run_success(
caplog, base_workflow_instance, mocked_s3, mocked_sqs_input, mocked_sqs_output
):
caplog.set_level("DEBUG")
response = next(base_workflow_instance.run())
submission_results = base_workflow_instance.run()
assert "Processing submission for '123'" in caplog.text
assert (
"Metadata uploaded to S3: s3://dsc/test/batch-aaa/123_metadata.json"
in caplog.text
)
assert response["ResponseMetadata"]["HTTPStatusCode"] == HTTPStatus.OK
assert submission_results["success"] is True
assert submission_results["attempted_submissions"] == 2 # noqa: PLR2004
assert submission_results["successful_submissions"] == 2 # noqa: PLR2004
assert submission_results["failed_submissions"] == 0
assert submission_results["123"]
assert submission_results["789"]
assert "Submission results, attempted: 2, successful: 2 , failed: 0" in caplog.text


@patch("dsc.item_submission.ItemSubmission.send_submission_message")
def test_base_workflow_run_exceptions_handled(
mocked_method,
caplog,
base_workflow_instance,
mocked_s3,
mocked_sqs_input,
mocked_sqs_output,
):
side_effect = [
{"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}},
Exception,
]
mocked_method.side_effect = side_effect
submission_results = base_workflow_instance.run()
assert submission_results["success"] is False
assert submission_results["attempted_submissions"] == 2 # noqa: PLR2004
assert submission_results["successful_submissions"] == 1
assert submission_results["failed_submissions"] == 1
assert submission_results["123"]
assert isinstance(submission_results["789"], Exception)
assert "Submission results, attempted: 2, successful: 1 , failed: 1" in caplog.text


@patch("dsc.item_submission.ItemSubmission.send_submission_message")
def test_base_workflow_run_invalid_status_codes_handled(
mocked_method,
caplog,
base_workflow_instance,
mocked_s3,
mocked_sqs_input,
mocked_sqs_output,
):
side_effect = [
{"MessageId": "abcd", "ResponseMetadata": {"HTTPStatusCode": 200}},
{"ResponseMetadata": {"HTTPStatusCode": 400}},
]
mocked_method.side_effect = side_effect
submission_results = base_workflow_instance.run()
assert submission_results["success"] is False
assert submission_results["attempted_submissions"] == 2 # noqa: PLR2004
assert submission_results["successful_submissions"] == 1
assert submission_results["123"]
assert isinstance(submission_results["789"], Exception)
assert "Submission results, attempted: 2, successful: 1 , failed: 1" in caplog.text


def test_base_workflow_item_submission_iter_success(base_workflow_instance):
Expand Down

0 comments on commit 09e23ee

Please sign in to comment.