draft PR to fix cluster not able to spin up issue when disk usage exceeds threshold

Signed-off-by: zane-neo <zaniu@amazon.com>
zane-neo committed Aug 15, 2024
1 parent 01acf1c commit d9096b2
Showing 3 changed files with 44 additions and 9 deletions.
@@ -155,13 +155,17 @@ void setUpdateFrequency(TimeValue updateFrequency) {
 
     @Override
     public void clusterChanged(ClusterChangedEvent event) {
-        if (event.localNodeClusterManager() && refreshAndRescheduleRunnable.get() == null) {
-            logger.trace("elected as cluster-manager, scheduling cluster info update tasks");
-            executeRefresh(event.state(), "became cluster-manager");
-
-            final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable();
-            refreshAndRescheduleRunnable.set(newRunnable);
-            threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable);
+        if (event.localNodeClusterManager()) {
+            if (!event.state().blocks().indices().isEmpty()) {
+                executeRefreshImmediately(event.state());
+            } else if (refreshAndRescheduleRunnable.get() == null) {
+                logger.trace("elected as cluster-manager, scheduling cluster info update tasks");
+                executeRefresh(event.state(), "became cluster-manager");
+
+                final RefreshAndRescheduleRunnable newRunnable = new RefreshAndRescheduleRunnable();
+                refreshAndRescheduleRunnable.set(newRunnable);
+                threadPool.scheduleUnlessShuttingDown(updateFrequency, REFRESH_EXECUTOR, newRunnable);
+            }
         } else if (event.localNodeClusterManager() == false) {
             refreshAndRescheduleRunnable.set(null);
             return;
@@ -204,6 +208,14 @@ private void executeRefresh(ClusterState clusterState, String reason) {
         }
     }
 
+    private void executeRefreshImmediately(ClusterState clusterState) {
+        String reason = "became cluster-manager with indices blocked";
+        if (!clusterState.nodes().getDataNodes().isEmpty()) {
+            logger.trace("refreshing cluster info immediately [{}]", reason);
+            new RefreshRunnable(reason).run();
+        }
+    }
+
     @Override
     public ClusterInfo getClusterInfo() {
         final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read
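For context: the new branch fires whenever the incoming state still carries index-level blocks, and the case the commit title targets is the flood-stage read_only_allow_delete block left behind after a disk-full episode. Below is a minimal, illustrative sketch of a narrower check that looks only for that specific block; the class and method names are assumptions and are not part of this commit.

```java
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.metadata.IndexMetadata;

import java.util.Map;
import java.util.Set;

final class BlockedIndicesCheck {
    // Illustrative helper: true if any index in the given state still carries the
    // flood-stage read_only_allow_delete block applied by disk threshold monitoring.
    static boolean hasLeftoverDiskBlocks(ClusterState state) {
        for (Map.Entry<String, Set<ClusterBlock>> entry : state.blocks().indices().entrySet()) {
            if (entry.getValue().contains(IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) {
                return true;
            }
        }
        return false;
    }
}
```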
@@ -67,9 +67,9 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
 
     private final Set<ClusterBlock> global;
 
-    private final Map<String, Set<ClusterBlock>> indicesBlocks;
+    private Map<String, Set<ClusterBlock>> indicesBlocks;
 
-    private final EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;
+    private EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;
 
     ClusterBlocks(Set<ClusterBlock> global, final Map<String, Set<ClusterBlock>> indicesBlocks) {
         this.global = global;
@@ -161,6 +161,24 @@ public boolean hasIndexBlock(String index, ClusterBlock block) {
         return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
     }
 
+    public void removeIndexBlock(String index, ClusterBlock block) {
+        Map<String, Set<ClusterBlock>> newIndicesBlocks = new HashMap<>(indicesBlocks); // copy to avoid UnsupportedOperationException
+        for (Map.Entry<String, Set<ClusterBlock>> entry : indicesBlocks.entrySet()) {
+            String indexName = entry.getKey();
+            Set<ClusterBlock> clusterBlockSet = new HashSet<>(entry.getValue());
+            if (indexName.equals(index)) {
+                clusterBlockSet.remove(block);
+                if (clusterBlockSet.isEmpty()) {
+                    newIndicesBlocks.remove(indexName);
+                } else {
+                    newIndicesBlocks.put(indexName, Collections.unmodifiableSet(clusterBlockSet));
+                }
+            }
+        }
+        this.indicesBlocks = Collections.unmodifiableMap(newIndicesBlocks);
+        this.levelHolders = generateLevelHolders(global, indicesBlocks);
+    }
+
     public boolean hasIndexBlockWithId(String index, int blockId) {
         final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
         if (clusterBlocks != null) {
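A minimal usage sketch of the new mutator, mirroring the call made in the next file's hunk (the disk-threshold monitoring code); the wrapper class and the state variable are assumptions for illustration only.

```java
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;

class RemoveBlockExample {
    // Illustrative only: drop the flood-stage block for one index, in place.
    static void releaseDiskBlock(ClusterState state, String index) {
        ClusterBlocks blocks = state.blocks();
        if (blocks.hasIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK)) {
            // Replaces indicesBlocks and regenerates the per-level holders inside this instance.
            blocks.removeIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
        }
    }
}
```

Design note: this turns a previously immutable, AbstractDiffable-backed structure into a mutable one (the final modifiers are dropped in the first hunk of this file); the conventional route would be to rebuild the blocks, for example via ClusterBlocks.Builder, and publish a fresh cluster state, which fits the change being labeled a draft.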
@@ -40,7 +40,9 @@
 import org.opensearch.cluster.ClusterInfo;
 import org.opensearch.cluster.ClusterState;
 import org.opensearch.cluster.DiskUsage;
+import org.opensearch.cluster.block.ClusterBlock;
 import org.opensearch.cluster.block.ClusterBlockLevel;
+import org.opensearch.cluster.block.ClusterBlocks;
 import org.opensearch.cluster.metadata.IndexMetadata;
 import org.opensearch.cluster.metadata.Metadata;
 import org.opensearch.cluster.routing.RerouteService;
@@ -253,6 +255,9 @@ public void onNewInfo(ClusterInfo info) {
                 }
 
             } else {
+                for (Map.Entry<String, Set<ClusterBlock>> indexBlockEntry : state.blocks().indices().entrySet()) {
+                    state.blocks().removeIndexBlock(indexBlockEntry.getKey(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
+                }
 
                 nodesOverHighThresholdAndRelocating.remove(node);
 
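For completeness, the same flood-stage block can also be released by hand once disk space is freed, which is the usual operator-side workaround for the startup symptom this commit targets. A rough sketch against the admin client API; the client variable, index name, and exact client package are assumptions, not part of the commit.

```java
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

class ReleaseBlockByHand {
    // Illustrative: clear index.blocks.read_only_allow_delete on one index once disk pressure is gone.
    static void clearFloodStageBlock(Client client, String index) {
        Settings clear = Settings.builder()
            .putNull("index.blocks.read_only_allow_delete")
            .build();
        client.admin().indices().prepareUpdateSettings(index).setSettings(clear).get();
    }
}
```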
