Skip to content

Commit

Permalink
Add improved Bulk Write API for Java Reactive Driver (#1583)
Browse files Browse the repository at this point in the history
- Created and documented the new Reactive Bulk Write API.
- Enabled unified and prose tests for reactive Bulk Write API.

JAVA-5530
---------

Co-authored-by: Valentin Kovalenko <valentin.male.kovalenko@gmail.com>
  • Loading branch information
vbabanin and stIncMale authored Jan 2, 2025
1 parent 65e72d0 commit ed8f8b7
Show file tree
Hide file tree
Showing 31 changed files with 914 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import static com.mongodb.internal.async.AsyncRunnable.beginAsync;

/**
* MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch
Expand Down Expand Up @@ -72,4 +75,22 @@ public interface AsyncBatchCursor<T> extends Closeable {
*/
@Override
void close();

default void exhaust(final SingleResultCallback<List<List<T>>> finalCallback) {
List<List<T>> results = new ArrayList<>();

beginAsync().thenRunDoWhileLoop(iterationCallback -> {
beginAsync().<List<T>>thenSupply(c -> {
next(c);
}).thenConsume((batch, c) -> {
if (!batch.isEmpty()) {
results.add(batch);
}
c.complete(c);
}).finish(iterationCallback);
}, () -> !this.isClosed()
).<List<List<T>>>thenSupply(c -> {
c.complete(results);
}).finish(finalCallback);
}
}
75 changes: 32 additions & 43 deletions driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
package com.mongodb.internal.async;

import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.function.AsyncCallbackLoop;
import com.mongodb.internal.async.function.LoopState;
import com.mongodb.internal.async.function.RetryState;
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;

import java.util.function.BooleanSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;

Expand Down Expand Up @@ -120,49 +123,6 @@ static AsyncRunnable beginAsync() {
return (c) -> c.complete(c);
}

/**
* Must be invoked at end of async chain
* @param runnable the sync code to invoke (under non-exceptional flow)
* prior to the callback
* @param callback the callback provided by the method the chain is used in
*/
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
if (e != null) {
callback.completeExceptionally(e);
return;
}
try {
runnable.run();
} catch (Throwable t) {
callback.completeExceptionally(t);
return;
}
callback.complete(callback);
});
}

/**
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
* will always be executed, including on the exceptional path.
* @param runnable the runnable
* @param callback the callback
*/
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
try {
runnable.run();
} catch (Throwable t) {
if (e != null) {
t.addSuppressed(e);
}
callback.completeExceptionally(t);
return;
}
callback.onResult(r, e);
});
}

/**
* @param runnable The async runnable to run after this runnable
* @return the composition of this runnable and the runnable, a runnable
Expand Down Expand Up @@ -282,4 +242,33 @@ default AsyncRunnable thenRunRetryingWhile(
).get(callback);
});
}

/**
* This method is equivalent to a do-while loop, where the loop body is executed first and
* then the condition is checked to determine whether the loop should continue.
*
* @param loopBodyRunnable the asynchronous task to be executed in each iteration of the loop
* @param whileCheck a condition to check after each iteration; the loop continues as long as this condition returns true
* @return the composition of this and the looping branch
* @see AsyncCallbackLoop
*/
default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) {
return thenRun(finalCallback -> {
LoopState loopState = new LoopState();
new AsyncCallbackLoop(loopState, iterationCallback -> {

loopBodyRunnable.finish((result, t) -> {
if (t != null) {
iterationCallback.completeExceptionally(t);
return;
}
if (loopState.breakAndCompleteIf(() -> !whileCheck.getAsBoolean(), iterationCallback)) {
return;
}
iterationCallback.complete(iterationCallback);
});

}).run(finalCallback);
});
}
}
43 changes: 43 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,49 @@ default void finish(final SingleResultCallback<T> callback) {
}
}

/**
* Must be invoked at end of async chain
* @param runnable the sync code to invoke (under non-exceptional flow)
* prior to the callback
* @param callback the callback provided by the method the chain is used in
*/
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
this.finish((r, e) -> {
if (e != null) {
callback.completeExceptionally(e);
return;
}
try {
runnable.run();
} catch (Throwable t) {
callback.completeExceptionally(t);
return;
}
callback.onResult(r, null);
});
}

/**
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
* will always be executed, including on the exceptional path.
* @param runnable the runnable
* @param callback the callback
*/
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
this.finish((r, e) -> {
try {
runnable.run();
} catch (Throwable t) {
if (e != null) {
t.addSuppressed(e);
}
callback.completeExceptionally(t);
return;
}
callback.onResult(r, e);
});
}

/**
* @param function The async function to run after this supplier
* @return the composition of this supplier and the function, a supplier
Expand Down
47 changes: 47 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/MutableValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal.async;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;

import static com.mongodb.assertions.Assertions.assertNotNull;

@NotThreadSafe
public final class MutableValue<T> {
private T value;

public MutableValue(@Nullable final T value) {
this.value = value;
}

public MutableValue() {
this(null);
}

public T get() {
return assertNotNull(value);
}

@Nullable
public T getNullable() {
return value;
}

public void set(@Nullable final T value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.mongodb.internal.async.function;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.internal.async.MutableValue;
import com.mongodb.internal.async.SingleResultCallback;

import java.util.function.Supplier;
Expand Down Expand Up @@ -68,16 +68,12 @@ public interface AsyncCallbackSupplier<R> {
* This is a price we have to pay to provide a guarantee similar to that of the {@code finally} block.
*/
default AsyncCallbackSupplier<R> whenComplete(final Runnable after) {
@NotThreadSafe
final class MutableBoolean {
private boolean value;
}
MutableBoolean afterExecuted = new MutableBoolean();
MutableValue<Boolean> afterExecuted = new MutableValue<>(false);
Runnable trackableAfter = () -> {
try {
after.run();
} finally {
afterExecuted.value = true;
afterExecuted.set(true);
}
};
return callback -> {
Expand All @@ -103,7 +99,7 @@ final class MutableBoolean {
primaryUnexpectedException = unexpectedException;
throw unexpectedException;
} finally {
if (primaryUnexpectedException != null && !afterExecuted.value) {
if (primaryUnexpectedException != null && !afterExecuted.get()) {
try {
trackableAfter.run();
} catch (Throwable afterException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ static <T> CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncS
}

static <T> AsyncBatchCursor<T> cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument,
final int batchSize, final Decoder<T> decoder, final BsonValue comment, final AsyncConnectionSource source,
final int batchSize, final Decoder<T> decoder, @Nullable final BsonValue comment, final AsyncConnectionSource source,
final AsyncConnection connection) {
return new AsyncCommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import com.mongodb.client.model.SearchIndexModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutSettings;
Expand Down Expand Up @@ -293,6 +296,12 @@ public AsyncWriteOperation<BulkWriteResult> bulkWrite(final List<? extends Write
return operations.bulkWrite(requests, options);
}

public AsyncWriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
final List<? extends ClientNamespacedWriteModel> clientWriteModels,
@Nullable final ClientBulkWriteOptions options) {
return operations.clientBulkWriteOperation(clientWriteModels, options);
}

public <TResult> AsyncReadOperation<TResult> commandRead(final Bson command, final Class<TResult> resultClass) {
return operations.commandRead(command, resultClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
import java.util.Iterator;
import java.util.List;

import static java.util.Spliterator.IMMUTABLE;
import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;

/**
* MongoDB returns query results as batches, and this interface provideds an iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
Expand Down Expand Up @@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
ServerCursor getServerCursor();

ServerAddress getServerAddress();

default List<List<T>> exhaust() {
return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
.collect(toList());
}
}
Loading

0 comments on commit ed8f8b7

Please sign in to comment.