Skip to content

Commit

Permalink
better-session-handling: check locks on find in InMemory + more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Lavrukov committed Dec 29, 2023
1 parent 36894cc commit 31406f1
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ public synchronized void rollback(long txId) {
}

@Nullable
public synchronized T find(long txId, long version, Entity.Id<T> id) {
public synchronized T find(long txId, long version, Entity.Id<T> id, InMemoryTxLockWatcher watcher) {
checkLocks(version, watcher);

InMemoryEntityLine entityLine = entityLines.get(id);
if (entityLine == null) {
return null;
Expand All @@ -105,7 +107,11 @@ public synchronized T find(long txId, long version, Entity.Id<T> id) {
}

@Nullable
public synchronized <V extends Table.View> V find(long txId, long version, Entity.Id<T> id, Class<V> viewType) {
public synchronized <V extends Table.View> V find(
long txId, long version, Entity.Id<T> id, Class<V> viewType, InMemoryTxLockWatcher watcher
) {
checkLocks(version, watcher);

InMemoryEntityLine entityLine = entityLines.get(id);
if (entityLine == null) {
return null;
Expand All @@ -114,7 +120,9 @@ public synchronized <V extends Table.View> V find(long txId, long version, Entit
return columns != null ? columns.toSchema(ViewSchema.of(viewType)) : null;
}

public synchronized List<T> findAll(long txId, long version) {
public synchronized List<T> findAll(long txId, long version, InMemoryTxLockWatcher watcher) {
checkLocks(version, watcher);

List<T> entities = new ArrayList<>();
for (InMemoryEntityLine entityLine : entityLines.values()) {
Columns columns = entityLine.get(txId, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import tech.ydb.yoj.repository.db.cache.TransactionLocal;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionIsolationLevelException;
import tech.ydb.yoj.repository.db.exception.IllegalTransactionScanException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -34,8 +35,10 @@ public class InMemoryRepositoryTransaction implements BaseDb, RepositoryTransact
private final InMemoryTxLockWatcher watcher;
private final InMemoryStorage storage;

private boolean hasWrites = false;
private Long version = null;
private String closeAction = null; // used to detect of usage transaction after commit()/rollback()
private boolean isBadSession = false;

public InMemoryRepositoryTransaction(TxOptions options, InMemoryRepository repository) {
this.storage = repository.getStorage();
Expand All @@ -62,6 +65,9 @@ public final <T extends Entity<T>> InMemoryTable.DbMemory<T> getMemory(Class<T>

@Override
public void commit() {
if (isBadSession) {
throw new IllegalStateException("Transaction was invalidated. Commit isn't possible");
}
endTransaction("commit()", this::commitImpl);
}

Expand Down Expand Up @@ -125,6 +131,8 @@ final <T extends Entity<T>> void doInWriteTransaction(
Runnable query = () -> logTransaction(log, () -> {
WriteTxDataShard<T> shard = storage.getWriteTxDataShard(type, txId, getVersion());
consumer.accept(shard);

hasWrites = true;
});
if (options.isImmediateWrites()) {
query.run();
Expand All @@ -138,8 +146,14 @@ final <T extends Entity<T>, R> R doInTransaction(
String action, Class<T> type, Function<ReadOnlyTxDataShard<T>, R> func
) {
return logTransaction(action, () -> {
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion());
return func.apply(shard);
InMemoryTxLockWatcher findWatcher = hasWrites ? watcher : InMemoryTxLockWatcher.NO_LOCKS;
ReadOnlyTxDataShard<T> shard = storage.getReadOnlyTxDataShard(type, txId, getVersion(), findWatcher);
try {
return func.apply(shard);
} catch (OptimisticLockException e) {
isBadSession = true;
throw e;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,24 @@ public synchronized <T extends Entity<T>> WriteTxDataShard<T> getWriteTxDataShar
Class<T> type, long txId, long version
) {
uncommited.computeIfAbsent(txId, __ -> new HashSet<>()).add(type);
return getTxDataShard(type, txId, version);
return getTxDataShard(type, txId, version, InMemoryTxLockWatcher.NO_LOCKS);
}

public synchronized <T extends Entity<T>> ReadOnlyTxDataShard<T> getReadOnlyTxDataShard(
Class<T> type, long txId, long version
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
) {
return getTxDataShard(type, txId, version);
return getTxDataShard(type, txId, version, watcher);
}

private <T extends Entity<T>> TxDataShardImpl<T> getTxDataShard(Class<T> type, long txId, long version) {
private <T extends Entity<T>> TxDataShardImpl<T> getTxDataShard(
Class<T> type, long txId, long version, InMemoryTxLockWatcher watcher
) {
@SuppressWarnings("unchecked")
InMemoryDataShard<T> shard = (InMemoryDataShard<T>) shards.get(type);
if (shard == null) {
throw new InMemoryRepositoryException("Table is not created: " + type.getSimpleName());
}
return new TxDataShardImpl<>(shard, txId, version);
return new TxDataShardImpl<>(shard, txId, version, watcher);
}

public synchronized void dropDb() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package tech.ydb.yoj.repository.test.inmemory;

import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.Range;
Expand All @@ -11,9 +13,16 @@
import java.util.Map;
import java.util.Set;

@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public final class InMemoryTxLockWatcher {
private final Map<Class<?>, Set<Entity.Id<?>>> readRows = new HashMap<>();
private final Map<Class<?>, List<Range<?>>> readRanges = new HashMap<>();
public static final InMemoryTxLockWatcher NO_LOCKS = new InMemoryTxLockWatcher(Map.of(), Map.of());

private final Map<Class<?>, Set<Entity.Id<?>>> readRows;
private final Map<Class<?>, List<Range<?>>> readRanges;

public InMemoryTxLockWatcher() {
this(new HashMap<>(), new HashMap<>());
}

public <T extends Entity<T>> void markRowRead(Class<T> type, Entity.Id<T> id) {
readRows.computeIfAbsent(type, __ -> new HashSet<>()).add(id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,37 +1,34 @@
package tech.ydb.yoj.repository.test.inmemory;

import lombok.RequiredArgsConstructor;
import tech.ydb.yoj.repository.db.Entity;
import tech.ydb.yoj.repository.db.Table;

import javax.annotation.Nullable;
import java.util.List;

@RequiredArgsConstructor
final class TxDataShardImpl<T extends Entity<T>> implements ReadOnlyTxDataShard<T>, WriteTxDataShard<T> {
private final InMemoryDataShard<T> shard;
private final long txId;
private final long version;

public TxDataShardImpl(InMemoryDataShard<T> shard, long txId, long version) {
this.shard = shard;
this.txId = txId;
this.version = version;
}
private final InMemoryTxLockWatcher watcher;

@Nullable
@Override
public T find(Entity.Id<T> id) {
return shard.find(txId, version, id);
return shard.find(txId, version, id, watcher);
}

@Nullable
@Override
public <V extends Table.View> V find(Entity.Id<T> id, Class<V> viewType) {
return shard.find(txId, version, id, viewType);
return shard.find(txId, version, id, viewType, watcher);
}

@Override
public List<T> findAll() {
return shard.findAll(txId, version);
return shard.findAll(txId, version, watcher);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,88 @@ public void readTableViews() {
.isThrownBy(() -> db.tx(() -> db.typeFreaks().readTableIds(ReadTableParams.getDefault()).count()));
}

@Test
public void doNotCommitAfterTLI() {
Project.Id id1 = new Project.Id("id1");
Project.Id id2 = new Project.Id("id2");

RepositoryTransaction tx = repository.startTransaction(
TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE)
.withImmediateWrites(true)
.withFirstLevelCache(false)
);

tx.table(Project.class).find(id2);

db.tx(() -> db.projects().save(new Project(id2, "name2")));

tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI

assertThatExceptionOfType(OptimisticLockException.class)
.isThrownBy(() -> tx.table(Project.class).find(id2));

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(tx::commit);

tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute
}

@Test
public void writeDontProduceTLI() {
Project.Id id = new Project.Id("id");

db.tx(() -> db.projects().save(new Project(id, "name")));

RepositoryTransaction tx = repository.startTransaction(
TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE)
.withImmediateWrites(true)
.withFirstLevelCache(false)
);

tx.table(Project.class).find(id);

db.tx(() -> {
db.projects().find(id);
db.projects().save(new Project(id, "name2"));
});

// write don't produce TLI
tx.table(Project.class).save(new Project(id, "name3"));

assertThatExceptionOfType(OptimisticLockException.class)
.isThrownBy(tx::commit);
}

@Test
public void consistencyCheckAllColumnsOnFind() {
Project.Id id1 = new Project.Id("id1");
Project.Id id2 = new Project.Id("id2");

db.tx(() -> {
db.projects().save(new Project(id1, "name"));
db.projects().save(new Project(id2, "name"));
});

RepositoryTransaction tx = repository.startTransaction(
TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE)
.withImmediateWrites(true)
.withFirstLevelCache(false)
);

tx.table(Project.class).save(new Project(new Project.Id("id3"), "name")); // make tx available for TLI

tx.table(Project.class).find(id1);
tx.table(Project.class).find(id2);

db.tx(() -> {
db.projects().find(id2);
db.projects().save(new Project(id2, "name2"));
});

assertThatExceptionOfType(OptimisticLockException.class)
.isThrownBy(() -> tx.table(Project.class).find(id1));
}

@Test
public void streamAllWithPartitioning() {
db.tx(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@
import tech.ydb.yoj.databind.schema.Column;
import tech.ydb.yoj.databind.schema.ObjectSchema;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.IsolationLevel;
import tech.ydb.yoj.repository.db.Repository;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.exception.ConversionException;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.repository.db.list.ListRequest;
Expand Down Expand Up @@ -866,32 +863,6 @@ public void schemaWithHint() {
repository.schema(HintAutoPartitioningByLoad.class).create();
}

@Test
public void doNotCommitAfterTLI() {
Project.Id id1 = new Project.Id("id1");
Project.Id id2 = new Project.Id("id2");

RepositoryTransaction tx = repository.startTransaction(
TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE)
.withImmediateWrites(true)
.withFirstLevelCache(false)
);

tx.table(Project.class).find(id2);

db.tx(() -> db.projects().save(new Project(id2, "name2")));

tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI

assertThatExceptionOfType(OptimisticLockException.class)
.isThrownBy(() -> tx.table(Project.class).find(id2));

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(tx::commit);

tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute
}

@AllArgsConstructor
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
@Delegate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,12 @@
import tech.ydb.yoj.databind.schema.Column;
import tech.ydb.yoj.databind.schema.ObjectSchema;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.IsolationLevel;
import tech.ydb.yoj.repository.db.Repository;
import tech.ydb.yoj.repository.db.RepositoryTransaction;
import tech.ydb.yoj.repository.db.Tx;
import tech.ydb.yoj.repository.db.TxOptions;
import tech.ydb.yoj.repository.db.bulk.BulkParams;
import tech.ydb.yoj.repository.db.exception.ConversionException;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
import tech.ydb.yoj.repository.db.exception.OptimisticLockException;
import tech.ydb.yoj.repository.db.exception.RetryableException;
import tech.ydb.yoj.repository.db.exception.UnavailableException;
import tech.ydb.yoj.repository.db.list.ListRequest;
Expand Down Expand Up @@ -867,32 +864,6 @@ public void schemaWithHint() {
repository.schema(HintAutoPartitioningByLoad.class).create();
}

@Test
public void doNotCommitAfterTLI() {
Project.Id id1 = new Project.Id("id1");
Project.Id id2 = new Project.Id("id2");

RepositoryTransaction tx = repository.startTransaction(
TxOptions.create(IsolationLevel.SERIALIZABLE_READ_WRITE)
.withImmediateWrites(true)
.withFirstLevelCache(false)
);

tx.table(Project.class).find(id2);

db.tx(() -> db.projects().save(new Project(id2, "name2")));

tx.table(Project.class).save(new Project(id1, "name1")); // make tx available for TLI

assertThatExceptionOfType(OptimisticLockException.class)
.isThrownBy(() -> tx.table(Project.class).find(id2));

assertThatExceptionOfType(IllegalStateException.class)
.isThrownBy(tx::commit);

tx.rollback(); // YOJ-tx rollback is possible. session.rollbackCommit() won't execute
}

@AllArgsConstructor
private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.SchemeServiceImplBase {
@Delegate
Expand Down

0 comments on commit 31406f1

Please sign in to comment.