Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add improved Bulk Write API for Java Reactive Driver #1583

Merged
merged 31 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
e352b31
Add improved Bulk Write API for Java Reactive Driver
vbabanin Dec 13, 2024
a05882d
Disable Kotlin tests.
vbabanin Dec 14, 2024
78760c0
Remove redundant methods.
vbabanin Dec 14, 2024
cb0d596
Remove TODOs.
vbabanin Dec 14, 2024
9e9f33d
Fix retryable wrapping.
vbabanin Dec 14, 2024
9ca1fe7
Remove comments.
vbabanin Dec 14, 2024
9a7d9ba
Remove test skips.
vbabanin Dec 16, 2024
0ca8859
Update driver-sync/src/test/functional/com/mongodb/client/CrudProseTe…
vbabanin Dec 16, 2024
99a460a
Update driver-sync/src/test/functional/com/mongodb/client/CrudProseTe…
vbabanin Dec 16, 2024
ae8763f
Update driver-core/src/main/com/mongodb/internal/async/MutableValue.java
vbabanin Dec 16, 2024
7001d9e
Update driver-reactive-streams/src/main/com/mongodb/reactivestreams/c…
vbabanin Dec 16, 2024
bee199e
Update driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.…
vbabanin Dec 16, 2024
e6a0a7e
Apply suggestions from code review
vbabanin Dec 16, 2024
df2cb88
Update javadoc.
vbabanin Dec 16, 2024
f43fc49
Make CursorHelper generic.
vbabanin Dec 17, 2024
b7c5bfa
Move exhaustCursor method to cursor interface.
vbabanin Dec 17, 2024
c2d9b07
Add assertions.
vbabanin Dec 17, 2024
9c23736
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
f091561
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
91030ab
Update driver-core/src/main/com/mongodb/internal/async/AsyncBatchCurs…
vbabanin Dec 17, 2024
24ce23d
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
b775ea4
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
8110a09
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
e8c1520
Update driver-core/src/main/com/mongodb/internal/operation/ClientBulk…
vbabanin Dec 17, 2024
8f3e522
- Adress formatting issues.
vbabanin Dec 17, 2024
dc85f96
Fix static checks.
vbabanin Dec 17, 2024
c0b7292
Fix test.
vbabanin Dec 17, 2024
fc02c9a
Remove throws.
vbabanin Dec 17, 2024
3f36b21
Remove redundant method.
vbabanin Dec 17, 2024
190a067
Fix typo.
vbabanin Dec 19, 2024
3019e07
Apply suggestions from code review
vbabanin Dec 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
import com.mongodb.internal.operation.BatchCursor;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;

import static com.mongodb.internal.async.AsyncRunnable.beginAsync;

/**
* MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch
Expand Down Expand Up @@ -72,4 +75,22 @@ public interface AsyncBatchCursor<T> extends Closeable {
*/
@Override
void close();

default void exhaustCursor(final SingleResultCallback<List<List<T>>> finalCallback) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
List<List<T>> results = new ArrayList<>();

beginAsync().thenRunDoWhileLoop(iterationCallback -> {
beginAsync().
thenSupply(this::next)
.thenConsume((batch, callback) -> {
if (batch != null && !batch.isEmpty()) {
results.add(batch);
}
callback.complete(callback);
}).finish(iterationCallback);
}, () -> !this.isClosed())
.<List<List<T>>>thenSupply(callback -> {
callback.complete(results);
}).finish(finalCallback);
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public interface AsyncFunction<T, R> {
* @param value A {@code @}{@link Nullable} argument of the asynchronous function.
* @param callback the callback
*/
void unsafeFinish(T value, SingleResultCallback<R> callback);
void unsafeFinish(@Nullable T value, SingleResultCallback<R> callback);
stIncMale marked this conversation as resolved.
Show resolved Hide resolved

/**
* Must be invoked at end of async chain or when executing a callback handler supplied by the caller.
Expand Down
74 changes: 31 additions & 43 deletions driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.mongodb.internal.async;

import com.mongodb.internal.TimeoutContext;
import com.mongodb.internal.async.function.AsyncCallbackLoop;
import com.mongodb.internal.async.function.LoopState;
import com.mongodb.internal.async.function.RetryState;
import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier;

Expand Down Expand Up @@ -120,49 +122,6 @@ static AsyncRunnable beginAsync() {
return (c) -> c.complete(c);
}

/**
* Must be invoked at end of async chain
* @param runnable the sync code to invoke (under non-exceptional flow)
* prior to the callback
* @param callback the callback provided by the method the chain is used in
*/
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
if (e != null) {
callback.completeExceptionally(e);
return;
}
try {
runnable.run();
} catch (Throwable t) {
callback.completeExceptionally(t);
return;
}
callback.complete(callback);
});
}

/**
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
* will always be executed, including on the exceptional path.
* @param runnable the runnable
* @param callback the callback
*/
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<Void> callback) {
this.finish((r, e) -> {
try {
runnable.run();
} catch (Throwable t) {
if (e != null) {
t.addSuppressed(e);
}
callback.completeExceptionally(t);
return;
}
callback.onResult(r, e);
});
}

/**
* @param runnable The async runnable to run after this runnable
* @return the composition of this runnable and the runnable, a runnable
Expand Down Expand Up @@ -282,4 +241,33 @@ default AsyncRunnable thenRunRetryingWhile(
).get(callback);
});
}

/**
* This method is equivalent to a do-while loop, where the loop body is executed first and
* then the condition is checked to determine whether the loop should continue.
*
* @param loopBodyRunnable the asynchronous task to be executed in each iteration of the loop
* @param whileCheck a condition to check after each iteration; the loop continues as long as this condition returns true
* @return the composition of this and the looping branch
* @see AsyncCallbackLoop
*/
default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final Supplier<Boolean> whileCheck) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
return thenRun(finalCallback -> {
LoopState loopState = new LoopState();
new AsyncCallbackLoop(loopState, iterationCallback -> {

loopBodyRunnable.finish((result, t) -> {
if (t != null) {
iterationCallback.completeExceptionally(t);
return;
}
if (loopState.breakAndCompleteIf(() -> !whileCheck.get(), iterationCallback)) {
return;
}
iterationCallback.complete(iterationCallback);
});

}).run(finalCallback);
});
}
}
43 changes: 43 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,49 @@ default void finish(final SingleResultCallback<T> callback) {
}
}

/**
* Must be invoked at end of async chain
* @param runnable the sync code to invoke (under non-exceptional flow)
* prior to the callback
* @param callback the callback provided by the method the chain is used in
*/
default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
this.finish((r, e) -> {
if (e != null) {
callback.completeExceptionally(e);
return;
}
try {
runnable.run();
} catch (Throwable t) {
callback.completeExceptionally(t);
return;
}
callback.onResult(r, null);
});
}

/**
* See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable
* will always be executed, including on the exceptional path.
* @param runnable the runnable
* @param callback the callback
*/
default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback<T> callback) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
this.finish((r, e) -> {
try {
runnable.run();
} catch (Throwable t) {
if (e != null) {
t.addSuppressed(e);
}
callback.completeExceptionally(t);
return;
}
callback.onResult(r, e);
});
}

/**
* @param function The async function to run after this supplier
* @return the composition of this supplier and the function, a supplier
Expand Down
47 changes: 47 additions & 0 deletions driver-core/src/main/com/mongodb/internal/async/MutableValue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal.async;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.lang.Nullable;

import static com.mongodb.assertions.Assertions.assertNotNull;

@NotThreadSafe
public final class MutableValue<T> {
private T value;

public MutableValue(@Nullable final T value) {
this.value = value;
}

public MutableValue() {
this(null);
}

public T get() {
return assertNotNull(value);
}

@Nullable
public T getNullable() {
return value;
}

public void set(@Nullable final T value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package com.mongodb.internal.async.function;

import com.mongodb.annotations.NotThreadSafe;
import com.mongodb.internal.async.MutableValue;
import com.mongodb.internal.async.SingleResultCallback;

import java.util.function.Supplier;
Expand Down Expand Up @@ -68,16 +68,12 @@ public interface AsyncCallbackSupplier<R> {
* This is a price we have to pay to provide a guarantee similar to that of the {@code finally} block.
*/
default AsyncCallbackSupplier<R> whenComplete(final Runnable after) {
@NotThreadSafe
final class MutableBoolean {
private boolean value;
}
MutableBoolean afterExecuted = new MutableBoolean();
MutableValue<Boolean> afterExecuted = new MutableValue<>(false);
Runnable trackableAfter = () -> {
try {
after.run();
} finally {
afterExecuted.value = true;
afterExecuted.set(true);
}
};
return callback -> {
Expand All @@ -103,7 +99,7 @@ final class MutableBoolean {
primaryUnexpectedException = unexpectedException;
throw unexpectedException;
} finally {
if (primaryUnexpectedException != null && !afterExecuted.value) {
if (primaryUnexpectedException != null && !afterExecuted.get()) {
try {
trackableAfter.run();
} catch (Throwable afterException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import com.mongodb.client.model.SearchIndexModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import com.mongodb.client.model.bulk.ClientBulkWriteOptions;
import com.mongodb.client.model.bulk.ClientBulkWriteResult;
import com.mongodb.client.model.bulk.ClientNamespacedWriteModel;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
import com.mongodb.internal.TimeoutSettings;
Expand Down Expand Up @@ -293,6 +296,12 @@ public AsyncWriteOperation<BulkWriteResult> bulkWrite(final List<? extends Write
return operations.bulkWrite(requests, options);
}

public AsyncWriteOperation<ClientBulkWriteResult> clientBulkWriteOperation(
final List<? extends ClientNamespacedWriteModel> clientWriteModels,
@Nullable final ClientBulkWriteOptions options) {
return operations.clientBulkWriteOperation(clientWriteModels, options);
}

public <TResult> AsyncReadOperation<TResult> commandRead(final Bson command, final Class<TResult> resultClass) {
return operations.commandRead(command, resultClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@
import java.util.Iterator;
import java.util.List;

import static java.util.Spliterator.IMMUTABLE;
import static java.util.Spliterator.ORDERED;
import static java.util.Spliterators.spliteratorUnknownSize;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;

/**
* MongoDB returns query results as batches, and this interface provideds an iterator over those batches. The first call to
* the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch
Expand Down Expand Up @@ -98,4 +104,9 @@ public interface BatchCursor<T> extends Iterator<List<T>>, Closeable {
ServerCursor getServerCursor();

ServerAddress getServerAddress();

default List<List<T>> exhaustCursor() {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false)
.collect(toList());
}
}
Loading