Skip to content

Commit

Permalink
fix(util/worker): report worker queue length metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SOF3 committed Nov 29, 2024
1 parent 01716a6 commit 6bdb554
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 3 deletions.
3 changes: 3 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -317,16 +317,19 @@ k8s.io/apiextensions-apiserver v0.29.0 h1:0VuspFG7Hj+SxyF/Z/2T0uFbI5gb5LRgEyUVE3
k8s.io/apiextensions-apiserver v0.29.0/go.mod h1:TKmpy3bTS0mr9pylH0nOt/QzQRrW7/h7yLdRForMZwc=
k8s.io/apiextensions-apiserver v0.30.1 h1:4fAJZ9985BmpJG6PkoxVRpXv9vmPUOVzl614xarePws=
k8s.io/apiextensions-apiserver v0.30.1/go.mod h1:R4GuSrlhgq43oRY9sF2IToFh7PVlF1JjfWdoG3pixk4=
k8s.io/apiextensions-apiserver v0.31.0/go.mod h1:b9aMDEYaEe5sdK+1T0KU78ApR/5ZVp4i56VacZYEHxk=
k8s.io/apiextensions-apiserver v0.31.3/go.mod h1:b9aMDEYaEe5sdK+1T0KU78ApR/5ZVp4i56VacZYEHxk=
k8s.io/apiserver v0.29.0 h1:Y1xEMjJkP+BIi0GSEv1BBrf1jLU9UPfAnnGGbbDdp7o=
k8s.io/apiserver v0.29.0/go.mod h1:31n78PsRKPmfpee7/l9NYEv67u6hOL6AfcE761HapDM=
k8s.io/apiserver v0.30.1 h1:BEWEe8bzS12nMtDKXzCF5Q5ovp6LjjYkSp8qOPk8LZ8=
k8s.io/apiserver v0.30.1/go.mod h1:i87ZnQ+/PGAmSbD/iEKM68bm1D5reX8fO4Ito4B01mo=
k8s.io/apiserver v0.31.0/go.mod h1:KI9ox5Yu902iBnnyMmy7ajonhKnkeZYJhTZ/YI+WEMk=
k8s.io/apiserver v0.31.3/go.mod h1:KI9ox5Yu902iBnnyMmy7ajonhKnkeZYJhTZ/YI+WEMk=
k8s.io/component-base v0.29.0 h1:T7rjd5wvLnPBV1vC4zWd/iWRbV8Mdxs+nGaoaFzGw3s=
k8s.io/component-base v0.29.0/go.mod h1:sADonFTQ9Zc9yFLghpDpmNXEdHyQmFIGbiuZbqAXQ1M=
k8s.io/component-base v0.30.1 h1:bvAtlPh1UrdaZL20D9+sWxsJljMi0QZ3Lmw+kmZAaxQ=
k8s.io/component-base v0.30.1/go.mod h1:e/X9kDiOebwlI41AvBHuWdqFriSRrX50CdwA9TFaHLI=
k8s.io/component-base v0.31.0/go.mod h1:TYVuzI1QmN4L5ItVdMSXKvH7/DtvIuas5/mm8YT3rTo=
k8s.io/component-base v0.31.3/go.mod h1:TYVuzI1QmN4L5ItVdMSXKvH7/DtvIuas5/mm8YT3rTo=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 h1:pWEwq4Asjm4vjW7vcsmijwBhOr1/shsbSYiWXmNGlks=
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
Expand Down
1 change: 1 addition & 0 deletions util/worker/observer/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func ProvideLogging() component.Declared[Observer] {
logger.V(3).WithCallDepth(1).Info("reconcile complete")
}
},
QueueLength: func(_ context.Context, _ QueueLength, _ func() int) {},
}
},
)
Expand Down
20 changes: 17 additions & 3 deletions util/worker/observer/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,28 @@ func ProvideMetrics() component.Declared[Observer] {
workerName string
}

type workerTags struct {
type reconcileTags struct {
Worker string
Error string
}

type queueLengthTags struct {
Worker string
}

reconcileHandle := metrics.Register(
deps.Registry(),
"worker_reconcile",
"Duration of a worker reconcile run.",
metrics.FunctionDurationHistogram(),
metrics.NewReflectTags[workerTags](),
metrics.NewReflectTags[reconcileTags](),
)
queueLengthHandle := metrics.Register(
deps.Registry(),
"worker_queue_length",
"Queue length of a worker.",
metrics.IntGauge(),
metrics.NewReflectTags[queueLengthTags](),
)

return Observer{
Expand All @@ -61,11 +72,14 @@ func ProvideMetrics() component.Declared[Observer] {

duration := time.Since(data.startTime)

reconcileHandle.Emit(duration, workerTags{
reconcileHandle.Emit(duration, reconcileTags{
Worker: data.workerName,
Error: errors.SerializeTags(arg.Err),
})
},
QueueLength: func(ctx context.Context, arg QueueLength, getter func() int) {
metrics.Repeating(ctx, deps, queueLengthHandle.With(queueLengthTags{Worker: arg.WorkerName}), getter)
},
}
},
)
Expand Down
5 changes: 5 additions & 0 deletions util/worker/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var Provide = component.RequireDeps(
type Observer struct {
StartReconcile o11y.ObserveScopeFunc[StartReconcile]
EndReconcile o11y.ObserveFunc[EndReconcile]
QueueLength o11y.MonitorFunc[QueueLength, int]
}

func (Observer) ComponentName() string { return "worker" }
Expand All @@ -40,3 +41,7 @@ type StartReconcile struct {
type EndReconcile struct {
Err error
}

type QueueLength struct {
WorkerName string
}
2 changes: 2 additions & 0 deletions util/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ func start[QueueItem comparable](

state.completion.Add(*options.WorkerCount)

go deps.Observer.Get().QueueLength(ctx, observer.QueueLength{WorkerName: args.Name}, state.queue.Len)

for range *options.WorkerCount {
go runWorker(
ctx,
Expand Down

0 comments on commit 6bdb554

Please sign in to comment.