From aa68e295f5821f0daeb36eab2a89efba9552fa0f Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 22 Aug 2023 13:27:19 +0530 Subject: [PATCH] Cleaned up code and removed unnecessary ineligible shards Signed-off-by: Shivansh Arora --- .../gateway/ReplicaShardBatchAllocator.java | 47 ++++--------------- 1 file changed, 8 insertions(+), 39 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java index 1ed28b281f710..1d49b6e122477 100644 --- a/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ReplicaShardBatchAllocator.java @@ -21,11 +21,10 @@ import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.common.collect.Tuple; -import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.common.unit.TimeValue; import org.opensearch.gateway.AsyncBatchShardFetch.FetchResult; import org.opensearch.index.store.StoreFileMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; @@ -59,32 +58,29 @@ public void processExistingRecoveries(RoutingAllocation allocation, List shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation); - if (shardState.hasData()) { + AsyncBatchShardFetch.FetchResult shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation); + if (shardState.hasData() == false) { logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards); continue; // still fetching } - for (ShardRouting shard: eligibleFetchShards) { + for (ShardRouting shard : eligibleFetchShards) { ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId()); assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary"; assert primaryShard.currentNodeId() != null; final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId()); - final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore= findStore(primaryNode, shardState, shard); + final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardState, shard); if (primaryStore == null) { // if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed) // just let the recovery find it out, no need to do anything about it for the initializing shard @@ -166,7 +162,7 @@ public HashMap makeAllocationDecision( Set shardsEligibleForFetch = new HashSet<>(); Set shardsNotEligibleForFetch = new HashSet<>(); HashMap>> nodeAllocationDecisions = new HashMap<>(); - for(ShardRouting shard : shards) { + for (ShardRouting shard : shards) { if (!isResponsibleFor(shard)) { // this allocator n is not responsible for allocating this shard shardsNotEligibleForFetch.add(shard); @@ -176,11 +172,10 @@ public HashMap makeAllocationDecision( Tuple> result = canBeAllocatedToAtLeastOneNode(shard, allocation); Decision allocationDecision = result.v1(); - if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))){ + if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))) { // only return early if we are not in explain mode, or we are in explain mode but we have not // yet attempted to fetch any shard data logger.trace("{}: ignoring allocation, can't be allocated on any node", shard); - shardsNotEligibleForFetch.add(shard); shardAllocationDecisions.put(shard, AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()), result.v2() != null ? new ArrayList<>(result.v2().values()) : null)); @@ -188,7 +183,6 @@ public HashMap makeAllocationDecision( } // storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES // so that we don't have to compute the decisions again - // ToDo: Check if we need to store or computing again will be cheaper/better nodeAllocationDecisions.put(shard, result); shardsEligibleForFetch.add(shard); @@ -197,15 +191,6 @@ public HashMap makeAllocationDecision( // only fetch data for eligible shards final FetchResult shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation); - // ToDo: Analyze if we need to create hashmaps here or sequential is better -// Map primaryNodesMap = shardsEligibleForFetch.stream() -// .map(x -> routingNodes.activePrimary(x.shardId())) -// .filter(Objects::nonNull) -// .filter(node -> node.currentNodeId() != null) -// .collect(Collectors.toMap(Function.identity(), node -> allocation.nodes().get(node.currentNodeId()))); -// -// Map primaryStoreMap = findStoresBatch(primaryNodesMap, shardsState); - for (ShardRouting unassignedShard : shardsEligibleForFetch) { if (!shardsState.hasData()) { logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard); @@ -420,7 +405,7 @@ private static long computeMatchingBytes( /** * Determines if the shard can be allocated on at least one node based on the allocation deciders. - * + *

* Returns the best allocation decision for allocating the shard on any node (i.e. YES if at least one * node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided * YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element @@ -457,24 +442,8 @@ private static Tuple> canBeAllocated return Tuple.tuple(madeDecision, nodeDecisions); } - protected abstract AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation); - protected abstract boolean hasInitiatedFetching(ShardRouting shard); -// private static Map findStoresBatch(Map shardToNodeMap, -// FetchResult data) { -// Map shardStores = new HashMap<>(); -// shardToNodeMap.entrySet().forEach(entry -> { -// NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(entry.getValue()); -// if (nodeFilesStore == null) { -// shardStores.put(entry.getKey(), null); -// } else { -// shardStores.put(entry.getKey(), nodeFilesStore.getNodeStoreFilesMetadataBatch().get(entry.getKey().shardId())); -// } -// }); -// return shardStores; -// } - private static TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata findStore( DiscoveryNode node, FetchResult data,