diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java index 89260ac7b52..bd8d6c64a3f 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncBatchCursor.java @@ -19,8 +19,11 @@ import com.mongodb.internal.operation.BatchCursor; import java.io.Closeable; +import java.util.ArrayList; import java.util.List; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; + /** * MongoDB returns query results as batches, and this interface provides an asynchronous iterator over those batches. The first call to * the {@code next} method will return the first batch, and subsequent calls will trigger an asynchronous request to get the next batch @@ -72,4 +75,22 @@ public interface AsyncBatchCursor extends Closeable { */ @Override void close(); + + default void exhaust(final SingleResultCallback>> finalCallback) { + List> results = new ArrayList<>(); + + beginAsync().thenRunDoWhileLoop(iterationCallback -> { + beginAsync().>thenSupply(c -> { + next(c); + }).thenConsume((batch, c) -> { + if (!batch.isEmpty()) { + results.add(batch); + } + c.complete(c); + }).finish(iterationCallback); + }, () -> !this.isClosed() + ).>>thenSupply(c -> { + c.complete(results); + }).finish(finalCallback); + } } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java index d4ead3c5b96..e404e2b8152 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncRunnable.java @@ -17,9 +17,12 @@ package com.mongodb.internal.async; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.async.function.AsyncCallbackLoop; +import com.mongodb.internal.async.function.LoopState; import com.mongodb.internal.async.function.RetryState; import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier; +import java.util.function.BooleanSupplier; import java.util.function.Predicate; import java.util.function.Supplier; @@ -120,49 +123,6 @@ static AsyncRunnable beginAsync() { return (c) -> c.complete(c); } - /** - * Must be invoked at end of async chain - * @param runnable the sync code to invoke (under non-exceptional flow) - * prior to the callback - * @param callback the callback provided by the method the chain is used in - */ - default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { - this.finish((r, e) -> { - if (e != null) { - callback.completeExceptionally(e); - return; - } - try { - runnable.run(); - } catch (Throwable t) { - callback.completeExceptionally(t); - return; - } - callback.complete(callback); - }); - } - - /** - * See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable - * will always be executed, including on the exceptional path. - * @param runnable the runnable - * @param callback the callback - */ - default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { - this.finish((r, e) -> { - try { - runnable.run(); - } catch (Throwable t) { - if (e != null) { - t.addSuppressed(e); - } - callback.completeExceptionally(t); - return; - } - callback.onResult(r, e); - }); - } - /** * @param runnable The async runnable to run after this runnable * @return the composition of this runnable and the runnable, a runnable @@ -282,4 +242,33 @@ default AsyncRunnable thenRunRetryingWhile( ).get(callback); }); } + + /** + * This method is equivalent to a do-while loop, where the loop body is executed first and + * then the condition is checked to determine whether the loop should continue. + * + * @param loopBodyRunnable the asynchronous task to be executed in each iteration of the loop + * @param whileCheck a condition to check after each iteration; the loop continues as long as this condition returns true + * @return the composition of this and the looping branch + * @see AsyncCallbackLoop + */ + default AsyncRunnable thenRunDoWhileLoop(final AsyncRunnable loopBodyRunnable, final BooleanSupplier whileCheck) { + return thenRun(finalCallback -> { + LoopState loopState = new LoopState(); + new AsyncCallbackLoop(loopState, iterationCallback -> { + + loopBodyRunnable.finish((result, t) -> { + if (t != null) { + iterationCallback.completeExceptionally(t); + return; + } + if (loopState.breakAndCompleteIf(() -> !whileCheck.getAsBoolean(), iterationCallback)) { + return; + } + iterationCallback.complete(iterationCallback); + }); + + }).run(finalCallback); + }); + } } diff --git a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java index 77c289c8723..6dd89e4d9b0 100644 --- a/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/AsyncSupplier.java @@ -81,6 +81,49 @@ default void finish(final SingleResultCallback callback) { } } + /** + * Must be invoked at end of async chain + * @param runnable the sync code to invoke (under non-exceptional flow) + * prior to the callback + * @param callback the callback provided by the method the chain is used in + */ + default void thenRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { + this.finish((r, e) -> { + if (e != null) { + callback.completeExceptionally(e); + return; + } + try { + runnable.run(); + } catch (Throwable t) { + callback.completeExceptionally(t); + return; + } + callback.onResult(r, null); + }); + } + + /** + * See {@link #thenRunAndFinish(Runnable, SingleResultCallback)}, but the runnable + * will always be executed, including on the exceptional path. + * @param runnable the runnable + * @param callback the callback + */ + default void thenAlwaysRunAndFinish(final Runnable runnable, final SingleResultCallback callback) { + this.finish((r, e) -> { + try { + runnable.run(); + } catch (Throwable t) { + if (e != null) { + t.addSuppressed(e); + } + callback.completeExceptionally(t); + return; + } + callback.onResult(r, e); + }); + } + /** * @param function The async function to run after this supplier * @return the composition of this supplier and the function, a supplier diff --git a/driver-core/src/main/com/mongodb/internal/async/MutableValue.java b/driver-core/src/main/com/mongodb/internal/async/MutableValue.java new file mode 100644 index 00000000000..0ee793788ea --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/async/MutableValue.java @@ -0,0 +1,47 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.async; + +import com.mongodb.annotations.NotThreadSafe; +import com.mongodb.lang.Nullable; + +import static com.mongodb.assertions.Assertions.assertNotNull; + +@NotThreadSafe +public final class MutableValue { + private T value; + + public MutableValue(@Nullable final T value) { + this.value = value; + } + + public MutableValue() { + this(null); + } + + public T get() { + return assertNotNull(value); + } + + @Nullable + public T getNullable() { + return value; + } + + public void set(@Nullable final T value) { + this.value = value; + } +} diff --git a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java index 40bfd34de3d..1d98fb91a83 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/AsyncCallbackSupplier.java @@ -15,7 +15,7 @@ */ package com.mongodb.internal.async.function; -import com.mongodb.annotations.NotThreadSafe; +import com.mongodb.internal.async.MutableValue; import com.mongodb.internal.async.SingleResultCallback; import java.util.function.Supplier; @@ -68,16 +68,12 @@ public interface AsyncCallbackSupplier { * This is a price we have to pay to provide a guarantee similar to that of the {@code finally} block. */ default AsyncCallbackSupplier whenComplete(final Runnable after) { - @NotThreadSafe - final class MutableBoolean { - private boolean value; - } - MutableBoolean afterExecuted = new MutableBoolean(); + MutableValue afterExecuted = new MutableValue<>(false); Runnable trackableAfter = () -> { try { after.run(); } finally { - afterExecuted.value = true; + afterExecuted.set(true); } }; return callback -> { @@ -103,7 +99,7 @@ final class MutableBoolean { primaryUnexpectedException = unexpectedException; throw unexpectedException; } finally { - if (primaryUnexpectedException != null && !afterExecuted.value) { + if (primaryUnexpectedException != null && !afterExecuted.get()) { try { trackableAfter.run(); } catch (Throwable afterException) { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java index 072ae8e0d9f..f158b3944ae 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java @@ -344,7 +344,7 @@ static CommandReadTransformerAsync> asyncS } static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final TimeoutMode timeoutMode, final BsonDocument cursorDocument, - final int batchSize, final Decoder decoder, final BsonValue comment, final AsyncConnectionSource source, + final int batchSize, final Decoder decoder, @Nullable final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection) { return new AsyncCommandBatchCursor<>(timeoutMode, cursorDocument, batchSize, 0, decoder, comment, source, connection); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java index 77434bd9781..63a3a64ff98 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java @@ -44,6 +44,9 @@ import com.mongodb.client.model.SearchIndexModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; +import com.mongodb.client.model.bulk.ClientBulkWriteOptions; +import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.changestream.FullDocumentBeforeChange; import com.mongodb.internal.TimeoutSettings; @@ -293,6 +296,12 @@ public AsyncWriteOperation bulkWrite(final List clientBulkWriteOperation( + final List clientWriteModels, + @Nullable final ClientBulkWriteOptions options) { + return operations.clientBulkWriteOperation(clientWriteModels, options); + } + public AsyncReadOperation commandRead(final Bson command, final Class resultClass) { return operations.commandRead(command, resultClass); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java index 5f86eb1f8fb..1463798ef64 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BatchCursor.java @@ -25,6 +25,12 @@ import java.util.Iterator; import java.util.List; +import static java.util.Spliterator.IMMUTABLE; +import static java.util.Spliterator.ORDERED; +import static java.util.Spliterators.spliteratorUnknownSize; +import static java.util.stream.Collectors.toList; +import static java.util.stream.StreamSupport.stream; + /** * MongoDB returns query results as batches, and this interface provideds an iterator over those batches. The first call to * the {@code next} method will return the first batch, and subsequent calls will trigger a request to get the next batch @@ -98,4 +104,9 @@ public interface BatchCursor extends Iterator>, Closeable { ServerCursor getServerCursor(); ServerAddress getServerAddress(); + + default List> exhaust() { + return stream(spliteratorUnknownSize(this, ORDERED | IMMUTABLE), false) + .collect(toList()); + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java index b48031c06c6..6592fcbaed3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java @@ -41,7 +41,14 @@ import com.mongodb.connection.ConnectionDescription; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.VisibleForTesting; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.AsyncSupplier; +import com.mongodb.internal.async.MutableValue; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.async.function.AsyncCallbackSupplier; import com.mongodb.internal.async.function.RetryState; +import com.mongodb.internal.binding.AsyncConnectionSource; +import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.client.model.bulk.AbstractClientDeleteModel; @@ -70,6 +77,7 @@ import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateOptions; import com.mongodb.internal.client.model.bulk.ConcreteClientUpdateResult; import com.mongodb.internal.client.model.bulk.UnacknowledgedClientBulkWriteResult; +import com.mongodb.internal.connection.AsyncConnection; import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.DualMessageSequences; import com.mongodb.internal.connection.IdHoldingBsonWriter; @@ -113,8 +121,12 @@ import static com.mongodb.assertions.Assertions.fail; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PACKAGE; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static com.mongodb.internal.async.AsyncRunnable.beginAsync; import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.FAIL_LIMIT_EXCEEDED; import static com.mongodb.internal.connection.DualMessageSequences.WritersProviderAndLimitsChecker.WriteResult.OK_LIMIT_NOT_REACHED; +import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor; +import static com.mongodb.internal.operation.AsyncOperationHelper.decorateWriteWithRetriesAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.BulkWriteBatch.logWriteModelDoesNotSupportRetries; import static com.mongodb.internal.operation.CommandOperationHelper.commandWriteConcern; import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; @@ -128,22 +140,20 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Optional.ofNullable; -import static java.util.Spliterator.IMMUTABLE; -import static java.util.Spliterator.ORDERED; -import static java.util.Spliterators.spliteratorUnknownSize; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; -import static java.util.stream.StreamSupport.stream; /** * This class is not part of the public API and may be removed or changed at any time. */ -public final class ClientBulkWriteOperation implements WriteOperation { +public final class ClientBulkWriteOperation implements WriteOperation, AsyncWriteOperation { private static final ConcreteClientBulkWriteOptions EMPTY_OPTIONS = new ConcreteClientBulkWriteOptions(); private static final String BULK_WRITE_COMMAND_NAME = "bulkWrite"; private static final EncoderContext DEFAULT_ENCODER_CONTEXT = EncoderContext.builder().build(); private static final EncoderContext COLLECTIBLE_DOCUMENT_ENCODER_CONTEXT = EncoderContext.builder() .isEncodingCollectibleDocument(true).build(); + private static final int INITIAL_BATCH_MODEL_START_INDEX = 0; + private static final int SERVER_DEFAULT_CURSOR_BATCH_SIZE = 0; private final List models; private final ConcreteClientBulkWriteOptions options; @@ -172,6 +182,7 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext()); ResultAccumulator resultAccumulator = new ResultAccumulator(); MongoException transformedTopLevelError = null; + try { executeAllBatches(effectiveWriteConcern, binding, resultAccumulator); } catch (MongoException topLevelError) { @@ -180,6 +191,24 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu return resultAccumulator.build(transformedTopLevelError, effectiveWriteConcern); } + + @Override + public void executeAsync(final AsyncWriteBinding binding, + final SingleResultCallback finalCallback) { + WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext()); + ResultAccumulator resultAccumulator = new ResultAccumulator(); + MutableValue transformedTopLevelError = new MutableValue<>(); + + beginAsync().thenSupply(c -> { + executeAllBatchesAsync(effectiveWriteConcern, binding, resultAccumulator, c); + }).onErrorIf(topLevelError -> topLevelError instanceof MongoException, (topLevelError, c) -> { + transformedTopLevelError.set(transformWriteException((MongoException) topLevelError)); + c.complete(c); + }).thenApply((ignored, c) -> { + c.complete(resultAccumulator.build(transformedTopLevelError.getNullable(), effectiveWriteConcern)); + }).finish(finalCallback); + } + /** * To execute a batch means: *
    @@ -187,18 +216,40 @@ public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBu *
  • consume the cursor, which may involve executing `getMore` commands.
  • *
* - * @throws MongoException When a {@linkplain ClientBulkWriteException#getError() top-level error} happens. + * @throws MongoException When a {@linkplain ClientBulkWriteException#getCause() top-level error} happens. */ private void executeAllBatches( final WriteConcern effectiveWriteConcern, final WriteBinding binding, final ResultAccumulator resultAccumulator) throws MongoException { - Integer nextBatchStartModelIndex = 0; + Integer nextBatchStartModelIndex = INITIAL_BATCH_MODEL_START_INDEX; + do { nextBatchStartModelIndex = executeBatch(nextBatchStartModelIndex, effectiveWriteConcern, binding, resultAccumulator); } while (nextBatchStartModelIndex != null); } + /** + * @see #executeAllBatches(WriteConcern, WriteBinding, ResultAccumulator) + */ + private void executeAllBatchesAsync( + final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, + final ResultAccumulator resultAccumulator, + final SingleResultCallback finalCallback) { + MutableValue nextBatchStartModelIndex = new MutableValue<>(INITIAL_BATCH_MODEL_START_INDEX); + + beginAsync().thenRunDoWhileLoop(iterationCallback -> { + beginAsync().thenSupply(c -> { + executeBatchAsync(nextBatchStartModelIndex.get(), effectiveWriteConcern, binding, resultAccumulator, c); + }).thenApply((nextBatchStartModelIdx, c) -> { + nextBatchStartModelIndex.set(nextBatchStartModelIdx); + c.complete(c); + }).finish(iterationCallback); + }, () -> nextBatchStartModelIndex.getNullable() != null + ).finish(finalCallback); + } + /** * @return The start model index of the next batch, provided that the operation * {@linkplain ExhaustiveClientBulkWriteCommandOkResponse#operationMayContinue(ConcreteClientBulkWriteOptions) may continue} @@ -217,27 +268,30 @@ private Integer executeBatch( TimeoutContext timeoutContext = operationContext.getTimeoutContext(); RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext); BatchEncoder batchEncoder = new BatchEncoder(); + Supplier retryingBatchExecutor = decorateWriteWithRetries( retryState, operationContext, // Each batch re-selects a server and re-checks out a connection because this is simpler, // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502. - // If connection pinning is required, {@code binding} handles that, + // If connection pinning is required, `binding` handles that, // and `ClientSession`, `TransactionContext` are aware of that. - () -> withSourceAndConnection(binding::getWriteConnectionSource, true, (connectionSource, connection) -> { - ConnectionDescription connectionDescription = connection.getDescription(); - boolean effectiveRetryWrites = isRetryableWrite( - retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext); - retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); - resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress()); - retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true) - .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false); - ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand( - retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder, - () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true)); - return executeBulkWriteCommandAndExhaustOkResponse( - retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext); - }) + () -> withSourceAndConnection(binding::getWriteConnectionSource, true, + (connectionSource, connection) -> { + ConnectionDescription connectionDescription = connection.getDescription(); + boolean effectiveRetryWrites = isRetryableWrite( + retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext); + retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); + resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress()); + retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true) + .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false); + ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand( + retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder, + () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true)); + return executeBulkWriteCommandAndExhaustOkResponse( + retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext); + }) ); + try { ExhaustiveClientBulkWriteCommandOkResponse bulkWriteCommandOkResponse = retryingBatchExecutor.get(); return resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse( @@ -248,16 +302,84 @@ private Integer executeBatch( } catch (MongoCommandException bulkWriteCommandException) { resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException); throw bulkWriteCommandException; - } catch (MongoException e) { + } catch (MongoException mongoException) { // The server does not have a chance to add "RetryableWriteError" label to `e`, // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance // to add the label. So we do that explicitly. - shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, e); - resultAccumulator.onBulkWriteCommandErrorWithoutResponse(e); - throw e; + shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException); + resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException); + throw mongoException; } } + /** + * @see #executeBatch(int, WriteConcern, WriteBinding, ResultAccumulator) + */ + private void executeBatchAsync( + final int batchStartModelIndex, + final WriteConcern effectiveWriteConcern, + final AsyncWriteBinding binding, + final ResultAccumulator resultAccumulator, + final SingleResultCallback finalCallback) { + List unexecutedModels = models.subList(batchStartModelIndex, models.size()); + assertFalse(unexecutedModels.isEmpty()); + OperationContext operationContext = binding.getOperationContext(); + SessionContext sessionContext = operationContext.getSessionContext(); + TimeoutContext timeoutContext = operationContext.getTimeoutContext(); + RetryState retryState = initialRetryState(retryWritesSetting, timeoutContext); + BatchEncoder batchEncoder = new BatchEncoder(); + + AsyncCallbackSupplier retryingBatchExecutor = decorateWriteWithRetriesAsync( + retryState, operationContext, + // Each batch re-selects a server and re-checks out a connection because this is simpler, + // and it is allowed by https://jira.mongodb.org/browse/DRIVERS-2502. + // If connection pinning is required, `binding` handles that, + // and `ClientSession`, `TransactionContext` are aware of that. + funcCallback -> withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback, + (connectionSource, connection, resultCallback) -> { + ConnectionDescription connectionDescription = connection.getDescription(); + boolean effectiveRetryWrites = isRetryableWrite( + retryWritesSetting, effectiveWriteConcern, connectionDescription, sessionContext); + retryState.breakAndThrowIfRetryAnd(() -> !effectiveRetryWrites); + resultAccumulator.onNewServerAddress(connectionDescription.getServerAddress()); + retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true) + .attach(AttachmentKeys.commandDescriptionSupplier(), () -> BULK_WRITE_COMMAND_NAME, false); + ClientBulkWriteCommand bulkWriteCommand = createBulkWriteCommand( + retryState, effectiveRetryWrites, effectiveWriteConcern, sessionContext, unexecutedModels, batchEncoder, + () -> retryState.attach(AttachmentKeys.retryableCommandFlag(), true, true)); + executeBulkWriteCommandAndExhaustOkResponseAsync( + retryState, connectionSource, connection, bulkWriteCommand, effectiveWriteConcern, operationContext, resultCallback); + }) + ); + + beginAsync().thenSupply(callback -> { + retryingBatchExecutor.get(callback); + }).thenApply((bulkWriteCommandOkResponse, callback) -> { + callback.complete(resultAccumulator.onBulkWriteCommandOkResponseOrNoResponse( + batchStartModelIndex, bulkWriteCommandOkResponse, batchEncoder.intoEncodedBatchInfo())); + }).onErrorIf(throwable -> true, (t, callback) -> { + if (t instanceof MongoWriteConcernWithResponseException) { + MongoWriteConcernWithResponseException mongoWriteConcernWithOkResponseException = (MongoWriteConcernWithResponseException) t; + callback.complete(resultAccumulator.onBulkWriteCommandOkResponseWithWriteConcernError( + batchStartModelIndex, mongoWriteConcernWithOkResponseException, batchEncoder.intoEncodedBatchInfo())); + } else if (t instanceof MongoCommandException) { + MongoCommandException bulkWriteCommandException = (MongoCommandException) t; + resultAccumulator.onBulkWriteCommandErrorResponse(bulkWriteCommandException); + callback.completeExceptionally(t); + } else if (t instanceof MongoException) { + MongoException mongoException = (MongoException) t; + // The server does not have a chance to add "RetryableWriteError" label to `e`, + // and if it is the last attempt failure, `RetryingSyncSupplier` also may not have a chance + // to add the label. So we do that explicitly. + shouldAttemptToRetryWriteAndAddRetryableLabel(retryState, mongoException); + resultAccumulator.onBulkWriteCommandErrorWithoutResponse(mongoException); + callback.completeExceptionally(mongoException); + } else { + callback.completeExceptionally(t); + } + }).finish(finalCallback); + } + /** * @throws MongoWriteConcernWithResponseException This internal exception must be handled to avoid it being observed by an application. * It {@linkplain MongoWriteConcernWithResponseException#getResponse() bears} the OK response to the {@code bulkWriteCommand}, @@ -287,11 +409,61 @@ private ExhaustiveClientBulkWriteCommandOkResponse executeBulkWriteCommandAndExh } List> cursorExhaustBatches = doWithRetriesDisabledForCommand(retryState, "getMore", () -> exhaustBulkWriteCommandOkResponseCursor(connectionSource, connection, bulkWriteCommandOkResponse)); - ExhaustiveClientBulkWriteCommandOkResponse exhaustiveBulkWriteCommandOkResponse = new ExhaustiveClientBulkWriteCommandOkResponse( - bulkWriteCommandOkResponse, cursorExhaustBatches); + return createExhaustiveClientBulkWriteCommandOkResponse( + bulkWriteCommandOkResponse, + cursorExhaustBatches, + connection.getDescription()); + } + + /** + * @see #executeBulkWriteCommandAndExhaustOkResponse(RetryState, ConnectionSource, Connection, ClientBulkWriteCommand, WriteConcern, OperationContext) + */ + private void executeBulkWriteCommandAndExhaustOkResponseAsync( + final RetryState retryState, + final AsyncConnectionSource connectionSource, + final AsyncConnection connection, + final ClientBulkWriteCommand bulkWriteCommand, + final WriteConcern effectiveWriteConcern, + final OperationContext operationContext, + final SingleResultCallback finalCallback) { + beginAsync().thenSupply(callback -> { + connection.commandAsync( + "admin", + bulkWriteCommand.getCommandDocument(), + NoOpFieldNameValidator.INSTANCE, + null, + CommandResultDocumentCodec.create(codecRegistry.get(BsonDocument.class), CommandBatchCursorHelper.FIRST_BATCH), + operationContext, + effectiveWriteConcern.isAcknowledged(), + bulkWriteCommand.getOpsAndNsInfo(), callback); + }).thenApply((bulkWriteCommandOkResponse, callback) -> { + if (bulkWriteCommandOkResponse == null) { + callback.complete((ExhaustiveClientBulkWriteCommandOkResponse) null); + return; + } + beginAsync().>>thenSupply(c -> { + doWithRetriesDisabledForCommandAsync(retryState, "getMore", (c1) -> { + exhaustBulkWriteCommandOkResponseCursorAsync(connectionSource, connection, bulkWriteCommandOkResponse, c1); + }, c); + }).thenApply((cursorExhaustBatches, c) -> { + c.complete(createExhaustiveClientBulkWriteCommandOkResponse( + bulkWriteCommandOkResponse, + cursorExhaustBatches, + connection.getDescription())); + }).finish(callback); + }).finish(finalCallback); + } + + private static ExhaustiveClientBulkWriteCommandOkResponse createExhaustiveClientBulkWriteCommandOkResponse( + final BsonDocument bulkWriteCommandOkResponse, final List> cursorExhaustBatches, + final ConnectionDescription connectionDescription) { + ExhaustiveClientBulkWriteCommandOkResponse exhaustiveBulkWriteCommandOkResponse = + new ExhaustiveClientBulkWriteCommandOkResponse( + bulkWriteCommandOkResponse, cursorExhaustBatches); + // `Connection.command` does not throw `MongoWriteConcernException`, so we have to construct it ourselves MongoWriteConcernException writeConcernException = Exceptions.createWriteConcernException( - bulkWriteCommandOkResponse, connection.getDescription().getServerAddress()); + bulkWriteCommandOkResponse, connectionDescription.getServerAddress()); if (writeConcernException != null) { throw new MongoWriteConcernWithResponseException(writeConcernException, exhaustiveBulkWriteCommandOkResponse); } @@ -305,6 +477,7 @@ private R doWithRetriesDisabledForCommand( Optional originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag()); Supplier originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier()) .orElseThrow(Assertions::fail); + try { retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true) .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false); @@ -315,23 +488,63 @@ private R doWithRetriesDisabledForCommand( } } + private void doWithRetriesDisabledForCommandAsync( + final RetryState retryState, + final String commandDescription, + final AsyncSupplier actionWithCommand, + final SingleResultCallback finalCallback) { + Optional originalRetryableCommandFlag = retryState.attachment(AttachmentKeys.retryableCommandFlag()); + Supplier originalCommandDescriptionSupplier = retryState.attachment(AttachmentKeys.commandDescriptionSupplier()) + .orElseThrow(Assertions::fail); + + beginAsync().thenSupply(c -> { + retryState.attach(AttachmentKeys.retryableCommandFlag(), false, true) + .attach(AttachmentKeys.commandDescriptionSupplier(), () -> commandDescription, false); + actionWithCommand.finish(c); + }).thenAlwaysRunAndFinish(() -> { + originalRetryableCommandFlag.ifPresent(value -> retryState.attach(AttachmentKeys.retryableCommandFlag(), value, true)); + retryState.attach(AttachmentKeys.commandDescriptionSupplier(), originalCommandDescriptionSupplier, false); + }, finalCallback); + } + private List> exhaustBulkWriteCommandOkResponseCursor( final ConnectionSource connectionSource, final Connection connection, final BsonDocument response) { - int serverDefaultCursorBatchSize = 0; try (CommandBatchCursor cursor = cursorDocumentToBatchCursor( TimeoutMode.CURSOR_LIFETIME, response, - serverDefaultCursorBatchSize, + SERVER_DEFAULT_CURSOR_BATCH_SIZE, codecRegistry.get(BsonDocument.class), options.getComment().orElse(null), connectionSource, connection)) { - return stream(spliteratorUnknownSize(cursor, ORDERED | IMMUTABLE), false).collect(toList()); + + return cursor.exhaust(); } } + private void exhaustBulkWriteCommandOkResponseCursorAsync(final AsyncConnectionSource connectionSource, + final AsyncConnection connection, + final BsonDocument bulkWriteCommandOkResponse, + final SingleResultCallback>> finalCallback) { + AsyncBatchCursor cursor = cursorDocumentToAsyncBatchCursor( + TimeoutMode.CURSOR_LIFETIME, + bulkWriteCommandOkResponse, + SERVER_DEFAULT_CURSOR_BATCH_SIZE, + codecRegistry.get(BsonDocument.class), + options.getComment().orElse(null), + connectionSource, + connection); + + beginAsync().>>thenSupply(callback -> { + cursor.exhaust(callback); + }).thenAlwaysRunAndFinish(() -> { + cursor.close(); + }, finalCallback); + } + + private ClientBulkWriteCommand createBulkWriteCommand( final RetryState retryState, final boolean effectiveRetryWrites, diff --git a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java index d0b95970511..811065d13a6 100644 --- a/driver-core/src/main/com/mongodb/internal/time/TimePoint.java +++ b/driver-core/src/main/com/mongodb/internal/time/TimePoint.java @@ -28,6 +28,7 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; /** @@ -234,7 +235,7 @@ public int hashCode() { public String toString() { String remainingMs = isInfinite() ? "infinite" - : "" + TimeUnit.MILLISECONDS.convert(currentNanos() - assertNotNull(nanos), NANOSECONDS); + : "" + remaining(MILLISECONDS); return "TimePoint{" + "nanos=" + nanos + ", remainingMs=" + remainingMs diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index a889856f394..dde9682de8d 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -125,7 +125,7 @@ public final class ClusterFixture { private static final String MONGODB_OCSP_SHOULD_SUCCEED = "org.mongodb.test.ocsp.tls.should.succeed"; private static final String DEFAULT_DATABASE_NAME = "JavaDriverTest"; private static final int COMMAND_NOT_FOUND_ERROR_CODE = 59; - public static final long TIMEOUT = 60L; + public static final long TIMEOUT = 120L; public static final Duration TIMEOUT_DURATION = Duration.ofSeconds(TIMEOUT); public static final TimeoutSettings TIMEOUT_SETTINGS = new TimeoutSettings(30_000, 10_000, 0, null, SECONDS.toMillis(5)); diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java index adce165ee51..3e58712ca9c 100644 --- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java +++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java @@ -357,9 +357,17 @@ public void replaceOne(final Bson filter, final Bson update, final boolean isUps } public void deleteOne(final Bson filter) { + delete(filter, false); + } + + public void deleteMany(final Bson filter) { + delete(filter, true); + } + + private void delete(final Bson filter, final boolean multi) { new MixedBulkWriteOperation(namespace, - singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry))), - true, WriteConcern.ACKNOWLEDGED, false) + singletonList(new DeleteRequest(filter.toBsonDocument(Document.class, registry)).multi(multi)), + true, WriteConcern.ACKNOWLEDGED, false) .execute(getBinding()); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java index a272f8b0f67..88dc199ee29 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/AsyncCommandBatchCursorFunctionalTest.java @@ -21,8 +21,10 @@ import com.mongodb.MongoQueryException; import com.mongodb.ReadPreference; import com.mongodb.ServerCursor; +import com.mongodb.async.FutureResultCallback; import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.Filters; import com.mongodb.client.model.OperationTest; import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.connection.AsyncConnection; @@ -103,6 +105,69 @@ void cleanup() { }); } + @Test + @DisplayName("should exhaust cursor with multiple batches") + void shouldExhaustCursorAsyncWithMultipleBatches() { + // given + BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3 + cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + + // when + FutureResultCallback>> futureCallback = new FutureResultCallback<>(); + cursor.exhaust(futureCallback); + + // then + List> resultBatches = futureCallback.get(5, TimeUnit.SECONDS); + + assertTrue(cursor.isClosed(), "Expected cursor to be closed."); + assertEquals(4, resultBatches.size(), "Expected 4 batches for 10 documents with batch size of 3."); + + int totalDocuments = resultBatches.stream().mapToInt(List::size).sum(); + assertEquals(10, totalDocuments, "Expected a total of 10 documents."); + } + + @Test + @DisplayName("should exhaust cursor with closed cursor") + void shouldExhaustCursorAsyncWithClosedCursor() { + // given + BsonDocument commandResult = executeFindCommand(0, 3); + cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + + cursor.close(); + + // when + FutureResultCallback>> futureCallback = new FutureResultCallback<>(); + cursor.exhaust(futureCallback); + + //then + IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, () -> { + futureCallback.get(5, TimeUnit.SECONDS); + }, "Expected an exception when operating on a closed cursor."); + assertEquals("Cursor has been closed", illegalStateException.getMessage()); + } + + @Test + @DisplayName("should exhaust cursor with empty cursor") + void shouldExhaustCursorAsyncWithEmptyCursor() { + // given + getCollectionHelper().deleteMany(Filters.empty()); + + BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch + cursor = new AsyncCommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + + // when + FutureResultCallback>> futureCallback = new FutureResultCallback<>(); + cursor.exhaust(futureCallback); + + // then + List> resultBatches = futureCallback.get(5, TimeUnit.SECONDS); + assertTrue(resultBatches.isEmpty(), "Expected no batches for an empty cursor."); + assertTrue(cursor.isClosed(), "Expected cursor to be closed."); + } + @Test @DisplayName("server cursor should not be null") void theServerCursorShouldNotBeNull() { diff --git a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java index 57caf3bdbfc..d9861c71659 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/operation/CommandBatchCursorFunctionalTest.java @@ -22,6 +22,7 @@ import com.mongodb.ServerCursor; import com.mongodb.client.cursor.TimeoutMode; import com.mongodb.client.model.CreateCollectionOptions; +import com.mongodb.client.model.Filters; import com.mongodb.client.model.OperationTest; import com.mongodb.internal.binding.ConnectionSource; import com.mongodb.internal.connection.Connection; @@ -101,6 +102,55 @@ void cleanup() { }); } + @Test + @DisplayName("should exhaust cursor with multiple batches") + void shouldExhaustCursorWithMultipleBatches() { + // given + BsonDocument commandResult = executeFindCommand(0, 3); // Fetch in batches of size 3 + cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + + // when + List> result = cursor.exhaust(); + + // then + assertEquals(4, result.size(), "Expected 4 batches for 10 documents with batch size of 3."); + + int totalDocuments = result.stream().mapToInt(List::size).sum(); + assertEquals(10, totalDocuments, "Expected a total of 10 documents."); + } + + @Test + @DisplayName("should exhaust cursor with closed cursor") + void shouldExhaustCursorWithClosedCursor() { + // given + BsonDocument commandResult = executeFindCommand(0, 3); + cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + cursor.close(); + + // when & then + IllegalStateException illegalStateException = assertThrows(IllegalStateException.class, cursor::exhaust); + assertEquals("Cursor has been closed", illegalStateException.getMessage()); + } + + @Test + @DisplayName("should exhaust cursor with empty cursor") + void shouldExhaustCursorWithEmptyCursor() { + // given + getCollectionHelper().deleteMany(Filters.empty()); + + BsonDocument commandResult = executeFindCommand(0, 3); // No documents to fetch + cursor = new CommandBatchCursor<>(TimeoutMode.CURSOR_LIFETIME, commandResult, 3, 0, DOCUMENT_DECODER, + null, connectionSource, connection); + + // when + List> result = cursor.exhaust(); + + // then + assertTrue(result.isEmpty(), "Expected no batches for an empty cursor."); + } + @Test @DisplayName("server cursor should not be null") void theServerCursorShouldNotBeNull() { diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java index 65636e2f842..9a9b7552d3e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsAbstractTest.java @@ -748,6 +748,26 @@ void testRetryLoop() { }); } + @Test + void testDoWhileLoop() { + assertBehavesSameVariations(67, + () -> { + do { + plain(0); + sync(1); + } while (plainTest(2)); + }, + (finalCallback) -> { + beginAsync().thenRunDoWhileLoop( + callback -> { + plain(0); + async(1, callback); + }, + () -> plainTest(2) + ).finish(finalCallback); + }); + } + @Test void testFinallyWithPlainInsideTry() { // (in try: normal flow + exception + exception) * (in finally: normal + exception) = 6 @@ -793,6 +813,51 @@ void testFinallyWithPlainOutsideTry() { }); } + @Test + void testSupplyFinallyWithPlainInsideTry() { + assertBehavesSameVariations(6, + () -> { + try { + plain(1); + return syncReturns(2); + } finally { + plain(3); + } + }, + (callback) -> { + beginAsync().thenSupply(c -> { + plain(1); + asyncReturns(2, c); + }).thenAlwaysRunAndFinish(() -> { + plain(3); + }, callback); + }); + } + + @Test + void testSupplyFinallyWithPlainOutsideTry() { + assertBehavesSameVariations(5, + () -> { + plain(1); + try { + return syncReturns(2); + } finally { + plain(3); + } + }, + (callback) -> { + beginAsync().thenSupply(c -> { + plain(1); + beginAsync().thenSupply(c2 -> { + asyncReturns(2, c2); + }).thenAlwaysRunAndFinish(() -> { + plain(3); + }, c); + }).finish(callback); + }); + } + + @Test void testUsedAsLambda() { assertBehavesSameVariations(4, diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java index 1229dbcfcad..10a58152d9f 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTestBase.java @@ -256,8 +256,9 @@ private void assertBehavesSame(final Supplier sync, final Runnable betwee await(wasCalledFuture, "Callback should have been called"); // The following code can be used to debug variations: -// System.out.println("===VARIATION START"); +// System.out.println("===VARIATION START: " + invocationTracker.getVariationCount()); // System.out.println("sync: " + expectedEvents); +// System.out.println("sync size: " + expectedEvents.size()); // System.out.println("callback called?: " + wasCalledFuture.isDone()); // System.out.println("value -- sync: " + expectedValue + " -- async: " + actualValue.get()); // System.out.println("excep -- sync: " + expectedException + " -- async: " + actualException.get()); diff --git a/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt b/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt index 4fcb4a8852a..2c377e41d41 100644 --- a/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt +++ b/driver-kotlin-coroutine/src/integration/kotlin/com/mongodb/kotlin/client/coroutine/syncadapter/SyncMongoCluster.kt @@ -115,24 +115,27 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass)) override fun bulkWrite(models: MutableList): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( models: MutableList, options: ClientBulkWriteOptions ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( clientSession: ClientSession, models: MutableList ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( @@ -140,8 +143,9 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu models: MutableList, options: ClientBulkWriteOptions ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped diff --git a/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt b/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt index b7235d80479..a4ad9bd1418 100644 --- a/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt +++ b/driver-kotlin-sync/src/integration/kotlin/com/mongodb/kotlin/client/syncadapter/SyncMongoCluster.kt @@ -114,24 +114,27 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu SyncChangeStreamIterable(wrapped.watch(clientSession.unwrapped(), pipeline, resultClass)) override fun bulkWrite(models: MutableList): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( models: MutableList, options: ClientBulkWriteOptions ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( clientSession: ClientSession, models: MutableList ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } override fun bulkWrite( @@ -139,8 +142,9 @@ internal open class SyncMongoCluster(open val wrapped: MongoCluster) : JMongoClu models: MutableList, options: ClientBulkWriteOptions ): ClientBulkWriteResult { - org.junit.jupiter.api.Assumptions.assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement") - TODO("BULK-TODO implement") + org.junit.jupiter.api.Assumptions.assumeTrue( + java.lang.Boolean.parseBoolean(toString()), "BULK-TODO Kotlin implement") + TODO("BULK-TODO Kotlin implement") } private fun ClientSession.unwrapped() = (this as SyncClientSession).wrapped diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java index ef7c0ddb79d..edcc8f29408 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoCluster.java @@ -16,7 +16,10 @@ package com.mongodb.reactivestreams.client; +import com.mongodb.ClientBulkWriteException; import com.mongodb.ClientSessionOptions; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoException; import com.mongodb.MongoNamespace; import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; @@ -24,6 +27,11 @@ import com.mongodb.annotations.Alpha; import com.mongodb.annotations.Immutable; import com.mongodb.annotations.Reason; +import com.mongodb.client.model.bulk.ClientBulkWriteOptions; +import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedDeleteManyModel; +import com.mongodb.client.model.bulk.ClientNamespacedUpdateManyModel; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.lang.Nullable; import org.bson.Document; import org.bson.codecs.configuration.CodecRegistry; @@ -353,4 +361,135 @@ public interface MongoCluster { * @mongodb.driver.dochub core/changestreams Change Streams */ ChangeStreamPublisher watch(ClientSession clientSession, List pipeline, Class resultClass); + + /** + * Executes a client-level bulk write operation. + * This method is functionally equivalent to {@link #bulkWrite(List, ClientBulkWriteOptions)} + * with the {@linkplain ClientBulkWriteOptions#clientBulkWriteOptions() default options}. + *

+ * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}. + * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect, + * executing this operation may require multiple {@code bulkWrite} commands. + * The eligibility for retries is determined per each {@code bulkWrite} command: + * {@link ClientNamespacedUpdateManyModel}, {@link ClientNamespacedDeleteManyModel} in a command render it non-retryable.

+ *

+ * This operation is not supported by MongoDB Atlas Serverless instances.

+ * + * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}. + * @return The {@link Publisher} signalling at most one element {@link ClientBulkWriteResult} if the operation is successful, + * or the following errors: + *
    + *
  • + * {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful, + * and there is at least one of the following pieces of information to report: + * {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()}, + * {@link ClientBulkWriteException#getPartialResult()}.
  • + *
  • + * {@link MongoException} - Only if the operation is unsuccessful.
  • + *
+ * @since 5.3 + * @mongodb.server.release 8.0 + * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite + */ + Publisher bulkWrite(List models); + + /** + * Executes a client-level bulk write operation. + *

+ * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}. + * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect, + * executing this operation may require multiple {@code bulkWrite} commands. + * The eligibility for retries is determined per each {@code bulkWrite} command: + * {@link ClientNamespacedUpdateManyModel}, {@link ClientNamespacedDeleteManyModel} in a command render it non-retryable.

+ *

+ * This operation is not supported by MongoDB Atlas Serverless instances.

+ * + * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}. + * @param options The options. + * @return The {@link Publisher} signalling at most one element {@link ClientBulkWriteResult} if the operation is successful, + * or the following errors: + *
    + *
  • + * {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful, + * and there is at least one of the following pieces of information to report: + * {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()}, + * {@link ClientBulkWriteException#getPartialResult()}.
  • + *
  • + * {@link MongoException} - Only if the operation is unsuccessful.
  • + *
+ * @since 5.3 + * @mongodb.server.release 8.0 + * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite + */ + Publisher bulkWrite( + List models, + ClientBulkWriteOptions options); + + /** + * Executes a client-level bulk write operation. + * This method is functionally equivalent to {@link #bulkWrite(ClientSession, List, ClientBulkWriteOptions)} + * with the {@linkplain ClientBulkWriteOptions#clientBulkWriteOptions() default options}. + *

+ * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}. + * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect, + * executing this operation may require multiple {@code bulkWrite} commands. + * The eligibility for retries is determined per each {@code bulkWrite} command: + * {@link ClientNamespacedUpdateManyModel}, {@link ClientNamespacedDeleteManyModel} in a command render it non-retryable.

+ *

+ * This operation is not supported by MongoDB Atlas Serverless instances.

+ * + * @param clientSession The {@linkplain ClientSession client session} with which to associate this operation. + * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}. + * @return The {@link Publisher} signalling at most one element {@link ClientBulkWriteResult} if the operation is successful, + * or the following errors: + *
    + *
  • + * {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful, + * and there is at least one of the following pieces of information to report: + * {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()}, + * {@link ClientBulkWriteException#getPartialResult()}.
  • + *
  • + * {@link MongoException} - Only if the operation is unsuccessful.
  • + *
+ * @since 5.3 + * @mongodb.server.release 8.0 + * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite + */ + Publisher bulkWrite( + ClientSession clientSession, + List models); + + /** + * Executes a client-level bulk write operation. + *

+ * This operation supports {@linkplain MongoClientSettings#getRetryWrites() retryable writes}. + * Depending on the number of {@code models}, encoded size of {@code models}, and the size limits in effect, + * executing this operation may require multiple {@code bulkWrite} commands. + * The eligibility for retries is determined per each {@code bulkWrite} command: + * {@link ClientNamespacedUpdateManyModel}, {@link ClientNamespacedDeleteManyModel} in a command render it non-retryable.

+ *

+ * This operation is not supported by MongoDB Atlas Serverless instances.

+ * + * @param clientSession The {@linkplain ClientSession client session} with which to associate this operation. + * @param models The {@linkplain ClientNamespacedWriteModel individual write operations}. + * @param options The options. + * @return The {@link Publisher} signalling at most one element {@link ClientBulkWriteResult} if the operation is successful, + * or the following errors: + *
    + *
  • + * {@link ClientBulkWriteException} - If and only if the operation is unsuccessful or partially unsuccessful, + * and there is at least one of the following pieces of information to report: + * {@link ClientBulkWriteException#getWriteConcernErrors()}, {@link ClientBulkWriteException#getWriteErrors()}, + * {@link ClientBulkWriteException#getPartialResult()}.
  • + *
  • + * {@link MongoException} - Only if the operation is unsuccessful.
  • + *
+ * @since 5.3 + * @mongodb.server.release 8.0 + * @mongodb.driver.manual reference/command/bulkWrite/ bulkWrite + */ + Publisher bulkWrite( + ClientSession clientSession, + List models, + ClientBulkWriteOptions options); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java index 27a0c9195c3..3d4822eb7e3 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java @@ -24,6 +24,9 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; +import com.mongodb.client.model.bulk.ClientBulkWriteOptions; +import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.connection.ClusterDescription; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.connection.Cluster; @@ -229,6 +232,30 @@ public ChangeStreamPublisher watch( return delegate.watch(clientSession, pipeline, resultClass); } + @Override + public Publisher bulkWrite(final List models) { + return delegate.bulkWrite(models); + } + + @Override + public Publisher bulkWrite(final List models, + final ClientBulkWriteOptions options) { + return delegate.bulkWrite(models, options); + } + + @Override + public Publisher bulkWrite(final ClientSession clientSession, + final List models) { + return delegate.bulkWrite(clientSession, models); + } + + @Override + public Publisher bulkWrite(final ClientSession clientSession, + final List models, + final ClientBulkWriteOptions options) { + return delegate.bulkWrite(clientSession, models, options); + } + @Override public Publisher startSession() { return delegate.startSession(); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java index 72bcf53e303..04028ecc684 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClusterImpl.java @@ -20,6 +20,9 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; +import com.mongodb.client.model.bulk.ClientBulkWriteOptions; +import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.client.model.changestream.ChangeStreamLevel; import com.mongodb.internal.connection.Cluster; @@ -42,6 +45,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -237,4 +241,40 @@ public ChangeStreamPublisher watch(final ClientSession clientSession, fin resultClass, pipeline, ChangeStreamLevel.CLIENT); } + @Override + public Publisher bulkWrite(final List clientWriteModels) { + notNull("clientWriteModels", clientWriteModels); + isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty()); + return mongoOperationPublisher.clientBulkWrite(null, clientWriteModels, null); + } + + @Override + public Publisher bulkWrite(final List clientWriteModels, + final ClientBulkWriteOptions options) { + notNull("clientWriteModels", clientWriteModels); + isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty()); + notNull("options", options); + return mongoOperationPublisher.clientBulkWrite(null, clientWriteModels, options); + } + + @Override + public Publisher bulkWrite(final ClientSession clientSession, + final List clientWriteModels) { + notNull("clientSession", clientSession); + notNull("clientWriteModels", clientWriteModels); + isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty()); + return mongoOperationPublisher.clientBulkWrite(clientSession, clientWriteModels, null); + } + + @Override + public Publisher bulkWrite(final ClientSession clientSession, + final List clientWriteModels, + final ClientBulkWriteOptions options) { + notNull("clientSession", clientSession); + notNull("clientWriteModels", clientWriteModels); + isTrueArgument("`clientWriteModels` must not be empty", !clientWriteModels.isEmpty()); + notNull("options", options); + return mongoOperationPublisher.clientBulkWrite(clientSession, clientWriteModels, options); + } + } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java index 5ccea518cb5..58030f75fa9 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoOperationPublisher.java @@ -50,6 +50,9 @@ import com.mongodb.client.model.SearchIndexModel; import com.mongodb.client.model.UpdateOptions; import com.mongodb.client.model.WriteModel; +import com.mongodb.client.model.bulk.ClientBulkWriteOptions; +import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.client.result.DeleteResult; import com.mongodb.client.result.InsertManyResult; import com.mongodb.client.result.InsertOneResult; @@ -80,6 +83,7 @@ import java.util.function.Function; import java.util.function.Supplier; +import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static java.util.Collections.singletonList; import static org.bson.codecs.configuration.CodecRegistries.withUuidRepresentation; @@ -91,6 +95,7 @@ public final class MongoOperationPublisher { private final AsyncOperations operations; private final UuidRepresentation uuidRepresentation; + @Nullable private final AutoEncryptionSettings autoEncryptionSettings; private final OperationExecutor executor; @@ -289,6 +294,16 @@ Publisher bulkWrite( () -> operations.bulkWrite(notNull("requests", requests), notNull("options", options)), clientSession); } + Publisher clientBulkWrite( + @Nullable final ClientSession clientSession, + final List clientWriteModels, + @Nullable final ClientBulkWriteOptions options) { + isTrue("`autoEncryptionSettings` is null, as bulkWrite does not currently support automatic encryption", autoEncryptionSettings == null); + return createWriteOperationMono( + operations::getTimeoutSettings, + () -> operations.clientBulkWriteOperation(clientWriteModels, options), clientSession); + } + Publisher insertOne(@Nullable final ClientSession clientSession, final T document, final InsertOneOptions options) { return createSingleWriteRequestMono(() -> operations.insertOne(notNull("document", document), notNull("options", options)), diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java index fbeffd7b369..75a19536cb7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ClientSideOperationTimeoutProseTest.java @@ -489,13 +489,6 @@ public void testTimeoutMsISHonoredForNnextOperationWhenSeveralGetMoreExecutedInt } } - @DisplayName("11. Multi-batch bulkWrites") - @Test - @Override - protected void test11MultiBatchBulkWrites() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - private static void assertCommandStartedEventsInOder(final List expectedCommandNames, final List commandStartedEvents) { assertEquals(expectedCommandNames.size(), commandStartedEvents.size(), "Expected: " + expectedCommandNames + ". Actual: " diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java index 22bb1a23d77..81d88e6fdb0 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/CrudProseTest.java @@ -18,15 +18,6 @@ import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoClient; import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; - -import java.util.function.Supplier; - -import static org.junit.jupiter.api.Assumptions.assumeTrue; /** * See @@ -37,64 +28,4 @@ final class CrudProseTest extends com.mongodb.client.CrudProseTest { protected MongoClient createMongoClient(final MongoClientSettings.Builder mongoClientSettingsBuilder) { return new SyncMongoClient(MongoClients.create(mongoClientSettingsBuilder.build())); } - - @DisplayName("5. MongoClient.bulkWrite collects WriteConcernErrors across batches") - @Test - @Override - protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("6. MongoClient.bulkWrite handles individual WriteErrors across batches") - @ParameterizedTest - @ValueSource(booleans = {false, true}) - @Override - protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordered) { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("8. MongoClient.bulkWrite handles a cursor requiring getMore within a transaction") - @Test - @Override - protected void testBulkWriteHandlesCursorRequiringGetMoreWithinTransaction() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("11. MongoClient.bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size") - @Test - @Override - protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("12. MongoClient.bulkWrite returns an error if no operations can be added to ops") - @ParameterizedTest - @ValueSource(strings = {"document", "namespace"}) - @Override - protected void testBulkWriteSplitsErrorsForTooLargeOpsOrNsInfo(final String tooLarge) { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("13. MongoClient.bulkWrite returns an error if auto-encryption is configured") - @Test - @Override - protected void testBulkWriteErrorsForAutoEncryption() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @DisplayName("15. MongoClient.bulkWrite with unacknowledged write concern uses w:0 for all batches") - @Test - protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } - - @ParameterizedTest - @MethodSource("insertMustGenerateIdAtMostOnceArgs") - @Override - protected void insertMustGenerateIdAtMostOnce( - final Class documentClass, - final boolean expectIdGenerated, - final Supplier documentSupplier) { - assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement"); - } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java index 8ded1f38865..fc3cad4b6a7 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCluster.java @@ -21,7 +21,6 @@ import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; import com.mongodb.WriteConcern; -import com.mongodb.assertions.Assertions; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.ClientSession; import com.mongodb.client.ListDatabasesIterable; @@ -29,8 +28,8 @@ import com.mongodb.client.MongoDatabase; import com.mongodb.client.MongoIterable; import com.mongodb.client.model.bulk.ClientBulkWriteOptions; -import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import org.bson.BsonDocument; import org.bson.Document; import org.bson.codecs.configuration.CodecRegistry; @@ -286,24 +285,22 @@ public ChangeStreamIterable watch(final ClientSession clientS @Override public ClientBulkWriteResult bulkWrite( final List clientWriteModels) throws ClientBulkWriteException { - org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement"); - throw Assertions.fail("BULK-TODO implement"); + return requireNonNull(Mono.from(wrapped.bulkWrite(clientWriteModels)).contextWrite(CONTEXT).block(TIMEOUT_DURATION)); } @Override public ClientBulkWriteResult bulkWrite( final List clientWriteModels, final ClientBulkWriteOptions options) throws ClientBulkWriteException { - org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement"); - throw Assertions.fail("BULK-TODO implement"); + return requireNonNull(Mono.from(wrapped.bulkWrite(clientWriteModels, options)).contextWrite(CONTEXT).block(TIMEOUT_DURATION)); } @Override public ClientBulkWriteResult bulkWrite( final ClientSession clientSession, final List clientWriteModels) throws ClientBulkWriteException { - org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement"); - throw Assertions.fail("BULK-TODO implement"); + return requireNonNull( + Mono.from(wrapped.bulkWrite(unwrap(clientSession), clientWriteModels)).contextWrite(CONTEXT).block(TIMEOUT_DURATION)); } @Override @@ -311,8 +308,8 @@ public ClientBulkWriteResult bulkWrite( final ClientSession clientSession, final List clientWriteModels, final ClientBulkWriteOptions options) throws ClientBulkWriteException { - org.junit.jupiter.api.Assumptions.assumeTrue(Boolean.parseBoolean(toString()), "BULK-TODO implement"); - throw Assertions.fail("BULK-TODO implement"); + return requireNonNull(Mono.from(wrapped.bulkWrite(unwrap(clientSession), clientWriteModels, options)).contextWrite(CONTEXT) + .block(TIMEOUT_DURATION)); } private com.mongodb.reactivestreams.client.ClientSession unwrap(final ClientSession clientSession) { diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java index 5839a7efd8d..09f77743cde 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/PublisherApiTest.java @@ -51,7 +51,7 @@ public class PublisherApiTest { List testPublisherApiMatchesSyncApi() { return asList( dynamicTest("Client Session Api", () -> assertApis(com.mongodb.client.ClientSession.class, ClientSession.class)), -// BULK-TODO uncomment dynamicTest("MongoClient Api", () -> assertApis(com.mongodb.client.MongoClient.class, MongoClient.class)), + dynamicTest("MongoClient Api", () -> assertApis(com.mongodb.client.MongoClient.class, MongoClient.class)), dynamicTest("MongoDatabase Api", () -> assertApis(com.mongodb.client.MongoDatabase.class, MongoDatabase.class)), dynamicTest("MongoCollection Api", () -> assertApis(com.mongodb.client.MongoCollection.class, MongoCollection.class)), dynamicTest("Aggregate Api", () -> assertApis(AggregateIterable.class, AggregatePublisher.class)), diff --git a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java index 42d6bb14c5c..1c096748c11 100644 --- a/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java +++ b/driver-reactive-streams/src/test/unit/com/mongodb/reactivestreams/client/internal/MongoOperationPublisherTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; +import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_TIMEOUT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -113,7 +114,7 @@ public void withReadPreference() { @Test public void withTimeout() { - assertEquals(DEFAULT_MOP, DEFAULT_MOP.withTimeout(60_000, TimeUnit.MILLISECONDS)); + assertEquals(DEFAULT_MOP, DEFAULT_MOP.withTimeout(TIMEOUT, TimeUnit.SECONDS)); assertEquals(1000, DEFAULT_MOP.withTimeout(1000, TimeUnit.MILLISECONDS).getTimeoutMS()); assertThrows(IllegalArgumentException.class, () -> DEFAULT_MOP.withTimeout(500, TimeUnit.NANOSECONDS)); } diff --git a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala index 4c721ed8774..d5516b984ae 100644 --- a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala +++ b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala @@ -35,7 +35,12 @@ class MongoClientSpec extends BaseSpec with MockitoSugar { wrapped.foreach((name: String) => { val cleanedName = name.stripPrefix("get") - assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name") + + // TODO("BULK-TODO remove this if when bulkWrite is implemented and uncomment line 43") + if (!cleanedName.contains("bulkWrite")) { + assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name") + } + // assert(local.contains(name) | local.contains(cleanedName.head.toLower + cleanedName.tail), s"Missing: $name") }) } diff --git a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java index 72e2fdea0e0..101865079e0 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/CrudProseTest.java @@ -32,8 +32,8 @@ import com.mongodb.client.model.Updates; import com.mongodb.client.model.ValidationOptions; import com.mongodb.client.model.bulk.ClientBulkWriteOptions; -import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.client.model.bulk.ClientBulkWriteResult; +import com.mongodb.client.model.bulk.ClientNamespacedWriteModel; import com.mongodb.event.CommandStartedEvent; import com.mongodb.internal.connection.TestCommandListener; import org.bson.BsonArray; @@ -171,7 +171,7 @@ void testBulkWriteSplitsWhenExceedingMaxWriteBatchSize() { int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize"); ClientBulkWriteResult result = client.bulkWrite(nCopies( maxWriteBatchSize + 1, - ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b")))); + insertOne(NAMESPACE, new Document("a", "b")))); assertEquals(maxWriteBatchSize + 1, result.getInsertedCount()); List startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite"); assertEquals(2, startedBulkWriteCommandEvents.size()); @@ -193,7 +193,7 @@ void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytes() { Document helloResponse = droppedDatabase(client).runCommand(new Document("hello", 1)); int maxBsonObjectSize = helloResponse.getInteger("maxBsonObjectSize"); int maxMessageSizeBytes = helloResponse.getInteger("maxMessageSizeBytes"); - ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne( + ClientNamespacedWriteModel model = insertOne( NAMESPACE, new Document("a", join("", nCopies(maxBsonObjectSize - 500, "b")))); int numModels = maxMessageSizeBytes / maxBsonObjectSize + 1; @@ -227,7 +227,7 @@ protected void testBulkWriteCollectsWriteConcernErrorsAcrossBatches() throws Int .addCommandListener(commandListener)); FailPoint ignored = FailPoint.enable(failPointDocument, getPrimary())) { int maxWriteBatchSize = droppedDatabase(client).runCommand(new Document("hello", 1)).getInteger("maxWriteBatchSize"); - ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b")); + ClientNamespacedWriteModel model = insertOne(NAMESPACE, new Document("a", "b")); int numModels = maxWriteBatchSize + 1; ClientBulkWriteException error = assertThrows(ClientBulkWriteException.class, () -> client.bulkWrite(nCopies(numModels, model))); @@ -253,7 +253,7 @@ protected void testBulkWriteHandlesWriteErrorsAcrossBatches(final boolean ordere Document document = new Document("_id", 1); MongoCollection collection = droppedCollection(client, Document.class); collection.insertOne(document); - ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(collection.getNamespace(), document); + ClientNamespacedWriteModel model = insertOne(collection.getNamespace(), document); int numModels = maxWriteBatchSize + 1; ClientBulkWriteException error = assertThrows(ClientBulkWriteException.class, () -> client.bulkWrite(nCopies(numModels, model), clientBulkWriteOptions().ordered(ordered))); @@ -305,7 +305,8 @@ private void assertBulkWriteHandlesCursorRequiringGetMore(final boolean transact clientUpdateOptions().upsert(true))), clientBulkWriteOptions().verboseResults(true) ); - ClientBulkWriteResult result = transaction ? session.withTransaction(action::get) : action.get(); + + ClientBulkWriteResult result = transaction ? runInTransaction(session, action) : action.get(); assertEquals(2, result.getUpsertedCount()); assertEquals(2, result.getVerboseResults().orElseThrow(Assertions::fail).getUpdateResults().size()); assertEquals(1, commandListener.getCommandStartedEvents("bulkWrite").size()); @@ -322,7 +323,7 @@ protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo() () -> { // Case 1: No batch-splitting required testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo((client, models, commandListener) -> { - models.add(ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b"))); + models.add(insertOne(NAMESPACE, new Document("a", "b"))); ClientBulkWriteResult result = client.bulkWrite(models); assertEquals(models.size(), result.getInsertedCount()); List startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite"); @@ -339,7 +340,7 @@ protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo() // Case 2: Batch-splitting required testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo((client, models, commandListener) -> { MongoNamespace namespace = new MongoNamespace(NAMESPACE.getDatabaseName(), join("", nCopies(200, "c"))); - models.add(ClientNamespacedWriteModel.insertOne(namespace, new Document("a", "b"))); + models.add(insertOne(namespace, new Document("a", "b"))); ClientBulkWriteResult result = client.bulkWrite(models); assertEquals(models.size(), result.getInsertedCount()); List startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite"); @@ -371,11 +372,11 @@ private void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo( int remainderBytes = opsBytes % maxBsonObjectSize; List models = new ArrayList<>(nCopies( numModels, - ClientNamespacedWriteModel.insertOne( + insertOne( NAMESPACE, new Document("a", join("", nCopies(maxBsonObjectSize - 57, "b")))))); if (remainderBytes >= 217) { - models.add(ClientNamespacedWriteModel.insertOne( + models.add(insertOne( NAMESPACE, new Document("a", join("", nCopies(remainderBytes - 57, "b"))))); } @@ -394,13 +395,13 @@ protected void testBulkWriteSplitsErrorsForTooLargeOpsOrNsInfo(final String tooL ClientNamespacedWriteModel model; switch (tooLarge) { case "document": { - model = ClientNamespacedWriteModel.insertOne( + model = insertOne( NAMESPACE, new Document("a", join("", nCopies(maxMessageSizeBytes, "b")))); break; } case "namespace": { - model = ClientNamespacedWriteModel.insertOne( + model = insertOne( new MongoNamespace(NAMESPACE.getDatabaseName(), join("", nCopies(maxMessageSizeBytes, "b"))), new Document("a", "b")); break; @@ -429,8 +430,9 @@ protected void testBulkWriteErrorsForAutoEncryption() { assertTrue( assertThrows( IllegalStateException.class, - () -> client.bulkWrite(singletonList(ClientNamespacedWriteModel.insertOne(NAMESPACE, new Document("a", "b"))))) - .getMessage().contains("bulkWrite does not currently support automatic encryption")); + () -> client.bulkWrite(singletonList(insertOne(NAMESPACE, new Document("a", "b")))) + ).getMessage().contains("bulkWrite does not currently support automatic encryption") + ); } } @@ -447,7 +449,7 @@ protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() { Document helloResponse = database.runCommand(new Document("hello", 1)); int maxBsonObjectSize = helloResponse.getInteger("maxBsonObjectSize"); int maxMessageSizeBytes = helloResponse.getInteger("maxMessageSizeBytes"); - ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne( + ClientNamespacedWriteModel model = insertOne( NAMESPACE, new Document("a", join("", nCopies(maxBsonObjectSize - 500, "b")))); int numModels = maxMessageSizeBytes / maxBsonObjectSize + 1; @@ -594,4 +596,21 @@ public int getV() { private interface TriConsumer { void accept(A1 a1, A2 a2, A3 a3); } + + /** + * This method is used instead of {@link ClientSession#withTransaction(TransactionBody)} + * because reactive {@code com.mongodb.reactivestreams.client.ClientSession} do not support it. + */ + private static ClientBulkWriteResult runInTransaction(final ClientSession session, + final Supplier action) { + session.startTransaction(); + try { + ClientBulkWriteResult result = action.get(); + session.commitTransaction(); + return result; + } catch (Throwable throwable) { + session.abortTransaction(); + throw throwable; + } + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 27ebf2a76bd..1b7c3a40716 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -21,10 +21,6 @@ import com.mongodb.MongoNamespace; import com.mongodb.ReadPreference; import com.mongodb.UnixServerAddress; -import com.mongodb.client.unified.UnifiedTestModifications.TestDef; -import com.mongodb.event.TestServerMonitorListener; -import com.mongodb.internal.logging.LogMessage; -import com.mongodb.logging.TestLoggingInterceptor; import com.mongodb.WriteConcern; import com.mongodb.client.ClientSession; import com.mongodb.client.MongoClient; @@ -32,16 +28,20 @@ import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.model.Filters; import com.mongodb.client.test.CollectionHelper; +import com.mongodb.client.unified.UnifiedTestModifications.TestDef; import com.mongodb.client.vault.ClientEncryption; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterType; import com.mongodb.connection.ServerDescription; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.TestServerMonitorListener; import com.mongodb.internal.connection.TestCommandListener; import com.mongodb.internal.connection.TestConnectionPoolListener; +import com.mongodb.internal.logging.LogMessage; import com.mongodb.lang.NonNull; import com.mongodb.lang.Nullable; +import com.mongodb.logging.TestLoggingInterceptor; import com.mongodb.test.AfterBeforeParameterResolver; import org.bson.BsonArray; import org.bson.BsonBoolean; @@ -279,16 +279,7 @@ protected void postSetUp(final TestDef def) { @AfterEach public void cleanUp() { for (FailPoint failPoint : failPoints) { - try { - // BULK-TODO remove the try-catch block - failPoint.disableFailPoint(); - } catch (Throwable e) { - for (Throwable suppressed : e.getSuppressed()) { - if (suppressed instanceof TestAbortedException) { - throw (TestAbortedException) suppressed; - } - } - } + failPoint.disableFailPoint(); } entities.close(); postCleanUp(testDef); @@ -414,11 +405,15 @@ private static void assertOperationResult(final UnifiedTestContext context, fina final OperationResult result) { if (result.getException() instanceof org.opentest4j.TestAbortedException) { // BULK-TODO remove - throw (org.opentest4j.TestAbortedException) result.getException(); + if (result.getException().getMessage().contains("BULK-TODO Kotlin implement")) { + throw (org.opentest4j.TestAbortedException) result.getException(); + } } if (result.getException() instanceof org.junit.AssumptionViolatedException) { // BULK-TODO remove - throw (org.junit.AssumptionViolatedException) result.getException(); + if (result.getException().getMessage().contains("BULK-TODO Kotlin implement")) { + throw (org.junit.AssumptionViolatedException) result.getException(); + } } context.getAssertionContext().push(ContextElement.ofCompletedOperation(operation, result, operationIndex)); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 3687f156c42..67bf394d6cb 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -45,7 +45,6 @@ public static void doSkips(final TestDef def) { .directory("atlas-data-lake-testing"); // change-streams - def.skipNoncompliantReactive("error required from change stream initialization") // TODO reason? .test("change-streams", "change-streams", "Test with document comment - pre 4.4"); def.skipNoncompliantReactive("event sensitive tests") // TODO reason? @@ -198,24 +197,11 @@ public static void doSkips(final TestDef def) { .test("retryable-writes", "findOneAndDelete-errorLabels", "FindOneAndDelete succeeds after WriteConcernError ShutdownInProgress") .test("retryable-writes", "findOneAndReplace-errorLabels", "FindOneAndReplace succeeds after WriteConcernError ShutdownInProgress") //.testContains("retryable-writes", "succeeds after retryable writeConcernError") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable writeConcernError") .test("retryable-writes", "retryable-writes insertOne serverErrors", "InsertOne succeeds after retryable writeConcernError") .test("retryable-writes", "retryable-writes bulkWrite serverErrors", "BulkWrite succeeds after retryable writeConcernError in first batch"); def.skipJira("https://jira.mongodb.org/browse/JAVA-5341") .when(() -> isDiscoverableReplicaSet() && serverVersionLessThan(4, 4)) .test("retryable-writes", "retryable-writes insertOne serverErrors", "RetryableWriteError label is added based on writeConcernError in pre-4.4 mongod response"); - def.skipJira("https://jira.mongodb.org/browse/JAVA-4586") - //.testContains("retryable-writes", "client bulkWrite") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable top-level error") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with multi: true operations fails after retryable top-level error") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with no multi: true operations succeeds after retryable writeConcernError") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with multi: true operations fails after retryable writeConcernError") - .test("retryable-writes", "client bulkWrite retryable writes", "client bulkWrite with retryWrites: false does not retry") - .test("retryable-writes", "client bulkWrite retryable writes with client errors", "client bulkWrite with one network error succeeds after retry") - .test("retryable-writes", "client bulkWrite retryable writes with client errors", "client bulkWrite with two network errors fails after retry") - //.testContains("retryable-writes", "client.clientBulkWrite") - .test("retryable-writes", "retryable writes handshake failures", "client.clientBulkWrite succeeds after retryable handshake network error") - .test("retryable-writes", "retryable writes handshake failures", "client.clientBulkWrite succeeds after retryable handshake server error (ShutdownInProgress)"); // server-discovery-and-monitoring (SDAM)