Skip to content

Commit

Permalink
fix race on writing in case slow execution (asan tests) (#13085)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 28, 2024
1 parent 312885a commit 307b994
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 52 deletions.
55 changes: 29 additions & 26 deletions ydb/core/tx/columnshard/transactions/operators/ev_write/primary.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,28 +83,32 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
const ui64 TxId;
const ui64 TabletId;
const bool BrokenFlag;
bool SendAckFlag = false;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
auto copy = *op;
if (copy.WaitShardsBrokenFlags.erase(TabletId)) {
copy.TxBroken = copy.TxBroken.value_or(false) || BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
} else {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId, true);
if (!op) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId)("reason", "absent operation");
} else if (!op->WaitShardsBrokenFlags.erase(TabletId)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId);
} else {
op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString());
SendAckFlag = true;
}
return true;
}
virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
if (op->WaitShardsBrokenFlags.erase(TabletId)) {
op->TxBroken = op->TxBroken.value_or(false) || BrokenFlag;
op->SendBrokenFlagAck(*Self, TabletId);
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId, true);
if (!op) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId)("reason", "absent operator");
} else if (!SendAckFlag) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId);
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "remove_tablet_id")("wait", JoinSeq(",", op->WaitShardsBrokenFlags))(
"receive", TabletId);
op->SendBrokenFlagAck(*Self, TabletId);
op->InitializeRequests(*Self);
} else {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "repeated shard broken_flag info")("shard_id", TabletId);
}
}

Expand All @@ -129,23 +133,22 @@ class TEvWriteCommitPrimaryTransactionOperator: public TEvWriteCommitSyncTransac
const ui64 TabletId;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
auto copy = *op;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
AFL_VERIFY(copy.WaitShardsResultAck.erase(TabletId));
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, copy.SerializeToProto().SerializeAsString());
return true;
}
virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
if (!op->WaitShardsResultAck.erase(TabletId)) {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitPrimaryTransactionOperator>(TxId, true);
if (!op) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("receive", TabletId)(
"reason", "operation absent");
} else if (!op->WaitShardsResultAck.erase(TabletId)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet_duplication")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "ack_tablet")("wait", JoinSeq(",", op->WaitShardsResultAck))(
"receive", TabletId);
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString());
op->CheckFinished(*Self);
}
op->CheckFinished(*Self);
return true;
}
virtual void DoComplete(const NActors::TActorContext& /*ctx*/) override {
}

public:
Expand Down
61 changes: 36 additions & 25 deletions ydb/core/tx/columnshard/transactions/operators/ev_write/secondary.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,26 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
private:
using TBase = NOlap::NDataSharing::TExtendedTransactionBase<TColumnShard>;
const ui64 TxId;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId);
auto copy = *op;
copy.ReceiveAck = true;
auto proto = copy.SerializeToProto();
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, proto.SerializeAsString());
bool NeedContinueFlag = false;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId, true);
if (!op) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_ack_flag")("txId", TxId);
} else {
op->ReceiveAck = true;
if (!op->NeedReceiveBroken) {
op->TxBroken = false;
}
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString());
if (!op->NeedReceiveBroken) {
NeedContinueFlag = true;
}
}
return true;
}
virtual void DoComplete(const NActors::TActorContext& ctx) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId);
op->ReceiveAck = true;
if (!op->NeedReceiveBroken) {
op->TxBroken = false;
if (NeedContinueFlag) {
Self->EnqueueProgressTx(ctx, TxId);
}
}
Expand All @@ -99,25 +105,30 @@ class TEvWriteCommitSecondaryTransactionOperator: public TEvWriteCommitSyncTrans
const ui64 TxId;
const bool BrokenFlag;

virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& /*ctx*/) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId);
auto copy = *op;
copy.TxBroken = BrokenFlag;
auto proto = copy.SerializeToProto();
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, proto.SerializeAsString());
if (BrokenFlag) {
Self->GetProgressTxController().ExecuteOnCancel(TxId, txc);
virtual bool DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const NActors::TActorContext& ctx) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId, true);
if (!op) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_broken_flag")("txId", TxId);
} else {
op->TxBroken = BrokenFlag;
Self->GetProgressTxController().WriteTxOperatorInfo(txc, TxId, op->SerializeToProto().SerializeAsString());
if (BrokenFlag) {
Self->GetProgressTxController().ExecuteOnCancel(TxId, txc);
}
}
return true;
}
virtual void DoComplete(const NActors::TActorContext& ctx) override {
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId);
op->TxBroken = BrokenFlag;
op->SendBrokenFlagAck(*Self);
if (BrokenFlag) {
Self->GetProgressTxController().CompleteOnCancel(TxId, ctx);
auto op = Self->GetProgressTxController().GetTxOperatorVerifiedAs<TEvWriteCommitSecondaryTransactionOperator>(TxId, true);
if (!op) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD_WRITE)("event", "duplication_tablet_broken_flag")("txId", TxId);
} else {
op->SendBrokenFlagAck(*Self);
if (BrokenFlag) {
Self->GetProgressTxController().CompleteOnCancel(TxId, ctx);
}
Self->EnqueueProgressTx(ctx, TxId);
}
Self->EnqueueProgressTx(ctx, TxId);
}

public:
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/tx/columnshard/transactions/tx_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,11 @@ class TTxController {
return TValidator::CheckNotNull(GetTxOperatorOptional(txId));
}
template <class TExpectedTransactionOperator>
std::shared_ptr<TExpectedTransactionOperator> GetTxOperatorVerifiedAs(const ui64 txId) const {
std::shared_ptr<TExpectedTransactionOperator> GetTxOperatorVerifiedAs(const ui64 txId, const bool optionalExists = false) const {
auto result = GetTxOperatorOptional(txId);
if (optionalExists && !result) {
return nullptr;
}
AFL_VERIFY(result);
auto resultClass = dynamic_pointer_cast<TExpectedTransactionOperator>(result);
AFL_VERIFY(resultClass);
Expand Down

0 comments on commit 307b994

Please sign in to comment.