Skip to content

Commit

Permalink
WaitUntil & Segrep & cluster config tweaks.
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <handalm@amazon.com>
  • Loading branch information
mch2 committed Jan 10, 2023
1 parent 6a7a9a1 commit bc17e73
Show file tree
Hide file tree
Showing 13 changed files with 96 additions and 25 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/precommit.yml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Gradle Precommit
on: [pull_request]
on: [workflow_dispatch]

jobs:
precommit:
runs-on: ${{ matrix.os }}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
os: [windows-latest]
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
Expand All @@ -16,4 +16,4 @@ jobs:
distribution: adopt
- name: Run Gradle
run: |
./gradlew javadoc precommit --parallel
./gradlew :server:test -x internalClusterTest
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class ClusterConfiguration {
boolean autoSetHostsProvider = true

@Input
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '512m') +
" " + "-Xmx" + System.getProperty('tests.heap.size', '512m') +
String jvmArgs = "-Xms" + System.getProperty('tests.heap.size', '1g') +
" " + "-Xmx" + System.getProperty('tests.heap.size', '6g') +
" " + System.getProperty('tests.jvm.argline', '')

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ public void execute(Task t) {
test.exclude("**/*$*.class");

test.jvmArgs(
"-Xmx" + System.getProperty("tests.heap.size", "512m"),
"-Xms" + System.getProperty("tests.heap.size", "512m"),
"-Xmx" + System.getProperty("tests.heap.size", "6g"),
"-Xms" + System.getProperty("tests.heap.size", "1g"),
"-XX:+HeapDumpOnOutOfMemoryError"
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.admin.indices.segments.IndicesSegmentsRequest;
import org.opensearch.action.admin.indices.segments.ShardSegments;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Requests;
Expand All @@ -38,6 +39,7 @@
import org.opensearch.indices.recovery.FileChunkRequest;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.plugins.Plugin;
import org.opensearch.rest.RestStatus;
import org.opensearch.test.BackgroundIndexer;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand Down Expand Up @@ -291,14 +293,30 @@ public void testAddNewReplicaFailure() throws Exception {
assertFalse(indicesService.hasIndex(resolveIndex(INDEX_NAME)));
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testWaitUntil() {
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);
IndexResponse index = client().prepareIndex(INDEX_NAME)
.setId("1")
.setSource("foo", "bar")
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
.get();
assertEquals(RestStatus.CREATED, index.status());
assertFalse("request shouldn't have forced a refresh", index.forcedRefresh());
assertSearchHits(client(nodeA).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1");
assertSearchHits(client(nodeB).prepareSearch(INDEX_NAME).setPreference("_only_local").setQuery(matchQuery("foo", "bar")).get(), "1");
}

// @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/5669")
public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
final String nodeA = internalCluster().startNode(featureFlagSettings());
final String nodeB = internalCluster().startNode(featureFlagSettings());
createIndex(INDEX_NAME);
ensureGreen(INDEX_NAME);

final int initialDocCount = scaledRandomIntBetween(0, 200);
final int initialDocCount = 200;
try (
BackgroundIndexer indexer = new BackgroundIndexer(
INDEX_NAME,
Expand All @@ -313,7 +331,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
indexer.start(initialDocCount);
waitForDocs(initialDocCount, indexer);
refresh(INDEX_NAME);
waitForReplicaUpdate();
// waitForReplicaUpdate();

assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), initialDocCount);
Expand All @@ -324,7 +342,7 @@ public void testReplicationAfterPrimaryRefreshAndFlush() throws Exception {
waitForDocs(expectedHitCount, indexer);

flushAndRefresh(INDEX_NAME);
waitForReplicaUpdate();
// waitForReplicaUpdate();
assertHitCount(client(nodeA).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);
assertHitCount(client(nodeB).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), expectedHitCount);

Expand Down Expand Up @@ -553,9 +571,9 @@ public void testStartReplicaAfterPrimaryIndexesDocs() throws Exception {
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 2);

client().prepareIndex(INDEX_NAME).setId("3").setSource("foo", "bar").get();
client().prepareIndex(INDEX_NAME).setId("3").setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL).setSource("foo", "bar").get();
refresh(INDEX_NAME);
waitForReplicaUpdate();
// waitForReplicaUpdate();
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), 3);
assertSegmentStats(REPLICA_COUNT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1201,6 +1201,7 @@ protected void acquireReplicaOperationPermit(
final long globalCheckpoint,
final long maxSeqNoOfUpdatesOrDeletes
) {
logger.info("acquireReplicaOperationPermit - GLOBAL CHECKPOINT {} MAXSeqNoOfUpdatesOrDeletes {}", globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, onAcquired, executor, request);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
ensureOpen();
try (ReleasableLock lock = writeLock.acquire()) {
final long incomingGeneration = infos.getGeneration();
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
readerManager.updateSegments(infos);

// Commit and roll the translog when we receive a different generation than what was last received.
Expand All @@ -136,7 +137,6 @@ public synchronized void updateSegments(final SegmentInfos infos, long seqNo) th
translogManager.rollTranslogGeneration();
}
lastReceivedGen = incomingGeneration;
localCheckpointTracker.fastForwardProcessedSeqNo(seqNo);
}
}

Expand Down Expand Up @@ -305,7 +305,9 @@ public List<Segment> segments(boolean verbose) {
}

@Override
public void refresh(String source) throws EngineException {}
public void refresh(String source) throws EngineException {
logger.info("Refresh invoked on Engine");
}

@Override
public boolean maybeRefresh(String source) throws EngineException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
logger.info("Returning from refreshIfNeeded");
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
}

Expand All @@ -78,6 +79,7 @@ public synchronized void updateSegments(SegmentInfos infos) throws IOException {
// is always increased.
infos.updateGeneration(currentInfos);
currentInfos = infos;
logger.info("Invoking maybeRefresh");
maybeRefresh();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,13 @@ public CheckpointRefreshListener(IndexShard shard, SegmentReplicationCheckpointP

@Override
public void beforeRefresh() throws IOException {
logger.info("Before Refresh invoked?");
// Do nothing
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
logger.info("After Refresh invoked: {}", didRefresh);
if (didRefresh && shard.state() != IndexShardState.CLOSED && shard.getReplicationTracker().isPrimaryMode()) {
publisher.publish(shard);
}
Expand Down
29 changes: 27 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -1502,6 +1502,7 @@ public boolean isSegmentReplicationAllowed() {
* @return true if checkpoint should be processed
*/
public final boolean shouldProcessCheckpoint(ReplicationCheckpoint requestCheckpoint) {
logger.info("Checking if segrep is allowed on received checkpoint {}", requestCheckpoint);
if (isSegmentReplicationAllowed() == false) {
return false;
}
Expand Down Expand Up @@ -3669,6 +3670,7 @@ private void innerAcquireReplicaOperationPermit(
final boolean allowCombineOperationWithPrimaryTermUpdate,
final Consumer<ActionListener<Releasable>> operationExecutor
) {
logger.info("innerAcquireReplicaOperationPermit - GLOBAL CHECKPOINT {} - CURRENT {}", globalCheckpoint, replicationTracker.getGlobalCheckpoint());
verifyNotClosed();

// This listener is used for the execution of the operation. If the operation requires all the permits for its
Expand Down Expand Up @@ -3816,6 +3818,26 @@ public void sync() throws IOException {
getEngine().translogManager().syncTranslog();
}

public final boolean isOperationIndexed(Translog.Location location) {
if (indexSettings.isSegRepEnabled() && routingEntry().primary() == false) {
final Translog.Operation operation;
try {
operation = getEngine().translogManager().readOperation(location);
} catch (IOException e) {
logger.error("Error fetching operation from xlog?", e);
return false;
}
logger.info("Operation read from xlog - {} - {}", location, operation);
final ReplicationCheckpoint latestReplicationCheckpoint = getLatestReplicationCheckpoint();
logger.info("Latest repl checkpoint {}", latestReplicationCheckpoint);
final boolean b = operation.seqNo() <= latestReplicationCheckpoint.getSeqNo();
logger.info("IS OP IN INDEX: {}", b);
return b;
// return latestReplicationCheckpoint.getSeqNo() >= replicationTracker.getGlobalCheckpoint();
}
return true;
}

/**
* Checks if the underlying storage sync is required.
*/
Expand Down Expand Up @@ -3909,7 +3931,8 @@ private RefreshListeners buildRefreshListeners() {
() -> refresh("too_many_listeners"),
logger,
threadPool.getThreadContext(),
externalRefreshMetric
externalRefreshMetric,
this::isOperationIndexed
);
}

Expand Down Expand Up @@ -4081,8 +4104,10 @@ public void addRefreshListener(Translog.Location location, Consumer<Boolean> lis
}
}
if (readAllowed) {
refreshListeners.addOrNotify(location, listener);
final boolean b = refreshListeners.addOrNotify(location, listener);
logger.info("Register listener to call later? {}", b);
} else {
logger.info("readAllowed is false");
// we're not yet ready fo ready for reads, just ignore refresh cycles
listener.accept(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

Expand All @@ -65,6 +66,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
private final Logger logger;
private final ThreadContext threadContext;
private final MeanMetric refreshMetric;
private final Function<Translog.Location, Boolean> isOperationIndexed;

/**
* Time in nanosecond when beforeRefresh() is called. Used for calculating refresh metrics.
Expand Down Expand Up @@ -107,6 +109,16 @@ public RefreshListeners(
this.logger = logger;
this.threadContext = threadContext;
this.refreshMetric = refreshMetric;
isOperationIndexed = null;
}

public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable too_many_listeners, Logger logger, ThreadContext threadContext, MeanMetric externalRefreshMetric, Function<Translog.Location, Boolean> isOperationIndexed) {
this.getMaxRefreshListeners = getMaxRefreshListeners;
this.forceRefresh = too_many_listeners;
this.logger = logger;
this.threadContext = threadContext;
this.refreshMetric = externalRefreshMetric;
this.isOperationIndexed = isOperationIndexed;
}

/**
Expand Down Expand Up @@ -148,8 +160,9 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
requireNonNull(listener, "listener cannot be null");
requireNonNull(location, "location cannot be null");

if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0) {
if (lastRefreshedLocation != null && lastRefreshedLocation.compareTo(location) >= 0 && this.isOperationIndexed.apply(currentRefreshLocation)) {
// Location already visible, just call the listener
logger.info("lastRefreshedLocation? {}", lastRefreshedLocation);
listener.accept(false);
return true;
}
Expand All @@ -172,12 +185,14 @@ public boolean addOrNotify(Translog.Location location, Consumer<Boolean> listene
}
// We have a free slot so register the listener
listeners.add(new Tuple<>(location, contextPreservingListener));
logger.info("Set listeners - lastRefreshedLocation? {}", lastRefreshedLocation);
refreshListeners = listeners;
return false;
}
}
// No free slot so force a refresh and call the listener in this thread
forceRefresh.run();
logger.info("End - lastRefreshedLocation? {}", lastRefreshedLocation);
listener.accept(true);
return true;
}
Expand Down Expand Up @@ -270,7 +285,8 @@ public void afterRefresh(boolean didRefresh) throws IOException {
List<Tuple<Translog.Location, Consumer<Boolean>>> preservedListeners = null;
for (Tuple<Translog.Location, Consumer<Boolean>> tuple : candidates) {
Translog.Location location = tuple.v1();
if (location.compareTo(currentRefreshLocation) <= 0) {
if (location.compareTo(currentRefreshLocation) <= 0
&& (this.isOperationIndexed.apply(location))) {
if (listenersToFire == null) {
listenersToFire = new ArrayList<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -817,7 +817,7 @@ private void forceSegmentReplication(
new SegmentReplicationTargetService.SegmentReplicationListener() {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication complete, timing data: {}",
indexShard.shardId().getId(),
Expand All @@ -834,7 +834,7 @@ public void onReplicationFailure(
ReplicationFailedException e,
boolean sendShardFailure
) {
logger.trace(
logger.info(
() -> new ParameterizedMessage(
"[shardId {}] [replication id {}] Replication failed, timing data: {}",
indexShard.shardId().getId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ public SegmentReplicationTarget startReplication(
final IndexShard indexShard,
final SegmentReplicationListener listener
) {
logger.info("Starting replication from {}", indexShard.getLatestReplicationCheckpoint());
final SegmentReplicationTarget target = new SegmentReplicationTarget(
checkpoint,
indexShard,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ConcurrentCollections;
Expand Down Expand Up @@ -145,7 +146,7 @@ public BackgroundIndexer(
logger.info("--> creating {} indexing threads (auto start: [{}], numOfDocs: [{}])", writerCount, autoStart, numOfDocs);
for (int i = 0; i < writers.length; i++) {
final int indexerId = i;
final boolean batch = random.nextBoolean();
final boolean batch = true;
final Random threadRandom = new Random(random.nextLong());
writers[i] = new Thread() {
@Override
Expand All @@ -170,12 +171,15 @@ public void run() {
for (int i = 0; i < batchSize; i++) {
id = idGenerator.incrementAndGet();
if (useAutoGeneratedIDs) {
bulkRequest.add(client.prepareIndex(index).setSource(generateSource(id, threadRandom)));
bulkRequest.add(client.prepareIndex(index)
.setSource(generateSource(id, threadRandom)));
} else {
bulkRequest.add(
client.prepareIndex(index).setId(Long.toString(id)).setSource(generateSource(id, threadRandom))
client.prepareIndex(index)
.setId(Long.toString(id)).setSource(generateSource(id, threadRandom))
);
}
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
}
try {
BulkResponse bulkResponse = bulkRequest.get();
Expand Down

0 comments on commit bc17e73

Please sign in to comment.