Skip to content

Commit

Permalink
fix disk usage exceed threshold cluster can not spin up issue
Browse files Browse the repository at this point in the history
Signed-off-by: zane-neo <zaniu@amazon.com>
  • Loading branch information
zane-neo committed Aug 16, 2024
1 parent b39adc6 commit 3e8df68
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 56 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix array_index_out_of_bounds_exception when indexing documents with field name containing only dot ([#15126](https://github.com/opensearch-project/OpenSearch/pull/15126))
- Fixed array field name omission in flat_object function for nested JSON ([#13620](https://github.com/opensearch-project/OpenSearch/pull/13620))
- Fix range aggregation optimization ignoring top level queries ([#15194](https://github.com/opensearch-project/OpenSearch/pull/15194))
- Fix disk usage exceeds threshold cluster can't spin up issue ([#15258](https://github.com/opensearch-project/OpenSearch/pull/15258)))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {

private final Set<ClusterBlock> global;

private Map<String, Set<ClusterBlock>> indicesBlocks;
private final Map<String, Set<ClusterBlock>> indicesBlocks;

private EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;
private final EnumMap<ClusterBlockLevel, ImmutableLevelHolder> levelHolders;

ClusterBlocks(Set<ClusterBlock> global, final Map<String, Set<ClusterBlock>> indicesBlocks) {
this.global = global;
Expand Down Expand Up @@ -161,24 +161,6 @@ public boolean hasIndexBlock(String index, ClusterBlock block) {
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
}

public void removeIndexBlock(String index, ClusterBlock block) {
Map<String, Set<ClusterBlock>> newIndicesBlocks = new HashMap<>(indicesBlocks); // copy to avoid UnsupportedOperationException>
for (Map.Entry<String, Set<ClusterBlock>> entry : indicesBlocks.entrySet()) {
String indexName = entry.getKey();
Set<ClusterBlock> clusterBlockSet = new HashSet<>(entry.getValue());
if (indexName.equals(index)) {
clusterBlockSet.remove(block);
if (clusterBlockSet.isEmpty()) {
newIndicesBlocks.remove(indexName);
} else {
newIndicesBlocks.put(indexName, Collections.unmodifiableSet(clusterBlockSet));
}
}
}
this.indicesBlocks = Collections.unmodifiableMap(newIndicesBlocks);
this.levelHolders = generateLevelHolders(global, indicesBlocks);
}

public boolean hasIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.DiskUsage;
import org.opensearch.cluster.block.ClusterBlock;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -50,8 +50,8 @@
import org.opensearch.cluster.routing.RoutingNodes;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.set.Sets;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -82,6 +82,7 @@ public class DiskThresholdMonitor {
private static final Logger logger = LogManager.getLogger(DiskThresholdMonitor.class);
private final DiskThresholdSettings diskThresholdSettings;
private final Client client;
private final ClusterService clusterService;
private final Supplier<ClusterState> clusterStateSupplier;
private final LongSupplier currentTimeMillisSupplier;
private final RerouteService rerouteService;
Expand All @@ -107,17 +108,16 @@ public class DiskThresholdMonitor {
private final Set<String> nodesOverHighThresholdAndRelocating = Sets.newConcurrentHashSet();

public DiskThresholdMonitor(
Settings settings,
Supplier<ClusterState> clusterStateSupplier,
ClusterSettings clusterSettings,
ClusterService clusterService,
Client client,
LongSupplier currentTimeMillisSupplier,
RerouteService rerouteService
) {
this.clusterStateSupplier = clusterStateSupplier;
this.clusterService = clusterService;
this.clusterStateSupplier = clusterService::state;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.rerouteService = rerouteService;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.diskThresholdSettings = new DiskThresholdSettings(clusterService.getSettings(), clusterService.getClusterSettings());
this.client = client;
}

Expand Down Expand Up @@ -158,6 +158,7 @@ public void onNewInfo(ClusterInfo info) {
final Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes();
Set<String> indicesNotToAutoRelease = new HashSet<>();
final Set<String> indicesToRemoveReadOnly = new HashSet<>();
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesNotToAutoRelease);

final List<DiskUsage> usagesOverHighThreshold = new ArrayList<>();
Expand Down Expand Up @@ -255,8 +256,10 @@ public void onNewInfo(ClusterInfo info) {
}

} else {
for (Map.Entry<String, Set<ClusterBlock>> indexBlockEntry : state.blocks().indices().entrySet()) {
state.blocks().removeIndexBlock(indexBlockEntry.getKey(), IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
indicesToRemoveReadOnly.add(routing.getIndexName());
}
}

nodesOverHighThresholdAndRelocating.remove(node);
Expand Down Expand Up @@ -290,6 +293,29 @@ public void onNewInfo(ClusterInfo info) {
}
}

// remove read-only blocks for indices.
if (!indicesToRemoveReadOnly.isEmpty()) {
clusterService.submitStateUpdateTask("disk-usage-recovered", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder clusterBlocksBuilder = ClusterBlocks.builder().blocks(state.blocks());
for (String index : indicesToRemoveReadOnly) {
clusterBlocksBuilder.removeIndexBlock(index, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);
}
final Metadata metadata = Metadata.builder()
.clusterUUID(state.metadata().clusterUUID())
.coordinationMetadata(state.metadata().coordinationMetadata())
.build();
return ClusterState.builder(state).metadata(metadata).blocks(clusterBlocksBuilder.build()).build();
}

@Override
public void onFailure(String source, Exception e) {
logger.error("failed to update cluster state for disk usage recovery task", e);
}
});
}

final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 4);

if (reroute) {
Expand Down
4 changes: 1 addition & 3 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1225,9 +1225,7 @@ protected Node(
);

final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
settings,
clusterService::state,
clusterService.getClusterSettings(),
clusterService,
client,
threadPool::relativeTimeInMillis,
rerouteService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Priority;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
Expand All @@ -72,6 +75,8 @@
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class DiskThresholdMonitorTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -116,9 +121,7 @@ public void testMarkFloodStageIndicesReadOnly() {
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicLong currentTime = new AtomicLong();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
Settings.EMPTY,
() -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, clusterState),
null,
currentTime::get,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -178,9 +181,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));

monitor = new DiskThresholdMonitor(
Settings.EMPTY,
() -> anotherFinalClusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, anotherFinalClusterState),
null,
currentTime::get,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -219,9 +220,7 @@ public void testDoesNotSubmitRerouteTaskTooFrequently() {
AtomicLong currentTime = new AtomicLong();
AtomicReference<ActionListener<ClusterState>> listenerReference = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
Settings.EMPTY,
() -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, clusterState),
null,
currentTime::get,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -360,9 +359,7 @@ public void testAutoReleaseIndices() {
);

DiskThresholdMonitor monitor = new DiskThresholdMonitor(
Settings.EMPTY,
() -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, clusterState),
null,
() -> 0L,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -422,9 +419,7 @@ protected void setIndexCreateBlock(ActionListener<Void> listener, boolean indexC

assertTrue(clusterStateWithBlocks.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
monitor = new DiskThresholdMonitor(
Settings.EMPTY,
() -> clusterStateWithBlocks,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, clusterStateWithBlocks),
null,
() -> 0L,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -539,9 +534,7 @@ public long getAsLong() {
final AtomicLong relocatingShardSizeRef = new AtomicLong();

DiskThresholdMonitor monitor = new DiskThresholdMonitor(
Settings.EMPTY,
clusterStateRef::get,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(Settings.EMPTY, clusterState),
null,
timeSupplier,
(reason, priority, listener) -> listener.onResponse(clusterStateRef.get())
Expand Down Expand Up @@ -687,9 +680,7 @@ public void testIndexCreateBlockWhenNoDataNodeHealthy() {
AtomicLong currentTime = new AtomicLong();
Settings settings = Settings.builder().build();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
settings,
() -> clusterState,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(settings, clusterState),
null,
currentTime::get,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -766,9 +757,7 @@ public void testIndexCreateBlockRemovedOnlyWhenAnyNodeAboveHighWatermark() {
AtomicLong currentTime = new AtomicLong();
Settings settings = Settings.builder().put(CLUSTER_CREATE_INDEX_BLOCK_AUTO_RELEASE.getKey(), true).build();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(
settings,
() -> clusterState,
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
createClusterService(settings, clusterState),
null,
currentTime::get,
(reason, priority, listener) -> {
Expand Down Expand Up @@ -905,4 +894,10 @@ private static ClusterInfo clusterInfo(
return new ClusterInfo(diskUsages, null, null, null, reservedSpace, Map.of());
}

private static ClusterService createClusterService(Settings settings, ClusterState clusterState) {
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterApplierService.state()).thenReturn(clusterState);
return new ClusterService(settings, clusterSettings, mock(ClusterManagerService.class), clusterApplierService);
}
}

0 comments on commit 3e8df68

Please sign in to comment.