diff --git a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java index 1f1f795139651..aa2faab5ffd01 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/PublicationTransportHandler.java @@ -31,6 +31,7 @@ package org.opensearch.cluster.coordination; +import java.util.Locale; import java.util.Optional; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -230,14 +231,15 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException { final Optional manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(request.getClusterName(), request.getClusterUUID(), request.term, request.version); if (manifestOptional.isPresent() == false) { - // todo change exception - throw new IncompatibleClusterStateVersionException("No remote state for term version"); + throw new IllegalStateException( + String.format(Locale.ROOT, "Manifest is not present for term - %s version - %s", request.term, request.version) + ); } ClusterMetadataManifest manifest = manifestOptional.get(); boolean applyFullState = false; final ClusterState lastSeen = lastSeenClusterState.get(); if (lastSeen == null) { - logger.debug("Diff cannot be applied as there is not last cluster state"); + logger.debug("Diff cannot be applied as there is no last cluster state"); applyFullState = true; } else if (manifest.getDiffManifest() == null) { logger.debug("There is no diff in the manifest"); @@ -250,12 +252,14 @@ private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublish if (applyFullState == true) { ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest, transportService.getLocalNode().getId()); logger.debug("Downloaded full cluster state [{}]", clusterState); + fullClusterStateReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.set(clusterState); return response; } else { ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeenClusterState.get(), transportService.getLocalNode().getId()); logger.debug("Downloaded full cluster state from diff [{}]", clusterState); + compatibleClusterStateDiffReceivedCount.incrementAndGet(); final PublishWithJoinResponse response = acceptState(clusterState); lastSeenClusterState.compareAndSet(lastSeen, clusterState); return response; @@ -392,7 +396,7 @@ public void onFailure(Exception e) { } else { responseActionListener = listener; } - if (sendRemoteState && destination.isRemoteStoreNode()) { + if (sendRemoteState && destination.isRemoteStateNode()) { sendRemoteClusterState(destination, publishRequest.getAcceptedState(), responseActionListener); } else if (sendFullVersion || previousState.nodes().nodeExists(destination) == false) { logger.trace("sending full cluster state version [{}] to [{}]", newState.version(), destination); @@ -471,7 +475,7 @@ public String executor() { }; transportService.sendRequest(destination, PUBLISH_REMOTE_STATE_ACTION_NAME, remotePublishRequest, stateRequestOptions, responseHandler); } catch (Exception e) { - logger.warn(() -> new ParameterizedMessage("error sending cluster state to {}", destination), e); + logger.warn(() -> new ParameterizedMessage("error sending remote cluster state to {}", destination), e); listener.onFailure(e); } } diff --git a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java index 644e5f3de9352..fb27fccda0737 100644 --- a/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/opensearch/cluster/node/DiscoveryNode.java @@ -65,6 +65,7 @@ import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_API; import static org.opensearch.cluster.metadata.Metadata.CONTEXT_MODE_PARAM; import static org.opensearch.node.NodeRoleSettings.NODE_ROLES_SETTING; +import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX; /** @@ -482,6 +483,17 @@ public boolean isRemoteStoreNode() { return this.getAttributes().keySet().stream().anyMatch(key -> key.startsWith(REMOTE_STORE_NODE_ATTRIBUTE_KEY_PREFIX)); } + /** + * Returns whether the node is a remote cluster state enabled node. + * @return true if the node contains remote cluster state node attribute, false otherwise + */ + public boolean isRemoteStateNode() { + return this.getAttributes() + .keySet() + .stream() + .anyMatch(key -> (key.equals(REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY))); + } + /** * Returns a set of all the roles that the node has. The roles are returned in sorted order by the role name. *