Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #26 from spotify/queue-triggers
Browse files Browse the repository at this point in the history
Triggers transition into QUEUED state
  • Loading branch information
fabriziodemaria authored Dec 13, 2016
2 parents 777165c + 75c09c3 commit 795a1ae
Show file tree
Hide file tree
Showing 24 changed files with 249 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ public Boolean created(WorkflowInstance workflowInstance, String executionId, St
return Boolean.FALSE;
}

@Override
public Boolean dequeue(WorkflowInstance workflowInstance) {
return Boolean.FALSE;
}

@Override
public Boolean submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription) {
return Boolean.FALSE;
Expand Down
10 changes: 10 additions & 0 deletions styx-cli/src/main/java/com/spotify/styx/cli/CliUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public String created(WorkflowInstance workflowInstance, String executionId, Str
return "Unexpected data";
}

@Override
public String dequeue(WorkflowInstance workflowInstance) {
return "Unexpected data";
}

@Override
public String started(WorkflowInstance workflowInstance) {
return "Unexpected data";
Expand Down Expand Up @@ -194,6 +199,11 @@ public String created(WorkflowInstance workflowInstance, String executionId, Str
return String.format("Execution id: %s, Docker image: %s", executionId, dockerImage);
}

@Override
public String dequeue(WorkflowInstance workflowInstance) {
return "";
}

@Override
public String started(WorkflowInstance workflowInstance) {
return "";
Expand Down
4 changes: 2 additions & 2 deletions styx-cli/src/main/java/com/spotify/styx/cli/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,9 @@ private void haltWorkflowInstance() {
private void retryWorkflowInstance() {
WorkflowInstance workflowInstance = getWorkflowInstance(namespace);

Event retry = Event.retry(workflowInstance);
Event dequeue = Event.dequeue(workflowInstance);
EventSerializer.PersistentEvent persistentEvent =
EventSerializer.convertEventToPersistentEvent(retry);
EventSerializer.convertEventToPersistentEvent(dequeue);
final ByteString payload;
try {
payload = ByteString.of(OBJECT_MAPPER.writeValueAsBytes(persistentEvent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,11 @@ public Ansi.Color created(WorkflowInstance workflowInstance, String executionId,
return WHITE;
}

@Override
public Ansi.Color dequeue(WorkflowInstance workflowInstance) {
return WHITE;
}

@Override
public Ansi.Color started(WorkflowInstance workflowInstance) {
return WHITE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public PersistentEvent created(WorkflowInstance workflowInstance, String executi
return new Created(workflowInstance.toKey(), executionId, Optional.of(dockerImage));
}

@Override
public PersistentEvent dequeue(WorkflowInstance workflowInstance) {
return new PersistentEvent("dequeue", workflowInstance.toKey());
}

@Override
public PersistentEvent submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription) {
return new Submit(workflowInstance.toKey(), executionDescription);
Expand Down Expand Up @@ -148,6 +153,7 @@ public PersistentEvent halt(WorkflowInstance workflowInstance) {
@JsonSubTypes.Type(value = PersistentEvent.class, name = "timeTrigger"),
@JsonSubTypes.Type(value = TriggerExecution.class, name = "triggerExecution"),
@JsonSubTypes.Type(value = Created.class, name = "created"),
@JsonSubTypes.Type(value = PersistentEvent.class, name = "dequeue"),
@JsonSubTypes.Type(value = Started.class, name = "started"),
@JsonSubTypes.Type(value = Terminate.class, name = "terminate"),
@JsonSubTypes.Type(value = RunError.class, name = "runError"),
Expand Down Expand Up @@ -181,6 +187,8 @@ public Event toEvent() {
switch (type) {
case "timeTrigger":
return Event.timeTrigger(workflowInstance);
case "dequeue":
return Event.dequeue(workflowInstance);
case "success":
return Event.success(workflowInstance);
case "retry":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
public interface EventVisitor<R> {

R triggerExecution(@Getter WorkflowInstance workflowInstance, String triggerId);
R dequeue(@Getter WorkflowInstance workflowInstance);
R submit(@Getter WorkflowInstance workflowInstance, ExecutionDescription executionDescription);
R submitted(@Getter WorkflowInstance workflowInstance, String executionId);
R started(@Getter WorkflowInstance workflowInstance);
R terminate(@Getter WorkflowInstance workflowInstance, int exitCode);
R runError(@Getter WorkflowInstance workflowInstance, String message);
R success(@Getter WorkflowInstance workflowInstance);
R retryAfter(@Getter WorkflowInstance workflowInstance, long delayMillis);
R retry(@Getter WorkflowInstance workflowInstance);
R stop(@Getter WorkflowInstance workflowInstance);
R timeout(@Getter WorkflowInstance workflowInstance);
R halt(@Getter WorkflowInstance workflowInstance);
Expand All @@ -48,4 +48,6 @@ public interface EventVisitor<R> {
R timeTrigger(@Getter WorkflowInstance workflowInstance);
@Deprecated
R created(@Getter WorkflowInstance workflowInstance, String executionId, String dockerImage);
@Deprecated
R retry(@Getter WorkflowInstance workflowInstance);
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ public Void triggerExecution(WorkflowInstance workflowInstance, String triggerId
return null;
}

@Override
public Void dequeue(WorkflowInstance workflowInstance) {
currWorkflowInstance = workflowInstance;
return null;
}

@Override
public Void created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
currWorkflowInstance = workflowInstance;
Expand Down
20 changes: 17 additions & 3 deletions styx-common/src/main/java/com/spotify/styx/state/RunState.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public RunState timeTrigger(WorkflowInstance workflowInstance) {
public RunState triggerExecution(WorkflowInstance workflowInstance, String triggerId) {
switch (state()) {
case NEW:
return state(PREPARE);
return state(QUEUED);

default:
throw illegalTransition("triggerExecution");
Expand All @@ -159,6 +159,7 @@ public RunState triggerExecution(WorkflowInstance workflowInstance, String trigg
public RunState created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
switch (state()) {
case PREPARE:
case QUEUED:
return state(
SUBMITTED, // for backwards compatibility
data().toBuilder()
Expand All @@ -171,9 +172,21 @@ public RunState created(WorkflowInstance workflowInstance, String executionId, S
}
}

@Override
public RunState dequeue(WorkflowInstance workflowInstance) {
switch (state()) {
case QUEUED:
return state(PREPARE);

default:
throw illegalTransition("dequeue");
}
}

@Override
public RunState submit(WorkflowInstance workflowInstance, ExecutionDescription executionDescription) {
switch (state()) {
case QUEUED: // for backwards compatibility
case PREPARE:
return state(
SUBMITTING,
Expand Down Expand Up @@ -280,11 +293,12 @@ public RunState retryAfter(WorkflowInstance workflowInstance, long delayMillis)
}
}

@Deprecated
@Override
public RunState retry(WorkflowInstance workflowInstance) {
switch (state()) {
case TERMINATED: // for backwards compatibility
case FAILED: // for backwards compatibility
case TERMINATED:
case FAILED:
case QUEUED:
return state(PREPARE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ public String triggerExecution(WorkflowInstance workflowInstance, String trigger
return "triggerExecution";
}

@Override
public String dequeue(WorkflowInstance workflowInstance) {
return "dequeue";
}

@Override
public String created(WorkflowInstance workflowInstance, String executionId, String dockerImage) {
return "created";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ public Event created(String executionId, String dockerImage) {
return Event.created(workflowInstance, executionId, dockerImage);
}

public Event dequeue() {
return Event.dequeue(workflowInstance);
}

public Event submit(ExecutionDescription executionDescription) {
return Event.submit(workflowInstance, executionDescription);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void testRoundtripAllEvents() {
assertRoundtrip(Event.timeTrigger(INSTANCE1));
assertRoundtrip(Event.triggerExecution(INSTANCE1, TRIGGER1));
assertRoundtrip(Event.created(INSTANCE1, POD_NAME, DOCKER_IMAGE));
assertRoundtrip(Event.dequeue(INSTANCE1));
assertRoundtrip(Event.started(INSTANCE1));
assertRoundtrip(Event.terminate(INSTANCE1, 20));
assertRoundtrip(Event.runError(INSTANCE1, "ErrorMessage"));
Expand All @@ -67,6 +68,7 @@ public void testRoundtripAllEvents() {
@Test
public void testDeserializeFromJson() throws Exception {
assertThat(eventSerializer.convert(json("timeTrigger")), is(Event.timeTrigger(INSTANCE1)));
assertThat(eventSerializer.convert(json("dequeue")), is(Event.dequeue(INSTANCE1)));
assertThat(eventSerializer.convert(json("started")), is(Event.started(INSTANCE1)));
assertThat(eventSerializer.convert(json("success")), is(Event.success(INSTANCE1)));
assertThat(eventSerializer.convert(json("retry")), is(Event.retry(INSTANCE1)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void testGeneralExample() throws Exception {
long c = 0L;
List<SequenceEvent> events = Arrays.asList(
SequenceEvent.create(E.triggerExecution("trig-0"), c++, ts("07:55")),
SequenceEvent.create(E.dequeue(), c++, ts("07:55")),
SequenceEvent.create(E.submit(desc("img1")), c++, ts("07:55")),
SequenceEvent.create(E.submitted("exec-id-00"), c++, ts("07:56")),
SequenceEvent.create(E.started(), c++, ts("07:57")),
Expand All @@ -92,6 +93,7 @@ public void testGeneralExample() throws Exception {
SequenceEvent.create(E.success(), c++, ts("08:59")),

SequenceEvent.create(E.triggerExecution("trig-1"), c++, ts("09:55")),
SequenceEvent.create(E.dequeue(), c++, ts("09:55")),
SequenceEvent.create(E.submit(desc("img3")), c++, ts("09:55")),
SequenceEvent.create(E.submitted("exec-id-10"), c++, ts("09:56")),
SequenceEvent.create(E.started(), c++, ts("09:57")),
Expand Down Expand Up @@ -171,6 +173,7 @@ public void testTimeout() throws Exception {
long c = 0L;
List<SequenceEvent> events = Arrays.asList(
SequenceEvent.create(E.triggerExecution("trig-0"), c++, ts("07:55")),
SequenceEvent.create(E.dequeue(), c++, ts("07:55")),
SequenceEvent.create(E.submit(desc("img1")), c++, ts("07:55")),
SequenceEvent.create(E.submitted("exec-id-00"), c++, ts("07:56")),
SequenceEvent.create(E.started(), c++, ts("07:57")),
Expand Down Expand Up @@ -225,6 +228,7 @@ public void testRunError() throws Exception {
long c = 0L;
List<SequenceEvent> events = Arrays.asList(
SequenceEvent.create(E.triggerExecution("trig-0"), c++, ts("07:55")),
SequenceEvent.create(E.dequeue(), c++, ts("07:55")),
SequenceEvent.create(E.submit(desc("img1")), c++, ts("07:55")),
SequenceEvent.create(E.submitted("exec-id-00"), c++, ts("07:56")),
SequenceEvent.create(E.started(), c++, ts("07:57")),
Expand Down Expand Up @@ -279,11 +283,13 @@ public void testHaltAndReTrigger() throws Exception {
long c = 0L;
List<SequenceEvent> events = Arrays.asList(
SequenceEvent.create(E.triggerExecution("trig-0"), c++, ts("07:55")),
SequenceEvent.create(E.dequeue(), c++, ts("07:55")),
SequenceEvent.create(E.submit(desc("img1")), c++, ts("07:55")),
SequenceEvent.create(E.submitted("exec-id-00"), c++, ts("07:56")),
SequenceEvent.create(E.halt(), c++, ts("07:57")),

SequenceEvent.create(E.triggerExecution("trig-1"), c++, ts("08:56")),
SequenceEvent.create(E.dequeue(), c++, ts("08:56")),
SequenceEvent.create(E.submit(desc("img2")), c++, ts("08:55")),
SequenceEvent.create(E.submitted("exec-id-10"), c++, ts("08:56")),
SequenceEvent.create(E.started(), c++, ts("08:57"))
Expand Down
Loading

0 comments on commit 795a1ae

Please sign in to comment.