Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tightly coupled, strongly typed, stairway flight #1634

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package bio.terra.workspace.common.flightGenerator;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD})
public @interface ExponentialBackoffRetry {
long initialIntervalSeconds();

long maxIntervalSeconds();

long maxOperationTimeSeconds();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bio.terra.workspace.common.flightGenerator;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD})
public @interface FixedIntervalRetry {
int intervalSeconds();

int maxCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package bio.terra.workspace.common.flightGenerator;

import bio.terra.stairway.Flight;
import bio.terra.stairway.FlightMap;
import bio.terra.stairway.RetryRule;
import java.lang.reflect.Proxy;
import java.util.function.Function;

public class FlightGenerator extends Flight {

/**
* The next step id to be assigned to a step. Each step is assigned a unique-within-the-flight id.
*/
private int nextStepId = 0;

/**
* All subclasses must provide a constructor with this signature.
*
* @param inputParameters FlightMap of the inputs for the flight
* @param applicationContext Anonymous context meaningful to the application using Stairway
*/
public FlightGenerator(FlightMap inputParameters, Object applicationContext) {
super(inputParameters, applicationContext);
}

/**
* Given an object that implements an interface, create a Step proxy for the object that will be
* added to the flight.
*/
protected <T> T createStep(T step) {
var stepInvocationHandler = new StepInvocationHandler(this, step, nextStepId++);
var stepProxy =
Proxy.newProxyInstance(
step.getClass().getClassLoader(),
step.getClass().getInterfaces(),
stepInvocationHandler);

return (T) stepProxy;
}

/**
* Set the response for the flight.
*
* @param response
* @param <T>
*/
protected <T> void setResponse(T response) {
if (Proxy.isProxyClass(response.getClass())
&& Proxy.getInvocationHandler(response) instanceof StepResultInvocationHandler) {
var responseStepIndex =
((StepResultInvocationHandler) Proxy.getInvocationHandler(response)).stepIndex();
addStep(new SetResponseStep(responseStepIndex));
} else {
throw new IllegalArgumentException("Response must be a step");
}
}

protected <T, R> void setResponse(T response, Class<T> clazz, Function<T, R> getter) {
if (Proxy.isProxyClass(response.getClass())
&& Proxy.getInvocationHandler(response) instanceof StepResultInvocationHandler) {
var responseStepIndex =
((StepResultInvocationHandler) Proxy.getInvocationHandler(response)).stepIndex();
addStep(new SetTypedResponseStep<>(responseStepIndex, getter, clazz));
} else {
throw new IllegalArgumentException("Response must be a step");
}
}

/**
* Add a step to the flight. This is a callback from the StepInvocationHandler when a step
* function is invoked.
*/
void addStep(StepInvocationHandler stepInvocationHandler, RetryRule retryRule) {
super.addStep(stepInvocationHandler, retryRule);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package bio.terra.workspace.common.flightGenerator;

import bio.terra.stairway.exception.StairwayException;

public class InvalidRetryAnnotationException extends StairwayException {
public InvalidRetryAnnotationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package bio.terra.workspace.common.flightGenerator;

import bio.terra.stairway.exception.StairwayException;

public class InvalidUndoAnnotationException extends StairwayException {

public InvalidUndoAnnotationException(String message, Throwable cause) {
super(message, cause);
}

public InvalidUndoAnnotationException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package bio.terra.workspace.common.flightGenerator;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface NoRetry {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package bio.terra.workspace.common.flightGenerator;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** This annotation is used to mark a method as not having an undo method. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface NoUndo {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package bio.terra.workspace.common.flightGenerator;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.ANNOTATION_TYPE, ElementType.METHOD})
public @interface RandomBackoffRetry {
long operationIncrementMilliseconds();

int maxConcurrency();

int maxCount();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package bio.terra.workspace.common.flightGenerator;

import bio.terra.stairway.FlightContext;
import bio.terra.stairway.Step;
import bio.terra.stairway.StepResult;
import bio.terra.stairway.exception.RetryException;
import bio.terra.workspace.service.job.JobMapKeys;

public class SetResponseStep implements Step {
private final int responseStepIndex;

public SetResponseStep(int responseStepIndex) {
this.responseStepIndex = responseStepIndex;
}

@Override
public StepResult doStep(FlightContext context) throws InterruptedException, RetryException {
var response =
context.getWorkingMap().getRaw(StepInvocationHandler.outputKey(responseStepIndex));
context.getWorkingMap().putRaw(JobMapKeys.RESPONSE.getKeyName(), response);
return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext context) throws InterruptedException {
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package bio.terra.workspace.common.flightGenerator;

import bio.terra.stairway.FlightContext;
import bio.terra.stairway.Step;
import bio.terra.stairway.StepResult;
import bio.terra.stairway.exception.RetryException;
import bio.terra.workspace.service.job.JobMapKeys;
import java.util.Optional;
import java.util.function.Function;

public class SetTypedResponseStep<T, R> implements Step {
private final int responseStepIndex;
private final Function<T, R> getter;
private final Class<T> clazz;

public SetTypedResponseStep(int responseStepIndex, Function<T, R> getter, Class<T> clazz) {
this.responseStepIndex = responseStepIndex;
this.getter = getter;
this.clazz = clazz;
}

@Override
public StepResult doStep(FlightContext context) throws InterruptedException, RetryException {
var input =
context.getWorkingMap().get(StepInvocationHandler.outputKey(responseStepIndex), clazz);
var response = Optional.ofNullable(input).map(getter);
response.ifPresent(r -> context.getWorkingMap().put(JobMapKeys.RESPONSE.getKeyName(), r));
return StepResult.getStepResultSuccess();
}

@Override
public StepResult undoStep(FlightContext context) throws InterruptedException {
return StepResult.getStepResultSuccess();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package bio.terra.workspace.common.flightGenerator;

@FixedIntervalRetry(intervalSeconds = 1, maxCount = 5)
public @interface ShortDatabaseRetry {}
Loading
Loading