Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add improved Bulk Write API for Java Reactive Driver #1583

Merged
merged 31 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e352b31
Add improved Bulk Write API for Java Reactive Driver
vbabanin Dec 13, 2024
a05882d
Disable Kotlin tests.
vbabanin Dec 14, 2024
78760c0
Remove redundant methods.
vbabanin Dec 14, 2024
cb0d596
Remove TODOs.
vbabanin Dec 14, 2024
9e9f33d
Fix retryable wrapping.
vbabanin Dec 14, 2024
9ca1fe7
Remove comments.
vbabanin Dec 14, 2024
9a7d9ba
Remove test skips.
vbabanin Dec 16, 2024
0ca8859
Update driver-sync/src/test/functional/com/mongodb/client/CrudProseTe…
vbabanin Dec 16, 2024
99a460a
Update driver-sync/src/test/functional/com/mongodb/client/CrudProseTe…
vbabanin Dec 16, 2024
ae8763f
Update driver-core/src/main/com/mongodb/internal/async/MutableValue.java
vbabanin Dec 16, 2024
7001d9e
Update driver-reactive-streams/src/main/com/mongodb/reactivestreams/c…
vbabanin Dec 16, 2024
bee199e
Update driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.…
vbabanin Dec 16, 2024
e6a0a7e
Apply suggestions from code review
vbabanin Dec 16, 2024
df2cb88
Update javadoc.
vbabanin Dec 16, 2024
f43fc49
Make CursorHelper generic.
vbabanin Dec 17, 2024
b7c5bfa
Move exhaustCursor method to cursor interface.
vbabanin Dec 17, 2024
c2d9b07
Add assertions.
vbabanin Dec 17, 2024
9c23736
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
f091561
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
91030ab
Update driver-core/src/main/com/mongodb/internal/async/AsyncBatchCurs…
vbabanin Dec 17, 2024
24ce23d
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
b775ea4
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
8110a09
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
e8c1520
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
8f3e522
- Adress formatting issues.
vbabanin Dec 17, 2024
dc85f96
Fix static checks.
vbabanin Dec 17, 2024
c0b7292
Fix test.
vbabanin Dec 17, 2024
fc02c9a
Remove throws.
vbabanin Dec 17, 2024
3f36b21
Remove redundant method.
vbabanin Dec 17, 2024
190a067
Fix typo.
vbabanin Dec 19, 2024
3019e07
Apply suggestions from code review
vbabanin Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import static com.mongodb.internal.async.AsyncRunnable.beginAsync;

/**
* MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch
Expand Down Expand Up @@ -72,4 +75,22 @@ public interface AsyncBatchCursor<T> extends Closeable {
*/
@Override
void close();

default void exhaustCursor(final SingleResultCallback<List<List<T>>> finalCallback) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
List<List<T>> results = new ArrayList<>();

beginAsync().thenRunDoWhileLoop(iterationCallback -> {
beginAsync().
thenSupply(this::next)
.thenConsume((batch, callback) -> {
if (batch != null && !batch.isEmpty()) {
results.add(batch);
}
callback.complete(callback);
}).finish(iterationCallback);
}, () -> !this.isClosed())
.<List<List<T>>>thenSupply(callback -> {
callback.complete(results);
}).finish(finalCallback);
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
import java.util.Iterator;
import java.util.List;

import static java.util.Spliterator.IMMUTABLE;
import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;

/**
* MongoDB returns query results as batches, and this interface provideds an iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
Expand Down Expand Up @@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
ServerCursor getServerCursor();

ServerAddress getServerAddress();

default List<List<T>> exhaustCursor() {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
.collect(toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
import static com.mongodb.internal.operation.CommandOperationHelper.shouldAttemptToRetryWriteAndAddRetryableLabel;
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
import static com.mongodb.internal.operation.CursorHelper.exhaustCursorAsync;
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
import static com.mongodb.internal.operation.SyncOperationHelper.decorateWriteWithRetries;
Expand All @@ -141,12 +140,8 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Optional.ofNullable;
import static java.util.Spliterator.IMMUTABLE;
import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.StreamSupport.stream;

/**
* This class is not part of the public API and may be removed or changed at any time.
Expand Down Expand Up @@ -544,7 +539,7 @@ private List<List<BsonDocument>> exhaustBulkWriteCommandOkResponseCursor(
options.getComment().orElse(null),
connectionSource,
connection)) {
return stream(spliteratorUnknownSize(cursor, ORDERED | IMMUTABLE), false).collect(toList());
return cursor.exhaustCursor();
}
}

Expand All @@ -562,7 +557,7 @@ private void exhaustBulkWriteCommandOkResponseCursorAsync(final AsyncConnectionS
connection);

beginAsync().<List<List<BsonDocument>>>thenSupply(callback -> {
exhaustCursorAsync(cursor, callback);
cursor.exhaustCursor(callback);
}).thenAlwaysRunAndFinish(() -> {
if (!cursor.isClosed()) {
cursor.close();
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,16 @@

package com.mongodb.internal.operation;

import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
import org.bson.BsonInt32;

import java.util.ArrayList;
import java.util.List;

import static com.mongodb.internal.async.AsyncRunnable.beginAsync;

final class CursorHelper {

static BsonDocument getCursorDocumentFromBatchSize(@Nullable final Integer batchSize) {
return batchSize == null ? new BsonDocument() : new BsonDocument("batchSize", new BsonInt32(batchSize));
}

public static <T> void exhaustCursorAsync(final AsyncBatchCursor<T> cursor, final SingleResultCallback<List<List<T>>> finalCallback) {
List<List<T>> results = new ArrayList<>();

beginAsync().thenRunDoWhileLoop(iterationCallback -> {
beginAsync().
thenSupply(cursor::next)
.thenConsume((batch, callback) -> {
if (batch != null && !batch.isEmpty()) {
results.add(batch);
}
callback.complete(callback);
}).finish(iterationCallback);
}, () -> !cursor.isClosed())
.<List<List<T>>>thenSupply(callback -> {
callback.complete(results);
}).finish(finalCallback);
}

private CursorHelper() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,13 @@ public void deleteOne(final Bson filter) {
.execute(getBinding());
}

public void deleteMany(final Bson filter) {
new MixedBulkWriteOperation(namespace,
singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry)).multi(true)),
true, WriteConcern.ACKNOWLEDGED, false)
.execute(getBinding());
}

public List<T> find(final Bson filter) {
return find(filter, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.mongodb.ServerCursor;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.OperationTest;
import com.mongodb.internal.binding.AsyncConnectionSource;
import com.mongodb.internal.connection.AsyncConnection;
Expand All @@ -46,7 +47,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -103,6 +106,95 @@ void cleanup() {
});
}

@Test
@DisplayName("should exhaust cursor with multiple batches")
void shouldExhaustCursorAsyncWithMultipleBatches() {
// given
BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);

CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();

// when
cursor.exhaustCursor((result, throwable) -> {
if (throwable != null) {
futureResult.completeExceptionally(throwable);
} else {
futureResult.complete(result);
}
});
stIncMale marked this conversation as resolved.
Show resolved Hide resolved

// then
assertDoesNotThrow(() -> {
List<List<Document>> resultBatches = futureResult.get(5, TimeUnit.SECONDS);
stIncMale marked this conversation as resolved.
Show resolved Hide resolved

assertEquals(4, resultBatches.size(), "Expected 4 batches for 10 documents with batch size of 3.");

int totalDocuments = resultBatches.stream().mapToInt(List::size).sum();
assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
});
}

@Test
@DisplayName("should exhaust cursor with closed cursor")
void shouldExhaustCursorAsyncWithClosedCursor() {
// given
BsonDocument commandResult = executeFindCommand(0, 3);
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);

cursor.close();

CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();

// when
cursor.exhaustCursor((result, throwable) -> {
if (throwable != null) {
futureResult.completeExceptionally(throwable);
} else {
futureResult.complete(result);
}
});

//then
ExecutionException executionException = assertThrows(ExecutionException.class, () -> {
futureResult.get(5, TimeUnit.SECONDS);
}, "Expected an exception when operating on a closed cursor.");

IllegalStateException illegalStateException = (IllegalStateException) executionException.getCause();
assertEquals("Cursor has been closed", illegalStateException.getMessage());
}

@Test
@DisplayName("should exhaust cursor with empty cursor")
void shouldExhaustCursorAsyncWithEmptyCursor() {
// given
getCollectionHelper().deleteMany(Filters.empty());

BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);

CompletableFuture<List<List<Document>>> futureResult = new CompletableFuture<>();

// when
cursor.exhaustCursor((result, throwable) -> {
if (throwable != null) {
futureResult.completeExceptionally(throwable);
} else {
futureResult.complete(result);
}
});

// then
assertDoesNotThrow(() -> {
List<List<Document>> resultBatches = futureResult.get(5, TimeUnit.SECONDS);
stIncMale marked this conversation as resolved.
Show resolved Hide resolved

assertTrue(resultBatches.isEmpty(), "Expected no batches for an empty cursor.");
});
}

@Test
@DisplayName("server cursor should not be null")
void theServerCursorShouldNotBeNull() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.ServerCursor;
import com.mongodb.client.cursor.TimeoutMode;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.OperationTest;
import com.mongodb.internal.binding.ConnectionSource;
import com.mongodb.internal.connection.Connection;
Expand Down Expand Up @@ -101,6 +102,55 @@ void cleanup() {
});
}

@Test
@DisplayName("should exhaust cursor with multiple batches")
void shouldExhaustCursorAsyncWithMultipleBatches() {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
// given
BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);

// when
List<List<Document>> result = cursor.exhaustCursor();

// then
assertEquals(4, result.size(), "Expected 4 batches for 10 documents with batch size of 3.");

int totalDocuments = result.stream().mapToInt(List::size).sum();
assertEquals(10, totalDocuments, "Expected a total of 10 documents.");
}

@Test
@DisplayName("should exhaust cursor with closed cursor")
void shouldExhaustCursorAsyncWithClosedCursor() {
// given
BsonDocument commandResult = executeFindCommand(0, 3);
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);
cursor.close();

// when & then
IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, cursor::exhaustCursor);
assertEquals("Cursor has been closed", illegalStateException.getMessage());
}

@Test
@DisplayName("should exhaust cursor async with empty cursor")
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
void shouldExhaustCursorAsyncWithEmptyCursor() {
// given
getCollectionHelper().deleteMany(Filters.empty());

BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch
cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER,
null, connectionSource, connection);

// when
List<List<Document>> result = cursor.exhaustCursor();

// then
assertTrue(result.isEmpty(), "Expected no batches for an empty cursor.");
}

@Test
@DisplayName("server cursor should not be null")
void theServerCursorShouldNotBeNull() {
Expand Down