Skip to content

Commit

Permalink
Send manifest file name in remote publish request
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Jun 7, 2024
1 parent 79eb089 commit fa24988
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ assert getLocalNode().equals(clusterState.getNodes().get(getLocalNode().getId())
final PublicationTransportHandler.PublicationContext publicationContext = publicationHandler.newPublicationContext(
clusterChangedEvent,
coordinationState.get().isRemotePublicationEnabled(),
((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE))
persistedStateRegistry
);

final PublishRequest publishRequest = coordinationState.get().handleClientValue(clusterState);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,8 @@ private PublishWithJoinResponse acceptStateOnLocalNode(RemotePublishRequest remo
}

public PublicationContext newPublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled,
String manifestFileName) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, manifestFileName);
PersistedStateRegistry persistedStateRegistry) {
final PublicationContext publicationContext = new PublicationContext(clusterChangedEvent, isRemotePublicationEnabled, persistedStateRegistry);

// Build the serializations we expect to need now, early in the process, so that an error during serialization fails the publication
// straight away. This isn't watertight since we send diffs on a best-effort basis and may fall back to sending a full state (and
Expand Down Expand Up @@ -347,15 +347,15 @@ public class PublicationContext {
private final Map<Version, BytesReference> serializedStates = new HashMap<>();
private final Map<Version, BytesReference> serializedDiffs = new HashMap<>();
private final boolean sendRemoteState;
private final String manifestFileName;
private final PersistedStateRegistry persistedStateRegistry;

PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, String manifestFileName) {
PublicationContext(ClusterChangedEvent clusterChangedEvent, boolean isRemotePublicationEnabled, PersistedStateRegistry persistedStateRegistry) {
discoveryNodes = clusterChangedEvent.state().nodes();
newState = clusterChangedEvent.state();
previousState = clusterChangedEvent.previousState();
sendFullVersion = previousState.getBlocks().disableStatePersistence();
sendRemoteState = isRemotePublicationEnabled;
this.manifestFileName = manifestFileName;
this.persistedStateRegistry = persistedStateRegistry;
}

void buildDiffAndSerializeStates() {
Expand Down Expand Up @@ -468,6 +468,7 @@ public String executor() {

private void sendRemoteClusterState(final DiscoveryNode destination, final ClusterState clusterState, final ActionListener<PublishWithJoinResponse> listener) {
try {
final String manifestFileName = ((RemotePersistedState) persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)).getLastUploadedManifestFile();
final RemotePublishRequest remotePublishRequest = new RemotePublishRequest(discoveryNodes.getLocalNode(), clusterState.term(),
clusterState.getVersion(), clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifestFileName);
final Consumer<TransportException> transportExceptionHandler = exp -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

public class RemotePublishRequest extends TermVersionRequest {

// todo Do we need cluster name and UUID ?
private final String clusterName;
private final String clusterUUID;
private final String manifestFile;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ public static class RemotePersistedState implements PersistedState {

private ClusterState lastAcceptedState;
private ClusterMetadataManifest lastAcceptedManifest;

private String lastUploadedManifestFile;
private final RemoteClusterStateService remoteClusterStateService;
private String previousClusterUUID;
Expand All @@ -692,6 +693,10 @@ public void setCurrentTerm(long currentTerm) {
// But for RemotePersistedState, the state is only pushed by the active cluster. So this method is not required.
}

public String getLastUploadedManifestFile() {
return lastUploadedManifestFile;
}

@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void writeTo(StreamOutput out) throws IOException {

OpenSearchException e = expectThrows(
OpenSearchException.class,
() -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState), false)
() -> handler.newPublicationContext(new ClusterChangedEvent("test", unserializableClusterState, clusterState), false, null)
);
assertNotNull(e.getCause());
assertThat(e.getCause(), instanceOf(IOException.class));
Expand Down

0 comments on commit fa24988

Please sign in to comment.