Skip to content

Commit

Permalink
removed isRemotePublicationEnabled flag from CoordinationState
Browse files Browse the repository at this point in the history
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
  • Loading branch information
shiv0408 authored and soosinha committed Oct 9, 2024
1 parent 9176284 commit f7cbb02
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
Expand All @@ -54,7 +53,6 @@
import java.util.Set;

import static org.opensearch.cluster.coordination.Coordinator.ZEN1_BWC_TERM;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -82,14 +80,12 @@ public class CoordinationState {
private VotingConfiguration lastPublishedConfiguration;
private VoteCollection publishVotes;
private final boolean isRemoteStateEnabled;
private boolean isRemotePublicationEnabled;

public CoordinationState(
DiscoveryNode localNode,
PersistedStateRegistry persistedStateRegistry,
ElectionStrategy electionStrategy,
Settings settings,
ClusterSettings clusterSettings
Settings settings
) {
this.localNode = localNode;

Expand All @@ -107,14 +103,6 @@ public CoordinationState(
.getLastAcceptedConfiguration();
this.publishVotes = new VoteCollection();
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
this.isRemotePublicationEnabled = isRemoteStateEnabled
&& REMOTE_PUBLICATION_SETTING.get(settings)
&& localNode.isRemoteStatePublicationConfigured();
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
}

public boolean isRemotePublicationEnabled() {
return isRemotePublicationEnabled;
}

public long getCurrentTerm() {
Expand Down Expand Up @@ -653,15 +641,6 @@ private boolean shouldCommitRemotePersistedState() {
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
}

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isRemotePublicationEnabled = false;
} else {
this.isRemotePublicationEnabled = isRemoteStateEnabled && localNode.isRemoteStatePublicationConfigured();
}

}

/**
* Pluggable persistence layer for {@link CoordinationState}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -872,9 +872,7 @@ boolean publicationInProgress() {
@Override
protected void doStart() {
synchronized (mutex) {
coordinationState.set(
new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings, clusterSettings)
);
coordinationState.set(new CoordinationState(getLocalNode(), persistedStateRegistry, electionStrategy, settings));
peerFinder.setCurrentTerm(getCurrentTerm());
configuredHostsResolver.start();
final ClusterState lastAcceptedState = coordinationState.get().getLastAcceptedState();
Expand Down Expand Up @@ -1363,7 +1361,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())

final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
this.isRemotePublicationEnabled(),
persistedStateRegistry
);
logger.debug("initialized PublicationContext using class: {}", publicationContext.getClass().toString());
Expand Down Expand Up @@ -1892,8 +1890,8 @@ public static boolean isZen1Node(DiscoveryNode discoveryNode) {
}

public boolean isRemotePublicationEnabled() {
if (coordinationState.get() != null) {
return coordinationState.get().isRemotePublicationEnabled();
if (remoteClusterStateService != null) {
return remoteClusterStateService.isRemotePublicationEnabled();
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.LongSupplier;
Expand Down Expand Up @@ -234,7 +235,7 @@ public static RemoteClusterStateValidationMode parseString(String mode) {
private final String METADATA_UPDATE_LOG_STRING = "wrote metadata for [{}] indices and skipped [{}] unchanged "
+ "indices, coordination metadata updated : [{}], settings metadata updated : [{}], templates metadata "
+ "updated : [{}], custom metadata updated : [{}], indices routing updated : [{}]";
private boolean isPublicationEnabled;
private AtomicBoolean isPublicationEnabled;
private final String remotePathPrefix;

private final RemoteClusterStateCache remoteClusterStateCache;
Expand Down Expand Up @@ -275,9 +276,11 @@ public RemoteClusterStateService(
this.remoteStateStats = new RemotePersistenceStats();
this.namedWriteableRegistry = namedWriteableRegistry;
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.isPublicationEnabled = clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings);
this.isPublicationEnabled = new AtomicBoolean(
clusterSettings.get(REMOTE_PUBLICATION_SETTING)
&& RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled(settings)
&& RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled(settings)
);
clusterSettings.addSettingsUpdateConsumer(REMOTE_PUBLICATION_SETTING, this::setRemotePublicationSetting);
this.remotePathPrefix = CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.get(settings);
this.remoteRoutingTableService = RemoteRoutingTableServiceFactory.getService(
Expand Down Expand Up @@ -306,19 +309,20 @@ public RemoteClusterStateManifestInfo writeFullMetadata(ClusterState clusterStat
return null;
}

boolean publicationEnabled = isPublicationEnabled.get();
UploadedMetadataResults uploadedMetadataResults = writeMetadataInParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values()),
emptyMap(),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), isPublicationEnabled),
RemoteGlobalMetadataManager.filterCustoms(clusterState.metadata().customs(), publicationEnabled),
true,
true,
true,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled,
isPublicationEnabled ? clusterState.customs() : Collections.emptyMap(),
isPublicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled,
publicationEnabled ? clusterState.customs() : Collections.emptyMap(),
publicationEnabled,
remoteRoutingTableService.getIndicesRouting(clusterState.getRoutingTable()),
null
);
Expand Down Expand Up @@ -397,9 +401,9 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
boolean firstUploadForSplitGlobalMetadata = !previousManifest.hasMetadataAttributesFiles();

final DiffableUtils.MapDiff<String, Metadata.Custom, Map<String, Metadata.Custom>> customsDiff = remoteGlobalMetadataManager
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled);
.getCustomsDiff(clusterState, previousClusterState, firstUploadForSplitGlobalMetadata, isPublicationEnabled.get());
final DiffableUtils.MapDiff<String, ClusterState.Custom, Map<String, ClusterState.Custom>> clusterStateCustomsDiff =
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled, false);
remoteClusterStateAttributesManager.getUpdatedCustoms(clusterState, previousClusterState, isPublicationEnabled.get(), false);
final Map<String, UploadedMetadataAttribute> allUploadedCustomMap = new HashMap<>(previousManifest.getCustomMetadataMap());
final Map<String, UploadedMetadataAttribute> allUploadedClusterStateCustomsMap = new HashMap<>(
previousManifest.getClusterStateCustomMap()
Expand Down Expand Up @@ -464,10 +468,10 @@ public RemoteClusterStateManifestInfo writeIncrementalMetadata(
boolean updateTemplatesMetadata = firstUploadForSplitGlobalMetadata
|| Metadata.isTemplatesMetadataEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

final boolean updateDiscoveryNodes = isPublicationEnabled
final boolean updateDiscoveryNodes = isPublicationEnabled.get()
&& clusterState.getNodes().delta(previousClusterState.getNodes()).hasChanges();
final boolean updateClusterBlocks = isPublicationEnabled && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled
final boolean updateClusterBlocks = isPublicationEnabled.get() && !clusterState.blocks().equals(previousClusterState.blocks());
final boolean updateHashesOfConsistentSettings = isPublicationEnabled.get()
&& Metadata.isHashesOfConsistentSettingsEqual(previousClusterState.metadata(), clusterState.metadata()) == false;

uploadedMetadataResults = writeMetadataInParallel(
Expand Down Expand Up @@ -1120,9 +1124,9 @@ private void setChecksumValidationMode(RemoteClusterStateValidationMode remoteCl

private void setRemotePublicationSetting(boolean remotePublicationSetting) {
if (remotePublicationSetting == false) {
this.isPublicationEnabled = false;
this.isPublicationEnabled.set(false);
} else {
this.isPublicationEnabled = isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings);
this.isPublicationEnabled.set(isRemoteStoreClusterStateEnabled(settings) && isRemoteRoutingTableEnabled(settings));
}
}

Expand Down Expand Up @@ -1841,7 +1845,7 @@ public String getLastKnownUUIDFromRemote(String clusterName) {
}

public boolean isRemotePublicationEnabled() {
return this.isPublicationEnabled;
return this.isPublicationEnabled.get();
}

public void setRemoteStateReadTimeout(TimeValue remoteStateReadTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static org.opensearch.gateway.remote.ClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_PUBLICATION_SETTING_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
Expand Down Expand Up @@ -1268,22 +1267,12 @@ public void testHandleCommitOnFollowerNodeWhenRemotePublicationEnabledWithNullRe
verifyNoInteractions(remoteClusterStateService);
}

public void testIsRemotePublicationEnabled_WithInconsistentSettings() {
// create settings with remote state disabled but publication enabled
Settings settings = Settings.builder()
.put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), false)
.put(REMOTE_PUBLICATION_SETTING_KEY, true)
.build();
CoordinationState coordinationState = createCoordinationState(psr1, node1, settings);
assertFalse(coordinationState.isRemotePublicationEnabled());
}

public static CoordinationState createCoordinationState(
PersistedStateRegistry persistedStateRegistry,
DiscoveryNode localNode,
Settings settings
) {
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings, null);
return new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, settings);
}

public static ClusterState clusterState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,7 @@ public void testPrevotingIndicatesElectionSuccess() {
localNode,
persistedStateRegistry,
ElectionStrategy.DEFAULT_INSTANCE,
Settings.EMPTY,
null
Settings.EMPTY
);

final long newTerm = randomLongBetween(currentTerm + 1, Long.MAX_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,7 @@ class MockNode {
);
PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, new InMemoryPersistedState(0L, initialState));
coordinationState = new CoordinationState(
localNode,
persistedStateRegistry,
ElectionStrategy.DEFAULT_INSTANCE,
Settings.EMPTY,
null
);
coordinationState = new CoordinationState(localNode, persistedStateRegistry, ElectionStrategy.DEFAULT_INSTANCE, Settings.EMPTY);
}

final DiscoveryNode localNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
// TODO Make the publication flag parameterized
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, publicationEnabled).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -388,6 +390,7 @@ public void testWriteFullMetadataSuccessPublicationEnabled() throws IOException
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager())
.customs(
Map.of(
Expand Down Expand Up @@ -747,6 +750,8 @@ public void testWriteIncrementalMetadataSuccess() throws IOException {
public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws IOException {
publicationEnabled = true;
settings = Settings.builder().put(settings).put(REMOTE_PUBLICATION_SETTING_KEY, true).build();
clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
remoteClusterStateService = new RemoteClusterStateService(
"test-node-id",
repositoriesServiceSupplier,
Expand All @@ -765,6 +770,7 @@ public void testWriteIncrementalMetadataSuccessWhenPublicationEnabled() throws I
),
writableRegistry()
);
assertTrue(remoteClusterStateService.isRemotePublicationEnabled());
final ClusterState clusterState = generateClusterStateWithOneIndex().nodes(nodesWithLocalNodeClusterManager()).build();
mockBlobStoreObjects();
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder().term(1L).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ static class ClusterNode {
persistedStateRegistry.addPersistedState(PersistedStateType.LOCAL, persistedState);

this.electionStrategy = electionStrategy;
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
}

void reboot() {
Expand Down Expand Up @@ -189,7 +189,7 @@ void reboot() {
localNode.getVersion()
);

state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY, null);
state = new CoordinationState(localNode, persistedStateRegistry, electionStrategy, Settings.EMPTY);
}

void setInitialState(CoordinationMetadata.VotingConfiguration initialConfig, long initialValue) {
Expand Down

0 comments on commit f7cbb02

Please sign in to comment.