Skip to content

Commit

Permalink
Upload incremental cluster state on master re-election
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Aug 20, 2024
1 parent f94a99e commit 5979358
Show file tree
Hide file tree
Showing 11 changed files with 480 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import org.opensearch.cluster.coordination.PersistedStateRegistry;
import org.opensearch.gateway.GatewayMetaState;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY;
Expand Down Expand Up @@ -152,4 +155,30 @@ private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, St
}).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));
}

public void testMasterReElectionUsesIncrementalUpload() throws IOException {
prepareCluster(3, 2, INDEX_NAME, 1, 1);
PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest();
// force elected master to step down
internalCluster().stopCurrentClusterManagerNode();
ensureStableCluster(4);

persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
GatewayMetaState.RemotePersistedState persistedStateAfterElection = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();

// coordination metadata is updated, it will be unequal
assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata());
// all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected
assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID());
assertEquals(manifest.getIndices(), manifestAfterElection.getIndices());
assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata());
assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata());
assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap());
assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion());
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
verifyRestoredData(indexStats, indexName, true);
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

public void prepareCluster(
int numClusterManagerNodes,
int numDataOnlyNodes,
String indices,
int replicaCount,
int shardCount,
Settings settings
) {
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
}

protected void prepareCluster(
Expand All @@ -370,11 +364,16 @@ protected void prepareCluster(
int shardCount,
Settings settings
) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.GatewayMetaState;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -459,6 +460,16 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
clusterState.term()
);
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
// if Remote publication is enabled, we only need to update the accepted state in followers node as elected cluster manager would
// have already updated the last accepted state
if (isRemotePublicationEnabled
&& publishRequest.getManifest() != null
&& localNode.isClusterManagerNode()
&& clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.setLastAcceptedManifest(publishRequest.getManifest());
}
assert getLastAcceptedState() == clusterState;

return new PublishResponse(clusterState.term(), clusterState.version());
Expand Down Expand Up @@ -571,6 +582,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (isRemotePublicationEnabled && localNode.isClusterManagerNode()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
}

Expand Down Expand Up @@ -661,14 +675,7 @@ public interface PersistedState extends Closeable {
*/
default void markLastAcceptedStateAsCommitted() {
final ClusterState lastAcceptedState = getLastAcceptedState();
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
// the cluster uuid might not been known yet.
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
Expand All @@ -693,6 +700,18 @@ default void markLastAcceptedStateAsCommitted() {
}
}

default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
Metadata.Builder metadataBuilder = null;
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
.build();
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
metadataBuilder.coordinationMetadata(coordinationMetadata);
}
return metadataBuilder;
}

default void close() throws IOException {}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
fullClusterStateReceivedCount.incrementAndGet();
logger.debug("received full cluster state version [{}] with size [{}]", incomingState.version(), request.bytes().length());
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.set(incomingState);
return response;
} else {
Expand Down Expand Up @@ -220,7 +220,7 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
incomingState.stateUUID(),
request.bytes().length()
);
final PublishWithJoinResponse response = acceptState(incomingState);
final PublishWithJoinResponse response = acceptState(incomingState, null);
lastSeenClusterState.compareAndSet(lastSeen, incomingState);
return response;
}
Expand Down Expand Up @@ -270,7 +270,7 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
true
);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.set(clusterState);
return response;
} else {
Expand All @@ -289,13 +289,13 @@ PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest
transportService.getLocalNode().getId()
);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
final PublishWithJoinResponse response = acceptState(clusterState, manifest);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
}
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
private PublishWithJoinResponse acceptState(ClusterState incomingState, ClusterMetadataManifest manifest) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
final PublishRequest publishRequest = currentPublishRequestToSelf.get();
Expand All @@ -305,7 +305,7 @@ private PublishWithJoinResponse acceptState(ClusterState incomingState) {
return handlePublishRequest.apply(publishRequest);
}
}
return handlePublishRequest.apply(new PublishRequest(incomingState));
return handlePublishRequest.apply(new PublishRequest(incomingState, manifest));
}

private PublishWithJoinResponse acceptRemoteStateOnLocalNode(RemotePublishRequest remotePublishRequest) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.ClusterState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.util.Objects;

Expand All @@ -44,15 +45,26 @@
public class PublishRequest {

private final ClusterState acceptedState;
private final ClusterMetadataManifest manifest;

public PublishRequest(ClusterState acceptedState) {
this.acceptedState = acceptedState;
this.manifest = null;
}

public PublishRequest(ClusterState acceptedState, ClusterMetadataManifest manifest) {
this.acceptedState = acceptedState;
this.manifest = manifest;
}

public ClusterState getAcceptedState() {
return acceptedState;
}

public ClusterMetadataManifest getManifest() {
return manifest;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -70,6 +82,13 @@ public int hashCode() {

@Override
public String toString() {
return "PublishRequest{term=" + acceptedState.term() + ", version=" + acceptedState.version() + ", state=" + acceptedState + '}';
return "PublishRequest{term="
+ acceptedState.term()
+ ", version="
+ acceptedState.version()
+ ", state="
+ acceptedState
+ (manifest != null ? ", manifest=" + manifest : "")
+ '}';
}
}
38 changes: 29 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -697,8 +697,17 @@ public String getLastUploadedManifestFile() {
return lastUploadedManifestFile;
}

public ClusterMetadataManifest getLastAcceptedManifest() {
return lastAcceptedManifest;
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
// for non leader node, update the lastAcceptedClusterState
if (clusterState.getNodes().isLocalNodeElectedClusterManager() == false) {
lastAcceptedState = clusterState;
return;
}
try {
final RemoteClusterStateManifestInfo manifestDetails;
if (shouldWriteFullClusterState(clusterState)) {
Expand Down Expand Up @@ -730,7 +739,7 @@ assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) ==
}
assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(), clusterState) == true
: "Manifest and ClusterState are not in sync";
lastAcceptedManifest = manifestDetails.getClusterMetadataManifest();
setLastAcceptedManifest(manifestDetails.getClusterMetadataManifest());
lastAcceptedState = clusterState;
lastUploadedManifestFile = manifestDetails.getManifestFileName();
} catch (Exception e) {
Expand All @@ -739,6 +748,10 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}
}

public void setLastAcceptedManifest(ClusterMetadataManifest manifest) {
this.lastAcceptedManifest = manifest;
}

@Override
public PersistedStateStats getStats() {
return remoteClusterStateService.getStats();
Expand All @@ -761,7 +774,6 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
Expand All @@ -774,19 +786,27 @@ public void markLastAcceptedStateAsCommitted() {
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
ClusterState clusterState = lastAcceptedState;
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
if (lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
&& lastAcceptedState.metadata().clusterUUIDCommitted() == false) {
Metadata.Builder metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
if (metadataBuilder == null) {
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
}
metadataBuilder.clusterUUIDCommitted(true);
clusterState = ClusterState.builder(lastAcceptedState).metadata(metadataBuilder).build();
}
final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
lastAcceptedManifest
);
lastAcceptedManifest = committedManifestDetails.getClusterMetadataManifest();
if (clusterState.getNodes().isLocalNodeElectedClusterManager()) {
final RemoteClusterStateManifestInfo committedManifestDetails = remoteClusterStateService.markLastStateAsCommitted(
clusterState,
lastAcceptedManifest
);
assert committedManifestDetails != null;
setLastAcceptedManifest(committedManifestDetails.getClusterMetadataManifest());
lastUploadedManifestFile = committedManifestDetails.getManifestFileName();
} else {
setLastAcceptedManifest(ClusterMetadataManifest.builder(lastAcceptedManifest).committed(true).build());
}
lastAcceptedState = clusterState;
lastUploadedManifestFile = committedManifestDetails.getManifestFileName();
} catch (Exception e) {
handleExceptionOnWrite(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
assert previousClusterState.metadata().coordinationMetadata().term() == clusterState.metadata().coordinationMetadata().term();

boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

Expand Down
Loading

0 comments on commit 5979358

Please sign in to comment.