Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC POC phase 1 #1154

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions driver-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ dependencies {
api "io.netty:netty-transport", optional
api "io.netty:netty-handler", optional

implementation 'io.grpc:grpc-netty:1.56.0', optional
// required by Java gRPC for Java SE 9+
compileOnly 'org.apache.tomcat:annotations-api:6.0.53', optional

// Optionally depend on both AWS SDK v2 and v1. The driver will use v2 is present, v1 if present, or built-in functionality if
// neither are present
implementation "software.amazon.awssdk:auth:$awsSdkV2Version", optional
Expand Down
65 changes: 58 additions & 7 deletions driver-core/src/main/com/mongodb/ConnectionString.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@
* <li>{@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
* cluster.</li>
* </ul>
* <p>Replica set configuration:</p>
* <p>Replica set configuration (must not be specified when {@code gRPC=true}):</p>
* <ul>
* <li>{@code replicaSet=name}: Implies that the hosts given are a seed list, and the driver will attempt to find
* all members of the set.</li>
Expand All @@ -129,9 +129,13 @@
* sslInvalidHostNameAllowed option</li>
* <li>{@code connectTimeoutMS=ms}: How long a connection can take to be opened before timing out.</li>
* <li>{@code socketTimeoutMS=ms}: How long a receive on a socket can take before timing out.
* This option is the same as {@link SocketSettings#getReadTimeout(TimeUnit)}.</li>
* <li>{@code maxIdleTimeMS=ms}: Maximum idle time of a pooled connection. A connection that exceeds this limit will be closed</li>
* <li>{@code maxLifeTimeMS=ms}: Maximum life time of a pooled connection. A connection that exceeds this limit will be closed</li>
* This option is the same as {@link SocketSettings#getReadTimeout(TimeUnit)}.
* Must not be specified when {@code gRPC=true}.</li>
* <li>{@code maxIdleTimeMS=ms}: Maximum idle time of a pooled connection. A connection that exceeds this limit will be closed.
* Must not be specified when {@code gRPC=true}</li>
* <li>{@code maxLifeTimeMS=ms}: Maximum life time of a pooled connection. A connection that exceeds this limit will be closed.
* Must not be specified when {@code gRPC=true}</li>
* <li>{@code gRPC=true|false}: Whether to connect using gRPC.</li>
* </ul>
* <p>Proxy Configuration:</p>
* <ul>
Expand All @@ -142,7 +146,7 @@
* <li>{@code proxyUsername=string}: Username for authenticating with the proxy server. Required if proxyPassword is specified.</li>
* <li>{@code proxyPassword=string}: Password for authenticating with the proxy server. Required if proxyUsername is specified.</li>
* </ul>
* <p>Connection pool configuration:</p>
* <p>Connection pool configuration (must not be specified when {@code gRPC=true}):</p>
* <ul>
* <li>{@code maxPoolSize=n}: The maximum number of connections in the connection pool.</li>
* <li>{@code minPoolSize=n}: The minimum number of connections in the connection pool.</li>
Expand Down Expand Up @@ -178,6 +182,7 @@
* <ul>
* <li>The driver adds { wtimeout : ms } to all write commands. Implies {@code safe=true}.</li>
* <li>Used in combination with {@code w}</li>
* <li>Must not be specified when {@code gRPC=true}.</li>
* </ul>
* </li>
* </ul>
Expand Down Expand Up @@ -310,6 +315,7 @@ public class ConnectionString {
private String applicationName;
private List<MongoCompressor> compressorList;
private UuidRepresentation uuidRepresentation;
private Boolean grpc;

/**
* Creates a ConnectionString from the given string.
Expand Down Expand Up @@ -480,6 +486,32 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
if (requiredReplicaSetName != null && srvMaxHosts != null && srvMaxHosts > 0) {
throw new IllegalArgumentException("srvMaxHosts can not be specified with replica set name");
}
if (grpc != null && grpc) {
if (requiredReplicaSetName != null) {
throw new IllegalArgumentException("replicaSet can not be specified with gRPC=true");
}
if (writeConcern != null && writeConcern.getWTimeout(TimeUnit.MILLISECONDS) != null) {
throw new IllegalArgumentException("wTimeoutMS can not be specified with gRPC=true");
}
if (maxConnectionPoolSize != null) {
throw new IllegalArgumentException("maxPoolSize can not be specified with gRPC=true");
}
if (minConnectionPoolSize != null) {
throw new IllegalArgumentException("minPoolSize can not be specified with gRPC=true");
}
if (maxWaitTime != null) {
throw new IllegalArgumentException("waitQueueTimeoutMS can not be specified with gRPC=true");
}
if (maxConnectionLifeTime != null) {
throw new IllegalArgumentException("maxLifeTimeMS can not be specified with gRPC=true");
}
if (maxConnectionIdleTime != null) {
throw new IllegalArgumentException("maxIdleTimeMS can not be specified with gRPC=true");
}
if (maxConnecting != null) {
throw new IllegalArgumentException("maxConnecting can not be specified with gRPC=true");
}
}

validateProxyParameters();

Expand Down Expand Up @@ -542,6 +574,8 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("srvmaxhosts");
GENERAL_OPTIONS_KEYS.add("srvservicename");

GENERAL_OPTIONS_KEYS.add("grpc");

COMPRESSOR_KEYS.add("compressors");
COMPRESSOR_KEYS.add("zlibcompressionlevel");

Expand Down Expand Up @@ -692,6 +726,9 @@ private void translateOptions(final Map<String, List<String>> optionsMap) {
case "srvservicename":
srvServiceName = value;
break;
case "grpc":
grpc = parseBoolean(value, "grpc");
break;
default:
break;
}
Expand Down Expand Up @@ -1672,6 +1709,19 @@ public UuidRepresentation getUuidRepresentation() {
return uuidRepresentation;
}

/**
* Gets whether gRPC is enabled.
*
* @return {@code true} if gRPC is enabled, {@code false} if not, {@code null} if the option is not specified.
* @see MongoClientSettings#isGrpc()
* @since VAKOTODO
* @mongodb.server.release VAKOTODO
*/
@Nullable
public Boolean isGrpc() {
return grpc;
}

@Override
public String toString() {
return connectionString;
Expand Down Expand Up @@ -1719,7 +1769,8 @@ public boolean equals(final Object o) {
&& Objects.equals(compressorList, that.compressorList)
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
&& Objects.equals(srvServiceName, that.srvServiceName)
&& Objects.equals(srvMaxHosts, that.srvMaxHosts);
&& Objects.equals(srvMaxHosts, that.srvMaxHosts)
&& Objects.equals(grpc, that.grpc);
}

@Override
Expand All @@ -1729,6 +1780,6 @@ public int hashCode() {
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
proxyUsername, proxyPassword);
proxyUsername, proxyPassword, grpc);
}
}
105 changes: 101 additions & 4 deletions driver-core/src/main/com/mongodb/MongoClientSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.mongodb.connection.SslSettings;
import com.mongodb.connection.StreamFactoryFactory;
import com.mongodb.connection.TransportSettings;
import com.mongodb.connection.grpc.GrpcStreamFactoryFactory;
import com.mongodb.event.CommandListener;
import com.mongodb.internal.connection.grpc.SharingGrpcStreamFactoryFactory;
import com.mongodb.lang.Nullable;
import com.mongodb.spi.dns.DnsClient;
import com.mongodb.spi.dns.InetAddressResolver;
Expand All @@ -50,7 +52,9 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static java.util.Arrays.asList;
Expand Down Expand Up @@ -92,6 +96,9 @@ public final class MongoClientSettings {
private final ReadConcern readConcern;
private final MongoCredential credential;
private final TransportSettings transportSettings;
/**
* {@linkplain StreamFactoryFactoryWrapping#assertRequiresNoFurtherWrapping(StreamFactoryFactory) Requires no further wrapping}.
*/
private final StreamFactoryFactory streamFactoryFactory;
private final List<CommandListener> commandListeners;
private final CodecRegistry codecRegistry;
Expand Down Expand Up @@ -213,6 +220,9 @@ public static final class Builder {
private ReadConcern readConcern = ReadConcern.DEFAULT;
private CodecRegistry codecRegistry = MongoClientSettings.getDefaultCodecRegistry();
private TransportSettings transportSettings;
/**
* {@linkplain StreamFactoryFactoryWrapping#wrap(StreamFactoryFactory, ConnectionPoolSettings) Requires wrapping}.
*/
private StreamFactoryFactory streamFactoryFactory;
private List<CommandListener> commandListeners = new ArrayList<>();

Expand Down Expand Up @@ -257,7 +267,7 @@ private Builder(final MongoClientSettings settings) {
dnsClient = settings.getDnsClient();
inetAddressResolver = settings.getInetAddressResolver();
transportSettings = settings.getTransportSettings();
streamFactoryFactory = settings.getStreamFactoryFactory();
streamFactoryFactory = StreamFactoryFactoryWrapping.unwrap(settings.getStreamFactoryFactory());
autoEncryptionSettings = settings.getAutoEncryptionSettings();
contextProvider = settings.getContextProvider();
loggerSettingsBuilder.applySettings(settings.getLoggerSettings());
Expand Down Expand Up @@ -318,6 +328,18 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (connectionString.getWriteConcern() != null) {
writeConcern = connectionString.getWriteConcern();
}
Boolean grpc = connectionString.isGrpc();
if (grpc != null) {
if (streamFactoryFactory == null && grpc) {
streamFactoryFactory = GrpcStreamFactoryFactory.builder().build();
} else if (grpc(streamFactoryFactory) && !grpc) {
throw new IllegalArgumentException(
streamFactoryFactory.getClass().getSimpleName() + " can not be specified with gRPC=false");
} else if (!grpc(streamFactoryFactory) && grpc) {
throw new IllegalArgumentException(
streamFactoryFactory.getClass().getSimpleName() + " can not be specified with gRPC=true");
}
}
return this;
}

Expand Down Expand Up @@ -360,6 +382,8 @@ public Builder applyToSocketSettings(final Block<SocketSettings.Builder> block)

/**
* Applies the {@link ConnectionPoolSettings.Builder} block and then sets the connectionPoolSettings.
* <p>
* Must not be changed if {@linkplain #isGrpc() gRPC is enabled}.</p>
*
* @param block the block to apply to the ConnectionPoolSettings.
* @return this
Expand Down Expand Up @@ -409,7 +433,9 @@ public Builder readPreference(final ReadPreference readPreference) {
/**
* Sets the write concern.
*
* @param writeConcern the write concern
* @param writeConcern the write concern.
* <p>
* {@link WriteConcern#withWTimeout(long, TimeUnit) wtimeout} must not be specified if {@linkplain #isGrpc() gRPC is enabled}.</p>
* @return this
* @see MongoClientSettings#getWriteConcern()
*/
Expand Down Expand Up @@ -499,7 +525,7 @@ public Builder codecRegistry(final CodecRegistry codecRegistry) {
*/
@Deprecated
public Builder streamFactoryFactory(final StreamFactoryFactory streamFactoryFactory) {
this.streamFactoryFactory = notNull("streamFactoryFactory", streamFactoryFactory);
this.streamFactoryFactory = StreamFactoryFactoryWrapping.unwrap(notNull("streamFactoryFactory", streamFactoryFactory));
return this;
}

Expand Down Expand Up @@ -799,9 +825,24 @@ public CodecRegistry getCodecRegistry() {
@Deprecated
@Nullable
public StreamFactoryFactory getStreamFactoryFactory() {
StreamFactoryFactoryWrapping.assertRequiresNoFurtherWrapping(streamFactoryFactory);
return streamFactoryFactory;
}

/**
* Gets whether gRPC is enabled.
*
* @return {@code true} if gRPC is enabled, {@code false} if not.
* @see ConnectionString#isGrpc()
* @see MongoClientSettings.Builder#streamFactoryFactory(StreamFactoryFactory)
* @see GrpcStreamFactoryFactory
* @since VAKOTODO
* @mongodb.server.release VAKOTODO
*/
public boolean isGrpc() {
return grpc(streamFactoryFactory);
}

/**
* Gets the settings for the underlying transport implementation
*
Expand Down Expand Up @@ -1077,6 +1118,10 @@ public String toString() {
+ '}';
}

private static boolean grpc(@Nullable final StreamFactoryFactory streamFactoryFactory) {
return streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory || streamFactoryFactory instanceof GrpcStreamFactoryFactory;
}

private MongoClientSettings(final Builder builder) {
readPreference = builder.readPreference;
writeConcern = builder.writeConcern;
Expand All @@ -1085,7 +1130,6 @@ private MongoClientSettings(final Builder builder) {
readConcern = builder.readConcern;
credential = builder.credential;
transportSettings = builder.transportSettings;
streamFactoryFactory = builder.streamFactoryFactory;
codecRegistry = builder.codecRegistry;
commandListeners = builder.commandListeners;
applicationName = builder.applicationName;
Expand Down Expand Up @@ -1113,5 +1157,58 @@ private MongoClientSettings(final Builder builder) {
heartbeatSocketTimeoutSetExplicitly = builder.heartbeatSocketTimeoutMS != 0;
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
contextProvider = builder.contextProvider;
StreamFactoryFactory builderStreamFactoryFactory = builder.streamFactoryFactory;
if (grpc(builderStreamFactoryFactory)) {
if (clusterSettings.getRequiredReplicaSetName() != null) {
throw new IllegalArgumentException(
"requiredReplicaSetName can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName());
}
if (writeConcern.getWTimeout(MILLISECONDS) != null) {
throw new IllegalArgumentException(
"Write concern wtimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName());
}
if (socketSettings.getReadTimeout(MILLISECONDS) != SocketSettings.builder().build().getReadTimeout(MILLISECONDS)) {
throw new IllegalArgumentException(
"Socket readTimeout can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName());
}
if (!ConnectionPoolSettings.builder().build().equals(connectionPoolSettings)) {
throw new IllegalArgumentException(
"connectionPoolSettings can not be specified with " + GrpcStreamFactoryFactory.class.getSimpleName());
}
}
streamFactoryFactory = StreamFactoryFactoryWrapping.wrap(builderStreamFactoryFactory, connectionPoolSettings);
}

private static final class StreamFactoryFactoryWrapping {
@Nullable
static StreamFactoryFactory wrap(
@Nullable final StreamFactoryFactory streamFactoryFactory,
final ConnectionPoolSettings connectionPoolSettings) {
if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) {
return new SharingGrpcStreamFactoryFactory((GrpcStreamFactoryFactory) streamFactoryFactory, connectionPoolSettings);
}
return streamFactoryFactory;
}

@Nullable
static StreamFactoryFactory unwrap(@Nullable final StreamFactoryFactory streamFactoryFactory) {
if (streamFactoryFactory instanceof SharingGrpcStreamFactoryFactory) {
return ((SharingGrpcStreamFactoryFactory) streamFactoryFactory).unwrap();
}
return streamFactoryFactory;
}

static void assertRequiresNoFurtherWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory) throws AssertionError {
assertFalse(requiresWrapping(streamFactoryFactory, false));
}

private static boolean requiresWrapping(@Nullable final StreamFactoryFactory streamFactoryFactory, final boolean ifNull) {
if (streamFactoryFactory instanceof GrpcStreamFactoryFactory) {
return true;
} else if (streamFactoryFactory == null) {
return ifNull;
}
return false;
}
}
}
2 changes: 1 addition & 1 deletion driver-core/src/main/com/mongodb/MongoException.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public MongoException(@Nullable final String msg, @Nullable final Throwable t) {
* @param msg the message
* @param t the throwable cause
*/
public MongoException(final int code, final String msg, final Throwable t) {
public MongoException(final int code, final String msg, @Nullable final Throwable t) {
super(msg, t);
this.code = code;
if (t instanceof MongoException) {
Expand Down
4 changes: 3 additions & 1 deletion driver-core/src/main/com/mongodb/MongoSocketException.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb;

import com.mongodb.lang.Nullable;

/**
* Subclass of {@link MongoException} representing a network-related exception
*
Expand All @@ -33,7 +35,7 @@ public class MongoSocketException extends MongoException {
* @param msg the message
* @param e the cause
*/
public MongoSocketException(final String msg, final ServerAddress serverAddress, final Throwable e) {
public MongoSocketException(final String msg, final ServerAddress serverAddress, @Nullable final Throwable e) {
super(-2, msg, e);
this.serverAddress = serverAddress;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.mongodb;

import com.mongodb.lang.Nullable;

/**
* This exception is thrown when there is a timeout reading a response from the socket.
*
Expand All @@ -32,7 +34,7 @@ public class MongoSocketReadTimeoutException extends MongoSocketException {
* @param address the address
* @param cause the cause
*/
public MongoSocketReadTimeoutException(final String message, final ServerAddress address, final Throwable cause) {
public MongoSocketReadTimeoutException(final String message, final ServerAddress address, @Nullable final Throwable cause) {
super(message, address, cause);
}

Expand Down
Loading