Skip to content

Commit

Permalink
Recover lost locks as broken (#13181)
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Jan 6, 2025
1 parent baa0367 commit dcb70d0
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 15 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ ydb/core/keyvalue/ut_trace TKeyValueTracingTest.WriteSmall
ydb/core/kqp/ut/cost KqpCost.OlapWriteRow
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.Select
ydb/core/kqp/ut/data_integrity KqpDataIntegrityTrails.UpsertEvWrite
ydb/core/kqp/ut/olap KqpOlap.DeleteAbsent+Reboot
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestAggregation
ydb/core/kqp/ut/olap KqpDecimalColumnShard.TestFilterCompare
ydb/core/kqp/ut/olap KqpOlap.ManyColumnShardsWithRestarts
Expand Down
69 changes: 61 additions & 8 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2992,9 +2992,9 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);
}

Y_UNIT_TEST_TWIN(DeleteAbsent, Reboot) {
//This test tries to DELETE from a table with WHERE condition that matches no rows
//It corresponds to a SCAN, then NO write then COMMIT
void TestDeleteAbsent(const size_t shardCount, bool reboot) {
//This test tries to DELETE from a table when there is no rows to delete at some shard
//It corresponds to a SCAN, then NO write then COMMIT on that shard
auto csController = NYDBTest::TControllers::RegisterCSControllerGuard<NYDBTest::NColumnShard::TController>();

NKikimrConfig::TAppConfig appConfig;
Expand All @@ -3006,20 +3006,73 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
TTestHelper::TColumnSchema().SetName("value").SetType(NScheme::NTypeIds::Int32).SetNullable(true),
};
TTestHelper::TColumnTable testTable;
testTable.SetName("/Root/ttt").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema);
testTable.SetName("/Root/ttt").SetPrimaryKey({ "id" }).SetSharding({ "id" }).SetSchema(schema).SetMinPartitionsCount(shardCount);
testHelper.CreateTable(testTable);
auto client = testHelper.GetKikimr().GetQueryClient();
//1. Insert exactlly one row into a table, so the only shard will contain a row
const auto result = client
.ExecuteQuery(
R"(
INSERT INTO `/Root/ttt` (id, value) VALUES
(1, 11)
)",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
//2. Ensure that there is actually 1 row in the table
{
const auto resultSelect = client
.ExecuteQuery(
"SELECT * FROM `/Root/ttt`",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();

if (Reboot) {
UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
const auto resultSets = resultSelect.GetResultSets();
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
const auto resultSet = resultSets[0];
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
}
if (reboot) {
csController->SetRestartOnLocalTxCommitted("TProposeWriteTransaction");
}
auto client = testHelper.GetKikimr().GetQueryClient();
//DELETE 1 row from one shard and 0 rows from others
const auto resultDelete =
client
.ExecuteQuery(
"DELETE from `/Root/ttt` WHERE value % 2 == 1;",
"DELETE from `/Root/ttt` ",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(resultDelete.IsSuccess(), resultDelete.GetIssues().ToString());
UNIT_ASSERT_C(resultDelete.IsSuccess() != reboot, resultDelete.GetIssues().ToString());
{
const auto resultSelect = client
.ExecuteQuery(
"SELECT * FROM `/Root/ttt`",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();

UNIT_ASSERT_C(resultSelect.IsSuccess(), resultSelect.GetIssues().ToString());
const auto resultSets = resultSelect.GetResultSets();
UNIT_ASSERT_VALUES_EQUAL(resultSets.size(), 1);
const auto resultSet = resultSets[0];
UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), reboot ? 1 : 0);

}
//DELETE 0 rows from every shard
const auto resultDelete2 =
client
.ExecuteQuery(
"DELETE from `/Root/ttt` WHERE id < 100",
NYdb::NQuery::TTxControl::BeginTx().CommitTx())
.GetValueSync();
UNIT_ASSERT_C(resultDelete2.IsSuccess() != reboot, result.GetIssues().ToString());
}
Y_UNIT_TEST_TWIN(DeleteAbsentSingleShard, Reboot) {
TestDeleteAbsent(1, Reboot);
}

Y_UNIT_TEST_TWIN(DeleteAbsentMultipleShards, Reboot) {
TestDeleteAbsent(2, Reboot);
}
}

Expand Down
6 changes: 5 additions & 1 deletion ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ bool TOperationsManager::Load(NTabletFlatExecutor::TTransactionContext& txc) {
while (!rowset.EndOfSet()) {
const ui64 lockId = rowset.GetValue<Schema::OperationTxIds::LockId>();
const ui64 txId = rowset.GetValue<Schema::OperationTxIds::TxId>();
AFL_VERIFY(LockFeatures.contains(lockId))("lock_id", lockId);
if (auto it = LockFeatures.find(lockId); it == LockFeatures.end()) {
auto lock = TLockFeatures(lockId, 0);
lock.SetBroken();
LockFeatures.emplace(lockId, std::move(lock));
}
AFL_VERIFY(Tx2Lock.emplace(txId, lockId).second);
if (!rowset.Next()) {
return false;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/operations/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ namespace NKikimr::NColumnShard {
class TColumnShard;
class TLockFeatures;

class TLockSharingInfo {
class TLockSharingInfo: TMoveOnly {
private:
const ui64 LockId;
const ui64 Generation;
TAtomicCounter InternalGenerationCounter = 0;
TAtomicCounter Broken = 0;
std::atomic<bool> Broken = false;
TAtomicCounter WritesCounter = 0;
friend class TLockFeatures;

Expand All @@ -43,7 +43,7 @@ class TLockSharingInfo {
}

bool IsBroken() const {
return Broken.Val();
return Broken;
}

ui64 GetCounter() const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/transactions/locks/interaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ class TPointInfo {
return StartTxIds.empty() && FinishTxIds.empty() && IntervalTxIds.empty();
}

void ProvideTxIdsFrom(const TPointInfo& previouse) {
for (auto&& i : previouse.IntervalTxIds) {
void ProvideTxIdsFrom(const TPointInfo& previous) {
for (auto&& i : previous.IntervalTxIds) {
auto provided = i.second;
{
auto it = StartTxIds.find(i.first);
Expand Down

0 comments on commit dcb70d0

Please sign in to comment.