Skip to content

Commit

Permalink
moved NodeShardState and NodeShardStates to PSA
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
  • Loading branch information
shiv0408 committed Sep 10, 2023
1 parent f829907 commit a54547f
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
Expand All @@ -44,13 +43,9 @@
import org.opensearch.cluster.routing.allocation.NodeAllocationResult;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

/**
* An abstract class that implements basic functionality for allocating
Expand Down Expand Up @@ -138,70 +133,4 @@ protected static List<NodeAllocationResult> buildDecisionsForAllNodes(ShardRouti
}
return results;
}

protected static class NodeShardState {
private final String allocationId;
private final boolean primary;
private final Exception storeException;
private final ReplicationCheckpoint replicationCheckpoint;
private final DiscoveryNode node;

public NodeShardState(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
this.node = node;
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

public String allocationId() {
return this.allocationId;
}

public boolean primary() {
return this.primary;
}

public ReplicationCheckpoint replicationCheckpoint() {
return this.replicationCheckpoint;
}

public Exception storeException() {
return this.storeException;
}

public DiscoveryNode getNode() {
return this.node;
}
}

protected static class NodeShardStates {
TreeMap<NodeShardState, DiscoveryNode> nodeShardStates;

public NodeShardStates(Comparator<NodeShardState> comparator) {
this.nodeShardStates = new TreeMap<>(comparator);
}

public void add(NodeShardState key, DiscoveryNode value) {
this.nodeShardStates.put(key, value);
}

public DiscoveryNode get(NodeShardState key) {
return this.nodeShardStates.get(key);
}

public int size() {
return this.nodeShardStates.size();
}

public Iterator<NodeShardState> iterator() {
return this.nodeShardStates.keySet().iterator();
}
}
}
161 changes: 156 additions & 5 deletions server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.opensearch.env.ShardLockObtainFailedException;
import org.opensearch.gateway.AsyncShardFetch.FetchResult;
import org.opensearch.gateway.TransportNodesListGatewayStartedShards.NodeGatewayStartedShards;
import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -60,6 +61,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand Down Expand Up @@ -134,11 +136,11 @@ public AllocateUnassignedDecision makeAllocationDecision(
}
return AllocateUnassignedDecision.no(AllocationStatus.FETCHING_SHARD_DATA, nodeDecisions);
}
NodeShardStates nodeShardStates = getNodeShardStates(shardState);
NodeShardStates nodeShardStates = adaptToNodeShardStates(shardState);
return getAllocationDecision(unassignedShard, allocation, nodeShardStates, logger);
}

private static NodeShardStates getNodeShardStates(FetchResult<NodeGatewayStartedShards> shardsState) {
private static NodeShardStates adaptToNodeShardStates(FetchResult<NodeGatewayStartedShards> shardsState) {
NodeShardStates nodeShardStates = new NodeShardStates((o1, o2) -> 1);
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
nodeShardStates.add(
Expand Down Expand Up @@ -371,7 +373,7 @@ protected NodeShardsResult buildNodeShardsResult(
NodeShardStates shardState,
Logger logger
) {
NodeShardStates nodeShardStates = new NodeShardStates(getComparator(matchAnyShard, inSyncAllocationIds));
NodeShardStates nodeShardStates = new NodeShardStates(createActiveShardComparator(matchAnyShard, inSyncAllocationIds));
int numberOfAllocationsFound = 0;
Iterator<NodeShardState> iterator = shardState.iterator();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -432,13 +434,13 @@ protected NodeShardsResult buildNodeShardsResult(
logger.trace(
"{} candidates for allocation: {}",
shard,
nodeShardStates.nodeShardStates.values().stream().map(DiscoveryNode::getName).collect(Collectors.joining(", "))
nodeShardStates.stream().map(nodeShardStates::get).map(DiscoveryNode::getName).collect(Collectors.joining(", "))
);
}
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
}

protected static Comparator<NodeShardState> getComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
protected static Comparator<NodeShardState> createActiveShardComparator(boolean matchAnyShard, Set<String> inSyncAllocationIds) {
/**
* Orders the active shards copies based on below comparators
* 1. No store exception i.e. shard copy is readable
Expand Down Expand Up @@ -542,4 +544,153 @@ protected DecidedNode(NodeShardState nodeShardState, Decision decision) {
this.decision = decision;
}
}

/**
* The NodeShardState class represents the state of a node shard in a distributed system.
* It includes several key data points about the shard state, such as its allocation ID,
* whether it's a primary shard, any store exception, the replication checkpoint, and the
* DiscoveryNode it belongs to.
* <p>
* This class is designed to be used in conjunction with the {@link NodeShardStates} class, which
* manages multiple NodeShardState instances.
*/
protected static class NodeShardState {
// Allocation ID of the shard
private final String allocationId;
// Whether the shard is primary
private final boolean primary;
// Any store exception associated with the shard
private final Exception storeException;
// The replication checkpoint of the shard
private final ReplicationCheckpoint replicationCheckpoint;
// The DiscoveryNode the shard belongs to
private final DiscoveryNode node;

/**
* Constructs a new NodeShardState with the given parameters.
* @param node The DiscoveryNode the shard belongs to.
* @param allocationId The allocation ID of the shard.
* @param primary Whether the shard is a primary shard.
* @param replicationCheckpoint The replication checkpoint of the shard.
* @param storeException Any store exception associated with the shard.
*/
public NodeShardState(
DiscoveryNode node,
String allocationId,
boolean primary,
ReplicationCheckpoint replicationCheckpoint,
Exception storeException
) {
this.node = node;
this.allocationId = allocationId;
this.primary = primary;
this.replicationCheckpoint = replicationCheckpoint;
this.storeException = storeException;
}

/**
* Returns the allocation ID of the shard.
* @return The allocation ID of the shard.
*/
public String allocationId() {
return this.allocationId;
}

/**
* Returns whether the shard is a primary shard.
* @return True if the shard is a primary shard, false otherwise.
*/
public boolean primary() {
return this.primary;
}

/**
* Returns the replication checkpoint of the shard.
* @return The replication checkpoint of the shard.
*/
public ReplicationCheckpoint replicationCheckpoint() {
return this.replicationCheckpoint;
}

/**
* Returns any store exception associated with the shard.
* @return The store exception associated with the shard, or null if there isn't one.
*/
public Exception storeException() {
return this.storeException;
}

/**
* Returns the DiscoveryNode the shard belongs to.
* @return The DiscoveryNode the shard belongs to.
*/
public DiscoveryNode getNode() {
return this.node;
}
}

/**
* The NodeShardStates class manages pairs of {@link NodeShardState} and {@link DiscoveryNode}.
* It uses a TreeMap to ensure that the entries are sorted based on the natural
* ordering of the {@link NodeShardState} keys, or according to a provided Comparator.
* <p>
* The TreeMap is implemented using a Red-Black tree, which provides efficient
* performance for common operations such as adding, removing, and retrieving
* elements.
* @see TreeMap
*/
protected static class NodeShardStates {
// TreeMap to store NodeShardState and DiscoveryNode pairs
private final TreeMap<NodeShardState, DiscoveryNode> nodeShardStates;

/**
* Constructs a new NodeShardStates with a given Comparator.
* @param comparator Comparator to determine the order of the TreeMap.
*/
public NodeShardStates(Comparator<NodeShardState> comparator) {
this.nodeShardStates = new TreeMap<>(comparator);
}

/**
* Adds a new {@link NodeShardState} and {@link DiscoveryNode} pair to the TreeMap.
* @param key {@link NodeShardState} key.
* @param value {@link DiscoveryNode} value.
*/
public void add(NodeShardState key, DiscoveryNode value) {
this.nodeShardStates.put(key, value);
}

/**
* Retrieves the {@link DiscoveryNode} value associated with a given {@link NodeShardState} key.
* @param key {@link NodeShardState} key.
* @return {@link DiscoveryNode} value associated with the key.
*/
public DiscoveryNode get(NodeShardState key) {
return this.nodeShardStates.get(key);
}

/**
* Returns the number of key-value pairs in the TreeMap.
* @return Number of key-value pairs.
*/
public int size() {
return this.nodeShardStates.size();
}

/**
* Returns an iterator over the {@link NodeShardState} keys in the TreeMap.
* @return Iterator over the keys.
*/
public Iterator<NodeShardState> iterator() {
return this.nodeShardStates.keySet().iterator();
}

/**
** Returns a stream of the {@link NodeShardState} keys in the TreeMap.
* @return Stream of the keys.
*/
public Stream<NodeShardState> stream() {
return this.nodeShardStates.keySet().stream();
}
}
}

0 comments on commit a54547f

Please sign in to comment.