Merge remote-tracking branch 'origin' into remote-state-threadpool
Signed-off-by: Shivansh Arora <shivansh.arora@protonmail.com>
shiv0408 committed Oct 23, 2023
2 parents fc95922 + 8f13dee commit a636b9b
Showing 18 changed files with 510 additions and 88 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -94,6 +94,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add repository stats for remote store([#10567](https://github.com/opensearch-project/OpenSearch/pull/10567))
- Add search query categorizer ([#10255](https://github.com/opensearch-project/OpenSearch/pull/10255))
- Introduce ConcurrentQueryProfiler to profile queries using the concurrent segment search path and support concurrency during rewrite and create weight ([#10352](https://github.com/opensearch-project/OpenSearch/pull/10352))
- [Remote cluster state] Make index and global metadata upload timeout dynamic cluster settings ([#10814](https://github.com/opensearch-project/OpenSearch/pull/10814))
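The two settings named in this entry are node-scoped and dynamic (see the RemoteClusterStateService diff below). A minimal sketch, assuming a test-style Java client and illustrative 30s values, of adjusting them at runtime:

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;

// Sketch: raise both remote-state upload timeouts without a node restart.
ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
request.transientSettings(
    Settings.builder()
        .put("cluster.remote_store.state.index_metadata.upload_timeout", "30s")
        .put("cluster.remote_store.state.global_metadata.upload_timeout", "30s")
);
client().admin().cluster().updateSettings(request).actionGet();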

### Dependencies
- Bump `com.google.api.grpc:proto-google-common-protos` from 2.10.0 to 2.25.1 ([#10208](https://github.com/opensearch-project/OpenSearch/pull/10208), [#10298](https://github.com/opensearch-project/OpenSearch/pull/10298))
@@ -37,6 +37,7 @@ public AcknowledgedResponse createDataStream(String name) throws Exception {
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(name);
AcknowledgedResponse response = client().admin().indices().createDataStream(request).get();
assertThat(response.isAcknowledged(), is(true));
performRemoteStoreTestAction();
return response;
}

@@ -67,6 +68,7 @@ public RolloverResponse rolloverDataStream(String name) throws Exception {
RolloverResponse response = client().admin().indices().rolloverIndex(request).get();
assertThat(response.isAcknowledged(), is(true));
assertThat(response.isRolledOver(), is(true));
performRemoteStoreTestAction();
return response;
}

@@ -109,5 +111,4 @@ public AcknowledgedResponse deleteIndexTemplate(String name) throws Exception {
assertThat(response.isAcknowledged(), is(true));
return response;
}

}
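The performRemoteStoreTestAction() calls added above read like a template-method hook: presumably a no-op in this base suite that remote-store test variants override. A hedged sketch of that assumed shape (the override behavior is an assumption, not shown in this diff):

// Assumed shape: no-op by default, overridden by remote-store suites, e.g. to
// restart the cluster and restore from remote state after each data-stream action.
protected void performRemoteStoreTestAction() {
    // no-op in the base test
}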
@@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.action.admin.indices.datastream.DataStreamRolloverIT;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.opensearch.cluster.ClusterState;
@@ -111,7 +112,7 @@ public void testFullClusterRestore() throws Exception {
* 8. Add data nodes to recover index data
* 9. Verify Metadata and index data is restored.
*/
public void testFullClusterStateRestore() throws Exception {
public void testFullClusterRestoreDoesntFailWithConflictingLocalState() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
@@ -159,7 +160,7 @@ public Settings onNodeStopped(String nodeName) {

// start data nodes to trigger index data recovery
internalCluster().startDataOnlyNodes(dataNodeCount);
verifyRestoredData(indexStats, INDEX_NAME);
verifyRedIndicesAndTriggerRestore(indexStats, INDEX_NAME, true);
}

public void testFullClusterRestoreMultipleIndices() throws Exception {
@@ -291,6 +292,14 @@ private void validateCurrentMetadata() throws Exception {
});
}

public void testDataStreamPostRemoteStateRestore() throws Exception {
new DataStreamRolloverIT() {
protected boolean triggerRemoteStateRestore() {
return true;
}
}.testDataStreamRollover();
}

public void testFullClusterRestoreGlobalMetadata() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
@@ -15,6 +15,8 @@
import org.opensearch.action.admin.cluster.remotestore.stats.RemoteStoreStatsResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.FollowersChecker;
import org.opensearch.cluster.coordination.LeaderChecker;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
@@ -23,15 +25,20 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.remote.RemoteSegmentTransferTracker;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -44,12 +51,17 @@ public class RemoteStoreStatsIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "remote-store-test-idx-1";

@Before
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
// MockTransportService is required for the NetworkDisruption used in the failover test below
return Arrays.asList(MockTransportService.TestPlugin.class);
}

public void setup() {
internalCluster().startNodes(3);
}

public void testStatsResponseFromAllNodes() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -118,6 +130,7 @@ public void testStatsResponseFromAllNodes() {
}

public void testStatsResponseAllShards() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -175,6 +188,7 @@
}

public void testStatsResponseFromLocalNode() {
setup();

// Step 1 - We create a cluster, create an index, and then index documents into it. We also do multiple refreshes/flushes
// during this time frame. This ensures that the segment upload has started.
@@ -236,6 +250,7 @@
}

public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Disable Refresh Interval for the index
@@ -325,6 +340,7 @@ public void testDownloadStatsCorrectnessSinglePrimarySingleReplica() throws Exception {
}

public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
setup();
// Scenario:
// - Create index with single primary and N-1 replica shards (N = no of data nodes)
// - Disable Refresh Interval for the index
@@ -416,6 +432,7 @@ public void testDownloadStatsCorrectnessSinglePrimaryMultipleReplicaShards() throws Exception {
}

public void testStatsOnShardRelocation() {
setup();
// Scenario:
// - Create index with single primary and single replica shard
// - Index documents
@@ -471,6 +488,7 @@ public void testStatsOnShardRelocation() {
}

public void testStatsOnShardUnassigned() throws IOException {
setup();
// Scenario:
// - Create index with single primary and two replica shard
// - Index documents
@@ -497,6 +515,7 @@ public void testStatsOnShardUnassigned() throws IOException {
}

public void testStatsOnRemoteStoreRestore() throws IOException {
setup();
// Creating an index with primary shard count == total nodes in cluster and 0 replicas
int dataNodeCount = client().admin().cluster().prepareHealth().get().getNumberOfDataNodes();
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, dataNodeCount));
@@ -544,6 +563,7 @@ public void testStatsOnRemoteStoreRestore() throws IOException {
}

public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
setup();
// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);
@@ -581,6 +601,58 @@ public void testNonZeroPrimaryStatsOnNewlyCreatedIndexWithZeroDocs() throws Exception {
}, 5, TimeUnit.SECONDS);
}

public void testStatsCorrectnessOnFailover() {
Settings clusterSettings = Settings.builder()
.put(LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING.getKey(), "100ms")
.put(FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING.getKey(), "500ms")
.put(FollowersChecker.FOLLOWER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
.put(nodeSettings(0))
.build();
String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(clusterSettings);
internalCluster().startDataOnlyNodes(2, clusterSettings);

// Create an index with one primary and one replica shard
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 1));
ensureGreen(INDEX_NAME);

// Index some docs and refresh
indexDocs();
refresh(INDEX_NAME);

String primaryNode = primaryNodeName(INDEX_NAME);
String replicaNode = replicaNodeName(INDEX_NAME);

// Start network disruption - primary node will be isolated
Set<String> nodesInOneSide = Stream.of(clusterManagerNode, replicaNode).collect(Collectors.toCollection(HashSet::new));
Set<String> nodesInOtherSide = Stream.of(primaryNode).collect(Collectors.toCollection(HashSet::new));
NetworkDisruption networkDisruption = new NetworkDisruption(
new NetworkDisruption.TwoPartitions(nodesInOneSide, nodesInOtherSide),
NetworkDisruption.DISCONNECT
);
internalCluster().setDisruptionScheme(networkDisruption);
logger.info("--> network disruption is started");
networkDisruption.startDisrupting();
ensureStableCluster(2, clusterManagerNode);

RemoteStoreStatsResponse response = client(clusterManagerNode).admin().cluster().prepareRemoteStoreStats(INDEX_NAME, "0").get();
final String indexShardId = String.format(Locale.ROOT, "[%s][%s]", INDEX_NAME, "0");
List<RemoteStoreStats> matches = Arrays.stream(response.getRemoteStoreStats())
.filter(stat -> indexShardId.equals(stat.getSegmentStats().shardId.toString()))
.collect(Collectors.toList());
assertEquals(1, matches.size());
RemoteSegmentTransferTracker.Stats segmentStats = matches.get(0).getSegmentStats();
assertEquals(0, segmentStats.refreshTimeLagMs);

networkDisruption.stopDisrupting();
internalCluster().clearDisruptionScheme();
ensureStableCluster(3, clusterManagerNode);
ensureGreen(INDEX_NAME);
logger.info("Test completed");
}

private void indexDocs() {
for (int i = 0; i < randomIntBetween(5, 10); i++) {
if (randomBoolean()) {
@@ -215,8 +215,7 @@ public NodeStats(StreamInput in) throws IOException {
} else {
resourceUsageStats = null;
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
segmentReplicationRejectionStats = in.readOptionalWriteable(SegmentReplicationRejectionStats::new);
} else {
segmentReplicationRejectionStats = null;
@@ -431,6 +430,7 @@ public SegmentReplicationRejectionStats getSegmentReplicationRejectionStats() {
return segmentReplicationRejectionStats;
}

@Nullable
public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}
@@ -481,8 +481,7 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(resourceUsageStats);
}
// TODO: change to V_2_12_0 on main after backport to 2.x
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
out.writeOptionalWriteable(segmentReplicationRejectionStats);
}
if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
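The gate change above replaces the placeholder V_3_0_0 check now that the feature is backported to 2.x. Note that the same V_2_12_0 gate appears in both the StreamInput constructor and writeTo; keeping both sides symmetric is what keeps mixed-version clusters in sync. A self-contained sketch of the pattern with a hypothetical class and field (core package paths assumed from the current layout):

import java.io.IOException;
import org.opensearch.Version;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

// Hypothetical stats holder illustrating version-gated optional serialization.
public class ExampleStats implements Writeable {
    private final Long addedIn2120; // field introduced on the wire in 2.12.0

    public ExampleStats(StreamInput in) throws IOException {
        // Peers older than 2.12.0 never wrote this field, so don't read it.
        if (in.getVersion().onOrAfter(Version.V_2_12_0)) {
            addedIn2120 = in.readOptionalLong();
        } else {
            addedIn2120 = null;
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        // Mirror the read side exactly; asymmetric gates corrupt the stream.
        if (out.getVersion().onOrAfter(Version.V_2_12_0)) {
            out.writeOptionalLong(addedIn2120);
        }
    }
}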
@@ -682,6 +682,8 @@ public void apply(Settings value, Settings current, Settings previous) {

// Remote cluster state settings
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
@@ -221,6 +221,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {

// Settings for remote translog
IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING,

// Settings for remote store enablement
IndexMetadata.INDEX_REMOTE_STORE_ENABLED_SETTING,
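INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING registered above is index-scoped, so it is set per index rather than cluster-wide. A hedged sketch of such an update; the key string "index.remote_store.translog.keep_extra_gen" and the value are assumptions inferred from the constant name:

import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.common.settings.Settings;

// Sketch: per-index dynamic settings update (key and value are illustrative).
UpdateSettingsRequest request = new UpdateSettingsRequest("my-index").settings(
    Settings.builder().put("index.remote_store.translog.keep_extra_gen", 5)
);
client().admin().indices().updateSettings(request).actionGet();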
@@ -83,9 +83,23 @@ public class RemoteClusterStateService implements Closeable {

private static final Logger logger = LogManager.getLogger(RemoteClusterStateService.class);

// TODO make this two variable as dynamic setting [issue: #10688]
public static final int INDEX_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final int GLOBAL_METADATA_UPLOAD_WAIT_MILLIS = 20000;
public static final TimeValue INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final TimeValue GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT = TimeValue.timeValueMillis(20000);

public static final Setting<TimeValue> INDEX_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.index_metadata.upload_timeout",
INDEX_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static final Setting<TimeValue> GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.remote_store.state.global_metadata.upload_timeout",
GLOBAL_METADATA_UPLOAD_TIMEOUT_DEFAULT,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
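For reference, a setting declared with Setting.Property.Dynamic and Setting.Property.NodeScope falls back to its default when unset and parses duration strings when configured; a small sketch with a hypothetical key:

import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

// Hypothetical setting mirroring the two declarations above.
Setting<TimeValue> timeout = Setting.timeSetting(
    "example.upload_timeout",
    TimeValue.timeValueMillis(20000),  // default when unset
    Setting.Property.Dynamic,          // updatable at runtime
    Setting.Property.NodeScope         // cluster/node-level scope
);
TimeValue fromDefault = timeout.get(Settings.EMPTY); // -> 20s (the default)
TimeValue fromConfig = timeout.get(
    Settings.builder().put("example.upload_timeout", "45s").build()
); // -> 45s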

public static final ChecksumBlobStoreFormat<IndexMetadata> INDEX_METADATA_FORMAT = new ChecksumBlobStoreFormat<>(
"index-metadata",
@@ -141,6 +155,9 @@ public class RemoteClusterStateService implements Closeable {
private BlobStoreTransferService blobStoreTransferService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
private volatile TimeValue globalMetadataUploadTimeout;

private final AtomicBoolean deleteStaleMetadataRunning = new AtomicBoolean(false);

public static final int INDEX_METADATA_CURRENT_CODEC_VERSION = 1;
@@ -171,7 +188,11 @@ public RemoteClusterStateService(
this.relativeTimeNanosSupplier = relativeTimeNanosSupplier;
this.threadpool = threadPool;
this.slowWriteLoggingThreshold = clusterSettings.get(SLOW_WRITE_LOGGING_THRESHOLD);
this.indexMetadataUploadTimeout = clusterSettings.get(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING);
this.globalMetadataUploadTimeout = clusterSettings.get(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING);
clusterSettings.addSettingsUpdateConsumer(SLOW_WRITE_LOGGING_THRESHOLD, this::setSlowWriteLoggingThreshold);
clusterSettings.addSettingsUpdateConsumer(INDEX_METADATA_UPLOAD_TIMEOUT_SETTING, this::setIndexMetadataUploadTimeout);
clusterSettings.addSettingsUpdateConsumer(GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING, this::setGlobalMetadataUploadTimeout);
}

private BlobStoreTransferService getBlobStoreTransferService() {
@@ -367,7 +388,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException
);

try {
if (latch.await(GLOBAL_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getGlobalMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
// TODO: We should add metrics where transfer is timing out. [Issue: #10687]
GlobalMetadataTransferException ex = new GlobalMetadataTransferException(
String.format(Locale.ROOT, "Timed out waiting for transfer of global metadata to complete")
@@ -422,7 +443,7 @@ private List<UploadedIndexMetadata> writeIndexMetadataParallel(ClusterState clus
}

try {
if (latch.await(INDEX_METADATA_UPLOAD_WAIT_MILLIS, TimeUnit.MILLISECONDS) == false) {
if (latch.await(getIndexMetadataUploadTimeout().millis(), TimeUnit.MILLISECONDS) == false) {
IndexMetadataTransferException ex = new IndexMetadataTransferException(
String.format(
Locale.ROOT,
@@ -621,6 +642,22 @@ private void setSlowWriteLoggingThreshold(TimeValue slowWriteLoggingThreshold) {
this.slowWriteLoggingThreshold = slowWriteLoggingThreshold;
}

private void setIndexMetadataUploadTimeout(TimeValue newIndexMetadataUploadTimeout) {
this.indexMetadataUploadTimeout = newIndexMetadataUploadTimeout;
}

private void setGlobalMetadataUploadTimeout(TimeValue newGlobalMetadataUploadTimeout) {
this.globalMetadataUploadTimeout = newGlobalMetadataUploadTimeout;
}

public TimeValue getIndexMetadataUploadTimeout() {
return this.indexMetadataUploadTimeout;
}

public TimeValue getGlobalMetadataUploadTimeout() {
return this.globalMetadataUploadTimeout;
}

static String getManifestFileName(long term, long version, boolean committed) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
@@ -732,7 +769,10 @@ public Metadata getLatestMetadata(String clusterName, String clusterUUID) {
// Fetch Index Metadata
Map<String, IndexMetadata> indices = getIndexMetadataMap(clusterName, clusterUUID, clusterMetadataManifest.get());

return Metadata.builder(globalMetadata).indices(indices).build();
// Metadata.Builder#indices expects index names as map keys, so re-key the fetched entries by name.
Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); });

return Metadata.builder(globalMetadata).indices(indexMetadataMap).build();
}

private Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetadataManifest clusterMetadataManifest) {
(Diff truncated; the remaining changed files are not shown.)