diff --git a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java deleted file mode 100644 index 26cec71b208cc..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/PrimaryShardBatchAllocator.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.gateway; - -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; -import org.opensearch.cluster.routing.allocation.NodeAllocationResult; -import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.gateway.AsyncShardFetch.FetchResult; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch; -import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards; - -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -/** - * PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is - * that it can allocate multiple unassigned primary shards wherein PrimaryShardAllocator can only allocate single - * unassigned shard. - * The primary shard batch allocator allocates multiple unassigned primary shards to nodes that hold - * valid copies of the unassigned primaries. It does this by iterating over all unassigned - * primary shards in the routing table and fetching shard metadata from each node in the cluster - * that holds a copy of the shard. The shard metadata from each node is compared against the - * set of valid allocation IDs and for all valid shard copies (if any), the primary shard batch allocator - * executes the allocation deciders to chose a copy to assign the primary shard to. - *

- * Note that the PrimaryShardBatchAllocator does *not* allocate primaries on index creation - * (see {@link org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator}), - * nor does it allocate primaries when a primary shard failed and there is a valid replica - * copy that can immediately be promoted to primary, as this takes place in {@link RoutingNodes#failShard}. - * - * @opensearch.internal - */ -public abstract class PrimaryShardBatchAllocator extends PrimaryShardAllocator { - - abstract protected FetchResult fetchData(Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation); - - protected FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation){ - return null; - } - - @Override - public AllocateUnassignedDecision makeAllocationDecision(ShardRouting unassignedShard, - RoutingAllocation allocation, - Logger logger) { - - return makeAllocationDecision(new HashSet<>(Collections.singletonList(unassignedShard)), - allocation, logger).get(unassignedShard); - } - - /** - * Build allocation decisions for all the shards present in the batch identified by batchId. - * - * @param shards set of shards given for allocation - * @param allocation current allocation of all the shards - * @param logger logger used for logging - * @return shard to allocation decision map - */ - @Override - public HashMap makeAllocationDecision(Set shards, - RoutingAllocation allocation, - Logger logger) { - HashMap shardAllocationDecisions = new HashMap<>(); - final boolean explain = allocation.debugDecision(); - Set shardsEligibleForFetch = new HashSet<>(); - Set shardsNotEligibleForFetch = new HashSet<>(); - // identify ineligible shards - for (ShardRouting shard : shards) { - ShardRouting matchingShard = null; - for (RoutingNode node: allocation.routingNodes()) { - matchingShard = node.getByShardId(shard.shardId()); - if (matchingShard != null && matchingShard.primary() == shard.primary()) { - // we have a matching shard on this node, so this is a valid copy - break; - } - } - if (matchingShard == null) { - matchingShard = shard; - } - AllocateUnassignedDecision decision = getInEligibleShardDecision(matchingShard, allocation); - if (decision != null) { - shardsNotEligibleForFetch.add(shard); - shardAllocationDecisions.put(shard, decision); - } else { - shardsEligibleForFetch.add(shard); - } - } - // Do not call fetchData if there are no eligible shards - if (shardsEligibleForFetch.size() == 0) { - return shardAllocationDecisions; - } - // only fetch data for eligible shards - final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); - // Note : shardsState contain the Data, there key is DiscoveryNode but value is Map so to get one shard level data (from all the nodes), we'll traverse the map - // and construct the nodeShardState along the way before making any allocation decision. As metadata for a - // particular shard is needed from all the discovery nodes. - - // process the received data - for (ShardRouting unassignedShard : shardsEligibleForFetch) { - if (shardsState.hasData() == false) { - // if fetching is not done, add that no decision in the resultant map - allocation.setHasPendingAsyncFetch(); - List nodeDecisions = null; - if (explain) { - nodeDecisions = buildDecisionsForAllNodes(unassignedShard, allocation); - } - shardAllocationDecisions.put(unassignedShard, - AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, - nodeDecisions)); - } else { - - NodeShardStates nodeShardStates = getNodeShardStates(unassignedShard, shardsState); - // get allocation decision for this shard - shardAllocationDecisions.put(unassignedShard, getAllocationDecision(unassignedShard, allocation, - nodeShardStates, logger)); - } - } - return shardAllocationDecisions; - } - - private static NodeShardStates getNodeShardStates(ShardRouting unassignedShard, FetchResult shardsState) { - NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1); - Map nodeResponses = shardsState.getData(); - - // build data for a shard from all the nodes - nodeResponses.forEach((node, nodeGatewayStartedShardsBatch) -> { - NodeGatewayStartedShards shardData = nodeGatewayStartedShardsBatch.getNodeGatewayStartedShardsBatch().get(unassignedShard.shardId()); - nodeShardStates.add(new NodeShardState(node, shardData.allocationId(), shardData.primary(), shardData.replicationCheckpoint(), shardData.storeException()), node); - }); - return nodeShardStates; - } -} diff --git a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java deleted file mode 100644 index 36e32844fa6a8..0000000000000 --- a/server/src/test/java/org/opensearch/gateway/PrimaryShardBatchAllocatorTests.java +++ /dev/null @@ -1,1027 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/* - * Modifications Copyright OpenSearch Contributors. See - * GitHub history for details. - */ - -package org.opensearch.gateway; - -import org.apache.lucene.codecs.Codec; -import org.apache.lucene.index.CorruptIndexException; -import org.junit.Before; -import org.junit.BeforeClass; -import org.opensearch.Version; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.OpenSearchAllocationTestCase; -import org.opensearch.cluster.health.ClusterHealthStatus; -import org.opensearch.cluster.health.ClusterStateHealth; -import org.opensearch.cluster.metadata.IndexMetadata; -import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; -import org.opensearch.cluster.routing.RoutingNode; -import org.opensearch.cluster.routing.RoutingNodes; -import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.routing.ShardRoutingState; -import org.opensearch.cluster.routing.UnassignedInfo; -import org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus; -import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; -import org.opensearch.cluster.routing.allocation.AllocationDecision; -import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.cluster.routing.allocation.decider.AllocationDecider; -import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; -import org.opensearch.cluster.routing.allocation.decider.Decision; -import org.opensearch.common.Nullable; -import org.opensearch.common.UUIDs; -import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.set.Sets; -import org.opensearch.core.index.shard.ShardId; -import org.opensearch.env.Environment; -import org.opensearch.env.ShardLockObtainFailedException; -import org.opensearch.index.IndexSettings; -import org.opensearch.index.codec.CodecService; -import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; -import org.opensearch.repositories.IndexId; -import org.opensearch.snapshots.Snapshot; -import org.opensearch.snapshots.SnapshotId; -import org.opensearch.snapshots.SnapshotShardSizeInfo; -import org.opensearch.test.IndexSettingsModule; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.opensearch.cluster.routing.UnassignedInfo.Reason.CLUSTER_RECOVERED; -import static org.opensearch.cluster.routing.UnassignedInfo.Reason.INDEX_CREATED; -import static org.opensearch.cluster.routing.UnassignedInfo.Reason.INDEX_REOPENED; - -public class PrimaryShardBatchAllocatorTests extends OpenSearchAllocationTestCase { - - private final ShardId shardId = new ShardId("test", "_na_", 0); - private static Set shardsInBatch; - private final DiscoveryNode node1 = newNode("node1"); - private final DiscoveryNode node2 = newNode("node2"); - private final DiscoveryNode node3 = newNode("node3"); - private TestBatchAllocator batchAllocator; - - public static void setUpShards(int numberOfShards) { - shardsInBatch = new HashSet<>(); - for (int shardNumber = 0; shardNumber < numberOfShards; shardNumber++) { - ShardId shardId = new ShardId("test", "_na_", shardNumber); - shardsInBatch.add(shardId); - } - } - - @Before - public void buildTestAllocator() { - this.batchAllocator = new TestBatchAllocator(); - } - - private void allocateAllUnassigned(final RoutingAllocation allocation) { - final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - while (iterator.hasNext()) { - batchAllocator.allocateUnassigned(iterator.next(), allocation, iterator); - } - } - - private void allocateAllUnassignedBatch(final RoutingAllocation allocation) { - final RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator(); - Set shardsToBatch = new HashSet<>(); - while (iterator.hasNext()) { - shardsToBatch.add(iterator.next()); - } - batchAllocator.allocateUnassignedBatch(shardsToBatch, allocation); - } - - public void testNoProcessPrimaryNotAllocatedBefore() { - final RoutingAllocation allocation; - // with old version, we can't know if a shard was allocated before or not - allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - randomFrom(INDEX_CREATED, CLUSTER_RECOVERED, INDEX_REOPENED) - ); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(false)); - assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when async fetch returns that there is no data, the shard will not be allocated. - */ - public void testNoAsyncFetchData() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId" - ); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests when the node returns that no data was found for it (null for allocation id), - * it will be moved to ignore unassigned. - */ - public void testNoAllocationFound() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId" - ); - batchAllocator.addData(node1, null, randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore - * unassigned. - */ - public void testNoMatchingAllocationIdFound() { - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), CLUSTER_RECOVERED, "id2"); - batchAllocator.addData(node1, "id1", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests when the node returns that no data was found for it, it will be moved to ignore unassigned. - */ - public void testStoreException() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - batchAllocator.addData(node1, "allocId1", randomBoolean(), new CorruptIndexException("test", "test")); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when the node returns a ShardLockObtainFailedException, it will be considered as a valid shard copy - */ - public void testShardLockObtainFailedException() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - batchAllocator.addData(node1, "allocId1", randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node1.getId()) - ); - // check that allocation id is reused - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo("allocId1") - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that replica with the highest primary term version will be selected as target - */ - public void testPreferReplicaWithHighestPrimaryTerm() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2, - allocId3 - ); - batchAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName())); - batchAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 22, 120, 2, Codec.getDefault().getName())); - batchAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName())); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node2.getId()) - ); - // Assert node2's allocation id is used - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId2) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that replica with highest primary ter version will be selected as target - */ - public void testPreferReplicaWithNullReplicationCheckpoint() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2, - allocId3 - ); - batchAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 20, 101, 1, Codec.getDefault().getName())); - batchAllocator.addData(node2, allocId2, false); - batchAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 40, 120, 2, Codec.getDefault().getName())); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node3.getId()) - ); - // Assert node3's allocation id should be used as it has highest replication checkpoint - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId3) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that null ReplicationCheckpoint are ignored - */ - public void testPreferReplicaWithAllNullReplicationCheckpoint() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2, - allocId3 - ); - batchAllocator.addData(node1, allocId1, false, null, null); - batchAllocator.addData(node2, allocId2, false, null, null); - batchAllocator.addData(node3, allocId3, true, null, null); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node3.getId()) - ); - // Assert node3's allocation id should be used as it was previous primary - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId3) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that replica with highest segment info version will be selected as target on equal primary terms - */ - public void testPreferReplicaWithHighestSegmentInfoVersion() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2, - allocId3 - ); - batchAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName())); - batchAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 3, Codec.getDefault().getName())); - batchAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName())); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node2.getId()) - ); - // Assert node2's allocation id is used - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId2) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that prefer allocation of replica at lower checkpoint but in sync set - */ - public void testOutOfSyncHighestRepCheckpointIsIgnored() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId3 - ); - batchAllocator.addData(node1, allocId1, false, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName())); - batchAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName())); - batchAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName())); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node3.getId()) - ); - // Assert node3's allocation id is used - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId3) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that prefer allocation of older primary over replica with higher replication checkpoint - */ - public void testPreferAllocatingPreviousPrimaryWithLowerRepCheckpoint() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - String allocId3 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2, - allocId3 - ); - batchAllocator.addData(node1, allocId1, true, new ReplicationCheckpoint(shardId, 10, 101, 1, Codec.getDefault().getName())); - batchAllocator.addData(node2, allocId2, false, new ReplicationCheckpoint(shardId, 20, 120, 2, Codec.getDefault().getName())); - batchAllocator.addData(node3, allocId3, false, new ReplicationCheckpoint(shardId, 15, 120, 2, Codec.getDefault().getName())); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node1.getId()) - ); - // Assert node1's allocation id is used with highest replication checkpoint - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId1) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when one node returns a ShardLockObtainFailedException and another properly loads the store, it will - * select the second node as target - */ - public void testShardLockObtainFailedExceptionPreferOtherValidCopies() { - String allocId1 = randomAlphaOfLength(10); - String allocId2 = randomAlphaOfLength(10); - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - CLUSTER_RECOVERED, - allocId1, - allocId2 - ); - batchAllocator.addData(node1, allocId1, randomBoolean(), new ShardLockObtainFailedException(shardId, "test")); - batchAllocator.addData(node2, allocId2, randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node2.getId()) - ); - // check that allocation id is reused - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo(allocId2) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when there is a node to allocate the shard to, it will be allocated to it. - */ - public void testFoundAllocationAndAllocating() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), - "allocId1" - ); - batchAllocator.addData(node1, "allocId1", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node1.getId()) - ); - // check that allocation id is reused - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).allocationId().getId(), - equalTo("allocId1") - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when the nodes with prior copies of the given shard all return a decision of NO, but - * {@link AllocationDecider#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} - * returns a YES decision for at least one of those NO nodes, then we force allocate to one of them - */ - public void testForceAllocatePrimary() { - batchAllocator.addData(node1, "allocId1", randomBoolean()); - AllocationDeciders deciders = new AllocationDeciders( - Arrays.asList( - // since the deciders return a NO decision for allocating a shard (due to the guaranteed NO decision from the second - // decider), - // the allocator will see if it can force assign the primary, where the decision will be YES - new TestAllocateDecision(randomBoolean() ? Decision.YES : Decision.NO), - getNoDeciderThatAllowsForceAllocate() - ) - ); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1"); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertTrue(allocation.routingNodes().unassigned().ignored().isEmpty()); - assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), 1); - assertEquals(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), node1.getId()); - } - - /** - * Tests that when the nodes with prior copies of the given shard all return a decision of NO, and - * {@link AllocationDecider#canForceAllocatePrimary(ShardRouting, RoutingNode, RoutingAllocation)} - * returns a NO or THROTTLE decision for a node, then we do not force allocate to that node. - */ - public void testDontAllocateOnNoOrThrottleForceAllocationDecision() { - batchAllocator.addData(node1, "allocId1", randomBoolean()); - boolean forceDecisionNo = randomBoolean(); - AllocationDeciders deciders = new AllocationDeciders( - Arrays.asList( - // since both deciders here return a NO decision for allocating a shard, - // the allocator will see if it can force assign the primary, where the decision will be either NO or THROTTLE, - // so the shard will remain un-initialized - new TestAllocateDecision(Decision.NO), - forceDecisionNo ? getNoDeciderThatDeniesForceAllocate() : getNoDeciderThatThrottlesForceAllocate() - ) - ); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1"); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - List ignored = allocation.routingNodes().unassigned().ignored(); - assertEquals(ignored.size(), 1); - assertEquals( - ignored.get(0).unassignedInfo().getLastAllocationStatus(), - forceDecisionNo ? AllocationStatus.DECIDERS_NO : AllocationStatus.DECIDERS_THROTTLED - ); - assertTrue(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).isEmpty()); - } - - /** - * Tests that when the nodes with prior copies of the given shard return a THROTTLE decision, - * then we do not force allocate to that node but instead throttle. - */ - public void testDontForceAllocateOnThrottleDecision() { - batchAllocator.addData(node1, "allocId1", randomBoolean()); - AllocationDeciders deciders = new AllocationDeciders( - Arrays.asList( - // since we have a NO decision for allocating a shard (because the second decider returns a NO decision), - // the allocator will see if it can force assign the primary, and in this case, - // the TestAllocateDecision's decision for force allocating is to THROTTLE (using - // the default behavior) so despite the other decider's decision to return YES for - // force allocating the shard, we still THROTTLE due to the decision from TestAllocateDecision - new TestAllocateDecision(Decision.THROTTLE), - getNoDeciderThatAllowsForceAllocate() - ) - ); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(deciders, CLUSTER_RECOVERED, "allocId1"); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - List ignored = allocation.routingNodes().unassigned().ignored(); - assertEquals(ignored.size(), 1); - assertEquals(ignored.get(0).unassignedInfo().getLastAllocationStatus(), AllocationStatus.DECIDERS_THROTTLED); - assertTrue(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).isEmpty()); - } - - /** - * Tests that when there was a node that previously had the primary, it will be allocated to that same node again. - */ - public void testPreferAllocatingPreviousPrimary() { - String primaryAllocId = UUIDs.randomBase64UUID(); - String replicaAllocId = UUIDs.randomBase64UUID(); - RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - yesAllocationDeciders(), - randomFrom(CLUSTER_RECOVERED, INDEX_REOPENED), - primaryAllocId, - replicaAllocId - ); - boolean node1HasPrimaryShard = randomBoolean(); - batchAllocator.addData(node1, node1HasPrimaryShard ? primaryAllocId : replicaAllocId, node1HasPrimaryShard); - batchAllocator.addData(node2, node1HasPrimaryShard ? replicaAllocId : primaryAllocId, !node1HasPrimaryShard); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - DiscoveryNode allocatedNode = node1HasPrimaryShard ? node1 : node2; - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(allocatedNode.getId()) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when there is a node to allocate to, but it is throttling (and it is the only one), - * it will be moved to ignore unassigned until it can be allocated to. - */ - public void testFoundAllocationButThrottlingDecider() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - throttleAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - batchAllocator.addData(node1, "allocId1", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); - assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when there is a node to be allocated to, but it the decider said "no", we still - * force the allocation to it. - */ - public void testFoundAllocationButNoDecider() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - noAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - batchAllocator.addData(node1, "allocId1", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertThat( - allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), - equalTo(node1.getId()) - ); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation - * deciders say yes, we allocate to that node. - */ - public void testRestore() { - RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomLong(), "allocId"); - batchAllocator.addData(node1, "some allocId", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation - * deciders say throttle, we add it to ignored shards. - */ - public void testRestoreThrottle() { - RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders(), randomLong(), "allocId"); - batchAllocator.addData(node1, "some allocId", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when restoring from a snapshot and we find a node with a shard copy but allocation - * deciders say no, we still allocate to that node. - */ - public void testRestoreForcesAllocateIfShardAvailable() { - final long shardSize = randomNonNegativeLong(); - RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders(), shardSize, "allocId"); - batchAllocator.addData(node1, "some allocId", randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - final List initializingShards = allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING); - assertThat(initializingShards.size(), equalTo(1)); - assertThat(initializingShards.get(0).getExpectedShardSize(), equalTo(shardSize)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when restoring from a snapshot and we don't find a node with a shard copy, the shard will remain in - * the unassigned list to be allocated later. - */ - public void testRestoreDoesNotAssignIfNoShardAvailable() { - RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), randomNonNegativeLong(), "allocId"); - batchAllocator.addData(node1, null, randomBoolean()); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(false)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().size(), equalTo(1)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - /** - * Tests that when restoring from a snapshot and we don't know the shard size yet, the shard will remain in - * the unassigned list to be allocated later. - */ - public void testRestoreDoesNotAssignIfShardSizeNotAvailable() { - RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders(), null, "allocId"); - batchAllocator.addData(node1, null, false); - allocateAllUnassignedBatch(allocation); - assertThat(allocation.routingNodesChanged(), equalTo(true)); - assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false)); - ShardRouting ignoredRouting = allocation.routingNodes().unassigned().ignored().get(0); - assertThat(ignoredRouting.unassignedInfo().getLastAllocationStatus(), equalTo(AllocationStatus.FETCHING_SHARD_DATA)); - assertClusterHealthStatus(allocation, ClusterHealthStatus.YELLOW); - } - - public void testMakeAllocationDecisionDataFetching() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - noAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - - Set shards = new HashSet<>(); - allocateAllUnassignedBatch(allocation); - ShardRouting shard = - allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, - allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, allDecisions.keySet()); - assertEquals(AllocationDecision.AWAITING_INFO, allDecisions.get(shard).getAllocationDecision()); - } - - public void testMakeAllocationDecisionDataFetched() { - final RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas( - noAllocationDeciders(), - CLUSTER_RECOVERED, - "allocId1" - ); - - Set shards = new HashSet<>(); - ShardRouting shard = - allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - batchAllocator.addData(node1, "allocId1", true, new ReplicationCheckpoint(shardId, 20, 101, 1, - Codec.getDefault().getName())); - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, - allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, allDecisions.keySet()); - assertEquals(AllocationDecision.YES, allDecisions.get(shard).getAllocationDecision()); - } - - public void testMakeAllocationDecisionDataFetchedMultipleShards() { - final RoutingAllocation allocation = routingAllocationWithMultiplePrimaryNoReplicas( - noAllocationDeciders(), - CLUSTER_RECOVERED, - 2, - 0, - "allocId-0", "allocaId-1" - ); - setUpShards(2); - Set shards = new HashSet<>(); - for (ShardId shardId : shardsInBatch) { - ShardRouting shard = - allocation.routingTable().getIndicesRouting().get("test").shard(shardId.id()).primaryShard(); - shards.add(shard); - batchAllocator.addShardData(node1, "allocId-" + shardId.id(), shardId, true, new - ReplicationCheckpoint(shardId, 20, 101, 1, - Codec.getDefault().getName()), null); - } - HashMap allDecisions = batchAllocator.makeAllocationDecision(shards, - allocation, logger); - // verify we get decisions for all the shards - assertEquals(shards.size(), allDecisions.size()); - assertEquals(shards, allDecisions.keySet()); - assertEquals(AllocationDecision.YES, allDecisions.get(shards.iterator().next()).getAllocationDecision()); - } - - private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders, Long shardSize, String... allocIds) { - Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .putInSyncAllocationIds(0, Sets.newHashSet(allocIds)) - ) - .build(); - - final Snapshot snapshot = new Snapshot("test", new SnapshotId("test", UUIDs.randomBase64UUID())); - RoutingTable routingTable = RoutingTable.builder() - .addAsRestore( - metadata.index(shardId.getIndex()), - new SnapshotRecoverySource( - UUIDs.randomBase64UUID(), - snapshot, - Version.CURRENT, - new IndexId(shardId.getIndexName(), UUIDs.randomBase64UUID(random())) - ) - ) - .build(); - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTable) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); - return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state, null, new SnapshotShardSizeInfo(Map.of()) { - @Override - public Long getShardSize(ShardRouting shardRouting) { - return shardSize; - } - }, System.nanoTime()); - } - - private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas( - AllocationDeciders deciders, - UnassignedInfo.Reason reason, - String... activeAllocationIds - ) { - Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings(settings(Version.CURRENT)) - .numberOfShards(1) - .numberOfReplicas(0) - .putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)) - ) - .build(); - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - switch (reason) { - - case INDEX_CREATED: - routingTableBuilder.addAsNew(metadata.index(shardId.getIndex())); - break; - case CLUSTER_RECOVERED: - routingTableBuilder.addAsRecovery(metadata.index(shardId.getIndex())); - break; - case INDEX_REOPENED: - routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardId.getIndex())); - break; - default: - throw new IllegalArgumentException("can't do " + reason + " for you. teach me"); - } - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTableBuilder.build()) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime()); - } - - private RoutingAllocation routingAllocationWithMultiplePrimaryNoReplicas( - AllocationDeciders deciders, - UnassignedInfo.Reason reason, - int numberOfShards, - int replicas, - String... activeAllocationIds - ) { - Metadata metadata = Metadata.builder() - .put( - IndexMetadata.builder(shardId.getIndexName()) - .settings(settings(Version.CURRENT)) - .numberOfShards(numberOfShards) - .numberOfReplicas(replicas) - .putInSyncAllocationIds(shardId.id(), Sets.newHashSet(activeAllocationIds)) - ) - .build(); - - RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); - switch (reason) { - - case INDEX_CREATED: - routingTableBuilder.addAsNew(metadata.index(shardId.getIndex())); - break; - case CLUSTER_RECOVERED: - routingTableBuilder.addAsRecovery(metadata.index(shardId.getIndex())); - break; - case INDEX_REOPENED: - routingTableBuilder.addAsFromCloseToOpen(metadata.index(shardId.getIndex())); - break; - default: - throw new IllegalArgumentException("can't do " + reason + " for you. teach me"); - } - ClusterState state = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) - .metadata(metadata) - .routingTable(routingTableBuilder.build()) - .nodes(DiscoveryNodes.builder().add(node1).add(node2).add(node3)) - .build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state, null, null, System.nanoTime()); - } - - private void assertClusterHealthStatus(RoutingAllocation allocation, ClusterHealthStatus expectedStatus) { - RoutingTable oldRoutingTable = allocation.routingTable(); - RoutingNodes newRoutingNodes = allocation.routingNodes(); - final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(oldRoutingTable.version(), newRoutingNodes).build(); - ClusterState clusterState = ClusterState.builder(new ClusterName("test-cluster")).routingTable(newRoutingTable).build(); - ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState); - assertThat(clusterStateHealth.getStatus().ordinal(), lessThanOrEqualTo(expectedStatus.ordinal())); - } - - private AllocationDecider getNoDeciderThatAllowsForceAllocate() { - return getNoDeciderWithForceAllocate(Decision.YES); - } - - private AllocationDecider getNoDeciderThatThrottlesForceAllocate() { - return getNoDeciderWithForceAllocate(Decision.THROTTLE); - } - - private AllocationDecider getNoDeciderThatDeniesForceAllocate() { - return getNoDeciderWithForceAllocate(Decision.NO); - } - - private AllocationDecider getNoDeciderWithForceAllocate(final Decision forceAllocateDecision) { - return new TestAllocateDecision(Decision.NO) { - @Override - public Decision canForceAllocatePrimary(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { - assert shardRouting.primary() : "cannot force allocate a non-primary shard " + shardRouting; - return forceAllocateDecision; - } - }; - } - - class TestBatchAllocator extends PrimaryShardBatchAllocator { - - private Map data; - - public TestBatchAllocator clear() { - data = null; - return this; - } - - public TestBatchAllocator addData( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint - ) { - return addData(node, allocationId, primary, replicationCheckpoint, null); - } - - public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary) { - Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings); - return addData( - node, - allocationId, - primary, - ReplicationCheckpoint.empty(shardId, - new CodecService(null, indexSettings, null).codec("default").getName()), - null - ); - } - - public TestBatchAllocator addData(DiscoveryNode node, String allocationId, boolean primary, @Nullable Exception storeException) { - Settings nodeSettings = Settings.builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()).build(); - IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", nodeSettings); - return addData( - node, - allocationId, - primary, - ReplicationCheckpoint.empty(shardId, new CodecService(null, indexSettings, null).codec("default").getName()), - storeException - ); - } - - public TestBatchAllocator addData( - DiscoveryNode node, - String allocationId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - @Nullable Exception storeException - ) { - if (data == null) { - data = new HashMap<>(); - } - Map shardData = - Map.of - (shardId, new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(allocationId, - primary, - replicationCheckpoint, - storeException)); - data.put( - node, - new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch( - node, - shardData - ) - ); - return this; - } - - public TestBatchAllocator addShardData( - DiscoveryNode node, - String allocationId, - ShardId shardId, - boolean primary, - ReplicationCheckpoint replicationCheckpoint, - @Nullable Exception storeException - ) { - if (data == null) { - data = new HashMap<>(); - } - Map shardData = - new HashMap<>(); - shardData.put(shardId, - new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShards(allocationId, - primary, - replicationCheckpoint, - storeException)); - if (data.get(node) != null) shardData.putAll(data.get(node).getNodeGatewayStartedShardsBatch()); - data.put( - node, - new TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch( - node, - shardData - ) - ); - return this; - } - - @Override - protected AsyncShardFetch.FetchResult - fetchData( - Set shardsEligibleForFetch, Set inEligibleShards, - RoutingAllocation allocation) { - return new AsyncShardFetch.FetchResult<>(data, - Collections.>emptyMap()); - } - } -}