Skip to content

Commit

Permalink
logic for remove and ignore
Browse files Browse the repository at this point in the history
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
imRishN committed Jul 15, 2024
1 parent ada30d1 commit a9b6b48
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public class AllocateUnassignedDecision extends AbstractAllocationDecision {
private final long remainingDelayInMillis;
private final long configuredDelayInMillis;

private AllocateUnassignedDecision(
public AllocateUnassignedDecision(
AllocationStatus allocationStatus,
DiscoveryNode assignedNode,
String allocationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ private void processWorkItemQueue(RoutingAllocation allocation) {
while (workQueue.isEmpty() == false) {
if (System.nanoTime() - startTime > allocator.getAllocatorTimeout().nanos()) {
logger.info("Timed out while running process work item queue");
allocator.removeAndIgnorePendingUnassignedShards(allocation);
return;
}
logger.info("attempting to start work queue with size [{}], elapsed time [{}]",
Expand Down Expand Up @@ -822,6 +823,11 @@ public void allocateUnassigned(
unassignedAllocationHandler.removeAndIgnore(AllocationStatus.NO_VALID_SHARD_COPY, allocation.changes());
}

@Override
public void removeAndIgnorePendingUnassignedShards(RoutingAllocation allocation) {

}

@Override
public AllocateUnassignedDecision explainUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation allocation) {
assert unassignedShard.unassigned();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ default TimeValue getAllocatorTimeout() {
return null;
}

default void removeAndIgnorePendingUnassignedShards(RoutingAllocation allocation) {
}

/**
* Allocate all unassigned shards in the given {@link RoutingAllocation} for which this {@link ExistingShardsAllocator} is responsible.
* Default implementation calls {@link #allocateUnassigned(ShardRouting, RoutingAllocation, UnassignedAllocationHandler)} for each Unassigned shard
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -894,11 +894,11 @@ AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, lo
return AllocateUnassignedDecision.throttle(null);
}

if (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE && shard.primary()) {
logger.debug("Skipping decide allocate unassigned for existing shard, shard primary : [{}], unassigned reason : [{}]",
shard.primary(), shard.unassignedInfo().getReason());
return AllocateUnassignedDecision.throttle(null);
}
// if (shard.recoverySource().getType() == RecoverySource.Type.EXISTING_STORE && shard.primary()) {
// logger.debug("Skipping decide allocate unassigned for existing shard, shard primary : [{}], unassigned reason : [{}]",
// shard.primary(), shard.unassignedInfo().getReason());
// return AllocateUnassignedDecision.throttle(null);
// }

final boolean explain = allocation.debugDecision();
Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ private Decision allocateShardCopies(
inRecoveriesLimit
);
} else {
logger.debug("reached the limit of incoming shard recoveries [{}], cluster setting [{}={}])", currentInRecoveries,
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),
inRecoveriesLimit);
return allocation.decision(
THROTTLE,
NAME,
Expand Down Expand Up @@ -293,6 +296,8 @@ private Decision allocateShardCopies(
inRecoveriesLimit
);
} else {
logger.debug("reached the limit of outgoing shard recoveries [{}}] on the node [{}] which holds the primary", primaryNodeOutRecoveries,
primaryShard.currentNodeId() );
return allocation.decision(
THROTTLE,
NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Map<ShardId,
+ "]"
);
}

logger.info("fetch result: fetch data - [{}], all ignore nodes map - [{}]", fetchData, allIgnoreNodesMap);
return new FetchResult<>(fetchData, allIgnoreNodesMap);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.core.index.shard.ShardId;
Expand All @@ -20,11 +21,8 @@
import org.opensearch.gateway.TransportNodesGatewayStartedShardHelper.NodeGatewayStartedShard;
import org.opensearch.gateway.TransportNodesListGatewayStartedShardsBatch.NodeGatewayStartedShardsBatch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
* PrimaryShardBatchAllocator is similar to {@link org.opensearch.gateway.PrimaryShardAllocator} only difference is
Expand Down Expand Up @@ -96,27 +94,74 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
}
}

long startTime = System.nanoTime();
// only fetch data for eligible shards
final FetchResult<NodeGatewayStartedShardsBatch> shardsState = fetchData(eligibleShards, inEligibleShards, allocation);

Set<ShardRouting> batchShardRoutingSet = new HashSet<>(shardRoutings);
logger.info("Time taken to fetch data in allocateUnassignedBatch [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
logger.info("Eligible shards [{}], ineligible shards [{}]", eligibleShards.size(), inEligibleShards.size());

RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
AllocateUnassignedDecision allocationDecision;

if (shardRoutings.contains(unassignedShard)) {
if (unassignedShard.primary() && batchShardRoutingSet.contains(unassignedShard)) {
assert unassignedShard.primary();
if (ineligibleShardAllocationDecisions.containsKey(unassignedShard.shardId())) {
allocationDecision = ineligibleShardAllocationDecisions.get(unassignedShard.shardId());

} else {
List<NodeGatewayStartedShard> nodeShardStates = adaptToNodeShardStates(unassignedShard, shardsState);
allocationDecision = getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
if (allocationDecision != null && allocationDecision.isDecisionTaken()) {
logger.info("allocate unassigned decision for eligible shards has decision [{}] ",
allocationDecision.getAllocationDecision().toString());
} else {
logger.info("null or no decision taken");
}
}
if (allocationDecision != null && allocationDecision.isDecisionTaken()) {
logger.info("executing allocate unassigned decision for eligible shards has decision [{}] ",
allocationDecision.getAllocationDecision().toString());
} else {
logger.info("executing - null or no decision taken");
}
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

public void removeAndIgnorePendingUnassignedBatches(RoutingAllocation allocation) {
logger.info("Triggering remove and ignore for primary shards");
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
if (unassignedShard.primary() == true) {
AllocateUnassignedDecision allocationDecision = getUnassignedShardAllocationDecisionToIgnore(unassignedShard);
// AllocateUnassignedDecision allocationDecision = getAllocationDecision(unassignedShard, allocation, null, logger);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

private AllocateUnassignedDecision getUnassignedShardAllocationDecisionToIgnore(
ShardRouting shardRouting
) {
if (!isResponsibleFor(shardRouting)) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
// return AllocateUnassignedDecision.no(
// UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, // TODO - need to create new allocation status
// null
// );

return new AllocateUnassignedDecision(
UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L);
}

/**
* Transforms {@link FetchResult} of {@link NodeGatewayStartedShardsBatch} to {@link List} of {@link TransportNodesListGatewayStartedShards.NodeGatewayStartedShards}.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,7 @@
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch;
import org.opensearch.indices.store.TransportNodesListShardStoreMetadataHelper.StoreFilesMetadata;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -135,8 +131,13 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
// only fetch data for eligible shards
final FetchResult<NodeStoreFilesMetadataBatch> shardsState = fetchData(eligibleShards, ineligibleShards, allocation);

List<ShardId> shardIdsFromBatch = shardRoutings.stream().map(shardRouting -> shardRouting.shardId()).collect(Collectors.toList());
Set<ShardId> shardIdsFromBatch = new HashSet<>();
for (ShardRouting shardRouting : shardRoutings) {
ShardId shardId = shardRouting.shardId();
shardIdsFromBatch.add(shardId);
}
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
logger.info("Total unassigned shards identified [{}]", allocation.routingNodes().unassigned().size());
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
// There will be only one entry for the shard in the unassigned shards batch
Expand All @@ -155,12 +156,57 @@ public void allocateUnassignedBatch(List<ShardRouting> shardRoutings, RoutingAll
allocation,
() -> convertToNodeStoreFilesMetadataMap(unassignedShard, shardsState)
);
if (allocateUnassignedDecision != null && allocateUnassignedDecision.isDecisionTaken()) {
logger.debug("allocate unassigned decision for eligible shards has decision [{}] ",
allocateUnassignedDecision.getAllocationDecision().toString());
} else {
logger.debug("produced null decision or decision not taken yet");
}
}
if (allocateUnassignedDecision != null && allocateUnassignedDecision.isDecisionTaken()) {
logger.debug("Executing decision for replica batch with decision [{}]",
allocateUnassignedDecision.getAllocationDecision().toString());
} else {
logger.debug("produced null/not taken decision before executing");
}
executeDecision(unassignedShard, allocateUnassignedDecision, allocation, iterator);
}
}
}

public void removeAndIgnorePendingUnassignedBatches(RoutingAllocation allocation) {
logger.info("Triggering remove and ignore for replica shards");
RoutingNodes.UnassignedShards.UnassignedIterator iterator = allocation.routingNodes().unassigned().iterator();
while (iterator.hasNext()) {
ShardRouting unassignedShard = iterator.next();
if (unassignedShard.primary() == false) {
// if (unassignedShard.primary() == false && unassignedShard.recoverySource().getType().equals(RecoverySource.Type.PEER) == false) {
AllocateUnassignedDecision allocationDecision = getUnassignedShardAllocationDecisionToIgnore(unassignedShard);
executeDecision(unassignedShard, allocationDecision, allocation, iterator);
}
}
}

private AllocateUnassignedDecision getUnassignedShardAllocationDecisionToIgnore(
ShardRouting shardRouting
) {
if (!isResponsibleFor(shardRouting)) {
return AllocateUnassignedDecision.NOT_TAKEN;
}
if (shardRouting.unassignedInfo().getLastAllocationStatus().equals(UnassignedInfo.AllocationStatus.FETCHING_SHARD_DATA)
|| shardRouting.unassignedInfo().getLastAllocationStatus().equals(UnassignedInfo.AllocationStatus.NO_VALID_SHARD_COPY)
) {
logger.info("Shard routing in getUnassignedShardAllocationDecisionToIgnore with id [{}], and index [{}]"
, shardRouting.id(), shardRouting.index().getName());
}
return AllocateUnassignedDecision.no(
UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, // TODO - need to create new allocation status
null
);
// return new AllocateUnassignedDecision(
// UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED, null, null, null, false, 0L, 0L);
}

private AllocateUnassignedDecision getUnassignedShardAllocationDecision(
ShardRouting shardRouting,
RoutingAllocation allocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,21 @@ protected List<Runnable> innerAllocateUnassignedBatch(
}
}

@Override
public void removeAndIgnorePendingUnassignedShards(RoutingAllocation allocation) {
long startTime = System.nanoTime();
long startTimeP = System.nanoTime();
primaryShardBatchAllocator.removeAndIgnorePendingUnassignedBatches(allocation);
logger.info("Completing remove and ignore for primary in this reroute cycle, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeP));
long startTimeR = System.nanoTime();
replicaShardBatchAllocator.removeAndIgnorePendingUnassignedBatches(allocation);
logger.info("Completing remove and ignore for replica in this reroute cycle, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeR));
logger.info("completing remove and ignore for this reroute cycle, elapsed time: [{}]",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}

// visible for testing
protected Set<String> createAndUpdateBatches(RoutingAllocation allocation, boolean primary) {
Set<String> batchesToBeAssigned = new HashSet<>();
Expand Down

0 comments on commit a9b6b48

Please sign in to comment.