Skip to content

Commit

Permalink
Revert uploading of manifest using min codec version
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Oct 23, 2024
1 parent 66f0110 commit d534f17
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix typo super->sb in method toString() of RemoteStoreNodeAttribute ([#15362](https://github.com/opensearch-project/OpenSearch/pull/15362))
- [Workload Management] Fixing Create/Update QueryGroup TransportActions to execute from non-cluster manager nodes ([16422](https://github.com/opensearch-project/OpenSearch/pull/16422))
- Fix flaky test in `testApproximateRangeWithSizeOverDefault` by adjusting totalHits assertion logic ([#16434](https://github.com/opensearch-project/OpenSearch/pull/16434#pullrequestreview-2386999409))
- Revert changes to upload remote state manifest using minimum codec version([#16403](https://github.com/opensearch-project/OpenSearch/pull/16403))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ public PublicationContext newPublicationContext(
}

private boolean validateRemotePublicationConfiguredOnAllNodes(DiscoveryNodes discoveryNodes) {
assert ClusterMetadataManifest.getCodecForVersion(discoveryNodes.getMinNodeVersion()) >= ClusterMetadataManifest.CODEC_V0;
for (DiscoveryNode node : discoveryNodes.getNodes().values()) {
// if a node is non-remote then created local publication context
if (node.isRemoteStatePublicationEnabled() == false) {
Expand Down
14 changes: 4 additions & 10 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -753,12 +753,8 @@ public void setLastAcceptedState(ClusterState clusterState) {
}
try {
final RemoteClusterStateManifestInfo manifestDetails;
// Decide the codec version
int codecVersion = ClusterMetadataManifest.getCodecForVersion(clusterState.nodes().getMinNodeVersion());
assert codecVersion >= 0 : codecVersion;
logger.info("codec version is {}", codecVersion);

if (shouldWriteFullClusterState(clusterState, codecVersion)) {
if (shouldWriteFullClusterState(clusterState)) {
final Optional<ClusterMetadataManifest> latestManifest = remoteClusterStateService.getLatestClusterMetadataManifest(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
Expand All @@ -775,7 +771,7 @@ public void setLastAcceptedState(ClusterState clusterState) {
clusterState.metadata().clusterUUID()
);
}
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, codecVersion);
manifestDetails = remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID);
} else {
assert verifyManifestAndClusterState(lastAcceptedManifest, lastAcceptedState) == true
: "Previous manifest and previous ClusterState are not in sync";
Expand Down Expand Up @@ -820,13 +816,11 @@ private boolean verifyManifestAndClusterState(ClusterMetadataManifest manifest,
return true;
}

private boolean shouldWriteFullClusterState(ClusterState clusterState, int codecVersion) {
assert lastAcceptedManifest == null || lastAcceptedManifest.getCodecVersion() <= codecVersion;
private boolean shouldWriteFullClusterState(ClusterState clusterState) {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| (remoteClusterStateService.isRemotePublicationEnabled() == false && lastAcceptedState.term() != clusterState.term())
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT
|| lastAcceptedManifest.getCodecVersion() != codecVersion) {
|| lastAcceptedManifest.getOpensearchVersion() != Version.CURRENT) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,8 +301,7 @@ public RemoteClusterStateService(
* @return A manifest object which contains the details of uploaded entity metadata.
*/
@Nullable
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID, int codecVersion)
throws IOException {
public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterState, String previousClusterUUID) throws IOException {
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
Expand Down Expand Up @@ -342,8 +341,7 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
false,
codecVersion
false
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
Expand Down Expand Up @@ -551,8 +549,7 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
false,
previousManifest.getCodecVersion()
false
);

final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
Expand Down Expand Up @@ -1024,8 +1021,7 @@ public RemoteClusterStateManifestInfo markLastStateAsCommitted(
!remoteClusterStateValidationMode.equals(RemoteClusterStateValidationMode.NONE)
? new ClusterStateChecksum(clusterState, threadpool)
: null,
true,
previousManifest.getCodecVersion()
true
);
if (!previousManifest.isClusterUUIDCommitted() && committedManifestDetails.getClusterMetadataManifest().isClusterUUIDCommitted()) {
remoteClusterStateCleanupManager.deleteStaleClusterUUIDs(clusterState, committedManifestDetails.getClusterMetadataManifest());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ RemoteClusterStateManifestInfo uploadManifest(
String previousClusterUUID,
ClusterStateDiffManifest clusterDiffManifest,
ClusterStateChecksum clusterStateChecksum,
boolean committed,
int codecVersion
boolean committed
) {
synchronized (this) {
ClusterMetadataManifest.Builder manifestBuilder = ClusterMetadataManifest.builder();
Expand All @@ -112,7 +111,7 @@ RemoteClusterStateManifestInfo uploadManifest(
.opensearchVersion(Version.CURRENT)
.nodeId(nodeId)
.committed(committed)
.codecVersion(codecVersion)
.codecVersion(ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION)
.indices(uploadedMetadataResult.uploadedIndexMetadata)
.previousClusterUUID(previousClusterUUID)
.clusterUUIDCommitted(clusterState.metadata().clusterUUIDCommitted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@

import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand Down Expand Up @@ -962,7 +961,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep
.previousClusterUUID(randomAlphaOfLength(10))
.clusterUUIDCommitted(true)
.build();
when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION)).thenReturn(
when(remoteClusterStateService.writeFullMetadata(clusterState, previousClusterUUID)).thenReturn(
new RemoteClusterStateManifestInfo(manifest, "path/to/manifest")
);

Expand All @@ -975,8 +974,7 @@ public void testHandlePrePublishAndCommitWhenRemoteStateEnabled() throws IOExcep

final CoordinationState coordinationState = createCoordinationState(persistedStateRegistry, node1, remoteStateSettings());
coordinationState.handlePrePublish(clusterState);
Mockito.verify(remoteClusterStateService, Mockito.times(1))
.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, Mockito.times(1)).writeFullMetadata(clusterState, previousClusterUUID);
assertThat(persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState(), equalTo(clusterState));

when(remoteClusterStateService.markLastStateAsCommitted(any(), any(), eq(false))).thenReturn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ public void testRemotePersistedState() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final ClusterMetadataManifest manifest = ClusterMetadataManifest.builder().clusterTerm(1L).stateVersion(5L).build();
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)))
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any()))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest"));

Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any()))
Expand All @@ -777,7 +777,7 @@ public void testRemotePersistedState() throws IOException {
);

remotePersistedState.setLastAcceptedState(clusterState);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState, previousClusterUUID);

assertThat(remotePersistedState.getLastAcceptedState(), equalTo(clusterState));
assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
Expand All @@ -789,8 +789,7 @@ public void testRemotePersistedState() throws IOException {
);

remotePersistedState.setLastAcceptedState(secondClusterState);
Mockito.verify(remoteClusterStateService, times(1))
.writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, times(1)).writeFullMetadata(secondClusterState, previousClusterUUID);

assertThat(remotePersistedState.getLastAcceptedState(), equalTo(secondClusterState));
assertThat(remotePersistedState.getCurrentTerm(), equalTo(clusterTerm));
Expand Down Expand Up @@ -820,9 +819,9 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx
.clusterTerm(1L)
.stateVersion(5L)
.codecVersion(CODEC_V1)
.opensearchVersion(Version.CURRENT)
.opensearchVersion(Version.V_2_15_0)
.build();
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(CODEC_V1)))
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any()))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest2"));

CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);
Expand All @@ -833,7 +832,7 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx
);
remotePersistedState.setLastAcceptedState(clusterState1);

Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState1, previousClusterUUID, CODEC_V1);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState1, previousClusterUUID);

ClusterState clusterState2 = createClusterState(
randomNonNegativeLong(),
Expand All @@ -846,10 +845,10 @@ public void testRemotePersistedStateWithDifferentNodeConfiguration() throws IOEx
.codecVersion(MANIFEST_CURRENT_CODEC_VERSION)
.opensearchVersion(Version.CURRENT)
.build();
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)))
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any()))
.thenReturn(new RemoteClusterStateManifestInfo(manifest2, "path/to/manifest"));
remotePersistedState.setLastAcceptedState(clusterState2);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState2, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterState2, previousClusterUUID);

ClusterState clusterState3 = createClusterState(
randomNonNegativeLong(),
Expand Down Expand Up @@ -889,8 +888,7 @@ public void testRemotePersistentState_FollowerNode() throws IOException {

remotePersistedState.setLastAcceptedState(clusterState);
remotePersistedState.setLastAcceptedManifest(manifest);
Mockito.verify(remoteClusterStateService, never())
.writeFullMetadata(clusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, never()).writeFullMetadata(clusterState, previousClusterUUID);

assertEquals(clusterState, remotePersistedState.getLastAcceptedState());
assertEquals(clusterTerm, remotePersistedState.getCurrentTerm());
Expand All @@ -906,8 +904,7 @@ public void testRemotePersistentState_FollowerNode() throws IOException {
);

remotePersistedState.setLastAcceptedState(secondClusterState);
Mockito.verify(remoteClusterStateService, never())
.writeFullMetadata(secondClusterState, previousClusterUUID, MANIFEST_CURRENT_CODEC_VERSION);
Mockito.verify(remoteClusterStateService, never()).writeFullMetadata(secondClusterState, previousClusterUUID);

assertEquals(secondClusterState, remotePersistedState.getLastAcceptedState());
assertEquals(clusterTerm, remotePersistedState.getCurrentTerm());
Expand Down Expand Up @@ -940,7 +937,7 @@ public void testRemotePersistedStateNotCommitted() throws IOException {
.build();
Mockito.when(remoteClusterStateService.getLatestClusterMetadataManifest(Mockito.any(), Mockito.any()))
.thenReturn(Optional.of(manifest));
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION)))
Mockito.when(remoteClusterStateService.writeFullMetadata(Mockito.any(), Mockito.any()))
.thenReturn(new RemoteClusterStateManifestInfo(manifest, "path/to/manifest"));

Mockito.when(remoteClusterStateService.writeIncrementalMetadata(Mockito.any(), Mockito.any(), Mockito.any()))
Expand All @@ -966,17 +963,14 @@ public void testRemotePersistedStateNotCommitted() throws IOException {
remotePersistedState.setLastAcceptedState(clusterState);
ArgumentCaptor<String> previousClusterUUIDCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<ClusterState> clusterStateCaptor = ArgumentCaptor.forClass(ClusterState.class);
Mockito.verify(remoteClusterStateService)
.writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.verify(remoteClusterStateService).writeFullMetadata(clusterStateCaptor.capture(), previousClusterUUIDCaptor.capture());
assertEquals(previousClusterUUID, previousClusterUUIDCaptor.getValue());
}

public void testRemotePersistedStateExceptionOnFullStateUpload() throws IOException {
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class)
.when(remoteClusterStateService)
.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any());

CoordinationState.PersistedState remotePersistedState = new RemotePersistedState(remoteClusterStateService, previousClusterUUID);

Expand All @@ -994,9 +988,7 @@ public void testRemotePersistedStateFailureStats() throws IOException {
RemoteUploadStats remoteStateStats = new RemoteUploadStats();
final RemoteClusterStateService remoteClusterStateService = Mockito.mock(RemoteClusterStateService.class);
final String previousClusterUUID = "prev-cluster-uuid";
Mockito.doThrow(IOException.class)
.when(remoteClusterStateService)
.writeFullMetadata(Mockito.any(), Mockito.any(), eq(MANIFEST_CURRENT_CODEC_VERSION));
Mockito.doThrow(IOException.class).when(remoteClusterStateService).writeFullMetadata(Mockito.any(), Mockito.any());
when(remoteClusterStateService.getUploadStats()).thenReturn(remoteStateStats);
doAnswer((i) -> {
remoteStateStats.stateFailed();
Expand Down
Loading

0 comments on commit d534f17

Please sign in to comment.