diff --git a/driver-core/build.gradle b/driver-core/build.gradle index 40a63c15d4..08fac99c01 100644 --- a/driver-core/build.gradle +++ b/driver-core/build.gradle @@ -46,6 +46,10 @@ dependencies { api "io.netty:netty-transport", optional api "io.netty:netty-handler", optional + implementation 'io.grpc:grpc-netty:1.56.0', optional + // required by Java gRPC for Java SE 9+ + compileOnly 'org.apache.tomcat:annotations-api:6.0.53', optional + // Optionally depend on both AWS SDK v2 and v1. The driver will use v2 is present, v1 if present, or built-in functionality if // neither are present implementation "software.amazon.awssdk:auth:$awsSdkV2Version", optional diff --git a/driver-core/src/main/com/mongodb/ConnectionString.java b/driver-core/src/main/com/mongodb/ConnectionString.java index 9914c0d0aa..2fc4dfbbc4 100644 --- a/driver-core/src/main/com/mongodb/ConnectionString.java +++ b/driver-core/src/main/com/mongodb/ConnectionString.java @@ -112,7 +112,7 @@ *
  • {@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the * cluster.
  • * - *

    Replica set configuration:

    + *

    Replica set configuration (must not be specified when {@code gRPC=true}):

    * *

    Proxy Configuration:

    * - *

    Connection pool configuration:

    + *

    Connection pool configuration (must not be specified when {@code gRPC=true}):

    * @@ -310,6 +315,7 @@ public class ConnectionString { private String applicationName; private List compressorList; private UuidRepresentation uuidRepresentation; + private Boolean grpc; /** * Creates a ConnectionString from the given string. @@ -480,6 +486,32 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient if (requiredReplicaSetName != null && srvMaxHosts != null && srvMaxHosts > 0) { throw new IllegalArgumentException("srvMaxHosts can not be specified with replica set name"); } + if (grpc != null && grpc) { + if (requiredReplicaSetName != null) { + throw new IllegalArgumentException("replicaSet can not be specified with gRPC=true"); + } + if (writeConcern != null && writeConcern.getWTimeout(TimeUnit.MILLISECONDS) != null) { + throw new IllegalArgumentException("wTimeoutMS can not be specified with gRPC=true"); + } + if (maxConnectionPoolSize != null) { + throw new IllegalArgumentException("maxPoolSize can not be specified with gRPC=true"); + } + if (minConnectionPoolSize != null) { + throw new IllegalArgumentException("minPoolSize can not be specified with gRPC=true"); + } + if (maxWaitTime != null) { + throw new IllegalArgumentException("waitQueueTimeoutMS can not be specified with gRPC=true"); + } + if (maxConnectionLifeTime != null) { + throw new IllegalArgumentException("maxLifeTimeMS can not be specified with gRPC=true"); + } + if (maxConnectionIdleTime != null) { + throw new IllegalArgumentException("maxIdleTimeMS can not be specified with gRPC=true"); + } + if (maxConnecting != null) { + throw new IllegalArgumentException("maxConnecting can not be specified with gRPC=true"); + } + } validateProxyParameters(); @@ -542,6 +574,8 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient GENERAL_OPTIONS_KEYS.add("srvmaxhosts"); GENERAL_OPTIONS_KEYS.add("srvservicename"); + GENERAL_OPTIONS_KEYS.add("grpc"); + COMPRESSOR_KEYS.add("compressors"); COMPRESSOR_KEYS.add("zlibcompressionlevel"); @@ -692,6 +726,9 @@ private void translateOptions(final Map> optionsMap) { case "srvservicename": srvServiceName = value; break; + case "grpc": + grpc = parseBoolean(value, "grpc"); + break; default: break; } @@ -1672,6 +1709,19 @@ public UuidRepresentation getUuidRepresentation() { return uuidRepresentation; } + /** + * Gets whether gRPC is enabled. + * + * @return {@code true} if gRPC is enabled, {@code false} if not, {@code null} if the option is not specified. + * @see MongoClientSettings#isGrpc() + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ + @Nullable + public Boolean isGrpc() { + return grpc; + } + @Override public String toString() { return connectionString; @@ -1719,7 +1769,8 @@ public boolean equals(final Object o) { && Objects.equals(compressorList, that.compressorList) && Objects.equals(uuidRepresentation, that.uuidRepresentation) && Objects.equals(srvServiceName, that.srvServiceName) - && Objects.equals(srvMaxHosts, that.srvMaxHosts); + && Objects.equals(srvMaxHosts, that.srvMaxHosts) + && Objects.equals(grpc, that.grpc); } @Override @@ -1729,6 +1780,6 @@ public int hashCode() { maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled, sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort, - proxyUsername, proxyPassword); + proxyUsername, proxyPassword, grpc); } } diff --git a/driver-core/src/main/com/mongodb/MongoClientSettings.java b/driver-core/src/main/com/mongodb/MongoClientSettings.java index 579a030cb7..9a0e47e9f1 100644 --- a/driver-core/src/main/com/mongodb/MongoClientSettings.java +++ b/driver-core/src/main/com/mongodb/MongoClientSettings.java @@ -28,7 +28,9 @@ import com.mongodb.connection.SslSettings; import com.mongodb.connection.StreamFactoryFactory; import com.mongodb.connection.TransportSettings; +import com.mongodb.connection.grpc.GrpcStreamFactoryFactory; import com.mongodb.event.CommandListener; +import com.mongodb.internal.connection.grpc.SharingGrpcStreamFactoryFactory; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.DnsClient; import com.mongodb.spi.dns.InetAddressResolver; @@ -50,7 +52,9 @@ import java.util.Collections; import java.util.List; import java.util.Objects; +import java.util.concurrent.TimeUnit; +import static com.mongodb.assertions.Assertions.assertFalse; import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static java.util.Arrays.asList; @@ -92,6 +96,9 @@ public final class MongoClientSettings { private final ReadConcern readConcern; private final MongoCredential credential; private final TransportSettings transportSettings; + /** + * {@linkplain StreamFactoryFactoryWrapping#assertRequiresNoFurtherWrapping(StreamFactoryFactory) Requires no further wrapping}. + */ private final StreamFactoryFactory streamFactoryFactory; private final List commandListeners; private final CodecRegistry codecRegistry; @@ -213,6 +220,9 @@ public static final class Builder { private ReadConcern readConcern = ReadConcern.DEFAULT; private CodecRegistry codecRegistry = MongoClientSettings.getDefaultCodecRegistry(); private TransportSettings transportSettings; + /** + * {@linkplain StreamFactoryFactoryWrapping#wrap(StreamFactoryFactory, ConnectionPoolSettings) Requires wrapping}. + */ private StreamFactoryFactory streamFactoryFactory; private List commandListeners = new ArrayList<>(); @@ -257,7 +267,7 @@ private Builder(final MongoClientSettings settings) { dnsClient = settings.getDnsClient(); inetAddressResolver = settings.getInetAddressResolver(); transportSettings = settings.getTransportSettings(); - streamFactoryFactory = settings.getStreamFactoryFactory(); + streamFactoryFactory = StreamFactoryFactoryWrapping.unwrap(settings.getStreamFactoryFactory()); autoEncryptionSettings = settings.getAutoEncryptionSettings(); contextProvider = settings.getContextProvider(); loggerSettingsBuilder.applySettings(settings.getLoggerSettings()); @@ -318,6 +328,18 @@ public Builder applyConnectionString(final ConnectionString connectionString) { if (connectionString.getWriteConcern() != null) { writeConcern = connectionString.getWriteConcern(); } + Boolean grpc = connectionString.isGrpc(); + if (grpc != null) { + if (streamFactoryFactory == null && grpc) { + streamFactoryFactory = GrpcStreamFactoryFactory.builder().build(); + } else if (grpc(streamFactoryFactory) && !grpc) { + throw new IllegalArgumentException( + streamFactoryFactory.getClass().getSimpleName() + " can not be specified with gRPC=false"); + } else if (!grpc(streamFactoryFactory) && grpc) { + throw new IllegalArgumentException( + streamFactoryFactory.getClass().getSimpleName() + " can not be specified with gRPC=true"); + } + } return this; } @@ -360,6 +382,8 @@ public Builder applyToSocketSettings(final Block block) /** * Applies the {@link ConnectionPoolSettings.Builder} block and then sets the connectionPoolSettings. + *

    + * Must not be changed if {@linkplain #isGrpc() gRPC is enabled}.

    * * @param block the block to apply to the ConnectionPoolSettings. * @return this @@ -409,7 +433,9 @@ public Builder readPreference(final ReadPreference readPreference) { /** * Sets the write concern. * - * @param writeConcern the write concern + * @param writeConcern the write concern. + *

    + * {@link WriteConcern#withWTimeout(long, TimeUnit) wtimeout} must not be specified if {@linkplain #isGrpc() gRPC is enabled}.

    * @return this * @see MongoClientSettings#getWriteConcern() */ @@ -499,7 +525,7 @@ public Builder codecRegistry(final CodecRegistry codecRegistry) { */ @Deprecated public Builder streamFactoryFactory(final StreamFactoryFactory streamFactoryFactory) { - this.streamFactoryFactory = notNull("streamFactoryFactory", streamFactoryFactory); + this.streamFactoryFactory = StreamFactoryFactoryWrapping.unwrap(notNull("streamFactoryFactory", streamFactoryFactory)); return this; } @@ -799,9 +825,24 @@ public CodecRegistry getCodecRegistry() { @Deprecated @Nullable public StreamFactoryFactory getStreamFactoryFactory() { + StreamFactoryFactoryWrapping.assertRequiresNoFurtherWrapping(streamFactoryFactory); return streamFactoryFactory; } + /** + * Gets whether gRPC is enabled. + * + * @return {@code true} if gRPC is enabled, {@code false} if not. + * @see ConnectionString#isGrpc() + * @see MongoClientSettings.Builder#streamFactoryFactory(StreamFactoryFactory) + * @see GrpcStreamFactoryFactory + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ + public boolean isGrpc() { + return grpc(streamFactoryFactory); + } + /** * Gets the settings for the underlying transport implementation * @@ -1077,6 +1118,10 @@ public String toString() { + '}'; } + private static boolean grpc(@Nullable final StreamFactoryFactory streamFactoryFactory) { + return streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory || streamFactoryFactory instanceof GrpcStreamFactoryFactory; + } + private MongoClientSettings(final Builder builder) { readPreference = builder.readPreference; writeConcern = builder.writeConcern; @@ -1085,7 +1130,6 @@ private MongoClientSettings(final Builder builder) { readConcern = builder.readConcern; credential = builder.credential; transportSettings = builder.transportSettings; - streamFactoryFactory = builder.streamFactoryFactory; codecRegistry = builder.codecRegistry; commandListeners = builder.commandListeners; applicationName = builder.applicationName; @@ -1113,5 +1157,58 @@ private MongoClientSettings(final Builder builder) { heartbeatSocketTimeoutSetExplicitly = builder.heartbeatSocketTimeoutMS != 0; heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0; contextProvider = builder.contextProvider; + StreamFactoryFactory builderStreamFactoryFactory = builder.streamFactoryFactory; + if (grpc(builderStreamFactoryFactory)) { + if (clusterSettings.getRequiredReplicaSetName() != null) { + throw new IllegalArgumentException( + "requiredReplicaSetName can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (writeConcern.getWTimeout(MILLISECONDS) != null) { + throw new IllegalArgumentException( + "Write concern wtimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (socketSettings.getReadTimeout(MILLISECONDS) != SocketSettings.builder().build().getReadTimeout(MILLISECONDS)) { + throw new IllegalArgumentException( + "Socket readTimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + if (!ConnectionPoolSettings.builder().build().equals(connectionPoolSettings)) { + throw new IllegalArgumentException( + "connectionPoolSettings can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName()); + } + } + streamFactoryFactory = StreamFactoryFactoryWrapping.wrap(builderStreamFactoryFactory, connectionPoolSettings); + } + + private static final class StreamFactoryFactoryWrapping { + @Nullable + static StreamFactoryFactory wrap( + @Nullable final StreamFactoryFactory streamFactoryFactory, + final ConnectionPoolSettings connectionPoolSettings) { + if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) { + return new SharingGrpcStreamFactoryFactory((GrpcStreamFactoryFactory) streamFactoryFactory, connectionPoolSettings); + } + return streamFactoryFactory; + } + + @Nullable + static StreamFactoryFactory unwrap(@Nullable final StreamFactoryFactory streamFactoryFactory) { + if (streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory) { + return ((SharingGrpcStreamFactoryFactory) streamFactoryFactory).unwrap(); + } + return streamFactoryFactory; + } + + static void assertRequiresNoFurtherWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory) throws AssertionError { + assertFalse(requiresWrapping(streamFactoryFactory, false)); + } + + private static boolean requiresWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory, final boolean ifNull) { + if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) { + return true; + } else if (streamFactoryFactory == null) { + return ifNull; + } + return false; + } } } diff --git a/driver-core/src/main/com/mongodb/MongoException.java b/driver-core/src/main/com/mongodb/MongoException.java index a668dd344b..2a6c195f5f 100644 --- a/driver-core/src/main/com/mongodb/MongoException.java +++ b/driver-core/src/main/com/mongodb/MongoException.java @@ -116,7 +116,7 @@ public MongoException(@Nullable final String msg, @Nullable final Throwable t) { * @param msg the message * @param t the throwable cause */ - public MongoException(final int code, final String msg, final Throwable t) { + public MongoException(final int code, final String msg, @Nullable final Throwable t) { super(msg, t); this.code = code; if (t instanceof MongoException) { diff --git a/driver-core/src/main/com/mongodb/MongoSocketException.java b/driver-core/src/main/com/mongodb/MongoSocketException.java index 820c2cb769..3f863fc513 100644 --- a/driver-core/src/main/com/mongodb/MongoSocketException.java +++ b/driver-core/src/main/com/mongodb/MongoSocketException.java @@ -16,6 +16,8 @@ package com.mongodb; +import com.mongodb.lang.Nullable; + /** * Subclass of {@link MongoException} representing a network-related exception * @@ -33,7 +35,7 @@ public class MongoSocketException extends MongoException { * @param msg the message * @param e the cause */ - public MongoSocketException(final String msg, final ServerAddress serverAddress, final Throwable e) { + public MongoSocketException(final String msg, final ServerAddress serverAddress, @Nullable final Throwable e) { super(-2, msg, e); this.serverAddress = serverAddress; } diff --git a/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java b/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java index 4bb658a906..3d06ba5637 100644 --- a/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java +++ b/driver-core/src/main/com/mongodb/MongoSocketReadTimeoutException.java @@ -16,6 +16,8 @@ package com.mongodb; +import com.mongodb.lang.Nullable; + /** * This exception is thrown when there is a timeout reading a response from the socket. * @@ -32,7 +34,7 @@ public class MongoSocketReadTimeoutException extends MongoSocketException { * @param address the address * @param cause the cause */ - public MongoSocketReadTimeoutException(final String message, final ServerAddress address, final Throwable cause) { + public MongoSocketReadTimeoutException(final String message, final ServerAddress address, @Nullable final Throwable cause) { super(message, address, cause); } diff --git a/driver-core/src/main/com/mongodb/connection/ClusterSettings.java b/driver-core/src/main/com/mongodb/connection/ClusterSettings.java index 5bfdfa84f0..2018c6b38f 100644 --- a/driver-core/src/main/com/mongodb/connection/ClusterSettings.java +++ b/driver-core/src/main/com/mongodb/connection/ClusterSettings.java @@ -17,6 +17,7 @@ package com.mongodb.connection; import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.ServerAddress; import com.mongodb.annotations.Immutable; import com.mongodb.annotations.NotThreadSafe; @@ -219,6 +220,8 @@ public Builder mode(final ClusterConnectionMode mode) { /** * Sets the required replica set name for the cluster. + *

    + * Must be {@code null} if {@linkplain MongoClientSettings#isGrpc() gRPC is enabled}.

    * * @param requiredReplicaSetName the required replica set name. * @return this diff --git a/driver-core/src/main/com/mongodb/connection/SocketSettings.java b/driver-core/src/main/com/mongodb/connection/SocketSettings.java index 7a63790cb6..ea7884ab84 100644 --- a/driver-core/src/main/com/mongodb/connection/SocketSettings.java +++ b/driver-core/src/main/com/mongodb/connection/SocketSettings.java @@ -18,6 +18,7 @@ import com.mongodb.Block; import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; import com.mongodb.annotations.Immutable; import java.util.concurrent.TimeUnit; @@ -104,6 +105,8 @@ public Builder connectTimeout(final int connectTimeout, final TimeUnit timeUnit) /** * Sets the socket read timeout. + *

    + * Must not be specified if {@linkplain MongoClientSettings#isGrpc() gRPC is enabled}.

    * * @param readTimeout the read timeout * @param timeUnit the time unit diff --git a/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java new file mode 100644 index 0000000000..b1e1c897e6 --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/grpc/GrpcStreamFactoryFactory.java @@ -0,0 +1,110 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.connection.grpc; + +import com.mongodb.ConnectionString; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.SslSettings; +import com.mongodb.connection.Stream; +import com.mongodb.connection.StreamFactory; +import com.mongodb.connection.StreamFactoryFactory; +import com.mongodb.internal.connection.grpc.SharingGrpcStreamFactoryFactory; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; +import io.netty.buffer.ByteBufAllocator; + +import java.util.UUID; + +import static com.mongodb.assertions.Assertions.notNull; + +/** + * A {@link StreamFactoryFactory} for gRPC-based {@linkplain Stream streams}. + * + * @see ConnectionString#isGrpc() + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ +@ThreadSafe +public final class GrpcStreamFactoryFactory implements StreamFactoryFactory { + private final UUID clientId; + private final ByteBufAllocator allocator; + + @Override + public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { + // Since a user may share the same instance of `GrpcStreamFactoryFactory` between `MongoClient`s, + // all `StreamFactory`s created by `GrpcStreamFactoryFactory` must be isolated from each other, + // which is why we pass a new instance of `Channels` here. + return new GrpcStreamFactory(socketSettings, sslSettings, null, null, clientId, allocator, + new SharingGrpcStreamFactoryFactory.Channels()); + } + + /** + * Creates a builder for {@link GrpcStreamFactoryFactory}. + * + * @return The created {@link Builder}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * A builder for {@link GrpcStreamFactoryFactory}. + * + * @since VAKOTODO + * @mongodb.server.release VAKOTODO + */ + public static final class Builder { + private ByteBufAllocator allocator; + + Builder() { + allocator = ByteBufAllocator.DEFAULT; + } + + /** + * Sets the allocator. + * + * @param allocator The allocator. + * @return {@code this}. + */ + public Builder allocator(final ByteBufAllocator allocator) { + this.allocator = notNull("allocator", allocator); + return this; + } + + /** + * Creates {@link GrpcStreamFactoryFactory}. + * + * @return The created {@link GrpcStreamFactoryFactory}. + */ + public GrpcStreamFactoryFactory build() { + return new GrpcStreamFactoryFactory(this); + } + } + + private GrpcStreamFactoryFactory(final Builder builder) { + clientId = UUID.randomUUID(); + this.allocator = builder.allocator; + } + + @Override + public String toString() { + return "GrpcStreamFactoryFactory{" + + "clientId=" + clientId + + ", allocator=" + allocator + + '}'; + } +} diff --git a/driver-core/src/main/com/mongodb/connection/grpc/package-info.java b/driver-core/src/main/com/mongodb/connection/grpc/package-info.java new file mode 100644 index 0000000000..f5ff63a4c7 --- /dev/null +++ b/driver-core/src/main/com/mongodb/connection/grpc/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains gRPC-specific program elements. + */ +@NonNullApi +package com.mongodb.connection.grpc; + +import com.mongodb.lang.NonNullApi; diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index bb971603ab..4b362210e3 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -40,7 +40,6 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; -import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.SimpleChannelInboundHandler; @@ -71,6 +70,7 @@ import static com.mongodb.internal.Locks.withLock; import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification; import static com.mongodb.internal.connection.SslHelper.enableSni; +import static com.mongodb.internal.connection.netty.NettyChannelOptionsSetter.configureNettyChannelOptions; import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -187,17 +187,7 @@ private void initializeChannel(final AsyncCompletionHandler handler, final bootstrap.group(workerGroup); bootstrap.channel(socketChannelClass); - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, settings.getConnectTimeout(MILLISECONDS)); - bootstrap.option(ChannelOption.TCP_NODELAY, true); - bootstrap.option(ChannelOption.SO_KEEPALIVE, true); - - if (settings.getReceiveBufferSize() > 0) { - bootstrap.option(ChannelOption.SO_RCVBUF, settings.getReceiveBufferSize()); - } - if (settings.getSendBufferSize() > 0) { - bootstrap.option(ChannelOption.SO_SNDBUF, settings.getSendBufferSize()); - } - bootstrap.option(ChannelOption.ALLOCATOR, allocator); + configureNettyChannelOptions(settings, allocator, bootstrap::option); bootstrap.handler(new ChannelInitializer() { @Override diff --git a/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java b/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java index 11e3ef7edb..67cf5eb60b 100644 --- a/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java +++ b/driver-core/src/main/com/mongodb/internal/binding/TransactionContext.java @@ -47,7 +47,9 @@ public void pinConnection(final C connection, final BiConsumer + * This class is not part of the public API and may be removed or changed at any time.

    */ @ThreadSafe -interface ConnectionPool extends Closeable { +public interface ConnectionPool extends Closeable { /** * Is equivalent to {@link #get(OperationContext, long, TimeUnit)} called with {@link ConnectionPoolSettings#getMaxWaitTime(TimeUnit)}. */ diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java index 9b8ac1399b..f00954904b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultClusterableServerFactory.java @@ -29,6 +29,8 @@ import com.mongodb.connection.StreamFactory; import com.mongodb.event.CommandListener; import com.mongodb.event.ServerListener; +import com.mongodb.internal.connection.grpc.GrpcConnectionPool; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; import com.mongodb.internal.inject.SameObjectProvider; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -94,10 +96,13 @@ public ClusterableServer create(final Cluster cluster, final ServerAddress serve new InternalStreamConnectionFactory(clusterMode, true, heartbeatStreamFactory, null, applicationName, mongoDriverInformation, emptyList(), loggerSettings, null, serverApi, inetAddressResolver), clusterMode, serverApi, sdamProvider); - ConnectionPool connectionPool = new DefaultConnectionPool(serverId, - new InternalStreamConnectionFactory(clusterMode, streamFactory, credential, applicationName, - mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi, inetAddressResolver), - connectionPoolSettings, internalConnectionPoolSettings, sdamProvider); + InternalStreamConnectionFactory internalStreamConnectionFactory = new InternalStreamConnectionFactory( + clusterMode, streamFactory, credential, applicationName, + mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi, inetAddressResolver); + ConnectionPool connectionPool = streamFactory instanceof GrpcStreamFactory + ? new GrpcConnectionPool(serverId, internalStreamConnectionFactory, connectionPoolSettings, false) + : new DefaultConnectionPool( + serverId, internalStreamConnectionFactory, connectionPoolSettings, internalConnectionPoolSettings, sdamProvider); ServerListener serverListener = singleServerListener(serverSettings); SdamServerDescriptionManager sdam = new DefaultSdamServerDescriptionManager(cluster, serverId, serverListener, serverMonitor, connectionPool, clusterMode); diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 61ef1f09c2..3d07973475 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -125,9 +125,12 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; +/** + * This class is not part of the public API and may be removed or changed at any time. + */ @SuppressWarnings("deprecation") @ThreadSafe -final class DefaultConnectionPool implements ConnectionPool { +public final class DefaultConnectionPool implements ConnectionPool { private static final Logger LOGGER = Loggers.getLogger("connection"); private static final StructuredLogger STRUCTURED_LOGGER = new StructuredLogger("connection"); private final ConcurrentPool pool; @@ -1194,8 +1197,11 @@ private MutableReference() { } } + /** + * This class is not part of the public API and may be removed or changed at any time. + */ @ThreadSafe - static final class ServiceStateManager { + public static final class ServiceStateManager { private final ConcurrentHashMap stateByServiceId = new ConcurrentHashMap<>(); void addConnection(final ObjectId serviceId) { @@ -1232,7 +1238,7 @@ boolean incrementGeneration(final ObjectId serviceId, final int expectedGenerati return state == null || state.incrementGeneration(expectedGeneration); } - int getGeneration(final ObjectId serviceId) { + public int getGeneration(final ObjectId serviceId) { ServiceState state = stateByServiceId.get(serviceId); return state == null ? 0 : state.getGeneration(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionFactory.java b/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionFactory.java index c788f4668e..74cf956e66 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalConnectionFactory.java @@ -21,8 +21,11 @@ import com.mongodb.lang.NonNull; import org.bson.types.ObjectId; +/** + * This class is not part of the public API and may be removed or changed at any time. + */ @ThreadSafe -interface InternalConnectionFactory { +public interface InternalConnectionFactory { default InternalConnection create(ServerId serverId) { return create(serverId, new ConnectionGenerationSupplier() { @Override diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index cfeeece612..f355a80ebc 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -29,6 +29,7 @@ import com.mongodb.RequestContext; import com.mongodb.ServerAddress; import com.mongodb.UnixServerAddress; +import com.mongodb.annotations.Immutable; import com.mongodb.annotations.NotThreadSafe; import com.mongodb.connection.AsyncCompletionHandler; import com.mongodb.connection.ClusterConnectionMode; @@ -45,6 +46,7 @@ import com.mongodb.internal.ResourceUtil; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.logging.StructuredLogger; @@ -65,11 +67,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.fail; import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; @@ -209,12 +214,13 @@ public void open() { stream = streamFactory.create(getServerAddressWithResolver()); try { stream.open(); - - InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this); - initAfterHandshakeStart(initializationDescription); - - initializationDescription = connectionInitializer.finishHandshake(this, initializationDescription); - initAfterHandshakeFinish(initializationDescription); + if (streamFactory instanceof GrpcStreamFactory) { + Handshakes.HandshakeResults handshakeResults = Handshakes.getOrHandshake(this); + initAfterHandshakeStart(handshakeResults.start()); + initAfterHandshakeFinish(handshakeResults.finish()); + } else { + handshake(); + } } catch (Throwable t) { close(); if (t instanceof MongoException) { @@ -225,8 +231,20 @@ public void open() { } } + private Handshakes.HandshakeResults handshake() { + InternalConnectionInitializationDescription startResult = connectionInitializer.startHandshake(this); + initAfterHandshakeStart(startResult); + + InternalConnectionInitializationDescription finishResult = connectionInitializer.finishHandshake(this, startResult); + initAfterHandshakeFinish(finishResult); + return new Handshakes.HandshakeResults(startResult, finishResult); + } + @Override public void openAsync(final SingleResultCallback callback) { + if (streamFactory instanceof GrpcStreamFactory) { + fail("VAKOTODO"); + } isTrue("Open already called", stream == null, callback); try { stream = streamFactory.create(getServerAddressWithResolver()); @@ -873,4 +891,75 @@ private CommandEventSender createCommandEventSender(final CommandMessage message private ClusterId getClusterId() { return description.getConnectionId().getServerId().getClusterId(); } + + /** + * VAKOTODO + * This is a hack that allows us to avoid handshaking the same gRPC channel needlessly. + * Note that sometimes we have to handshake the same gRPC channel again, at the very least in the following situations + *
      + *
    1. when it starts using a different HTTP/2 connection because the previous one was terminated;
    2. + *
    3. when it starts using a different HTTP/2 connection because it decided to use more than one connection.
    4. + *
    + * We will not be able to implement this in a reasonable way until the authentication in the MongoDB gRPC protocol is specified and + * is supported by the server. For now, using {@link Handshakes.Key} should give us roughly the right behavior for the first item. + */ + private static final class Handshakes { + private static final ConcurrentHashMap CACHE = new ConcurrentHashMap<>(); + + static HandshakeResults getOrHandshake(final InternalStreamConnection connection) { + return CACHE.computeIfAbsent(new Key(connection), key -> connection.handshake()); + } + + @Immutable + private static final class HandshakeResults { + private final InternalConnectionInitializationDescription start; + private final InternalConnectionInitializationDescription finish; + + HandshakeResults( + final InternalConnectionInitializationDescription start, + final InternalConnectionInitializationDescription finish) { + this.start = start; + this.finish = finish; + } + + InternalConnectionInitializationDescription start() { + return start; + } + + InternalConnectionInitializationDescription finish() { + return finish; + } + } + + private static final class Key { + private final ServerId serverId; + private final int generation; + + private Key(final InternalStreamConnection connection) { + serverId = connection.serverId; + // I do not know if we support switching from `LOAD_BALANCED` to a different mode, or vice versa, + // but if we do, I am not sure this logic will work in such switching happens. + generation = connection.clusterConnectionMode == ClusterConnectionMode.LOAD_BALANCED + ? connection.connectionGenerationSupplier.getGeneration(assertNotNull(connection.description.getServiceId())) + : connection.connectionGenerationSupplier.getGeneration(); + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final Key key = (Key) o; + return generation == key.generation && Objects.equals(serverId, key.serverId); + } + + @Override + public int hashCode() { + return Objects.hash(serverId, generation); + } + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionFactory.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionFactory.java index c1b071baaf..924e6106b1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionFactory.java @@ -24,6 +24,7 @@ import com.mongodb.connection.ServerId; import com.mongodb.connection.StreamFactory; import com.mongodb.event.CommandListener; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; import org.bson.BsonDocument; @@ -39,6 +40,7 @@ class InternalStreamConnectionFactory implements InternalConnectionFactory { private final ClusterConnectionMode clusterConnectionMode; private final boolean isMonitoringConnection; private final StreamFactory streamFactory; + @Nullable private final BsonDocument clientMetadataDocument; private final List compressorList; private final LoggerSettings loggerSettings; @@ -74,7 +76,9 @@ class InternalStreamConnectionFactory implements InternalConnectionFactory { this.commandListener = commandListener; this.serverApi = serverApi; this.inetAddressResolver = inetAddressResolver; - this.clientMetadataDocument = createClientMetadataDocument(applicationName, mongoDriverInformation); + this.clientMetadataDocument = streamFactory instanceof GrpcStreamFactory + ? null + : createClientMetadataDocument(applicationName, mongoDriverInformation); this.credential = credential; } diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java index 5752d41b9b..540c8776b5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedClusterableServerFactory.java @@ -29,6 +29,8 @@ import com.mongodb.connection.ServerSettings; import com.mongodb.connection.StreamFactory; import com.mongodb.event.CommandListener; +import com.mongodb.internal.connection.grpc.GrpcConnectionPool; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory; import com.mongodb.internal.inject.EmptyProvider; import com.mongodb.lang.Nullable; import com.mongodb.spi.dns.InetAddressResolver; @@ -81,10 +83,14 @@ public LoadBalancedClusterableServerFactory(final ServerSettings serverSettings, @Override public ClusterableServer create(final Cluster cluster, final ServerAddress serverAddress) { - ConnectionPool connectionPool = new DefaultConnectionPool(new ServerId(cluster.getClusterId(), serverAddress), - new InternalStreamConnectionFactory(ClusterConnectionMode.LOAD_BALANCED, streamFactory, credential, applicationName, - mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi, inetAddressResolver), - connectionPoolSettings, internalConnectionPoolSettings, EmptyProvider.instance()); + ServerId serverId = new ServerId(cluster.getClusterId(), serverAddress); + InternalStreamConnectionFactory internalStreamConnectionFactory = new InternalStreamConnectionFactory( + ClusterConnectionMode.LOAD_BALANCED, streamFactory, credential, applicationName, + mongoDriverInformation, compressorList, loggerSettings, commandListener, serverApi, inetAddressResolver); + ConnectionPool connectionPool = streamFactory instanceof GrpcStreamFactory + ? new GrpcConnectionPool(serverId, internalStreamConnectionFactory, connectionPoolSettings, true) + : new DefaultConnectionPool(serverId, internalStreamConnectionFactory, + connectionPoolSettings, internalConnectionPoolSettings, EmptyProvider.instance()); connectionPool.ready(); return new LoadBalancedServer(new ServerId(cluster.getClusterId(), serverAddress), connectionPool, new DefaultConnectionFactory(), diff --git a/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java b/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java index 3157635feb..02ef9d69b1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java +++ b/driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java @@ -31,17 +31,17 @@ public final class MessageSettings { * * {@code maxBsonObjectSize}. */ - private static final int DEFAULT_MAX_DOCUMENT_SIZE = 0x1000000; // 16MB + public static final int DEFAULT_MAX_DOCUMENT_SIZE = 0x1000000; // 16MB /** * * {@code maxMessageSizeBytes}. */ - private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB + public static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB /** * * {@code maxWriteBatchSize}. */ - private static final int DEFAULT_MAX_BATCH_COUNT = 1000; + public static final int DEFAULT_MAX_BATCH_COUNT = 1000; private final int maxDocumentSize; private final int maxMessageSize; diff --git a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java index ccd05a1710..4dba3da708 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/StreamFactoryHelper.java @@ -18,19 +18,24 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; +import com.mongodb.MongoDriverInformation; import com.mongodb.connection.NettyTransportSettings; import com.mongodb.connection.StreamFactoryFactory; import com.mongodb.connection.TransportSettings; import com.mongodb.connection.netty.NettyStreamFactoryFactory; import com.mongodb.lang.Nullable; +import static com.mongodb.internal.connection.ClientMetadataHelper.configureClientMetadataDocument; + /** *

    This class is not part of the public API and may be removed or changed at any time

    */ @SuppressWarnings("deprecation") public final class StreamFactoryHelper { @Nullable - public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final MongoClientSettings settings) { + public static StreamFactoryFactory getStreamFactoryFactoryFromSettings( + final MongoClientSettings settings, + @Nullable final MongoDriverInformation mongoDriverInformation) { StreamFactoryFactory streamFactoryFactory; TransportSettings transportSettings = settings.getTransportSettings(); if (transportSettings != null) { @@ -43,7 +48,7 @@ public static StreamFactoryFactory getStreamFactoryFactoryFromSettings(final Mon } else { streamFactoryFactory = settings.getStreamFactoryFactory(); } - return streamFactoryFactory; + return configureClientMetadataDocument(streamFactoryFactory, settings, mongoDriverInformation); } private StreamFactoryHelper() { diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java new file mode 100644 index 0000000000..049fe4f326 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcConnectionPool.java @@ -0,0 +1,128 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.grpc; + +import com.mongodb.MongoConnectionPoolClearedException; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.ConnectionPoolSettings; +import com.mongodb.connection.ServerId; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.connection.ConnectionGenerationSupplier; +import com.mongodb.internal.connection.ConnectionPool; +import com.mongodb.internal.connection.DefaultConnectionPool.ServiceStateManager; +import com.mongodb.internal.connection.InternalConnection; +import com.mongodb.internal.connection.InternalConnectionFactory; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.lang.NonNull; +import com.mongodb.lang.Nullable; +import org.bson.types.ObjectId; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +/** + * A non-pooling {@link ConnectionPool} for {@link GrpcStream}-based {@link InternalConnection}s. + *

    + * This class is not part of the public API and may be removed or changed at any time.

    + */ +// VAKOTODO events, logging? Here https://docs.google.com/document/d/1efw8AykgYHE9mwuaS3p3H-mHv3wPUDMPasDJxyH6Ts8/edit#heading=h.8r6ec9yofp1 +// it is said not to do any of events/logging that CMAP requires. +@ThreadSafe +public final class GrpcConnectionPool implements ConnectionPool { + private final ServerId serverId; + private final InternalConnectionFactory internalConnectionFactory; + private final ConnectionPoolSettings settings; + private final boolean loadBalanced; + private final ServiceStateManager serviceStateManager; + private final AtomicInteger generation; + private final ConnectionGenerationSupplier generationSupplier; + + public GrpcConnectionPool( + final ServerId serverId, + final InternalConnectionFactory internalConnectionFactory, + final ConnectionPoolSettings settings, + final boolean loadBalanced) { + this.serverId = serverId; + this.internalConnectionFactory = internalConnectionFactory; + this.settings = settings; + this.loadBalanced = loadBalanced; + serviceStateManager = new ServiceStateManager(); + generation = new AtomicInteger(); + generationSupplier = new ConnectionGenerationSupplier() { + @Override + public int getGeneration() { + return GrpcConnectionPool.this.getGeneration(); + } + + @Override + public int getGeneration(@NonNull final ObjectId serviceId) { + return serviceStateManager.getGeneration(serviceId); + } + }; + } + + @Override + public InternalConnection get(final OperationContext operationContext) throws MongoConnectionPoolClearedException { + return get(operationContext, settings.getMaxWaitTime(MILLISECONDS), MILLISECONDS); + } + + @Override + public InternalConnection get(final OperationContext operationContext, final long timeout, final TimeUnit timeUnit) + throws MongoConnectionPoolClearedException { + InternalConnection connection = internalConnectionFactory.create(serverId, generationSupplier); + connection.open(); + return connection; + } + + @Override + public void getAsync(final OperationContext operationContext, final SingleResultCallback callback) { + fail("VAKOTODO"); + } + + @Override + public void invalidate(@Nullable final Throwable cause) { + assertFalse(loadBalanced); + generation.incrementAndGet(); + // VAKOTODO pause + } + + @Override + public void invalidate(final ObjectId serviceId, final int generation) { + assertTrue(loadBalanced); + // there is nothing to do here because we do not pool connections + } + + @Override + public void ready() { + // VAKOTODO + } + + @Override + public void close() { + // VAKOTODO + } + + @Override + public int getGeneration() { + return generation.get(); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStream.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStream.java new file mode 100644 index 0000000000..a27ea02391 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStream.java @@ -0,0 +1,979 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.grpc; + +import com.mongodb.Function; +import com.mongodb.MongoInterruptedException; +import com.mongodb.MongoSocketClosedException; +import com.mongodb.MongoSocketReadException; +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.MongoSocketWriteException; +import com.mongodb.ServerAddress; +import com.mongodb.annotations.NotThreadSafe; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.AsyncCompletionHandler; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.Stream; +import com.mongodb.internal.time.Timeout; +import com.mongodb.internal.connection.grpc.GrpcStream.WriteState.PendingWrite; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory.ClientMetadataDocument; +import com.mongodb.internal.connection.grpc.GrpcStreamFactory.NettyByteBufProvider; +import com.mongodb.internal.connection.netty.NettyByteBuf; +import com.mongodb.lang.Nullable; +import io.grpc.ClientCall; +import io.grpc.Detachable; +import io.grpc.KnownLength; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.grpc.MethodDescriptor.Marshaller; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.bson.ByteBuf; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.StampedLock; +import java.util.function.BiFunction; + +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.assertNull; +import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.assertions.Assertions.fail; +import static io.grpc.Metadata.ASCII_STRING_MARSHALLER; + +/** + * A {@link Stream} implemented via + * gRPC bidirectional streaming. + *

    + * This class is not part of the public API and may be removed or changed at any time.

    + */ +// While `GrpcStream` does not have to be thread-safe, some of its state may still be accessed concurrently by a thread +// calling a method on an instance of `GrpcStream`, and an internal Java gRPC thread. +// For example, `GrpcStream` may be closed not only by the rest of the driver but also by Java gRPC. +// The following is a pattern used in this implementation to deal with conflicts arising from the aforementioned concurrency: +// 1) Do actions that may conflict with a concurrent closing activity. +// 2) Check if the closing activity has started. +// 2.1) If "no", then there is nothing to worry about as now we know that actions in 1) are ordered before (in the happens-before order) +// the closing activity, and the closing activity will fully see the results of 1) and act accordingly. +// 2.2) If "yes", then the closing activity is concurrent with 1) and may not fully see the results of 1). +// Start the closing activity to guarantee that it fully sees the results of 1). +// Note that this requires the closing activity to support being run concurrently with itself. +@NotThreadSafe +final class GrpcStream implements Stream { + private static final String SERVICE_NAME = "mongodb.CommandService"; + private static final String UNAUTH_STREAM_FULL_METHOD_NAME = + MethodDescriptor.generateFullMethodName(SERVICE_NAME, "UnauthenticatedCommandStream"); + private static final String AUTH_STREAM_FULL_METHOD_NAME = + MethodDescriptor.generateFullMethodName(SERVICE_NAME, "AuthenticatedCommandStream"); + private static final Marshallers MARSHALLERS = new Marshallers<>( + new PendingWriteMarshaller(), new InputStreamUnmarshaller()); + private static final int INFINITE_SOCKET_READ_TIMEOUT = 0; + + /** + * {@value #END_OF_STREAM}. + */ + static final int END_OF_STREAM = -1; + + private final ServerAddress serverAddress; + private final UUID clientId; + @Nullable + private final ClientMetadataDocument clientMetadataDocument; + private final SocketSettings socketSettings; + private final NettyByteBufProvider bufferProvider; + private final WriteState writeState; + private final ReadState readState; + private final ClientCall call; + private final AtomicBoolean closed; + + GrpcStream( + final ServerAddress serverAddress, + final UUID clientId, + @Nullable + final ClientMetadataDocument clientMetadataDocument, + final SocketSettings socketSettings, + final NettyByteBufProvider bufferProvider, + final BiFunction, ClientCall> callCreator) { + this.serverAddress = serverAddress; + this.clientId = clientId; + this.clientMetadataDocument = clientMetadataDocument; + this.socketSettings = socketSettings; + this.bufferProvider = bufferProvider; + Exceptions exceptions = new Exceptions(); + this.writeState = new WriteState(exceptions); + this.readState = new ReadState(exceptions); + this.call = callCreator.apply(UNAUTH_STREAM_FULL_METHOD_NAME, MARSHALLERS); + closed = new AtomicBoolean(); + } + + @Override + public ByteBuf getBuffer(final int size) { + return bufferProvider.getBuffer(size); + } + + @Override + public void open() { + assertFalse(closed.get()); + Metadata metadata = new Metadata(); + // if we set a value for the `Content-Type` key, Java gRPC overrides the value back to `application/grpc`, so we are not setting it + metadata.put(Metadata.Key.of("mongodb-clientId", ASCII_STRING_MARSHALLER), clientId.toString()); + if (clientMetadataDocument != null) { + metadata.put(Metadata.Key.of("mongodb-client", ASCII_STRING_MARSHALLER), clientMetadataDocument.base64()); + } + // VAKOTODO what value to use for mongodb-wireVersion? + metadata.put(Metadata.Key.of("mongodb-wireVersion", ASCII_STRING_MARSHALLER), "18"); + configureJunkMetadata(metadata); + // VAKOTODO use `CallOptions.withDeadlineAfter` when calling `ManagedChannel.newCall` in `GrpcStreamFactory`? + // See https://grpc.io/docs/guides/deadlines/ and https://grpc.io/blog/deadlines/, + // don't forget to handle the DEADLINE_EXCEEDED status. + call.start(new ClientCall.Listener() { + @Override + public void onHeaders(final Metadata metadata) { + // we do not expect any useful metadata + } + + @Override + public void onMessage( + // We must close `response`. Java gRPC does not do that + // because for Java gRPC this is just a message we created in `Marshaller.parse`, + // which makes us responsible for its lifecycle. + final InputStream response) { + readState.add(response); + } + + @Override + public void onReady() { + // There seem to be no reason for us to implement this callback. + // Java gRPC says "Calls that send exactly one message should not await this callback". + // While we may sometimes send more than one message + // (for example, https://www.mongodb.com/docs/current/reference/command/getMore/), + // we still wait for a server response after each one. + } + + @Override + public void onClose(final Status status, final Metadata metadata) { + StatusRuntimeException statusException; + if (status == Status.OK || status == Status.CANCELLED) { + assertNull(status.getCause()); + statusException = null; + } else { + statusException = status.asRuntimeException(metadata); + } + close(statusException, true); + } + }, metadata); + // We do not care about flow control because a server usually replies with a single message. + // The server will not flood up with messages even if there are `exhaustAllowed`/`moreToCome` `OP_MSG` flags + // (https://www.mongodb.com/docs/upcoming/reference/mongodb-wire-protocol/#op_msg). + call.request(Integer.MAX_VALUE); + } + + @Override + public void openAsync(final AsyncCompletionHandler handler) { + fail("VAKOTODO"); + } + + /** + * @param buffers {@inheritDoc} {@code buffers} must contain exactly one + * MongoDB Wire Protocol message, + * which is not a usual requirement for {@link Stream#write(List)}. + */ + @Override + public void write(final List buffers) { + writeState.startWrite(call, buffers).blockInterruptiblyUntilCompleted(); + } + + @Override + public ByteBuf read(final int numBytes) { + return read(numBytes, 0); + } + + @Override + public boolean supportsAdditionalTimeout() { + return true; + } + + @Override + public ByteBuf read(final int numBytes, final int additionalTimeout) { + Timeout timeout = startNow(additionalTimeout); + ReadState.PendingRead pendingRead; + io.netty.buffer.ByteBuf buffer = bufferProvider.allocator().buffer(numBytes, numBytes); + try { + buffer.touch(); + pendingRead = readState.startRead(numBytes, buffer, timeout); + } finally { + buffer.release(); + } + return pendingRead.blockInterruptiblyUntilCompleted(); + } + + /** + * @param buffers {@inheritDoc} {@code buffers} must contain exactly one + * MongoDB Wire Protocol message, + * which is not a usual requirement for {@link Stream#writeAsync(List, AsyncCompletionHandler)}. + */ + @Override + public void writeAsync(final List buffers, final AsyncCompletionHandler handler) { + fail("VAKOTODO"); + } + + @Override + public void readAsync(final int numBytes, final AsyncCompletionHandler handler) { + fail("VAKOTODO"); + } + + @Override + public ServerAddress getAddress() { + return serverAddress; + } + + @Override + @SuppressWarnings("try") + public void close() { + close(null, false); + } + + @Override + public boolean isClosed() { + return closed.get(); + } + + @Override + public String toString() { + return "GrpcStream{" + + "serverAddress=" + serverAddress + + ", clientId=" + clientId + + '}'; + } + + // VAKOTODO this is junk required by https://github.com/10gen/atlasproxy, delete in the future. + private void configureJunkMetadata(final Metadata metadata) { + // VAKOTODO is this even needed? + metadata.put(Metadata.Key.of("Username", ASCII_STRING_MARSHALLER), "user"); + metadata.put(Metadata.Key.of("ServerName", ASCII_STRING_MARSHALLER), "host.local.10gen.cc"); + // This is for the `removeConnectionId` command and for `UnauthenticatedCommandStream` + // See https://docs.google.com/document/d/1ozvF6TTjRpl1g8alx0joOJ5X9elYDnM7UfTKTki8AfA/edit#bookmark=id.5tnw539alz3. + metadata.put(Metadata.Key.of("security-uuid", ASCII_STRING_MARSHALLER), clientId.toString()); + metadata.put(Metadata.Key.of("x-forwarded-for", ASCII_STRING_MARSHALLER), "127.0.0.1:9901"); + } + + /** + * This method may be called concurrently with itself because Java gRPC may call it at any point, + * potentially concurrently with it being called by the rest of the driver via {@link GrpcStream#close()}. + *

    + * Idempotent.

    + * + * @param fromClientCallListener {@code true} iff called from {@link ClientCall.Listener#onClose(Status, Metadata)}. + */ + @SuppressWarnings("try") + private void close(@Nullable final StatusRuntimeException callStatusException, final boolean fromClientCallListener) { + if (closed.compareAndSet(false, true)) { + try (NoCheckedAutoCloseable writeState = () -> this.writeState.close(callStatusException); + NoCheckedAutoCloseable readState = () -> this.readState.close(callStatusException)) { + if (!fromClientCallListener) { + // At this point we know that we were called by the rest of the driver via the `Stream.close` method, + // rather than by Java gRPC. Therefore, we are not running concurrently with any `ClientCall` methods, + // and are allowed to run `ClientCall.cancel` despite it not being thread-safe. + call.cancel(this + " was closed", assertNull(callStatusException)); + } + } + } + } + + private Timeout startNow(final int additionalTimeout) { + int socketReadTimeoutMillis = socketSettings.getReadTimeout(TimeUnit.MILLISECONDS); + return socketReadTimeoutMillis == INFINITE_SOCKET_READ_TIMEOUT + ? Timeout.infinite() + : Timeout.startNow(socketReadTimeoutMillis + additionalTimeout, TimeUnit.MILLISECONDS); + } + + private static final class ResourceUtil { + static > T retain(final T buffers) { + // we assume `ByteBuf::retain` does not complete abruptly + buffers.forEach(ByteBuf::retain); + return buffers; + } + + static void release(final Iterable buffers) { + // we assume `ByteBuf::release` does not complete abruptly + buffers.forEach(buffer -> { + if (buffer != null) { + buffer.release(); + } + }); + } + + static void close(final Iterable autoCloseables) { + Function exceptionTransformer = e -> { + if (e instanceof RuntimeException) { + return (RuntimeException) e; + } + return new RuntimeException(e); + }; + RuntimeException mainException = null; + for (AutoCloseable autoCloseable : autoCloseables) { + try { + autoCloseable.close(); + } catch (Exception e) { + if (mainException == null) { + mainException = exceptionTransformer.apply(e); + } else { + mainException.addSuppressed(e); + } + } + } + if (mainException != null) { + throw mainException; + } + } + + private ResourceUtil() { + fail(); + } + } + + @FunctionalInterface + private interface NoCheckedAutoCloseable extends AutoCloseable { + void close(); + } + + @ThreadSafe + static final class Marshallers { + private final Marshaller marshaller; + private final Marshaller unmarshaller; + + Marshallers( + final Marshaller marshaller, + final Marshaller unmarshaller) { + this.marshaller = marshaller; + this.unmarshaller = unmarshaller; + } + + Marshaller marshaller() { + return marshaller; + } + + Marshaller unmarshaller() { + return unmarshaller; + } + } + + private final class Exceptions { + Exceptions() { + } + + MongoSocketClosedException writeFailedStreamClosed() { + return new MongoSocketClosedException("Cannot write to a closed stream", serverAddress); + } + + MongoSocketClosedException readFailedStreamClosed() { + return new MongoSocketClosedException("Cannot read from a closed stream", serverAddress); + } + + MongoSocketWriteException writeFailed(final Throwable cause) { + return new MongoSocketWriteException("Exception sending message", serverAddress, assertNotNull(cause)); + } + + MongoSocketReadException readFailed(final Throwable cause) { + return new MongoSocketReadException("Exception receiving message", serverAddress, assertNotNull(cause)); + } + + MongoSocketReadTimeoutException readTimedOut() { + return new MongoSocketReadTimeoutException("Timeout while receiving message", serverAddress, null); + } + + MongoInterruptedException interrupted(@Nullable final InterruptedException e) { + return new MongoInterruptedException(null, e); + } + } + + @ThreadSafe + static final class WriteState { + private final Exceptions exceptions; + /** + * {@code null} only if {@link #startWrite(ClientCall, List)} has not been called. + */ + @Nullable + private volatile PendingWrite pendingWrite; + private volatile boolean closed; + + WriteState(final Exceptions exceptions) { + this.exceptions = exceptions; + closed = false; + } + + /** + * Must not be called if there is another {@link PendingWrite} that has not been {@linkplain PendingWrite#isCompleted() completed}. + * + * @param message Retained until {@link PendingWrite} is {@linkplain PendingWrite#isCompleted() completed}. + */ + PendingWrite startWrite(final ClientCall call, final List message) { + PendingWrite pendingWrite = this.pendingWrite; + if (pendingWrite != null) { + assertTrue(pendingWrite.isCompleted()); + } + pendingWrite = new PendingWrite(message, exceptions); + this.pendingWrite = pendingWrite; + if (closed) { + closePendingWrite(pendingWrite, null); + return pendingWrite; + } + try { + // `ClientCall.sendMessage` may not call `Marshaller.stream` + // using call stack, but rather store the message (`pendingWrite`) in heap memory, + // so that later (in the happens-before order) `Marshaller.stream` could access it. + // `Marshaller.stream` may never access and complete `pendingWrite`, + // but that is fine because we also store a reference to `pendingWrite` in heap memory, + // and we guarantee that eventually it will be complete either by us or by `Marshaller.stream`. + // + // Note that due to the contract of the `startWrite` method, + // `call.sendMessage` is guaranteed to be called sequentially. We must guarantee this + // because `sendMessage` is not thread-safe. + call.sendMessage(pendingWrite); + } catch (Exception e) { + pendingWrite.completeExceptionally(e); + } + return pendingWrite; + } + + /** + * Must be called only from {@link GrpcStream#close(StatusRuntimeException, boolean)}, + * which ensures that it is called not more than once. + *

    + * This method cannot be called more than once, but {@link #pendingWrite} may be written to concurrently with this method + * executing, because {@link GrpcStream#close(StatusRuntimeException, boolean)} may be called by Java gRPC at any point. + * We must make sure that {@link WriteState} + * cannot end up in a closed state with {@link #pendingWrite} not being {@linkplain PendingWrite#isCompleted() completed}.

    + */ + void close(@Nullable final StatusRuntimeException callStatusException) { + assertFalse(closed); + closed = true; + closePendingWrite(this.pendingWrite, callStatusException); + } + + private void closePendingWrite(@Nullable final PendingWrite pendingWrite, @Nullable final StatusRuntimeException callStatusException) { + if (pendingWrite != null) { + pendingWrite.completeExceptionally(callStatusException == null + ? exceptions.writeFailedStreamClosed() + : callStatusException); + } + } + + @ThreadSafe + static final class PendingWrite { + /** + * Contains {@code null} only if {@link #message} is {@linkplain #detachMessage() detached}, + * or {@link PendingWrite} is {@linkplain #isCompleted() completed}. + */ + private final AtomicReference> message; + private final Exceptions exceptions; + private final CompletableFuture future; + + private PendingWrite(final List message, final Exceptions exceptions) { + this.message = new AtomicReference<>(ResourceUtil.retain(message)); + this.exceptions = exceptions; + this.future = new CompletableFuture<>(); + } + + /** + * A caller is responsible for {@linkplain InputStream#close() closing} the {@link InputStream}. + * Must not be called more than once. + */ + InputStream detachedMessageInputStream() { + return new DetachedMessageInputStream(); + } + + void blockInterruptiblyUntilCompleted() throws MongoInterruptedException { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw exceptions.interrupted(e); + } catch (ExecutionException e) { + throw exceptions.writeFailed(assertNotNull(e.getCause())); + } + } + + boolean isCompleted() { + return future.isDone(); + } + + void completeExceptionally(final Exception cause) { + try { + future.completeExceptionally(cause); + } finally { + detachAndReleaseMessage(); + } + } + + void complete() { + try { + future.complete(null); + } finally { + detachAndReleaseMessage(); + } + } + + private void detachAndReleaseMessage() { + List detachedMessage = this.message.getAndSet(null); + if (detachedMessage != null) { + ResourceUtil.release(detachedMessage); + } + } + + /** + * A caller becomes responsible for {@linkplain ByteBuf#release() releasing} the buffers. + * Must not be called more than once. + */ + private List detachMessage() { + List detachedMessage = this.message.getAndSet(null); + if (detachedMessage == null) { + // this `PendingWrite` was completed exceptionally + assertTrue(future.isCompletedExceptionally()); + // `join` to propagate the exception + future.join(); + throw fail(); + } + return detachedMessage; + } + + final class DetachedMessageInputStream extends InputStream implements KnownLength { + /** + * {@code null} iff {@linkplain #close() closed}. + */ + @Nullable + private ArrayDeque buffers; + private int available; + + private DetachedMessageInputStream() { + this.buffers = new ArrayDeque<>(detachMessage()); + available = buffers.stream() + .filter(Objects::nonNull) + .mapToInt(ByteBuf::remaining) + .sum(); + } + + @Override + public int read() throws IOException { + throwIfClosed(); + ArrayDeque buffers = assertNotNull(this.buffers); + int bufferRemaining = 0; + int result = END_OF_STREAM; + ByteBuf buffer = buffers.peek(); + while (buffer != null) { + bufferRemaining = buffer.remaining(); + if (bufferRemaining == 0) { + buffers.remove().release(); + buffer = buffers.peek(); + } else { + result = buffer.get(); + bufferRemaining--; + available--; + break; + } + } + if (bufferRemaining == 0 && buffer != null) { + buffers.remove().release(); + } + return result; + } + + @Override + public int read(final byte[] target, final int off, final int len) throws IOException { + if (off < 0 || len < 0 || target.length - off < len) { + // `InputStream` requires `IllegalArgumentException` + throw new IllegalArgumentException(String.format("off=%d, len=%d, target.length=%d", off, len, target.length)); + } + throwIfClosed(); + if (len == 0) { + return 0; + } + ArrayDeque buffers = assertNotNull(this.buffers); + int bufferRemaining = 0; + int unreadLen = len; + ByteBuf buffer = buffers.peek(); + while (buffer != null) { + bufferRemaining = buffer.remaining(); + if (bufferRemaining < unreadLen) { + buffer.get(target, off, bufferRemaining); + unreadLen -= bufferRemaining; + bufferRemaining = 0; + buffers.remove().release(); + buffer = buffers.peek(); + } else { + buffer.get(target, off, unreadLen); + bufferRemaining -= unreadLen; + unreadLen = 0; + break; + } + } + if (bufferRemaining == 0 && buffer != null) { + buffers.remove().release(); + } + int readLen = len - unreadLen; + available -= readLen; + return readLen > 0 ? readLen : END_OF_STREAM; + } + + @Override + public int available() throws IOException { + throwIfClosed(); + return available; + } + + @Override + @SuppressWarnings("try") + public void close() { + ArrayDeque buffers = this.buffers; + if (buffers != null) { + try { + completePendingWrite(); + } finally { + ResourceUtil.release(buffers); + buffers.clear(); + this.buffers = null; + } + } + } + + private void completePendingWrite() { + if (available > 0) { + completeExceptionally(new RuntimeException("Message was not fully sent by Java gRPC")); + } else { + complete(); + } + } + + private void throwIfClosed() throws IOException { + if (buffers == null) { + // `InputStream` requires `IOException` + throw new IOException(DetachedMessageInputStream.class.getSimpleName() + " is closed"); + } + } + } + } + } + + private static final class ReadState { + private final Exceptions exceptions; + // I expect `LinkedBlockingQueue` adding smaller latency overhead than `ConcurrentLinkedQueue` when used here + // (the latter appears to be quite bad at the very least when there is not much contention). + // `ArrayBlockingQueue` would have been much better than both, but it is bounded, which is not suitable here. + private final LinkedBlockingQueue inputStreams; + /** + * {@code null} only if {@link #startRead(int, io.netty.buffer.ByteBuf, Timeout)} has not been called. + */ + @Nullable + private volatile PendingRead pendingRead; + private volatile boolean closed; + + ReadState(final Exceptions exceptions) { + this.exceptions = exceptions; + inputStreams = new LinkedBlockingQueue<>(); + closed = false; + } + + /** + * Must not be called if there is another {@link PendingRead} that has not been {@linkplain PendingRead#isCompleted() completed}. + * + * @param message Retained until {@link PendingRead} is {@linkplain PendingRead#isCompleted() completed}. + */ + PendingRead startRead(final int numBytes, final io.netty.buffer.ByteBuf message, final Timeout timeout) { + PendingRead pendingRead = this.pendingRead; + if (pendingRead != null) { + assertTrue(pendingRead.isCompleted()); + } + pendingRead = new PendingRead(numBytes, message, timeout, inputStreams, exceptions); + this.pendingRead = pendingRead; + if (closed) { + closePendingRead(pendingRead, null); + return pendingRead; + } + pendingRead.tryComplete(); + return pendingRead; + } + + void add(final InputStream inputStream) { + this.inputStreams.add(inputStream); + if (closed) { + clearAndClose(inputStreams); + return; + } + PendingRead pendingRead = this.pendingRead; + if (pendingRead != null) { + pendingRead.tryComplete(); + } + } + + /** + * Must be called only from {@link GrpcStream#close(StatusRuntimeException, boolean)}, + * which ensures that it is called not more than once. + *

    + * This method cannot be called more than once, but {@link #pendingRead} may be written to concurrently with this method + * executing, because {@link GrpcStream#close(StatusRuntimeException, boolean)} may be called by Java gRPC at any point. + * We must make sure that {@link ReadState} + * cannot end up in a closed state with {@link #pendingRead} not being {@linkplain PendingRead#isCompleted() completed}.

    + */ + void close(@Nullable final StatusRuntimeException callStatusException) { + assertFalse(closed); + closed = true; + try { + closePendingRead(this.pendingRead, callStatusException); + } finally { + clearAndClose(inputStreams); + } + } + + private void closePendingRead(@Nullable final PendingRead pendingRead, @Nullable final StatusRuntimeException callStatusException) { + if (pendingRead != null) { + pendingRead.completeExceptionally(callStatusException == null + ? exceptions.readFailedStreamClosed() + : callStatusException); + } + } + + private void clearAndClose(final LinkedBlockingQueue inputStreams) { + ArrayList autoCloseables = new ArrayList<>(); + inputStreams.drainTo(autoCloseables); + ResourceUtil.close(autoCloseables); + } + + @ThreadSafe + static final class PendingRead { + /** + * Must be guarded with {@link #messageLock}. + */ + private int unreadNumBytes; + /** + * Contains {@code null} only if {@link PendingRead} is completed or is being tried to be completed. + */ + private final AtomicReference message; + private final Lock messageLock; + private final Timeout timeout; + /** + * Stays not {@linkplain ResourceUtil#close(Iterable) closed} when {@link PendingWrite} is {@linkplain #isCompleted() completed}. + */ + private final LinkedBlockingQueue inputStreams; + private final Exceptions exceptions; + private final CompletableFuture future; + + private PendingRead( + final int numBytes, + final io.netty.buffer.ByteBuf message, + final Timeout timeout, + final LinkedBlockingQueue inputStreams, + final Exceptions exceptions) { + this.unreadNumBytes = numBytes; + this.message = new AtomicReference<>(message.retain()); + messageLock = new StampedLock().asWriteLock(); + this.timeout = timeout; + this.inputStreams = inputStreams; + this.exceptions = exceptions; + this.future = new CompletableFuture<>(); + } + + ByteBuf blockInterruptiblyUntilCompleted() throws MongoInterruptedException { + try { + return timeout.isInfinite() + ? future.get() + : future.get(timeout.remaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw exceptions.interrupted(e); + } catch (ExecutionException e) { + throw exceptions.readFailed(assertNotNull(e.getCause())); + } catch (TimeoutException e) { + throw exceptions.readTimedOut(); + } + } + + boolean isCompleted() { + return future.isDone(); + } + + void completeExceptionally(final Exception cause) { + try { + future.completeExceptionally(cause); + } finally { + detachAndReleaseMessage(); + } + } + + /** + * This method does not block while waiting for more data, + * but may briefly block if called concurrently with itself, which is unlikely. + *

    + * This method relies on streams in {@link #inputStreams} to contain all the data they may produce and report + * {@link #END_OF_STREAM} when that data is exhausted instead of blocking. Common sense suggests that this must be the case, + * but Java gRPC does not state such a guarantee. If this turns out to not be the case, and blocking harms us, + * then we will have to do copying in {@link InputStreamUnmarshaller#parse(InputStream)} + * from an {@link InputStream} to a buffer.

    + */ + void tryComplete() { + try { + boolean timedOut; + if (timeout.isInfinite()) { + messageLock.lockInterruptibly(); + timedOut = false; + } else { + timedOut = !messageLock.tryLock(timeout.remaining(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); + } + if (timedOut) { + completeExceptionally(exceptions.readTimedOut()); + return; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + completeExceptionally(exceptions.interrupted(e)); + return; + } + try { + uncheckedTryComplete(); + } finally { + messageLock.unlock(); + } + } + + /** + * Must be guarded with {@link #messageLock}. + */ + private void uncheckedTryComplete() { + // While we are trying to complete, we temporarily detach `message` to prevent other threads + // from releasing it concurrently with us using it. + // This `PendingRead` still may be completed exceptionally concurrently, + // or it may have been completed normally. + io.netty.buffer.ByteBuf detachedMessage = this.message.getAndSet(null); + if (detachedMessage == null) { + assertTrue(isCompleted()); + return; + } else { + assertTrue(unreadNumBytes > 0); + } + try { + InputStream inputStream = inputStreams.peek(); + while (unreadNumBytes > 0 && inputStream != null) { + int readNumBytes = detachedMessage.writeBytes(inputStream, unreadNumBytes); + if (readNumBytes == END_OF_STREAM) { + inputStreams.remove().close(); + inputStream = inputStreams.peek(); + } else { + unreadNumBytes -= readNumBytes; + } + if (unreadNumBytes > 0) { + if (timeout.expired()) { + throw exceptions.readTimedOut(); + } else if (Thread.currentThread().isInterrupted()) { + throw exceptions.interrupted(null); + } + } + } + } catch (Exception e) { + completeExceptionally(e); + } finally { + if (unreadNumBytes == 0) { + complete(detachedMessage); + } else if (unreadNumBytes > 0) { + // reattach `detachedMessage` because we have not read everything we need + this.message.set(detachedMessage); + if (isCompleted()) { + assertTrue(future.isCompletedExceptionally()); + detachAndReleaseMessage(); + } + } else { + fail(); + } + } + } + + private void complete(final io.netty.buffer.ByteBuf detachedMessage) { + ByteBuf result = new NettyByteBuf(detachedMessage); + result.flip(); + if (!future.complete(result)) { + assertTrue(future.isCompletedExceptionally()); + result.release(); + } + } + + private void detachAndReleaseMessage() { + io.netty.buffer.ByteBuf detachedMessage = this.message.getAndSet(null); + if (detachedMessage != null) { + detachedMessage.release(); + } + } + } + } + + @ThreadSafe + private static final class PendingWriteMarshaller implements Marshaller { + PendingWriteMarshaller() { + } + + @Override + public InputStream stream(final PendingWrite value) { + return value.detachedMessageInputStream(); + } + + @Override + public PendingWrite parse(final InputStream stream) { + throw fail(); + } + } + + @ThreadSafe + private static final class InputStreamUnmarshaller implements Marshaller { + InputStreamUnmarshaller() { + } + + @Override + public InputStream stream(final InputStream value) { + throw fail(); + } + + @Override + @SuppressWarnings("try") + public InputStream parse( + // we must not close `stream`, Java gRPC does that + final InputStream stream) { + // We do not rely on `io.grpc.HasByteBuffer` because `HasByteBuffer.byteBufferSupported` is not always `true`, + // and `HasByteBuffer.getByteBuffer` is really weird: on `io.grpc.internal.CompositeReadableBuffer` it returns only the + // first buffer, and does not allow accessing the rest of them. + // + // We, however, rely on `Detachable` because it allows us to avoid copying bytes twice when reading. + // We still have to copy them once to return `ByteBuf` from `GrpcStream.read`. + // + // We could call `ReadState.add` here, but Java gRPC expects us to rather do that in `ClientCall.Listener.onMessage`. + // Let us hope that Java gRPC calls `onMessage` for each `parse` that completes normally, regardless of how bad things are, + // otherwise, we have a resource leak. It would have been helpful if Java gRPC explicitly documented such a crucial guarantee, + // because if it does not exist, then the Java gRPC API is inherently broken. + assertTrue(stream instanceof Detachable); + return ((Detachable) stream).detach(); + } + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStreamFactory.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStreamFactory.java new file mode 100644 index 0000000000..df0c5e00a2 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/GrpcStreamFactory.java @@ -0,0 +1,181 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.grpc; + +import com.mongodb.ServerAddress; +import com.mongodb.annotations.Immutable; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.BufferProvider; +import com.mongodb.connection.ConnectionPoolSettings; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.SslSettings; +import com.mongodb.connection.StreamFactory; +import com.mongodb.internal.connection.ByteBufferBsonOutput; +import com.mongodb.internal.connection.MessageSettings; +import com.mongodb.internal.connection.grpc.GrpcStream.Marshallers; +import com.mongodb.internal.connection.grpc.GrpcStream.WriteState.PendingWrite; +import com.mongodb.internal.connection.netty.NettyByteBuf; +import com.mongodb.lang.Nullable; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; +import io.netty.buffer.ByteBufAllocator; +import org.bson.BsonBinaryWriter; +import org.bson.BsonBinaryWriterSettings; +import org.bson.BsonDocument; +import org.bson.BsonWriterSettings; +import org.bson.ByteBuf; +import org.bson.codecs.BsonValueCodecProvider; +import org.bson.codecs.EncoderContext; +import org.bson.codecs.configuration.CodecRegistry; + +import java.io.InputStream; +import java.util.Base64; +import java.util.UUID; + +import static org.bson.codecs.configuration.CodecRegistries.fromProviders; + +/** + * A {@link StreamFactory} for {@link GrpcStream}s. + *

    + * This class is not part of the public API and may be removed or changed at any time.

    + */ +@ThreadSafe +public final class GrpcStreamFactory implements StreamFactory { + /** + * @see MessageSettings#DEFAULT_MAX_DOCUMENT_SIZE + */ + // VAKOTODO use + private static final int MAX_BSON_OBJECT_SIZE = MessageSettings.DEFAULT_MAX_DOCUMENT_SIZE; + /** + * @see MessageSettings#DEFAULT_MAX_MESSAGE_SIZE + */ + static final int MAX_MESSAGE_SIZE_BYTES = 48_000_000; + /** + * @see MessageSettings#DEFAULT_MAX_BATCH_COUNT + */ + // VAKOTODO use + private static final int MAX_WRITE_BATCH_SIZE = 100_000; + + private final SocketSettings socketSettings; + private final SslSettings sslSettings; + @Nullable + private final ConnectionPoolSettings connectionPoolSettings; + private final UUID clientId; + private final NettyByteBufProvider bufferProvider; + @Nullable + private final ClientMetadataDocument clientMetadataDocument; + private final SharingGrpcStreamFactoryFactory.Channels channels; + + /** + * @param connectionPoolSettings Not {@code null} iff this constructor is called from {@link SharingGrpcStreamFactoryFactory}. + * @param clientMetadataDocument May be non-{@code null} only if this constructor is called from {@link SharingGrpcStreamFactoryFactory}. + * @param clientId The value for the {@code mongodb-clientId} {@linkplain Metadata.Key metadata key}. + */ + public GrpcStreamFactory( + final SocketSettings socketSettings, + final SslSettings sslSettings, + @Nullable + final ConnectionPoolSettings connectionPoolSettings, + @Nullable + final BsonDocument clientMetadataDocument, + final UUID clientId, + final ByteBufAllocator allocator, + final SharingGrpcStreamFactoryFactory.Channels channels) { + this.socketSettings = socketSettings; + this.sslSettings = sslSettings; + this.connectionPoolSettings = connectionPoolSettings; + this.clientId = clientId; + bufferProvider = new NettyByteBufProvider(allocator); + this.clientMetadataDocument = clientMetadataDocument == null ? null : new ClientMetadataDocument(clientMetadataDocument, bufferProvider); + this.channels = channels; + } + + @Override + public GrpcStream create(final ServerAddress serverAddress) { + return new GrpcStream(serverAddress, clientId, clientMetadataDocument, socketSettings, bufferProvider, + (fullMethodName, marshallers) -> createCall(serverAddress, fullMethodName, marshallers)); + } + + public UUID clientId() { + return clientId; + } + + public ByteBufAllocator allocator() { + return bufferProvider.allocator(); + } + + private ClientCall createCall( + final ServerAddress serverAddress, + final String fullMethodName, + final Marshallers marshallers) { + ManagedChannel channel = channels.channel(serverAddress, socketSettings, sslSettings, connectionPoolSettings, allocator()); + return channel.newCall( + MethodDescriptor.newBuilder(marshallers.marshaller(), marshallers.unmarshaller()) + .setFullMethodName(fullMethodName) + .setType(MethodDescriptor.MethodType.BIDI_STREAMING) + .setSafe(false) + .setIdempotent(false) + .build(), + CallOptions.DEFAULT + .withMaxOutboundMessageSize(MAX_MESSAGE_SIZE_BYTES) + .withMaxInboundMessageSize(MAX_MESSAGE_SIZE_BYTES) + ); + } + + static final class NettyByteBufProvider implements BufferProvider { + private final ByteBufAllocator allocator; + + NettyByteBufProvider(final ByteBufAllocator allocator) { + this.allocator = allocator; + } + + @Override + public ByteBuf getBuffer(final int size) { + io.netty.buffer.ByteBuf allocatedBuffer = allocator.buffer(size, size); + allocatedBuffer.touch(); + return new NettyByteBuf(allocatedBuffer); + } + + ByteBufAllocator allocator() { + return allocator; + } + } + + @Immutable + static final class ClientMetadataDocument { + private static final CodecRegistry CODEC_REGISTRY = fromProviders(new BsonValueCodecProvider()); + + private final String base64; + + private ClientMetadataDocument(final BsonDocument clientMetadataDocument, final BufferProvider bufferProvider) { + try (ByteBufferBsonOutput out = new ByteBufferBsonOutput(bufferProvider)) { + CODEC_REGISTRY.get(BsonDocument.class).encode( + new BsonBinaryWriter(new BsonWriterSettings(), new BsonBinaryWriterSettings(MAX_BSON_OBJECT_SIZE), out), + clientMetadataDocument, + EncoderContext.builder().build()); + base64 = Base64.getEncoder().encodeToString(out.toByteArray()); + } + } + + String base64() { + return base64; + } + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/SharingGrpcStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/SharingGrpcStreamFactoryFactory.java new file mode 100644 index 0000000000..2665f69ba9 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/SharingGrpcStreamFactoryFactory.java @@ -0,0 +1,179 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.grpc; + +import com.mongodb.ServerAddress; +import com.mongodb.annotations.ThreadSafe; +import com.mongodb.connection.ConnectionPoolSettings; +import com.mongodb.connection.grpc.GrpcStreamFactoryFactory; +import com.mongodb.connection.SocketSettings; +import com.mongodb.connection.SslSettings; +import com.mongodb.connection.StreamFactory; +import com.mongodb.connection.StreamFactoryFactory; +import com.mongodb.lang.Nullable; +import io.grpc.Channel; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import io.grpc.netty.NegotiationType; +import io.grpc.netty.NettyChannelBuilder; +import io.netty.buffer.ByteBufAllocator; +import io.netty.handler.ssl.ApplicationProtocolConfig; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import org.bson.BsonDocument; + +import javax.net.ssl.SSLException; +import java.util.Collections; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.internal.connection.netty.NettyChannelOptionsSetter.configureNettyChannelOptions; + +/** + * A {@link StreamFactory} that is similar to {@link GrpcStreamFactoryFactory}, + * but shares gRPC {@link Channel}s between all {@link StreamFactory}s + * {@linkplain #create(SocketSettings, SslSettings) created} by it. + * Consequently, it must not be shared between {@code MongoClient}s. + *

    + * This class is not part of the public API and may be removed or changed at any time.

    + */ +@ThreadSafe +public final class SharingGrpcStreamFactoryFactory implements StreamFactoryFactory { + private final GrpcStreamFactoryFactory wrapped; + private final ConnectionPoolSettings connectionPoolSettings; + @Nullable + private final BsonDocument clientMetadataDocument; + private final Channels sharedChannels; + + public SharingGrpcStreamFactoryFactory(final GrpcStreamFactoryFactory wrapped, final ConnectionPoolSettings connectionPoolSettings) { + this(wrapped, connectionPoolSettings, null); + } + + private SharingGrpcStreamFactoryFactory( + final GrpcStreamFactoryFactory wrapped, + final ConnectionPoolSettings connectionPoolSettings, + @Nullable final BsonDocument clientMetadataDocument) { + this.wrapped = wrapped; + this.connectionPoolSettings = connectionPoolSettings; + this.clientMetadataDocument = clientMetadataDocument; + sharedChannels = new Channels(); + } + + /** + * @return The configured {@link SharingGrpcStreamFactoryFactory}, which may or may not be the same as {@code this}. + */ + public SharingGrpcStreamFactoryFactory withClientMetadataDocument(@Nullable final BsonDocument clientMetadataDocument) { + return clientMetadataDocument == null ? this : new SharingGrpcStreamFactoryFactory(wrapped, connectionPoolSettings, clientMetadataDocument); + } + + @Override + public StreamFactory create(final SocketSettings socketSettings, final SslSettings sslSettings) { + GrpcStreamFactory streamFactoryFromWrapped = (GrpcStreamFactory) wrapped.create(socketSettings, sslSettings); + return new GrpcStreamFactory(socketSettings, sslSettings, connectionPoolSettings, clientMetadataDocument, + streamFactoryFromWrapped.clientId(), streamFactoryFromWrapped.allocator(), sharedChannels); + } + + /** + * Returns the wrapped {@link GrpcStreamFactoryFactory}. + */ + public GrpcStreamFactoryFactory unwrap() { + return wrapped; + } + + @Override + public String toString() { + return "SharingGrpcStreamFactoryFactory{" + + "wrapped=" + wrapped + + '}'; + } + + // VAKOTODO how to release resources? + @ThreadSafe + public static final class Channels { + private final ConcurrentHashMap channels; + + public Channels() { + channels = new ConcurrentHashMap<>(); + } + + ManagedChannel channel( + final ServerAddress serverAddress, + final SocketSettings socketSettings, + final SslSettings sslSettings, + @Nullable + final ConnectionPoolSettings connectionPoolSettings, + final ByteBufAllocator allocator) { + return channels.compute(serverAddress, (address, channel) -> { + if (channel == null || channel.getState(false) == ConnectivityState.SHUTDOWN) { + return createChannel(serverAddress, socketSettings, sslSettings, connectionPoolSettings, allocator); + } else { + return channel; + } + }); + } + + private static ManagedChannel createChannel( + final ServerAddress serverAddress, + final SocketSettings socketSettings, + final SslSettings sslSettings, + @Nullable + final ConnectionPoolSettings connectionPoolSettings, + final ByteBufAllocator allocator) { + NettyChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(serverAddress.getHost(), serverAddress.getPort()) + .disableRetry(); + if (connectionPoolSettings != null) { + long maxIdleMillis = connectionPoolSettings.getMaxConnectionIdleTime(TimeUnit.MICROSECONDS); + if (maxIdleMillis > 0) { + channelBuilder.idleTimeout(maxIdleMillis, TimeUnit.MILLISECONDS); + } + } + configureJunk(channelBuilder); + configureTls(channelBuilder, sslSettings); + configureNettyChannelOptions(socketSettings, allocator, channelBuilder::withOption); + return channelBuilder.build(); + } + + private static void configureTls(final NettyChannelBuilder channelBuilder, final SslSettings sslSettings) { + if (sslSettings.isEnabled()) { + channelBuilder.negotiationType(NegotiationType.TLS); + SslContext nettySslContext; + try { + nettySslContext = SslContextBuilder.forClient() + .sslProvider(SslProvider.JDK) + .applicationProtocolConfig(new ApplicationProtocolConfig( + ApplicationProtocolConfig.Protocol.ALPN, + ApplicationProtocolConfig.SelectorFailureBehavior.FATAL_ALERT, + ApplicationProtocolConfig.SelectedListenerFailureBehavior.FATAL_ALERT, + // "h2" stands for HTTP/2, see `javax.net.ssl.SSLParameters` + Collections.singletonList("h2")) + ).build(); + } catch (SSLException e) { + throw new RuntimeException(e); + } + channelBuilder.sslContext(nettySslContext); + } else { + channelBuilder.negotiationType(NegotiationType.PLAINTEXT); + } + } + + // VAKOTODO this is junk required by https://github.com/10gen/atlasproxy, delete in the future. + private static void configureJunk(final NettyChannelBuilder channelBuilder) { + channelBuilder.overrideAuthority("host.local.10gen.cc"); + } + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/grpc/package-info.java b/driver-core/src/main/com/mongodb/internal/connection/grpc/package-info.java new file mode 100644 index 0000000000..a3528e21d7 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/grpc/package-info.java @@ -0,0 +1,23 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Contains gRPC-specific program elements. + */ +@NonNullApi +package com.mongodb.internal.connection.grpc; + +import com.mongodb.lang.NonNullApi; diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyChannelOptionsSetter.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyChannelOptionsSetter.java new file mode 100644 index 0000000000..2e36f765e1 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyChannelOptionsSetter.java @@ -0,0 +1,44 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.connection.netty; + +import com.mongodb.connection.SocketSettings; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelOption; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@FunctionalInterface +public interface NettyChannelOptionsSetter { + void set(ChannelOption option, T value); + + static void configureNettyChannelOptions( + final SocketSettings socketSettings, + final ByteBufAllocator allocator, + final NettyChannelOptionsSetter optionSetter) { + optionSetter.set(ChannelOption.CONNECT_TIMEOUT_MILLIS, socketSettings.getConnectTimeout(MILLISECONDS)); + optionSetter.set(ChannelOption.TCP_NODELAY, true); + optionSetter.set(ChannelOption.SO_KEEPALIVE, true); + if (socketSettings.getReceiveBufferSize() > 0) { + optionSetter.set(ChannelOption.SO_RCVBUF, socketSettings.getReceiveBufferSize()); + } + if (socketSettings.getSendBufferSize() > 0) { + optionSetter.set(ChannelOption.SO_SNDBUF, socketSettings.getSendBufferSize()); + } + optionSetter.set(ChannelOption.ALLOCATOR, allocator); + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java index 96b841283b..c0994aaf68 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java @@ -127,7 +127,9 @@ class AsyncQueryBatchCursor implements AsyncAggregateResponseBatchCursor { if (limitReached()) { killCursor(connection); } else { - if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) { + if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER + // VAKOTODO somehow check if gRPC is being used. For now, we assume it is always used + || Boolean.valueOf(true)) { this.pinnedConnection = connection.retain(); this.pinnedConnection.markAsPinned(Connection.PinningMode.CURSOR); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java index 587237fcaf..e0cabb1d41 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java @@ -133,7 +133,9 @@ class QueryBatchCursor implements AggregateResponseBatchCursor { releaseServerAndResources = true; } else { assertNotNull(connectionSource); - if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER) { + if (connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER + // VAKOTODO somehow check if gRPC is being used. For now, we assume it is always used + || Boolean.valueOf(true)) { connectionToPin = connection; } } diff --git a/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy b/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy index 536a1e482e..c5e4120922 100644 --- a/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/ConnectionStringSpecification.groovy @@ -512,6 +512,7 @@ class ConnectionStringSpecification extends Specification { connectionString.getCompressorList() == [] connectionString.getRetryWritesValue() == null connectionString.getRetryReads() == null + connectionString.isGrpc() == null } @Unroll @@ -694,8 +695,10 @@ class ConnectionStringSpecification extends Specification { + 'secondaryPreferred') | new ConnectionString('mongodb://localhost/?readPreference=' + 'secondaryPreferred') new ConnectionString('mongodb://ross:123@localhost/?' - + 'authMechanism=SCRAM-SHA-1') | new ConnectionString('mongodb://ross:123@localhost/?' - + 'authMechanism=SCRAM-SHA-1') + + 'authMechanism=SCRAM-SHA-1' + + '&gRPC=true') | new ConnectionString('mongodb://ross:123@localhost/?' + + 'authMechanism=SCRAM-SHA-1' + + '&gRPC=true') new ConnectionString('mongodb://ross:123@localhost/?' + 'proxyHost=proxy.com' + '&proxyPort=1080' @@ -718,7 +721,8 @@ class ConnectionStringSpecification extends Specification { + 'safe=false;w=1;wtimeout=2500;' + 'fsync=true;readPreference=primary;' + 'directConnection=true;' - + 'ssl=true') | new ConnectionString('mongodb://localhost/db.coll?minPoolSize=5;' + + 'ssl=true;' + + 'gRPC=false') | new ConnectionString('mongodb://localhost/db.coll?minPoolSize=5;' + 'maxPoolSize=10;' + 'waitQueueTimeoutMS=150;' + 'maxIdleTimeMS=200&maxLifeTimeMS=300' @@ -728,7 +732,8 @@ class ConnectionStringSpecification extends Specification { + 'socketTimeoutMS=5500&safe=false&w=1;' + 'wtimeout=2500;fsync=true' + '&directConnection=true' - + '&readPreference=primary;ssl=true') + + '&readPreference=primary;ssl=true' + + '&gRPC=false') } def 'should be not equal to another ConnectionString with the different string values'() { @@ -759,6 +764,7 @@ class ConnectionStringSpecification extends Specification { new ConnectionString('mongodb://ross:123@localhost/?' + 'authMechanism=SCRAM-SHA-1') | new ConnectionString('mongodb://ross:123@localhost/?' + 'authMechanism=GSSAPI') + new ConnectionString('mongodb://localhost/?gRPC=true') | new ConnectionString('mongodb://localhost/?gRPC=false') new ConnectionString('mongodb://ross:123@localhost/?' + 'proxyHost=proxy.com') | new ConnectionString('mongodb://ross:123@localhost/?' + 'proxyHost=1proxy.com') diff --git a/driver-core/src/test/unit/com/mongodb/MongoClientSettingsSpecification.groovy b/driver-core/src/test/unit/com/mongodb/MongoClientSettingsSpecification.groovy index 6f1e01f2f5..66c2341355 100644 --- a/driver-core/src/test/unit/com/mongodb/MongoClientSettingsSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/MongoClientSettingsSpecification.groovy @@ -322,6 +322,7 @@ class MongoClientSettingsSpecification extends Specification { + '&proxyPort=1080' + '&proxyUsername=username' + '&proxyPassword=password' + + '&gRPC=false' ) MongoClientSettings settings = MongoClientSettings.builder().applyConnectionString(connectionString).build() MongoClientSettings expected = MongoClientSettings.builder() @@ -442,6 +443,7 @@ class MongoClientSettingsSpecification extends Specification { .compressorList([MongoCompressor.createZlibCompressor().withProperty(MongoCompressor.LEVEL, 5)]) .retryWrites(true) .retryReads(true) + .streamFactoryFactory(NettyStreamFactoryFactory.builder().build()) def expectedSettings = builder.build() def settingsWithDefaultConnectionStringApplied = builder diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java index e71d9e10f5..704c58bf77 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/StreamFactoryHelperTest.java @@ -34,7 +34,7 @@ class StreamFactoryHelperTest { @Test void streamFactoryFactoryIsNullWithDefaultSettings() { MongoClientSettings settings = MongoClientSettings.builder().build(); - assertNull(StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings)); + assertNull(StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings, null)); } @Test @@ -43,7 +43,7 @@ void streamFactoryFactoryIsEqualToSettingsStreamFactoryFactory() { MongoClientSettings settings = MongoClientSettings.builder() .streamFactoryFactory(streamFactoryFactory) .build(); - assertEquals(streamFactoryFactory, StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings)); + assertEquals(streamFactoryFactory, StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings, null)); } @Test @@ -57,6 +57,6 @@ void streamFactoryFactoryIsDerivedFromTransportSettings() { .transportSettings(nettyTransportSettings) .build(); assertEquals(NettyStreamFactoryFactory.builder().applySettings(nettyTransportSettings).build(), - StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings)); + StreamFactoryHelper.getStreamFactoryFactoryFromSettings(settings, null)); } } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java index 388fca2918..e2918d1e71 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClients.java @@ -115,7 +115,7 @@ public static MongoClient create(final MongoClientSettings settings, @Nullable f if (settings.getSocketSettings().getProxySettings().isProxyEnabled()) { throw new MongoClientException("Proxy is not supported for reactive clients"); } - StreamFactoryFactory streamFactoryFactory = getStreamFactoryFactoryFromSettings(settings); + StreamFactoryFactory streamFactoryFactory = getStreamFactoryFactoryFromSettings(settings, mongoDriverInformation); if (streamFactoryFactory == null) { if (settings.getSslSettings().isEnabled()) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java index 46fa37bf8d..087cd41732 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java @@ -115,7 +115,9 @@ private void getConnectionSource(final AsyncCallbackSupplier transactionContext = new TransactionContext<>(clusterType); session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext); transactionContext.release(); // The session is responsible for retaining a reference to the context diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java index a265ca01a7..981b4152c1 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java @@ -131,7 +131,9 @@ private ConnectionSource getConnectionSource(final Supplier wr if (TransactionContext.get(session) == null) { ConnectionSource source = wrappedConnectionSourceSupplier.get(); ClusterType clusterType = source.getServerDescription().getClusterType(); - if (clusterType == SHARDED || clusterType == LOAD_BALANCED) { + if (clusterType == SHARDED || clusterType == LOAD_BALANCED + // VAKOTODO somehow check if gRPC is being used. For now, we assume it is always used + || Boolean.valueOf(true)) { TransactionContext transactionContext = new TransactionContext<>(clusterType); session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext); transactionContext.release(); // The session is responsible for retaining a reference to the context diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java index 7fb1adcc14..bde2096286 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClientImpl.java @@ -224,15 +224,18 @@ private static Cluster createCluster(final MongoClientSettings settings, notNull("settings", settings); return new DefaultClusterFactory().createCluster(settings.getClusterSettings(), settings.getServerSettings(), settings.getConnectionPoolSettings(), InternalConnectionPoolSettings.builder().build(), - getStreamFactory(settings, false), getStreamFactory(settings, true), + getStreamFactory(settings, mongoDriverInformation, false), getStreamFactory(settings, mongoDriverInformation, true), settings.getCredential(), settings.getLoggerSettings(), getCommandListener(settings.getCommandListeners()), settings.getApplicationName(), mongoDriverInformation, settings.getCompressorList(), settings.getServerApi(), settings.getDnsClient(), settings.getInetAddressResolver()); } @SuppressWarnings("deprecation") - private static StreamFactory getStreamFactory(final MongoClientSettings settings, final boolean isHeartbeat) { - StreamFactoryFactory streamFactoryFactory = getStreamFactoryFactoryFromSettings(settings); + private static StreamFactory getStreamFactory( + final MongoClientSettings settings, + @Nullable final MongoDriverInformation mongoDriverInformation, + final boolean isHeartbeat) { + StreamFactoryFactory streamFactoryFactory = getStreamFactoryFactoryFromSettings(settings, mongoDriverInformation); SocketSettings socketSettings = isHeartbeat ? settings.getHeartbeatSocketSettings() : settings.getSocketSettings(); if (streamFactoryFactory == null) { return new SocketStreamFactory(socketSettings, settings.getSslSettings()); diff --git a/driver-sync/src/test/functional/example/Java5018.java b/driver-sync/src/test/functional/example/Java5018.java new file mode 100644 index 0000000000..b5bd70b5da --- /dev/null +++ b/driver-sync/src/test/functional/example/Java5018.java @@ -0,0 +1,109 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package example; + +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.connection.ServerType; +import com.mongodb.connection.SslSettings; + +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import static com.mongodb.assertions.Assertions.assertTrue; + +/** + * Clone atlasproxy to + * {@code /Users/valentin.kovalenko/Documents/projects/atlasproxy/main/} and checkout the commit d63a315fa0b9d81d714855d77961dbf77c203c08. + * Make sure you have read access to mongonet. + * If you do not, then request access to {@code 10gen/mongonet} via MANA. + * Then do the following from Bash to build and start MongoDB and proxies that support gRPC: + * + *
    {@code
    + *  $ export MONGO_DIR=/usr/local/bin/
    + *  $ export RSNAME=""
    + *  $ export SERVERLESS_MODE=true
    + *  $ export SERVERLESS_METRICS_MODE=true
    + *  $ export GRPC_MODE=true
    + *  $
    + *  $ # see https://wiki.corp.mongodb.com/display/MMS/MongoDB+Agent+Resources#MongoDBAgentResources-Developing
    + *  $ export GOPRIVATE=github.com/10gen/cloud-agent-common,github.com/10gen/cloud-auth-common,github.com/10gen/bsonio,github.com/10gen/bsonutil,github.com/10gen/mongoast,github.com/10gen/mongonet
    + *  $ git config --global url."ssh://git@github.com/10gen/cloud-agent-common".insteadOf "https://github.com/10gen/cloud-agent-common"
    + *  $ git config --global url."ssh://git@github.com/10gen/cloud-auth-common".insteadOf "https://github.com/10gen/cloud-auth-common"
    + *  $ git config --global url."ssh://git@github.com/10gen/mongoast".insteadOf "https://github.com/10gen/mongoast"
    + *  $ git config --global url."ssh://git@github.com/10gen/bsonio".insteadOf "https://github.com/10gen/bsonio"
    + *  $ git config --global url."ssh://git@github.com/10gen/bsonutil".insteadOf "https://github.com/10gen/bsonutil"
    + *  $ git config --global url."ssh://git@github.com/10gen/mongonet".insteadOf "https://github.com/10gen/mongonet"
    + *  $
    + *  $ cd /Users/valentin.kovalenko/Documents/projects/atlasproxy/main/
    + *  $ # works with go 1.20.5
    + *  $ ./start_test_proxies_and_mtms.sh
    + * }
    + * + * If successful, three {@code mongod} processes and three proxies will be started: + * + *
    {@code
    + *  atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27000/?ssl=true' -bindPort 9900 -grpcBindPort 9901 -metricsPort 8100 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9900.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9900 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -serverlessMode=true -serverlessMetricsMode=true -testing=true
    + *  atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27010/?ssl=true' -bindPort 9910 -grpcBindPort 9911 -metricsPort 8110 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9910.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9910 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -disableProfile -serverlessMode=true -serverlessMetricsMode=true -testing=true
    + *  atlasproxy -mongoURI 'mongodb://u:p@host1.local.10gen.cc:27000,host2.local.10gen.cc:27010,host3.local.10gen.cc:27020/?ssl=true' -localMongoURI 'mongodb://u:p@host1.local.10gen.cc:27020/?ssl=true' -bindPort 9920 -grpcBindPort 9921 -metricsPort 8120 -configPath configReplicaSet.json -sslPEMKeyFile star.local.10gen.cc.pem -sslCAFile ca.pem -logPath=proxylogs/proxy9920.log -v -rssThresholdPct 10 -proxyHostnameForTests donorProxy9920 -remoteConfigPathForTests=configReplicaSetRecipient.json -enableTestCommands -disableProfile -serverlessMode=true -serverlessMetricsMode=true -testing=true
    + * }
    + * + * Connect to the fist one, which presents itself as {@code mongos} ({@link ServerType#SHARD_ROUTER}), via {@code mongosh}: + * + *
    {@code
    + *  $ mongosh "mongodb://user:pencil@host9.local.10gen.cc:9900/?tls=true" --tlsCAFile=/Users/valentin.kovalenko/Documents/projects/atlasproxy/main/ca.pem
    + * }
    + * + * Create a truststore that will be used by {@link MongoClient} to authenticate the server + * (note that {@link SslSettings.Builder#invalidHostNameAllowed(boolean)} cannot be used to disable the authentication, it may only + * relax it when it comes to verifying the server hostname against the subject in the certificate presented by the server): + * + *
    {@code
    + *  $ cd /Users/valentin.kovalenko/Documents/projects/atlasproxy/main/
    + *  $ cp /Users/valentin.kovalenko/.sdkman/candidates/java/current/lib/security/cacerts ./mongo-truststore
    + *  $ keytool --importcert -trustcacerts -file ./ca.pem -keystore mongo-truststore -storepass changeit -storetype JKS -noprompt
    + * }
    + * + * Connect to the first one via {@link Java5018} by running it with the following {@code java} CLI arguments: + * + *
    {@code
    + *  -Djavax.net.trustStoreType=jks -Djavax.net.ssl.trustStore=/Users/valentin.kovalenko/Documents/projects/atlasproxy/main/mongo-truststore -Djavax.net.ssl.trustStorePassword=changeit
    + * }
    + * + * Note that the port 9900 is for mongorpc, while 9901 is for gRPC. + */ +final class Java5018 { + public static void main(final String... args) { + try (MongoClient client = MongoClients.create(MongoClientSettings.builder() + .applyConnectionString( +// new ConnectionString("mongodb://user:pencil@host9.local.10gen.cc:9900/?tls=true") + new ConnectionString("mongodb://user:pencil@host9.local.10gen.cc:9901/?gRPC=true") + ).applyToClusterSettings(builder -> builder + .serverSelectionTimeout(1, TimeUnit.SECONDS) + ).applyToSocketSettings(builder -> builder + .connectTimeout(1, TimeUnit.SECONDS) + ).build())) { + assertTrue(StreamSupport.stream(client.listDatabaseNames().spliterator(), false) + .anyMatch(dbName -> dbName.equals("admin"))); + } + } + + private Java5018() { + } +}