Skip to content

Commit

Permalink
Use isRemoteStateNode condition
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed May 25, 2024
1 parent 9aa707e commit 3aaa6a8
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.cluster.coordination;

import java.util.Locale;
import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -230,14 +231,15 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version);
if (manifestOptional.isPresent() == false) {
// todo change exception
throw new IncompatibleClusterStateVersionException("No remote state for term version");
throw new IllegalStateException(
String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version)
);
}
ClusterMetadataManifest manifest = manifestOptional.get();
boolean applyFullState = false;
final ClusterState lastSeen = lastSeenClusterState.get();
if (lastSeen == null) {
logger.debug("Diff cannot be applied as there is not last cluster state");
logger.debug("Diff cannot be applied as there is no last cluster state");
applyFullState = true;
} else if (manifest.getDiffManifest() == null) {
logger.debug("There is no diff in the manifest");
Expand All @@ -250,12 +252,14 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish
if (applyFullState == true) {
ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId());
logger.debug("Downloaded full cluster state [{}]", clusterState);
fullClusterStateReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.set(clusterState);
return response;
} else {
ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId());
logger.debug("Downloaded full cluster state from diff [{}]", clusterState);
compatibleClusterStateDiffReceivedCount.incrementAndGet();
final PublishWithJoinResponse response = acceptState(clusterState);
lastSeenClusterState.compareAndSet(lastSeen, clusterState);
return response;
Expand Down Expand Up @@ -392,7 +396,7 @@ public void onFailure(Exception e) {
} else {
responseActionListener = listener;
}
if (sendRemoteState && destination.isRemoteStoreNode()) {
if (sendRemoteState && destination.isRemoteStateNode()) {
sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener);
} else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) {
logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination);
Expand Down Expand Up @@ -471,7 +475,7 @@ public String executor() {
};
transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e);
logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e);
listener.onFailure(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_API;
import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM;
import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX;

/**
Expand Down Expand Up @@ -482,6 +483,17 @@ public boolean isRemoteStoreNode() {
return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX));
}

/**
* Returns whether the node is a remote cluster state enabled node.
* @return true if the node contains remote cluster state node attribute, false otherwise
*/
public boolean isRemoteStateNode() {
return this.getAttributes()
.keySet()
.stream()
.anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY)));
}

/**
* Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name.
* <p>
Expand Down

0 comments on commit 3aaa6a8

Please sign in to comment.