From e3354b4b66c502477f317c1bc6a45b0ee5c7e4d0 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Tue, 22 Aug 2023 13:38:27 +0530 Subject: [PATCH] Removed GatewayAllocator from PR Signed-off-by: Shivansh Arora --- .../opensearch/gateway/GatewayAllocator.java | 427 +----------------- 1 file changed, 6 insertions(+), 421 deletions(-) diff --git a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java index 6c8a042e6b85d..5a20112b19219 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayAllocator.java @@ -42,29 +42,21 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RerouteService; -import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.opensearch.cluster.routing.allocation.ExistingShardsAllocator; import org.opensearch.cluster.routing.allocation.FailedShard; import org.opensearch.cluster.routing.allocation.RoutingAllocation; import org.opensearch.common.Priority; -import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.common.lease.Releasables; import org.opensearch.core.index.shard.ShardId; import org.opensearch.indices.store.TransportNodesListShardStoreMetadata; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch; -import org.opensearch.indices.store.TransportNodesListShardStoreMetadataBatch.NodeStoreFilesMetadataBatch; import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; @@ -81,45 +73,29 @@ public class GatewayAllocator implements ExistingShardsAllocator { public static final String ALLOCATOR_NAME = "gateway_allocator"; private static final Logger logger = LogManager.getLogger(GatewayAllocator.class); - private static final long MAX_BATCH_SIZE = 2000; // will change it to a dynamic setting later private final RerouteService rerouteService; private final PrimaryShardAllocator primaryShardAllocator; private final ReplicaShardAllocator replicaShardAllocator; - private final PrimaryShardBatchAllocator primaryBatchShardAllocator; - private final ReplicaShardBatchAllocator replicaBatchShardAllocator; - private final TransportNodesListGatewayStartedShardsBatch batchStartedAction; - private final TransportNodesListShardStoreMetadataBatch batchStoreAction; - private final ConcurrentMap< ShardId, AsyncShardFetch> asyncFetchStarted = ConcurrentCollections - .newConcurrentMap(); + .newConcurrentMap(); private final ConcurrentMap> asyncFetchStore = ConcurrentCollections.newConcurrentMap(); private Set lastSeenEphemeralIds = Collections.emptySet(); - private final ConcurrentMap startedShardBatchLookup = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap batchIdToStartedShardBatch = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap storeShardBatchLookup = ConcurrentCollections.newConcurrentMap(); - private final ConcurrentMap batchIdToStoreShardBatch = ConcurrentCollections.newConcurrentMap(); @Inject public GatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShards startedAction, - TransportNodesListShardStoreMetadata storeAction, - TransportNodesListGatewayStartedShardsBatch batchStartedAction, - TransportNodesListShardStoreMetadataBatch batchStoreAction + TransportNodesListShardStoreMetadata storeAction ) { this.rerouteService = rerouteService; this.primaryShardAllocator = new InternalPrimaryShardAllocator(startedAction); this.replicaShardAllocator = new InternalReplicaShardAllocator(storeAction); - this.batchStartedAction = batchStartedAction; - this.primaryBatchShardAllocator = new InternalPrimaryBatchShardAllocator(); - this.batchStoreAction = batchStoreAction; - this.replicaBatchShardAllocator = new InternalReplicaBatchShardAllocator(); } @Override @@ -128,10 +104,6 @@ public void cleanCaches() { asyncFetchStarted.clear(); Releasables.close(asyncFetchStore.values()); asyncFetchStore.clear(); - batchIdToStartedShardBatch.clear(); - batchIdToStoreShardBatch.clear(); - startedShardBatchLookup.clear(); - storeShardBatchLookup.clear(); } // for tests @@ -139,10 +111,6 @@ protected GatewayAllocator() { this.rerouteService = null; this.primaryShardAllocator = null; this.replicaShardAllocator = null; - this.batchStartedAction = null; - this.primaryBatchShardAllocator = null; - this.batchStoreAction = null; - this.replicaBatchShardAllocator = null; } @Override @@ -162,7 +130,6 @@ public void applyStartedShards(final List startedShards, final Rou for (ShardRouting startedShard : startedShards) { Releasables.close(asyncFetchStarted.remove(startedShard.shardId())); Releasables.close(asyncFetchStore.remove(startedShard.shardId())); - safelyRemoveShardFromBatch(startedShard); } } @@ -171,7 +138,6 @@ public void applyFailedShards(final List failedShards, final Routin for (FailedShard failedShard : failedShards) { Releasables.close(asyncFetchStarted.remove(failedShard.getRoutingEntry().shardId())); Releasables.close(asyncFetchStore.remove(failedShard.getRoutingEntry().shardId())); - safelyRemoveShardFromBatch(failedShard.getRoutingEntry()); } } @@ -179,30 +145,15 @@ public void applyFailedShards(final List failedShards, final Routin public void beforeAllocation(final RoutingAllocation allocation) { assert primaryShardAllocator != null; assert replicaShardAllocator != null; - assert primaryBatchShardAllocator != null; - assert replicaBatchShardAllocator != null; ensureAsyncFetchStorePrimaryRecency(allocation); } @Override public void afterPrimariesBeforeReplicas(RoutingAllocation allocation) { - // ToDo: fetch from settings - boolean batchMode = true; - if (batchMode) { - assert replicaBatchShardAllocator != null; - List> storedShardBatches = batchIdToStoreShardBatch.values().stream() - .map(ShardsBatch::getBatchedShardRoutings) - .collect(Collectors.toList()); - if (allocation.routingNodes().hasInactiveShards()) { - // cancel existing recoveries if we have a better match - replicaBatchShardAllocator.processExistingRecoveries(allocation, storedShardBatches); - } - } else { - assert replicaShardAllocator != null; - if (allocation.routingNodes().hasInactiveShards()) { - // cancel existing recoveries if we have a better match - replicaShardAllocator.processExistingRecoveries(allocation); - } + assert replicaShardAllocator != null; + if (allocation.routingNodes().hasInactiveShards()) { + // cancel existing recoveries if we have a better match + replicaShardAllocator.processExistingRecoveries(allocation); } } @@ -217,99 +168,6 @@ public void allocateUnassigned( innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator, shardRouting, unassignedAllocationHandler); } - @Override - public void allocateUnassignedBatch(final RoutingAllocation allocation, boolean primary) { - // create batches for unassigned shards - createBatches(allocation, primary); - - assert primaryBatchShardAllocator != null; - assert replicaBatchShardAllocator != null; - if (primary) { - batchIdToStartedShardBatch.values().forEach(shardsBatch -> primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation)); - } else { - batchIdToStoreShardBatch.values().forEach(batch -> replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation)); - } - } - - private void createBatches(RoutingAllocation allocation, boolean primary) { - RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); - // fetch all current batched shards - Set currentBatchedShards = primary? startedShardBatchLookup.keySet() : storeShardBatchLookup.keySet(); - Set shardsToBatch = Sets.newHashSet(); - // add all unassigned shards to the batch if they are not already in a batch - unassigned.forEach(shardRouting -> { - if ((currentBatchedShards.contains(shardRouting.shardId()) == false) && (shardRouting.primary() == primary)) { - assert shardRouting.unassigned(); - shardsToBatch.add(shardRouting); - } - }); - Iterator iterator = shardsToBatch.iterator(); - long batchSize = MAX_BATCH_SIZE; - Map addToCurrentBatch = new HashMap<>(); - while (iterator.hasNext()) { - ShardRouting currentShard = iterator.next(); - if (batchSize > 0) { - ShardBatchEntry shardBatchEntry = new ShardBatchEntry(IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(currentShard.index()).getSettings()) - , currentShard); - addToCurrentBatch.put(currentShard.shardId(), shardBatchEntry); - batchSize--; - iterator.remove(); - } - // add to batch if batch size full or last shard in unassigned list - if (batchSize == 0 || iterator.hasNext() == false) { - String batchUUId = UUIDs.base64UUID(); - - ShardsBatch shardsBatch = new ShardsBatch(batchUUId, addToCurrentBatch, primary); - // add the batch to list of current batches - addBatch(shardsBatch, primary); - addShardsIdsToLookup(addToCurrentBatch.keySet(), batchUUId, primary); - addToCurrentBatch.clear(); - batchSize = MAX_BATCH_SIZE; - } - } - } - - private void addBatch(ShardsBatch shardsBatch, boolean primary) { - ConcurrentMap batches = primary ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; - if (batches.containsKey(shardsBatch.getBatchId())) { - throw new IllegalStateException("Batch already exists. BatchId = " + shardsBatch.getBatchId()); - } - batches.put(shardsBatch.getBatchId(), shardsBatch); - } - - private void addShardsIdsToLookup(Set shards, String batchId, boolean primary) { - ConcurrentMap lookupMap = primary ? startedShardBatchLookup : storeShardBatchLookup; - shards.forEach(shardId -> { - if(lookupMap.containsKey(shardId)){ - throw new IllegalStateException("Shard is already Batched. ShardId = " + shardId + "Batch Id="+ lookupMap.get(shardId)); - } - lookupMap.put(shardId, batchId); - }); - } - - /** - * Safely remove a shard from the appropriate batch. - * If the shard is not in a batch, this is a no-op. - * Cleans the batch if it is empty after removing the shard. - * This method should be called when removing the shard from the batch instead {@link ShardsBatch#removeFromBatch(ShardRouting)} - * so that we can clean up the batch if it is empty and release the fetching resources - * @param shardRouting shard to be removed - */ - private void safelyRemoveShardFromBatch(ShardRouting shardRouting) { - String batchId = shardRouting.primary() ? startedShardBatchLookup.get(shardRouting.shardId()) : storeShardBatchLookup.get(shardRouting.shardId()); - if (batchId == null) { - return; - } - ConcurrentMap batches = shardRouting.primary() ? batchIdToStartedShardBatch : batchIdToStoreShardBatch; - ShardsBatch batch = batches.get(batchId); - batch.removeFromBatch(shardRouting); - // remove the batch if it is empty - if (batch.getBatchedShards().isEmpty()) { - Releasables.close(batch.getAsyncFetcher()); - batches.remove(batchId); - } - } - // allow for testing infra to change shard allocators implementation protected static void innerAllocatedUnassigned( RoutingAllocation allocation, @@ -358,13 +216,7 @@ private void ensureAsyncFetchStorePrimaryRecency(RoutingAllocation allocation) { Sets.difference(newEphemeralIds, lastSeenEphemeralIds) ) ); - asyncFetchStore.values().forEach(fetch -> clearCacheForPrimary(fetch, allocation)); - storeShardBatchLookup.values().forEach(batch -> - clearCacheForBatchPrimary(batchIdToStoreShardBatch.get(batch), allocation) - ); - - // recalc to also (lazily) clear out old nodes. this.lastSeenEphemeralIds = newEphemeralIds; } @@ -380,18 +232,6 @@ private static void clearCacheForPrimary( } } - private static void clearCacheForBatchPrimary( - ShardsBatch batch, - RoutingAllocation allocation - ) { - List primaries = batch.getBatchedShards().stream() - .map(allocation.routingNodes()::activePrimary) - .filter(Objects::nonNull) - .collect(Collectors.toList()); - AsyncBatchShardFetch fetch = batch.getAsyncFetcher(); - primaries.forEach(node -> fetch.clearCacheForNode(node.currentNodeId())); - } - private boolean hasNewNodes(DiscoveryNodes nodes) { for (final DiscoveryNode node : nodes.getDataNodes().values()) { if (lastSeenEphemeralIds.contains(node.getEphemeralId()) == false) { @@ -428,32 +268,6 @@ protected void reroute(ShardId shardId, String reason) { } } - class InternalBatchAsyncFetch extends AsyncBatchShardFetch { - - InternalBatchAsyncFetch(Logger logger, - String type, - Map map, - AsyncBatchShardFetch.Lister, T> action, - String batchUUId - ) { - super(logger, type, map, action, batchUUId); - } - - @Override - protected void reroute(String batchUUId, String reason) { - logger.trace("{} scheduling reroute for {}", batchUUId, reason); - assert rerouteService != null; - rerouteService.reroute( - "async_shard_fetch", - Priority.HIGH, - ActionListener.wrap( - r -> logger.trace("{} scheduled reroute completed for {}", batchUUId, reason), - e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", batchUUId, reason), e) - ) - ); - } - } - class InternalPrimaryShardAllocator extends PrimaryShardAllocator { private final TransportNodesListGatewayStartedShards startedAction; @@ -489,59 +303,6 @@ protected AsyncShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation) { - ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; - shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; - if (shardRouting == null) { - return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - - String batchId = startedShardBatchLookup.getOrDefault(shardRouting.shardId(), null); - if (batchId == null) { - logger.debug("Shard {} has no batch id", shardRouting); - throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); - } - - - if (batchIdToStartedShardBatch.containsKey(batchId) == false) { - logger.debug("Batch {} has no started shard batch", batchId); - throw new IllegalStateException("Batch " + batchId + " has no started shard batch"); - } - - ShardsBatch shardsBatch = batchIdToStartedShardBatch.get(batchId); - // remove in eligible shards which allocator is not responsible for - inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); - - if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { - logger.debug("Batch {} is empty", batchId); - return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - - Map> shardToIgnoreNodes = new HashMap<>(); - - for(ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()){ - shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); - } - AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); - AsyncBatchShardFetch.FetchResult shardBatchState = asyncFetcher.fetchData( - allocation.nodes(), - shardToIgnoreNodes - ); - - if (shardBatchState.hasData()) { - shardBatchState.processAllocation(allocation); - } - return (AsyncBatchShardFetch.FetchResult) shardBatchState; - } - - } - class InternalReplicaShardAllocator extends ReplicaShardAllocator { private final TransportNodesListShardStoreMetadata storeAction; @@ -580,180 +341,4 @@ protected boolean hasInitiatedFetching(ShardRouting shard) { return asyncFetchStore.get(shard.shardId()) != null; } } - - class InternalReplicaBatchShardAllocator extends ReplicaShardBatchAllocator { - - @Override - @SuppressWarnings("unchecked") - protected AsyncBatchShardFetch.FetchResult fetchData(Set shardsEligibleForFetch, - Set inEligibleShards, - RoutingAllocation allocation) { - // get batch id for anyone given shard. We are assuming all shards will have same batch Id - ShardRouting shardRouting = shardsEligibleForFetch.iterator().hasNext() ? shardsEligibleForFetch.iterator().next() : null; - shardRouting = shardRouting == null && inEligibleShards.iterator().hasNext() ? inEligibleShards.iterator().next() : shardRouting; - if (shardRouting == null) { - return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - - String batchId = storeShardBatchLookup.getOrDefault(shardRouting.shardId(), null); - if (batchId == null) { - logger.debug("Shard {} has no batch id", shardRouting); - throw new IllegalStateException("Shard " + shardRouting + " has no batch id. Shard should batched before fetching"); - } - - if (batchIdToStoreShardBatch.containsKey(batchId) == false) { - logger.debug("Batch {} has no store shard batch", batchId); - throw new IllegalStateException("Batch " + batchId + " has no shard store batch"); - } - - ShardsBatch shardsBatch = batchIdToStoreShardBatch.get(batchId); - // remove in eligible shards which allocator is not responsible for - inEligibleShards.forEach(GatewayAllocator.this::safelyRemoveShardFromBatch); - - if (shardsBatch.getBatchedShards().isEmpty() && shardsEligibleForFetch.isEmpty()) { - logger.debug("Batch {} is empty", batchId); - return new AsyncBatchShardFetch.FetchResult<>(null, Collections.emptyMap()); - } - Map> shardToIgnoreNodes = new HashMap<>(); - for (ShardId shardId : shardsBatch.asyncBatch.shardsToCustomDataPathMap.keySet()) { - shardToIgnoreNodes.put(shardId, allocation.getIgnoreNodes(shardId)); - } - AsyncBatchShardFetch asyncFetcher = shardsBatch.getAsyncFetcher(); - AsyncBatchShardFetch.FetchResult shardBatchStores = asyncFetcher.fetchData( - allocation.nodes(), - shardToIgnoreNodes - ); - if (shardBatchStores.hasData()) { - shardBatchStores.processAllocation(allocation); - } - return (AsyncBatchShardFetch.FetchResult) shardBatchStores; - } - - @Override - protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { - return null; - } - - @Override - protected boolean hasInitiatedFetching(ShardRouting shard) { - return false; - } - } - - /** - * Holds information about a batch of shards to be allocated. - * Async fetcher is used to fetch the data for the batch. - */ - private class ShardsBatch { - private final String batchId; - boolean primary; - - private final AsyncBatchShardFetch asyncBatch; - - private final Map batchInfo; - - public ShardsBatch(String batchId, Map shardsWithInfo, boolean primary) { - this.batchId = batchId; - this.batchInfo = new HashMap<>(shardsWithInfo); - // create a ShardId -> customDataPath map for async fetch - Map shardIdsMap = batchInfo.entrySet().stream().collect(Collectors.toMap( - Map.Entry::getKey, - entry -> entry.getValue().getCustomDataPath() - )); - this.primary = primary; - if (primary) { - asyncBatch = new InternalBatchAsyncFetch<>( - logger, - "batch_shards_started", - shardIdsMap, - batchStartedAction, - batchId); - } else { - asyncBatch = new InternalBatchAsyncFetch<>( - logger, - "batch_shards_started", - shardIdsMap, - batchStoreAction, - batchId); - - } - } - - public void removeFromBatch(ShardRouting shard) { - - batchInfo.remove(shard.shardId()); - asyncBatch.shardsToCustomDataPathMap.remove(shard.shardId()); - assert shard.primary() == primary : "Illegal call to delete shard from batch"; - // remove from lookup - if (this.primary) { - startedShardBatchLookup.remove(shard.shardId()); - } else { - storeShardBatchLookup.remove(shard.shardId()); - } - // assert that fetcher and shards are the same as batched shards - assert batchInfo.size() == asyncBatch.shardsToCustomDataPathMap.size() : "Shards size is not equal to fetcher size"; - } - - Set getBatchedShardRoutings() { - return batchInfo.values().stream().map(ShardBatchEntry::getShardRouting).collect(Collectors.toSet()); - } - - Set getBatchedShards() { - return batchInfo.keySet(); - } - - public String getBatchId() { - return batchId; - } - - AsyncBatchShardFetch getAsyncFetcher() { - return asyncBatch; - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || o instanceof ShardsBatch == false) { - return false; - } - ShardsBatch shardsBatch = (ShardsBatch) o; - return batchId.equals(shardsBatch.getBatchId()) && batchInfo.keySet().equals(shardsBatch.getBatchedShards()); - } - - @Override - public int hashCode() { - return Objects.hash(batchId); - } - - @Override - public String toString() { - return "batchId: " + batchId; - } - - } - - /** - * Holds information about a shard to be allocated in a batch. - */ - private class ShardBatchEntry { - - private final String customDataPath; - private final ShardRouting shardRouting; - - public ShardBatchEntry(String customDataPath, ShardRouting shardRouting) { - this.customDataPath = customDataPath; - this.shardRouting = shardRouting; - } - - public ShardRouting getShardRouting() { - return shardRouting; - } - - public String getCustomDataPath() { - return customDataPath; - } - } - }