Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Oct 23, 2024
1 parent 6a448d0 commit 825a983
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -951,7 +951,7 @@ protected void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatew
this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout;
}

private void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
protected void setFollowUpRerouteTaskPriority(Priority followUpRerouteTaskPriority) {
this.followUpRerouteTaskPriority = followUpRerouteTaskPriority;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void testAllUnassignedShardsAllocatedWhenNoTimeOutAndRerouteNotScheduled(
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -143,6 +143,49 @@ public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduled() {
System.nanoTime()
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
} else {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
allocator.allocate(allocation);
List<ShardRouting> initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING);
int node1Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node1.getId());
int node2Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node2.getId());
int node3Recoveries = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node3.getId());
assertEquals(0, initializingShards.size());
assertEquals(totalShardCount, allocation.routingNodes().unassigned().ignored().size());
assertEquals(0, node1Recoveries + node2Recoveries + node3Recoveries);
assertTrue(rerouteScheduled.get());
}

public void testAllUnassignedShardsIgnoredWhenTimedOutAndRerouteScheduledWithHighPriority() {
int numberOfIndices = 2;
int numberOfShards = 5;
int numberOfReplicas = 1;
int totalShardCount = numberOfIndices * (numberOfShards * (numberOfReplicas + 1));
Settings.Builder settings = Settings.builder()
.put("cluster.routing.allocation.balanced_shards_allocator.schedule_reroute.priority", "high");
// passing 0 for timed out latch such that all shard times out
BalancedShardsAllocator allocator = new TestBalancedShardsAllocator(settings.build(), new CountDownLatch(0));
Metadata metadata = buildMetadata(Metadata.builder(), numberOfIndices, numberOfShards, numberOfReplicas);
RoutingTable routingTable = buildRoutingTable(metadata);
setupStateAndService(metadata, routingTable);
RoutingAllocation allocation = new RoutingAllocation(
yesAllocationDeciders(),
new RoutingNodes(state, false),
state,
ClusterInfo.EMPTY,
null,
System.nanoTime()
);
AtomicBoolean rerouteScheduled = new AtomicBoolean(false);
final RerouteService rerouteService = (reason, priority, listener) -> {
if (randomBoolean()) {
listener.onFailure(new OpenSearchException("simulated"));
Expand Down Expand Up @@ -193,7 +236,7 @@ public void testAllocatePartialPrimaryShardsUntilTimedOutAndRerouteScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -237,7 +280,7 @@ public void testAllocateAllPrimaryShardsAndPartialReplicaShardsUntilTimedOutAndR
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -284,7 +327,7 @@ public void testAllShardsMoveWhenExcludedAndTimeoutNotBreachedAndRerouteNotSched
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -326,7 +369,7 @@ public void testNoShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteScheduled()
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -371,7 +414,7 @@ public void testPartialShardsMoveWhenExcludedAndTimeoutBreachedAndRerouteSchedul
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -416,7 +459,7 @@ public void testClusterRebalancedWhenNotTimedOutAndRerouteNotScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -462,7 +505,7 @@ public void testClusterNotRebalancedWhenTimedOutAndRerouteScheduled() {
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down Expand Up @@ -522,7 +565,7 @@ public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation alloca
listener.onResponse(clusterService.state());
}
assertEquals("reroute after balanced shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
assertEquals(Priority.NORMAL, priority);
rerouteScheduled.compareAndSet(false, true);
};
allocator.setRerouteService(rerouteService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.Version;
import org.opensearch.action.support.nodes.BaseNodeResponse;
import org.opensearch.cluster.ClusterInfo;
Expand Down Expand Up @@ -53,6 +52,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import static org.opensearch.gateway.ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING;
Expand Down Expand Up @@ -437,22 +437,65 @@ public void testCollectTimedOutShardsAndScheduleReroute_Success() throws Interru
TestThreadPool threadPool = new TestThreadPool(getTestName());
ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
final CountDownLatch rerouteLatch = new CountDownLatch(2);
final AtomicBoolean primary = new AtomicBoolean(true);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L));
assertEquals("reroute after existing shards allocator timed out", reason);
if (primary.get()) {
assertEquals("reroute after existing shards allocator [P] timed out", reason);
} else {
assertEquals("reroute after existing shards allocator [R] timed out", reason);
}
assertEquals(Priority.NORMAL, priority);
rerouteLatch.countDown();
};
CountDownLatch timedOutShardsLatch = new CountDownLatch(20);
testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService);
testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO);
testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO);
testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL);
BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 10);
assertEquals(1, rerouteLatch.getCount());
primary.set(false);
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 0);
assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners
final boolean terminated = terminate(threadPool);
assert terminated;
clusterService.close();
}

public void testCollectTimedOutShardsAndScheduleRerouteWithHighPriority_Success() throws InterruptedException {
createIndexAndUpdateClusterState(2, 5, 2);
TestThreadPool threadPool = new TestThreadPool(getTestName());
ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
final CountDownLatch rerouteLatch = new CountDownLatch(2);
final AtomicBoolean primary = new AtomicBoolean(true);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onResponse(clusterService.state());
assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L));
if (primary.get()) {
assertEquals("reroute after existing shards allocator [P] timed out", reason);
} else {
assertEquals("reroute after existing shards allocator [R] timed out", reason);
}
assertEquals(Priority.HIGH, priority);
rerouteLatch.countDown();
};
CountDownLatch timedOutShardsLatch = new CountDownLatch(20);
testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService);
testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO);
testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO);
BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true);
testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.HIGH);
BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 10);
assertEquals(1, rerouteLatch.getCount());
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false);
primary.set(false);
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 0);
assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners
Expand All @@ -466,22 +509,29 @@ public void testCollectTimedOutShardsAndScheduleReroute_Failure() throws Interru
TestThreadPool threadPool = new TestThreadPool(getTestName());
ClusterService clusterService = ClusterServiceUtils.createClusterService(clusterState, threadPool);
final CountDownLatch rerouteLatch = new CountDownLatch(2);
final AtomicBoolean primary = new AtomicBoolean(true);
final RerouteService rerouteService = (reason, priority, listener) -> {
listener.onFailure(new OpenSearchException("simulated"));
listener.onResponse(clusterService.state());
assertThat(rerouteLatch.getCount(), greaterThanOrEqualTo(0L));
assertEquals("reroute after existing shards allocator timed out", reason);
assertEquals(Priority.HIGH, priority);
if (primary.get()) {
assertEquals("reroute after existing shards allocator [P] timed out", reason);
} else {
assertEquals("reroute after existing shards allocator [R] timed out", reason);
}
assertEquals(Priority.NORMAL, priority);
rerouteLatch.countDown();
};
CountDownLatch timedOutShardsLatch = new CountDownLatch(20);
testShardsBatchGatewayAllocator = new TestShardBatchGatewayAllocator(timedOutShardsLatch, 1000, rerouteService);
testShardsBatchGatewayAllocator.setPrimaryBatchAllocatorTimeout(TimeValue.ZERO);
testShardsBatchGatewayAllocator.setReplicaBatchAllocatorTimeout(TimeValue.ZERO);
BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, true);
testShardsBatchGatewayAllocator.setFollowUpRerouteTaskPriority(Priority.NORMAL);
BatchRunnableExecutor executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 10);
assertEquals(1, rerouteLatch.getCount());
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, false);
primary.set(false);
executor = testShardsBatchGatewayAllocator.allocateAllUnassignedShards(testAllocation, primary.get());
executor.run();
assertEquals(timedOutShardsLatch.getCount(), 0);
assertEquals(0, rerouteLatch.getCount()); // even with failure it doesn't leak any listeners
Expand Down

0 comments on commit 825a983

Please sign in to comment.