Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Use ExecutionCreatedAt in filter (#1004)
Browse files Browse the repository at this point in the history
* Use ExecutionCreatedAt in filter
  • Loading branch information
honnix authored Aug 8, 2022
1 parent c907a15 commit 22ad5ab
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@ public static String getExecutionsListFilter(Instant timeNow, Duration since, Du
.toLocalDateTime()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));


String dateTo = timeNow.minus(to)
final String dateTo = timeNow.minus(to)
.atZone(ZoneId.of("UTC"))
.toLocalDateTime()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));

return String.format("value_in(phase,RUNNING)+gte(started_at,%s)+lte(started_at,%s)",dateSince,dateTo);
return String.format("value_in(phase,RUNNING)+gte(execution_created_at,%s)+lte(execution_created_at,%s)", dateSince,
dateTo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class RpcHelperTest {

@Test
public void testGtExecutionsListFilter() {
final Instant someTime = LocalDateTime.of(2022, 2, 2, 12, 12, 05)
final Instant someTime = LocalDateTime.of(2022, 2, 2, 12, 12, 5)
.atZone(ZoneOffset.UTC)
.toInstant();

Expand All @@ -46,7 +46,8 @@ public void testGtExecutionsListFilter() {
Duration.of(3, ChronoUnit.MINUTES));

assertThat(executionsListFilter,
equalTo("value_in(phase,RUNNING)+gte(started_at,2022-02-01T12:12:05)+lte(started_at,2022-02-02T12:09:05)"));
equalTo("value_in(phase,RUNNING)+gte(execution_created_at,2022-02-01T12:12:05)+lte(execution_created_at,"
+ "2022-02-02T12:09:05)"));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import com.spotify.styx.util.TriggerUtil;
import flyteidl.admin.ExecutionOuterClass;
import flyteidl.admin.ExecutionOuterClass.ExecutionMetadata.ExecutionMode;
import flyteidl.core.Execution;
import flyteidl.core.IdentifierOuterClass;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -313,11 +312,6 @@ private void tryTerminateDanglingFlyteExecutions() {
}

private boolean haveBeenRunningForAWhile(ExecutionOuterClass.Execution exec) {
var isRunning = exec.getClosure().getPhase() == Execution.WorkflowExecution.Phase.RUNNING;
if (!isRunning) {
return false;
}

var startedAt = exec.getClosure().getStartedAt();
var startedAtInstant = Instant.ofEpochSecond(startedAt.getSeconds(), startedAt.getNanos());
var age = Duration.between(startedAtInstant, time.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ public class FlyteAdminClientRunnerTerminateDanglingTest {

private static final String RUNNING_1 = "running-1";
private static final String RUNNING_2 = "running-2";
private static final String NON_RUNNING_1 = "non-running-1";
private static final String NON_RUNNING_2 = "non-running-2";
private static final String NON_STYX_1 = "non-styx-1";
private static final String NON_STYX_2 = "non-styx-2";
private static final String NA_DANGLING_1 = "na-dangling-1";
Expand Down Expand Up @@ -131,12 +129,11 @@ public void setUp() {
var oldIdInAnnotationDangling = executions(this::oldIdInAnnotationDanglingExecution, OA_DANGLING_1, OA_DANGLING_2);
var running = executions(this::runningExecution, RUNNING_1, RUNNING_2);
var nonStyx = executions(this::runningNonStyxExecution, NON_STYX_1, NON_STYX_2);
var nonRunning = executions(this::nonRunningExecutions, NON_RUNNING_1, NON_RUNNING_2);

time.setOffset(BACK_ENOUGH_TO_MAKE_EXECUTIONS_YOUNG);
var nonActiveYoungDangling = executions(this::nonActiveDanglingExecution, NA_DANGLING_YOUNG_1, NA_DANGLING_YOUNG_2);
stubListExecutions(nonActiveYoungDangling, nonActiveDangling, noIdInStateDangling, noIdInAnnotationDangling,
oldIdInAnnotationDangling, running, nonStyx, nonRunning);
oldIdInAnnotationDangling, running, nonStyx);

time.reset();
}
Expand All @@ -155,8 +152,8 @@ public void shouldCallListExecutionsWithFilter() {
var filters = filterCatcher.getValue().split("\\+");
assertThat(filters, arrayContainingInAnyOrder(
equalTo("value_in(phase,RUNNING)"),
startsWith("gte(started_at,"),
startsWith("lte(started_at,"))
startsWith("gte(execution_created_at,"),
startsWith("lte(execution_created_at,"))
);
}

Expand Down Expand Up @@ -216,14 +213,6 @@ public void shouldNotTerminateNonStyxFlyteExecutions() {
verifyTerminateExecution(never(), NON_STYX_2);
}

@Test
public void shouldNotTerminateFlyteExecutionsInTerminalPhase() {
runner.terminateDanglingFlyteExecutions();

verifyTerminateExecution(never(), NON_RUNNING_1);
verifyTerminateExecution(never(), NON_RUNNING_2);
}

@Test
public void shouldScheduleTerminationOnInit() {
runner.init();
Expand Down Expand Up @@ -256,32 +245,28 @@ private void stubListExecutions(List<Execution>... executions) {
when(stateManager.listActiveInstances()).thenReturn(activeInstances);
}

private Execution nonRunningExecutions(String name) {
return execution(name, styxAnnotations(name, workflowInstance(name)), Phase.SUCCEEDED);
}

private Execution noIdInStateDanglingExecution(String name) {
final var workflowInstance = workflowInstance(name);
addToActiveStates(workflowInstance, StateData.newBuilder().build());
return execution(name, styxAnnotations(name, workflowInstance), Phase.RUNNING);
return execution(name, styxAnnotations(name, workflowInstance));
}

private Execution oldIdInAnnotationDanglingExecution(String name) {
final var workflowInstance = workflowInstance(name);
addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build());
return execution(name, oldRunIdStyxAnnotation(name, workflowInstance), Phase.RUNNING);
return execution(name, oldRunIdStyxAnnotation(name, workflowInstance));
}

private Execution noIdInAnnotationDanglingExecution(String name) {
final var workflowInstance = workflowInstance(name);
addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build());
return execution(name, noRunIdStyxAnnotation(workflowInstance), Phase.RUNNING);
return execution(name, noRunIdStyxAnnotation(workflowInstance));
}

private Execution runningExecution(String name) {
final var workflowInstance = workflowInstance(name);
addToActiveStates(workflowInstance, StateData.newBuilder().executionId(name).build());
return execution(name, styxAnnotations(name, workflowInstance), Phase.RUNNING);
return execution(name, styxAnnotations(name, workflowInstance));
}

private void addToActiveStates(WorkflowInstance workflowInstance, StateData state) {
Expand All @@ -295,14 +280,14 @@ private void addToActiveStates(WorkflowInstance workflowInstance, StateData stat
}

private Execution nonActiveDanglingExecution(String name) {
return execution(name, styxAnnotations(name, workflowInstance(name)), Phase.RUNNING);
return execution(name, styxAnnotations(name, workflowInstance(name)));
}

private Execution runningNonStyxExecution(String name) {
return execution(name, emptyAnnotations(), Phase.RUNNING);
return execution(name, emptyAnnotations());
}

private Execution execution(String name, Common.Annotations annotations, Phase phase) {
private Execution execution(String name, Common.Annotations annotations) {
var timestamp = nowTimestamp();
var identifier = IdentifierOuterClass.WorkflowExecutionIdentifier.newBuilder()
.setProject(PROJECT)
Expand All @@ -315,7 +300,7 @@ private Execution execution(String name, Common.Annotations annotations, Phase p
.setAnnotations(annotations)
.build())
.setClosure(ExecutionOuterClass.ExecutionClosure.newBuilder()
.setPhase(phase)
.setPhase(Phase.RUNNING)
.setCreatedAt(timestamp)
.setStartedAt(timestamp)
.build())
Expand Down

0 comments on commit 22ad5ab

Please sign in to comment.