Skip to content

Commit

Permalink
additional signals to control write hanging (#13708)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jan 22, 2025
1 parent 2696900 commit ffdfa4f
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 6 deletions.
4 changes: 1 addition & 3 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,7 @@ void TColumnShard::Handle(NPrivateEvents::NWrite::TEvWritePortionResult::TPtr& e
Counters.OnWritePutBlobsSuccess(now - i.GetWriteMeta().GetWriteStartInstant(), i.GetRecordsCount());
Counters.GetWritesMonitor()->OnFinishWrite(i.GetDataSize(), 1);
}
Execute(new TTxBlobsWritingFinished(
this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenData)),
ctx);
Execute(new TTxBlobsWritingFinished(this, ev->Get()->GetWriteStatus(), ev->Get()->GetWriteAction(), std::move(writtenData)), ctx);
} else {
const TMonotonic now = TMonotonic::Now();
for (auto&& i : writtenData.GetWriteResults()) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/writer/buffer/actor2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ void TActor::Handle(TEvAddInsertedDataToBuffer::TPtr& ev) {
}
it->second.AddUnit(TWriteUnit(evBase->GetWriteData(), evBase->GetRecordBatch()));
if (it->second.GetSumSize() > (ui64)AppDataVerified().ColumnShardConfig.GetWritingBufferVolumeMb() * 1024 * 1024 || !FlushDuration) {
SumSize -= it->second.GetSumSize();
it->second.Flush(TabletId);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace NKikimr::NOlap::NWritingPortions {

class TPortionWriteController: public NColumnShard::IWriteController,
public NColumnShard::TMonitoringObjectsCounter<TIndexedWriteController, true> {
public NColumnShard::TMonitoringObjectsCounter<TPortionWriteController, true> {
public:
class TInsertPortion {
private:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#pragma once
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
#include <ydb/core/tx/columnshard/operations/common/context.h>
#include <ydb/core/tx/conveyor/usage/abstract.h>
#include <ydb/core/tx/data_events/write_data.h>

namespace NKikimr::NOlap::NWritingPortions {

class TWriteUnit {
class TWriteUnit: public NColumnShard::TMonitoringObjectsCounter<TWriteUnit> {
private:
YDB_READONLY_DEF(std::shared_ptr<NEvWrite::TWriteData>, Data);
YDB_READONLY_DEF(NArrow::TContainerWithIndexes<arrow::RecordBatch>, Batch);
Expand Down Expand Up @@ -49,4 +50,4 @@ class TBuildPackSlicesTask: public NConveyor::ITask, public NColumnShard::TMonit
AFL_VERIFY(WriteUnits.size());
}
};
} // namespace NKikimr::NOlap
} // namespace NKikimr::NOlap::NWritingPortions

0 comments on commit ffdfa4f

Please sign in to comment.