diff --git a/.github/workflows/precommit.yml b/.github/workflows/precommit.yml index e264d65cdf191..f71b363af449f 100644 --- a/.github/workflows/precommit.yml +++ b/.github/workflows/precommit.yml @@ -1,12 +1,12 @@ name: Gradle Precommit -on: [pull_request] - +on: [workflow_dispatch] + jobs: precommit: runs-on: ${{ matrix.os }} strategy: matrix: - os: [ubuntu-latest, windows-latest, macos-latest] + os: [windows-latest] steps: - uses: actions/checkout@v2 - name: Set up JDK 11 @@ -16,4 +16,4 @@ jobs: distribution: adopt - name: Run Gradle run: | - ./gradlew javadoc precommit --parallel + ./gradlew :server:test -x internalClusterTest diff --git a/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy b/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy index a5207933c3c72..5759483dbe86d 100644 --- a/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy +++ b/buildSrc/src/main/groovy/org/opensearch/gradle/test/ClusterConfiguration.groovy @@ -103,8 +103,8 @@ class ClusterConfiguration { boolean autoSetHostsProvider = true @Input - String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') + - " " + "-Xmx" + System.getProperty('tests.heap.size', '512m') + + String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '1g') + + " " + "-Xmx" + System.getProperty('tests.heap.size', '6g') + " " + System.getProperty('tests.jvm.argline', '') /** diff --git a/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java b/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java index cdf22407f6076..05481af495702 100644 --- a/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java +++ b/buildSrc/src/main/java/org/opensearch/gradle/OpenSearchTestBasePlugin.java @@ -127,8 +127,8 @@ public void execute(Task t) { test.exclude("**/*$*.class"); test.jvmArgs( - "-Xmx" + System.getProperty("tests.heap.size", "512m"), - "-Xms" + System.getProperty("tests.heap.size", "512m"), + "-Xmx" + System.getProperty("tests.heap.size", "6g"), + "-Xms" + System.getProperty("tests.heap.size", "1g"), "-XX:+HeapDumpOnOutOfMemoryError" ); diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java index 43bbb81b7a046..c2c640714458d 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java @@ -15,6 +15,7 @@ import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse; import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest; import org.opensearch.action.admin.indices.segments.ShardSegments; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Requests; @@ -38,6 +39,7 @@ import org.opensearch.indices.recovery.FileChunkRequest; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestStatus; import org.opensearch.test.BackgroundIndexer; import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; @@ -291,14 +293,30 @@ public void testAddNewReplicaFailure() throws Exception { assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME))); } - @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") + public void testWaitUntil() { + final String nodeA = internalCluster().startNode(featureFlagSettings()); + final String nodeB = internalCluster().startNode(featureFlagSettings()); + createIndex(INDEX_NAME); + ensureGreen(INDEX_NAME); + IndexResponse index = client().prepareIndex(INDEX_NAME) + .setId("1") + .setSource("foo", "bar") + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) + .get(); + assertEquals(RestStatus.CREATED, index.status()); + assertFalse("request shouldn't have forced a refresh", index.forcedRefresh()); + assertSearchHits(client(nodeA).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1"); + assertSearchHits(client(nodeB).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1"); + } + +// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669") public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { final String nodeA = internalCluster().startNode(featureFlagSettings()); final String nodeB = internalCluster().startNode(featureFlagSettings()); createIndex(INDEX_NAME); ensureGreen(INDEX_NAME); - final int initialDocCount = scaledRandomIntBetween(0, 200); + final int initialDocCount = 200; try ( BackgroundIndexer indexer = new BackgroundIndexer( INDEX_NAME, @@ -313,7 +331,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { indexer.start(initialDocCount); waitForDocs(initialDocCount, indexer); refresh(INDEX_NAME); - waitForReplicaUpdate(); +// waitForReplicaUpdate(); assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount); @@ -324,7 +342,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception { waitForDocs(expectedHitCount, indexer); flushAndRefresh(INDEX_NAME); - waitForReplicaUpdate(); +// waitForReplicaUpdate(); assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount); @@ -553,9 +571,9 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception { assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2); - client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get(); + client().prepareIndex(INDEX_NAME).setId("3").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).setSource("foo", "bar").get(); refresh(INDEX_NAME); - waitForReplicaUpdate(); +// waitForReplicaUpdate(); assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3); assertSegmentStats(REPLICA_COUNT); diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index e804aa31adb4e..ff822d4f9472e 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -1201,6 +1201,7 @@ protected void acquireReplicaOperationPermit( final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes ) { + logger.info("acquireReplicaOperationPermit - GLOBAL CHECKPOINT {} MAXSeqNoOfUpdatesOrDeletes {}", globalCheckpoint, maxSeqNoOfUpdatesOrDeletes); replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request); } diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java index 41abbce91c48c..2b381fadde2fe 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java @@ -126,6 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th ensureOpen(); try (ReleasableLock lock = writeLock.acquire()) { final long incomingGeneration = infos.getGeneration(); + localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); readerManager.updateSegments(infos); // Commit and roll the translog when we receive a different generation than what was last received. @@ -136,7 +137,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th translogManager.rollTranslogGeneration(); } lastReceivedGen = incomingGeneration; - localCheckpointTracker.fastForwardProcessedSeqNo(seqNo); } } @@ -305,7 +305,9 @@ public List segments(boolean verbose) { } @Override - public void refresh(String source) throws EngineException {} + public void refresh(String source) throws EngineException { + logger.info("Refresh invoked on Engine"); + } @Override public boolean maybeRefresh(String source) throws EngineException { diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java index 8fbb24720aedc..0115f40eadfef 100644 --- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java +++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationReaderManager.java @@ -64,6 +64,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re logger.trace( () -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader) ); + logger.info("Returning from refreshIfNeeded"); return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId()); } @@ -78,6 +79,7 @@ public synchronized void updateSegments(SegmentInfos infos) throws IOException { // is always increased. infos.updateGeneration(currentInfos); currentInfos = infos; + logger.info("Invoking maybeRefresh"); maybeRefresh(); } diff --git a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java index fb046e2310d93..de07a15d5c0bd 100644 --- a/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/CheckpointRefreshListener.java @@ -35,11 +35,13 @@ public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointP @Override public void beforeRefresh() throws IOException { + logger.info("Before Refresh invoked?"); // Do nothing } @Override public void afterRefresh(boolean didRefresh) throws IOException { + logger.info("After Refresh invoked: {}", didRefresh); if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) { publisher.publish(shard); } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 4be11badd0879..89855312e8d5d 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1502,6 +1502,7 @@ public boolean isSegmentReplicationAllowed() { * @return true if checkpoint should be processed */ public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) { + logger.info("Checking if segrep is allowed on received checkpoint {}", requestCheckpoint); if (isSegmentReplicationAllowed() == false) { return false; } @@ -3669,6 +3670,7 @@ private void innerAcquireReplicaOperationPermit( final boolean allowCombineOperationWithPrimaryTermUpdate, final Consumer> operationExecutor ) { + logger.info("innerAcquireReplicaOperationPermit - GLOBAL CHECKPOINT {} - CURRENT {}", globalCheckpoint, replicationTracker.getGlobalCheckpoint()); verifyNotClosed(); // This listener is used for the execution of the operation. If the operation requires all the permits for its @@ -3816,6 +3818,26 @@ public void sync() throws IOException { getEngine().translogManager().syncTranslog(); } + public final boolean isOperationIndexed(Translog.Location location) { + if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) { + final Translog.Operation operation; + try { + operation = getEngine().translogManager().readOperation(location); + } catch (IOException e) { + logger.error("Error fetching operation from xlog?", e); + return false; + } + logger.info("Operation read from xlog - {} - {}", location, operation); + final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint(); + logger.info("Latest repl checkpoint {}", latestReplicationCheckpoint); + final boolean b = operation.seqNo() <= latestReplicationCheckpoint.getSeqNo(); + logger.info("IS OP IN INDEX: {}", b); + return b; +// return latestReplicationCheckpoint.getSeqNo() >= replicationTracker.getGlobalCheckpoint(); + } + return true; + } + /** * Checks if the underlying storage sync is required. */ @@ -3909,7 +3931,8 @@ private RefreshListeners buildRefreshListeners() { () -> refresh("too_many_listeners"), logger, threadPool.getThreadContext(), - externalRefreshMetric + externalRefreshMetric, + this::isOperationIndexed ); } @@ -4081,8 +4104,10 @@ public void addRefreshListener(Translog.Location location, Consumer lis } } if (readAllowed) { - refreshListeners.addOrNotify(location, listener); + final boolean b = refreshListeners.addOrNotify(location, listener); + logger.info("Register listener to call later? {}", b); } else { + logger.info("readAllowed is false"); // we're not yet ready fo ready for reads, just ignore refresh cycles listener.accept(false); } diff --git a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java index 7dbbcbb2d7d20..c79e28524f15d 100644 --- a/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java +++ b/server/src/main/java/org/opensearch/index/shard/RefreshListeners.java @@ -46,6 +46,7 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; +import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.Supplier; @@ -65,6 +66,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener, private final Logger logger; private final ThreadContext threadContext; private final MeanMetric refreshMetric; + private final Function isOperationIndexed; /** * Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics. @@ -107,6 +109,16 @@ public RefreshListeners( this.logger = logger; this.threadContext = threadContext; this.refreshMetric = refreshMetric; + isOperationIndexed = null; + } + + public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable too_many_listeners, Logger logger, ThreadContext threadContext, MeanMetric externalRefreshMetric, Function isOperationIndexed) { + this.getMaxRefreshListeners = getMaxRefreshListeners; + this.forceRefresh = too_many_listeners; + this.logger = logger; + this.threadContext = threadContext; + this.refreshMetric = externalRefreshMetric; + this.isOperationIndexed = isOperationIndexed; } /** @@ -148,8 +160,9 @@ public boolean addOrNotify(Translog.Location location, Consumer listene requireNonNull(listener, "listener cannot be null"); requireNonNull(location, "location cannot be null"); - if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0) { + if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0 && this.isOperationIndexed.apply(currentRefreshLocation)) { // Location already visible, just call the listener + logger.info("lastRefreshedLocation? {}", lastRefreshedLocation); listener.accept(false); return true; } @@ -172,12 +185,14 @@ public boolean addOrNotify(Translog.Location location, Consumer listene } // We have a free slot so register the listener listeners.add(new Tuple<>(location, contextPreservingListener)); + logger.info("Set listeners - lastRefreshedLocation? {}", lastRefreshedLocation); refreshListeners = listeners; return false; } } // No free slot so force a refresh and call the listener in this thread forceRefresh.run(); + logger.info("End - lastRefreshedLocation? {}", lastRefreshedLocation); listener.accept(true); return true; } @@ -270,7 +285,8 @@ public void afterRefresh(boolean didRefresh) throws IOException { List>> preservedListeners = null; for (Tuple> tuple : candidates) { Translog.Location location = tuple.v1(); - if (location.compareTo(currentRefreshLocation) <= 0) { + if (location.compareTo(currentRefreshLocation) <= 0 + && (this.isOperationIndexed.apply(location))) { if (listenersToFire == null) { listenersToFire = new ArrayList<>(); } diff --git a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java index e8adcbdc1c89a..99c2a4aa8a448 100644 --- a/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/opensearch/indices/cluster/IndicesClusterStateService.java @@ -817,7 +817,7 @@ private void forceSegmentReplication( new SegmentReplicationTargetService.SegmentReplicationListener() { @Override public void onReplicationDone(SegmentReplicationState state) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication complete, timing data: {}", indexShard.shardId().getId(), @@ -834,7 +834,7 @@ public void onReplicationFailure( ReplicationFailedException e, boolean sendShardFailure ) { - logger.trace( + logger.info( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}", indexShard.shardId().getId(), diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 85a34878af03f..8031f125ad373 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -237,6 +237,7 @@ public SegmentReplicationTarget startReplication( final IndexShard indexShard, final SegmentReplicationListener listener ) { + logger.info("Starting replication from {}", indexShard.getLatestReplicationCheckpoint()); final SegmentReplicationTarget target = new SegmentReplicationTarget( checkpoint, indexShard, diff --git a/test/framework/src/main/java/org/opensearch/test/BackgroundIndexer.java b/test/framework/src/main/java/org/opensearch/test/BackgroundIndexer.java index f7271a5fe8c20..f3b056ce69c48 100644 --- a/test/framework/src/main/java/org/opensearch/test/BackgroundIndexer.java +++ b/test/framework/src/main/java/org/opensearch/test/BackgroundIndexer.java @@ -44,6 +44,7 @@ import org.opensearch.action.bulk.BulkResponse; import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.index.IndexResponse; +import org.opensearch.action.support.WriteRequest; import org.opensearch.client.Client; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; @@ -145,7 +146,7 @@ public BackgroundIndexer( logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs); for (int i = 0; i < writers.length; i++) { final int indexerId = i; - final boolean batch = random.nextBoolean(); + final boolean batch = true; final Random threadRandom = new Random(random.nextLong()); writers[i] = new Thread() { @Override @@ -170,12 +171,15 @@ public void run() { for (int i = 0; i < batchSize; i++) { id = idGenerator.incrementAndGet(); if (useAutoGeneratedIDs) { - bulkRequest.add(client.prepareIndex(index).setSource(generateSource(id, threadRandom))); + bulkRequest.add(client.prepareIndex(index) + .setSource(generateSource(id, threadRandom))); } else { bulkRequest.add( - client.prepareIndex(index).setId(Long.toString(id)).setSource(generateSource(id, threadRandom)) + client.prepareIndex(index) + .setId(Long.toString(id)).setSource(generateSource(id, threadRandom)) ); } + bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); } try { BulkResponse bulkResponse = bulkRequest.get();