Skip to content

Commit

Permalink
Create RemoteObjectStore class and corresponding implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed May 28, 2024
1 parent 14a279d commit 523889c
Show file tree
Hide file tree
Showing 37 changed files with 676 additions and 450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateUtils;
import org.opensearch.gateway.remote.RemoteReadResult;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStream;
import org.opensearch.gateway.remote.routingtable.IndexRoutingTableInputStreamReader;
import org.opensearch.index.remote.RemoteStoreEnums;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.RemoteClusterStateUtils.RemoteStateTransferException;
import org.opensearch.gateway.remote.model.AbstractRemoteBlobObject;
import org.opensearch.gateway.remote.model.RemoteClusterBlocks;
import org.opensearch.gateway.remote.model.RemoteClusterBlocksBlobStore;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodes;
import org.opensearch.gateway.remote.model.RemoteDiscoveryNodesBlobStore;
import org.opensearch.gateway.remote.model.RemoteReadResult;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;
Expand All @@ -26,18 +32,21 @@ public class RemoteClusterStateAttributesManager {
public static final String CLUSTER_STATE_ATTRIBUTE = "cluster_state_attribute";
public static final String DISCOVERY_NODES = "nodes";
public static final String CLUSTER_BLOCKS = "blocks";
public static final String CUSTOM_PREFIX = "custom";
public static final int CLUSTER_STATE_ATTRIBUTES_CURRENT_CODEC_VERSION = 1;
private final BlobStoreTransferService blobStoreTransferService;
private final BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;
private final String clusterName;
private final RemoteClusterBlocksBlobStore clusterBlocksBlobStore;
private final RemoteDiscoveryNodesBlobStore discoveryNodesBlobStore;

RemoteClusterStateAttributesManager(BlobStoreTransferService blobStoreTransferService, BlobStoreRepository repository, ThreadPool threadPool, String clusterName) {
this.blobStoreTransferService = blobStoreTransferService;
this.blobStoreRepository = repository;
this.threadPool = threadPool;
this.clusterName = clusterName;
this.clusterBlocksBlobStore = new RemoteClusterBlocksBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
this.discoveryNodesBlobStore = new RemoteDiscoveryNodesBlobStore(blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
}

/**
Expand All @@ -49,45 +58,41 @@ CheckedRunnable<IOException> getAsyncMetadataWriteAction(
ToXContent componentData,
LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener
) {
AbstractRemoteBlobStoreObject remoteObject = getRemoteObject(componentData, clusterState.version(), clusterState.metadata().clusterUUID());
ActionListener<Void> completionListener = ActionListener.wrap(
resp -> latchedActionListener.onResponse(
remoteObject.getUploadedMetadata()
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex))
);
return () -> remoteObject.writeAsync(completionListener);
}

private AbstractRemoteBlobStoreObject getRemoteObject(ToXContent componentData, long stateVersion, String clusterUUID) {
if (componentData instanceof DiscoveryNodes) {
return new RemoteDiscoveryNodes((DiscoveryNodes)componentData, stateVersion, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
RemoteDiscoveryNodes remoteObject = new RemoteDiscoveryNodes((DiscoveryNodes)componentData, clusterState.version(), clusterState.metadata().clusterUUID(), blobStoreRepository);
return () -> discoveryNodesBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else if (componentData instanceof ClusterBlocks) {
return new RemoteClusterBlocks((ClusterBlocks) componentData, stateVersion, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
RemoteClusterBlocks remoteObject = new RemoteClusterBlocks((ClusterBlocks) componentData, clusterState.version(), clusterState.metadata().clusterUUID(), blobStoreRepository);
return () -> clusterBlocksBlobStore.writeAsync(remoteObject, getActionListener(component, remoteObject, latchedActionListener));
} else {
throw new RemoteStateTransferException("Remote object not found for "+ componentData.getClass());
}
}

private ActionListener<Void> getActionListener(String component, AbstractRemoteBlobObject remoteObject, LatchedActionListener<ClusterMetadataManifest.UploadedMetadata> latchedActionListener) {
return ActionListener.wrap(
resp -> latchedActionListener.onResponse(
remoteObject.getUploadedMetadata()
),
ex -> latchedActionListener.onFailure(new RemoteClusterStateUtils.RemoteStateTransferException(component, ex))
);
}

public CheckedRunnable<IOException> getAsyncMetadataReadAction(
String clusterUUID,
String component,
String uploadedFilename,
LatchedActionListener<RemoteReadResult> listener
) {
AbstractRemoteBlobStoreObject remoteObject = getRemoteObject(component, uploadedFilename, clusterUUID);
ActionListener actionListener = ActionListener.wrap(response -> listener.onResponse(new RemoteReadResult((ToXContent) response, CLUSTER_STATE_ATTRIBUTE, component)), listener::onFailure);
return () -> remoteObject.readAsync(actionListener);
}

private AbstractRemoteBlobStoreObject getRemoteObject(String component, String blobName, String clusterUUID) {
if (component.equals(RemoteDiscoveryNodes.DISCOVERY_NODES)) {
return new RemoteDiscoveryNodes(blobName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
RemoteDiscoveryNodes remoteDiscoveryNodes = new RemoteDiscoveryNodes(uploadedFilename, clusterUUID, blobStoreRepository);
return () -> discoveryNodesBlobStore.readAsync(remoteDiscoveryNodes, actionListener);
} else if (component.equals(RemoteClusterBlocks.CLUSTER_BLOCKS)) {
return new RemoteClusterBlocks(blobName, clusterUUID, blobStoreTransferService, blobStoreRepository, clusterName, threadPool);
RemoteClusterBlocks remoteClusterBlocks = new RemoteClusterBlocks(uploadedFilename, clusterUUID, blobStoreRepository);
return () -> clusterBlocksBlobStore.readAsync(remoteClusterBlocks, actionListener);
} else {
throw new RemoteStateTransferException("Remote object not found for "+ component);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,17 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;

import java.io.Closeable;
import java.io.IOException;
import java.sql.Blob;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterMetadataManifest.MANIFEST_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST_FILE_PREFIX;

/**
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteIndexMetadata.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteIndexMetadata.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

Expand Down Expand Up @@ -76,6 +76,7 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.gateway.remote.model.RemoteReadResult;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
Expand Down Expand Up @@ -795,7 +796,7 @@ public void start() {
remoteGlobalMetadataManager = new RemoteGlobalMetadataManager(blobStoreRepository, clusterSettings,threadpool, getBlobStoreTransferService(), clusterName);
remoteIndexMetadataManager = new RemoteIndexMetadataManager(blobStoreRepository, clusterSettings,threadpool, clusterName, getBlobStoreTransferService());
remoteClusterStateAttributesManager = new RemoteClusterStateAttributesManager(getBlobStoreTransferService(), blobStoreRepository, threadpool, clusterName);
remoteManifestManager = new RemoteManifestManager(getBlobStoreTransferService(), blobStoreRepository, clusterSettings, nodeId, threadpool);
remoteManifestManager = new RemoteManifestManager(getBlobStoreTransferService(), blobStoreRepository, clusterSettings, nodeId, threadpool, clusterName);
remoteClusterStateCleanupManager.start();
}

Expand Down Expand Up @@ -916,7 +917,6 @@ private ClusterState readClusterStateInParallel(
for (Map.Entry<String, UploadedMetadataAttribute> entry : customToRead.entrySet()) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterName,
clusterUUID,
CUSTOM_METADATA,
entry.getKey(),
Expand All @@ -929,7 +929,6 @@ private ClusterState readClusterStateInParallel(
if (readCoordinationMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterName,
clusterUUID,
COORDINATION_METADATA,
COORDINATION_METADATA,
Expand All @@ -942,7 +941,6 @@ private ClusterState readClusterStateInParallel(
if (readSettingsMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterName,
clusterUUID,
SETTING_METADATA,
SETTING_METADATA,
Expand All @@ -955,7 +953,6 @@ private ClusterState readClusterStateInParallel(
if (readTransientSettingsMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterName,
clusterUUID,
TRANSIENT_SETTING_METADATA,
TRANSIENT_SETTING_METADATA,
Expand All @@ -968,7 +965,6 @@ private ClusterState readClusterStateInParallel(
if (readTemplatesMetadata) {
asyncMetadataReadActions.add(
remoteGlobalMetadataManager.getAsyncMetadataReadAction(
clusterName,
clusterUUID,
TEMPLATES_METADATA,
TEMPLATES_METADATA,
Expand Down
Loading

0 comments on commit 523889c

Please sign in to comment.