From 418335521f8d52332372696b76888cc67b423f14 Mon Sep 17 00:00:00 2001 From: Elliot Metsger Date: Fri, 22 Jun 2018 19:22:07 -0400 Subject: [PATCH] Support the refresh of a Deposit DepositStatus (#124) * Abstract deposit status reference to DepositStatusReferenceProcessor, and add it to the Packager. * Reduce warnings during compilation * Make the time sleeping between SWORD deposits configurable. * Implement the refreshing of a Deposit status. - Implements SubmittedUpdateRunner, invoked by the `refresh` command line argument - Refactors DepositTask and the DepositTaskHelper to allow processing of the deposit status reference outside of SubmissionProcessor (e.g. by SubmittedUpdateRunner) - Necessitates the creation of a RepositoryCopy in the IN_PROGRESS state as soon as a Deposit is performed, in order to store the DSpace Item URL. - This is required because the DSpace SWORD receipt is not retrievable; the SWORD server does not make the receipt available except when performing the initial deposit. --- README.md | 35 ++- deposit-messaging/pom.xml | 26 ++ .../pass/deposit/messaging/DepositApp.java | 2 +- .../messaging/config/DepositConfig.java | 7 +- .../deposit/messaging/model/Packager.java | 18 ++ .../runner/SubmittedUpdateRunner.java | 83 ++++-- .../service/DepositStatusRefProcessor.java | 29 ++ .../messaging/service/DepositTask.java | 172 ++++------- .../messaging/service/DepositTaskHelper.java | 281 ++++++++++++++++-- .../messaging/service/DepositUtil.java | 1 + .../AbderaDepositStatusRefProcessor.java | 75 +++++ .../SwordDspaceDepositStatusMapper.java | 2 +- .../messaging/support/CriticalPath.java | 1 + .../src/main/resources/application.properties | 4 +- .../messaging/policy/PolicyTestUtil.java | 1 + .../AbstractSubmissionProcessorTest.java | 7 +- .../messaging/service/DepositUtilTest.java | 1 + .../service/JmsSubmissionProcessorTest.java | 2 +- .../service/SubmissionProcessorTest.java | 16 +- .../service/SubmittedStatusHandlingIT.java | 160 ++++++++++ .../AbderaDepositStatusRefProcessorTest.java | 93 ++++++ 21 files changed, 835 insertions(+), 181 deletions(-) create mode 100644 deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositStatusRefProcessor.java create mode 100644 deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessor.java create mode 100644 deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmittedStatusHandlingIT.java create mode 100644 deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessorTest.java diff --git a/README.md b/README.md index 3e48fd51..84a7339a 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,18 @@ To retry all failed deposits: To retry specific deposits: > $ java -jar deposit-services.jar retry --uris http://192.168.99.100:8080/fcrepo/rest/deposits/8e/af/ac/a9/8eafaca9-1f24-413a-bf1e-fbbd673ba45b http://192.168.99.100:8080/fcrepo/rest/deposits/4a/cb/04/bb/4acb04bb-4f79-40ef-8ff9-e105261aa7fb +#### Refresh + +Refresh mode is used to re-process a Deposit in the `SUBMITTED` state that needs its deposit status refreshed. When `refresh` is invoked, the optional `--uris` argument is used to identify the `Deposit` resources to refresh. Otherwise a search of the index is performed for _all_ `Deposit` resources in the `SUBMITTED` state. + +Refreshing a `Deposit` means that its deposit status reference will be retrieved, parsed, and processed. The status returned from the reference will be stored on the `Deposit`, and the status of the corresponding `RepositoryCopy` will be updated as well. If the `Deposit` status is updated to `ACCEPTED`, the `RepositoryCopy` will be updated to `COMPLETE`. If the `Deposit` status is updated to `REJECTED`, the `RepositoryCopy` will be updated to `REJECTED` as well. + +To refresh all deposits in the `SUBMITTED` state: +> $ java -jar deposit-services.jar refresh + +To refresh specific deposits: +> $ java -jar deposit-services.jar refresh --uris http://192.168.99.100:8080/fcrepo/rest/deposits/8e/af/ac/a9/8eafaca9-1f24-413a-bf1e-fbbd673ba45b http://192.168.99.100:8080/fcrepo/rest/deposits/4a/cb/04/bb/4acb04bb-4f79-40ef-8ff9-e105261aa7fb + ### Future modes Modes to be supported by future releases of Deposit Services. @@ -171,9 +183,6 @@ Modes to be supported by future releases of Deposit Services. TODO -#### Process - -TODO # Developers @@ -193,6 +202,10 @@ The `PASS_FEDORA_USERNAME` and `PASS_FEDORA_PASSWORD` define the username and pa The `retry` argument invokes the `FailedDepositRunner` which will re-submit failed `Deposit` resources to the task queue for processing. URIs for specific Deposits may be specified, otherwise the index is searched for failed Deposits, and each one will be re-tried. +### SubmittedUpdateRunner + +The `refresh` argument invokes the `SubmittedUpdateRunner` which will attempt to re-process a `Deposit`'s status reference. URIs for specific Deposits may be specified, otherwise the index is searched for `SUBMITTED` Deposits, and each one will be refreshed. + ## Message flow and concurrency Each JMS listener (one each for the `deposit` and `submission` queues) can process messages concurrently. The number of messages each listener can process concurrently is set by the property `spring.jms.listener.concurrency` (or its environment equivalent: `SPRING_JMS_LISTENER_CONCURRENCY`). @@ -309,6 +322,8 @@ The semantics of _intermediate_ state are that the resource is currently in a wo A general pattern within Deposit services is that resources with _terminal_ status are explicitly accounted for (this is largely enforced by _policies_ which are documented elsewhere), and are considered "read-only". +#### Submission Status + Submission status is enumerated in the `AggregatedDepositStatus` class. Deposit services considers the following values: * `NOT_STARTED` (_intermediate_): Incoming Submissions from the UI must have this status value * `IN_PROGRESS` (_intermediate_): Deposit services places the Submission in an `IN_PROGRESS` state right away. When a thread observes a `Submission` in this state, it assumes that _another_ thread is processing this resource. @@ -316,14 +331,24 @@ Submission status is enumerated in the `AggregatedDepositStatus` class. Deposit * `ACCEPTED` (_terminal_): Deposit services places the Submission into this state when all of its `Deposit`s have been `ACCEPTED` * `REJECTED` (_terminal_): Deposit services places the Submission into this state when all of its `Deposit`s have been `REJECTED` +#### Deposit Status + Deposit status is enumerated in the `DepositStatus` class. Deposit services considers the following values: * `SUBMITTED` (_intermediate_): the custodial content of the `Submission` has been successfully transferred to the `Deposit`s `Repository` * `ACCEPTED` (_terminal_): the custodial content of the `Submission` has been accessioned by the `Deposit`s `Repository` (i.e. custody of the `Submission` has successfully been transferred to the downstream `Repository`) * `REJECTED` (_terminal_): the custodial content of the `Submission` has been rejected by the `Deposit`'s `Repository` (i.e. the downstream `Repository` has refused to accept custody of the `Submission` content) * `FAILED` (_intermediate_): the transfer of custodial content to the `Repository` failed, or there was some other error updating the status of the `Deposit` +#### RepositoryCopy Status + RepositoryCopy status is enumerated in the `CopyStatus` class. Deposit services considers the following values: * `COMPLETE` (_terminal_): a copy of the custodial content is available in the `Repository` at this location +* `IN_PROGRESS` (_intermediate_): a copy of the custodial content is _expected to be_ available in the `Repository` at this location. The custodial content should not be expected to exist until the `Deposit` status is `ACCEPTED` +* `REJECTED` (_terminal_): the copy should be considered to be invalid. Even if the custodial content is made available at the location indicated by the `RepositoryCopy`, it should not be mistaken for a successful transfer of custody. + +RepositoryCopy status is subservient to the Deposit status. They will always be congruent. For example, a RepositoryCopy cannot be `COMPLETE` if the Deposit is `REJECTED`. If a Deposit is `REJECTED`, then the RepositoryCopy must also be `REJECTED`. + +#### Common Permutations There are some common permutations of these statuses that will be observed: * `ACCEPTED` `Submission`s will only have `Deposit`s that are `ACCEPTED`. Each `Deposit` will have a `COMPLETE` `RepositoryCopy`. @@ -331,8 +356,8 @@ There are some common permutations of these statuses that will be observed: * `IN_PROGRESS` `Submission`s may have zero or more `Deposit`s in any state. * `FAILED` `Submission`s should have zero `Deposit`s. * `ACCEPTED` `Deposit`s should have a `COMPLETE` `RepositoryCopy`. -* `REJECTED` `Deposit`s will have no `RepositoryCopy` -* `SUBMITTED` `Deposit`s will have no `RepositoryCopy` +* `REJECTED` `Deposit`s will have a `REJECTED` `RepositoryCopy` +* `SUBMITTED` `Deposit`s will have an `IN_PROGRESS` `RepositoryCopy` * `FAILED` `Deposit`s will have no `RepositoryCopy` diff --git a/deposit-messaging/pom.xml b/deposit-messaging/pom.xml index 973a2990..91bf476a 100644 --- a/deposit-messaging/pom.xml +++ b/deposit-messaging/pom.xml @@ -341,6 +341,23 @@ + + org.apache.maven.plugins + maven-remote-resources-plugin + + + org.dataconservancy.nihms:shared-assembler:${project.parent.version}:test-jar:tests + + + + + + process + + + + + @@ -523,6 +540,15 @@ test + + org.dataconservancy.nihms + shared-assembler + ${project.parent.version} + tests + test-jar + test + + diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/DepositApp.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/DepositApp.java index 35a297ca..d97ce1a4 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/DepositApp.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/DepositApp.java @@ -94,7 +94,7 @@ public static void main(String[] args) { app = new SpringApplication(DepositApp.class, ListenerRunner.class); break; } - case "update": { + case "refresh": { app = new SpringApplication(DepositApp.class, SubmittedUpdateRunner.class); // TODO figure out elegant way to exclude JMS-related beans like SubmissionProcessor from being spun up app.setDefaultProperties(new HashMap() { { diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/config/DepositConfig.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/config/DepositConfig.java index dfcfe49c..f8a9516a 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/config/DepositConfig.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/config/DepositConfig.java @@ -36,6 +36,7 @@ import org.dataconservancy.pass.deposit.messaging.model.Registry; import org.dataconservancy.pass.deposit.messaging.policy.DirtyDepositPolicy; import org.dataconservancy.pass.deposit.messaging.service.DepositTask; +import org.dataconservancy.pass.deposit.messaging.status.AbderaDepositStatusRefProcessor; import org.dataconservancy.pass.deposit.messaging.status.AtomFeedStatusMapper; import org.dataconservancy.pass.deposit.messaging.status.RepositoryCopyStatusMapper; import org.dataconservancy.pass.deposit.messaging.status.SwordDspaceDepositStatusMapper; @@ -212,11 +213,13 @@ public Registry packagerRegistry(Map packagers) { @Bean public Map packagers(DspaceMetsAssembler dspaceAssembler, Sword2Transport swordTransport, NihmsAssembler nihmsAssembler, FtpTransport ftpTransport, - Map> transportRegistries) { + Map> transportRegistries, + AbderaDepositStatusRefProcessor abderaDepositStatusRefProcessor) { Map packagers = new HashMap<>(); // TODO: transport registries looked up by hard-coded strings. Need a more reliable way of discovering repositories, the packagers for those repositories, and their configuration packagers.put("JScholarship", - new Packager("JScholarship", dspaceAssembler, swordTransport, transportRegistries.get("js"))); + new Packager("JScholarship", dspaceAssembler, swordTransport, transportRegistries.get("js"), + abderaDepositStatusRefProcessor)); packagers.put("PubMed Central", new Packager("PubMed Central", nihmsAssembler, ftpTransport, transportRegistries.get("nihms"))); return packagers; diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/model/Packager.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/model/Packager.java index 1be00057..376f8f6b 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/model/Packager.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/model/Packager.java @@ -17,6 +17,7 @@ import org.dataconservancy.nihms.assembler.Assembler; import org.dataconservancy.nihms.transport.Transport; +import org.dataconservancy.pass.deposit.messaging.service.DepositStatusRefProcessor; import org.dataconservancy.pass.deposit.messaging.service.DepositTask; import org.dataconservancy.pass.model.Repository; import org.dataconservancy.pass.model.Submission; @@ -50,12 +51,20 @@ public class Packager { private Transport transport; + private DepositStatusRefProcessor depositStatusProcessor; + private Map configuration; public Packager(String name, Assembler assembler, Transport transport, Map configuration) { + this(name, assembler, transport, configuration, null); + } + + public Packager(String name, Assembler assembler, Transport transport, Map configuration, + DepositStatusRefProcessor depositStatusProcessor) { this.name = name; this.assembler = assembler; this.transport = transport; + this.depositStatusProcessor = depositStatusProcessor; this.configuration = configuration; } @@ -81,4 +90,13 @@ public Map getConfiguration() { .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)); } + /** + * The {@link DepositStatusRefProcessor}, may be {@code null}. + * + * @return the {@link DepositStatusRefProcessor}, may be {@code null}. + */ + public DepositStatusRefProcessor getDepositStatusProcessor() { + return depositStatusProcessor; + } + } diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/runner/SubmittedUpdateRunner.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/runner/SubmittedUpdateRunner.java index d77616f9..1f6f6629 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/runner/SubmittedUpdateRunner.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/runner/SubmittedUpdateRunner.java @@ -16,61 +16,98 @@ package org.dataconservancy.pass.deposit.messaging.runner; import org.dataconservancy.pass.client.PassClient; -import org.dataconservancy.pass.deposit.messaging.service.PassEntityProcessor; +import org.dataconservancy.pass.deposit.messaging.DepositServiceErrorHandler; +import org.dataconservancy.pass.deposit.messaging.service.DepositTaskHelper; import org.dataconservancy.pass.model.Deposit; import org.dataconservancy.pass.model.Repository; import org.dataconservancy.pass.model.Submission; -import org.springframework.beans.factory.annotation.Qualifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.net.URI; import java.util.Collection; -import java.util.function.Consumer; import java.util.stream.Collectors; import static org.dataconservancy.pass.deposit.messaging.support.Constants.Indexer.DEPOSIT_STATUS; +import static org.dataconservancy.pass.model.Deposit.DepositStatus.SUBMITTED; /** - * Accepts uris for, or searches for, - * Deposit repository resources that have a deposit status of {@code submitted}. - *

- * Submitted deposits have had the contents of their {@link Submission} successfully transferred to a {@link - * Repository}, but their terminal status is not known. That is, Deposit Services does not know if the {@code - * Deposit} has been accepted or rejected. - *

- *

- * Submitted deposits are examined for a deposit status reference and repository copies. - *

+ * Accepts uris for, or searches for, + * + * Deposit repository resources that have a deposit status of {@code submitted}.

Submitted deposits have had the + * contents of their {@link Submission} successfully transferred to a {@link Repository}, but their terminal + * status is not known. That is, Deposit Services does not know if the {@code Deposit} has been accepted or rejected. + *

Submitted deposits are examined for a deposit status reference and repository copies.

* * @author Elliot Metsger (emetsger@jhu.edu) */ public class SubmittedUpdateRunner { + private static final Logger LOG = LoggerFactory.getLogger(SubmittedUpdateRunner.class); + + @Autowired + private DepositTaskHelper depositTaskHelper; + + @Autowired + private ThreadPoolTaskExecutor taskExecutor; + + @Autowired + private DepositServiceErrorHandler errorHandler; + /** * Answers a Spring {@link ApplicationRunner} that will process a {@code Collection} of URIs representing submitted * deposits. If no URIs are supplied on the command line, a search is performed for all submitted deposits. The * submitted deposits are then queued for processing by the provided {@code processor}. * * @param passClient the client implementation used to resolve PASS entity uris and perform searches - * @param processor processes {@code Deposit} resources that have a state of submitted - * @return the Spring {@code ApplicationRunner} which receives the command line arguments supplied to this application + * @return the Spring {@code ApplicationRunner} which receives the command line arguments supplied to this + * application */ @Bean - public ApplicationRunner depositUpdate(PassClient passClient, PassEntityProcessor processor, @Qualifier("submittedDepositStatusUpdater") - Consumer updater) { + public ApplicationRunner depositUpdate(PassClient passClient) { return (args) -> { - Collection toUpdate = toUpdate(args, passClient); - processor.update(toUpdate, updater, Deposit.class); + Collection deposits = depositsToUpdate(args, passClient); + deposits.forEach(depositUri -> { + try { + depositTaskHelper.processDepositStatus(passClient.readResource(depositUri, Deposit.class)); + } catch (Exception e) { + errorHandler.handleError(e); + } + }); + + taskExecutor.shutdown(); + taskExecutor.setAwaitTerminationSeconds(10); }; } - private Collection toUpdate(ApplicationArguments args, PassClient passClient) { - if (args.getNonOptionArgs() != null && args.getNonOptionArgs().size() > 1) { - return args.getNonOptionArgs().stream().skip(1).map(URI::create).collect(Collectors.toSet()); + /** + * Parses command line arguments for the URIs to update, or searches the index for URIs of submitted deposits. + *
+ *
--uris
space-separated list of Deposit URIs to be processed. If the URI does not specify a Deposit, + * it is skipped (implies {@code --sync}, but can be overridden by supplying {@code --async})
--sync
+ *
the console remains attached as each URI is processed, allowing the end-user to examine the results of + * updated Deposits as they happen
--async
the console detaches immediately, with the Deposit URIs + * processed in the background
+ * + * @param args the command line arguments + * @param passClient used to search the index for dirty deposits + * @return a {@code Collection} of URIs representing dirty deposits + */ + private Collection depositsToUpdate(ApplicationArguments args, PassClient passClient) { + if (args.containsOption("uris") && args.getOptionValues("uris").size() > 0) { + // maintain the order of the uris as they were supplied on the CLI + return args.getOptionValues("uris").stream().map(URI::create).collect(Collectors.toList()); } else { - return passClient.findAllByAttribute(Deposit.class, DEPOSIT_STATUS, "submitted"); + Collection uris = passClient.findAllByAttribute(Deposit.class, DEPOSIT_STATUS, SUBMITTED); + if (uris.size() < 1) { + throw new IllegalArgumentException("No URIs found to process."); + } + return uris; } } diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositStatusRefProcessor.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositStatusRefProcessor.java new file mode 100644 index 00000000..b1bc963b --- /dev/null +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositStatusRefProcessor.java @@ -0,0 +1,29 @@ +/* + * Copyright 2018 Johns Hopkins University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dataconservancy.pass.deposit.messaging.service; + +import org.dataconservancy.pass.model.Deposit; + +import java.net.URI; + +/** + * @author Elliot Metsger (emetsger@jhu.edu) + */ +public interface DepositStatusRefProcessor { + + Deposit.DepositStatus process(URI depositStatusRef); + +} diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTask.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTask.java index ca1e4b87..cb5ae5a5 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTask.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTask.java @@ -22,19 +22,18 @@ import org.dataconservancy.pass.deposit.messaging.DepositServiceRuntimeException; import org.dataconservancy.pass.deposit.messaging.model.Packager; import org.dataconservancy.pass.deposit.messaging.policy.Policy; -import org.dataconservancy.pass.deposit.messaging.status.DepositStatusMapper; -import org.dataconservancy.pass.deposit.messaging.status.DepositStatusParser; -import org.dataconservancy.pass.deposit.messaging.status.SwordDspaceDepositStatus; +import org.dataconservancy.pass.deposit.messaging.service.DepositUtil.DepositWorkerContext; import org.dataconservancy.pass.deposit.messaging.support.CriticalRepositoryInteraction; import org.dataconservancy.pass.deposit.messaging.support.CriticalRepositoryInteraction.CriticalResult; import org.dataconservancy.pass.deposit.transport.sword2.Sword2DepositReceiptResponse; import org.dataconservancy.pass.model.Deposit; import org.dataconservancy.pass.model.RepositoryCopy; +import org.dataconservancy.pass.model.RepositoryCopy.CopyStatus; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.Map; @@ -42,7 +41,6 @@ import static java.lang.String.format; import static java.lang.System.identityHashCode; import static org.dataconservancy.pass.model.Deposit.DepositStatus.ACCEPTED; -import static org.dataconservancy.pass.model.Deposit.DepositStatus.REJECTED; import static org.dataconservancy.pass.model.Deposit.DepositStatus.SUBMITTED; /** @@ -67,33 +65,28 @@ public class DepositTask implements Runnable { private static final Logger LOG = LoggerFactory.getLogger(DepositTask.class); - private DepositUtil.DepositWorkerContext dc; + private DepositWorkerContext dc; private PassClient passClient; - private DepositStatusParser atomStatusParser; - - private DepositStatusMapper swordDepositStatusMapper; - private Policy intermediateDepositStatusPolicy; - private Policy terminalDepositStatusPolicy; + private CriticalRepositoryInteraction cri; + + private DepositTaskHelper depositHelper; - private CriticalRepositoryInteraction critical; + private long swordSleepTimeMs = 10000; - public DepositTask(DepositUtil.DepositWorkerContext dc, PassClient passClient, - DepositStatusParser atomStatusParser, - DepositStatusMapper swordDepositStatusMapper, + public DepositTask(DepositWorkerContext dc, + PassClient passClient, Policy intermediateDepositStatusPolicy, - Policy terminalDepositStatusPolicy, - CriticalRepositoryInteraction critical) { + CriticalRepositoryInteraction cri, + DepositTaskHelper depositHelper) { this.dc = dc; this.passClient = passClient; - this.atomStatusParser = atomStatusParser; - this.swordDepositStatusMapper = swordDepositStatusMapper; this.intermediateDepositStatusPolicy = intermediateDepositStatusPolicy; - this.terminalDepositStatusPolicy = terminalDepositStatusPolicy; - this.critical = critical; + this.cri = cri; + this.depositHelper = depositHelper; } @Override @@ -101,7 +94,7 @@ public void run() { LOG.debug(">>>> Running {}@{}", DepositTask.class.getSimpleName(), toHexString(identityHashCode(this))); - CriticalResult result = critical.performCritical(dc.deposit().getId(), Deposit.class, + CriticalResult result = cri.performCritical(dc.deposit().getId(), Deposit.class, /* * Only "intermediate" deposits can be processed by {@code DepositTask} @@ -192,24 +185,26 @@ public void run() { // TODO: abstract out a configurable timer. // Sleep here for a bit, let DSpace do its thing, and then we ought to be able to parse a deposit status try { - LOG.debug(">>>> Sleeping 10 seconds for SWORD deposit to complete ..."); - Thread.sleep(10000); + LOG.debug(">>>> Sleeping {} ms for SWORD deposit to complete ...", swordSleepTimeMs); + Thread.sleep(swordSleepTimeMs); } catch (InterruptedException e) { LOG.debug(">>>> DepositTask {}@{} interrupted!", DepositTask.class.getSimpleName(), toHexString(identityHashCode(this))); Thread.interrupted(); } - // deposit receipt -> SWORD Statement (ORE ReM/Atom XML) -> sword:state - Sword2DepositReceiptResponse swordResponse = (Sword2DepositReceiptResponse) transportResponse; - String itemUri = swordResponse.getReceipt().getEntry().getAlternateLink().getHref().toString(); URI statementUri = null; - Deposit.DepositStatus status = null; + String itemUri = null; try { + Sword2DepositReceiptResponse swordResponse = (Sword2DepositReceiptResponse) transportResponse; + + // deposit receipt -> SWORD Statement (ORE ReM/Atom XML) -> sword:state + if (swordResponse.getReceipt().getSplashPageLink() != null) { + itemUri = swordResponse.getReceipt().getSplashPageLink().getHref(); + } + + // deposit receipt -> DSpace Item URL statementUri = swordResponse.getReceipt().getAtomStatementLink().getIRI().toURI(); - dc.deposit().setDepositStatusRef(statementUri.toString()); - SwordDspaceDepositStatus swordStatus = atomStatusParser.parse(statementUri); - status = swordDepositStatusMapper.map(swordStatus); } catch (Exception e) { String msg = format("Failed to update deposit status to %s for tuple [%s, %s, %s]; " + "parsing the Atom statement %s for %s failed: %s", @@ -218,100 +213,49 @@ public void run() { throw new DepositServiceRuntimeException(msg, e, dc.deposit()); } - switch (status) { - case ACCEPTED: { - LOG.info(">>>> Deposit {} was accepted.", dc.deposit().getId()); - dc.deposit().setDepositStatus(ACCEPTED); - RepositoryCopy repoCopy = new RepositoryCopy(); - repoCopy.setCopyStatus(RepositoryCopy.CopyStatus.COMPLETE); - repoCopy.setRepository(dc.repository().getId()); - repoCopy.setPublication(dc.submission().getPublication()); - repoCopy.setExternalIds(Collections.singletonList(itemUri)); - dc.repoCopy(repoCopy); - break; - } - - case REJECTED: { - LOG.info(">>>> Deposit {} was rejected.", dc.deposit().getId()); - dc.deposit().setDepositStatus(Deposit.DepositStatus.REJECTED); - break; - } - } - } + // TransportResponse was successfully parsed, set the status ref + dc.deposit().setDepositStatusRef(statementUri.toString()); - // TransportResponse was successfully parsed and logical success or failure of the Deposit was determined. - // Create the RepositoryCopy and update the Deposit + // Create a RepositoryCopy, which will record the URL of the Item in DSpace + RepositoryCopy repoCopy = newRepositoryCopy(dc, itemUri, CopyStatus.IN_PROGRESS); + dc.repoCopy(repoCopy); + // Determine the logical success or failure of the Deposit, and persist the Deposit and RepositoryCopy in + // the Fedora repository + depositHelper.processDepositStatus(dc.submission(), dc.repository(), dc.repoCopy(), dc.deposit()); + } - CriticalResult finalResult = critical.performCritical(dc.deposit().getId(), Deposit.class, - - (deposit) -> { - if (deposit.getDepositStatus() != Deposit.DepositStatus.SUBMITTED) { - LOG.debug("Precondition for updating {} was not satisfied. Expected " + - "Deposit.DepositStatus={}, but was {}", - deposit.getId(), SUBMITTED, deposit.getDepositStatus()); - return false; - } - - return true; - }, - - (deposit) -> { - // As long as a Deposit is not FAILED, we are OK with the final result. - // - // A SWORD deposit may have taken longer than 10s (they are async, after all), so the Deposit - // may be in the SUBMITTED state still. - // - // NIHMS FTP deposits will always be in the SUBMITTED state upon success, because there is no - // way to determine the acceptability of the package simply by dropping it off at the FTP server - // - // So, the status of the Deposit might be REJECTED, ACCEPTED, or SUBMITTED, as long as it isn't - // FAILED. - if (terminalDepositStatusPolicy.accept(deposit.getDepositStatus()) || - deposit.getDepositStatus() == SUBMITTED) { - return true; - } - - LOG.debug("Postcondition for updating {} was not satisfied. Expected " + "DepositDepositStatus " + - "to be {}, {}, or {}, but was '{}'", ACCEPTED, REJECTED, SUBMITTED, deposit - .getDepositStatus()); - return false; - }, - - (deposit) -> { - if (dc.repoCopy() != null) { - dc.repoCopy(passClient.createAndReadResource(dc.repoCopy(), RepositoryCopy.class)); - deposit.setRepositoryCopy(dc.repoCopy().getId()); - LOG.debug(">>>> Created repository copy for {}: {}", dc.deposit().getId(), dc.repoCopy().getId()); - } - - // the 'deposit' lambda parameter is a Deposit resource that has been retrieved from the repository. - // If we have any state in the local DepositContext.deposit() that we want to be persisted in the - // repository, it must be copied over from dc.deposit() to deposit. - deposit.setDepositStatusRef(dc.deposit().getDepositStatusRef()); - deposit.setDepositStatus(dc.deposit().getDepositStatus()); - - return dc.repoCopy(); - }); + } - if (!finalResult.success()) { - String msg = format("Failed to update Deposit tuple [%s, %s, %s]", - dc.submission().getId(), dc.repository().getId(), dc.deposit().getId()); - if (finalResult.throwable().isPresent()) { - Throwable t = finalResult.throwable().get(); - msg = msg + ": " + t.getMessage(); - throw new DepositServiceRuntimeException(msg, t, dc.deposit()); + private static RepositoryCopy newRepositoryCopy(DepositWorkerContext dc, String itemUri, CopyStatus copyStatus) { + RepositoryCopy repoCopy = new RepositoryCopy(); + repoCopy.setCopyStatus(copyStatus); + repoCopy.setRepository(dc.repository().getId()); + repoCopy.setPublication(dc.submission().getPublication()); + if (itemUri != null) { + repoCopy.setExternalIds(Collections.singletonList(itemUri)); + try { + repoCopy.setAccessUrl(new URI(itemUri)); + } catch (URISyntaxException e) { + LOG.warn("Error creating an accessUrl from '{}' for a RepositoryCopy associated with {}", + itemUri, dc.deposit().getId()); } - - throw new DepositServiceRuntimeException(msg, dc.deposit()); } - + return repoCopy; } - public DepositUtil.DepositWorkerContext getDepositWorkerContext() { + public DepositWorkerContext getDepositWorkerContext() { return dc; } + public long getSwordSleepTimeMs() { + return swordSleepTimeMs; + } + + public void setSwordSleepTimeMs(long swordSleepTimeMs) { + this.swordSleepTimeMs = swordSleepTimeMs; + } + @Override public String toString() { return "DepositTask{" + "dc=" + dc + ", passClient=" + passClient + '}'; diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTaskHelper.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTaskHelper.java index 20455e17..f552033e 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTaskHelper.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositTaskHelper.java @@ -23,25 +23,30 @@ import org.dataconservancy.pass.deposit.messaging.model.Registry; import org.dataconservancy.pass.deposit.messaging.policy.Policy; import org.dataconservancy.pass.deposit.messaging.service.DepositUtil.DepositWorkerContext; -import org.dataconservancy.pass.deposit.messaging.status.DepositStatusMapper; -import org.dataconservancy.pass.deposit.messaging.status.DepositStatusParser; -import org.dataconservancy.pass.deposit.messaging.status.SwordDspaceDepositStatus; import org.dataconservancy.pass.deposit.messaging.support.CriticalRepositoryInteraction; +import org.dataconservancy.pass.deposit.messaging.support.CriticalRepositoryInteraction.CriticalResult; import org.dataconservancy.pass.model.Deposit; import org.dataconservancy.pass.model.Repository; +import org.dataconservancy.pass.model.RepositoryCopy; import org.dataconservancy.pass.model.Submission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import java.net.URI; +import java.net.URISyntaxException; import static java.lang.Integer.toHexString; import static java.lang.String.format; import static java.lang.System.identityHashCode; import static org.dataconservancy.pass.deposit.messaging.service.DepositUtil.toDepositWorkerContext; +import static org.dataconservancy.pass.model.Deposit.DepositStatus.ACCEPTED; +import static org.dataconservancy.pass.model.Deposit.DepositStatus.REJECTED; +import static org.dataconservancy.pass.model.Deposit.DepositStatus.SUBMITTED; +import static org.dataconservancy.pass.model.RepositoryCopy.CopyStatus.IN_PROGRESS; /** * Encapsulates functionality common to performing the submission of a Deposit to the TaskExecutor. @@ -62,37 +67,38 @@ public class DepositTaskHelper { public static final String MISSING_PACKAGER = ">>>> No Packager found for tuple [{}, {}, {}]: " + "Missing Packager for Repository named '{}', marking Deposit as FAILED."; - protected PassClient passClient; + private static final String PRECONDITION_FAILED = "Refusing to update {}, the following pre-condition failed: "; - protected Registry packagerRegistry; + private static final String POSTCONDITION_FAILED = "Refusing to update {}, the following post-condition failed: "; - protected TaskExecutor taskExecutor; + private PassClient passClient; - protected DepositStatusMapper depositStatusMapper; + private TaskExecutor taskExecutor; - protected DepositStatusParser atomStatusParser; + private Policy intermediateDepositStatusPolicy; - protected Policy intermediateDepositStatusPolicy; + private Policy terminalDepositStatusPolicy; - protected Policy terminalDepositStatusPolicy; + private CriticalRepositoryInteraction cri; - protected CriticalRepositoryInteraction critical; + @Value("${pass.deposit.transport.swordv2.sleep-time-ms}") + private long swordDepositSleepTimeMs; + + private Registry packagerRegistry; @Autowired - public DepositTaskHelper(PassClient passClient, Registry packagerRegistry, TaskExecutor taskExecutor, - DepositStatusMapper depositStatusMapper, - DepositStatusParser atomStatusParser, + public DepositTaskHelper(PassClient passClient, + TaskExecutor taskExecutor, Policy intermediateDepositStatusPolicy, Policy terminalDepositStatusPolicy, - CriticalRepositoryInteraction critical) { + CriticalRepositoryInteraction cri, + Registry packagerRegistry) { this.passClient = passClient; - this.packagerRegistry = packagerRegistry; this.taskExecutor = taskExecutor; - this.depositStatusMapper = depositStatusMapper; - this.atomStatusParser = atomStatusParser; this.intermediateDepositStatusPolicy = intermediateDepositStatusPolicy; this.terminalDepositStatusPolicy = terminalDepositStatusPolicy; - this.critical = critical; + this.cri = cri; + this.packagerRegistry = packagerRegistry; } /** @@ -103,6 +109,10 @@ public DepositTaskHelper(PassClient passClient, Registry packagerRegis * Note that the {@link DepositServiceErrorHandler} will be invoked to handle the {@code * DepositServiceRuntimeException}, which will attempt to mark the {@code Deposit} as FAILED. *

+ *

+ * The {@code DepositTask} composed by this helper method will only accept {@code Deposit} resources with + * intermediate state. + *

* * @param submission the submission that the {@code deposit} belongs to * @param depositSubmission the submission in the Deposit Services' model @@ -116,8 +126,8 @@ public void submitDeposit(Submission submission, DepositSubmission depositSubmis try { DepositWorkerContext dc = toDepositWorkerContext( deposit, submission, depositSubmission, repo, packager); - DepositTask depositTask = new DepositTask(dc, passClient, atomStatusParser, depositStatusMapper, - intermediateDepositStatusPolicy, terminalDepositStatusPolicy, critical); + DepositTask depositTask = new DepositTask(dc, passClient, intermediateDepositStatusPolicy, cri, this); + depositTask.setSwordSleepTimeMs(swordDepositSleepTimeMs); LOG.debug(">>>> Submitting task ({}@{}) for tuple [{}, {}, {}]", depositTask.getClass().getSimpleName(), toHexString(identityHashCode(depositTask)), @@ -131,4 +141,233 @@ public void submitDeposit(Submission submission, DepositSubmission depositSubmis } } + public void processDepositStatus(Deposit deposit) { + processDepositStatus( + passClient.readResource(deposit.getSubmission(), Submission.class), + passClient.readResource(deposit.getRepository(), Repository.class), + passClient.readResource(deposit.getRepositoryCopy(), RepositoryCopy.class), deposit); + } + + void processDepositStatus(Submission submission, Repository repo, RepositoryCopy repoCopy, Deposit deposit) { + + // Subtle issue to be aware of: + // + // The Deposit being passed into this method may contain state (e.g. a depositStatusRef) that is not + // present on the Deposit resource in the repository. Therefore, the Deposit retrieved by the CRI may + // be out of date with respect to the Deposit provided to this method. + // + // At this time, the depositStatusRef is the only state that may differ. + // + // To insure that the depositStatusRef from the 'deposit' method parameter is stored on the 'deposit' from + // the CRI, the field is copied in the "critical update" lambda below. This insures if a conflict arises, + // the ConflictHandler will retry the critical update, including the copy of the depositStatusRef. + + CriticalResult cr = cri.performCritical(deposit.getId(), Deposit.class, + + /* + * Preconditions: + * - The depositStatusRef on the 'deposit' *supplied to this method* must not be null, and must be a URI + * - The links between Deposit, Submission, Repository, and RepositoryCopy must be intact + * - The Deposit must be in a SUBMITTED state. + * - Insure a DepositStatusRefProcessor exists for the Repository + */ + (criDeposit) -> { + if (criDeposit.getDepositStatus() != SUBMITTED) { + LOG.warn(PRECONDITION_FAILED + " Expected Deposit.DepositStatus = {}, but was '{}'", + SUBMITTED, deposit.getDepositStatus()); + return false; + } + + if (!verifyNullityAndLinks(submission, repo, repoCopy, criDeposit)) { + return false; + } + + try { + new URI(deposit.getDepositStatusRef()); + } catch (URISyntaxException|NullPointerException e) { + LOG.warn(PRECONDITION_FAILED + " depositStatusRef must be a valid URI: {}", + deposit.getId(), e.getMessage(), e); + return false; + } + + if (packagerRegistry.get(repo.getName()) == null || packagerRegistry.get(repo.getName()) + .getDepositStatusProcessor() == null) { + LOG.warn(PRECONDITION_FAILED + " mising a DepositStatusRefProcessor for Repository with name " + + "'{}' (id: '{}')", repo.getName(), repo.getId()); + return false; + } + + return true; + }, + + /* + * Postconditions: + * - The criRepoCopy must not be null + * - The criDeposit must have a depositStatusRef + * - The criDeposit must be linked to the criRepoCopy + * - The criDeposit cannot be in a FAILED state + * - If the criDeposit is still in SUBMITTED state, then the RepositoryCopy must remain (or be in) an IN-PROGRESS state + * - If the criDeposit is in a REJECTED state, then the RepositoryCopy must be in a REJECTED state + * - If the criDeposit is in an ACCEPTED stated, then the RepositoryCopy must be in an ACCEPTED state + */ + (criDeposit, criRepoCopy) -> { + if (criRepoCopy == null) { + LOG.warn(POSTCONDITION_FAILED + " RepositoryCopy was null."); + return false; + } + + if (criDeposit.getDepositStatusRef() == null) { + LOG.warn(POSTCONDITION_FAILED + " Deposit must have a depositStatusRef."); + return false; + } + + if (criDeposit.getRepositoryCopy() == null || !criDeposit.getRepositoryCopy().equals(criRepoCopy.getId())) { + LOG.warn(POSTCONDITION_FAILED + " Deposit RepositoryCopy URI was '{}', and does not equal the" + + " expected URI {}", criDeposit.getRepositoryCopy(), criRepoCopy.getId()); + return false; + } + + // A SWORD deposit may have taken longer than 10s (they are async, after all), so the Deposit + // may be in the SUBMITTED state still. + // + // NIHMS FTP deposits will always be in the SUBMITTED state upon success, because there is no + // way to determine the acceptability of the package simply by dropping it off at the FTP server + // + // So, the status of the Deposit might be REJECTED, ACCEPTED, or SUBMITTED, as long as it isn't + // FAILED. + if (!terminalDepositStatusPolicy.accept(criDeposit.getDepositStatus()) && + criDeposit.getDepositStatus() != SUBMITTED) { + LOG.warn(POSTCONDITION_FAILED + " Expected Deposit.DepositStatus to be {}, {}, or {}, but was '{}'", + ACCEPTED, REJECTED, SUBMITTED, criDeposit.getDepositStatus()); + return false; + } + + if (criDeposit.getDepositStatus() == SUBMITTED && repoCopy.getCopyStatus() != IN_PROGRESS) { + LOG.warn(POSTCONDITION_FAILED + " Expected RepoCopy.CopyStatus = {}, but was '{}' for Deposit.DepositStatus = '{}'", + IN_PROGRESS, criRepoCopy.getCopyStatus(), SUBMITTED); + return false; + } + + if (criDeposit.getDepositStatus() == REJECTED && criRepoCopy.getCopyStatus() != RepositoryCopy.CopyStatus.REJECTED) { + LOG.warn(POSTCONDITION_FAILED + " Expected RepoCopy.CopyStatus = {}, but was '{}' for Deposit.DepositStatus = '{}'", + RepositoryCopy.CopyStatus.REJECTED, criRepoCopy.getCopyStatus(), REJECTED); + return false; + } + + if (criDeposit.getDepositStatus() == ACCEPTED && criRepoCopy.getCopyStatus() != RepositoryCopy.CopyStatus.COMPLETE) { + LOG.warn(POSTCONDITION_FAILED + " Expected RepoCopy.CopyStatus = {}, but was '{}' for Deposit.DepositStatus = '{}'", + RepositoryCopy.CopyStatus.COMPLETE, criRepoCopy.getCopyStatus(), ACCEPTED); + return false; + } + + return true; + }, + + (criDeposit) -> { + Deposit.DepositStatus status; + + criDeposit.setDepositStatusRef(deposit.getDepositStatusRef()); + + try { + DepositStatusRefProcessor depositStatusProcessor = + packagerRegistry.get(repo.getName()).getDepositStatusProcessor(); + status = depositStatusProcessor.process(URI.create(criDeposit.getDepositStatusRef())); + } catch (Exception e) { + String msg = format("Failed to update deposit status for [%s], " + + "parsing the status document referenced by %s failed: %s", + criDeposit.getId(), criDeposit.getDepositStatusRef(), e.getMessage()); + LOG.warn(msg, e); + throw new DepositServiceRuntimeException(msg, e, criDeposit); + } + + if (status == null) { + String msg = format("Failed to update deposit status for [%s], " + + "mapping the status obtained from %s failed", + criDeposit.getId(), criDeposit.getDepositStatusRef()); + throw new DepositServiceRuntimeException(msg, criDeposit); + } + + switch (status) { + case ACCEPTED: { + LOG.info("Deposit {} was accepted.", criDeposit.getId()); + criDeposit.setDepositStatus(ACCEPTED); + repoCopy.setCopyStatus(RepositoryCopy.CopyStatus.COMPLETE); + break; + } + + case REJECTED: { + LOG.info("Deposit {} was rejected.", criDeposit.getId()); + criDeposit.setDepositStatus(Deposit.DepositStatus.REJECTED); + repoCopy.setCopyStatus(RepositoryCopy.CopyStatus.REJECTED); + break; + } + } + + RepositoryCopy criRepoCopy; + + try { + if (repoCopy.getId() == null) { + criRepoCopy = passClient.createAndReadResource(repoCopy, RepositoryCopy.class); + } else { + criRepoCopy = passClient.updateAndReadResource(repoCopy, RepositoryCopy.class); + } + criDeposit.setRepositoryCopy(criRepoCopy.getId()); + } catch (Exception e) { + String msg = String.format("Failed to create or update RepositoryCopy '%s' for %s", + repoCopy.getId(), criDeposit.getId()); + throw new DepositServiceRuntimeException(msg, e, criDeposit); + } + + return criRepoCopy; + }); + + if (!cr.success()) { + String msg = format("Failed to update Deposit tuple [%s, %s, %s]", + submission.getId(), repo.getId(), deposit.getId()); + + if (cr.throwable().isPresent()) { + throw new DepositServiceRuntimeException(msg, cr.throwable().get(), deposit); + } + + throw new DepositServiceRuntimeException(msg, deposit); + } + } + + private static boolean verifyNullityAndLinks(Submission s, Repository r, RepositoryCopy rc, Deposit d) { + if (d.getDepositStatus() != SUBMITTED) { + LOG.warn(PRECONDITION_FAILED + " expected DepositStatus = '{}', but was '{}'", + d.getId(), SUBMITTED, d.getDepositStatus()); + return false; + } + + if (rc.getId() != null && !rc.getId().equals(d.getRepositoryCopy())) { + LOG.warn(PRECONDITION_FAILED + " RepositoryCopy URI mismatch: deposit RepositoryCopy URI: '{}', supplied RepositoryCopy URI: '{}'", + d.getId(), d.getRepositoryCopy(), rc.getId()); + return false; + } + + if (d.getSubmission() == null) { + LOG.warn(PRECONDITION_FAILED + " it has a 'null' Submission.", d.getId()); + return false; + } + + if (!s.getId().equals(d.getSubmission())) { + LOG.warn(PRECONDITION_FAILED + " Submission URI mismatch: deposit Submission URI: '{}', supplied Submission URI: '{}'", + d.getId(), d.getSubmission(), s.getId()); + } + + if (d.getRepository() == null) { + LOG.warn(PRECONDITION_FAILED + " it has a 'null' Repository.", d.getId()); + return false; + } + + if (!r.getId().equals(d.getRepository())) { + LOG.warn(PRECONDITION_FAILED + " Repository URI mismatch: deposit Repository URI: '{}', supplied Repository URI: '{}'", + d.getId(), d.getRepository(), r.getId()); + return false; + } + + return true; + } + } diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtil.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtil.java index 9570f32b..f4a574aa 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtil.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtil.java @@ -158,6 +158,7 @@ public static boolean csvStringContains(String toMatch, String csvCandidates) { * @param jmsMessage the message, in the native JMS model * @return an Object with references to the context of an incoming JMS message */ + @SuppressWarnings({"unchecked", "rawtypes"}) public static MessageContext toMessageContext(String resourceType, String eventType, long timestamp, String id, Session session, Message message, javax.jms.Message jmsMessage) { MessageContext mc = new MessageContext(); diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessor.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessor.java new file mode 100644 index 00000000..16e4d54e --- /dev/null +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessor.java @@ -0,0 +1,75 @@ +/* + * Copyright 2018 Johns Hopkins University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dataconservancy.pass.deposit.messaging.status; + +import org.dataconservancy.pass.deposit.messaging.service.DepositStatusRefProcessor; +import org.dataconservancy.pass.model.Deposit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.net.URI; + +/** + * Parses {@link Deposit.DepositStatus} from a SWORD statement document. + * + * @author Elliot Metsger (emetsger@jhu.edu) + * @see SWORD v2 Profile ยง11 + */ +@Component +public class AbderaDepositStatusRefProcessor implements DepositStatusRefProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(AbderaDepositStatusRefProcessor.class); + + private DepositStatusParser atomStatusParser; + + private DepositStatusMapper swordDepositStatusMapper; + + @Autowired + public AbderaDepositStatusRefProcessor(DepositStatusParser atomStatusParser, + DepositStatusMapper swordDepositStatusMapper) { + this.atomStatusParser = atomStatusParser; + this.swordDepositStatusMapper = swordDepositStatusMapper; + } + + /** + * Parses the SWORD statement at {@code depositStatusRef}, and returns a corresponding {@link Deposit.DepositStatus} + * + * @param depositStatusRef expected to be a {@code URI} referencing a SWORD statement + * @return the deposit status, may be {@code null} + */ + @Override + public Deposit.DepositStatus process(URI depositStatusRef) { + SwordDspaceDepositStatus swordStatus = atomStatusParser.parse(depositStatusRef); + + if (swordStatus == null) { + LOG.debug("Unable to parse the SWORD deposit status from {}, returning 'null' Deposit.DepositStatus", + depositStatusRef); + return null; + } + + Deposit.DepositStatus depositStatus = swordDepositStatusMapper.map(swordStatus); + + if (depositStatus == null) { + LOG.debug("Unable to map the SWORD deposit status '{}' parsed from {} to a Deposit.DepositStatus. " + + "Returning 'null' Deposit.DepositStatus", swordStatus, depositStatusRef); + return null; + } + + return depositStatus; + } +} diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/SwordDspaceDepositStatusMapper.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/SwordDspaceDepositStatusMapper.java index c67ca360..3fc042de 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/SwordDspaceDepositStatusMapper.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/status/SwordDspaceDepositStatusMapper.java @@ -38,7 +38,7 @@ protected String getConfigurationKey() { * Maps a {@link SwordDspaceDepositStatus} to a {@link org.dataconservancy.pass.model.Deposit.DepositStatus} * * @param statusToMap the SWORD deposit status - * @return the corresponding PASS deposit status + * @return the corresponding PASS deposit status, may be {@code null} */ @Override public Deposit.DepositStatus map(SwordDspaceDepositStatus statusToMap) { diff --git a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/support/CriticalPath.java b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/support/CriticalPath.java index 0facbec9..23136274 100644 --- a/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/support/CriticalPath.java +++ b/deposit-messaging/src/main/java/org/dataconservancy/pass/deposit/messaging/support/CriticalPath.java @@ -116,6 +116,7 @@ public CriticalResult performCritical(URI uri, C * any exception thrown, and the overall success as determined by the post-condition */ @Override + @SuppressWarnings("unchecked") public CriticalResult performCritical(URI uri, Class clazz, Predicate precondition, BiPredicate postcondition, diff --git a/deposit-messaging/src/main/resources/application.properties b/deposit-messaging/src/main/resources/application.properties index 87c7bff0..d537a2a6 100644 --- a/deposit-messaging/src/main/resources/application.properties +++ b/deposit-messaging/src/main/resources/application.properties @@ -35,4 +35,6 @@ pass.deposit.workers.concurrency=4 pass.deposit.status.mapping=classpath:/statusmapping.json pass.deposit.http.agent=pass-deposit/x.y.z pass.deposit.queue.deposit.name=deposit -pass.deposit.queue.submission.name=submission \ No newline at end of file +pass.deposit.queue.submission.name=submission +# TODO probably should be configured on a repository-by-repository basis +pass.deposit.transport.swordv2.sleep-time-ms=10000 \ No newline at end of file diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/policy/PolicyTestUtil.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/policy/PolicyTestUtil.java index 7685f708..3106797b 100644 --- a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/policy/PolicyTestUtil.java +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/policy/PolicyTestUtil.java @@ -36,6 +36,7 @@ static DepositUtil.MessageContext withResourceAndEventType(String resourceType, return withResourceAndEventType(resourceType, eventType, "software-agent-web-browser.json"); } + @SuppressWarnings({"unchecked", "rawtypes"}) static DepositUtil.MessageContext withResourceAndEventType(String resourceType, String eventType, String messageBodyResource) throws IOException { DepositUtil.MessageContext mc = mock(DepositUtil.MessageContext.class); diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/AbstractSubmissionProcessorTest.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/AbstractSubmissionProcessorTest.java index 1f133e01..df2aa923 100644 --- a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/AbstractSubmissionProcessorTest.java +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/AbstractSubmissionProcessorTest.java @@ -50,7 +50,7 @@ public abstract class AbstractSubmissionProcessorTest { SubmissionPolicy submissionPolicy; - Policy dirtyDepositPolicy; + Policy intermediateDepositStatusPolicy; Policy terminalDepositStatusPolicy; @@ -74,15 +74,14 @@ public void setUp() throws Exception { submissionBuilder = mock(SubmissionBuilder.class); packagerRegistry = mock(Registry.class); submissionPolicy = mock(SubmissionPolicy.class); - dirtyDepositPolicy = mock(Policy.class); + intermediateDepositStatusPolicy = mock(Policy.class); messagePolicy = mock(JmsMessagePolicy.class); taskExecutor = mock(TaskExecutor.class); dspaceStatusMapper = mock(DepositStatusMapper.class); atomStatusParser = mock(DepositStatusParser.class); cri = mock(CriticalRepositoryInteraction.class); terminalDepositStatusPolicy = mock(Policy.class); - depositTaskHelper = new DepositTaskHelper(passClient, packagerRegistry, taskExecutor, dspaceStatusMapper, - atomStatusParser, dirtyDepositPolicy, terminalDepositStatusPolicy, cri); + depositTaskHelper = new DepositTaskHelper(passClient, taskExecutor, intermediateDepositStatusPolicy, terminalDepositStatusPolicy, cri, packagerRegistry); } } diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtilTest.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtilTest.java index e6c0b75b..8e1afcc2 100644 --- a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtilTest.java +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/DepositUtilTest.java @@ -108,6 +108,7 @@ public void parseAckMode() throws Exception { } @Test + @SuppressWarnings("rawtypes") public void toMessageContext() throws Exception { String rType = "resource_type"; String eType = "event_type"; diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/JmsSubmissionProcessorTest.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/JmsSubmissionProcessorTest.java index 4750162c..20abc6b4 100644 --- a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/JmsSubmissionProcessorTest.java +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/JmsSubmissionProcessorTest.java @@ -75,7 +75,7 @@ public void setUp() throws Exception { critical = mock(CriticalRepositoryInteraction.class); underTest = new JmsSubmissionProcessor(passClient, jsonParser, submissionBuilder, packagerRegistry, - submissionPolicy, dirtyDepositPolicy, terminalDepositStatusPolicy, messagePolicy, depositTaskHelper, dspaceStatusMapper, atomStatusParser, critical); + submissionPolicy, intermediateDepositStatusPolicy, terminalDepositStatusPolicy, messagePolicy, depositTaskHelper, dspaceStatusMapper, atomStatusParser, critical); } /** diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmissionProcessorTest.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmissionProcessorTest.java index ba4b9f84..c797710c 100644 --- a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmissionProcessorTest.java +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmissionProcessorTest.java @@ -65,7 +65,7 @@ public class SubmissionProcessorTest extends AbstractSubmissionProcessorTest { public void setUp() throws Exception { super.setUp(); underTest = new SubmissionProcessor(passClient, jsonParser, submissionBuilder, packagerRegistry, - submissionPolicy, dirtyDepositPolicy, messagePolicy, terminalDepositStatusPolicy, depositTaskHelper, dspaceStatusMapper, atomStatusParser, cri); + submissionPolicy, intermediateDepositStatusPolicy, messagePolicy, terminalDepositStatusPolicy, depositTaskHelper, dspaceStatusMapper, atomStatusParser, cri); } /** @@ -111,7 +111,7 @@ public void submissionAcceptSuccess() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(true); when(criResult.resource()).thenReturn(Optional.of(submission)); when(criResult.result()).thenReturn(Optional.of(depositSubmission)); @@ -179,7 +179,7 @@ public void missingCriResource() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(true); when(criResult.result()).thenReturn(Optional.of(new DepositSubmission())); when(cri.performCritical(any(), any(), any(), any(BiPredicate.class), any())).thenReturn(criResult); @@ -210,7 +210,7 @@ public void missingCriResult() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.resource()).thenReturn(Optional.of(submission)); when(criResult.success()).thenReturn(true); when(cri.performCritical(any(), any(), any(), any(BiPredicate.class), any())).thenReturn(criResult); @@ -248,7 +248,7 @@ public void submissionCriFailure() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. // In this test the CRI fails, for whatever reason. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(false); Exception expectedCause = new Exception("Failed CRI"); when(criResult.throwable()).thenReturn(Optional.of(expectedCause)); @@ -286,7 +286,7 @@ public void depositCreationFailure() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(true); when(criResult.resource()).thenReturn(Optional.of(submission)); when(criResult.result()).thenReturn(Optional.of(depositSubmission)); @@ -328,7 +328,7 @@ public void taskRejection() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(true); when(criResult.resource()).thenReturn(Optional.of(submission)); when(criResult.result()).thenReturn(Optional.of(depositSubmission)); @@ -385,7 +385,7 @@ public void missingPackagerFromRegistry() throws Exception { // Mock the CRI that returns the "In-Progress" Submission and builds the DepositSubmission. - CriticalResult criResult = mock(CriticalResult.class); + CriticalResult criResult = mock(CriticalResult.class); when(criResult.success()).thenReturn(true); when(criResult.resource()).thenReturn(Optional.of(submission)); when(criResult.result()).thenReturn(Optional.of(depositSubmission)); diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmittedStatusHandlingIT.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmittedStatusHandlingIT.java new file mode 100644 index 00000000..d85da5c9 --- /dev/null +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/service/SubmittedStatusHandlingIT.java @@ -0,0 +1,160 @@ +/* + * Copyright 2018 Johns Hopkins University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dataconservancy.pass.deposit.messaging.service; + +import org.dataconservancy.nihms.assembler.PackageStream; +import org.dataconservancy.nihms.model.DepositFileType; +import org.dataconservancy.nihms.transport.Transport; +import org.dataconservancy.nihms.transport.TransportResponse; +import org.dataconservancy.nihms.transport.TransportSession; +import org.dataconservancy.pass.client.PassClient; +import org.dataconservancy.pass.deposit.assembler.dspace.mets.DspaceMetsAssembler; +import org.dataconservancy.pass.deposit.assembler.shared.AbstractAssembler; +import org.dataconservancy.pass.deposit.assembler.shared.BaseAssemblerIT; +import org.dataconservancy.pass.deposit.messaging.model.Packager; +import org.dataconservancy.pass.deposit.messaging.model.Registry; +import org.dataconservancy.pass.deposit.transport.sword2.Sword2DepositReceiptResponse; +import org.dataconservancy.pass.model.Deposit; +import org.dataconservancy.pass.model.Repository; +import org.dataconservancy.pass.model.RepositoryCopy; +import org.dataconservancy.pass.model.Submission; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +import java.io.File; +import java.net.URI; +import java.net.URL; +import java.util.Collections; +import java.util.Map; + +import static org.dataconservancy.pass.model.Deposit.DepositStatus.ACCEPTED; +import static org.dataconservancy.pass.model.Deposit.DepositStatus.SUBMITTED; +import static org.dataconservancy.pass.model.RepositoryCopy.CopyStatus.COMPLETE; +import static org.dataconservancy.pass.model.RepositoryCopy.CopyStatus.IN_PROGRESS; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * @author Elliot Metsger (emetsger@jhu.edu) + */ +@RunWith(SpringRunner.class) +@SpringBootTest(properties = { "spring.jms.listener.auto-startup=false" }) +public class SubmittedStatusHandlingIT extends BaseAssemblerIT { + + @Autowired + private DspaceMetsAssembler dspaceMetsAssembler; + + @Autowired + private Registry packagerRegistry; + + @Autowired + private PassClient passClient; + + @Autowired + DepositTaskHelper underTest; + + private Deposit toUpdate; + + @Override + public void setUp() throws Exception { + + // Step 1: Create a PackageStream + + mbf = metadataBuilderFactory(); + rbf = resourceBuilderFactory(); + + Map custodialContentWithTypes = prepareCustodialResources(); + + submission = prepareSubmission(custodialContentWithTypes); + + PackageStream stream = dspaceMetsAssembler.assemble(submission); + + // Step 2: Deposit the Package to DSpace. + + Packager packager = packagerRegistry.get("JScholarship"); + Transport transport = packager.getTransport(); + TransportSession session = transport.open(packager.getConfiguration()); + Sword2DepositReceiptResponse tr = (Sword2DepositReceiptResponse) session.send(stream, packager + .getConfiguration()); + assertTrue(tr.success()); + + // 2a. Keep a reference to the SWORD Statement + + URI swordStatement = tr.getReceipt().getAtomStatementLink().getIRI().toURI(); + + // 2.b Keep a reference to the DSpace Item URL + + URI dspaceItem = tr.getReceipt().getSplashPageLink().getIRI().toURI(); + + // Step 3: Manufacture a Deposit and RepositoryCopy for this test (normally created by the SubmissionProcessor) + + Submission submissionResource = new Submission(); + + Deposit deposit = new Deposit(); + deposit.setDepositStatusRef(swordStatement.toString()); + deposit.setDepositStatus(SUBMITTED); + deposit.setSubmission(submissionResource.getId()); + + Repository repo = new Repository(); + repo.setName("JScholarship"); + + repo = passClient.createAndReadResource(repo, Repository.class); + + RepositoryCopy rc = new RepositoryCopy(); + rc.setCopyStatus(RepositoryCopy.CopyStatus.IN_PROGRESS); + rc.setAccessUrl(dspaceItem); + rc.setExternalIds(Collections.singletonList(dspaceItem.toString())); + + submissionResource = passClient.createAndReadResource(submissionResource, Submission.class); + + rc = passClient.createAndReadResource(rc, RepositoryCopy.class); + + deposit.setRepositoryCopy(rc.getId()); + deposit.setSubmission(submissionResource.getId()); + deposit.setRepository(repo.getId()); + + toUpdate = passClient.createAndReadResource(deposit, Deposit.class); + } + + @Override + protected AbstractAssembler assemblerUnderTest() { + return dspaceMetsAssembler; + } + + @Override + protected void verifyStreamMetadata(PackageStream.Metadata metadata) { + // no-op, we don't care. we are simply re-using the logic in BaseAssemblerIT to + // produce a package + } + + @Test + public void processStatusFromSubmittedToAccepted() throws Exception { + assertEquals(SUBMITTED, toUpdate.getDepositStatus()); + assertEquals(IN_PROGRESS, + passClient.readResource(toUpdate.getRepositoryCopy(), RepositoryCopy.class).getCopyStatus()); + + underTest.processDepositStatus(toUpdate); + + Deposit deposit = passClient.readResource(toUpdate.getId(), Deposit.class); + RepositoryCopy repoCopy = passClient.readResource(deposit.getRepositoryCopy(), RepositoryCopy.class); + + assertEquals(ACCEPTED, deposit.getDepositStatus()); + assertEquals(COMPLETE, repoCopy.getCopyStatus()); + } +} diff --git a/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessorTest.java b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessorTest.java new file mode 100644 index 00000000..ff676442 --- /dev/null +++ b/deposit-messaging/src/test/java/org/dataconservancy/pass/deposit/messaging/status/AbderaDepositStatusRefProcessorTest.java @@ -0,0 +1,93 @@ +/* + * Copyright 2018 Johns Hopkins University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.dataconservancy.pass.deposit.messaging.status; + +import org.junit.Before; +import org.junit.Test; + +import java.net.URI; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +/** + * @author Elliot Metsger (emetsger@jhu.edu) + */ +public class AbderaDepositStatusRefProcessorTest { + + private AbderaDepositStatusRefProcessor underTest; + + private URI atomStatementUri = URI.create("http://example.org/statement.atom"); + + private DepositStatusParser atomStatusParser; + + private DepositStatusMapper swordDepositStatusMapper; + + @Before + @SuppressWarnings("unchecked") + public void setUp() throws Exception { + atomStatusParser = mock(DepositStatusParser.class); + swordDepositStatusMapper = mock(DepositStatusMapper.class); + underTest = new AbderaDepositStatusRefProcessor(atomStatusParser, swordDepositStatusMapper); + } + + @Test + public void parsingReturnsNullSwordStatus() throws Exception { + when(atomStatusParser.parse(any())).thenReturn(null); + + assertNull(underTest.process(atomStatementUri)); + + verify(atomStatusParser).parse(any()); + verifyZeroInteractions(swordDepositStatusMapper); + } + + @Test + public void mappingReturnsNullDepositStatus() throws Exception { + when(atomStatusParser.parse(any())).thenReturn(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS); + when(swordDepositStatusMapper.map(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS)).thenReturn(null); + + assertNull(underTest.process(atomStatementUri)); + + verify(atomStatusParser).parse(any()); + verify(swordDepositStatusMapper).map(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS); + } + + @Test(expected = RuntimeException.class) + public void parsingThrowsRuntimeException() throws Exception { + when(atomStatusParser.parse(any())).thenThrow(new RuntimeException("Expected")); + + underTest.process(atomStatementUri); + + verify(atomStatusParser).parse(any()); + verifyZeroInteractions(swordDepositStatusMapper); + } + + @Test(expected = RuntimeException.class) + public void mappingThrowsRuntimeException() throws Exception { + when(atomStatusParser.parse(any())).thenReturn(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS); + when(swordDepositStatusMapper.map(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS)) + .thenThrow(new RuntimeException("Expected")); + + assertNull(underTest.process(atomStatementUri)); + + verify(atomStatusParser).parse(any()); + verify(swordDepositStatusMapper).map(SwordDspaceDepositStatus.SWORD_STATE_INPROGRESS); + } +} \ No newline at end of file