diff --git a/driver-core/build.gradle b/driver-core/build.gradle index 40a63c15d4..08fac99c01 100644 --- a/driver-core/build.gradle +++ b/driver-core/build.gradle @@ -46,6 +46,10 @@ dependencies { api "io.netty:netty-transport", optional api "io.netty:netty-handler", optional + implementation 'io.grpc:grpc-netty:1.56.0', optional + // required by Java gRPC for Java SE 9+ + compileOnly 'org.apache.tomcat:annotations-api:6.0.53', optional + // Optionally depend on both AWS SDK v2 and v1. The driver will use v2 is present, v1 if present, or built-in functionality if // neither are present implementation "software.amazon.awssdk:auth:$awsSdkV2Version", optional diff --git a/driver-core/src/main/com/mongodb/ConnectionString.java b/driver-core/src/main/com/mongodb/ConnectionString.java index 9914c0d0aa..2fc4dfbbc4 100644 --- a/driver-core/src/main/com/mongodb/ConnectionString.java +++ b/driver-core/src/main/com/mongodb/ConnectionString.java @@ -112,7 +112,7 @@ *
Replica set configuration:
+ *Replica set configuration (must not be specified when {@code gRPC=true}):
*Proxy Configuration:
*Connection pool configuration:
+ *Connection pool configuration (must not be specified when {@code gRPC=true}):
*+ * Must not be changed if {@linkplain #isGrpc() gRPC is enabled}.
* * @param block the block to apply to the ConnectionPoolSettings. * @return this @@ -409,7 +433,9 @@ public Builder readPreference(final ReadPreference readPreference) { /** * Sets the write concern. * - * @param writeConcern the write concern + * @param writeConcern the write concern. + *+ * {@link WriteConcern#withWTimeout(long, TimeUnit) wtimeout} must not be specified if {@linkplain #isGrpc() gRPC is enabled}.
* @return this * @see MongoClientSettings#getWriteConcern() */ @@ -499,7 +525,7 @@ public Builder codecRegistry(final CodecRegistry codecRegistry) { */ @Deprecated public Builder streamFactoryFactory(final StreamFactoryFactory streamFactoryFactory) { - this.streamFactoryFactory = notNull("streamFactoryFactory", streamFactoryFactory); + this.streamFactoryFactory = StreamFactoryFactoryWrapping.unwrap(notNull("streamFactoryFactory", streamFactoryFactory)); return this; } @@ -799,9 +825,24 @@ public CodecRegistry getCodecRegistry() { @Deprecated @Nullable public StreamFactoryFactory getStreamFactoryFactory() { + StreamFactoryFactoryWrapping.assertRequiresNoFurtherWrapping(streamFactoryFactory); return streamFactoryFactory; } + /** + * Gets whether gRPC is enabled. + * + * @return {@code true} if gRPC is enabled, {@code false} if not. + * @see ConnectionString#isGrpc() + * @see MongoClientSettings.Builder#streamFactoryFactory(StreamFactoryFactory) + * @see GrpcStreamFactoryFactory + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ + public boolean isGrpc() { + return grpc(streamFactoryFactory); + } + /** * Gets the settings for the underlying transport implementation * @@ -1077,6 +1118,10 @@ public String toString() { + '}'; } + private static boolean grpc(@Nullable final StreamFactoryFactory streamFactoryFactory) { + return streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory || streamFactoryFactory instanceof GrpcStreamFactoryFactory; + } + private MongoClientSettings(final Builder builder) { readPreference = builder.readPreference; writeConcern = builder.writeConcern; @@ -1085,7 +1130,6 @@ private MongoClientSettings(final Builder builder) { readConcern = builder.readConcern; credential = builder.credential; transportSettings = builder.transportSettings; - streamFactoryFactory = builder.streamFactoryFactory; codecRegistry = builder.codecRegistry; commandListeners = builder.commandListeners; applicationName = builder.applicationName; @@ -1113,5 +1157,58 @@ private MongoClientSettings(final Builder builder) { heartbeatSocketTimeoutSetExplicitly = builder.heartbeatSocketTimeoutMS != 0; heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0; contextProvider = builder.contextProvider; + StreamFactoryFactory builderStreamFactoryFactory = builder.streamFactoryFactory; + if (grpc(builderStreamFactoryFactory)) { + if (clusterSettings.getRequiredReplicaSetName() != null) { + throw new IllegalArgumentException( + "requiredReplicaSetName can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (writeConcern.getWTimeout(MILLISECONDS) != null) { + throw new IllegalArgumentException( + "Write concern wtimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (socketSettings.getReadTimeout(MILLISECONDS) != SocketSettings.builder().build().getReadTimeout(MILLISECONDS)) { + throw new IllegalArgumentException( + "Socket readTimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (!ConnectionPoolSettings.builder().build().equals(connectionPoolSettings)) { + throw new IllegalArgumentException( + "connectionPoolSettings can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + } + streamFactoryFactory = StreamFactoryFactoryWrapping.wrap(builderStreamFactoryFactory, connectionPoolSettings); + } + + private static final class StreamFactoryFactoryWrapping { + @Nullable + static StreamFactoryFactory wrap( + @Nullable final StreamFactoryFactory streamFactoryFactory, + final ConnectionPoolSettings connectionPoolSettings) { + if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) { + return new SharingGrpcStreamFactoryFactory((GrpcStreamFactoryFactory) streamFactoryFactory, connectionPoolSettings); + } + return streamFactoryFactory; + } + + @Nullable + static StreamFactoryFactory unwrap(@Nullable final StreamFactoryFactory streamFactoryFactory) { + if (streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory) { + return ((SharingGrpcStreamFactoryFactory) streamFactoryFactory).unwrap(); + } + return streamFactoryFactory; + } + + static void assertRequiresNoFurtherWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory) throws AssertionError { + assertFalse(requiresWrapping(streamFactoryFactory, false)); + } + + private static boolean requiresWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory, final boolean ifNull) { + if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) { + return true; + } else if (streamFactoryFactory == null) { + return ifNull; + } + return false; + } } } diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java index a668dd344b..2a6c195f5f 100644 --- a/driver-core/src/main/com/mongodb/MongoException.java +++ b/driver-core/src/main/com/mongodb/MongoException.java @@ -116,7 +116,7 @@ public MongoException(@Nullable final String msg, @Nullable final Throwable t) { * @param msg the message * @param t the throwable cause */ - public MongoException(final int code, final String msg, final Throwable t) { + public MongoException(final int code, final String msg, @Nullable final Throwable t) { super(msg, t); this.code = code; if (t instanceof MongoException) { diff --git a/driver-core/src/main/com/mongodb/MongoSocketException.java b/driver-core/src/main/com/mongodb/MongoSocketException.java index 820c2cb769..3f863fc513 100644 --- a/driver-core/src/main/com/mongodb/MongoSocketException.java +++ b/driver-core/src/main/com/mongodb/MongoSocketException.java @@ -16,6 +16,8 @@ package com.mongodb; +import com.mongodb.lang.Nullable; + /** * Subclass of {@link MongoException} representing a network-related exception * @@ -33,7 +35,7 @@ public class MongoSocketException extends MongoException { * @param msg the message * @param e the cause */ - public MongoSocketException(final String msg, final ServerAddress serverAddress, final Throwable e) { + public MongoSocketException(final String msg, final ServerAddress serverAddress, @Nullable final Throwable e) { super(-2, msg, e); this.serverAddress = serverAddress; } diff --git a/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java b/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java index 4bb658a906..3d06ba5637 100644 --- a/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java +++ b/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java @@ -16,6 +16,8 @@ package com.mongodb; +import com.mongodb.lang.Nullable; + /** * This exception is thrown when there is a timeout reading a response from the socket. * @@ -32,7 +34,7 @@ public class MongoSocketReadTimeoutException extends MongoSocketException { * @param address the address * @param cause the cause */ - public MongoSocketReadTimeoutException(final String message, final ServerAddress address, final Throwable cause) { + public MongoSocketReadTimeoutException(final String message, final ServerAddress address, @Nullable final Throwable cause) { super(message, address, cause); } diff --git a/driver-core/src/main/com/mongodb/connection/ClusterSettings.java b/driver-core/src/main/com/mongodb/connection/ClusterSettings.java index 5bfdfa84f0..2018c6b38f 100644 --- a/driver-core/src/main/com/mongodb/connection/ClusterSettings.java +++ b/driver-core/src/main/com/mongodb/connection/ClusterSettings.java @@ -17,6 +17,7 @@ package com.mongodb.connection; import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.ServerAddress; import com.mongodb.annotations.Immutable; import com.mongodb.annotations.NotThreadSafe; @@ -219,6 +220,8 @@ public Builder mode(final ClusterConnectionMode mode) { /** * Sets the required replica set name for the cluster. + *+ * Must be {@code null} if {@linkplain MongoClientSettings#isGrpc() gRPC is enabled}.
* * @param requiredReplicaSetName the required replica set name. * @return this diff --git a/driver-core/src/main/com/mongodb/connection/SocketSettings.java b/driver-core/src/main/com/mongodb/connection/SocketSettings.java index 7a63790cb6..ea7884ab84 100644 --- a/driver-core/src/main/com/mongodb/connection/SocketSettings.java +++ b/driver-core/src/main/com/mongodb/connection/SocketSettings.java @@ -18,6 +18,7 @@ import com.mongodb.Block; import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.annotations.Immutable; import java.util.concurrent.TimeUnit; @@ -104,6 +105,8 @@ public Builder connectTimeout(final int connectTimeout, final TimeUnit timeUnit) /** * Sets the socket read timeout. + *+ * Must not be specified if {@linkplain MongoClientSettings#isGrpc() gRPC is enabled}.
* * @param readTimeout the read timeout * @param timeUnit the time unit diff --git a/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java new file mode 100644 index 0000000000..b1e1c897e6 --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java @@ -0,0 +1,110 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.connection.grpc; + +import com.mongodb.ConnectionString; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.SslSettings; +import com.mongodb.connection.Stream; +import com.mongodb.connection.StreamFactory; +import com.mongodb.connection.StreamFactoryFactory; +import com.mongodb.internal.connection.grpc.SharingGrpcStreamFactoryFactory; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; +import io.netty.buffer.ByteBufAllocator; + +import java.util.UUID; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * A {@link StreamFactoryFactory} for gRPC-based {@linkplain Stream streams}. + * + * @see ConnectionString#isGrpc() + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ +@ThreadSafe +public final class GrpcStreamFactoryFactory implements StreamFactoryFactory { + private final UUID clientId; + private final ByteBufAllocator allocator; + + @Override + public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { + // Since a user may share the same instance of `GrpcStreamFactoryFactory` between `MongoClient`s, + // all `StreamFactory`s created by `GrpcStreamFactoryFactory` must be isolated from each other, + // which is why we pass a new instance of `Channels` here. + return new GrpcStreamFactory(socketSettings, sslSettings, null, null, clientId, allocator, + new SharingGrpcStreamFactoryFactory.Channels()); + } + + /** + * Creates a builder for {@link GrpcStreamFactoryFactory}. + * + * @return The created {@link Builder}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * A builder for {@link GrpcStreamFactoryFactory}. + * + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ + public static final class Builder { + private ByteBufAllocator allocator; + + Builder() { + allocator = ByteBufAllocator.DEFAULT; + } + + /** + * Sets the allocator. + * + * @param allocator The allocator. + * @return {@code this}. + */ + public Builder allocator(final ByteBufAllocator allocator) { + this.allocator = notNull("allocator", allocator); + return this; + } + + /** + * Creates {@link GrpcStreamFactoryFactory}. + * + * @return The created {@link GrpcStreamFactoryFactory}. + */ + public GrpcStreamFactoryFactory build() { + return new GrpcStreamFactoryFactory(this); + } + } + + private GrpcStreamFactoryFactory(final Builder builder) { + clientId = UUID.randomUUID(); + this.allocator = builder.allocator; + } + + @Override + public String toString() { + return "GrpcStreamFactoryFactory{" + + "clientId=" + clientId + + ", allocator=" + allocator + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/connection/grpc/package-info.java b/driver-core/src/main/com/mongodb/connection/grpc/package-info.java new file mode 100644 index 0000000000..f5ff63a4c7 --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/grpc/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains gRPC-specific program elements. + */ +@NonNullApi +package com.mongodb.connection.grpc; + +import com.mongodb.lang.NonNullApi; diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index bb971603ab..4b362210e3 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -40,7 +40,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -71,6 +70,7 @@ import static com.mongodb.internal.Locks.withLock; import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification; import static com.mongodb.internal.connection.SslHelper.enableSni; +import static com.mongodb.internal.connection.netty.NettyChannelOptionsSetter.configureNettyChannelOptions; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -187,17 +187,7 @@ private void initializeChannel(final AsyncCompletionHandlerThis class is not part of the public API and may be removed or changed at any time
*/ @SuppressWarnings("deprecation") public final class StreamFactoryHelper { @Nullable - public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final MongoClientSettings settings) { + public static StreamFactoryFactory getStreamFactoryFactoryFromSettings( + final MongoClientSettings settings, + @Nullable final MongoDriverInformation mongoDriverInformation) { StreamFactoryFactory streamFactoryFactory; TransportSettings transportSettings = settings.getTransportSettings(); if (transportSettings != null) { @@ -43,7 +48,7 @@ public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final Mon } else { streamFactoryFactory = settings.getStreamFactoryFactory(); } - return streamFactoryFactory; + return configureClientMetadataDocument(streamFactoryFactory, settings, mongoDriverInformation); } private StreamFactoryHelper() { diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java new file mode 100644 index 0000000000..049fe4f326 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java @@ -0,0 +1,128 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.grpc; + +import com.mongodb.MongoConnectionPoolClearedException; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.ConnectionPoolSettings; +import com.mongodb.connection.ServerId; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.connection.ConnectionGenerationSupplier; +import com.mongodb.internal.connection.ConnectionPool; +import com.mongodb.internal.connection.DefaultConnectionPool.ServiceStateManager; +import com.mongodb.internal.connection.InternalConnection; +import com.mongodb.internal.connection.InternalConnectionFactory; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.NonNull; +import com.mongodb.lang.Nullable; +import org.bson.types.ObjectId; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * A non-pooling {@link ConnectionPool} for {@link GrpcStream}-based {@link InternalConnection}s. + *+ * This class is not part of the public API and may be removed or changed at any time.
+ */ +// VAKOTODO events, logging? Here https://docs.google.com/document/d/1efw8AykgYHE9mwuaS3p3H-mHv3wPUDMPasDJxyH6Ts8/edit#heading=h.8r6ec9yofp1 +// it is said not to do any of events/logging that CMAP requires. +@ThreadSafe +public final class GrpcConnectionPool implements ConnectionPool { + private final ServerId serverId; + private final InternalConnectionFactory internalConnectionFactory; + private final ConnectionPoolSettings settings; + private final boolean loadBalanced; + private final ServiceStateManager serviceStateManager; + private final AtomicInteger generation; + private final ConnectionGenerationSupplier generationSupplier; + + public GrpcConnectionPool( + final ServerId serverId, + final InternalConnectionFactory internalConnectionFactory, + final ConnectionPoolSettings settings, + final boolean loadBalanced) { + this.serverId = serverId; + this.internalConnectionFactory = internalConnectionFactory; + this.settings = settings; + this.loadBalanced = loadBalanced; + serviceStateManager = new ServiceStateManager(); + generation = new AtomicInteger(); + generationSupplier = new ConnectionGenerationSupplier() { + @Override + public int getGeneration() { + return GrpcConnectionPool.this.getGeneration(); + } + + @Override + public int getGeneration(@NonNull final ObjectId serviceId) { + return serviceStateManager.getGeneration(serviceId); + } + }; + } + + @Override + public InternalConnection get(final OperationContext operationContext) throws MongoConnectionPoolClearedException { + return get(operationContext, settings.getMaxWaitTime(MILLISECONDS), MILLISECONDS); + } + + @Override + public InternalConnection get(final OperationContext operationContext, final long timeout, final TimeUnit timeUnit) + throws MongoConnectionPoolClearedException { + InternalConnection connection = internalConnectionFactory.create(serverId, generationSupplier); + connection.open(); + return connection; + } + + @Override + public void getAsync(final OperationContext operationContext, final SingleResultCallback+ * This class is not part of the public API and may be removed or changed at any time.
+ */ +// While `GrpcStream` does not have to be thread-safe, some of its state may still be accessed concurrently by a thread +// calling a method on an instance of `GrpcStream`, and an internal Java gRPC thread. +// For example, `GrpcStream` may be closed not only by the rest of the driver but also by Java gRPC. +// The following is a pattern used in this implementation to deal with conflicts arising from the aforementioned concurrency: +// 1) Do actions that may conflict with a concurrent closing activity. +// 2) Check if the closing activity has started. +// 2.1) If "no", then there is nothing to worry about as now we know that actions in 1) are ordered before (in the happens-before order) +// the closing activity, and the closing activity will fully see the results of 1) and act accordingly. +// 2.2) If "yes", then the closing activity is concurrent with 1) and may not fully see the results of 1). +// Start the closing activity to guarantee that it fully sees the results of 1). +// Note that this requires the closing activity to support being run concurrently with itself. +@NotThreadSafe +final class GrpcStream implements Stream { + private static final String SERVICE_NAME = "mongodb.CommandService"; + private static final String UNAUTH_STREAM_FULL_METHOD_NAME = + MethodDescriptor.generateFullMethodName(SERVICE_NAME, "UnauthenticatedCommandStream"); + private static final String AUTH_STREAM_FULL_METHOD_NAME = + MethodDescriptor.generateFullMethodName(SERVICE_NAME, "AuthenticatedCommandStream"); + private static final Marshallers+ * Idempotent.
+ * + * @param fromClientCallListener {@code true} iff called from {@link ClientCall.Listener#onClose(Status, Metadata)}. + */ + @SuppressWarnings("try") + private void close(@Nullable final StatusRuntimeException callStatusException, final boolean fromClientCallListener) { + if (closed.compareAndSet(false, true)) { + try (NoCheckedAutoCloseable writeState = () -> this.writeState.close(callStatusException); + NoCheckedAutoCloseable readState = () -> this.readState.close(callStatusException)) { + if (!fromClientCallListener) { + // At this point we know that we were called by the rest of the driver via the `Stream.close` method, + // rather than by Java gRPC. Therefore, we are not running concurrently with any `ClientCall` methods, + // and are allowed to run `ClientCall.cancel` despite it not being thread-safe. + call.cancel(this + " was closed", assertNull(callStatusException)); + } + } + } + } + + private Timeout startNow(final int additionalTimeout) { + int socketReadTimeoutMillis = socketSettings.getReadTimeout(TimeUnit.MILLISECONDS); + return socketReadTimeoutMillis == INFINITE_SOCKET_READ_TIMEOUT + ? Timeout.infinite() + : Timeout.startNow(socketReadTimeoutMillis + additionalTimeout, TimeUnit.MILLISECONDS); + } + + private static final class ResourceUtil { + static+ * This method cannot be called more than once, but {@link #pendingWrite} may be written to concurrently with this method + * executing, because {@link GrpcStream#close(StatusRuntimeException, boolean)} may be called by Java gRPC at any point. + * We must make sure that {@link WriteState} + * cannot end up in a closed state with {@link #pendingWrite} not being {@linkplain PendingWrite#isCompleted() completed}.
+ */ + void close(@Nullable final StatusRuntimeException callStatusException) { + assertFalse(closed); + closed = true; + closePendingWrite(this.pendingWrite, callStatusException); + } + + private void closePendingWrite(@Nullable final PendingWrite pendingWrite, @Nullable final StatusRuntimeException callStatusException) { + if (pendingWrite != null) { + pendingWrite.completeExceptionally(callStatusException == null + ? exceptions.writeFailedStreamClosed() + : callStatusException); + } + } + + @ThreadSafe + static final class PendingWrite { + /** + * Contains {@code null} only if {@link #message} is {@linkplain #detachMessage() detached}, + * or {@link PendingWrite} is {@linkplain #isCompleted() completed}. + */ + private final AtomicReference+ * This method cannot be called more than once, but {@link #pendingRead} may be written to concurrently with this method + * executing, because {@link GrpcStream#close(StatusRuntimeException, boolean)} may be called by Java gRPC at any point. + * We must make sure that {@link ReadState} + * cannot end up in a closed state with {@link #pendingRead} not being {@linkplain PendingRead#isCompleted() completed}.
+ */ + void close(@Nullable final StatusRuntimeException callStatusException) { + assertFalse(closed); + closed = true; + try { + closePendingRead(this.pendingRead, callStatusException); + } finally { + clearAndClose(inputStreams); + } + } + + private void closePendingRead(@Nullable final PendingRead pendingRead, @Nullable final StatusRuntimeException callStatusException) { + if (pendingRead != null) { + pendingRead.completeExceptionally(callStatusException == null + ? exceptions.readFailedStreamClosed() + : callStatusException); + } + } + + private void clearAndClose(final LinkedBlockingQueue+ * This method relies on streams in {@link #inputStreams} to contain all the data they may produce and report + * {@link #END_OF_STREAM} when that data is exhausted instead of blocking. Common sense suggests that this must be the case, + * but Java gRPC does not state such a guarantee. If this turns out to not be the case, and blocking harms us, + * then we will have to do copying in {@link InputStreamUnmarshaller#parse(InputStream)} + * from an {@link InputStream} to a buffer.
+ */ + void tryComplete() { + try { + boolean timedOut; + if (timeout.isInfinite()) { + messageLock.lockInterruptibly(); + timedOut = false; + } else { + timedOut = !messageLock.tryLock(timeout.remaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + if (timedOut) { + completeExceptionally(exceptions.readTimedOut()); + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + completeExceptionally(exceptions.interrupted(e)); + return; + } + try { + uncheckedTryComplete(); + } finally { + messageLock.unlock(); + } + } + + /** + * Must be guarded with {@link #messageLock}. + */ + private void uncheckedTryComplete() { + // While we are trying to complete, we temporarily detach `message` to prevent other threads + // from releasing it concurrently with us using it. + // This `PendingRead` still may be completed exceptionally concurrently, + // or it may have been completed normally. + io.netty.buffer.ByteBuf detachedMessage = this.message.getAndSet(null); + if (detachedMessage == null) { + assertTrue(isCompleted()); + return; + } else { + assertTrue(unreadNumBytes > 0); + } + try { + InputStream inputStream = inputStreams.peek(); + while (unreadNumBytes > 0 && inputStream != null) { + int readNumBytes = detachedMessage.writeBytes(inputStream, unreadNumBytes); + if (readNumBytes == END_OF_STREAM) { + inputStreams.remove().close(); + inputStream = inputStreams.peek(); + } else { + unreadNumBytes -= readNumBytes; + } + if (unreadNumBytes > 0) { + if (timeout.expired()) { + throw exceptions.readTimedOut(); + } else if (Thread.currentThread().isInterrupted()) { + throw exceptions.interrupted(null); + } + } + } + } catch (Exception e) { + completeExceptionally(e); + } finally { + if (unreadNumBytes == 0) { + complete(detachedMessage); + } else if (unreadNumBytes > 0) { + // reattach `detachedMessage` because we have not read everything we need + this.message.set(detachedMessage); + if (isCompleted()) { + assertTrue(future.isCompletedExceptionally()); + detachAndReleaseMessage(); + } + } else { + fail(); + } + } + } + + private void complete(final io.netty.buffer.ByteBuf detachedMessage) { + ByteBuf result = new NettyByteBuf(detachedMessage); + result.flip(); + if (!future.complete(result)) { + assertTrue(future.isCompletedExceptionally()); + result.release(); + } + } + + private void detachAndReleaseMessage() { + io.netty.buffer.ByteBuf detachedMessage = this.message.getAndSet(null); + if (detachedMessage != null) { + detachedMessage.release(); + } + } + } + } + + @ThreadSafe + private static final class PendingWriteMarshaller implements Marshaller+ * This class is not part of the public API and may be removed or changed at any time.
+ */ +@ThreadSafe +public final class GrpcStreamFactory implements StreamFactory { + /** + * @see MessageSettings#DEFAULT_MAX_DOCUMENT_SIZE + */ + // VAKOTODO use + private static final int MAX_BSON_OBJECT_SIZE = MessageSettings.DEFAULT_MAX_DOCUMENT_SIZE; + /** + * @see MessageSettings#DEFAULT_MAX_MESSAGE_SIZE + */ + static final int MAX_MESSAGE_SIZE_BYTES = 48_000_000; + /** + * @see MessageSettings#DEFAULT_MAX_BATCH_COUNT + */ + // VAKOTODO use + private static final int MAX_WRITE_BATCH_SIZE = 100_000; + + private final SocketSettings socketSettings; + private final SslSettings sslSettings; + @Nullable + private final ConnectionPoolSettings connectionPoolSettings; + private final UUID clientId; + private final NettyByteBufProvider bufferProvider; + @Nullable + private final ClientMetadataDocument clientMetadataDocument; + private final SharingGrpcStreamFactoryFactory.Channels channels; + + /** + * @param connectionPoolSettings Not {@code null} iff this constructor is called from {@link SharingGrpcStreamFactoryFactory}. + * @param clientMetadataDocument May be non-{@code null} only if this constructor is called from {@link SharingGrpcStreamFactoryFactory}. + * @param clientId The value for the {@code mongodb-clientId} {@linkplain Metadata.Key metadata key}. + */ + public GrpcStreamFactory( + final SocketSettings socketSettings, + final SslSettings sslSettings, + @Nullable + final ConnectionPoolSettings connectionPoolSettings, + @Nullable + final BsonDocument clientMetadataDocument, + final UUID clientId, + final ByteBufAllocator allocator, + final SharingGrpcStreamFactoryFactory.Channels channels) { + this.socketSettings = socketSettings; + this.sslSettings = sslSettings; + this.connectionPoolSettings = connectionPoolSettings; + this.clientId = clientId; + bufferProvider = new NettyByteBufProvider(allocator); + this.clientMetadataDocument = clientMetadataDocument == null ? null : new ClientMetadataDocument(clientMetadataDocument, bufferProvider); + this.channels = channels; + } + + @Override + public GrpcStream create(final ServerAddress serverAddress) { + return new GrpcStream(serverAddress, clientId, clientMetadataDocument, socketSettings, bufferProvider, + (fullMethodName, marshallers) -> createCall(serverAddress, fullMethodName, marshallers)); + } + + public UUID clientId() { + return clientId; + } + + public ByteBufAllocator allocator() { + return bufferProvider.allocator(); + } + + private ClientCall+ * This class is not part of the public API and may be removed or changed at any time.
+ */ +@ThreadSafe +public final class SharingGrpcStreamFactoryFactory implements StreamFactoryFactory { + private final GrpcStreamFactoryFactory wrapped; + private final ConnectionPoolSettings connectionPoolSettings; + @Nullable + private final BsonDocument clientMetadataDocument; + private final Channels sharedChannels; + + public SharingGrpcStreamFactoryFactory(final GrpcStreamFactoryFactory wrapped, final ConnectionPoolSettings connectionPoolSettings) { + this(wrapped, connectionPoolSettings, null); + } + + private SharingGrpcStreamFactoryFactory( + final GrpcStreamFactoryFactory wrapped, + final ConnectionPoolSettings connectionPoolSettings, + @Nullable final BsonDocument clientMetadataDocument) { + this.wrapped = wrapped; + this.connectionPoolSettings = connectionPoolSettings; + this.clientMetadataDocument = clientMetadataDocument; + sharedChannels = new Channels(); + } + + /** + * @return The configured {@link SharingGrpcStreamFactoryFactory}, which may or may not be the same as {@code this}. + */ + public SharingGrpcStreamFactoryFactory withClientMetadataDocument(@Nullable final BsonDocument clientMetadataDocument) { + return clientMetadataDocument == null ? this : new SharingGrpcStreamFactoryFactory(wrapped, connectionPoolSettings, clientMetadataDocument); + } + + @Override + public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { + GrpcStreamFactory streamFactoryFromWrapped = (GrpcStreamFactory) wrapped.create(socketSettings, sslSettings); + return new GrpcStreamFactory(socketSettings, sslSettings, connectionPoolSettings, clientMetadataDocument, + streamFactoryFromWrapped.clientId(), streamFactoryFromWrapped.allocator(), sharedChannels); + } + + /** + * Returns the wrapped {@link GrpcStreamFactoryFactory}. + */ + public GrpcStreamFactoryFactory unwrap() { + return wrapped; + } + + @Override + public String toString() { + return "SharingGrpcStreamFactoryFactory{" + + "wrapped=" + wrapped + + '}'; + } + + // VAKOTODO how to release resources? + @ThreadSafe + public static final class Channels { + private final ConcurrentHashMap{@code + * $ export MONGO_DIR=/usr/local/bin/ + * $ export RSNAME="" + * $ export SERVERLESS_MODE=true + * $ export SERVERLESS_METRICS_MODE=true + * $ export GRPC_MODE=true + * $ + * $ # see https://wiki.corp.mongodb.com/display/MMS/MongoDB+Agent+Resources#MongoDBAgentResources-Developing + * $ export GOPRIVATE=github.com/10gen/cloud-agent-common,github.com/10gen/cloud-auth-common,github.com/10gen/bsonio,github.com/10gen/bsonutil,github.com/10gen/mongoast,github.com/10gen/mongonet + * $ git config --global url."ssh://git@github.com/10gen/cloud-agent-common".insteadOf "https://github.com/10gen/cloud-agent-common" + * $ git config --global url."ssh://git@github.com/10gen/cloud-auth-common".insteadOf "https://github.com/10gen/cloud-auth-common" + * $ git config --global url."ssh://git@github.com/10gen/mongoast".insteadOf "https://github.com/10gen/mongoast" + * $ git config --global url."ssh://git@github.com/10gen/bsonio".insteadOf "https://github.com/10gen/bsonio" + * $ git config --global url."ssh://git@github.com/10gen/bsonutil".insteadOf "https://github.com/10gen/bsonutil" + * $ git config --global url."ssh://git@github.com/10gen/mongonet".insteadOf "https://github.com/10gen/mongonet" + * $ + * $ cd /Users/valentin.kovalenko/Documents/projects/atlasproxy/main/ + * $ # works with go 1.20.5 + * $ ./start_test_proxies_and_mtms.sh + * }+ * + * If successful, three {@code mongod} processes and three proxies will be started: + * + *
{@code + * atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27000/?ssl=true' -bindPort 9900 -grpcBindPort 9901 -metricsPort 8100 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9900.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9900 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -serverlessMode=true -serverlessMetricsMode=true -testing=true + * atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27010/?ssl=true' -bindPort 9910 -grpcBindPort 9911 -metricsPort 8110 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9910.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9910 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -disableProfile -serverlessMode=true -serverlessMetricsMode=true -testing=true + * atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27020/?ssl=true' -bindPort 9920 -grpcBindPort 9921 -metricsPort 8120 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9920.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9920 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -disableProfile -serverlessMode=true -serverlessMetricsMode=true -testing=true + * }+ * + * Connect to the fist one, which presents itself as {@code mongos} ({@link ServerType#SHARD_ROUTER}), via {@code mongosh}: + * + *
{@code + * $ mongosh "mongodb://user:pencil@host9.local.10gen.cc:9900/?tls=true" --tlsCAFile=/Users/valentin.kovalenko/Documents/projects/atlasproxy/main/ca.pem + * }+ * + * Create a truststore that will be used by {@link MongoClient} to authenticate the server + * (note that {@link SslSettings.Builder#invalidHostNameAllowed(boolean)} cannot be used to disable the authentication, it may only + * relax it when it comes to verifying the server hostname against the subject in the certificate presented by the server): + * + *
{@code + * $ cd /Users/valentin.kovalenko/Documents/projects/atlasproxy/main/ + * $ cp /Users/valentin.kovalenko/.sdkman/candidates/java/current/lib/security/cacerts ./mongo-truststore + * $ keytool --importcert -trustcacerts -file ./ca.pem -keystore mongo-truststore -storepass changeit -storetype JKS -noprompt + * }+ * + * Connect to the first one via {@link Java5018} by running it with the following {@code java} CLI arguments: + * + *
{@code + * -Djavax.net.trustStoreType=jks -Djavax.net.ssl.trustStore=/Users/valentin.kovalenko/Documents/projects/atlasproxy/main/mongo-truststore -Djavax.net.ssl.trustStorePassword=changeit + * }+ * + * Note that the port 9900 is for mongorpc, while 9901 is for gRPC. + */ +final class Java5018 { + public static void main(final String... args) { + try (MongoClient client = MongoClients.create(MongoClientSettings.builder() + .applyConnectionString( +// new ConnectionString("mongodb://user:pencil@host9.local.10gen.cc:9900/?tls=true") + new ConnectionString("mongodb://user:pencil@host9.local.10gen.cc:9901/?gRPC=true") + ).applyToClusterSettings(builder -> builder + .serverSelectionTimeout(1, TimeUnit.SECONDS) + ).applyToSocketSettings(builder -> builder + .connectTimeout(1, TimeUnit.SECONDS) + ).build())) { + assertTrue(StreamSupport.stream(client.listDatabaseNames().spliterator(), false) + .anyMatch(dbName -> dbName.equals("admin"))); + } + } + + private Java5018() { + } +}