From efc043ab9d5f71a6885ede92bd818dc971c283ab Mon Sep 17 00:00:00 2001 From: Brandon Duffany Date: Tue, 17 Sep 2024 14:17:41 -0700 Subject: [PATCH] Use affinity router for non-CI tasks (#7476) --- .../scheduling/task_router/task_router.go | 33 +++---- .../task_router/task_router_test.go | 89 +------------------ .../remote_execution/remote_execution_test.go | 20 +++-- 3 files changed, 26 insertions(+), 116 deletions(-) diff --git a/enterprise/server/scheduling/task_router/task_router.go b/enterprise/server/scheduling/task_router/task_router.go index ade67139ddf..20feeb49208 100644 --- a/enterprise/server/scheduling/task_router/task_router.go +++ b/enterprise/server/scheduling/task_router/task_router.go @@ -35,12 +35,12 @@ const ( // router for routable tasks. This is intentionally less than the number of // probes per task (for load balancing purposes). defaultPreferredNodeLimit = 1 - // The preferred node limit for workflows. + // The preferred node limit for ci_runner tasks. // This is set higher than the default limit since we strongly prefer - // workflow tasks to hit a node with a warm bazel workspace, but it is + // these tasks to hit a node with a warm bazel workspace, but it is // set less than the number of probes so that we can autoscale the workflow // executor pool effectively. - workflowsPreferredNodeLimit = 1 + ciRunnerPreferredNodeLimit = 1 ) type taskRouter struct { @@ -71,7 +71,7 @@ func New(env environment.Env) (interfaces.TaskRouter, error) { if rdb == nil { return nil, status.FailedPreconditionError("Redis is required for task router") } - strategies := []Router{runnerRecycler{}, affinityRouter{}} + strategies := []Router{ciRunnerRouter{}, affinityRouter{}} return &taskRouter{ env: env, rdb: rdb, @@ -268,22 +268,19 @@ type Router interface { RoutingInfo(params routingParams) (int, []string, error) } -// The runnerRecycler is a router that attempts to "recycle" warm execution -// nodes when possible. -type runnerRecycler struct{} +// The ciRunnerRouter routes ci_runner tasks according to git branch +// information. +type ciRunnerRouter struct{} -func (runnerRecycler) Applies(params routingParams) bool { - return platform.IsTrue(platform.FindValue(params.cmd.GetPlatform(), platform.RecycleRunnerPropertyName)) +func (ciRunnerRouter) Applies(params routingParams) bool { + return platform.IsCICommand(params.cmd) && platform.IsTrue(platform.FindValue(params.cmd.GetPlatform(), platform.RecycleRunnerPropertyName)) } -func (runnerRecycler) preferredNodeLimit(params routingParams) int { - if isWorkflow(params.cmd) { - return workflowsPreferredNodeLimit - } - return defaultPreferredNodeLimit +func (ciRunnerRouter) preferredNodeLimit(params routingParams) int { + return ciRunnerPreferredNodeLimit } -func (runnerRecycler) routingKeys(params routingParams) ([]string, error) { +func (ciRunnerRouter) routingKeys(params routingParams) ([]string, error) { parts := []string{"task_route", params.groupID} keys := make([]string, 0) @@ -326,16 +323,12 @@ func (runnerRecycler) routingKeys(params routingParams) ([]string, error) { return keys, nil } -func (s runnerRecycler) RoutingInfo(params routingParams) (int, []string, error) { +func (s ciRunnerRouter) RoutingInfo(params routingParams) (int, []string, error) { nodeLimit := s.preferredNodeLimit(params) keys, err := s.routingKeys(params) return nodeLimit, keys, err } -func isWorkflow(cmd *repb.Command) bool { - return platform.FindValue(cmd.GetPlatform(), platform.WorkflowIDPropertyName) != "" -} - // affinityRouter generates Redis routing keys based on: // - remoteInstanceName // - groupID diff --git a/enterprise/server/scheduling/task_router/task_router_test.go b/enterprise/server/scheduling/task_router/task_router_test.go index 6195e19c8c4..80bd5ad3c47 100644 --- a/enterprise/server/scheduling/task_router/task_router_test.go +++ b/enterprise/server/scheduling/task_router/task_router_test.go @@ -70,55 +70,6 @@ func TestTaskRouter_RankNodes_Workflows_ReturnsLatestRunnerThatExecutedWorkflow( requireNonSequential(t, ranked[2:]) } -func TestTaskRouter_RankNodes_DefaultNodeLimit_ReturnsOnlyLatestNodeMarkedComplete(t *testing.T) { - // Mark a routable task complete by executor 1. - - env := newTestEnv(t) - router := newTaskRouter(t, env) - ctx := withAuthUser(t, context.Background(), env, "US1") - cmd := &repb.Command{ - Platform: &repb.Platform{ - Properties: []*repb.Platform_Property{ - {Name: "recycle-runner", Value: "true"}, - }, - }, - } - instanceName := "test-instance" - - router.MarkComplete(ctx, cmd, instanceName, executorHostID1) - - nodes := sequentiallyNumberedNodes(100) - - // Task should now be routed to executor 1. - - ranked := router.RankNodes(ctx, cmd, instanceName, nodes) - - requireSameExecutionNodes(t, nodes, ranked) - require.Equal(t, executorHostID1, ranked[0].GetExecutionNode().GetExecutorHostId()) - requireNonSequential(t, ranked[1:]) - - // Mark the same task complete by executor 2 as well. - - router.MarkComplete(ctx, cmd, instanceName, executorHostID2) - - ranked = router.RankNodes(ctx, cmd, instanceName, nodes) - - // Task should now be routed to executor 2, but executor 1 should be ranked - // randomly, since we only store up to 1 recent executor for non-workflow - // tasks. - - requireSameExecutionNodes(t, nodes, ranked) - require.Equal(t, executorHostID2, ranked[0].GetExecutionNode().GetExecutorHostId()) - require.True(t, ranked[0].IsPreferred()) - - requireNotAlwaysRanked(1, executorHostID1, t, router, ctx, cmd, instanceName) - requireNonSequential(t, ranked[1:]) - - for i := 1; i < 100; i++ { - require.False(t, ranked[i].IsPreferred()) - } -} - func TestTaskRouter_RankNodes_RoutesByHostID(t *testing.T) { // Mark a routable task complete by executor 1. @@ -131,6 +82,7 @@ func TestTaskRouter_RankNodes_RoutesByHostID(t *testing.T) { {Name: "recycle-runner", Value: "true"}, }, }, + OutputPaths: []string{"foo.out"}, } instanceName := "test-instance" @@ -310,45 +262,6 @@ func TestTaskRouter_RankNodes_AffinityRoutingDisabled(t *testing.T) { requireNotAlwaysRanked(0, executorHostID1, t, router, ctx, cmd, instanceName) } -func TestTaskRouter_RankNodes_RunnerRecyclingTakesPrecedence(t *testing.T) { - env := newTestEnv(t) - router := newTaskRouter(t, env) - ctx := withAuthUser(t, context.Background(), env, "US1") - oaCmd := &repb.Command{ - Platform: &repb.Platform{ - Properties: []*repb.Platform_Property{ - {Name: "recycle-runner", Value: "true"}, - {Name: "affinity-routing", Value: "true"}, - }, - }, - OutputPaths: []string{"/bazel-out/foo.a"}, - } - instanceName := "test-instance" - - router.MarkComplete(ctx, oaCmd, instanceName, executorHostID1) - - rrCmd := &repb.Command{ - Platform: &repb.Platform{ - Properties: []*repb.Platform_Property{ - {Name: "recycle-runner", Value: "true"}, - {Name: "affinity-routing", Value: "true"}, - }, - }, - } - - router.MarkComplete(ctx, rrCmd, instanceName, executorHostID2) - - nodes := sequentiallyNumberedNodes(100) - - // Task should be routed to executor 2, because the runner recycling - // routing should take priority - ranked := router.RankNodes(ctx, oaCmd, instanceName, nodes) - - requireSameExecutionNodes(t, nodes, ranked) - require.Equal(t, executorHostID2, ranked[0].GetExecutionNode().GetExecutorHostId()) - requireNonSequential(t, ranked[1:]) -} - func TestTaskRouter_RankNodes_JustShufflesIfCommandIsNotAvailable(t *testing.T) { env := newTestEnv(t) router := newTaskRouter(t, env) diff --git a/enterprise/server/test/integration/remote_execution/remote_execution_test.go b/enterprise/server/test/integration/remote_execution/remote_execution_test.go index b3f46fb7af2..dc6b4bc67e9 100644 --- a/enterprise/server/test/integration/remote_execution/remote_execution_test.go +++ b/enterprise/server/test/integration/remote_execution/remote_execution_test.go @@ -474,8 +474,9 @@ func TestSimpleCommand_RunnerReuse_MultipleExecutors_RoutesCommandToSameExecutor opts := &rbetest.ExecuteOpts{APIKey: rbe.APIKey1} cmd := rbe.Execute(&repb.Command{ - Arguments: []string{"touch", "foo.txt"}, - Platform: platform, + Arguments: []string{"touch", "foo.txt"}, + OutputPaths: []string{"foo.txt"}, + Platform: platform, }, opts) res := cmd.Wait() @@ -484,8 +485,9 @@ func TestSimpleCommand_RunnerReuse_MultipleExecutors_RoutesCommandToSameExecutor rbetest.WaitForAnyPooledRunner(t, ctx) cmd = rbe.Execute(&repb.Command{ - Arguments: []string{"stat", "foo.txt"}, - Platform: platform, + Arguments: []string{"stat", "foo.txt"}, + OutputPaths: []string{"foo.txt"}, + Platform: platform, }, opts) res = cmd.Wait() @@ -520,8 +522,9 @@ func TestSimpleCommand_RunnerReuse_PoolSelectionViaHeader_RoutesCommandToSameExe } cmd := rbe.Execute(&repb.Command{ - Arguments: []string{"touch", "foo.txt"}, - Platform: platform, + Arguments: []string{"touch", "foo.txt"}, + OutputPaths: []string{"foo.txt"}, + Platform: platform, }, opts) res := cmd.Wait() @@ -530,8 +533,9 @@ func TestSimpleCommand_RunnerReuse_PoolSelectionViaHeader_RoutesCommandToSameExe rbetest.WaitForAnyPooledRunner(t, ctx) cmd = rbe.Execute(&repb.Command{ - Arguments: []string{"stat", "foo.txt"}, - Platform: platform, + Arguments: []string{"stat", "foo.txt"}, + OutputPaths: []string{"foo.txt"}, + Platform: platform, }, opts) res = cmd.Wait()