diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java index 683f7584437dd..5310a07c6f7a6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java @@ -157,22 +157,6 @@ public void testRemotePublicationDisableIfRemoteStateDisabled() { assertNull(internalCluster().getCurrentClusterManagerNodeInstance(RemoteClusterStateService.class)); } - private Map getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException { - BlobPath metadataPath = repository.basePath() - .add( - Base64.getUrlEncoder() - .withoutPadding() - .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) - ) - .add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN) - .add(getClusterState().metadata().clusterUUID()) - .add(subDirectory); - return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> { - logger.info(fileName); - return fileName.split(DELIMITER)[0]; - }).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum)); - } - public void testMasterReElectionUsesIncrementalUpload() throws IOException { prepareCluster(3, 2, INDEX_NAME, 1, 1); PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class); @@ -199,4 +183,20 @@ public void testMasterReElectionUsesIncrementalUpload() throws IOException { assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion()); assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting()); } + + private Map getMetadataFiles(BlobStoreRepository repository, String subDirectory) throws IOException { + BlobPath metadataPath = repository.basePath() + .add( + Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8)) + ) + .add(RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN) + .add(getClusterState().metadata().clusterUUID()) + .add(subDirectory); + return repository.blobStore().blobContainer(metadataPath).listBlobs().keySet().stream().map(fileName -> { + logger.info(fileName); + return fileName.split(DELIMITER)[0]; + }).collect(Collectors.toMap(Function.identity(), key -> 1, Integer::sum)); + } } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java index 08685a2c07538..d08f64be0bc0f 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java @@ -41,7 +41,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.io.IOUtils; -import org.opensearch.gateway.GatewayMetaState; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import java.io.Closeable; import java.io.IOException; @@ -465,8 +465,7 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) { // have already updated the last accepted state if (shouldUpdateRemotePersistedState(publishRequest)) { persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(clusterState); - ((GatewayMetaState.RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)) - .setLastAcceptedManifest(publishRequest.getManifest()); + persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(publishRequest.getManifest()); } assert getLastAcceptedState() == clusterState; @@ -580,7 +579,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) { ); persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted(); - if (isRemotePublicationEnabled && localNode.isClusterManagerNode()) { + if (shouldCommitRemotePersistedState()) { persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted(); } assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration()); @@ -636,6 +635,16 @@ private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; } + private boolean shouldCommitRemotePersistedState() { + return isRemotePublicationEnabled + && localNode.isClusterManagerNode() + && persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL) + .getLastAcceptedState() + .getNodes() + .isLocalNodeElectedClusterManager() == false + && persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null; + } + /** * Pluggable persistence layer for {@link CoordinationState}. * @@ -673,6 +682,22 @@ public interface PersistedState extends Closeable { */ PersistedStateStats getStats(); + /** + * Returns the last accepted {@link ClusterMetadataManifest}. + * + * @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest + * has been accepted yet. + */ + default ClusterMetadataManifest getLastAcceptedManifest() { + // return null by default, this method needs to be overridden wherever required + return null; + } + + /** + * Sets the last accepted {@link ClusterMetadataManifest}. + */ + default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {} + /** * Marks the last accepted cluster state as committed. * After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set, diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java index c839484018c2c..0fedcf59c28e5 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublishRequest.java @@ -72,12 +72,14 @@ public boolean equals(Object o) { PublishRequest that = (PublishRequest) o; + if (!Objects.equals(manifest, that.manifest)) return false; + return acceptedState.term() == that.acceptedState.term() && acceptedState.version() == that.acceptedState.version(); } @Override public int hashCode() { - return Objects.hash(acceptedState.term(), acceptedState.version()); + return Objects.hash(acceptedState.term(), acceptedState.version(), manifest); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java index f7cf0ae30b228..227da5ce1190b 100644 --- a/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/opensearch/gateway/GatewayMetaState.java @@ -88,6 +88,7 @@ import java.util.function.UnaryOperator; import static org.opensearch.common.util.concurrent.OpenSearchExecutors.daemonThreadFactory; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemotePublicationEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; /** @@ -697,6 +698,7 @@ public String getLastUploadedManifestFile() { return lastUploadedManifestFile; } + @Override public ClusterMetadataManifest getLastAcceptedManifest() { return lastAcceptedManifest; } @@ -748,6 +750,7 @@ assert verifyManifestAndClusterState(manifestDetails.getClusterMetadataManifest( } } + @Override public void setLastAcceptedManifest(ClusterMetadataManifest manifest) { this.lastAcceptedManifest = manifest; } @@ -772,12 +775,20 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest, } private boolean shouldWriteFullClusterState(ClusterState clusterState) { - if (lastAcceptedState == null - || lastAcceptedManifest == null - || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) { - return true; + if (isRemotePublicationEnabled(clusterState.metadata().settings())) { + // If Remote Publication is enabled, we just need to ensure that we have the state/manifest available. + // As in case of publication enabled, we add the lastAcceptedState as part of publication quorum votes. + return lastAcceptedState == null + || lastAcceptedManifest == null + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT; + } else { + // in case Remote Publication is disabled, we still need to upload the full state on every term update + // As we can't guarantee that nodes will have the latest state as accepted state + return lastAcceptedState == null + || lastAcceptedManifest == null + || lastAcceptedState.term() != clusterState.term() + || lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT; } - return false; } @Override diff --git a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java index a0f745a4270c4..fa503bd4fb6cf 100644 --- a/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java +++ b/server/src/main/java/org/opensearch/node/remotestore/RemoteStoreNodeAttribute.java @@ -205,6 +205,10 @@ public static boolean isRemoteRoutingTableEnabled(Settings settings) { return FeatureFlags.isEnabled(REMOTE_PUBLICATION_EXPERIMENTAL) && isRemoteRoutingTableAttributePresent(settings); } + public static boolean isRemotePublicationEnabled(Settings settings) { + return isRemoteRoutingTableEnabled(settings); + } + public RepositoriesMetadata getRepositoriesMetadata() { return this.repositoriesMetadata; }