diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java index 99bbe4c1..2b394160 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryDataShard.java @@ -95,7 +95,9 @@ public synchronized void rollback(long txId) { } @Nullable - public synchronized T find(long txId, long version, Entity.Id id) { + public synchronized T find(long txId, long version, Entity.Id id, InMemoryTxLockWatcher watcher) { + checkLocks(version, watcher); + InMemoryEntityLine entityLine = entityLines.get(id); if (entityLine == null) { return null; @@ -105,7 +107,11 @@ public synchronized T find(long txId, long version, Entity.Id id) { } @Nullable - public synchronized V find(long txId, long version, Entity.Id id, Class viewType) { + public synchronized V find( + long txId, long version, Entity.Id id, Class viewType, InMemoryTxLockWatcher watcher + ) { + checkLocks(version, watcher); + InMemoryEntityLine entityLine = entityLines.get(id); if (entityLine == null) { return null; @@ -114,7 +120,9 @@ public synchronized V find(long txId, long version, Entit return columns != null ? columns.toSchema(ViewSchema.of(viewType)) : null; } - public synchronized List findAll(long txId, long version) { + public synchronized List findAll(long txId, long version, InMemoryTxLockWatcher watcher) { + checkLocks(version, watcher); + List entities = new ArrayList<>(); for (InMemoryEntityLine entityLine : entityLines.values()) { Columns columns = entityLine.get(txId, version); diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java index 77e908ae..89e15003 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java @@ -11,6 +11,7 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import java.util.ArrayList; import java.util.List; @@ -34,8 +35,10 @@ public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransact private final InMemoryTxLockWatcher watcher; private final InMemoryStorage storage; + private boolean hasWrites = false; private Long version = null; private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public InMemoryRepositoryTransaction(TxOptions options, InMemoryRepository repository) { this.storage = repository.getStorage(); @@ -62,6 +65,9 @@ public final > InMemoryTable.DbMemory getMemory(Class @Override public void commit() { + if (isBadSession) { + throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); + } endTransaction("commit()", this::commitImpl); } @@ -125,6 +131,8 @@ final > void doInWriteTransaction( Runnable query = () -> logTransaction(log, () -> { WriteTxDataShard shard = storage.getWriteTxDataShard(type, txId, getVersion()); consumer.accept(shard); + + hasWrites = true; }); if (options.isImmediateWrites()) { query.run(); @@ -138,8 +146,14 @@ final , R> R doInTransaction( String action, Class type, Function, R> func ) { return logTransaction(action, () -> { - ReadOnlyTxDataShard shard = storage.getReadOnlyTxDataShard(type, txId, getVersion()); - return func.apply(shard); + InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS; + ReadOnlyTxDataShard shard = storage.getReadOnlyTxDataShard(type, txId, getVersion(), findWatcher); + try { + return func.apply(shard); + } catch (OptimisticLockException e) { + isBadSession = true; + throw e; + } }); } diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java index 94e02d2a..353ce90c 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryStorage.java @@ -65,22 +65,24 @@ public synchronized > WriteTxDataShard getWriteTxDataShar Class type, long txId, long version ) { uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(type); - return getTxDataShard(type, txId, version); + return getTxDataShard(type, txId, version, InMemoryTxLockWatcher.NO_LOCKS); } public synchronized > ReadOnlyTxDataShard getReadOnlyTxDataShard( - Class type, long txId, long version + Class type, long txId, long version, InMemoryTxLockWatcher watcher ) { - return getTxDataShard(type, txId, version); + return getTxDataShard(type, txId, version, watcher); } - private > TxDataShardImpl getTxDataShard(Class type, long txId, long version) { + private > TxDataShardImpl getTxDataShard( + Class type, long txId, long version, InMemoryTxLockWatcher watcher + ) { @SuppressWarnings("unchecked") InMemoryDataShard shard = (InMemoryDataShard) shards.get(type); if (shard == null) { throw new InMemoryRepositoryException("Table is not created: " + type.getSimpleName()); } - return new TxDataShardImpl<>(shard, txId, version); + return new TxDataShardImpl<>(shard, txId, version, watcher); } public synchronized void dropDb() { diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java index da266895..6d7d7b87 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTxLockWatcher.java @@ -1,5 +1,7 @@ package tech.ydb.yoj.repository.test.inmemory; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.EntitySchema; import tech.ydb.yoj.repository.db.Range; @@ -11,9 +13,16 @@ import java.util.Map; import java.util.Set; +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) public final class InMemoryTxLockWatcher { - private final Map, Set>> readRows = new HashMap<>(); - private final Map, List>> readRanges = new HashMap<>(); + public static final InMemoryTxLockWatcher NO_LOCKS = new InMemoryTxLockWatcher(Map.of(), Map.of()); + + private final Map, Set>> readRows; + private final Map, List>> readRanges; + + public InMemoryTxLockWatcher() { + this(new HashMap<>(), new HashMap<>()); + } public > void markRowRead(Class type, Entity.Id id) { readRows.computeIfAbsent(type, __ -> new HashSet<>()).add(id); diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java index 6aa0110a..ba58b673 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/TxDataShardImpl.java @@ -1,37 +1,34 @@ package tech.ydb.yoj.repository.test.inmemory; +import lombok.RequiredArgsConstructor; import tech.ydb.yoj.repository.db.Entity; import tech.ydb.yoj.repository.db.Table; import javax.annotation.Nullable; import java.util.List; +@RequiredArgsConstructor final class TxDataShardImpl> implements ReadOnlyTxDataShard, WriteTxDataShard { private final InMemoryDataShard shard; private final long txId; private final long version; - - public TxDataShardImpl(InMemoryDataShard shard, long txId, long version) { - this.shard = shard; - this.txId = txId; - this.version = version; - } + private final InMemoryTxLockWatcher watcher; @Nullable @Override public T find(Entity.Id id) { - return shard.find(txId, version, id); + return shard.find(txId, version, id, watcher); } @Nullable @Override public V find(Entity.Id id, Class viewType) { - return shard.find(txId, version, id, viewType); + return shard.find(txId, version, id, viewType, watcher); } @Override public List findAll() { - return shard.findAll(txId, version); + return shard.findAll(txId, version, watcher); } @Override diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index 341e3bb4..103bc8aa 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -495,6 +495,88 @@ public void readTableViews() { .isThrownBy(() -> db.tx(() -> db.typeFreaks().readTableIds(ReadTableParams.getDefault()).count())); } + @Test + public void doNotCommitAfterTLI() { + Project.Id id1 = new Project.Id("id1"); + Project.Id id2 = new Project.Id("id2"); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).find(id2); + + db.tx(() -> db.projects().save(new Project(id2, "name2"))); + + tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(() -> tx.table(Project.class).find(id2)); + + assertThatExceptionOfType(IllegalStateException.class) + .isThrownBy(tx::commit); + + tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute + } + + @Test + public void writeDontProduceTLI() { + Project.Id id = new Project.Id("id"); + + db.tx(() -> db.projects().save(new Project(id, "name"))); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).find(id); + + db.tx(() -> { + db.projects().find(id); + db.projects().save(new Project(id, "name2")); + }); + + // write don't produce TLI + tx.table(Project.class).save(new Project(id, "name3")); + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(tx::commit); + } + + @Test + public void consistencyCheckAllColumnsOnFind() { + Project.Id id1 = new Project.Id("id1"); + Project.Id id2 = new Project.Id("id2"); + + db.tx(() -> { + db.projects().save(new Project(id1, "name")); + db.projects().save(new Project(id2, "name")); + }); + + RepositoryTransaction tx = repository.startTransaction( + TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE) + .withImmediateWrites(true) + .withFirstLevelCache(false) + ); + + tx.table(Project.class).save(new Project(new Project.Id("id3"), "name")); // make tx available for TLI + + tx.table(Project.class).find(id1); + tx.table(Project.class).find(id2); + + db.tx(() -> { + db.projects().find(id2); + db.projects().save(new Project(id2, "name2")); + }); + + assertThatExceptionOfType(OptimisticLockException.class) + .isThrownBy(() -> tx.table(Project.class).find(id1)); + } + @Test public void streamAllWithPartitioning() { db.tx(() -> { diff --git a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index 7382cde3..475d5dfb 100644 --- a/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v1/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -5,6 +5,7 @@ import com.yandex.ydb.ValueProtos; import com.yandex.ydb.core.Result; import com.yandex.ydb.core.Status; +import com.yandex.ydb.core.StatusCode; import com.yandex.ydb.table.Session; import com.yandex.ydb.table.query.DataQueryResult; import com.yandex.ydb.table.query.Params; @@ -36,12 +37,15 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RepositoryException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.client.ResultSetConverter; import tech.ydb.yoj.repository.ydb.client.YdbConverter; +import tech.ydb.yoj.repository.ydb.client.YdbValidator; +import tech.ydb.yoj.repository.ydb.exception.BadSessionException; import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException; import tech.ydb.yoj.repository.ydb.exception.UnexpectedException; import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; @@ -65,7 +69,6 @@ import static com.google.common.base.Strings.emptyToNull; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint; import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults; @@ -89,6 +92,7 @@ public class YdbRepositoryTransaction protected String txId = null; private String firstNonNullTxId = null; // used for logs private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) { this.repo = repo; @@ -110,6 +114,9 @@ public > Table table(Class c) { @Override public void commit() { + if (isBadSession) { + throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); + } try { flushPendingWrites(); } catch (Throwable t) { @@ -162,8 +169,18 @@ private void closeStreams() { } } + private void validate(String request, StatusCode statusCode, String response) { + try { + YdbValidator.validate(request, statusCode, response); + } catch (BadSessionException | OptimisticLockException e) { + transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName()); + isBadSession = true; + throw e; + } + } + private boolean isFinalActionNeeded(String actionName) { - if (session == null) { + if (session == null || isBadSession) { transactionLocal.log().info("No-op %s: no active DB session", actionName); return false; } @@ -430,7 +447,7 @@ public void bulkUpsert(BulkMapper mapper, List input, BulkParams pa } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not bulk insert into table " + tableName); + throw new UnexpectedException("Could not bulk insert into table " + tableName, e); } }); } @@ -489,7 +506,7 @@ public Stream readTable(ReadTableMapper } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not read table " + tableName); + throw new UnexpectedException("Could not read table " + tableName, e); } } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index 08279485..0ca7d250 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -10,6 +10,7 @@ import org.slf4j.LoggerFactory; import tech.ydb.core.Result; import tech.ydb.core.Status; +import tech.ydb.core.StatusCode; import tech.ydb.proto.ValueProtos; import tech.ydb.table.Session; import tech.ydb.table.query.DataQueryResult; @@ -37,12 +38,15 @@ import tech.ydb.yoj.repository.db.cache.TransactionLocal; import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException; import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException; +import tech.ydb.yoj.repository.db.exception.OptimisticLockException; import tech.ydb.yoj.repository.db.exception.RepositoryException; import tech.ydb.yoj.repository.db.exception.UnavailableException; import tech.ydb.yoj.repository.db.readtable.ReadTableParams; import tech.ydb.yoj.repository.ydb.bulk.BulkMapper; import tech.ydb.yoj.repository.ydb.client.ResultSetConverter; import tech.ydb.yoj.repository.ydb.client.YdbConverter; +import tech.ydb.yoj.repository.ydb.client.YdbValidator; +import tech.ydb.yoj.repository.ydb.exception.BadSessionException; import tech.ydb.yoj.repository.ydb.exception.ResultTruncatedException; import tech.ydb.yoj.repository.ydb.exception.UnexpectedException; import tech.ydb.yoj.repository.ydb.exception.YdbComponentUnavailableException; @@ -66,7 +70,6 @@ import static com.google.common.base.Strings.emptyToNull; import static java.lang.String.format; import static java.util.stream.Collectors.toList; -import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validate; import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validatePkConstraint; import static tech.ydb.yoj.repository.ydb.client.YdbValidator.validateTruncatedResults; @@ -90,6 +93,7 @@ public class YdbRepositoryTransaction protected String txId = null; private String firstNonNullTxId = null; // used for logs private String closeAction = null; // used to detect of usage transaction after commit()/rollback() + private boolean isBadSession = false; public YdbRepositoryTransaction(REPO repo, @NonNull TxOptions options) { this.repo = repo; @@ -111,6 +115,9 @@ public > Table table(Class c) { @Override public void commit() { + if (isBadSession) { + throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); + } try { flushPendingWrites(); } catch (Throwable t) { @@ -163,8 +170,18 @@ private void closeStreams() { } } + private void validate(String request, StatusCode statusCode, String response) { + try { + YdbValidator.validate(request, statusCode, response); + } catch (BadSessionException | OptimisticLockException e) { + transactionLocal.log().info("Request got %s: DB tx was invalidated", e.getClass().getSimpleName()); + isBadSession = true; + throw e; + } + } + private boolean isFinalActionNeeded(String actionName) { - if (session == null) { + if (session == null || isBadSession) { transactionLocal.log().info("No-op %s: no active DB session", actionName); return false; } @@ -429,7 +446,7 @@ public void bulkUpsert(BulkMapper mapper, List input, BulkParams pa } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not bulk insert into table " + tableName); + throw new UnexpectedException("Could not bulk insert into table " + tableName, e); } }); } @@ -488,7 +505,7 @@ public Stream readTable(ReadTableMapper } catch (RepositoryException e) { throw e; } catch (Exception e) { - throw new UnexpectedException("Could not read table " + tableName); + throw new UnexpectedException("Could not read table " + tableName, e); } } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java index 15184786..7be6daa1 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java @@ -11,9 +11,8 @@ import java.util.Map; import java.util.function.Supplier; -@SuppressWarnings({"rawtypes", "unchecked"}) public class TransactionLocal { - private final Map singletons = new IdentityHashMap<>(); + private final Map, Object> singletons = new IdentityHashMap<>(); private final Supplier firstLevelCacheSupplier; private final Supplier projectionCacheSupplier; @@ -29,6 +28,7 @@ public static TransactionLocal get() { return BaseDb.current(Holder.class).getTransactionLocal(); } + @SuppressWarnings("unchecked") public X instance(@NonNull Supplier supplier) { return (X) singletons.computeIfAbsent(supplier, Supplier::get); }