Skip to content

Commit

Permalink
Modified NodeShardStates to only have getter method
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
  • Loading branch information
shiv0408 committed Sep 20, 2023
1 parent 26ae860 commit 90a03a0
Showing 1 changed file with 36 additions and 73 deletions.
109 changes: 36 additions & 73 deletions server/src/main/java/org/opensearch/gateway/PrimaryShardAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public AllocateUnassignedDecision makeAllocationDecision(
private static NodeShardStates adaptToNodeShardStates(FetchResult<NodeGatewayStartedShards> shardsState) {
NodeShardStates nodeShardStates = new NodeShardStates();
shardsState.getData().forEach((node, nodeGatewayStartedShard) -> {
nodeShardStates.add(
nodeShardStates.getNodeShardStates().add(
new NodeShardState(
node,
nodeGatewayStartedShard.allocationId(),
Expand Down Expand Up @@ -179,12 +179,12 @@ protected AllocateUnassignedDecision getAllocationDecision(
shardState,
logger
);
final boolean enoughAllocationsFound = nodeShardsResult.orderedAllocationCandidates.size() > 0;
final boolean enoughAllocationsFound = !nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().isEmpty();
logger.debug(
"[{}][{}]: found {} allocation candidates of {} based on allocation ids: [{}]",
unassignedShard.index(),
unassignedShard.id(),
nodeShardsResult.orderedAllocationCandidates.size(),
nodeShardsResult.orderedAllocationCandidates.getNodeShardStates().size(),
unassignedShard,
inSyncAllocationIds
);
Expand Down Expand Up @@ -323,15 +323,15 @@ private static List<NodeAllocationResult> buildNodeDecisions(
})
.collect(Collectors.toList())
);
fetchedShardData.iterator().forEachRemaining(shardData -> {
if (discoNodes.contains(shardData.getNode()) == false) {
ineligibleShards.add(shardData);
}
});
ineligibleShards = fetchedShardData
.getNodeShardStates()
.stream()
.filter(shardData -> discoNodes.contains(shardData.getNode()) == false)
.collect(Collectors.toList());
} else {
// there were no shard copies that were eligible for being assigned the allocation,
// so all fetched shard data are ineligible shards
fetchedShardData.iterator().forEachRemaining(ineligibleShards::add);
ineligibleShards = fetchedShardData.getNodeShardStates();
}

nodeResults.addAll(
Expand Down Expand Up @@ -374,9 +374,7 @@ protected NodeShardsResult buildNodeShardsResult(
) {
NodeShardStates nodeShardStates = new NodeShardStates();
int numberOfAllocationsFound = 0;
Iterator<NodeShardState> iterator = shardState.iterator();
while (iterator.hasNext()) {
NodeShardState nodeShardState = iterator.next();
for (NodeShardState nodeShardState : shardState.getNodeShardStates()) {
DiscoveryNode node = nodeShardState.getNode();
String allocationId = nodeShardState.allocationId();

Expand All @@ -394,48 +392,48 @@ protected NodeShardsResult buildNodeShardsResult(
final String finalAllocationId = allocationId;
if (nodeShardState.storeException() instanceof ShardLockObtainFailedException) {
logger.trace(
() -> new ParameterizedMessage(
"[{}] on node [{}] has allocation id [{}] but the store can not be "
+ "opened as it's locked, treating as valid shard",
shard,
nodeShardState.getNode(),
finalAllocationId
),
nodeShardState.storeException()
() -> new ParameterizedMessage(
"[{}] on node [{}] has allocation id [{}] but the store can not be "
+ "opened as it's locked, treating as valid shard",
shard,
nodeShardState.getNode(),
finalAllocationId
),
nodeShardState.storeException()
);
} else {
logger.trace(
() -> new ParameterizedMessage(
"[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id",
shard,
nodeShardState.getNode(),
finalAllocationId
),
nodeShardState.storeException()
() -> new ParameterizedMessage(
"[{}] on node [{}] has allocation id [{}] but the store can not be " + "opened, treating as no allocation id",
shard,
nodeShardState.getNode(),
finalAllocationId
),
nodeShardState.storeException()
);
allocationId = null;
}
}

if (allocationId != null) {
assert nodeShardState.storeException() == null || nodeShardState.storeException() instanceof ShardLockObtainFailedException
: "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a "
: "only allow store that can be opened or that throws a ShardLockObtainFailedException while being opened but got a "
+ "store throwing "
+ nodeShardState.storeException();
numberOfAllocationsFound++;
if (matchAnyShard || inSyncAllocationIds.contains(nodeShardState.allocationId())) {
nodeShardStates.add(nodeShardState);
nodeShardStates.getNodeShardStates().add(nodeShardState);
}
}
}

nodeShardStates.sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds));
nodeShardStates.getNodeShardStates().sort(createActiveShardComparator(matchAnyShard, inSyncAllocationIds));

if (logger.isTraceEnabled()) {
logger.trace(
"{} candidates for allocation: {}",
shard,
nodeShardStates.stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))
nodeShardStates.getNodeShardStates().stream().map(s -> s.getNode().getName()).collect(Collectors.joining(", "))
);
}
return new NodeShardsResult(nodeShardStates, numberOfAllocationsFound);
Expand Down Expand Up @@ -477,17 +475,15 @@ protected static NodesToAllocate buildNodesToAllocate(
List<DecidedNode> yesNodeShards = new ArrayList<>();
List<DecidedNode> throttledNodeShards = new ArrayList<>();
List<DecidedNode> noNodeShards = new ArrayList<>();
Iterator<NodeShardState> iterator = nodeShardStates.iterator();
while (iterator.hasNext()) {
NodeShardState nodeShardState = iterator.next();
for (NodeShardState nodeShardState : nodeShardStates.getNodeShardStates()) {
RoutingNode node = allocation.routingNodes().node(nodeShardState.getNode().getId());
if (node == null) {
continue;
}

Decision decision = forceAllocate
? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation)
: allocation.deciders().canAllocate(shardRouting, node, allocation);
? allocation.deciders().canForceAllocatePrimary(shardRouting, node, allocation)
: allocation.deciders().canAllocate(shardRouting, node, allocation);
DecidedNode decidedNode = new DecidedNode(nodeShardState, decision);
if (decision.type() == Type.THROTTLE) {
throttledNodeShards.add(decidedNode);
Expand Down Expand Up @@ -647,7 +643,7 @@ public DiscoveryNode getNode() {
* @see TreeMap
*/
protected static class NodeShardStates {
// TreeMap to store NodeShardState and DiscoveryNode pairs
// List of entries to store NodeShardState
private final List<NodeShardState> nodeShardStates;

/**
Expand All @@ -658,43 +654,10 @@ public NodeShardStates() {
}

/**
* Adds a new {@link NodeShardState} to the list.
* @param state {@link NodeShardState} node shard state.
*/
public void add(NodeShardState state) {
this.nodeShardStates.add(state);
}

/**
* Returns the number of key-value pairs in the TreeMap.
* @return Number of key-value pairs.
*/
public int size() {
return this.nodeShardStates.size();
}

/**
* Returns an iterator over the {@link NodeShardState} keys in the TreeMap.
* @return Iterator over the keys.
*/
public Iterator<NodeShardState> iterator() {
return this.nodeShardStates.iterator();
}

/**
** Returns a stream of the {@link NodeShardState} keys in the TreeMap.
* @return Stream of the keys.
*/
public Stream<NodeShardState> stream() {
return this.nodeShardStates.stream();
}

/**
* Sorts the NodeShardStates based on the provided Comparator.
* @param comparator The Comparator to use.
* Returns the list of {@link NodeShardState}.
*/
public void sort(Comparator<NodeShardState> comparator) {
this.nodeShardStates.sort(comparator);
public List<NodeShardState> getNodeShardStates() {
return this.nodeShardStates;
}
}
}

0 comments on commit 90a03a0

Please sign in to comment.