From 307b994e27b8022e83d4190746eba16a7275ce42 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sat, 28 Dec 2024 08:40:38 +0300 Subject: [PATCH] fix race on writing in case slow execution (asan tests) (#13085) --- .../transactions/operators/ev_write/primary.h | 55 +++++++++-------- .../operators/ev_write/secondary.h | 61 +++++++++++-------- .../columnshard/transactions/tx_controller.h | 5 +- 3 files changed, 69 insertions(+), 52 deletions(-) diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h index f53042bf0e26..3bb8dffa156f 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h @@ -83,28 +83,32 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac const ui64 TxId; const ui64 TabletId; const bool BrokenFlag; + bool SendAckFlag = false; virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - auto copy = *op; - if (copy.WaitShardsBrokenFlags.erase(TabletId)) { - copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag; - Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString()); - } else { + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId)("reason", "absent operation"); + } else if (!op->WaitShardsBrokenFlags.erase(TabletId)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId); + } else { + op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag; + Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString()); + SendAckFlag = true; } return true; } virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - if (op->WaitShardsBrokenFlags.erase(TabletId)) { - op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag; - op->SendBrokenFlagAck(*Self, TabletId); + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId)("reason", "absent operator"); + } else if (!SendAckFlag) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId); + } else { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))( "receive", TabletId); + op->SendBrokenFlagAck(*Self, TabletId); op->InitializeRequests(*Self); - } else { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId); } } @@ -129,23 +133,22 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac const ui64 TabletId; virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - auto copy = *op; - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( - "receive", TabletId); - AFL_VERIFY(copy.WaitShardsResultAck.erase(TabletId)); - Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString()); - return true; - } - virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( - "receive", TabletId); - if (!op->WaitShardsResultAck.erase(TabletId)) { + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("receive", TabletId)( + "reason", "operation absent"); + } else if (!op->WaitShardsResultAck.erase(TabletId)) { AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))( "receive", TabletId); + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))( + "receive", TabletId); + Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString()); + op->CheckFinished(*Self); } - op->CheckFinished(*Self); + return true; + } + virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override { } public: diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h index 8bbf9d4d6f55..f60d7c0b2f4b 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h @@ -62,20 +62,26 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans private: using TBase = NOlap::NDataSharing::TExtendedTransactionBase; const ui64 TxId; - - virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - auto copy = *op; - copy.ReceiveAck = true; - auto proto = copy.SerializeToProto(); - Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, proto.SerializeAsString()); + bool NeedContinueFlag = false; + + virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override { + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_ack_flag")("txId", TxId); + } else { + op->ReceiveAck = true; + if (!op->NeedReceiveBroken) { + op->TxBroken = false; + } + Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString()); + if (!op->NeedReceiveBroken) { + NeedContinueFlag = true; + } + } return true; } virtual void DoComplete(const NActors::TActorContext& ctx) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - op->ReceiveAck = true; - if (!op->NeedReceiveBroken) { - op->TxBroken = false; + if (NeedContinueFlag) { Self->EnqueueProgressTx(ctx, TxId); } } @@ -99,25 +105,30 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans const ui64 TxId; const bool BrokenFlag; - virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - auto copy = *op; - copy.TxBroken = BrokenFlag; - auto proto = copy.SerializeToProto(); - Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, proto.SerializeAsString()); - if (BrokenFlag) { - Self->GetProgressTxController().ExecuteOnCancel(TxId, txc); + virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override { + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_broken_flag")("txId", TxId); + } else { + op->TxBroken = BrokenFlag; + Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString()); + if (BrokenFlag) { + Self->GetProgressTxController().ExecuteOnCancel(TxId, txc); + } } return true; } virtual void DoComplete(const NActors::TActorContext& ctx) override { - auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId); - op->TxBroken = BrokenFlag; - op->SendBrokenFlagAck(*Self); - if (BrokenFlag) { - Self->GetProgressTxController().CompleteOnCancel(TxId, ctx); + auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs(TxId, true); + if (!op) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_broken_flag")("txId", TxId); + } else { + op->SendBrokenFlagAck(*Self); + if (BrokenFlag) { + Self->GetProgressTxController().CompleteOnCancel(TxId, ctx); + } + Self->EnqueueProgressTx(ctx, TxId); } - Self->EnqueueProgressTx(ctx, TxId); } public: diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.h b/ydb/core/tx/columnshard/transactions/tx_controller.h index a546f50001f0..e4b92144ca26 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.h +++ b/ydb/core/tx/columnshard/transactions/tx_controller.h @@ -434,8 +434,11 @@ class TTxController { return TValidator::CheckNotNull(GetTxOperatorOptional(txId)); } template - std::shared_ptr GetTxOperatorVerifiedAs(const ui64 txId) const { + std::shared_ptr GetTxOperatorVerifiedAs(const ui64 txId, const bool optionalExists = false) const { auto result = GetTxOperatorOptional(txId); + if (optionalExists && !result) { + return nullptr; + } AFL_VERIFY(result); auto resultClass = dynamic_pointer_cast(result); AFL_VERIFY(resultClass);