This repository has been archived by the owner on Nov 30, 2023. It is now read-only.

Support the refresh of a Deposit DepositStatus (#124)
* Abstract deposit status reference to DepositStatusReferenceProcessor, and add it to the Packager.

* Reduce warnings during compilation

* Make the time sleeping between SWORD deposits configurable.

* Implement the refreshing of a Deposit status.

- Implements SubmittedUpdateRunner, invoked by the `refresh` command line argument
- Refactors DepositTask and the DepositTaskHelper to allow processing of the deposit status reference outside of SubmissionProcessor (e.g. by SubmittedUpdateRunner)
- Necessitates the creation of a RepositoryCopy in the IN_PROGRESS state as soon as a Deposit is performed, in order to store the DSpace Item URL.
- This is required because the DSpace SWORD receipt is not retrievable; the SWORD server does not make the receipt available except when performing the initial deposit.
emetsger authored Jun 22, 2018
1 parent 390aec0 commit 4183355
Showing 21 changed files with 835 additions and 181 deletions.
35 changes: 30 additions & 5 deletions README.md
@@ -163,6 +163,18 @@ To retry all failed deposits:
To retry specific deposits:
> $ java -jar deposit-services.jar retry --uris http://192.168.99.100:8080/fcrepo/rest/deposits/8e/af/ac/a9/8eafaca9-1f24-413a-bf1e-fbbd673ba45b http://192.168.99.100:8080/fcrepo/rest/deposits/4a/cb/04/bb/4acb04bb-4f79-40ef-8ff9-e105261aa7fb
#### Refresh

Refresh mode is used to re-process a Deposit in the `SUBMITTED` state that needs its deposit status refreshed. When `refresh` is invoked, the optional `--uris` argument is used to identify the `Deposit` resources to refresh. Otherwise a search of the index is performed for _all_ `Deposit` resources in the `SUBMITTED` state.

Refreshing a `Deposit` means that its deposit status reference will be retrieved, parsed, and processed. The status returned from the reference will be stored on the `Deposit`, and the status of the corresponding `RepositoryCopy` will be updated as well. If the `Deposit` status is updated to `ACCEPTED`, the `RepositoryCopy` will be updated to `COMPLETE`. If the `Deposit` status is updated to `REJECTED`, the `RepositoryCopy` will be updated to `REJECTED` as well.
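The status propagation described above can be sketched as a small mapping. This is only an illustration: the enums below are hypothetical stand-ins for the PASS model classes, and the `SUBMITTED` and `FAILED` cases are taken from the "Common Permutations" list in this README rather than from the actual implementation.

```java
import java.util.Optional;

// Illustrative sketch only; not the Deposit Services implementation.
final class RefreshStatusMapping {

    // Hypothetical stand-ins for Deposit.DepositStatus and the RepositoryCopy CopyStatus.
    enum DepositStatus { SUBMITTED, ACCEPTED, REJECTED, FAILED }
    enum CopyStatus { IN_PROGRESS, COMPLETE, REJECTED }

    private RefreshStatusMapping() { }

    /**
     * Returns the RepositoryCopy status that is congruent with the given Deposit status,
     * or an empty Optional when no RepositoryCopy is expected (assumed for FAILED).
     */
    static Optional<CopyStatus> copyStatusFor(DepositStatus status) {
        switch (status) {
            case ACCEPTED:
                return Optional.of(CopyStatus.COMPLETE);
            case REJECTED:
                return Optional.of(CopyStatus.REJECTED);
            case SUBMITTED:
                return Optional.of(CopyStatus.IN_PROGRESS);
            default:
                return Optional.empty();
        }
    }
}
```

For example, `RefreshStatusMapping.copyStatusFor(DepositStatus.ACCEPTED)` yields an `Optional` holding `COMPLETE`.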

To refresh all deposits in the `SUBMITTED` state:
> $ java -jar deposit-services.jar refresh
To refresh specific deposits:
> $ java -jar deposit-services.jar refresh --uris http://192.168.99.100:8080/fcrepo/rest/deposits/8e/af/ac/a9/8eafaca9-1f24-413a-bf1e-fbbd673ba45b http://192.168.99.100:8080/fcrepo/rest/deposits/4a/cb/04/bb/4acb04bb-4f79-40ef-8ff9-e105261aa7fb
### Future modes

Modes to be supported by future releases of Deposit Services.
@@ -171,9 +183,6 @@ Modes to be supported by future releases of Deposit Services.

TODO

#### Process

TODO

# Developers

@@ -193,6 +202,10 @@ The `PASS_FEDORA_USERNAME` and `PASS_FEDORA_PASSWORD` define the username and pa

The `retry` argument invokes the `FailedDepositRunner` which will re-submit failed `Deposit` resources to the task queue for processing. URIs for specific Deposits may be specified, otherwise the index is searched for failed Deposits, and each one will be re-tried.

### SubmittedUpdateRunner

The `refresh` argument invokes the `SubmittedUpdateRunner` which will attempt to re-process a `Deposit`'s status reference. URIs for specific Deposits may be specified, otherwise the index is searched for `SUBMITTED` Deposits, and each one will be refreshed.
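The selection rule described here (explicit URIs win, otherwise fall back to an index search) might look roughly like the following. This is a sketch without Spring: `RefreshTargets`, `select`, and the `indexSearch` supplier are hypothetical names, with the supplier standing in for a search for `SUBMITTED` Deposits.

```java
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// Illustrative sketch only; the names here are assumptions, not the project's API.
final class RefreshTargets {

    private RefreshTargets() { }

    /**
     * Chooses the Deposit URIs to refresh.
     *
     * @param cliUris     values supplied on the command line (may be null or empty)
     * @param indexSearch hypothetical fallback that searches for SUBMITTED Deposits
     */
    static Collection<URI> select(List<String> cliUris, Supplier<Collection<URI>> indexSearch) {
        if (cliUris != null && !cliUris.isEmpty()) {
            // Preserve the order in which the URIs were supplied.
            return cliUris.stream().map(URI::create).collect(Collectors.toList());
        }
        Collection<URI> found = indexSearch.get();
        if (found.isEmpty()) {
            throw new IllegalArgumentException("No URIs found to process.");
        }
        return found;
    }
}
```

Failing fast when nothing is found keeps a `refresh` run with no work to do from exiting silently.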

## Message flow and concurrency

Each JMS listener (one each for the `deposit` and `submission` queues) can process messages concurrently. The number of messages each listener can process concurrently is set by the property `spring.jms.listener.concurrency` (or its environment equivalent: `SPRING_JMS_LISTENER_CONCURRENCY`).
@@ -309,30 +322,42 @@ The semantics of _intermediate_ state are that the resource is currently in a wo

A general pattern within Deposit services is that resources with _terminal_ status are explicitly accounted for (this is largely enforced by _policies_ which are documented elsewhere), and are considered "read-only".

#### Submission Status

Submission status is enumerated in the `AggregatedDepositStatus` class. Deposit services considers the following values:
* `NOT_STARTED` (_intermediate_): Incoming Submissions from the UI must have this status value
* `IN_PROGRESS` (_intermediate_): Deposit services places the Submission in an `IN_PROGRESS` state right away. When a thread observes a `Submission` in this state, it assumes that _another_ thread is processing this resource.
* `FAILED` (_intermediate_): Occurs when a non-recoverable error happens when processing the `Submission`
* `ACCEPTED` (_terminal_): Deposit services places the Submission into this state when all of its `Deposit`s have been `ACCEPTED`
* `REJECTED` (_terminal_): Deposit services places the Submission into this state when all of its `Deposit`s have been `REJECTED`

#### Deposit Status

Deposit status is enumerated in the `DepositStatus` class. Deposit services considers the following values:
* `SUBMITTED` (_intermediate_): the custodial content of the `Submission` has been successfully transferred to the `Deposit`'s `Repository`
* `ACCEPTED` (_terminal_): the custodial content of the `Submission` has been accessioned by the `Deposit`'s `Repository` (i.e. custody of the `Submission` has successfully been transferred to the downstream `Repository`)
* `REJECTED` (_terminal_): the custodial content of the `Submission` has been rejected by the `Deposit`'s `Repository` (i.e. the downstream `Repository` has refused to accept custody of the `Submission` content)
* `FAILED` (_intermediate_): the transfer of custodial content to the `Repository` failed, or there was some other error updating the status of the `Deposit`
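One way to picture the _intermediate_ versus _terminal_ distinction is an enum that carries a terminal flag and a guard that refuses to change a terminal status. This is a sketch under assumptions: the real enforcement is done by policies documented elsewhere, and `StatusSemantics` and `transition` are hypothetical names.

```java
// Illustrative sketch only; the actual read-only behaviour is enforced by policies.
final class StatusSemantics {

    // Hypothetical stand-in for Deposit.DepositStatus, annotated with its semantics.
    enum DepositStatus {
        SUBMITTED(false), ACCEPTED(true), REJECTED(true), FAILED(false);

        private final boolean terminal;

        DepositStatus(boolean terminal) {
            this.terminal = terminal;
        }

        boolean isTerminal() {
            return terminal;
        }
    }

    private StatusSemantics() { }

    /** Returns the next status, refusing to move a resource out of a terminal status. */
    static DepositStatus transition(DepositStatus current, DepositStatus next) {
        if (current.isTerminal()) {
            throw new IllegalStateException(
                    "Deposit is " + current + " (terminal) and is treated as read-only");
        }
        return next;
    }
}
```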

#### RepositoryCopy Status

RepositoryCopy status is enumerated in the `CopyStatus` class. Deposit services considers the following values:
* `COMPLETE` (_terminal_): a copy of the custodial content is available in the `Repository` at this location
* `IN_PROGRESS` (_intermediate_): a copy of the custodial content is _expected to be_ available in the `Repository` at this location. The custodial content should not be expected to exist until the `Deposit` status is `ACCEPTED`
* `REJECTED` (_terminal_): the copy should be considered to be invalid. Even if the custodial content is made available at the location indicated by the `RepositoryCopy`, it should not be mistaken for a successful transfer of custody.

RepositoryCopy status is subservient to the Deposit status. They will always be congruent. For example, a RepositoryCopy cannot be `COMPLETE` if the Deposit is `REJECTED`. If a Deposit is `REJECTED`, then the RepositoryCopy must also be `REJECTED`.

#### Common Permutations

There are some common permutations of these statuses that will be observed:
* `ACCEPTED` `Submission`s will only have `Deposit`s that are `ACCEPTED`. Each `Deposit` will have a `COMPLETE` `RepositoryCopy`.
* `REJECTED` `Submission`s will only have `Deposit`s that are `REJECTED`. `REJECTED` `Deposit`s will not have any `RepositoryCopy` at all.
* `IN_PROGRESS` `Submission`s may have zero or more `Deposit`s in any state.
* `FAILED` `Submission`s should have zero `Deposit`s.
* `ACCEPTED` `Deposit`s should have a `COMPLETE` `RepositoryCopy`.
- * `REJECTED` `Deposit`s will have no `RepositoryCopy`
- * `SUBMITTED` `Deposit`s will have no `RepositoryCopy`
+ * `REJECTED` `Deposit`s will have a `REJECTED` `RepositoryCopy`
+ * `SUBMITTED` `Deposit`s will have an `IN_PROGRESS` `RepositoryCopy`
* `FAILED` `Deposit`s will have no `RepositoryCopy`


26 changes: 26 additions & 0 deletions deposit-messaging/pom.xml
@@ -341,6 +341,23 @@
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-remote-resources-plugin</artifactId>
<configuration>
<resourceBundles>
<resourceBundle>org.dataconservancy.nihms:shared-assembler:${project.parent.version}:test-jar:tests</resourceBundle>
</resourceBundles>
</configuration>
<executions>
<execution>
<goals>
<goal>process</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>

</build>
@@ -523,6 +540,15 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.dataconservancy.nihms</groupId>
<artifactId>shared-assembler</artifactId>
<version>${project.parent.version}</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

</dependencies>

</project>
@@ -94,7 +94,7 @@ public static void main(String[] args) {
app = new SpringApplication(DepositApp.class, ListenerRunner.class);
break;
}
- case "update": {
+ case "refresh": {
app = new SpringApplication(DepositApp.class, SubmittedUpdateRunner.class);
// TODO figure out elegant way to exclude JMS-related beans like SubmissionProcessor from being spun up
app.setDefaultProperties(new HashMap<String, Object>() { {
@@ -36,6 +36,7 @@
import org.dataconservancy.pass.deposit.messaging.model.Registry;
import org.dataconservancy.pass.deposit.messaging.policy.DirtyDepositPolicy;
import org.dataconservancy.pass.deposit.messaging.service.DepositTask;
import org.dataconservancy.pass.deposit.messaging.status.AbderaDepositStatusRefProcessor;
import org.dataconservancy.pass.deposit.messaging.status.AtomFeedStatusMapper;
import org.dataconservancy.pass.deposit.messaging.status.RepositoryCopyStatusMapper;
import org.dataconservancy.pass.deposit.messaging.status.SwordDspaceDepositStatusMapper;
@@ -212,11 +213,13 @@ public Registry<Packager> packagerRegistry(Map<String, Packager> packagers) {
@Bean
public Map<String, Packager> packagers(DspaceMetsAssembler dspaceAssembler, Sword2Transport swordTransport,
NihmsAssembler nihmsAssembler, FtpTransport ftpTransport,
- Map<String, Map<String, String>> transportRegistries) {
+ Map<String, Map<String, String>> transportRegistries,
+ AbderaDepositStatusRefProcessor abderaDepositStatusRefProcessor) {
Map<String, Packager> packagers = new HashMap<>();
// TODO: transport registries looked up by hard-coded strings. Need a more reliable way of discovering repositories, the packagers for those repositories, and their configuration
packagers.put("JScholarship",
- new Packager("JScholarship", dspaceAssembler, swordTransport, transportRegistries.get("js")));
+ new Packager("JScholarship", dspaceAssembler, swordTransport, transportRegistries.get("js"),
+ abderaDepositStatusRefProcessor));
packagers.put("PubMed Central",
new Packager("PubMed Central", nihmsAssembler, ftpTransport, transportRegistries.get("nihms")));
return packagers;
@@ -17,6 +17,7 @@

import org.dataconservancy.nihms.assembler.Assembler;
import org.dataconservancy.nihms.transport.Transport;
import org.dataconservancy.pass.deposit.messaging.service.DepositStatusRefProcessor;
import org.dataconservancy.pass.deposit.messaging.service.DepositTask;
import org.dataconservancy.pass.model.Repository;
import org.dataconservancy.pass.model.Submission;
@@ -50,12 +51,20 @@ public class Packager {

private Transport transport;

private DepositStatusRefProcessor depositStatusProcessor;

private Map<String, String> configuration;

public Packager(String name, Assembler assembler, Transport transport, Map<String, String> configuration) {
this(name, assembler, transport, configuration, null);
}

public Packager(String name, Assembler assembler, Transport transport, Map<String, String> configuration,
DepositStatusRefProcessor depositStatusProcessor) {
this.name = name;
this.assembler = assembler;
this.transport = transport;
this.depositStatusProcessor = depositStatusProcessor;
this.configuration = configuration;
}

@@ -81,4 +90,13 @@ public Map<String, String> getConfiguration() {
.collect(toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
* The {@link DepositStatusRefProcessor}, may be {@code null}.
*
* @return the {@link DepositStatusRefProcessor}, may be {@code null}.
*/
public DepositStatusRefProcessor getDepositStatusProcessor() {
return depositStatusProcessor;
}

}
@@ -16,61 +16,98 @@
package org.dataconservancy.pass.deposit.messaging.runner;

import org.dataconservancy.pass.client.PassClient;
import org.dataconservancy.pass.deposit.messaging.service.PassEntityProcessor;
import org.dataconservancy.pass.deposit.messaging.DepositServiceErrorHandler;
import org.dataconservancy.pass.deposit.messaging.service.DepositTaskHelper;
import org.dataconservancy.pass.model.Deposit;
import org.dataconservancy.pass.model.Repository;
import org.dataconservancy.pass.model.Submission;
import org.springframework.beans.factory.annotation.Qualifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.URI;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.dataconservancy.pass.deposit.messaging.support.Constants.Indexer.DEPOSIT_STATUS;
import static org.dataconservancy.pass.model.Deposit.DepositStatus.SUBMITTED;

/**
- * Accepts uris for, or searches for, <a href="https://github.com/OA-PASS/pass-data-model/blob/master/documentation/Deposit.md">
- * Deposit</a> repository resources that have a deposit status of {@code submitted}.
- * <p>
- * Submitted deposits have had the contents of their {@link Submission} successfully transferred to a {@link
- * Repository}, but their <em>terminal</em> status is not known. That is, Deposit Services does not know if the {@code
- * Deposit} has been accepted or rejected.
- * </p>
- * <p>
- * Submitted deposits are examined for a deposit status reference and repository copies.
- * </p>
+ * Accepts uris for, or searches for,
+ * <a href="https://github.com/OA-PASS/pass-data-model/blob/master/documentation/Deposit.md">
+ * Deposit</a> repository resources that have a deposit status of {@code submitted}. <p> Submitted deposits have had the
+ * contents of their {@link Submission} successfully transferred to a {@link Repository}, but their <em>terminal</em>
+ * status is not known. That is, Deposit Services does not know if the {@code Deposit} has been accepted or rejected.
+ * </p> <p> Submitted deposits are examined for a deposit status reference and repository copies. </p>
*
* @author Elliot Metsger (emetsger@jhu.edu)
*/
public class SubmittedUpdateRunner {

private static final Logger LOG = LoggerFactory.getLogger(SubmittedUpdateRunner.class);

@Autowired
private DepositTaskHelper depositTaskHelper;

@Autowired
private ThreadPoolTaskExecutor taskExecutor;

@Autowired
private DepositServiceErrorHandler errorHandler;

/**
* Answers a Spring {@link ApplicationRunner} that will process a {@code Collection} of URIs representing submitted
* deposits. If no URIs are supplied on the command line, a search is performed for all submitted deposits. The
* submitted deposits are then queued for processing by the provided {@code processor}.
*
* @param passClient the client implementation used to resolve PASS entity uris and perform searches
* @param processor processes {@code Deposit} resources that have a state of submitted
- * @return the Spring {@code ApplicationRunner} which receives the command line arguments supplied to this application
+ * @return the Spring {@code ApplicationRunner} which receives the command line arguments supplied to this
+ * application
*/
@Bean
- public ApplicationRunner depositUpdate(PassClient passClient, PassEntityProcessor processor, @Qualifier("submittedDepositStatusUpdater")
- Consumer<Deposit> updater) {
+ public ApplicationRunner depositUpdate(PassClient passClient) {
  return (args) -> {
- Collection<URI> toUpdate = toUpdate(args, passClient);
- processor.update(toUpdate, updater, Deposit.class);
+ Collection<URI> deposits = depositsToUpdate(args, passClient);
deposits.forEach(depositUri -> {
try {
depositTaskHelper.processDepositStatus(passClient.readResource(depositUri, Deposit.class));
} catch (Exception e) {
errorHandler.handleError(e);
}
});

taskExecutor.shutdown();
taskExecutor.setAwaitTerminationSeconds(10);
};
}

- private Collection<URI> toUpdate(ApplicationArguments args, PassClient passClient) {
- if (args.getNonOptionArgs() != null && args.getNonOptionArgs().size() > 1) {
- return args.getNonOptionArgs().stream().skip(1).map(URI::create).collect(Collectors.toSet());
/**
* Parses command line arguments for the URIs to update, or searches the index for URIs of submitted deposits.
* <dl>
* <dt>--uris</dt><dd>space-separated list of Deposit URIs to be processed. If the URI does not specify a Deposit,
* it is skipped (implies {@code --sync}, but can be overridden by supplying {@code --async})</dd> <dt>--sync</dt>
* <dd>the console remains attached as each URI is processed, allowing the end-user to examine the results of
* updated Deposits as they happen</dd> <dt>--async</dt> <dd>the console detaches immediately, with the Deposit URIs
* processed in the background</dd> </dl>
*
* @param args the command line arguments
* @param passClient used to search the index for submitted deposits
* @return a {@code Collection} of URIs representing submitted deposits
*/
private Collection<URI> depositsToUpdate(ApplicationArguments args, PassClient passClient) {
if (args.containsOption("uris") && args.getOptionValues("uris").size() > 0) {
// maintain the order of the uris as they were supplied on the CLI
return args.getOptionValues("uris").stream().map(URI::create).collect(Collectors.toList());
} else {
- return passClient.findAllByAttribute(Deposit.class, DEPOSIT_STATUS, "submitted");
+ Collection<URI> uris = passClient.findAllByAttribute(Deposit.class, DEPOSIT_STATUS, SUBMITTED);
if (uris.size() < 1) {
throw new IllegalArgumentException("No URIs found to process.");
}
return uris;
}
}

@@ -0,0 +1,29 @@
/*
* Copyright 2018 Johns Hopkins University
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.dataconservancy.pass.deposit.messaging.service;

import org.dataconservancy.pass.model.Deposit;

import java.net.URI;

/**
* @author Elliot Metsger (emetsger@jhu.edu)
*/
public interface DepositStatusRefProcessor {

Deposit.DepositStatus process(URI depositStatusRef);

}
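As an illustration of how this single-method interface could be satisfied, for instance in a unit test, here is a map-backed stub. It is a sketch only: the nested interface and enum are local stand-ins that mirror the shapes shown above, `MapBackedProcessor` is a hypothetical name, and returning `null` for an unknown reference is an assumption rather than documented behaviour.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; local stand-ins are used so the example is self-contained.
final class StubStatusProcessorSketch {

    // Stand-in for Deposit.DepositStatus.
    enum DepositStatus { SUBMITTED, ACCEPTED, REJECTED, FAILED }

    // Stand-in mirroring the interface introduced in this commit.
    interface DepositStatusRefProcessor {
        DepositStatus process(URI depositStatusRef);
    }

    /** Hypothetical stub that answers statuses from a pre-populated map. */
    static final class MapBackedProcessor implements DepositStatusRefProcessor {

        private final Map<URI, DepositStatus> statuses = new HashMap<>();

        MapBackedProcessor with(URI depositStatusRef, DepositStatus status) {
            statuses.put(depositStatusRef, status);
            return this;
        }

        @Override
        public DepositStatus process(URI depositStatusRef) {
            // Assumption: an unknown reference yields null instead of throwing.
            return statuses.get(depositStatusRef);
        }
    }

    private StubStatusProcessorSketch() { }
}
```

A stub like this lets the code that consumes a deposit status be exercised without contacting a SWORD endpoint.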