From d0e18892195e188776634957aa1595062733e06d Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Tue, 7 Jan 2025 07:05:11 +0000 Subject: [PATCH 1/2] Do not clear shard lists --- .../tx/columnshard/columnshard__write.cpp | 33 +++++++++---------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index b54f174c3a99..4e3e565952fb 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -317,29 +317,28 @@ class TCommitOperation { } TConclusionStatus Parse(const NEvents::TDataEvents::TEvWrite& evWrite) { - AFL_VERIFY(evWrite.Record.GetLocks().GetLocks().size() >= 1); - auto& locks = evWrite.Record.GetLocks(); - auto& lock = evWrite.Record.GetLocks().GetLocks()[0]; + TxId = evWrite.Record.GetTxId(); + NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("tx_id", TxId); + const auto& locks = evWrite.Record.GetLocks(); + AFL_VERIFY(!locks.GetLocks().empty()); + auto& lock = locks.GetLocks()[0]; + LockId = lock.GetLockId(); SendingShards = std::set(locks.GetSendingShards().begin(), locks.GetSendingShards().end()); ReceivingShards = std::set(locks.GetReceivingShards().begin(), locks.GetReceivingShards().end()); - if (!ReceivingShards.size() || !SendingShards.size()) { - ReceivingShards.clear(); - SendingShards.clear(); - } else if (!locks.HasArbiterColumnShard()) { - ArbiterColumnShard = *ReceivingShards.begin(); + const bool singleShardTx = SendingShards.empty() && ReceivingShards.empty(); + if (!singleShardTx) { if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + return TConclusionStatus::Fail("shard is absent in sending and receiving lists"); } - } else { - ArbiterColumnShard = locks.GetArbiterColumnShard(); - AFL_VERIFY(ArbiterColumnShard); - if (!ReceivingShards.contains(TabletId) && !SendingShards.contains(TabletId)) { - return TConclusionStatus::Fail("shard is incorrect for sending/receiving lists"); + if (locks.HasArbiterColumnShard()) { + ArbiterColumnShard = locks.GetArbiterColumnShard(); + } else { + AFL_VERIFY(!ReceivingShards.empty()); + ArbiterColumnShard = *ReceivingShards.begin(); } + AFL_VERIFY(ArbiterColumnShard); } - TxId = evWrite.Record.GetTxId(); - LockId = lock.GetLockId(); Generation = lock.GetGeneration(); InternalGenerationCounter = lock.GetCounter(); if (!GetLockId()) { @@ -348,7 +347,7 @@ class TCommitOperation { if (!TxId) { return TConclusionStatus::Fail("not initialized TxId for commit event"); } - if (evWrite.Record.GetLocks().GetOp() != NKikimrDataEvents::TKqpLocks::Commit) { + if (locks.GetOp() != NKikimrDataEvents::TKqpLocks::Commit) { return TConclusionStatus::Fail("incorrect message type"); } return TConclusionStatus::Success(); From b27f421fd839c8d0337531ace89ed98c9c682784 Mon Sep 17 00:00:00 2001 From: Evgeny Zverev Date: Tue, 7 Jan 2025 07:58:32 +0000 Subject: [PATCH 2/2] fix empty receiving shards --- ydb/core/tx/columnshard/columnshard__write.cpp | 3 +-- .../tx/columnshard/transactions/operators/ev_write/primary.h | 4 +--- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index 4e3e565952fb..bea7766dc3c8 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -304,7 +304,7 @@ class TCommitOperation { using TPtr = std::shared_ptr; bool NeedSyncLocks() const { - return SendingShards.size() && ReceivingShards.size(); + return SendingShards.size() || ReceivingShards.size(); } bool IsPrimary() const { @@ -355,7 +355,6 @@ class TCommitOperation { std::unique_ptr CreateTxOperator( const NKikimrTxColumnShard::ETransactionKind kind) const { - AFL_VERIFY(ReceivingShards.size()); if (IsPrimary()) { return std::make_unique( TFullTxInfo::BuildFake(kind), LockId, ReceivingShards, SendingShards); diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index 0a8137fd9d31..8fc19f3c8587 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -63,7 +63,6 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac for (auto&& i : protoData.GetWaitShardsResultAck()) { WaitShardsResultAck.emplace(i); } - AFL_VERIFY(ReceivingShards.empty() == SendingShards.empty()); if (protoData.HasTxBroken()) { TxBroken = protoData.GetTxBroken(); } @@ -160,8 +159,7 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac }; virtual bool IsTxBroken() const override { - AFL_VERIFY(TxBroken); - return *TxBroken; + return TxBroken.value_or(false); } void InitializeRequests(TColumnShard& owner) {