From 6bdb5548c8b592b97f1a20cf904546f7dbfce170 Mon Sep 17 00:00:00 2001 From: chankyin Date: Fri, 29 Nov 2024 12:06:23 +0800 Subject: [PATCH] fix(util/worker): report worker queue length metrics --- go.work.sum | 3 +++ util/worker/observer/logging.go | 1 + util/worker/observer/metrics.go | 20 +++++++++++++++++--- util/worker/observer/observer.go | 5 +++++ util/worker/worker.go | 2 ++ 5 files changed, 28 insertions(+), 3 deletions(-) diff --git a/go.work.sum b/go.work.sum index b62a7fe..cf08fc5 100644 --- a/go.work.sum +++ b/go.work.sum @@ -317,16 +317,19 @@ k8s.io/apiextensions-apiserver v0.29.0 h1:0VuspFG7Hj+SxyF/Z/2T0uFbI5gb5LRgEyUVE3 k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc= k8s.io/apiextensions-apiserver v0.30.1 h1:4fAJZ9985BmpJG6PkoxVRpXv9vmPUOVzl614xarePws= k8s.io/apiextensions-apiserver v0.30.1/go.mod h1:R4GuSrlhgq43oRY9sF2IToFh7PVlF1JjfWdoG3pixk4= +k8s.io/apiextensions-apiserver v0.31.0/go.mod h1:b9aMDEYaEe5sdK+1T0KU78ApR/5ZVp4i56VacZYEHxk= k8s.io/apiextensions-apiserver v0.31.3/go.mod h1:b9aMDEYaEe5sdK+1T0KU78ApR/5ZVp4i56VacZYEHxk= k8s.io/apiserver v0.29.0 h1:Y1xEMjJkP+BIi0GSEv1BBrf1jLU9UPfAnnGGbbDdp7o= k8s.io/apiserver v0.29.0/go.mod h1:31n78PsRKPmfpee7/l9NYEv67u6hOL6AfcE761HapDM= k8s.io/apiserver v0.30.1 h1:BEWEe8bzS12nMtDKXzCF5Q5ovp6LjjYkSp8qOPk8LZ8= k8s.io/apiserver v0.30.1/go.mod h1:i87ZnQ+/PGAmSbD/iEKM68bm1D5reX8fO4Ito4B01mo= +k8s.io/apiserver v0.31.0/go.mod h1:KI9ox5Yu902iBnnyMmy7ajonhKnkeZYJhTZ/YI+WEMk= k8s.io/apiserver v0.31.3/go.mod h1:KI9ox5Yu902iBnnyMmy7ajonhKnkeZYJhTZ/YI+WEMk= k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s= k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M= k8s.io/component-base v0.30.1 h1:bvAtlPh1UrdaZL20D9+sWxsJljMi0QZ3Lmw+kmZAaxQ= k8s.io/component-base v0.30.1/go.mod h1:e/X9kDiOebwlI41AvBHuWdqFriSRrX50CdwA9TFaHLI= +k8s.io/component-base v0.31.0/go.mod h1:TYVuzI1QmN4L5ItVdMSXKvH7/DtvIuas5/mm8YT3rTo= k8s.io/component-base v0.31.3/go.mod h1:TYVuzI1QmN4L5ItVdMSXKvH7/DtvIuas5/mm8YT3rTo= k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks= k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E= diff --git a/util/worker/observer/logging.go b/util/worker/observer/logging.go index 0490f99..38a8aea 100644 --- a/util/worker/observer/logging.go +++ b/util/worker/observer/logging.go @@ -62,6 +62,7 @@ func ProvideLogging() component.Declared[Observer] { logger.V(3).WithCallDepth(1).Info("reconcile complete") } }, + QueueLength: func(_ context.Context, _ QueueLength, _ func() int) {}, } }, ) diff --git a/util/worker/observer/metrics.go b/util/worker/observer/metrics.go index 9399a19..06ba002 100644 --- a/util/worker/observer/metrics.go +++ b/util/worker/observer/metrics.go @@ -36,17 +36,28 @@ func ProvideMetrics() component.Declared[Observer] { workerName string } - type workerTags struct { + type reconcileTags struct { Worker string Error string } + type queueLengthTags struct { + Worker string + } + reconcileHandle := metrics.Register( deps.Registry(), "worker_reconcile", "Duration of a worker reconcile run.", metrics.FunctionDurationHistogram(), - metrics.NewReflectTags[workerTags](), + metrics.NewReflectTags[reconcileTags](), + ) + queueLengthHandle := metrics.Register( + deps.Registry(), + "worker_queue_length", + "Queue length of a worker.", + metrics.IntGauge(), + metrics.NewReflectTags[queueLengthTags](), ) return Observer{ @@ -61,11 +72,14 @@ func ProvideMetrics() component.Declared[Observer] { duration := time.Since(data.startTime) - reconcileHandle.Emit(duration, workerTags{ + reconcileHandle.Emit(duration, reconcileTags{ Worker: data.workerName, Error: errors.SerializeTags(arg.Err), }) }, + QueueLength: func(ctx context.Context, arg QueueLength, getter func() int) { + metrics.Repeating(ctx, deps, queueLengthHandle.With(queueLengthTags{Worker: arg.WorkerName}), getter) + }, } }, ) diff --git a/util/worker/observer/observer.go b/util/worker/observer/observer.go index aa5bb82..fe5c806 100644 --- a/util/worker/observer/observer.go +++ b/util/worker/observer/observer.go @@ -27,6 +27,7 @@ var Provide = component.RequireDeps( type Observer struct { StartReconcile o11y.ObserveScopeFunc[StartReconcile] EndReconcile o11y.ObserveFunc[EndReconcile] + QueueLength o11y.MonitorFunc[QueueLength, int] } func (Observer) ComponentName() string { return "worker" } @@ -40,3 +41,7 @@ type StartReconcile struct { type EndReconcile struct { Err error } + +type QueueLength struct { + WorkerName string +} diff --git a/util/worker/worker.go b/util/worker/worker.go index e7148d1..45bf83a 100644 --- a/util/worker/worker.go +++ b/util/worker/worker.go @@ -231,6 +231,8 @@ func start[QueueItem comparable]( state.completion.Add(*options.WorkerCount) + go deps.Observer.Get().QueueLength(ctx, observer.QueueLength{WorkerName: args.Name}, state.queue.Len) + for range *options.WorkerCount { go runWorker( ctx,