Skip to content

Commit

Permalink
Added option cachedTransportsCount to shard JDBC connections (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex268 authored Nov 11, 2024
2 parents 577e6c5 + 5a77a64 commit 0e44158
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 24 deletions.
4 changes: 2 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/YdbDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public YdbConnection connect(String url, Properties info) throws SQLException {
}

YdbConfig config = YdbConfig.from(url, info);
LOGGER.log(Level.FINE, "About to connect to [{0}] using properties {1}", new Object[] {
LOGGER.log(Level.FINE, "Connect to [{0}] using properties {1}", new Object[] {
config.getSafeUrl(),
config.getSafeProps()
});
Expand Down Expand Up @@ -80,7 +80,7 @@ public YdbContext getCachedContext(YdbConfig config) throws SQLException {
// Was fixed in Java 9+
YdbContext context = cache.get(config);
if (context != null) {
LOGGER.log(Level.FINE, "Reusing YDB connection to {0}", config.getSafeUrl());
LOGGER.log(Level.FINEST, "Reusing YDB connection to {0}", config.getSafeUrl());
return context;
}

Expand Down
10 changes: 1 addition & 9 deletions jdbc/src/main/java/tech/ydb/jdbc/context/BaseYdbExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
import java.util.concurrent.atomic.AtomicReference;

import tech.ydb.core.Result;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.core.grpc.GrpcReadStream;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.query.QueryType;
import tech.ydb.jdbc.query.YdbQuery;
Expand Down Expand Up @@ -48,13 +46,7 @@ public BaseYdbExecutor(YdbContext ctx) {
}

protected Session createNewTableSession(YdbValidator validator) throws SQLException {
try {
Result<Session> session = tableClient.createSession(sessionTimeout).join();
validator.addStatusIssues(session.getStatus());
return session.getValue();
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
}
return validator.call("Get session", () -> tableClient.createSession(sessionTimeout));
}

protected void closeCurrentResult() throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.core.UnexpectedResultException;
import tech.ydb.jdbc.YdbConst;
import tech.ydb.jdbc.YdbResultSet;
import tech.ydb.jdbc.YdbStatement;
import tech.ydb.jdbc.YdbTracer;
import tech.ydb.jdbc.exception.ExceptionFactory;
import tech.ydb.jdbc.impl.YdbQueryResult;
import tech.ydb.jdbc.impl.YdbStaticResultSet;
import tech.ydb.jdbc.query.QueryType;
Expand Down Expand Up @@ -70,14 +68,7 @@ public QueryServiceExecutor(YdbContext ctx, int transactionLevel, boolean autoCo
}

protected QuerySession createNewQuerySession(YdbValidator validator) throws SQLException {
try {
Result<QuerySession> result = queryClient.createSession(sessionTimeout).join();
validator.addStatusIssues(result.getStatus());
QuerySession session = result.getValue();
return session;
} catch (UnexpectedResultException ex) {
throw ExceptionFactory.createException("Cannot create session with " + ex.getStatus(), ex);
}
return validator.call("Get query session", () -> queryClient.createSession(sessionTimeout));
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion jdbc/src/main/java/tech/ydb/jdbc/context/YdbContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ public void deregister() {

public static YdbContext createContext(YdbConfig config) throws SQLException {
try {
LOGGER.log(Level.INFO, "Creating new YDB connection to {0}", config.getConnectionString());
LOGGER.log(Level.FINE, "Creating new YDB context to {0}", config.getConnectionString());

YdbConnectionProperties connProps = new YdbConnectionProperties(config);
YdbClientProperties clientProps = new YdbClientProperties(config);
Expand Down
7 changes: 7 additions & 0 deletions jdbc/src/main/java/tech/ydb/jdbc/context/YdbValidator.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
Expand All @@ -20,6 +22,8 @@
* @author Aleksandr Gorshenin
*/
public class YdbValidator {
private static final Logger LOGGER = Logger.getLogger(YdbValidator.class.getName());

private final List<Issue> issues = new ArrayList<>();

public SQLWarning toSQLWarnings() {
Expand Down Expand Up @@ -56,6 +60,7 @@ public void execute(String msg, YdbTracer tracer, Supplier<CompletableFuture<Sta

tracer.trace("<-- " + status.toString());
if (!status.isSuccess()) {
LOGGER.log(Level.FINE, "execute problem {0}", status);
tracer.close();
throw ExceptionFactory.createException("Cannot execute '" + msg + "' with " + status,
new UnexpectedResultException("Unexpected status", status));
Expand All @@ -68,6 +73,7 @@ public <R> R call(String msg, Supplier<CompletableFuture<Result<R>>> fn) throws
addStatusIssues(result.getStatus());
return result.getValue();
} catch (UnexpectedResultException ex) {
LOGGER.log(Level.FINE, "call problem {0}", ex.getStatus());
throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex);
}
}
Expand All @@ -80,6 +86,7 @@ public <R> R call(String msg, YdbTracer tracer, Supplier<CompletableFuture<Resul
return result.getValue();
} catch (UnexpectedResultException ex) {
tracer.close();
LOGGER.log(Level.FINE, "call problem {0}", ex.getStatus());
throw ExceptionFactory.createException("Cannot call '" + msg + "' with " + ex.getStatus(), ex);
}
}
Expand Down
18 changes: 16 additions & 2 deletions jdbc/src/main/java/tech/ydb/jdbc/settings/YdbConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;

import tech.ydb.core.utils.URITools;
import tech.ydb.jdbc.YdbConst;
Expand Down Expand Up @@ -50,6 +51,9 @@ public class YdbConfig {
static final YdbProperty<Boolean> TRANSACTION_TRACER = YdbProperty.bool(
"enableTxTracer", "Enable collecting of transaction execution traces", false
);
static final YdbProperty<Integer> CACHED_TRANSPORT_COUNT = YdbProperty.integer(
"cachedTransportsCount", "Use specified count of YDB transports in context cache", 1
);

private final String url;
private final String username;
Expand All @@ -67,6 +71,7 @@ public class YdbConfig {

private final boolean fullScanDetectorEnabled;
private final boolean txTracerEnabled;
private final int transportIndex;

private YdbConfig(
String url, String safeUrl, String connectionString, String username, String password, Properties props
Expand All @@ -85,6 +90,13 @@ private YdbConfig(

this.fullScanDetectorEnabled = FULLSCAN_DETECTOR_ENABLED.readValue(props).getValue();
this.txTracerEnabled = TRANSACTION_TRACER.readValue(props).getValue();

int transportsCount = CACHED_TRANSPORT_COUNT.readValue(props).getValue();
if (transportsCount > 1) {
this.transportIndex = ThreadLocalRandom.current().nextInt(transportsCount);
} else {
this.transportIndex = 0;
}
}

public Properties getSafeProps() {
Expand Down Expand Up @@ -144,12 +156,14 @@ public boolean equals(Object o) {
return false;
}
YdbConfig that = (YdbConfig) o;
return Objects.equals(url, that.url) && Objects.equals(properties, that.properties);
return Objects.equals(url, that.url)
&& Objects.equals(properties, that.properties)
&& transportIndex == that.transportIndex;
}

@Override
public int hashCode() {
return Objects.hash(url, properties);
return Objects.hash(url, properties, transportIndex);
}

public String getUrl() {
Expand Down
23 changes: 23 additions & 0 deletions jdbc/src/test/java/tech/ydb/jdbc/YdbDriverIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -195,6 +197,27 @@ public void testContextCacheConncurrent() throws SQLException {
}
}

@Test
public void testCachedTransportCount() throws SQLException {
Properties props = new Properties();
props.put("cachedTransportsCount", "3");

List<Connection> list = new ArrayList<>();

Set<YdbContext> contexts = new HashSet<>();
for (int idx = 0; idx < 20; idx++) {
Connection connection = DriverManager.getConnection(jdbcURL.build(), props);
contexts.add(connection.unwrap(YdbConnection.class).getCtx());
list.add(connection);
}

Assertions.assertEquals(3, contexts.size());

for (Connection connection: list) {
connection.close();
}
}

@Test
public void testResizeSessionPool() throws SQLException {
String url = jdbcURL.build();
Expand Down

0 comments on commit 0e44158

Please sign in to comment.