Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 committed Aug 30, 2024
1 parent a54e283 commit f02f1d4
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,22 +157,6 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() {
assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class));
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN)
.add(getClusterState().metadata().clusterUUID())
.add(subDirectory);
return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> {
logger.info(fileName);
return fileName.split(DELIMITER)[0];
}).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));
}

public void testMasterReElectionUsesIncrementalUpload() throws IOException {
prepareCluster(3, 2, INDEX_NAME, 1, 1);
PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
Expand All @@ -199,4 +183,20 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException {
assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion());
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
}

private Map<String, Integer> getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException {
BlobPath metadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN)
.add(getClusterState().metadata().clusterUUID())
.add(subDirectory);
return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> {
logger.info(fileName);
return fileName.split(DELIMITER)[0];
}).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.ClusterMetadataManifest;

import java.io.Closeable;
import java.io.IOException;
Expand Down Expand Up @@ -465,8 +465,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
// have already updated the last accepted state
if (shouldUpdateRemotePersistedState(publishRequest)) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState);
((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
.setLastAcceptedManifest(publishRequest.getManifest());
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(publishRequest.getManifest());
}
assert getLastAcceptedState() == clusterState;

Expand Down Expand Up @@ -580,7 +579,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
);

persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
if (isRemotePublicationEnabled && localNode.isClusterManagerNode()) {
if (shouldCommitRemotePersistedState()) {
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
}
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
Expand Down Expand Up @@ -636,6 +635,16 @@ private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest)
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
}

private boolean shouldCommitRemotePersistedState() {
return isRemotePublicationEnabled
&& localNode.isClusterManagerNode()
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
.getLastAcceptedState()
.getNodes()
.isLocalNodeElectedClusterManager() == false
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null;
}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down Expand Up @@ -673,6 +682,22 @@ public interface PersistedState extends Closeable {
*/
PersistedStateStats getStats();

/**
* Returns the last accepted {@link ClusterMetadataManifest}.
*
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
* has been accepted yet.
*/
default ClusterMetadataManifest getLastAcceptedManifest() {
// return null by default, this method needs to be overridden wherever required
return null;
}

/**
* Sets the last accepted {@link ClusterMetadataManifest}.
*/
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}

/**
* Marks the last accepted cluster state as committed.
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ public boolean equals(Object o) {

PublishRequest that = (PublishRequest) o;

if (!Objects.equals(manifest, that.manifest)) return false;

return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version();
}

@Override
public int hashCode() {
return Objects.hash(acceptedState.term(), acceptedState.version());
return Objects.hash(acceptedState.term(), acceptedState.version(), manifest);
}

@Override
Expand Down
21 changes: 16 additions & 5 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
import java.util.function.UnaryOperator;

import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemotePublicationEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -697,6 +698,7 @@ public String getLastUploadedManifestFile() {
return lastUploadedManifestFile;
}

@Override
public ClusterMetadataManifest getLastAcceptedManifest() {
return lastAcceptedManifest;
}
Expand Down Expand Up @@ -748,6 +750,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest(
}
}

@Override
public void setLastAcceptedManifest(ClusterMetadataManifest manifest) {
this.lastAcceptedManifest = manifest;
}
Expand All @@ -772,12 +775,20 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
}

private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
if (isRemotePublicationEnabled(clusterState.metadata().settings())) {
// If Remote Publication is enabled, we just need to ensure that we have the state/manifest available.
// As in case of publication enabled, we add the lastAcceptedState as part of publication quorum votes.
return lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT;
} else {
// in case Remote Publication is disabled, we still need to upload the full state on every term update
// As we can't guarantee that nodes will have the latest state as accepted state
return lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.term() != clusterState.term()
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT;
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,10 @@ public static boolean isRemoteRoutingTableEnabled(Settings settings) {
return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings);
}

public static boolean isRemotePublicationEnabled(Settings settings) {
return isRemoteRoutingTableEnabled(settings);
}

public RepositoriesMetadata getRepositoriesMetadata() {
return this.repositoriesMetadata;
}
Expand Down

0 comments on commit f02f1d4

Please sign in to comment.