Skip to content

Commit

Permalink
Cleaned up code and removed unnecessary ineligible shards
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
  • Loading branch information
shiv0408 committed Aug 22, 2023
1 parent 8b47472 commit aa68e29
Showing 1 changed file with 8 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.gateway.AsyncBatchShardFetch.FetchResult;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadata;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;

Expand Down Expand Up @@ -59,32 +58,29 @@ public void processExistingRecoveries(RoutingAllocation allocation, List<Set<Sha
continue;
}
if (shard.initializing() == false) {
ineligibleShards.add(shard);
continue;
}
if (shard.relocatingNodeId() != null) {
ineligibleShards.add(shard);
continue;
}

// if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
if (shard.unassignedInfo() != null && shard.unassignedInfo().getReason() == UnassignedInfo.Reason.INDEX_CREATED) {
ineligibleShards.add(shard);
continue;
}
eligibleFetchShards.add(shard);
}
AsyncBatchShardFetch.FetchResult <NodeStoreFilesMetadataBatch> shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation);
if (shardState.hasData()) {
AsyncBatchShardFetch.FetchResult<NodeStoreFilesMetadataBatch> shardState = fetchData(eligibleFetchShards, ineligibleShards, allocation);
if (shardState.hasData() == false) {
logger.trace("{}: fetching new stores for initializing shard batch", eligibleFetchShards);
continue; // still fetching
}
for (ShardRouting shard: eligibleFetchShards) {
for (ShardRouting shard : eligibleFetchShards) {
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shard.shardId());
assert primaryShard != null : "the replica shard can be allocated on at least one node, so there must be an active primary";
assert primaryShard.currentNodeId() != null;
final DiscoveryNode primaryNode = allocation.nodes().get(primaryShard.currentNodeId());
final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore= findStore(primaryNode, shardState, shard);
final TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata primaryStore = findStore(primaryNode, shardState, shard);
if (primaryStore == null) {
// if we can't find the primary data, it is probably because the primary shard is corrupted (and listing failed)
// just let the recovery find it out, no need to do anything about it for the initializing shard
Expand Down Expand Up @@ -166,7 +162,7 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
Set<ShardRouting> shardsEligibleForFetch = new HashSet<>();
Set<ShardRouting> shardsNotEligibleForFetch = new HashSet<>();
HashMap<ShardRouting, Tuple<Decision, Map<String, NodeAllocationResult>>> nodeAllocationDecisions = new HashMap<>();
for(ShardRouting shard : shards) {
for (ShardRouting shard : shards) {
if (!isResponsibleFor(shard)) {
// this allocator n is not responsible for allocating this shard
shardsNotEligibleForFetch.add(shard);
Expand All @@ -176,19 +172,17 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(

Tuple<Decision, Map<String, NodeAllocationResult>> result = canBeAllocatedToAtLeastOneNode(shard, allocation);
Decision allocationDecision = result.v1();
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))){
if (allocationDecision.type() != Decision.Type.YES && (!explain || !hasInitiatedFetching(shard))) {
// only return early if we are not in explain mode, or we are in explain mode but we have not
// yet attempted to fetch any shard data
logger.trace("{}: ignoring allocation, can't be allocated on any node", shard);
shardsNotEligibleForFetch.add(shard);
shardAllocationDecisions.put(shard,
AllocateUnassignedDecision.no(UnassignedInfo.AllocationStatus.fromDecision(allocationDecision.type()),
result.v2() != null ? new ArrayList<>(result.v2().values()) : null));
continue;
}
// storing the nodeDecisions in nodeAllocationDecisions if the decision is not YES
// so that we don't have to compute the decisions again
// ToDo: Check if we need to store or computing again will be cheaper/better
nodeAllocationDecisions.put(shard, result);

shardsEligibleForFetch.add(shard);
Expand All @@ -197,15 +191,6 @@ public HashMap<ShardRouting, AllocateUnassignedDecision> makeAllocationDecision(
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(shardsEligibleForFetch, shardsNotEligibleForFetch, allocation);

// ToDo: Analyze if we need to create hashmaps here or sequential is better
// Map<ShardRouting, DiscoveryNode> primaryNodesMap = shardsEligibleForFetch.stream()
// .map(x -> routingNodes.activePrimary(x.shardId()))
// .filter(Objects::nonNull)
// .filter(node -> node.currentNodeId() != null)
// .collect(Collectors.toMap(Function.identity(), node -> allocation.nodes().get(node.currentNodeId())));
//
// Map<ShardRouting, NodeStoreFilesMetadata> primaryStoreMap = findStoresBatch(primaryNodesMap, shardsState);

for (ShardRouting unassignedShard : shardsEligibleForFetch) {
if (!shardsState.hasData()) {
logger.trace("{}: ignoring allocation, still fetching shard stores", unassignedShard);
Expand Down Expand Up @@ -420,7 +405,7 @@ private static long computeMatchingBytes(

/**
* Determines if the shard can be allocated on at least one node based on the allocation deciders.
*
* <p>
* Returns the best allocation decision for allocating the shard on any node (i.e. YES if at least one
* node decided YES, THROTTLE if at least one node decided THROTTLE, and NO if none of the nodes decided
* YES or THROTTLE). If in explain mode, also returns the node-level explanations as the second element
Expand Down Expand Up @@ -457,24 +442,8 @@ private static Tuple<Decision, Map<String, NodeAllocationResult>> canBeAllocated
return Tuple.tuple(madeDecision, nodeDecisions);
}

protected abstract AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetadata.NodeStoreFilesMetadata> fetchData(ShardRouting shard, RoutingAllocation allocation);

protected abstract boolean hasInitiatedFetching(ShardRouting shard);

// private static Map<ShardRouting, NodeStoreFilesMetadata> findStoresBatch(Map<ShardRouting, DiscoveryNode> shardToNodeMap,
// FetchResult<NodeStoreFilesMetadataBatch> data) {
// Map<ShardRouting, NodeStoreFilesMetadata> shardStores = new HashMap<>();
// shardToNodeMap.entrySet().forEach(entry -> {
// NodeStoreFilesMetadataBatch nodeFilesStore = data.getData().get(entry.getValue());
// if (nodeFilesStore == null) {
// shardStores.put(entry.getKey(), null);
// } else {
// shardStores.put(entry.getKey(), nodeFilesStore.getNodeStoreFilesMetadataBatch().get(entry.getKey().shardId()));
// }
// });
// return shardStores;
// }

private static TransportNodesListShardStoreMetadataBatch.StoreFilesMetadata findStore(
DiscoveryNode node,
FetchResult<NodeStoreFilesMetadataBatch> data,
Expand Down

0 comments on commit aa68e29

Please sign in to comment.