diff --git a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index 7382cde3..f1ecc5fd 100644 --- a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -5,6 +5,7 @@ import com.yandex.ydb.ValueProtos; import com.yandex.ydb.core.Result; import com.yandex.ydb.core.Status; +import com.yandex.ydb.core.StatusCode; import com.yandex.ydb.table.Session; import com.yandex.ydb.table.query.DataQueryResult; import com.yandex.ydb.table.query.Params; @@ -36,12 +37,15 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RepositoryException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.client.ResultSetConverter; import tech.ydb.yoj.repository.ydb.client.YdbConverter; +import tech.ydb.yoj.repository.ydb.client.YdbValidator; +import tech.ydb.yoj.repository.ydb.exception.BadSessionException; import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException; import tech.ydb.yoj.repository.ydb.exception.UnexpectedException; import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; @@ -65,9 +69,6 @@ import static com.google.common.base.Strings.emptyToNull; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults; public class YdbRepositoryTransaction implements BaseDb, RepositoryTransaction, YdbTable.QueryExecutor, TransactionLocal.Holder { @@ -89,6 +90,7 @@ public class YdbRepositoryTransaction protected String txId = null; private String firstNonNullTxId = null; // used for logs private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) { this.repo = repo; @@ -136,8 +138,8 @@ public void rollback() { private void doCommit() { try { Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings())); - validatePkConstraint(status.getIssues()); validate("commit", status.getCode(), status.toString()); + YdbValidator.validatePkConstraint(status.getIssues()); } catch (YdbComponentUnavailableException | YdbOverloadedException e) { throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e); } @@ -162,8 +164,18 @@ private void closeStreams() { } } + private void validate(String request, StatusCode statusCode, String response) { + try { + YdbValidator.validate(request, statusCode, response); + } catch (BadSessionException | OptimisticLockException e) { + transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName()); + isBadSession = true; + throw e; + } + } + private boolean isFinalActionNeeded(String actionName) { - if (session == null) { + if (session == null || isBadSession) { transactionLocal.log().info("No-op %s: no active DB session", actionName); return false; } @@ -285,6 +297,7 @@ private List doExecuteDataQuery(Statement result = YdbOperations.safeJoin(session.executeDataQuery(yql, txControl, sdkParams, settings)); + validate(yql, result.getCode(), result.toString()); result.ok().ifPresent(queryResult -> { txId = emptyToNull(queryResult.getTxId()); @@ -293,8 +306,7 @@ private List doExecuteDataQuery(Statement 1) { @@ -303,7 +315,7 @@ private List doExecuteDataQuery(Statement void bulkUpsert(BulkMapper mapper, List input, BulkParams pa } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not bulk insert into table " + tableName); + throw new UnexpectedException("Could not bulk insert into table " + tableName, e); } }); } @@ -489,7 +501,7 @@ public Stream readTable(ReadTableMapper } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not read table " + tableName); + throw new UnexpectedException("Could not read table " + tableName, e); } } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index 08279485..4807b6fa 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.core.Result; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; import tech.ydb.proto.ValueProtos; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; @@ -37,12 +38,15 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RepositoryException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.client.ResultSetConverter; import tech.ydb.yoj.repository.ydb.client.YdbConverter; +import tech.ydb.yoj.repository.ydb.client.YdbValidator; +import tech.ydb.yoj.repository.ydb.exception.BadSessionException; import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException; import tech.ydb.yoj.repository.ydb.exception.UnexpectedException; import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; @@ -66,9 +70,6 @@ import static com.google.common.base.Strings.emptyToNull; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults; public class YdbRepositoryTransaction implements BaseDb, RepositoryTransaction, YdbTable.QueryExecutor, TransactionLocal.Holder { @@ -90,6 +91,7 @@ public class YdbRepositoryTransaction protected String txId = null; private String firstNonNullTxId = null; // used for logs private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) { this.repo = repo; @@ -137,8 +139,8 @@ public void rollback() { private void doCommit() { try { Status status = YdbOperations.safeJoin(session.commitTransaction(txId, new CommitTxSettings())); - validatePkConstraint(status.getIssues()); validate("commit", status.getCode(), status.toString()); + YdbValidator.validatePkConstraint(status.getIssues()); } catch (YdbComponentUnavailableException | YdbOverloadedException e) { throw new UnavailableException("Unknown transaction state: commit was sent, but result is unknown", e); } @@ -163,8 +165,18 @@ private void closeStreams() { } } + private void validate(String request, StatusCode statusCode, String response) { + try { + YdbValidator.validate(request, statusCode, response); + } catch (BadSessionException | OptimisticLockException e) { + transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName()); + isBadSession = true; + throw e; + } + } + private boolean isFinalActionNeeded(String actionName) { - if (session == null) { + if (session == null || isBadSession) { transactionLocal.log().info("No-op %s: no active DB session", actionName); return false; } @@ -285,6 +297,7 @@ private List doExecuteDataQuery(Statement result = YdbOperations.safeJoin(session.executeDataQuery(yql, txControl, sdkParams, settings)); + validate(yql, result.getStatus().getCode(), result.toString()); if (result.isSuccess()) { txId = emptyToNull(result.getValue().getTxId()); @@ -293,8 +306,7 @@ private List doExecuteDataQuery(Statement 1) { @@ -303,7 +315,7 @@ private List doExecuteDataQuery(Statement void bulkUpsert(BulkMapper mapper, List input, BulkParams pa } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not bulk insert into table " + tableName); + throw new UnexpectedException("Could not bulk insert into table " + tableName, e); } }); } @@ -488,7 +500,7 @@ public Stream readTable(ReadTableMapper } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not read table " + tableName); + throw new UnexpectedException("Could not read table " + tableName, e); } } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java index 15184786..7be6daa1 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java @@ -11,9 +11,8 @@ import java.util.Map; import java.util.function.Supplier; -@SuppressWarnings({"rawtypes", "unchecked"}) public class TransactionLocal { - private final Map singletons = new IdentityHashMap<>(); + private final Map, Object> singletons = new IdentityHashMap<>(); private final Supplier firstLevelCacheSupplier; private final Supplier projectionCacheSupplier; @@ -29,6 +28,7 @@ public static TransactionLocal get() { return BaseDb.current(Holder.class).getTransactionLocal(); } + @SuppressWarnings("unchecked") public X instance(@NonNull Supplier supplier) { return (X) singletons.computeIfAbsent(supplier, Supplier::get); }