diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index 266709894c1b7..5d105db4238d5 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -28,10 +28,8 @@ import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; import org.opensearch.core.index.Index; -import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.RemoteClusterStateUtils; -import org.opensearch.gateway.remote.RemoteReadResult; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream; import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader; import org.opensearch.index.remote.RemoteStoreEnums; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java index 380fd5670558d..1130b92289cc2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateAttributesManager.java @@ -16,6 +16,12 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; +import org.opensearch.gateway.remote.model.AbstractRemoteBlobObject; +import org.opensearch.gateway.remote.model.RemoteClusterBlocks; +import org.opensearch.gateway.remote.model.RemoteClusterBlocksBlobStore; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes; +import org.opensearch.gateway.remote.model.RemoteDiscoveryNodesBlobStore; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -26,18 +32,21 @@ public class RemoteClusterStateAttributesManager { public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute"; public static final String DISCOVERY_NODES = "nodes"; public static final String CLUSTER_BLOCKS = "blocks"; - public static final String CUSTOM_PREFIX = "custom"; public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1; private final BlobStoreTransferService blobStoreTransferService; private final BlobStoreRepository blobStoreRepository; private final ThreadPool threadPool; private final String clusterName; + private final RemoteClusterBlocksBlobStore clusterBlocksBlobStore; + private final RemoteDiscoveryNodesBlobStore discoveryNodesBlobStore; RemoteClusterStateAttributesManager(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository repository, ThreadPool threadPool, String clusterName) { this.blobStoreTransferService = blobStoreTransferService; this.blobStoreRepository = repository; this.threadPool = threadPool; this.clusterName = clusterName; + this.clusterBlocksBlobStore = new RemoteClusterBlocksBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.discoveryNodesBlobStore = new RemoteDiscoveryNodesBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); } /** @@ -49,45 +58,41 @@ CheckedRunnable getAsyncMetadataWriteAction( ToXContent componentData, LatchedActionListener latchedActionListener ) { - AbstractRemoteBlobStoreObject remoteObject = getRemoteObject(componentData, clusterState.version(), clusterState.metadata().clusterUUID()); - ActionListener completionListener = ActionListener.wrap( - resp -> latchedActionListener.onResponse( - remoteObject.getUploadedMetadata() - ), - ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex)) - ); - return () -> remoteObject.writeAsync(completionListener); - } - - private AbstractRemoteBlobStoreObject getRemoteObject(ToXContent componentData, long stateVersion, String clusterUUID) { if (componentData instanceof DiscoveryNodes) { - return new RemoteDiscoveryNodes((DiscoveryNodes)componentData, stateVersion, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), blobStoreRepository); + return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); } else if (componentData instanceof ClusterBlocks) { - return new RemoteClusterBlocks((ClusterBlocks) componentData, stateVersion, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), blobStoreRepository); + return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener)); } else { throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass()); } } + private ActionListener getActionListener(String component, AbstractRemoteBlobObject remoteObject, LatchedActionListener latchedActionListener) { + return ActionListener.wrap( + resp -> latchedActionListener.onResponse( + remoteObject.getUploadedMetadata() + ), + ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex)) + ); + } + public CheckedRunnable getAsyncMetadataReadAction( String clusterUUID, String component, String uploadedFilename, LatchedActionListener listener ) { - AbstractRemoteBlobStoreObject remoteObject = getRemoteObject(component, uploadedFilename, clusterUUID); ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure); - return () -> remoteObject.readAsync(actionListener); - } - - private AbstractRemoteBlobStoreObject getRemoteObject(String component, String blobName, String clusterUUID) { if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) { - return new RemoteDiscoveryNodes(blobName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, blobStoreRepository); + return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener); } else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) { - return new RemoteClusterBlocks(blobName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, blobStoreRepository); + return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener); } else { throw new RemoteStateTransferException("Remote object not found for "+ component); } } - } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java index d0ab53d337207..6e3e665d8bb38 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManager.java @@ -24,11 +24,9 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import java.io.Closeable; import java.io.IOException; -import java.sql.Blob; import java.util.ArrayList; import java.util.HashSet; import java.util.List; @@ -36,10 +34,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_METADATA_FORMAT; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX; /** * A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index 3d95ad15023c0..23e4b07b3ee28 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -21,8 +21,8 @@ import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled; @@ -76,6 +76,7 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; @@ -795,7 +796,7 @@ public void start() { remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(blobStoreRepository, clusterSettings,threadpool, getBlobStoreTransferService(), clusterName); remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings,threadpool, clusterName, getBlobStoreTransferService()); remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(getBlobStoreTransferService(), blobStoreRepository, threadpool, clusterName); - remoteManifestManager = new RemoteManifestManager(getBlobStoreTransferService(), blobStoreRepository, clusterSettings, nodeId, threadpool); + remoteManifestManager = new RemoteManifestManager(getBlobStoreTransferService(), blobStoreRepository, clusterSettings, nodeId, threadpool, clusterName); remoteClusterStateCleanupManager.start(); } @@ -916,7 +917,6 @@ private ClusterState readClusterStateInParallel( for (Map.Entry entry : customToRead.entrySet()) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( - clusterName, clusterUUID, CUSTOM_METADATA, entry.getKey(), @@ -929,7 +929,6 @@ private ClusterState readClusterStateInParallel( if (readCoordinationMetadata) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( - clusterName, clusterUUID, COORDINATION_METADATA, COORDINATION_METADATA, @@ -942,7 +941,6 @@ private ClusterState readClusterStateInParallel( if (readSettingsMetadata) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( - clusterName, clusterUUID, SETTING_METADATA, SETTING_METADATA, @@ -955,7 +953,6 @@ private ClusterState readClusterStateInParallel( if (readTransientSettingsMetadata) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( - clusterName, clusterUUID, TRANSIENT_SETTING_METADATA, TRANSIENT_SETTING_METADATA, @@ -968,7 +965,6 @@ private ClusterState readClusterStateInParallel( if (readTemplatesMetadata) { asyncMetadataReadActions.add( remoteGlobalMetadataManager.getAsyncMetadataReadAction( - clusterName, clusterUUID, TEMPLATES_METADATA, TEMPLATES_METADATA, diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java index 4ea3a0cc97e50..ca186fa5711a6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManager.java @@ -12,7 +12,7 @@ import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; -import static org.opensearch.gateway.remote.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import java.io.IOException; import java.util.HashMap; @@ -34,6 +34,18 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.gateway.remote.model.AbstractRemoteBlobObject; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteCustomMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsBlobStore; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsBlobStore; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; @@ -89,6 +101,11 @@ public class RemoteGlobalMetadataManager { private volatile TimeValue globalMetadataUploadTimeout; private final BlobStoreTransferService blobStoreTransferService; private final String clusterName; + private final RemoteCoordinationMetadataBlobStore coordinationMetadataBlobStore; + private final RemoteTransientSettingsBlobStore transientSettingsBlobStore; + private final RemotePersistentSettingsBlobStore persistentSettingsBlobStore; + private final RemoteTemplatesMetadataBlobStore templatesMetadataBlobStore; + private final RemoteCustomMetadataBlobStore customMetadataBlobStore; RemoteGlobalMetadataManager(BlobStoreRepository blobStoreRepository, ClusterSettings clusterSettings, ThreadPool threadPool, BlobStoreTransferService blobStoreTransferService, String clusterName) { @@ -97,6 +114,11 @@ public class RemoteGlobalMetadataManager { this.threadPool = threadPool; this.blobStoreTransferService = blobStoreTransferService; this.clusterName = clusterName; + this.coordinationMetadataBlobStore = new RemoteCoordinationMetadataBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.transientSettingsBlobStore = new RemoteTransientSettingsBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.persistentSettingsBlobStore = new RemotePersistentSettingsBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.templatesMetadataBlobStore = new RemoteTemplatesMetadataBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + this.customMetadataBlobStore = new RemoteCustomMetadataBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout); } @@ -110,40 +132,66 @@ CheckedRunnable getAsyncMetadataWriteAction( LatchedActionListener latchedActionListener, String customType ) { - AbstractRemoteBlobStoreObject remoteBlobStoreObject = getRemoteObject(objectToUpload, metadataVersion, clusterUUID, customType); - ActionListener completionListener = ActionListener.wrap( + if (objectToUpload instanceof CoordinationMetadata) { + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata((CoordinationMetadata) objectToUpload, metadataVersion, clusterUUID, + blobStoreRepository); + return () -> coordinationMetadataBlobStore.writeAsync(remoteCoordinationMetadata, getActionListener(remoteCoordinationMetadata, latchedActionListener)); + } else if (objectToUpload instanceof Settings) { + if (customType != null && customType.equals(TRANSIENT_SETTING_METADATA)) { + RemoteTransientSettingsMetadata remoteTransientSettingsMetadata = new RemoteTransientSettingsMetadata((Settings) objectToUpload, metadataVersion, clusterUUID, + blobStoreRepository); + return () -> transientSettingsBlobStore.writeAsync(remoteTransientSettingsMetadata, getActionListener(remoteTransientSettingsMetadata, latchedActionListener)); + } + RemotePersistentSettingsMetadata remotePersistentSettingsMetadata = new RemotePersistentSettingsMetadata((Settings) objectToUpload, metadataVersion, clusterUUID, + blobStoreRepository); + return () -> persistentSettingsBlobStore.writeAsync(remotePersistentSettingsMetadata, getActionListener(remotePersistentSettingsMetadata, latchedActionListener)); + } else if (objectToUpload instanceof TemplatesMetadata) { + RemoteTemplatesMetadata remoteTemplatesMetadata = new RemoteTemplatesMetadata((TemplatesMetadata) objectToUpload, metadataVersion, clusterUUID, + blobStoreRepository); + return () -> templatesMetadataBlobStore.writeAsync(remoteTemplatesMetadata, getActionListener(remoteTemplatesMetadata, latchedActionListener)); + } else if (objectToUpload instanceof Custom) { + RemoteCustomMetadata remoteCustomMetadata = new RemoteCustomMetadata((Custom) objectToUpload, customType ,metadataVersion, clusterUUID, + blobStoreRepository); + return () -> customMetadataBlobStore.writeAsync(remoteCustomMetadata, getActionListener(remoteCustomMetadata, latchedActionListener)); + } + throw new RemoteStateTransferException("Remote object cannot be created for " + objectToUpload.getClass()); + } + + private ActionListener getActionListener(AbstractRemoteBlobObject remoteBlobStoreObject, LatchedActionListener latchedActionListener) { + return ActionListener.wrap( resp -> latchedActionListener.onResponse( remoteBlobStoreObject.getUploadedMetadata() ), ex -> latchedActionListener.onFailure(new RemoteStateTransferException("Upload failed", ex)) ); - return () -> remoteBlobStoreObject.writeAsync(completionListener); } CheckedRunnable getAsyncMetadataReadAction( - String clusterName, String clusterUUID, String component, String componentName, String uploadFilename, LatchedActionListener listener ) { - AbstractRemoteBlobStoreObject remoteBlobStoreObject; + ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, component, componentName)), listener::onFailure); if (component.equals(COORDINATION_METADATA)) { - remoteBlobStoreObject = new RemoteCoordinationMetadata(uploadFilename, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteCoordinationMetadata remoteBlobStoreObject = new RemoteCoordinationMetadata(uploadFilename, clusterUUID, blobStoreRepository); + return () -> coordinationMetadataBlobStore.readAsync(remoteBlobStoreObject, actionListener); } else if (component.equals(TEMPLATES_METADATA)) { - remoteBlobStoreObject = new RemoteTemplatesMetadata(uploadFilename, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteTemplatesMetadata remoteBlobStoreObject = new RemoteTemplatesMetadata(uploadFilename, clusterUUID, blobStoreRepository); + return () -> templatesMetadataBlobStore.readAsync(remoteBlobStoreObject, actionListener); } else if (component.equals(SETTING_METADATA)) { - remoteBlobStoreObject = new RemotePersistentSettingsMetadata(uploadFilename, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemotePersistentSettingsMetadata remoteBlobStoreObject = new RemotePersistentSettingsMetadata(uploadFilename, clusterUUID, blobStoreRepository); + return () -> persistentSettingsBlobStore.readAsync(remoteBlobStoreObject, actionListener); } else if (component.equals(TRANSIENT_SETTING_METADATA)) { - remoteBlobStoreObject = new RemoteTransientSettingsMetadata(uploadFilename, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteTransientSettingsMetadata remoteBlobStoreObject = new RemoteTransientSettingsMetadata(uploadFilename, clusterUUID, blobStoreRepository); + return () -> transientSettingsBlobStore.readAsync(remoteBlobStoreObject, actionListener); } else if (component.equals(CUSTOM_METADATA)) { - remoteBlobStoreObject = new RemoteCustomMetadata(uploadFilename, componentName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteCustomMetadata remoteBlobStoreObject = new RemoteCustomMetadata(uploadFilename, componentName, clusterUUID, blobStoreRepository); + return () -> customMetadataBlobStore.readAsync(remoteBlobStoreObject, actionListener); } else { throw new RemoteStateTransferException("Unknown component " + componentName); } - ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, component, componentName)), listener::onFailure); - return () -> remoteBlobStoreObject.readAsync(actionListener); } Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) { @@ -201,8 +249,8 @@ public CoordinationMetadata getCoordinationMetadata(String clusterUUID, String c // Fetch Coordination metadata if (coordinationMetadataFileName != null) { RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata(coordinationMetadataFileName, clusterUUID, - blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - return remoteCoordinationMetadata.read(); + blobStoreRepository); + return coordinationMetadataBlobStore.read(remoteCoordinationMetadata); } else { return CoordinationMetadata.EMPTY_METADATA; } @@ -219,8 +267,8 @@ public Settings getSettingsMetadata(String clusterUUID, String settingsMetadataF // Fetch Settings metadata if (settingsMetadataFileName != null) { RemotePersistentSettingsMetadata remotePersistentSettingsMetadata = new RemotePersistentSettingsMetadata(settingsMetadataFileName, clusterUUID, - blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - return remotePersistentSettingsMetadata.read(); + blobStoreRepository); + return persistentSettingsBlobStore.read(remotePersistentSettingsMetadata); } else { return Settings.EMPTY; } @@ -237,8 +285,8 @@ public TemplatesMetadata getTemplatesMetadata(String clusterUUID, String templat // Fetch Templates metadata if (templatesMetadataFileName != null) { RemoteTemplatesMetadata remoteTemplatesMetadata = new RemoteTemplatesMetadata(templatesMetadataFileName, clusterUUID, - blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - return remoteTemplatesMetadata.read(); + blobStoreRepository); + return templatesMetadataBlobStore.read(remoteTemplatesMetadata); } else { return TemplatesMetadata.EMPTY_METADATA; } @@ -254,8 +302,8 @@ public Metadata.Custom getCustomsMetadata(String clusterUUID, String customMetad requireNonNull(customMetadataFileName); try { // Fetch Custom metadata - RemoteCustomMetadata remoteCustomMetadata = new RemoteCustomMetadata(customMetadataFileName, custom, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - return remoteCustomMetadata.read(); + RemoteCustomMetadata remoteCustomMetadata = new RemoteCustomMetadata(customMetadataFileName, custom, clusterUUID, blobStoreRepository); + return customMetadataBlobStore.read(remoteCustomMetadata); } catch (IOException e) { throw new IllegalStateException( String.format(Locale.ROOT, "Error while downloading Custom Metadata - %s", customMetadataFileName), @@ -289,27 +337,6 @@ Map getUpdatedCustoms(ClusterState currentState, Cluste return updatedCustom; } - private AbstractRemoteBlobStoreObject getRemoteObject(Object object, long metadataVersion, String clusterUUID, String customType) { - if (object instanceof CoordinationMetadata) { - return new RemoteCoordinationMetadata((CoordinationMetadata) object, metadataVersion, clusterUUID, blobStoreTransferService, - blobStoreRepository, clusterName, threadPool); - } else if (object instanceof Settings) { - if (customType != null && customType.equals(TRANSIENT_SETTING_METADATA)) { - return new RemoteTransientSettingsMetadata((Settings) object, metadataVersion, clusterUUID, blobStoreTransferService, - blobStoreRepository, clusterName, threadPool); - } - return new RemotePersistentSettingsMetadata((Settings) object, metadataVersion, clusterUUID, blobStoreTransferService, - blobStoreRepository, clusterName, threadPool); - } else if (object instanceof TemplatesMetadata) { - return new RemoteTemplatesMetadata((TemplatesMetadata) object, metadataVersion, clusterUUID, blobStoreTransferService, - blobStoreRepository, clusterName, threadPool); - } else if (object instanceof Custom) { - return new RemoteCustomMetadata((Custom) object, customType ,metadataVersion, clusterUUID, blobStoreTransferService, - blobStoreRepository, clusterName, threadPool); - } - throw new RemoteStateTransferException("Remote object cannot be created for " + object.getClass()); - } - private BlobContainer globalMetadataContainer(String clusterName, String clusterUUID) { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/ return blobStoreRepository.blobStore() diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java index 5f5459994e51e..67825546d54b2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadataManager.java @@ -9,7 +9,7 @@ package org.opensearch.gateway.remote; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; import java.io.IOException; import java.util.Locale; @@ -20,6 +20,9 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; +import org.opensearch.gateway.remote.model.RemoteIndexMetadataBlobStore; +import org.opensearch.gateway.remote.model.RemoteReadResult; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; @@ -39,12 +42,14 @@ public class RemoteIndexMetadataManager { private final ThreadPool threadPool; private final BlobStoreTransferService blobStoreTransferService; + private final RemoteIndexMetadataBlobStore indexMetadataBlobStore; private volatile TimeValue indexMetadataUploadTimeout; private final String clusterName; public RemoteIndexMetadataManager(BlobStoreRepository blobStoreRepository, ClusterSettings clusterSettings, ThreadPool threadPool, String clusterName, BlobStoreTransferService blobStoreTransferService) { + indexMetadataBlobStore = new RemoteIndexMetadataBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); this.blobStoreRepository = blobStoreRepository; this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING); this.threadPool = threadPool; @@ -61,14 +66,14 @@ public RemoteIndexMetadataManager(BlobStoreRepository blobStoreRepository, Clust */ CheckedRunnable getIndexMetadataAsyncAction(IndexMetadata indexMetadata, String clusterUUID, LatchedActionListener latchedActionListener) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); ActionListener completionListener = ActionListener.wrap( resp -> latchedActionListener.onResponse( remoteIndexMetadata.getUploadedMetadata() ), ex -> latchedActionListener.onFailure(new RemoteStateTransferException(indexMetadata.getIndex().getName(), ex)) ); - return () -> remoteIndexMetadata.writeAsync(completionListener); + return () -> indexMetadataBlobStore.writeAsync(remoteIndexMetadata, completionListener); } CheckedRunnable getAsyncIndexMetadataReadAction( @@ -76,11 +81,11 @@ CheckedRunnable getAsyncIndexMetadataReadAction( String uploadedFilename, LatchedActionListener latchedActionListener ) { - RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(uploadedFilename, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(uploadedFilename, clusterUUID, blobStoreRepository); ActionListener actionListener = ActionListener.wrap( response -> latchedActionListener.onResponse(new RemoteReadResult(response, INDEX_PATH_TOKEN, response.getIndexName())), latchedActionListener::onFailure); - return () -> remoteIndexMetadata.readAsync(actionListener); + return () -> indexMetadataBlobStore.readAsync(remoteIndexMetadata, actionListener); } /** @@ -93,9 +98,9 @@ IndexMetadata getIndexMetadata( ClusterMetadataManifest.UploadedIndexMetadata uploadedIndexMetadata, String clusterUUID, int manifestCodecVersion ) { RemoteIndexMetadata remoteIndexMetadata = new RemoteIndexMetadata(RemoteClusterStateUtils.getFormattedFileName( - uploadedIndexMetadata.getUploadedFilename(), manifestCodecVersion), clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + uploadedIndexMetadata.getUploadedFilename(), manifestCodecVersion), clusterUUID, blobStoreRepository); try { - return remoteIndexMetadata.read(); + return indexMetadataBlobStore.read(remoteIndexMetadata); } catch (IOException e) { throw new IllegalStateException( String.format(Locale.ROOT, "Error while downloading IndexMetadata - %s", uploadedIndexMetadata.getUploadedFilename()), diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java index 60b847fe344fe..0bd89c73003ed 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteManifestManager.java @@ -20,10 +20,11 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.action.ActionListener; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifestBlobStore; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; -import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; import java.io.IOException; import java.util.HashMap; @@ -38,7 +39,6 @@ import org.opensearch.threadpool.ThreadPool; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; -import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.getCusterMetadataBasePath; @@ -58,14 +58,16 @@ public class RemoteManifestManager { private volatile TimeValue metadataManifestUploadTimeout; private final String nodeId; private final ThreadPool threadPool; + private final RemoteClusterMetadataManifestBlobStore manifestBlobStore; private static final Logger logger = LogManager.getLogger(RemoteManifestManager.class); - RemoteManifestManager(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, ClusterSettings clusterSettings, String nodeId, ThreadPool threadPool) { + RemoteManifestManager(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, ClusterSettings clusterSettings, String nodeId, ThreadPool threadPool, String clusterName) { this.blobStoreTransferService = blobStoreTransferService; this.blobStoreRepository = blobStoreRepository; this.metadataManifestUploadTimeout = clusterSettings.get(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING); this.nodeId = nodeId; this.threadPool = threadPool; + manifestBlobStore = new RemoteClusterMetadataManifestBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout); } @@ -124,8 +126,8 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust logger.trace(String.format(Locale.ROOT, "Manifest file uploaded successfully.")); }, ex -> { exceptionReference.set(ex); }), latch); - RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest(uploadManifest, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - remoteClusterMetadataManifest.writeAsync(completionListener); + RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest(uploadManifest, clusterUUID, blobStoreRepository); + manifestBlobStore.writeAsync(remoteClusterMetadataManifest, completionListener); try { if (latch.await(getMetadataManifestUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) { @@ -188,8 +190,8 @@ ClusterMetadataManifest fetchRemoteClusterMetadataManifest(String clusterName, S throws IllegalStateException { try { String fullBlobName = getManifestFolderPath(clusterName, clusterUUID).buildAsString() + filename; - RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest(fullBlobName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool); - return remoteClusterMetadataManifest.read(); + RemoteClusterMetadataManifest remoteClusterMetadataManifest = new RemoteClusterMetadataManifest(fullBlobName, clusterUUID, blobStoreRepository); + return manifestBlobStore.read(remoteClusterMetadataManifest); } catch (IOException e) { throw new IllegalStateException(String.format(Locale.ROOT, "Error while downloading cluster metadata - %s", filename), e); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java deleted file mode 100644 index 34af997206e84..0000000000000 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteObject.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.gateway.remote; - -import java.io.IOException; -import java.io.InputStream; -import org.opensearch.common.CheckedRunnable; -import org.opensearch.core.action.ActionListener; -import org.opensearch.core.common.bytes.BytesReference; - -public interface RemoteObject { - public T get(); - public String clusterUUID(); - public InputStream serialize() throws IOException; - public T deserialize(InputStream inputStream) throws IOException; - - public void writeAsync(ActionListener listener); - public T read() throws IOException; - public void readAsync(ActionListener listener); - -} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobObject.java b/server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobObject.java new file mode 100644 index 0000000000000..5ab621406bc85 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobObject.java @@ -0,0 +1,70 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; + +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.core.compress.Compressor; +import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.repositories.blobstore.BlobStoreRepository; + +/** + * An extension of {@link RemoteObject} class which caters to the use case of writing to and reading from a blob storage + * + * @param The class type which can be uploaded to or downloaded from a blob storage. + */ +public abstract class AbstractRemoteBlobObject implements RemoteObject { + + protected String blobFileName; + + protected String blobName; + private final BlobStoreRepository blobStoreRepository; + private final String clusterUUID; + + public AbstractRemoteBlobObject(BlobStoreRepository blobStoreRepository, String clusterUUID) { + this.blobStoreRepository = blobStoreRepository; + this.clusterUUID = clusterUUID; + } + + public abstract BlobPathParameters getBlobPathParameters(); + + public String getFullBlobName() { + return blobName; + } + + public String getBlobFileName() { + if (blobFileName == null) { + String[] pathTokens = blobName.split(PATH_DELIMITER); + blobFileName = pathTokens[pathTokens.length - 1]; + } + return blobFileName; + } + + public abstract String generateBlobFileName(); + + public String clusterUUID() { + return clusterUUID; + } + + public abstract UploadedMetadata getUploadedMetadata(); + + protected void setFullBlobName(BlobPath blobPath) { + this.blobName = blobPath.buildAsString() + blobFileName; + } + + protected Compressor getCompressor() { + return blobStoreRepository.getCompressor(); + } + + protected BlobStoreRepository getBlobStoreRepository() { + return this.blobStoreRepository; + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java b/server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobStore.java similarity index 54% rename from server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java rename to server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobStore.java index b826e8999f877..d873fc4eed05a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/AbstractRemoteBlobStoreObject.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/AbstractRemoteBlobStore.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; @@ -14,24 +14,28 @@ import java.io.InputStream; import java.util.Arrays; import java.util.concurrent.ExecutorService; -import org.opensearch.common.CheckedRunnable; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.core.action.ActionListener; -import org.opensearch.core.compress.Compressor; -import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.threadpool.ThreadPool; -public abstract class AbstractRemoteBlobStoreObject implements RemoteObject { +/** + * Abstract class for a blob type storage + * + * @param The entity which can be uploaded to / downloaded from blob store + * @param The concrete class implementing {@link RemoteObject} which is used as a wrapper for T entity. + */ +public class AbstractRemoteBlobStore> implements RemoteObjectStore { private final BlobStoreTransferService transferService; private final BlobStoreRepository blobStoreRepository; private final String clusterName; private final ExecutorService executorService; - public AbstractRemoteBlobStoreObject(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, + public AbstractRemoteBlobStore(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { this.transferService = blobStoreTransferService; this.blobStoreRepository = blobStoreRepository; @@ -39,61 +43,51 @@ public AbstractRemoteBlobStoreObject(BlobStoreTransferService blobStoreTransferS this.executorService = threadPool.executor(ThreadPool.Names.GENERIC); } - public abstract BlobPathParameters getBlobPathParameters(); - - public abstract String getFullBlobName(); - - public String getBlobFileName() { - if (getFullBlobName() == null) { - generateBlobFileName(); - } - String[] pathTokens = getFullBlobName().split(PATH_DELIMITER); - return pathTokens[pathTokens.length - 1]; - } - - public abstract String generateBlobFileName(); - - public abstract UploadedMetadata getUploadedMetadata(); - @Override - public void writeAsync(ActionListener listener) { - assert get() != null; + public void writeAsync(RemoteObject obj, ActionListener listener) { + assert obj instanceof AbstractRemoteBlobObject; + AbstractRemoteBlobObject blobStoreObj = (AbstractRemoteBlobObject) obj; + assert blobStoreObj.get() != null; try { - InputStream inputStream = serialize(); - transferService.uploadBlob(inputStream, getBlobPathForUpload(), getBlobFileName(), WritePriority.URGENT, listener); + InputStream inputStream = obj.serialize(); + BlobPath blobPath = getBlobPathForUpload(blobStoreObj); + blobStoreObj.setFullBlobName(blobPath); + transferService.uploadBlob(inputStream, getBlobPathForUpload(blobStoreObj), blobStoreObj.getBlobFileName(), WritePriority.URGENT, listener); } catch (Exception e) { listener.onFailure(e); } } @Override - public T read() throws IOException { - assert getFullBlobName() != null; - return deserialize( - transferService.downloadBlob(getBlobPathForDownload(), getBlobFileName())); + public T read(RemoteObject obj) throws IOException { + assert obj instanceof AbstractRemoteBlobObject; + AbstractRemoteBlobObject blobStoreObj = (AbstractRemoteBlobObject) obj; + assert blobStoreObj.getFullBlobName() != null; + return blobStoreObj.deserialize( + transferService.downloadBlob(getBlobPathForDownload(blobStoreObj), blobStoreObj.getBlobFileName())); } @Override - public void readAsync(ActionListener listener) { + public void readAsync(RemoteObject obj, ActionListener listener) { executorService.execute(() -> { try { - listener.onResponse(read()); + listener.onResponse(read(obj)); } catch (Exception e) { listener.onFailure(e); } }); } - public BlobPath getBlobPathForUpload() { - BlobPath blobPath = blobStoreRepository.basePath().add(RemoteClusterStateUtils.encodeString(clusterName)).add("cluster-state").add(clusterUUID()); - for (String token : getBlobPathParameters().getPathTokens()) { + private BlobPath getBlobPathForUpload(AbstractRemoteBlobObject obj) { + BlobPath blobPath = blobStoreRepository.basePath().add(RemoteClusterStateUtils.encodeString(clusterName)).add("cluster-state").add(obj.clusterUUID()); + for (String token : obj.getBlobPathParameters().getPathTokens()) { blobPath = blobPath.add(token); } return blobPath; } - public BlobPath getBlobPathForDownload() { - String[] pathTokens = extractBlobPathTokens(getFullBlobName()); + private BlobPath getBlobPathForDownload(AbstractRemoteBlobObject obj) { + String[] pathTokens = extractBlobPathTokens(obj.getFullBlobName()); BlobPath blobPath = new BlobPath(); for (String token : pathTokens) { blobPath = blobPath.add(token); @@ -101,14 +95,6 @@ public BlobPath getBlobPathForDownload() { return blobPath; } - protected Compressor getCompressor() { - return blobStoreRepository.getCompressor(); - } - - protected BlobStoreRepository getBlobStoreRepository() { - return this.blobStoreRepository; - } - private static String[] extractBlobPathTokens(String blobName) { String[] blobNameTokens = blobName.split(PATH_DELIMITER); return Arrays.copyOfRange(blobNameTokens, 0, blobNameTokens.length - 1); diff --git a/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java b/server/src/main/java/org/opensearch/gateway/remote/model/BlobPathParameters.java similarity index 74% rename from server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java rename to server/src/main/java/org/opensearch/gateway/remote/model/BlobPathParameters.java index 4b47bd744ef1a..cc7a2c18b8792 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/BlobPathParameters.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/BlobPathParameters.java @@ -6,14 +6,18 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import java.util.List; +/** + * Parameters which can be used to construct a blob path + * + */ public class BlobPathParameters { - private List pathTokens; - private String filePrefix; + private final List pathTokens; + private final String filePrefix; public BlobPathParameters(List pathTokens, String filePrefix) { this.pathTokens = pathTokens; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterBlocks.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java similarity index 71% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteClusterBlocks.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java index 8239ca76d3b6b..f3906321301c9 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterBlocks.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocks.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -19,13 +19,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteClusterBlocks extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link ClusterBlocks} to/from remote blob store + */ +public class RemoteClusterBlocks extends AbstractRemoteBlobObject { public static final String CLUSTER_BLOCKS = "blocks"; public static final ChecksumBlobStoreFormat CLUSTER_BLOCKS_FORMAT = new ChecksumBlobStoreFormat<>( @@ -36,24 +38,17 @@ public class RemoteClusterBlocks extends AbstractRemoteBlobStoreObject______ @@ -76,8 +66,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -92,10 +81,6 @@ public ClusterBlocks get() { return clusterBlocks; } - @Override - public String clusterUUID() { - return clusterUUID; - } @Override public InputStream serialize() throws IOException { diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksBlobStore.java new file mode 100644 index 0000000000000..31aede0b78dda --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterBlocksBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.block.ClusterBlocks; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link ClusterBlocks} to/from blob store + */ +public class RemoteClusterBlocksBlobStore extends AbstractRemoteBlobStore { + + public RemoteClusterBlocksBlobStore(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, + ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterMetadataManifest.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java similarity index 83% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteClusterMetadataManifest.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java index 54e160e961671..5b4e10e005be2 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterMetadataManifest.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifest.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -14,15 +14,18 @@ import java.io.InputStream; import java.util.List; import org.opensearch.common.io.Streams; +import org.opensearch.gateway.remote.ClusterMetadataManifest; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteClusterMetadataManifest extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link ClusterMetadataManifest} to/from remote blob store + */ +public class RemoteClusterMetadataManifest extends AbstractRemoteBlobObject { public static final String MANIFEST_PATH_TOKEN = "manifest"; public static final int SPLITTED_MANIFEST_FILE_LENGTH = 6; @@ -52,23 +55,17 @@ public class RemoteClusterMetadataManifest extends AbstractRemoteBlobStoreObject ); private ClusterMetadataManifest clusterMetadataManifest; - private String blobName; - private final String clusterUUID; - public RemoteClusterMetadataManifest(ClusterMetadataManifest clusterMetadataManifest, String clusterUUID, BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteClusterMetadataManifest(ClusterMetadataManifest clusterMetadataManifest, String clusterUUID, + BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.clusterMetadataManifest = clusterMetadataManifest; - this.clusterUUID = clusterUUID; } - public RemoteClusterMetadataManifest(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteClusterMetadataManifest(String blobName, String clusterUUID, + BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.blobName = blobName; - this.clusterUUID = clusterUUID; } @Override @@ -76,11 +73,6 @@ public BlobPathParameters getBlobPathParameters() { return new BlobPathParameters(List.of(MANIFEST_PATH_TOKEN), MANIFEST_FILE_PREFIX); } - @Override - public String getFullBlobName() { - return blobName; - } - @Override public String generateBlobFileName() { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest______C/P____ @@ -95,8 +87,7 @@ public String generateBlobFileName() { String.valueOf(clusterMetadataManifest.getCodecVersion()) // Keep the codec version at last place only, during read we reads last place to // determine codec version. ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -110,11 +101,6 @@ public ClusterMetadataManifest get() { return clusterMetadataManifest; } - @Override - public String clusterUUID() { - return clusterUUID; - } - @Override public InputStream serialize() throws IOException { return CLUSTER_METADATA_MANIFEST_FORMAT.serialize(clusterMetadataManifest, generateBlobFileName(), getCompressor(), diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestBlobStore.java new file mode 100644 index 0000000000000..09ed9046e9087 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteClusterMetadataManifestBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link ClusterMetadataManifest} to/from blob store + */ +public class RemoteClusterMetadataManifestBlobStore extends AbstractRemoteBlobStore{ + + public RemoteClusterMetadataManifestBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteCoordinationMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java similarity index 73% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteCoordinationMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java index d3d53aab9a4b8..76f97cd98c796 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteCoordinationMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -19,13 +19,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteCoordinationMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link CoordinationMetadata} to/from remote blob store + */ +public class RemoteCoordinationMetadata extends AbstractRemoteBlobObject { public static final String COORDINATION_METADATA = "coordination"; public static final ChecksumBlobStoreFormat COORDINATION_METADATA_FORMAT = new ChecksumBlobStoreFormat<>( @@ -36,22 +38,16 @@ public class RemoteCoordinationMetadata extends AbstractRemoteBlobStoreObject______ @@ -74,8 +65,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -84,11 +74,6 @@ public CoordinationMetadata get() { return coordinationMetadata; } - @Override - public String clusterUUID() { - return clusterUUID; - } - @Override public InputStream serialize() throws IOException { return COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataBlobStore.java new file mode 100644 index 0000000000000..1790d97ccca2f --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link CoordinationMetadata} to/from blob store + */ +public class RemoteCoordinationMetadataBlobStore extends AbstractRemoteBlobStore { + + public RemoteCoordinationMetadataBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteCustomMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java similarity index 75% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteCustomMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java index f70a99f2fa21c..81e7794fc6949 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteCustomMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -20,13 +20,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteCustomMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link Custom} to/from remote blob store + */ +public class RemoteCustomMetadata extends AbstractRemoteBlobObject { public static final String CUSTOM_METADATA = "custom"; public static final String CUSTOM_DELIMITER = "--"; @@ -41,17 +43,13 @@ public class RemoteCustomMetadata extends AbstractRemoteBlobStoreObject private Custom custom; private final String customType; private long metadataVersion; - private String blobName; - private final String clusterUUID; - public RemoteCustomMetadata(Custom custom, String customType, long metadataVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteCustomMetadata(Custom custom, String customType, long metadataVersion, String clusterUUID, + BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.custom = custom; this.customType = customType; this.metadataVersion = metadataVersion; - this.clusterUUID = clusterUUID; this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( "custom", METADATA_NAME_FORMAT, @@ -59,13 +57,11 @@ public RemoteCustomMetadata(Custom custom, String customType, long metadataVersi ); } - public RemoteCustomMetadata(String blobName, String customType, String clusterUUID, BlobStoreTransferService blobStoreTransferService, - BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteCustomMetadata(String blobName, String customType, String clusterUUID, + BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.blobName = blobName; this.customType = customType; - this.clusterUUID = clusterUUID; this.customBlobStoreFormat = new ChecksumBlobStoreFormat<>( "custom", METADATA_NAME_FORMAT, @@ -79,11 +75,6 @@ public BlobPathParameters getBlobPathParameters() { return new BlobPathParameters(List.of("global-metadata"), prefix); } - @Override - public String getFullBlobName() { - return blobName; - } - @Override public String generateBlobFileName() { // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ @@ -95,8 +86,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -105,11 +95,6 @@ public Custom get() { return custom; } - @Override - public String clusterUUID() { - return clusterUUID; - } - @Override public InputStream serialize() throws IOException { return customBlobStoreFormat.serialize(custom, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataBlobStore.java new file mode 100644 index 0000000000000..1d24e014433d2 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.Metadata.Custom; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link Custom} to/from blob store + */ +public class RemoteCustomMetadataBlobStore extends AbstractRemoteBlobStore { + + public RemoteCustomMetadataBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteDiscoveryNodes.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java similarity index 72% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteDiscoveryNodes.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java index cfa74f683ac4f..568fce6ac0007 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteDiscoveryNodes.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodes.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateAttributesManager.CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; @@ -19,13 +19,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteDiscoveryNodes extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link DiscoveryNodes} to/from remote blob store + */ +public class RemoteDiscoveryNodes extends AbstractRemoteBlobObject { public static final String DISCOVERY_NODES = "nodes"; public static final ChecksumBlobStoreFormat DISCOVERY_NODES_FORMAT = new ChecksumBlobStoreFormat<>( @@ -36,22 +38,16 @@ public class RemoteDiscoveryNodes extends AbstractRemoteBlobStoreObject______ @@ -74,8 +65,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -90,11 +80,6 @@ public DiscoveryNodes get() { return discoveryNodes; } - @Override - public String clusterUUID() { - return clusterUUID; - } - @Override public InputStream serialize() throws IOException { return DISCOVERY_NODES_FORMAT.serialize(discoveryNodes, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesBlobStore.java new file mode 100644 index 0000000000000..68da66665a73f --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteDiscoveryNodesBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link DiscoveryNodes} to/from blob store + */ +public class RemoteDiscoveryNodesBlobStore extends AbstractRemoteBlobStore { + + public RemoteDiscoveryNodesBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java similarity index 71% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java index b177747c8a249..983bc81f6c455 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteIndexMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteIndexMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_PLAIN_FORMAT; @@ -18,13 +18,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteIndexMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link IndexMetadata} to/from remote blob store + */ +public class RemoteIndexMetadata extends AbstractRemoteBlobObject { public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1; @@ -36,23 +38,15 @@ public class RemoteIndexMetadata extends AbstractRemoteBlobStoreObject { + public RemoteIndexMetadataBlobStore(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, + ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObject.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObject.java new file mode 100644 index 0000000000000..9fb4738ff0efa --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObject.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import java.io.IOException; +import java.io.InputStream; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; + +/** + * An interface to read/write and object from/to a remote storage. This interface is agnostic of the remote storage type. + * + * @param The object type which can be upload to or download from remote storage. + */ +public interface RemoteObject { + + /** + * @return The entity T contained within this class + */ + public T get(); + + /** + * @return An InputStream created by serializing the entity T + * @throws IOException Exception encountered while serialization + */ + public InputStream serialize() throws IOException; + + /** + * @param inputStream The InputStream which is used to read the serialized entity + * @return The entity T after deserialization + * @throws IOException Exception encountered while deserialization + */ + public T deserialize(InputStream inputStream) throws IOException; + +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObjectStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObjectStore.java new file mode 100644 index 0000000000000..b8ed8ebb7fa0c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteObjectStore.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import java.io.IOException; +import org.opensearch.core.action.ActionListener; + +/** + * An interface to read/write an object from/to a remote storage. This interface is agnostic of the remote storage type. + * + * @param The object type which can be uploaded to or downloaded from remote storage. + */ +public interface RemoteObjectStore> { + + public void writeAsync(RemoteObject obj, ActionListener listener); + + public T read(RemoteObject obj) throws IOException; + + public void readAsync(RemoteObject obj, ActionListener listener); +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsBlobStore.java new file mode 100644 index 0000000000000..ff63cd528fc2c --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.settings.Settings; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download persistent {@link Settings} to/from blob store + */ +public class RemotePersistentSettingsBlobStore extends AbstractRemoteBlobStore { + + public RemotePersistentSettingsBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistentSettingsMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java similarity index 71% rename from server/src/main/java/org/opensearch/gateway/remote/RemotePersistentSettingsMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java index f23cd6dcc1eb8..7dcbac745b49a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemotePersistentSettingsMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -19,13 +19,15 @@ import org.opensearch.common.settings.Settings; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemotePersistentSettingsMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading persistent {@link Settings} to/from remote blob store + */ +public class RemotePersistentSettingsMetadata extends AbstractRemoteBlobObject { public static final String SETTING_METADATA = "settings"; @@ -37,22 +39,16 @@ public class RemotePersistentSettingsMetadata extends AbstractRemoteBlobStoreObj private Settings persistentSettings; private long metadataVersion; - private String blobName; - private final String clusterUUID; - public RemotePersistentSettingsMetadata(Settings settings, long metadataVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemotePersistentSettingsMetadata(Settings settings, long metadataVersion, String clusterUUID, BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.persistentSettings = settings; this.metadataVersion = metadataVersion; - this.clusterUUID = clusterUUID; } - public RemotePersistentSettingsMetadata(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemotePersistentSettingsMetadata(String blobName, String clusterUUID, BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.blobName = blobName; - this.clusterUUID = clusterUUID; } @Override @@ -60,11 +56,6 @@ public BlobPathParameters getBlobPathParameters() { return new BlobPathParameters(List.of("global-metadata"), SETTING_METADATA); } - @Override - public String getFullBlobName() { - return blobName; - } - @Override public String generateBlobFileName() { String blobFileName = String.join( @@ -74,8 +65,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -84,15 +74,10 @@ public Settings get() { return persistentSettings; } - @Override - public String clusterUUID() { - return clusterUUID; - } - - @Override public InputStream serialize() throws IOException { - return SETTINGS_METADATA_FORMAT.serialize(persistentSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + return SETTINGS_METADATA_FORMAT.serialize(persistentSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) + .streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteReadResult.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java similarity index 87% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteReadResult.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java index dea8a6221d05a..adee09eaeffef 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteReadResult.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteReadResult.java @@ -6,11 +6,15 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import org.opensearch.core.xcontent.ToXContent; +/** + * Container class for entity read from remote store + */ public class RemoteReadResult { + ToXContent obj; String component; String componentName; diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteTemplatesMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java similarity index 70% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteTemplatesMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java index 9c302993536a9..363db7fcabf2a 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteTemplatesMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -19,13 +19,15 @@ import org.opensearch.common.io.Streams; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteTemplatesMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading {@link TemplatesMetadata} to/from remote blob store + */ +public class RemoteTemplatesMetadata extends AbstractRemoteBlobObject { public static final String TEMPLATES_METADATA = "templates"; @@ -36,21 +38,16 @@ public class RemoteTemplatesMetadata extends AbstractRemoteBlobStoreObject______ + // 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/global-metadata/______ + // String blobFileName = String.join( DELIMITER, getBlobPathParameters().getFilePrefix(), @@ -73,8 +66,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -83,14 +75,10 @@ public TemplatesMetadata get() { return templatesMetadata; } - @Override - public String clusterUUID() { - return clusterUUID; - } - @Override public InputStream serialize() throws IOException { - return TEMPLATES_METADATA_FORMAT.serialize(templatesMetadata, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + return TEMPLATES_METADATA_FORMAT.serialize(templatesMetadata, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) + .streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataBlobStore.java new file mode 100644 index 0000000000000..94a0b4f1bf365 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download {@link TemplatesMetadata} to/from blob store + */ +public class RemoteTemplatesMetadataBlobStore extends AbstractRemoteBlobStore { + + public RemoteTemplatesMetadataBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsBlobStore.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsBlobStore.java new file mode 100644 index 0000000000000..5004cef8a9c66 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsBlobStore.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway.remote.model; + +import org.opensearch.common.settings.Settings; +import org.opensearch.index.translog.transfer.BlobStoreTransferService; +import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; + +/** + * An implementation of {@link RemoteObjectStore} which is used to upload/download transient {@link Settings} to/from blob store + */ +public class RemoteTransientSettingsBlobStore extends AbstractRemoteBlobStore { + + public RemoteTransientSettingsBlobStore(BlobStoreTransferService blobStoreTransferService, + BlobStoreRepository blobStoreRepository, String clusterName, ThreadPool threadPool) { + super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteTransientSettingsMetadata.java b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java similarity index 70% rename from server/src/main/java/org/opensearch/gateway/remote/RemoteTransientSettingsMetadata.java rename to server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java index 6b3f072473ccd..ee0116a54bbae 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteTransientSettingsMetadata.java +++ b/server/src/main/java/org/opensearch/gateway/remote/model/RemoteTransientSettingsMetadata.java @@ -6,7 +6,7 @@ * compatible open source license. */ -package org.opensearch.gateway.remote; +package org.opensearch.gateway.remote.model; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_NAME_FORMAT; @@ -19,13 +19,15 @@ import org.opensearch.common.settings.Settings; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.RemoteClusterStateUtils; import org.opensearch.index.remote.RemoteStoreUtils; -import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.ChecksumBlobStoreFormat; -import org.opensearch.threadpool.ThreadPool; -public class RemoteTransientSettingsMetadata extends AbstractRemoteBlobStoreObject { +/** + * Wrapper class for uploading/downloading transient {@link Settings} to/from remote blob store + */ +public class RemoteTransientSettingsMetadata extends AbstractRemoteBlobObject { public static final String TRANSIENT_SETTING_METADATA = "transient-settings"; @@ -37,22 +39,16 @@ public class RemoteTransientSettingsMetadata extends AbstractRemoteBlobStoreObje private Settings transientSettings; private long metadataVersion; - private String blobName; - private final String clusterUUID; - public RemoteTransientSettingsMetadata(Settings transientSettings, long metadataVersion, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteTransientSettingsMetadata(Settings transientSettings, long metadataVersion, String clusterUUID, BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.transientSettings = transientSettings; this.metadataVersion = metadataVersion; - this.clusterUUID = clusterUUID; } - public RemoteTransientSettingsMetadata(String blobName, String clusterUUID, BlobStoreTransferService blobStoreTransferService, BlobStoreRepository blobStoreRepository, String clusterName, - ThreadPool threadPool) { - super(blobStoreTransferService, blobStoreRepository, clusterName, threadPool); + public RemoteTransientSettingsMetadata(String blobName, String clusterUUID, BlobStoreRepository blobStoreRepository) { + super(blobStoreRepository, clusterUUID); this.blobName = blobName; - this.clusterUUID = clusterUUID; } @Override @@ -60,11 +56,6 @@ public BlobPathParameters getBlobPathParameters() { return new BlobPathParameters(List.of("global-metadata"), TRANSIENT_SETTING_METADATA); } - @Override - public String getFullBlobName() { - return blobName; - } - @Override public String generateBlobFileName() { String blobFileName = String.join( @@ -74,8 +65,7 @@ public String generateBlobFileName() { RemoteStoreUtils.invertLong(System.currentTimeMillis()), String.valueOf(GLOBAL_METADATA_CURRENT_CODEC_VERSION) ); - // setting the full blob path with name for future access - this.blobName = getBlobPathForUpload().buildAsString() + blobFileName; + this.blobFileName = blobFileName; return blobFileName; } @@ -84,15 +74,10 @@ public Settings get() { return transientSettings; } - @Override - public String clusterUUID() { - return clusterUUID; - } - - @Override public InputStream serialize() throws IOException { - return SETTINGS_METADATA_FORMAT.serialize(transientSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS).streamInput(); + return SETTINGS_METADATA_FORMAT.serialize(transientSettings, generateBlobFileName(), getCompressor(), RemoteClusterStateUtils.FORMAT_PARAMS) + .streamInput(); } @Override diff --git a/server/src/main/java/org/opensearch/gateway/remote/model/package-info.java b/server/src/main/java/org/opensearch/gateway/remote/model/package-info.java new file mode 100644 index 0000000000000..c0d13d15cc885 --- /dev/null +++ b/server/src/main/java/org/opensearch/gateway/remote/model/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package containing models for remote cluster state + */ +package org.opensearch.gateway.remote.model; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java index 0918236638016..a9b4b67d0e215 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateCleanupManagerTests.java @@ -36,8 +36,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Locale; @@ -46,7 +44,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import java.util.stream.Collectors; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doNothing; @@ -69,9 +66,9 @@ import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_PATH_TOKEN; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN; -import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX; -import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index a79205f174c68..4f0bd74a3c6c9 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -36,7 +36,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.common.util.concurrent.AbstractAsyncTask; import org.opensearch.core.ParseField; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesArray; @@ -45,8 +44,10 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; +import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.index.remote.RemoteIndexPathUploader; -import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; @@ -76,7 +77,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Supplier; @@ -95,7 +95,7 @@ import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.COORDINATION_METADATA; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA; import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA; -import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_CURRENT_CODEC_VERSION; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT; diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataTests.java index 2cae382458c83..4c12ba92b7bcc 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteIndexMetadataTests.java @@ -15,8 +15,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata.COMPONENT_PREFIX; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_METADATA_CURRENT_CODEC_VERSION; -import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN; import java.io.IOException; import java.io.InputStream; @@ -31,6 +31,8 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.index.Index; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadata; +import org.opensearch.gateway.remote.model.BlobPathParameters; +import org.opensearch.gateway.remote.model.RemoteIndexMetadata; import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -70,41 +72,34 @@ public void tearDown() throws Exception { public void testGet() { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); assertThat(remoteObjectForUpload.get(), is(indexMetadata)); - RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreRepository); assertThat(remoteObjectForDownload.get(), nullValue()); } public void testClusterUUID() { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); assertThat(remoteObjectForUpload.clusterUUID(), is(clusterUUID)); - RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreRepository); assertThat(remoteObjectForDownload.clusterUUID(), is(clusterUUID)); } public void testFullBlobName() { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); assertThat(remoteObjectForUpload.getFullBlobName(), nullValue()); - RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForDownload = new RemoteIndexMetadata(TEST_BLOB_NAME, clusterUUID, blobStoreRepository); assertThat(remoteObjectForDownload.getFullBlobName(), is(TEST_BLOB_NAME)); } public void testBlobPathParameters() { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); BlobPathParameters params = remoteObjectForUpload.getBlobPathParameters(); assertThat(params.getPathTokens(), is(List.of(INDEX_PATH_TOKEN))); assertThat(params.getFilePrefix(), is("metadata")); @@ -112,8 +107,7 @@ public void testBlobPathParameters() { public void testGenerateBlobFileName() { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); String blobFileName = remoteObjectForUpload.generateBlobFileName(); String[] nameTokens = blobFileName.split(RemoteClusterStateUtils.DELIMITER); assertThat(nameTokens[0], is("metadata")); @@ -124,8 +118,7 @@ public void testGenerateBlobFileName() { public void testGetUploadedMetadata() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); assertThrows(AssertionError.class, remoteObjectForUpload::getUploadedMetadata); try (InputStream inputStream = remoteObjectForUpload.serialize()) { @@ -137,8 +130,7 @@ public void testGetUploadedMetadata() throws IOException { public void testSerDe() throws IOException { IndexMetadata indexMetadata = getIndexMetadata(); - RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreTransferService, blobStoreRepository, - clusterName, threadPool); + RemoteIndexMetadata remoteObjectForUpload = new RemoteIndexMetadata(indexMetadata, clusterUUID, blobStoreRepository); try (InputStream inputStream = remoteObjectForUpload.serialize()) { assertThat(inputStream.available(), greaterThan(0)); IndexMetadata readIndexMetadata = remoteObjectForUpload.deserialize(inputStream); diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java index e0af7808a5853..d1a4b89fe4379 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteManifestManagerTests.java @@ -41,7 +41,7 @@ public class RemoteManifestManagerTests extends OpenSearchTestCase { public void setup() { clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); blobStoreRepository = mock(BlobStoreRepository.class); - remoteManifestManager = new RemoteManifestManager(blobStoreTransferService, blobStoreRepository, clusterSettings, "test-node-id", new TestThreadPool("test")); + remoteManifestManager = new RemoteManifestManager(blobStoreTransferService, blobStoreRepository, clusterSettings, "test-node-id", new TestThreadPool("test"), "test-cluster-name"); blobStoreTransferService = mock(BlobStoreTransferService.class); blobStore = mock(BlobStore.class); when(blobStoreRepository.blobStore()).thenReturn(blobStore);