From 32b0e618fdb3dd9bb6c83edf3498891199a43dbe Mon Sep 17 00:00:00 2001 From: Ashish Date: Mon, 9 Jan 2023 12:01:24 +0530 Subject: [PATCH] Integrate remote translog download on failover (#5699) * Integrate remote translog download on failover Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../opensearch/index/shard/IndexShardIT.java | 2 +- .../org/opensearch/index/IndexModule.java | 6 +- .../org/opensearch/index/IndexService.java | 20 ++--- .../opensearch/index/shard/IndexShard.java | 9 +- .../index/translog/RemoteFsTranslog.java | 3 +- .../transfer/FileTransferTracker.java | 23 ++--- .../transfer/TranslogTransferManager.java | 35 ++++---- .../opensearch/indices/IndicesService.java | 30 +++++-- .../opensearch/index/IndexModuleTests.java | 16 +++- .../index/shard/IndexShardTests.java | 63 ++++++++++++++ ...overyWithRemoteTranslogOnPrimaryTests.java | 1 + .../TranslogTransferManagerTests.java | 86 ++++++++++--------- ...teStorePeerRecoverySourceHandlerTests.java | 1 + .../index/shard/IndexShardTestCase.java | 31 ++++++- 15 files changed, 224 insertions(+), 103 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f1264a970998..a883658845936 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -100,6 +100,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Prevent deletion of snapshots that are backing searchable snapshot indexes ([#5069](https://github.com/opensearch-project/OpenSearch/pull/5069)) - Add max_shard_size parameter for shrink API ([#5229](https://github.com/opensearch-project/OpenSearch/pull/5229)) - Added support to apply index create block ([#4603](https://github.com/opensearch-project/OpenSearch/issues/4603)) +- Support request level durability for remote-backed indexes ([#5671](https://github.com/opensearch-project/OpenSearch/issues/5671)) ### Dependencies - Bumps `bcpg-fips` from 1.0.5.1 to 1.0.7.1 diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java 
b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 959b04b7861a3..d8476e6284d98 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -699,7 +699,7 @@ public static final IndexShard newIndexShard( () -> {}, RetentionLeaseSyncer.EMPTY, cbs, - new InternalTranslogFactory(), + (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, null ); diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index 69543577f48b4..e9125256438a5 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -44,6 +44,7 @@ import org.opensearch.Version; import org.opensearch.client.Client; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedFunction; import org.opensearch.common.TriFunction; @@ -71,6 +72,7 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.remote.directory.RemoteSnapshotDirectoryFactory; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.fielddata.cache.IndicesFieldDataCache; @@ -493,7 +495,7 @@ public IndexService newIndexService( BooleanSupplier idFieldDataEnabled, ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory, - Supplier repositoriesServiceSupplier + BiFunction translogFactorySupplier ) throws IOException { final IndexEventListener eventListener 
= freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -549,7 +551,7 @@ public IndexService newIndexService( expressionResolver, valuesSourceRegistry, recoveryStateFactory, - repositoriesServiceSupplier + translogFactorySupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 36237b56987e6..78211f12f71ad 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -88,8 +88,6 @@ import org.opensearch.index.shard.ShardPath; import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.Store; -import org.opensearch.index.translog.InternalTranslogFactory; -import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.breaker.CircuitBreakerService; @@ -99,7 +97,6 @@ import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.plugins.IndexStorePlugin; -import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.ValuesSourceRegistry; import org.opensearch.threadpool.ThreadPool; @@ -117,6 +114,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import java.util.function.BooleanSupplier; import java.util.function.Consumer; import java.util.function.Function; @@ -176,7 +174,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final IndexNameExpressionResolver expressionResolver; private final Supplier 
indexSortSupplier; private final ValuesSourceRegistry valuesSourceRegistry; - private final Supplier repositoriesServiceSupplier; + private final BiFunction translogFactorySupplier; public IndexService( IndexSettings indexSettings, @@ -209,7 +207,7 @@ public IndexService( IndexNameExpressionResolver expressionResolver, ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, - Supplier repositoriesServiceSupplier + BiFunction translogFactorySupplier ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -281,7 +279,7 @@ public IndexService( this.trimTranslogTask = new AsyncTrimTranslogTask(this); this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); - this.repositoriesServiceSupplier = repositoriesServiceSupplier; + this.translogFactorySupplier = translogFactorySupplier; updateFsyncTaskIfNecessary(); } @@ -524,14 +522,6 @@ public synchronized IndexShard createShard( remoteStore = new Store(shardId, this.indexSettings, remoteDirectory, lock, Store.OnClose.EMPTY); } - TranslogFactory translogFactory = this.indexSettings.isRemoteTranslogStoreEnabled() && routing.primary() - ? new RemoteBlobStoreInternalTranslogFactory( - repositoriesServiceSupplier, - threadPool, - this.indexSettings.getRemoteStoreTranslogRepository() - ) - : new InternalTranslogFactory(); - Directory directory = directoryFactory.newDirectory(this.indexSettings, path); store = new Store( shardId, @@ -562,7 +552,7 @@ public synchronized IndexShard createShard( () -> globalCheckpointSyncer.accept(shardId), retentionLeaseSyncer, circuitBreakerService, - translogFactory, + translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? 
checkpointPublisher : null, remoteStore ); diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 60c52e9583821..4be11badd0879 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -207,6 +207,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -321,7 +322,7 @@ Runnable getGlobalCheckpointSyncer() { private final RefreshPendingLocationListener refreshPendingLocationListener; private volatile boolean useRetentionLeasesInPeerRecovery; private final Store remoteStore; - private final TranslogFactory translogFactory; + private final BiFunction translogFactorySupplier; public IndexShard( final ShardRouting shardRouting, @@ -344,7 +345,7 @@ public IndexShard( final Runnable globalCheckpointSyncer, final RetentionLeaseSyncer retentionLeaseSyncer, final CircuitBreakerService circuitBreakerService, - final TranslogFactory translogFactory, + final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore ) throws IOException { @@ -431,7 +432,7 @@ public boolean shouldCache(Query query) { this.refreshPendingLocationListener = new RefreshPendingLocationListener(); this.checkpointPublisher = checkpointPublisher; this.remoteStore = remoteStore; - this.translogFactory = translogFactory; + this.translogFactorySupplier = translogFactorySupplier; } public ThreadPool getThreadPool() { @@ -3420,7 +3421,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro () -> getOperationPrimaryTerm(), tombstoneDocSupplier(), isReadOnlyReplica, - 
translogFactory + translogFactorySupplier.apply(indexSettings, shardRouting) ); } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 2a8031f99ac14..629b1bebccac8 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -130,8 +130,7 @@ public static TranslogTransferManager buildTranslogTransferManager( return new TranslogTransferManager( new BlobStoreTransferService(blobStoreRepository.blobStore(), executorService), blobStoreRepository.basePath().add(shardId.getIndex().getUUID()).add(String.valueOf(shardId.id())), - fileTransferTracker, - fileTransferTracker::exclusionFilter + fileTransferTracker ); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java index 4e697dae5d236..e950be0993e83 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/FileTransferTracker.java @@ -13,6 +13,7 @@ import org.opensearch.index.translog.transfer.listener.FileTransferListener; import java.util.HashSet; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -32,8 +33,16 @@ public FileTransferTracker(ShardId shardId) { @Override public void onSuccess(TransferFileSnapshot fileSnapshot) { - TransferState targetState = TransferState.SUCCESS; - fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> { + add(fileSnapshot.getName(), TransferState.SUCCESS); + } + + void add(String file, boolean success) { + TransferState targetState = success ? 
TransferState.SUCCESS : TransferState.FAILED; + add(file, targetState); + } + + private void add(String file, TransferState targetState) { + fileTransferTracker.compute(file, (k, v) -> { if (v == null || v.validateNextState(targetState)) { return targetState; } @@ -43,13 +52,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { @Override public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { - TransferState targetState = TransferState.FAILED; - fileTransferTracker.compute(fileSnapshot.getName(), (k, v) -> { - if (v == null || v.validateNextState(targetState)) { - return targetState; - } - throw new IllegalStateException("Unexpected transfer state " + v + "while setting target to" + targetState); - }); + add(fileSnapshot.getName(), TransferState.FAILED); } public Set exclusionFilter(Set original) { @@ -80,7 +83,7 @@ public boolean validateNextState(TransferState target) { case FAILED: return true; case SUCCESS: - return Set.of(SUCCESS).contains(target); + return Objects.equals(SUCCESS, target); } return false; } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 29cc69ad1da44..6750eedd86180 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -17,7 +17,6 @@ import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.lucene.store.ByteArrayIndexInput; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import java.io.IOException; @@ -33,7 +32,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import 
java.util.function.UnaryOperator; import java.util.stream.Collectors; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -48,9 +46,8 @@ public class TranslogTransferManager { private final TransferService transferService; private final BlobPath remoteBaseTransferPath; - private final BlobPath remoteMetadaTransferPath; - private final FileTransferListener fileTransferListener; - private final UnaryOperator> exclusionFilter; + private final BlobPath remoteMetadataTransferPath; + private final FileTransferTracker fileTransferTracker; private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; @@ -61,14 +58,12 @@ public class TranslogTransferManager { public TranslogTransferManager( TransferService transferService, BlobPath remoteBaseTransferPath, - FileTransferListener fileTransferListener, - UnaryOperator> exclusionFilter + FileTransferTracker fileTransferTracker ) { this.transferService = transferService; this.remoteBaseTransferPath = remoteBaseTransferPath; - this.remoteMetadaTransferPath = remoteBaseTransferPath.add(METADATA_DIR); - this.fileTransferListener = fileTransferListener; - this.exclusionFilter = exclusionFilter; + this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); + this.fileTransferTracker = fileTransferTracker; } public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTransferListener translogTransferListener) @@ -76,8 +71,8 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans List exceptionList = new ArrayList<>(transferSnapshot.getTranslogTransferMetadata().getCount()); Set toUpload = new HashSet<>(transferSnapshot.getTranslogTransferMetadata().getCount()); try { - toUpload.addAll(exclusionFilter.apply(transferSnapshot.getTranslogFileSnapshots())); - toUpload.addAll(exclusionFilter.apply(transferSnapshot.getCheckpointFileSnapshots())); + toUpload.addAll(fileTransferTracker.exclusionFilter(transferSnapshot.getTranslogFileSnapshots())); 
+ toUpload.addAll(fileTransferTracker.exclusionFilter((transferSnapshot.getCheckpointFileSnapshots()))); if (toUpload.isEmpty()) { logger.trace("Nothing to upload for transfer"); translogTransferListener.onUploadComplete(transferSnapshot); @@ -85,7 +80,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans } final CountDownLatch latch = new CountDownLatch(toUpload.size()); LatchedActionListener latchedActionListener = new LatchedActionListener<>( - ActionListener.wrap(fileTransferListener::onSuccess, ex -> { + ActionListener.wrap(fileTransferTracker::onSuccess, ex -> { assert ex instanceof FileTransferException; logger.error( () -> new ParameterizedMessage( @@ -95,7 +90,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans ex ); FileTransferException e = (FileTransferException) ex; - fileTransferListener.onFailure(e.getFileSnapshot(), ex); + fileTransferTracker.onFailure(e.getFileSnapshot(), ex); exceptionList.add(ex); }), latch @@ -119,7 +114,7 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans throw ex; } if (exceptionList.isEmpty()) { - transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadaTransferPath); + transferService.uploadBlob(prepareMetadata(transferSnapshot), remoteMetadataTransferPath); translogTransferListener.onUploadComplete(transferSnapshot); return true; } else { @@ -160,14 +155,16 @@ private void downloadToFS(String fileName, Path location, String primaryTerm) th try (InputStream inputStream = transferService.downloadBlob(remoteBaseTransferPath.add(primaryTerm), fileName)) { Files.copy(inputStream, filePath); } + // Mark in FileTransferTracker so that the same files are not uploaded at the time of translog sync + fileTransferTracker.add(fileName, true); } public TranslogTransferMetadata readMetadata() throws IOException { - return transferService.listAll(remoteMetadaTransferPath) + return 
transferService.listAll(remoteMetadataTransferPath) .stream() .max(TranslogTransferMetadata.METADATA_FILENAME_COMPARATOR) .map(filename -> { - try (InputStream inputStream = transferService.downloadBlob(remoteMetadaTransferPath, filename);) { + try (InputStream inputStream = transferService.downloadBlob(remoteMetadataTransferPath, filename);) { IndexInput indexInput = new ByteArrayIndexInput("metadata file", inputStream.readAllBytes()); return new TranslogTransferMetadata(indexInput); } catch (IOException e) { @@ -191,12 +188,10 @@ private TransferFileSnapshot prepareMetadata(TransferSnapshot transferSnapshot) ); TranslogTransferMetadata translogTransferMetadata = transferSnapshot.getTranslogTransferMetadata(); translogTransferMetadata.setGenerationToPrimaryTermMapper(new HashMap<>(generationPrimaryTermMap)); - TransferFileSnapshot fileSnapshot = new TransferFileSnapshot( + return new TransferFileSnapshot( translogTransferMetadata.getFileName(), translogTransferMetadata.createMetadataBytes(), translogTransferMetadata.getPrimaryTerm() ); - - return fileSnapshot; } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 2946411fc9238..e015bd11b11db 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -132,6 +132,9 @@ import org.opensearch.index.shard.IndexingOperationListener; import org.opensearch.index.shard.IndexingStats; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.translog.InternalTranslogFactory; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.index.translog.TranslogStats; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.cluster.IndicesClusterStateService; @@ -177,11 +180,12 @@ import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; -import java.util.function.Supplier; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; import static java.util.Collections.emptyList; @@ -270,7 +274,7 @@ public class IndicesService extends AbstractLifecycleComponent private final boolean nodeWriteDanglingIndicesInfo; private final ValuesSourceRegistry valuesSourceRegistry; private final IndexStorePlugin.RemoteDirectoryFactory remoteDirectoryFactory; - private final Supplier repositoriesServiceSupplier; + private final BiFunction translogFactorySupplier; @Override protected void doStart() { @@ -389,7 +393,7 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; - this.repositoriesServiceSupplier = repositoriesServiceSupplier; + this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool); } public IndicesService( @@ -504,7 +508,23 @@ protected void closeInternal() { this.allowExpensiveQueries = ALLOW_EXPENSIVE_QUERIES.get(clusterService.getSettings()); clusterService.getClusterSettings().addSettingsUpdateConsumer(ALLOW_EXPENSIVE_QUERIES, this::setAllowExpensiveQueries); this.remoteDirectoryFactory = remoteDirectoryFactory; - this.repositoriesServiceSupplier = repositoriesServiceSupplier; + this.translogFactorySupplier = getTranslogFactorySupplier(repositoriesServiceSupplier, threadPool); + } + + private static BiFunction getTranslogFactorySupplier( + Supplier repositoriesServiceSupplier, + ThreadPool 
threadPool + ) { + return (indexSettings, shardRouting) -> { + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + repositoriesServiceSupplier, + threadPool, + indexSettings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + }; } private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask"; @@ -868,7 +888,7 @@ private synchronized IndexService createIndexService( this::isIdFieldDataEnabled, valuesSourceRegistry, remoteDirectoryFactory, - repositoriesServiceSupplier + translogFactorySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 429c2126d9a00..d2374e767639c 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -91,6 +91,9 @@ import org.opensearch.index.similarity.SimilarityService; import org.opensearch.index.store.FsDirectoryFactory; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; +import org.opensearch.index.translog.InternalTranslogFactory; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.IndicesModule; import org.opensearch.indices.IndicesQueryCache; import org.opensearch.indices.analysis.AnalysisModule; @@ -121,6 +124,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -220,6 +224,16 @@ public void tearDown() throws Exception { private IndexService newIndexService(IndexModule module) throws IOException { final SetOnce repositoriesServiceReference = new SetOnce<>(); 
repositoriesServiceReference.set(repositoriesService); + BiFunction translogFactorySupplier = (indexSettings, shardRouting) -> { + if (indexSettings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + repositoriesServiceReference::get, + threadPool, + indexSettings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + }; return module.newIndexService( CREATE_INDEX, nodeEnvironment, @@ -238,7 +252,7 @@ private IndexService newIndexService(IndexModule module) throws IOException { () -> false, null, new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService), - repositoriesServiceReference::get + translogFactorySupplier ); } diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 7529a22c3f071..32885ed5ed39b 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -131,6 +131,8 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreStats; import org.opensearch.index.store.StoreUtils; +import org.opensearch.index.translog.InternalTranslogFactory; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.TestTranslog; import org.opensearch.index.translog.Translog; import org.opensearch.index.translog.TranslogStats; @@ -4819,6 +4821,67 @@ public void testReadOnlyReplicaEngineConfig() throws IOException { closeShards(primaryShard, replicaShard); } + public void testTranslogFactoryWithoutRemoteStore() throws IOException { + Settings primarySettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + final IndexShard primaryShard = 
newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); + assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); + + final IndexShard replicaShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); + assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); + + closeShards(primaryShard, replicaShard); + } + + public void testTranslogFactoryForReplicaShardWithoutRemoteStore() throws IOException { + Settings primarySettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .build(); + final IndexShard primaryShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); + assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); + closeShards(primaryShard); + } + + public void testTranslogFactoryForRemoteTranslogBackedPrimaryShard() throws IOException { + Settings primarySettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "seg-test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") + .build(); + final IndexShard primaryShard = newStartedShard(true, primarySettings, new NRTReplicationEngineFactory()); + 
assertEquals(primaryShard.getEngine().getClass(), InternalEngine.class); + assertEquals(primaryShard.getEngine().config().getTranslogFactory().getClass(), RemoteBlobStoreInternalTranslogFactory.class); + closeShards(primaryShard); + } + + public void testTranslogFactoryForRemoteTranslogBackedReplicaShard() throws IOException { + Settings primarySettings = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, "seg-test") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, true) + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "txlog-test") + .build(); + final IndexShard replicaShard = newStartedShard(false, primarySettings, new NRTReplicationEngineFactory()); + assertEquals(replicaShard.getEngine().getClass(), InternalEngine.class); + assertEquals(replicaShard.getEngine().config().getTranslogFactory().getClass(), InternalTranslogFactory.class); + closeShards(replicaShard); + } + public void testCloseShardWhileEngineIsWarming() throws Exception { CountDownLatch warmerStarted = new CountDownLatch(1); CountDownLatch warmerBlocking = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java index 950ba047df19d..ba707cc30e6b8 100644 --- a/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java +++ b/server/src/test/java/org/opensearch/index/shard/ReplicaRecoveryWithRemoteTranslogOnPrimaryTests.java @@ -35,6 +35,7 @@ public class ReplicaRecoveryWithRemoteTranslogOnPrimaryTests extends OpenSearchI .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) 
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") .build(); public void testStartSequenceForReplicaRecovery() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index 14207c896c733..1a8827ac797a8 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -13,8 +13,8 @@ import org.opensearch.action.ActionListener; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.util.set.Sets; +import org.opensearch.index.shard.ShardId; import org.opensearch.index.translog.Translog; -import org.opensearch.index.translog.transfer.listener.FileTransferListener; import org.opensearch.index.translog.transfer.listener.TranslogTransferListener; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.index.translog.transfer.FileSnapshot.CheckpointFileSnapshot; @@ -74,7 +74,7 @@ public void testTransferSnapshot() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, remoteBaseTransferPath, - new FileTransferListener() { + new FileTransferTracker(new ShardId("index", "indexUUid", 0)) { @Override public void onSuccess(TransferFileSnapshot fileSnapshot) { fileTransferSucceeded.incrementAndGet(); @@ -84,8 +84,7 @@ public void onSuccess(TransferFileSnapshot fileSnapshot) { public void onFailure(TransferFileSnapshot fileSnapshot, Exception e) { fileTransferFailed.incrementAndGet(); } - }, - r -> r + } ); assertTrue(translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { @@ -157,24 
+156,14 @@ public TranslogTransferMetadata getTranslogTransferMetadata() { } public void testReadMetadataNoFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); when(transferService.listAll(remoteBaseTransferPath)).thenReturn(Sets.newHashSet()); assertNull(translogTransferManager.readMetadata()); } public void testReadMetadataSingleFile() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); // BlobPath does not have equals method, so we can't use the instance directly in when when(transferService.listAll(any(BlobPath.class))).thenReturn(Sets.newHashSet("12__234__123456789")); @@ -188,12 +177,7 @@ public void testReadMetadataSingleFile() throws IOException { } public void testReadMetadataMultipleFiles() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); when(transferService.listAll(any(BlobPath.class))).thenReturn( Sets.newHashSet("12__234__56789", "12__235__56823", "12__233__56700") @@ -208,12 +192,7 @@ public void testReadMetadataMultipleFiles() throws IOException { } public void testReadMetadataException() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new 
TranslogTransferManager(transferService, remoteBaseTransferPath, null); when(transferService.listAll(any(BlobPath.class))).thenReturn( Sets.newHashSet("12__234__56789", "12__235__56823", "12__233__56700") @@ -225,12 +204,7 @@ public void testReadMetadataException() throws IOException { } public void testReadMetadataSamePrimaryTermGeneration() throws IOException { - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, null); when(transferService.listAll(any(BlobPath.class))).thenReturn( Sets.newHashSet("12__234__56789", "12__235__56823", "12__234__56700") @@ -244,8 +218,7 @@ public void testDownloadTranslog() throws IOException { TranslogTransferManager translogTransferManager = new TranslogTransferManager( transferService, remoteBaseTransferPath, - null, - r -> r + new FileTransferTracker(new ShardId("index", "indexUuid", 0)) ); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( @@ -264,16 +237,12 @@ public void testDownloadTranslog() throws IOException { } public void testDownloadTranslogAlreadyExists() throws IOException { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); Path location = createTempDir(); Files.createFile(location.resolve("translog-23.tlog")); Files.createFile(location.resolve("translog-23.ckp")); - TranslogTransferManager translogTransferManager = new TranslogTransferManager( - transferService, - remoteBaseTransferPath, - null, - r -> r - ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); when(transferService.downloadBlob(any(BlobPath.class), eq("translog-23.tlog"))).thenReturn( new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) @@ -289,4 +258,37 
@@ public void testDownloadTranslogAlreadyExists() throws IOException { assertTrue(Files.exists(location.resolve("translog-23.tlog"))); assertTrue(Files.exists(location.resolve("translog-23.ckp"))); } + + /** Checks that downloadTranslog still fetches the translog and checkpoint blobs when same-named files already exist at the target location, and that the supplied FileTransferTracker then rejects re-adding either file as failed (IllegalStateException) but accepts re-adding it as successful. */ public void testDownloadTranslogWithTrackerUpdated() throws IOException { + FileTransferTracker tracker = new FileTransferTracker(new ShardId("index", "indexUuid", 0)); + Path location = createTempDir(); + String translogFile = "translog-23.tlog", checkpointFile = "translog-23.ckp"; + Files.createFile(location.resolve(translogFile)); + Files.createFile(location.resolve(checkpointFile)); + + TranslogTransferManager translogTransferManager = new TranslogTransferManager(transferService, remoteBaseTransferPath, tracker); + + when(transferService.downloadBlob(any(BlobPath.class), eq(translogFile))).thenReturn( + new ByteArrayInputStream("Hello Translog".getBytes(StandardCharsets.UTF_8)) + ); + when(transferService.downloadBlob(any(BlobPath.class), eq(checkpointFile))).thenReturn( + new ByteArrayInputStream("Hello Checkpoint".getBytes(StandardCharsets.UTF_8)) + ); + + translogTransferManager.downloadTranslog("12", "23", location); + + verify(transferService).downloadBlob(any(BlobPath.class), eq(translogFile)); + verify(transferService).downloadBlob(any(BlobPath.class), eq(checkpointFile)); + assertTrue(Files.exists(location.resolve(translogFile))); + assertTrue(Files.exists(location.resolve(checkpointFile))); + + // Since the tracker already holds the files with success state, adding them with failed state would throw exception + assertThrows(IllegalStateException.class, () -> tracker.add(translogFile, false)); + assertThrows(IllegalStateException.class, () -> tracker.add(checkpointFile, false)); + + // Since the tracker already holds the files with success state, adding them with success state is allowed + tracker.add(translogFile, true); + tracker.add(checkpointFile, true); + + } } diff --git
a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java index 91953d4db3495..465629406b54b 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RemoteStorePeerRecoverySourceHandlerTests.java @@ -22,6 +22,7 @@ public class RemoteStorePeerRecoverySourceHandlerTests extends OpenSearchIndexLe .put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT) .put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, "true") .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, "true") + .put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_REPOSITORY, "translog-repo") .build(); public void testReplicaShardRecoveryUptoLastFlushedCommit() throws Exception { diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 341598ffa7b88..3ae79a8a17879 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -38,6 +38,7 @@ import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; import org.junit.Assert; +import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; import org.opensearch.Version; import org.opensearch.action.ActionListener; @@ -60,6 +61,7 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.bytes.BytesArray; @@ -96,8 +98,10 @@ import 
org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.InternalTranslogFactory; +import org.opensearch.index.translog.RemoteBlobStoreInternalTranslogFactory; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndicesService; +import org.opensearch.index.translog.TranslogFactory; import org.opensearch.indices.breaker.CircuitBreakerService; import org.opensearch.indices.breaker.HierarchyCircuitBreakerService; import org.opensearch.indices.recovery.AsyncRecoveryTarget; @@ -125,7 +129,9 @@ import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; +import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; +import org.opensearch.repositories.blobstore.BlobStoreRepository; import org.opensearch.repositories.blobstore.OpenSearchBlobStoreRepositoryIntegTestCase; import org.opensearch.snapshots.Snapshot; import org.opensearch.test.DummyShardLock; @@ -550,6 +556,17 @@ protected IndexShard newShard( if (remoteStore == null && indexSettings.isRemoteStoreEnabled()) { remoteStore = createRemoteStore(createTempDir(), routing, indexMetadata); } + + /* Picks the translog factory per shard: when the remote translog store is enabled in the index settings and the routing is a primary, use RemoteBlobStoreInternalTranslogFactory (fed by createRepositoriesService, threadPool and the configured translog repository); otherwise use InternalTranslogFactory. The explicit type arguments are required so the lambda parameters are typed as IndexSettings and ShardRouting. */ final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier = (settings, shardRouting) -> { + if (settings.isRemoteTranslogStoreEnabled() && shardRouting.primary()) { + return new RemoteBlobStoreInternalTranslogFactory( + this::createRepositoriesService, + threadPool, + settings.getRemoteStoreTranslogRepository() + ); + } + return new InternalTranslogFactory(); + }; indexShard = new IndexShard( routing, indexSettings, @@ -571,7 +588,7 @@ protected IndexShard newShard( globalCheckpointSyncer, retentionLeaseSyncer, breakerService, - new InternalTranslogFactory(), + translogFactorySupplier, checkpointPublisher, remoteStore ); @@ -585,6 +602,18 @@ protected IndexShard newShard( return indexShard; } +
/** Builds a Mockito mock of RepositoriesService for tests: repository(any String) returns a mocked BlobStoreRepository whose basePath() is a new BlobPath() and whose blobStore() returns a mocked BlobStore that hands back one mocked BlobContainer for any argument. */ protected RepositoriesService createRepositoriesService() { + RepositoriesService repositoriesService = Mockito.mock(RepositoriesService.class); + BlobStoreRepository repository = Mockito.mock(BlobStoreRepository.class); + when(repository.basePath()).thenReturn(new BlobPath()); + BlobStore blobStore = Mockito.mock(BlobStore.class); + BlobContainer blobContainer = Mockito.mock(BlobContainer.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(repository.blobStore()).thenReturn(blobStore); + when(repositoriesService.repository(any(String.class))).thenReturn(repository); + return repositoriesService; + } + protected Store createRemoteStore(Path path, ShardRouting shardRouting, IndexMetadata metadata) throws IOException { Settings nodeSettings = Settings.builder().put("node.name", shardRouting.currentNodeId()).build();