Skip to content

Commit

Permalink
better session handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Dec 26, 2023
1 parent 1e6697f commit 7a608a3
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.yandex.ydb.ValueProtos;
import com.yandex.ydb.core.Result;
import com.yandex.ydb.core.Status;
import com.yandex.ydb.core.StatusCode;
import com.yandex.ydb.table.Session;
import com.yandex.ydb.table.query.DataQueryResult;
import com.yandex.ydb.table.query.Params;
Expand Down Expand Up @@ -36,12 +37,15 @@
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.client.ResultSetConverter;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;
import tech.ydb.yoj.repository.ydb.exception.BadSessionException;
import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException;
import tech.ydb.yoj.repository.ydb.exception.UnexpectedException;
import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException;
Expand All @@ -65,9 +69,6 @@
import static com.google.common.base.Strings.emptyToNull;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults;

public class YdbRepositoryTransaction<REPO extends YdbRepository>
implements BaseDb, RepositoryTransaction, YdbTable.QueryExecutor, TransactionLocal.Holder {
Expand All @@ -89,6 +90,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
protected String txId = null;
private String firstNonNullTxId = null; // used for logs
private String closeAction = null; // used to detect of usage transaction after commit()/rollback()
private boolean isBadSession = false;

public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.repo = repo;
Expand Down Expand Up @@ -136,8 +138,8 @@ public void rollback() {
private void doCommit() {
try {
Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings()));
validatePkConstraint(status.getIssues());
validate("commit", status.getCode(), status.toString());
YdbValidator.validatePkConstraint(status.getIssues());
} catch (YdbComponentUnavailableException | YdbOverloadedException e) {
throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e);
}
Expand All @@ -162,8 +164,18 @@ private void closeStreams() {
}
}

private void validate(String request, StatusCode statusCode, String response) {
try {
YdbValidator.validate(request, statusCode, response);
} catch (BadSessionException | OptimisticLockException e) {
transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName());
isBadSession = true;
throw e;
}
}

private boolean isFinalActionNeeded(String actionName) {
if (session == null) {
if (session == null || isBadSession) {
transactionLocal.log().info("No-op %s: no active DB session", actionName);
return false;
}
Expand Down Expand Up @@ -285,6 +297,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
// settings.setTraceId();

Result<DataQueryResult> result = YdbOperations.safeJoin(session.executeDataQuery(yql, txControl, sdkParams, settings));
validate(yql, result.getCode(), result.toString());

result.ok().ifPresent(queryResult -> {
txId = emptyToNull(queryResult.getTxId());
Expand All @@ -293,8 +306,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
}
});

validatePkConstraint(result.getIssues());
validate(yql, result.getCode(), result.toString());
YdbValidator.validatePkConstraint(result.getIssues());

DataQueryResult queryResult = result.expect("expect result after sql execution");
if (queryResult.getResultSetCount() > 1) {
Expand All @@ -303,7 +315,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
if (queryResult.getResultSetCount() == 0) {
return null;
}
validateTruncatedResults(yql, queryResult);
YdbValidator.validateTruncatedResults(yql, queryResult);

ResultSetReader resultSet = queryResult.getResultSet(0);
return new ResultSetConverter(resultSet).stream(statement::readResult).collect(toList());
Expand Down Expand Up @@ -430,7 +442,7 @@ public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams pa
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not bulk insert into table " + tableName);
throw new UnexpectedException("Could not bulk insert into table " + tableName, e);
}
});
}
Expand Down Expand Up @@ -489,7 +501,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not read table " + tableName);
throw new UnexpectedException("Could not read table " + tableName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.slf4j.LoggerFactory;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
import tech.ydb.core.StatusCode;
import tech.ydb.proto.ValueProtos;
import tech.ydb.table.Session;
import tech.ydb.table.query.DataQueryResult;
Expand Down Expand Up @@ -37,12 +38,15 @@
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
import tech.ydb.yoj.repository.db.exception.RepositoryException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.repository.db.readtable.ReadTableParams;
import tech.ydb.yoj.repository.ydb.bulk.BulkMapper;
import tech.ydb.yoj.repository.ydb.client.ResultSetConverter;
import tech.ydb.yoj.repository.ydb.client.YdbConverter;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;
import tech.ydb.yoj.repository.ydb.exception.BadSessionException;
import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException;
import tech.ydb.yoj.repository.ydb.exception.UnexpectedException;
import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException;
Expand All @@ -66,9 +70,6 @@
import static com.google.common.base.Strings.emptyToNull;
import static java.lang.String.format;
import static java.util.stream.Collectors.toList;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint;
import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults;

public class YdbRepositoryTransaction<REPO extends YdbRepository>
implements BaseDb, RepositoryTransaction, YdbTable.QueryExecutor, TransactionLocal.Holder {
Expand All @@ -90,6 +91,7 @@ public class YdbRepositoryTransaction<REPO extends YdbRepository>
protected String txId = null;
private String firstNonNullTxId = null; // used for logs
private String closeAction = null; // used to detect of usage transaction after commit()/rollback()
private boolean isBadSession = false;

public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) {
this.repo = repo;
Expand Down Expand Up @@ -137,8 +139,8 @@ public void rollback() {
private void doCommit() {
try {
Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings()));
validatePkConstraint(status.getIssues());
validate("commit", status.getCode(), status.toString());
YdbValidator.validatePkConstraint(status.getIssues());
} catch (YdbComponentUnavailableException | YdbOverloadedException e) {
throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e);
}
Expand All @@ -163,8 +165,18 @@ private void closeStreams() {
}
}

private void validate(String request, StatusCode statusCode, String response) {
try {
YdbValidator.validate(request, statusCode, response);
} catch (BadSessionException | OptimisticLockException e) {
transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName());
isBadSession = true;
throw e;
}
}

private boolean isFinalActionNeeded(String actionName) {
if (session == null) {
if (session == null || isBadSession) {
transactionLocal.log().info("No-op %s: no active DB session", actionName);
return false;
}
Expand Down Expand Up @@ -285,6 +297,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
// settings.setTraceId();

Result<DataQueryResult> result = YdbOperations.safeJoin(session.executeDataQuery(yql, txControl, sdkParams, settings));
validate(yql, result.getStatus().getCode(), result.toString());

if (result.isSuccess()) {
txId = emptyToNull(result.getValue().getTxId());
Expand All @@ -293,8 +306,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
}
}

validatePkConstraint(result.getStatus().getIssues());
validate(yql, result.getStatus().getCode(), result.toString());
YdbValidator.validatePkConstraint(result.getStatus().getIssues());

DataQueryResult queryResult = result.getValue();
if (queryResult.getResultSetCount() > 1) {
Expand All @@ -303,7 +315,7 @@ private <PARAMS, RESULT> List<RESULT> doExecuteDataQuery(Statement<PARAMS, RESUL
if (queryResult.getResultSetCount() == 0) {
return null;
}
validateTruncatedResults(yql, queryResult);
YdbValidator.validateTruncatedResults(yql, queryResult);

ResultSetReader resultSet = queryResult.getResultSet(0);
return new ResultSetConverter(resultSet).stream(statement::readResult).collect(toList());
Expand Down Expand Up @@ -429,7 +441,7 @@ public <IN> void bulkUpsert(BulkMapper<IN> mapper, List<IN> input, BulkParams pa
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not bulk insert into table " + tableName);
throw new UnexpectedException("Could not bulk insert into table " + tableName, e);
}
});
}
Expand Down Expand Up @@ -488,7 +500,7 @@ public <PARAMS, RESULT> Stream<RESULT> readTable(ReadTableMapper<PARAMS, RESULT>
} catch (RepositoryException e) {
throw e;
} catch (Exception e) {
throw new UnexpectedException("Could not read table " + tableName);
throw new UnexpectedException("Could not read table " + tableName, e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@
import java.util.Map;
import java.util.function.Supplier;

@SuppressWarnings({"rawtypes", "unchecked"})
public class TransactionLocal {
private final Map<Supplier, Object> singletons = new IdentityHashMap<>();
private final Map<Supplier<?>, Object> singletons = new IdentityHashMap<>();

private final Supplier<FirstLevelCache> firstLevelCacheSupplier;
private final Supplier<ProjectionCache> projectionCacheSupplier;
Expand All @@ -29,6 +28,7 @@ public static TransactionLocal get() {
return BaseDb.current(Holder.class).getTransactionLocal();
}

@SuppressWarnings("unchecked")
public <X> X instance(@NonNull Supplier<X> supplier) {
return (X) singletons.computeIfAbsent(supplier, Supplier::get);
}
Expand Down

0 comments on commit 7a608a3

Please sign in to comment.