From 5e83a92d2890df5edd45d1000ad5df9b7c29dc57 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 10:37:18 +0530 Subject: [PATCH 1/5] Change priority for scheduling reroute in timeout Signed-off-by: Rishab Nahata --- .../allocator/BalancedShardsAllocator.java | 42 ++++++++++++++++++- .../gateway/ShardsBatchGatewayAllocator.java | 8 ++-- 2 files changed, 44 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 785636fa7ff2a..a973193c76dce 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -62,9 +62,11 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.Locale; import java.util.Map; import java.util.Set; +import static org.opensearch.cluster.action.shard.ShardStateAction.FOLLOW_UP_REROUTE_PRIORITY_SETTING; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID; import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID; @@ -191,6 +193,32 @@ public class BalancedShardsAllocator implements ShardsAllocator { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + BalancedShardsAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -204,6 +232,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile boolean ignoreThrottleInRestore; private volatile TimeValue allocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; private long startTime; private RerouteService rerouteService; @@ -223,6 +252,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); setAllocatorTimeout(ALLOCATOR_TIMEOUT_SETTING.get(settings)); + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -233,6 +263,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); clusterSettings.addSettingsUpdateConsumer(IGNORE_THROTTLE_FOR_REMOTE_RESTORE, this::setIgnoreThrottleInRestore); clusterSettings.addSettingsUpdateConsumer(ALLOCATOR_TIMEOUT_SETTING, this::setAllocatorTimeout); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -321,6 +352,10 @@ private void setAllocatorTimeout(TimeValue allocatorTimeout) { this.allocatorTimeout = allocatorTimeout; } + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } + protected boolean allocatorTimedOut() { if (allocatorTimeout.equals(TimeValue.MINUS_ONE)) { if (logger.isTraceEnabled()) { @@ -417,10 +452,13 @@ private void failAllocationOfNewPrimaries(RoutingAllocation allocation) { private void scheduleRerouteIfAllocatorTimedOut() { if (allocatorTimedOut()) { - assert rerouteService != null : "RerouteService not set to schedule reroute after allocator time out"; + if (rerouteService == null) { + logger.info("RerouteService not set to schedule reroute after allocator time out"); + return; + } rerouteService.reroute( "reroute after balanced shards allocator timed out", - Priority.HIGH, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after balanced shards allocator timed out completed"), e -> logger.debug("reroute after balanced shards allocator timed out failed", e) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 9c38ea1df8a41..da00a0fb686d3 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -308,8 +308,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for primary shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [P] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -343,8 +343,8 @@ public void onComplete() { logger.trace("scheduling reroute after existing shards allocator timed out for replica shards"); assert rerouteService != null; rerouteService.reroute( - "reroute after existing shards allocator timed out", - Priority.HIGH, + "reroute after existing shards allocator [R] timed out", + Priority.NORMAL, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) From 6a448d0da566596b1f43beecd3d0830b9b19ba41 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 20:06:34 +0530 Subject: [PATCH 2/5] Add setting for ESA Signed-off-by: Rishab Nahata --- .../common/settings/ClusterSettings.java | 2 + .../gateway/ShardsBatchGatewayAllocator.java | 38 ++++++++++++++++++- 2 files changed, 38 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f769f8729c25b..0afe8617e156f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -276,6 +276,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.THRESHOLD_SETTING, BalancedShardsAllocator.IGNORE_THROTTLE_FOR_REMOTE_RESTORE, BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING, + BalancedShardsAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, @@ -353,6 +354,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, + ShardsBatchGatewayAllocator.FOLLOW_UP_REROUTE_PRIORITY_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index da00a0fb686d3..8e63133e87806 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -53,6 +53,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -82,6 +83,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private TimeValue primaryShardsBatchGatewayAllocatorTimeout; private TimeValue replicaShardsBatchGatewayAllocatorTimeout; + private volatile Priority followUpRerouteTaskPriority; public static final TimeValue MIN_ALLOCATOR_TIMEOUT = TimeValue.timeValueSeconds(20); private final ClusterManagerMetrics clusterManagerMetrics; @@ -145,6 +147,32 @@ public void validate(TimeValue timeValue) { Setting.Property.Dynamic ); + /** + * Adjusts the priority of the followup reroute task when current round times out. NORMAL is right for reasonable clusters, + * but for a cluster in a messed up state which is starving NORMAL priority tasks, it might be necessary to raise this higher + * to allocate existing shards. + */ + public static final Setting FOLLOW_UP_REROUTE_PRIORITY_SETTING = new Setting<>( + "cluster.routing.allocation.shards_batch_gateway_allocator.schedule_reroute.priority", + Priority.NORMAL.toString(), + ShardsBatchGatewayAllocator::parseReroutePriority, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + private static Priority parseReroutePriority(String priorityString) { + final Priority priority = Priority.valueOf(priorityString.toUpperCase(Locale.ROOT)); + switch (priority) { + case NORMAL: + case HIGH: + case URGENT: + return priority; + } + throw new IllegalArgumentException( + "priority [" + priority + "] not supported for [" + FOLLOW_UP_REROUTE_PRIORITY_SETTING.getKey() + "]" + ); + } + private final RerouteService rerouteService; private final PrimaryShardBatchAllocator primaryShardBatchAllocator; private final ReplicaShardBatchAllocator replicaShardBatchAllocator; @@ -179,6 +207,8 @@ public ShardsBatchGatewayAllocator( this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setReplicaBatchAllocatorTimeout); this.clusterManagerMetrics = clusterManagerMetrics; + setFollowUpRerouteTaskPriority(FOLLOW_UP_REROUTE_PRIORITY_SETTING.get(settings)); + clusterSettings.addSettingsUpdateConsumer(FOLLOW_UP_REROUTE_PRIORITY_SETTING, this::setFollowUpRerouteTaskPriority); } @Override @@ -309,7 +339,7 @@ public void onComplete() { assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator [P] timed out", - Priority.NORMAL, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -344,7 +374,7 @@ public void onComplete() { assert rerouteService != null; rerouteService.reroute( "reroute after existing shards allocator [R] timed out", - Priority.NORMAL, + followUpRerouteTaskPriority, ActionListener.wrap( r -> logger.trace("reroute after existing shards allocator timed out completed"), e -> logger.debug("reroute after existing shards allocator timed out failed", e) @@ -920,4 +950,8 @@ protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatew protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } + + private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; + } } From 825a983868ff593488adf0e246976191ea7a9a9f Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Wed, 23 Oct 2024 21:09:27 +0530 Subject: [PATCH 3/5] Fix tests Signed-off-by: Rishab Nahata --- .../gateway/ShardsBatchGatewayAllocator.java | 2 +- ...TimeBoundBalancedShardsAllocatorTests.java | 61 ++++++++++++++--- .../gateway/GatewayAllocatorTests.java | 68 ++++++++++++++++--- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 8e63133e87806..82229f244239f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -951,7 +951,7 @@ protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatew this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; } - private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { + protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) { this.followUpRerouteTaskPriority = followUpRerouteTaskPriority; } } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java index 45a0bd7b18afd..c6705a678e077 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/allocator/TimeBoundBalancedShardsAllocatorTests.java @@ -108,7 +108,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled( listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -143,6 +143,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() { System.nanoTime() ); AtomicBoolean rerouteScheduled = new AtomicBoolean(false); + final RerouteService rerouteService = (reason, priority, listener) -> { + if (randomBoolean()) { + listener.onFailure(new OpenSearchException("simulated")); + } else { + listener.onResponse(clusterService.state()); + } + assertEquals("reroute after balanced shards allocator timed out", reason); + assertEquals(Priority.NORMAL, priority); + rerouteScheduled.compareAndSet(false, true); + }; + allocator.setRerouteService(rerouteService); + allocator.allocate(allocation); + List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); + int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId()); + int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId()); + int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId()); + assertEquals(0, initializingShards.size()); + assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size()); + assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries); + assertTrue(rerouteScheduled.get()); + } + + public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() { + int numberOfIndices = 2; + int numberOfShards = 5; + int numberOfReplicas = 1; + int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1)); + Settings.Builder settings = Settings.builder() + .put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high"); + // passing 0 for timed out latch such that all shard times out + BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0)); + Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas); + RoutingTable routingTable = buildRoutingTable(metadata); + setupStateAndService(metadata, routingTable); + RoutingAllocation allocation = new RoutingAllocation( + yesAllocationDeciders(), + new RoutingNodes(state, false), + state, + ClusterInfo.EMPTY, + null, + System.nanoTime() + ); + AtomicBoolean rerouteScheduled = new AtomicBoolean(false); final RerouteService rerouteService = (reason, priority, listener) -> { if (randomBoolean()) { listener.onFailure(new OpenSearchException("simulated")); @@ -193,7 +236,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -237,7 +280,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -284,7 +327,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -326,7 +369,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled() listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -371,7 +414,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -416,7 +459,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -462,7 +505,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() { listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); @@ -522,7 +565,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca listener.onResponse(clusterService.state()); } assertEquals("reroute after balanced shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + assertEquals(Priority.NORMAL, priority); rerouteScheduled.compareAndSet(false, true); }; allocator.setRerouteService(rerouteService); diff --git a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java index ebc2e59fa5a30..be2486846d401 100644 --- a/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java +++ b/server/src/test/java/org/opensearch/gateway/GatewayAllocatorTests.java @@ -10,7 +10,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.support.nodes.BaseNodeResponse; import org.opensearch.cluster.ClusterInfo; @@ -53,6 +52,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING; @@ -437,10 +437,51 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); + rerouteLatch.countDown(); + }; + CountDownLatch timedOutShardsLatch = new CountDownLatch(20); + testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); + testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 10); + assertEquals(1, rerouteLatch.getCount()); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); + executor.run(); + assertEquals(timedOutShardsLatch.getCount(), 0); + assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners + final boolean terminated = terminate(threadPool); + assert terminated; + clusterService.close(); + } + + public void testCollectTimedOutShardsAndScheduleRerouteWithHighPriority_Success() throws InterruptedException { + createIndexAndUpdateClusterState(2, 5, 2); + TestThreadPool threadPool = new TestThreadPool(getTestName()); + ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); + final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); + final RerouteService rerouteService = (reason, priority, listener) -> { + listener.onResponse(clusterService.state()); + assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } assertEquals(Priority.HIGH, priority); rerouteLatch.countDown(); }; @@ -448,11 +489,13 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.HIGH); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners @@ -466,22 +509,29 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru TestThreadPool threadPool = new TestThreadPool(getTestName()); ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool); final CountDownLatch rerouteLatch = new CountDownLatch(2); + final AtomicBoolean primary = new AtomicBoolean(true); final RerouteService rerouteService = (reason, priority, listener) -> { - listener.onFailure(new OpenSearchException("simulated")); + listener.onResponse(clusterService.state()); assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L)); - assertEquals("reroute after existing shards allocator timed out", reason); - assertEquals(Priority.HIGH, priority); + if (primary.get()) { + assertEquals("reroute after existing shards allocator [P] timed out", reason); + } else { + assertEquals("reroute after existing shards allocator [R] timed out", reason); + } + assertEquals(Priority.NORMAL, priority); rerouteLatch.countDown(); }; CountDownLatch timedOutShardsLatch = new CountDownLatch(20); testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService); testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO); testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO); - BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true); + testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL); + BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 10); assertEquals(1, rerouteLatch.getCount()); - executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false); + primary.set(false); + executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get()); executor.run(); assertEquals(timedOutShardsLatch.getCount(), 0); assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners From 5368e7f9e7497f1ac34bdf71afacdf44e9d3d460 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 24 Oct 2024 00:38:42 +0530 Subject: [PATCH 4/5] Trigger Build Signed-off-by: Rishab Nahata From 2ba604d9228cc13e3ed9bed80a78d8741306c239 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Thu, 24 Oct 2024 08:50:51 +0530 Subject: [PATCH 5/5] Trigger Build Signed-off-by: Rishab Nahata