Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Add filter to execution list (#981)
Browse files Browse the repository at this point in the history
* avoid using unfiltered execution list
because the query is very expesive on the database

* refactor method

* use time.get() supplier

* mvn license:update-file-header

* remove unused imports

* bumped log4j

* minus since once

* cleanup

* Add test for calling admin with filters

* Remove obsolete TODO

* Make since duration a constant

Co-authored-by: Nelson Arapé Pérez <nelson@spotify.com>
  • Loading branch information
ckiosidis and narape authored Feb 9, 2022
1 parent c44aec1 commit fe8e605
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 4 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-bom</artifactId>
<version>2.17.0</version>
<version>2.17.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*-
* -\-\-
* Spotify Styx Flyte Client
* --
* Copyright (C) 2016 - 2022 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.styx.flyte.client;

import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class RpcHelper {

private RpcHelper() {}

// Test this filter by printing it and using it on a flytectl command
// example: flytectl get executions -p PROJECT -d DOMAIN --filter.fieldSelector="execution.phase in (RUNNING),execution.started_at>2022-02-07T18:23:05,execution.started_at<2022-02-08T18:10:05" -o json
public static String getExecutionsListFilter(Instant timeNow, Duration since, Duration to) {

final String dateSince = timeNow.minus(since)
.atZone(ZoneId.of("UTC"))
.toLocalDateTime()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));


String dateTo = timeNow.minus(to)
.atZone(ZoneId.of("UTC"))
.toLocalDateTime()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss"));

return String.format("execution.phase in (RUNNING),execution.started_at>%s,execution.started_at<%s", dateSince, dateTo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*-
* -\-\-
* Spotify Styx Flyte Client
* --
* Copyright (C) 2016 - 2022 Spotify AB
* --
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* -/-/-
*/

package com.spotify.styx.flyte.client;

import static com.spotify.styx.flyte.client.RpcHelper.getExecutionsListFilter;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import org.junit.Test;

public class RpcHelperTest {


@Test
public void testGtExecutionsListFilter() {
final Instant someTime = LocalDateTime.of(2022, 2, 2, 12, 12, 05)
.atZone(ZoneOffset.UTC)
.toInstant();

final String executionsListFilter = getExecutionsListFilter(
someTime,
Duration.of(24, ChronoUnit.HOURS),
Duration.of(3, ChronoUnit.MINUTES));

assertThat(executionsListFilter,
equalTo("execution.phase in (RUNNING),execution.started_at>2022-02-01T12:12:05,execution.started_at<2022-02-02T12:09:05"));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import com.spotify.styx.docker.LabelValue;
import com.spotify.styx.flyte.client.FlyteAdminClient;
import com.spotify.styx.flyte.client.FlyteInputsUtils;
import com.spotify.styx.flyte.client.RpcHelper;
import com.spotify.styx.model.Event;
import com.spotify.styx.model.FlyteExecConf;
import com.spotify.styx.model.TriggerParameters;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class FlyteAdminClientRunner implements FlyteRunner {
@VisibleForTesting static final String STYX_WORKFLOW_INSTANCE_ANNOTATION = "styx-workflow-instance";
@VisibleForTesting static final String STYX_EXECUTION_ID_ANNOTATION = "styx-execution-id";
@VisibleForTesting static final Duration TERMINATION_GRACE_PERIOD = Duration.ofMinutes(3);
private static final Duration TERMINATION_LOOKUP_SINCE = Duration.ofDays(24);
private static final int FLYTE_TERMINATING_THREADS = 4; //TODO: tune
private static final Duration DEFAULT_TERMINATE_EXEC_INTERVAL = Duration.ofMinutes(1);
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder()
Expand Down Expand Up @@ -285,10 +287,16 @@ private void tryTerminateDanglingFlyteExecutions() {
for (var domain : project.getDomainsList()) {
String paginationToken = null;
do {
//TODO: explore using filters for listing only running executions
// or at least listing only the ones newer than some threshold
var executions =
flyteAdminClient.listExecutions(project.getId(), domain.getId(), 100, paginationToken, "");
flyteAdminClient.listExecutions(
project.getId(),
domain.getId(),
100,
paginationToken,
RpcHelper.getExecutionsListFilter(
time.get(),
TERMINATION_LOOKUP_SINCE,
TERMINATION_GRACE_PERIOD));
executions.getExecutionsList().stream()
.filter(this::haveBeenRunningForAWhile)
.map(exec -> AnnotatedFlyteExecutionId.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
import static com.spotify.styx.flyte.FlyteAdminClientRunner.STYX_WORKFLOW_INSTANCE_ANNOTATION;
import static com.spotify.styx.flyte.FlyteAdminClientRunner.TERMINATE_CAUSE;
import static com.spotify.styx.flyte.FlyteAdminClientRunner.TERMINATION_GRACE_PERIOD;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -65,6 +69,7 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;
Expand Down Expand Up @@ -141,6 +146,20 @@ public void tearDown() throws Exception {
runner.close();
}

@Test
public void shouldCallListExecutionsWithFilter() {
runner.terminateDanglingFlyteExecutions();

ArgumentCaptor<String> filterCatcher = ArgumentCaptor.forClass(String.class);
verify(adminClient).listExecutions(any(), any(), anyInt(), any(), filterCatcher.capture());
var filters = filterCatcher.getValue().split(",");
assertThat(filters, arrayContainingInAnyOrder(
equalTo("execution.phase in (RUNNING)"),
startsWith("execution.started_at>"),
startsWith("execution.started_at<"))
);
}

@Test
public void shouldNotTerminateDanglingFlyteExecutionsWhenStateNotActiveAnymoreAndExecutionIsYoung() {
runner.terminateDanglingFlyteExecutions();
Expand Down

0 comments on commit fe8e605

Please sign in to comment.