Skip to content

Commit

Permalink
YQ-3924 Pass cpu time to read actor (#12546)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds authored Jan 22, 2025
1 parent a30b332 commit a6c613d
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 4 deletions.
3 changes: 0 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/events/data_plane.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,6 @@ struct TEvRowDispatcher {

// Protobuf-backed actor event with id EEv::EvNoSession; its payload (`Record`)
// is an NFq::NRowDispatcherProto::TEvNoSession message.
// NOTE(review): the diff header for this file reports "0 additions & 3 deletions";
// the three-line partitionId constructor below appears to be what the commit
// removes - confirm against the post-commit file before relying on it.
struct TEvNoSession : public NActors::TEventPB<TEvNoSession, NFq::NRowDispatcherProto::TEvNoSession, EEv::EvNoSession> {
// Creates the event with a default-initialized Record.
TEvNoSession() = default;
// Creates the event and stores `partitionId` in Record's PartitionId field.
TEvNoSession(ui32 partitionId) {
Record.SetPartitionId(partitionId);
}
};

struct TEvGetInternalStateRequest : public NActors::TEventPB<TEvGetInternalStateRequest,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/row_dispatcher/protos/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ message TEvStatistics {
uint64 NextMessageOffset = 2; // deprecated
uint64 ReadBytes = 3;
repeated TPartitionStatistics Partition = 4;
uint64 CpuMicrosec = 5;
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

Expand Down
36 changes: 36 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,19 @@ struct TRowDispatcherMetrics {
::NMonitoring::TDynamicCounters::TCounterPtr NodesReconnect;
};

// Handles to the per-activity counters found under
//   execpool=User / sensor=ElapsedMicrosecByActivity
// of the counters group passed in, one for each row dispatcher activity:
// FQ_ROW_DISPATCHER_SESSION, FQ_ROW_DISPATCHER and FQ_ROW_DISPATCHER_COMPILER.
struct TUserPoolMetrics {
    // utilsCounters: the group in which the "execpool"/"User" subgroup is looked up.
    explicit TUserPoolMetrics(const ::NMonitoring::TDynamicCounterPtr& utilsCounters) {
        auto elapsedByActivity = utilsCounters
            ->GetSubgroup("execpool", "User")
            ->GetSubgroup("sensor", "ElapsedMicrosecByActivity");

        // All three counters share the same label name and the same trailing flag;
        // only the activity value differs.
        auto activityCounter = [&elapsedByActivity](const char* activity) {
            return elapsedByActivity->GetNamedCounter("activity", activity, true);
        };

        Session = activityCounter("FQ_ROW_DISPATCHER_SESSION");
        RowDispatcher = activityCounter("FQ_ROW_DISPATCHER");
        Compiler = activityCounter("FQ_ROW_DISPATCHER_COMPILER");
    }

    ::NMonitoring::TDynamicCounters::TCounterPtr Session;        // activity FQ_ROW_DISPATCHER_SESSION
    ::NMonitoring::TDynamicCounters::TCounterPtr RowDispatcher;  // activity FQ_ROW_DISPATCHER
    ::NMonitoring::TDynamicCounters::TCounterPtr Compiler;       // activity FQ_ROW_DISPATCHER_COMPILER
};

struct TEvPrivate {
// Event ids
enum EEv : ui32 {
Expand Down Expand Up @@ -269,10 +282,12 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const ::NMonitoring::TDynamicCounterPtr Counters;
const ::NMonitoring::TDynamicCounterPtr CountersRoot;
TRowDispatcherMetrics Metrics;
TUserPoolMetrics UserPoolMetrics;
NYql::IPqGateway::TPtr PqGateway;
NActors::TMon* Monitoring;
TNodesTracker NodesTracker;
TAggregatedStats AggrStats;
ui64 LastCpuTime = 0;

struct TConsumerCounters {
ui64 NewDataArrived = 0;
Expand Down Expand Up @@ -315,6 +330,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
const TString QueryId;
TConsumerCounters Counters;
TTopicSessionClientStatistic Stat;
ui64 CpuMicrosec = 0; // Increment.
ui64 Generation;
};

Expand Down Expand Up @@ -392,6 +408,7 @@ class TRowDispatcher : public TActorBootstrapped<TRowDispatcher> {
bool CheckSession(TAtomicSharedPtr<TConsumerInfo>& consumer, const TEventPtr& ev);
void SetQueryMetrics(const TQueryStatKey& queryKey, ui64 unreadBytesMax, ui64 unreadBytesAvg, i64 readLagMessagesMax);
void PrintStateToLog();
void UpdateCpuTime();

STRICT_STFUNC(
StateFunc, {
Expand Down Expand Up @@ -442,6 +459,7 @@ TRowDispatcher::TRowDispatcher(
, Counters(counters)
, CountersRoot(countersRoot)
, Metrics(counters)
, UserPoolMetrics(countersRoot->GetSubgroup("counters", "utils"))
, PqGateway(pqGateway)
, Monitoring(monitoring)
{
Expand Down Expand Up @@ -634,6 +652,7 @@ TString TRowDispatcher::GetInternalState() {
str << "Consumers count: " << Consumers.size() << "\n";
str << "TopicSessions count: " << TopicSessions.size() << "\n";
str << "Max session buffer size: " << toHuman(MaxSessionBufferSizeBytes) << "\n";
str << "CpuMicrosec: " << toHuman(LastCpuTime) << "\n";
str << "DataRate (all sessions): ";
printDataRate(AggrStats.AllSessionsReadBytes);
str << "\n";
Expand Down Expand Up @@ -1023,6 +1042,7 @@ void TRowDispatcher::PrintStateToLog() {
void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
LOG_ROW_DISPATCHER_TRACE("TEvPrivate::TEvSendStatistic");

UpdateCpuTime();
Schedule(TDuration::Seconds(Config.GetSendStatusPeriodSec()), new NFq::TEvPrivate::TEvSendStatistic());
for (auto& [actorId, consumer] : Consumers) {
if (!NodesTracker.GetNodeConnected(actorId.NodeId())) {
Expand All @@ -1042,6 +1062,8 @@ void TRowDispatcher::Handle(NFq::TEvPrivate::TEvSendStatistic::TPtr&) {
partition.StatisticsUpdated = false;
}
event->Record.SetReadBytes(readBytes);
event->Record.SetCpuMicrosec(consumer->CpuMicrosec);
consumer->CpuMicrosec = 0;
LWPROBE(Statistics, consumer->ReadActorId.ToString(), consumer->QueryId, consumer->Generation, event->Record.ByteSizeLong());
consumer->EventsQueue.Send(event.release(), consumer->Generation);
}
Expand Down Expand Up @@ -1117,6 +1139,20 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvGetInternalStateResponse::
readActorInternalState.ResponseTime = TInstant::Now();
}

void TRowDispatcher::UpdateCpuTime() {
if (Consumers.empty()) {
return;
}
auto currentCpuTime = UserPoolMetrics.Session->Val()
+ UserPoolMetrics.RowDispatcher->Val()
+ UserPoolMetrics.Compiler->Val();
auto diff = (currentCpuTime - LastCpuTime) / Consumers.size();
for (auto& [actorId, consumer] : Consumers) {
consumer->CpuMicrosec += diff;
}
LastCpuTime = currentCpuTime;
}

} // namespace

////////////////////////////////////////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
std::vector<std::optional<ui64>> ColumnIndexes; // Output column index in schema passed into RowDispatcher
const TType* InputDataType = nullptr; // Multi type (comes from Row Dispatcher)
std::unique_ptr<NKikimr::NMiniKQL::TValuePackerTransport<true>> DataUnpacker;
ui64 CpuMicrosec = 0;

THashMap<ui32, TMaybe<ui64>> NextOffsetFromRD;

Expand Down Expand Up @@ -288,6 +289,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void CommitState(const NDqProto::TCheckpoint& checkpoint) override;
void PassAway() override;
i64 GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& buffer, TMaybe<TInstant>& watermark, bool&, i64 freeSpace) override;
TDuration GetCpuTime() override;
std::vector<ui64> GetPartitionsToRead() const;
void AddMessageBatch(TRope&& serializedBatch, NKikimr::NMiniKQL::TUnboxedValueBatch& buffer);
void ProcessState();
Expand Down Expand Up @@ -535,6 +537,10 @@ i64 TDqPqRdReadActor::GetAsyncInputData(NKikimr::NMiniKQL::TUnboxedValueBatch& b
return usedSpace;
}

// Returns the value of the CpuMicrosec member, interpreted as microseconds,
// converted to a TDuration.
TDuration TDqPqRdReadActor::GetCpuTime() {
    const ui64 accumulatedMicrosec = CpuMicrosec;
    return TDuration::MicroSeconds(accumulatedMicrosec);
}

std::vector<ui64> TDqPqRdReadActor::GetPartitionsToRead() const {
std::vector<ui64> res;
ui64 currentPartition = ReadParams.GetPartitioningParams().GetEachTopicPartitionGroupId();
Expand Down Expand Up @@ -575,7 +581,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
SRC_LOG_T("Received TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << " generation " << ev->Cookie);
Counters.Statistics++;

CpuMicrosec += ev->Get()->Record.GetCpuMicrosec();
auto* session = FindSession(ev);
if (!session) {
return;
Expand Down

0 comments on commit a6c613d

Please sign in to comment.