Skip to content

Commit

Permalink
Forbid ordered/verbose unacknowledged bulk writes (#1570)
Browse files Browse the repository at this point in the history
  • Loading branch information
stIncMale authored Dec 3, 2024
1 parent 3672152 commit 65e72d0
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.mongodb.internal.operation;

import com.mongodb.ClientBulkWriteException;
import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
Expand Down Expand Up @@ -119,7 +120,6 @@
import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState;
import static com.mongodb.internal.operation.CommandOperationHelper.shouldAttemptToRetryWriteAndAddRetryableLabel;
import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException;
import static com.mongodb.internal.operation.CommandOperationHelper.validateAndGetEffectiveWriteConcern;
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor;
import static com.mongodb.internal.operation.SyncOperationHelper.decorateWriteWithRetries;
Expand Down Expand Up @@ -169,8 +169,7 @@ public ClientBulkWriteOperation(

@Override
public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBulkWriteException {
WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(
writeConcernSetting, binding.getOperationContext().getSessionContext());
WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext());
ResultAccumulator resultAccumulator = new ResultAccumulator();
MongoException transformedTopLevelError = null;
try {
Expand Down Expand Up @@ -366,6 +365,19 @@ private ClientBulkWriteCommand createBulkWriteCommand(
}));
}

private WriteConcern validateAndGetEffectiveWriteConcern(final SessionContext sessionContext) {
WriteConcern effectiveWriteConcern = CommandOperationHelper.validateAndGetEffectiveWriteConcern(writeConcernSetting, sessionContext);
if (!effectiveWriteConcern.isAcknowledged()) {
if (options.isVerboseResults()) {
throw new MongoClientException("Cannot request unacknowledged write concern and verbose results");
}
if (options.isOrdered()) {
throw new MongoClientException("Cannot request unacknowledged write concern and ordered writes");
}
}
return effectiveWriteConcern;
}

private <T> void encodeUsingRegistry(final BsonWriter writer, final T value) {
encodeUsingRegistry(writer, value, DEFAULT_ENCODER_CONTEXT);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@
}
}
}
]
],
"ordered": false
},
"expectResult": {
"insertedCount": {
Expand Down Expand Up @@ -158,7 +159,7 @@
"command": {
"bulkWrite": 1,
"errorsOnly": true,
"ordered": true,
"ordered": false,
"ops": [
{
"insert": 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,64 @@
}
}
]
},
{
"description": "Requesting unacknowledged write with verboseResults is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"verboseResults": true,
"ordered": false,
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and verbose results"
}
}
]
},
{
"description": "Requesting unacknowledged write with ordered is a client-side error",
"operations": [
{
"name": "clientBulkWrite",
"object": "client0",
"arguments": {
"models": [
{
"insertOne": {
"namespace": "crud-tests.coll0",
"document": {
"_id": 10
}
}
}
],
"writeConcern": {
"w": 0
}
},
"expectError": {
"isClientError": true,
"errorContains": "Cannot request unacknowledged write concern and ordered writes"
}
}
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ protected void testBulkWriteErrorsForAutoEncryption() {
assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
}

@DisplayName("15. MongoClient.bulkWrite with unacknowledged write concern uses w:0 for all batches")
@Test
protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() {
assumeTrue(java.lang.Boolean.parseBoolean(toString()), "BULK-TODO implement");
}

@ParameterizedTest
@MethodSource("insertMustGenerateIdAtMostOnceArgs")
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.MongoWriteConcernException;
import com.mongodb.MongoWriteException;
import com.mongodb.WriteConcern;
import com.mongodb.assertions.Assertions;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Filters;
Expand Down Expand Up @@ -312,15 +313,6 @@ private void assertBulkWriteHandlesCursorRequiringGetMore(final boolean transact
}
}

private static Stream<Arguments> testBulkWriteErrorsForUnacknowledgedTooLargeInsertArgs() {
return Stream.of(
arguments("insert", false),
arguments("insert", true),
arguments("replace", false),
arguments("replace", true)
);
}

@DisplayName("11. MongoClient.bulkWrite batch splits when the addition of a new namespace exceeds the maximum message size")
@Test
protected void testBulkWriteSplitsWhenExceedingMaxMessageSizeBytesDueToNsInfo() {
Expand Down Expand Up @@ -442,6 +434,40 @@ protected void testBulkWriteErrorsForAutoEncryption() {
}
}

@DisplayName("15. MongoClient.bulkWrite with unacknowledged write concern uses w:0 for all batches")
@Test
protected void testWriteConcernOfAllBatchesWhenUnacknowledgedRequested() {
assumeTrue(serverVersionAtLeast(8, 0));
assumeFalse(isServerlessTest());
TestCommandListener commandListener = new TestCommandListener();
try (MongoClient client = createMongoClient(getMongoClientSettingsBuilder().addCommandListener(commandListener)
.writeConcern(WriteConcern.UNACKNOWLEDGED))) {
MongoDatabase database = droppedDatabase(client);
database.createCollection(NAMESPACE.getCollectionName());
Document helloResponse = database.runCommand(new Document("hello", 1));
int maxBsonObjectSize = helloResponse.getInteger("maxBsonObjectSize");
int maxMessageSizeBytes = helloResponse.getInteger("maxMessageSizeBytes");
ClientNamespacedWriteModel model = ClientNamespacedWriteModel.insertOne(
NAMESPACE,
new Document("a", join("", nCopies(maxBsonObjectSize - 500, "b"))));
int numModels = maxMessageSizeBytes / maxBsonObjectSize + 1;
ClientBulkWriteResult result = client.bulkWrite(nCopies(numModels, model), clientBulkWriteOptions().ordered(false));
assertFalse(result.isAcknowledged());
List<CommandStartedEvent> startedBulkWriteCommandEvents = commandListener.getCommandStartedEvents("bulkWrite");
assertEquals(2, startedBulkWriteCommandEvents.size());
CommandStartedEvent firstEvent = startedBulkWriteCommandEvents.get(0);
BsonDocument firstCommand = firstEvent.getCommand();
CommandStartedEvent secondEvent = startedBulkWriteCommandEvents.get(1);
BsonDocument secondCommand = secondEvent.getCommand();
assertEquals(numModels - 1, firstCommand.getArray("ops").size());
assertEquals(1, secondCommand.getArray("ops").size());
assertEquals(firstEvent.getOperationId(), secondEvent.getOperationId());
assertEquals(0, firstCommand.getDocument("writeConcern").getInt32("w").intValue());
assertEquals(0, secondCommand.getDocument("writeConcern").getInt32("w").intValue());
assertEquals(numModels, database.getCollection(NAMESPACE.getCollectionName()).countDocuments());
}
}

/**
* This test is not from the specification.
*/
Expand Down

0 comments on commit 65e72d0

Please sign in to comment.