Skip to content

Commit

Permalink
Wilson uploader sensors (#13457)
Browse files Browse the repository at this point in the history
Co-authored-by: Vlad Kuznetsov <va.kuznecov@physics.msu.ru>
  • Loading branch information
robdrynkin and va-kuznecov authored Jan 17, 2025
1 parent 8c2ef27 commit 2f884a8
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
1 change: 1 addition & 0 deletions ydb/core/driver_lib/run/kikimr_services_initializers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,7 @@ void TBasicServicesInitializer::InitializeServices(NActors::TActorSystemSetup* s
mon->RegisterActorPage(actorsMonPage, "wilson_uploader", "Wilson Trace Uploader", false, actorSystem, actorId);
};
}
uploaderParams.Counters = GetServiceCounters(counters, "utils");

wilsonUploader.reset(std::move(uploaderParams).CreateUploader());
break;
Expand Down
28 changes: 23 additions & 5 deletions ydb/library/actors/wilson/wilson_uploader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ namespace NWilson {

TString ErrStr;
TString LastCommitTraceErrStr;
size_t DroppedSpans = 0;

// Monitoring counters of the uploader. They are initialized in the constructor
// either from params.Counters or, when no counter group is supplied, as
// standalone counters.

// Spans discarded by the uploader: pending-queue overflow, a span larger than
// the maximum batch size, or expiration while waiting in the queue.
NMonitoring::TDynamicCounters::TCounterPtr DroppedSpansCounter;
// Total number of spans in the batches taken for sending (batch.SizeSpans).
NMonitoring::TDynamicCounters::TCounterPtr SentSpansCounter;
// Total size in bytes of the batches taken for sending (batch.SizeBytes).
NMonitoring::TDynamicCounters::TCounterPtr SentBytesCounter;
// Export requests that finished with an OK status.
NMonitoring::TDynamicCounters::TCounterPtr SentSpanBatchesOkCounter;
// Export requests that finished with a non-OK status.
NMonitoring::TDynamicCounters::TCounterPtr SentSpanBatchesErrCounter;

public:
TWilsonUploader(TWilsonUploaderParams params)
Expand All @@ -147,6 +152,11 @@ namespace NWilson {
, RegisterMonPage(params.RegisterMonPage)
, GrpcSigner(std::move(params.GrpcSigner))
, CurrentBatch(MaxSpansInBatch, MaxBytesInBatch, ServiceName)
// Counters come from the supplied counter group when present; otherwise each
// one falls back to a standalone counter so the rest of the class can use
// them unconditionally. The second GetCounter argument is the boolean flag
// `true` for every counter (a string literal such as "true" would only work
// by accident through pointer-to-bool conversion).
, DroppedSpansCounter(params.Counters ? params.Counters->GetCounter("WilsonUploaderDroppedSpans", true) : MakeIntrusive<NMonitoring::TCounterForPtr>(true))
, SentSpansCounter(params.Counters ? params.Counters->GetCounter("WilsonUploaderSentSpans", true) : MakeIntrusive<NMonitoring::TCounterForPtr>(true))
, SentBytesCounter(params.Counters ? params.Counters->GetCounter("WilsonUploaderSentBytes", true) : MakeIntrusive<NMonitoring::TCounterForPtr>(true))
, SentSpanBatchesOkCounter(params.Counters ? params.Counters->GetCounter("WilsonUploaderSentSpanBatchesOk", true) : MakeIntrusive<NMonitoring::TCounterForPtr>(true))
, SentSpanBatchesErrCounter(params.Counters ? params.Counters->GetCounter("WilsonUploaderSentSpanBatchesErr", true) : MakeIntrusive<NMonitoring::TCounterForPtr>(true))
{}

~TWilsonUploader() {
Expand Down Expand Up @@ -204,15 +214,15 @@ namespace NWilson {

void Handle(TEvWilson::TPtr ev) {
if (SpansSizeBytes >= MaxPendingSpanBytes) {
++DroppedSpans;
DroppedSpansCounter->Inc();
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span due to overflow");
} else {
const TMonotonic now = TActivationContext::Monotonic();
const TMonotonic expirationTimestamp = now + MaxSpanTimeInQueue;
auto& span = ev->Get()->Span;
const ui32 size = span.ByteSizeLong();
if (size > MaxBytesInBatch) {
++DroppedSpans;
DroppedSpansCounter->Inc();
ALOG_ERROR(WILSON_SERVICE_ID, "dropped span of size " << size << ", which exceeds max batch size " << MaxBytesInBatch);
return;
}
Expand Down Expand Up @@ -274,7 +284,7 @@ namespace NWilson {
}

if (numSpansDropped) {
DroppedSpans += numSpansDropped;
DroppedSpansCounter->Add(numSpansDropped);
ALOG_ERROR(WILSON_SERVICE_ID,
"dropped " << numSpansDropped << " span(s) due to expiration");
}
Expand All @@ -298,6 +308,8 @@ namespace NWilson {
<< " ParentSpanId# " << HexEncode(span.parent_span_id())
<< " Name# " << span.name());
}
SentSpansCounter->Add(batch.SizeSpans);
SentBytesCounter->Add(batch.SizeBytes);

NextSendTimestamp = now + TDuration::MicroSeconds((batch.SizeSpans * 1'000'000) / MaxSpansPerSecond);
SpansSizeBytes -= batch.SizeBytes;
Expand Down Expand Up @@ -332,10 +344,13 @@ namespace NWilson {
auto node = std::unique_ptr<TExportRequestData>(static_cast<TExportRequestData*>(tag));
ALOG_TRACE(WILSON_SERVICE_ID, "finished export request " << (void*)node.get());
if (!node->Status.ok()) {
SentSpanBatchesErrCounter->Inc();
LastCommitTraceErrStr = node->Status.error_message();

ALOG_ERROR(WILSON_SERVICE_ID,
"failed to commit traces: " << node->Status.error_message());
} else {
SentSpanBatchesOkCounter->Inc();
}

--ExportRequestsCount;
Expand Down Expand Up @@ -408,7 +423,10 @@ namespace NWilson {
str << "Current batch queue size: " << BatchQueue.size();
}
PARA() {
str << "Dropped spans: " << DroppedSpans;
str << "Sent spans: " << SentBytesCounter->Val();
}
PARA() {
str << "Dropped spans: " << DroppedSpansCounter->Val();
}
PARA() {
std::string state;
Expand Down
1 change: 1 addition & 0 deletions ydb/library/actors/wilson/wilson_uploader.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace NWilson {
ui64 MaxExportRequestsInflight = 1;

TRegisterMonPageCallback RegisterMonPage;
NMonitoring::TDynamicCounterPtr Counters;

NActors::IActor* CreateUploader() &&;
};
Expand Down

0 comments on commit 2f884a8

Please sign in to comment.