diff --git a/allinone/main.go b/allinone/main.go index a83612a..6fb7a58 100644 --- a/allinone/main.go +++ b/allinone/main.go @@ -28,11 +28,12 @@ import ( "github.com/kubewharf/podseidon/aggregator/aggregator" aggregatorobserver "github.com/kubewharf/podseidon/aggregator/observer" "github.com/kubewharf/podseidon/generator/generator" + "github.com/kubewharf/podseidon/generator/monitor" generatorobserver "github.com/kubewharf/podseidon/generator/observer" "github.com/kubewharf/podseidon/generator/resource" "github.com/kubewharf/podseidon/generator/resource/deployment" webhookobserver "github.com/kubewharf/podseidon/webhook/observer" - "github.com/kubewharf/podseidon/webhook/server" + webhookserver "github.com/kubewharf/podseidon/webhook/server" ) func main() { @@ -54,6 +55,7 @@ func main() { }, }, )), - component.RequireDep(server.New(util.Empty{})), + component.RequireDep(monitor.New(monitor.Args{})), + component.RequireDep(webhookserver.New(util.Empty{})), ) } diff --git a/chart/templates/_generator.yaml b/chart/templates/_generator.yaml index 0587aa4..be45b25 100644 --- a/chart/templates/_generator.yaml +++ b/chart/templates/_generator.yaml @@ -46,6 +46,8 @@ generator-worker-concurrency: {{toJson .main.Values.generator.workerCount}} {{- if empty $selector | not}} deployment-plugin-protection-selector: {{toJson $selector}} {{- end}} + +generator-monitor-enable: {{toJson .main.Values.generator.monitor.enable}} {{- end}} {{- define "podseidon.generator.env.yaml"}} diff --git a/chart/values.yaml b/chart/values.yaml index dca8d3a..1dddb9f 100644 --- a/chart/values.yaml +++ b/chart/values.yaml @@ -82,6 +82,9 @@ generator: protectedSelector: # Only objects matching the selector have a generated protector. deployments.apps: 'podseidon.kubewharf.io/protect=true' + monitor: # Report global PodProtector metrics + enable: true + aggregator: replicas: 3 minReadySeconds: 60 diff --git a/generator/main.go b/generator/main.go index e56fea4..3c1c5e6 100644 --- a/generator/main.go +++ b/generator/main.go @@ -26,6 +26,7 @@ import ( workerobserver "github.com/kubewharf/podseidon/util/worker/observer" "github.com/kubewharf/podseidon/generator/generator" + "github.com/kubewharf/podseidon/generator/monitor" generatorobserver "github.com/kubewharf/podseidon/generator/observer" "github.com/kubewharf/podseidon/generator/resource" "github.com/kubewharf/podseidon/generator/resource/deployment" @@ -47,5 +48,6 @@ func main() { }, }, )), + component.RequireDep(monitor.New(monitor.Args{})), ) } diff --git a/generator/monitor/monitor.go b/generator/monitor/monitor.go new file mode 100644 index 0000000..36024d6 --- /dev/null +++ b/generator/monitor/monitor.go @@ -0,0 +1,162 @@ +// Copyright 2024 The Podseidon Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Reports global metrics about all PodProtector objects. +package monitor + +import ( + "context" + "flag" + "sync" + + "k8s.io/apimachinery/pkg/types" + + podseidonv1a1 "github.com/kubewharf/podseidon/apis/v1alpha1" + podseidoninformers "github.com/kubewharf/podseidon/client/informers/externalversions" + + "github.com/kubewharf/podseidon/util/component" + "github.com/kubewharf/podseidon/util/errors" + "github.com/kubewharf/podseidon/util/kube" + "github.com/kubewharf/podseidon/util/o11y" + "github.com/kubewharf/podseidon/util/optional" + "github.com/kubewharf/podseidon/util/util" + + "github.com/kubewharf/podseidon/generator/constants" + "github.com/kubewharf/podseidon/generator/observer" +) + +const ProportionPpmUnits = 1000000 + +var New = component.Declare[Args, Options, Deps, State, util.Empty]( + func(Args) string { return "generator-monitor" }, + func(_ Args, fs *flag.FlagSet) Options { + return Options{ + Enable: fs.Bool("enable", true, "Enable global PodProtector monitor"), + } + }, + func(_ Args, requests *component.DepRequests) Deps { + return Deps{ + observer: o11y.Request[observer.Observer](requests), + podseidonInformers: component.DepPtr(requests, kube.NewInformers(kube.PodseidonInformers( + constants.CoreClusterName, + constants.LeaderPhase, + optional.Some(constants.GeneratorElectorArgs), + ))), + } + }, + func(_ context.Context, _ Args, options Options, deps Deps) (*State, error) { + state := &State{ + statusMu: sync.Mutex{}, + status: util.Zero[observer.MonitorWorkloads](), + addedDeltaCache: map[types.NamespacedName]observer.MonitorWorkloads{}, + } + + if *options.Enable { + pprInformer := deps.podseidonInformers.Get().Podseidon().V1alpha1().PodProtectors() + _, err := pprInformer.Informer().AddEventHandler(kube.GenericEventHandlerWithStaleState( + func(ppr *podseidonv1a1.PodProtector, stillPresent bool) { + nsName := types.NamespacedName{Namespace: ppr.Namespace, Name: ppr.Name} + + if stillPresent { + status := pprToStatus(ppr) + state.add(nsName, status) + } else { + state.remove(nsName) + } + }, + )) + if err != nil { + return nil, errors.TagWrapf("AddEventHandler", err, "add event handler to ppr informer") + } + } + + return state, nil + }, + component.Lifecycle[Args, Options, Deps, State]{ + Start: func(ctx context.Context, _ *Args, options *Options, deps *Deps, state *State) error { + if *options.Enable { + deps.observer.Get().MonitorWorkloads(ctx, util.Empty{}, func() observer.MonitorWorkloads { + state.statusMu.Lock() + defer state.statusMu.Unlock() + + return state.status + }) + } + + return nil + }, + Join: nil, + HealthChecks: nil, + }, + func(_ *component.Data[Args, Options, Deps, State]) util.Empty { return util.Empty{} }, +) + +type Args struct{} + +type Options struct { + Enable *bool +} + +type Deps struct { + observer component.Dep[observer.Observer] + podseidonInformers component.Dep[podseidoninformers.SharedInformerFactory] +} + +type State struct { + statusMu sync.Mutex + status observer.MonitorWorkloads + + addedDeltaCache map[types.NamespacedName]observer.MonitorWorkloads +} + +func pprToStatus(ppr *podseidonv1a1.PodProtector) observer.MonitorWorkloads { + availableProportionPpm := int64(0) + if ppr.Spec.MinAvailable > 0 { + availableProportionPpm = min(ProportionPpmUnits, util.RoundedIntDiv( + int64(ppr.Status.Summary.AggregatedAvailable)*ProportionPpmUnits, + int64(ppr.Spec.MinAvailable), + )) + } + + return observer.MonitorWorkloads{ + NumWorkloads: 1, + MinAvailable: int64(ppr.Spec.MinAvailable), + TotalReplicas: int64(ppr.Status.Summary.Total), + AggregatedAvailableReplicas: int64(ppr.Status.Summary.AggregatedAvailable), + EstimatedAvailableReplicas: int64(ppr.Status.Summary.EstimatedAvailable), + SumAvailableProportionPpm: availableProportionPpm, + SumLatencyMillis: ppr.Status.Summary.MaxLatencyMillis, + } +} + +func (state *State) add(nsName types.NamespacedName, newDelta observer.MonitorWorkloads) { + old := state.addedDeltaCache[nsName] // use zero value if absent + state.addedDeltaCache[nsName] = newDelta + + state.statusMu.Lock() + defer state.statusMu.Unlock() + + state.status.Subtract(old) + state.status.Add(newDelta) +} + +func (state *State) remove(nsName types.NamespacedName) { + old := state.addedDeltaCache[nsName] + delete(state.addedDeltaCache, nsName) + + state.statusMu.Lock() + defer state.statusMu.Unlock() + + state.status.Subtract(old) +} diff --git a/generator/monitor/monitor_test.go b/generator/monitor/monitor_test.go new file mode 100644 index 0000000..7c3e7fc --- /dev/null +++ b/generator/monitor/monitor_test.go @@ -0,0 +1,209 @@ +// Copyright 2024 The Podseidon Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitor_test + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + + podseidonv1a1 "github.com/kubewharf/podseidon/apis/v1alpha1" + + "github.com/kubewharf/podseidon/util/cmd" + "github.com/kubewharf/podseidon/util/component" + "github.com/kubewharf/podseidon/util/kube" + "github.com/kubewharf/podseidon/util/o11y" + "github.com/kubewharf/podseidon/util/util" + + "github.com/kubewharf/podseidon/generator/monitor" + "github.com/kubewharf/podseidon/generator/observer" +) + +func TestMonitor(t *testing.T) { + t.Parallel() + + setup := setup() + //nolint:exhaustruct + setup.Step(t, Step{ + Expect: observer.MonitorWorkloads{}, + }) + //nolint:exhaustruct + setup.Step(t, Step{ + Create: []*podseidonv1a1.PodProtector{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "1", + }, + Spec: podseidonv1a1.PodProtectorSpec{ + MinAvailable: 10, + }, + Status: podseidonv1a1.PodProtectorStatus{ + Summary: podseidonv1a1.PodProtectorStatusSummary{ + Total: 13, + AggregatedAvailable: 8, + EstimatedAvailable: 4, + MaxLatencyMillis: 300, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "2", + }, + Spec: podseidonv1a1.PodProtectorSpec{ + MinAvailable: 100, + }, + Status: podseidonv1a1.PodProtectorStatus{ + Summary: podseidonv1a1.PodProtectorStatusSummary{ + Total: 200, + AggregatedAvailable: 150, + EstimatedAvailable: 140, + MaxLatencyMillis: 70, + }, + }, + }, + }, + Expect: observer.MonitorWorkloads{ + NumWorkloads: 2, + MinAvailable: 10 + 100, + TotalReplicas: 13 + 200, + AggregatedAvailableReplicas: 8 + 150, + EstimatedAvailableReplicas: 4 + 140, + SumAvailableProportionPpm: 800000 + 1000000, + SumLatencyMillis: 300 + 70, + }, + }) + //nolint:exhaustruct + setup.Step(t, Step{ + Update: map[types.NamespacedName]func(*podseidonv1a1.PodProtector){ + {Namespace: "default", Name: "2"}: func(ppr *podseidonv1a1.PodProtector) { + ppr.Spec.MinAvailable = 200 + }, + }, + Delete: []types.NamespacedName{{Namespace: "default", Name: "1"}}, + Expect: observer.MonitorWorkloads{ + NumWorkloads: 1, + MinAvailable: 200, + TotalReplicas: 200, + AggregatedAvailableReplicas: 150, + EstimatedAvailableReplicas: 140, + SumAvailableProportionPpm: 750000, + SumLatencyMillis: 70, + }, + }) +} + +type Step struct { + Create []*podseidonv1a1.PodProtector + Update map[types.NamespacedName]func(*podseidonv1a1.PodProtector) + UpdateStatus map[types.NamespacedName]func(*podseidonv1a1.PodProtector) + Delete []types.NamespacedName + + Expect observer.MonitorWorkloads +} + +type Setup struct { + //nolint:containedctx // only to reduce boilerplate + ctx context.Context + apiMap component.ApiMap + nextStepNo int + client *kube.Client + statusGetter util.LateInitReader[func() observer.MonitorWorkloads] +} + +func setup() *Setup { + ctx := context.Background() + + client := kube.MockClient() + + statusGetter, statusGetterWriter := util.NewLateInit[func() observer.MonitorWorkloads]() + + //nolint:exhaustruct + obs := o11y.ReflectPopulate(observer.Observer{ + MonitorWorkloads: func(_ context.Context, _ util.Empty, getter func() observer.MonitorWorkloads) { + statusGetterWriter(getter) + }, + }) + + apiMap := cmd.MockStartup(ctx, []func(*component.DepRequests){ + component.ApiOnly("core-kube", client), + component.ApiOnly("generator-leader-elector", kube.MockReadyElector(ctx)), + component.ApiOnly("observer-generator", obs), + component.RequireDep(monitor.New(monitor.Args{})), + }) + + return &Setup{ + ctx: ctx, + apiMap: apiMap, + client: client, + statusGetter: statusGetter, + nextStepNo: 0, + } +} + +func (setup *Setup) Step(t *testing.T, step Step) { + t.Helper() + + stepNo := setup.nextStepNo + setup.nextStepNo++ + + pprClient := setup.client.PodseidonClientSet().PodseidonV1alpha1().PodProtectors + + for _, ppr := range step.Create { + _, err := pprClient(ppr.Namespace).Create(setup.ctx, ppr, metav1.CreateOptions{}) + require.NoError(t, err) + } + + for nsName, patch := range step.Update { + existing, err := pprClient(nsName.Namespace).Get(setup.ctx, nsName.Name, metav1.GetOptions{}) + require.NoError(t, err) + + existing = existing.DeepCopy() + patch(existing) + + _, err = pprClient(nsName.Namespace).Update(setup.ctx, existing, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + for nsName, patch := range step.UpdateStatus { + existing, err := pprClient(nsName.Namespace).Get(setup.ctx, nsName.Name, metav1.GetOptions{}) + require.NoError(t, err) + + existing = existing.DeepCopy() + patch(existing) + + _, err = pprClient(nsName.Namespace).UpdateStatus(setup.ctx, existing, metav1.UpdateOptions{}) + require.NoError(t, err) + } + + for _, nsName := range step.Delete { + err := pprClient(nsName.Namespace).Delete(setup.ctx, nsName.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + } + + var actual observer.MonitorWorkloads + + assert.Eventuallyf(t, func() bool { + actual = setup.statusGetter.Get()() + return actual == step.Expect + }, time.Second, time.Millisecond, "step #%d status mismatch:\nexpected\n\t%#v\ngot\n\t%#v", stepNo, &step.Expect, &actual) +} diff --git a/generator/observer/logging.go b/generator/observer/logging.go index 5a2c4c6..eda722a 100644 --- a/generator/observer/logging.go +++ b/generator/observer/logging.go @@ -35,6 +35,17 @@ func ProvideLogging() component.Declared[Observer] { }, func(util.Empty) Observer { return Observer{ + InterpretProtectors: func(ctx context.Context, arg InterpretProtectors) { + klog.FromContext(ctx).WithValues( + "group", arg.Group, + "version", arg.Version, + "resource", arg.Resource, + "namespace", arg.Namespace, + "name", arg.Name, + "decisions", arg.Decisions, + "requiredProtectors", len(arg.RequiredProtectors), + ).V(4).Info("interpret protectors") + }, StartReconcile: func(ctx context.Context, arg StartReconcile) (context.Context, context.CancelFunc) { logger := klog.FromContext(ctx) logger = logger.WithValues( @@ -101,17 +112,7 @@ func ProvideLogging() component.Declared[Observer] { Info("No PodProtectors present, ensuring finalizer removal from source object") return ctx, util.NoOp }, - InterpretProtectors: func(ctx context.Context, arg InterpretProtectors) { - klog.FromContext(ctx).WithValues( - "group", arg.Group, - "version", arg.Version, - "resource", arg.Resource, - "namespace", arg.Namespace, - "name", arg.Name, - "decisions", arg.Decisions, - "requiredProtectors", len(arg.RequiredProtectors), - ).V(4).Info("interpret protectors") - }, + MonitorWorkloads: func(context.Context, util.Empty, func() MonitorWorkloads) {}, } }, ) diff --git a/generator/observer/metrics.go b/generator/observer/metrics.go index eedaaee..02ef221 100644 --- a/generator/observer/metrics.go +++ b/generator/observer/metrics.go @@ -71,7 +71,65 @@ func ProvideMetrics() component.Declared[Observer] { }) } + monitorWorkloadsHandle := metrics.RegisterMultiField( + deps.Registry(), + "generator_monitor_workloads", + "Global workload status", + metrics.NewReflectTags[util.Empty](), + metrics.NewField( + "num_workloads", + "Number of workload objects managed by this generator", + metrics.IntGauge(), + func(status MonitorWorkloads) int { return status.NumWorkloads }, + ), + metrics.NewField( + "min_available", + "Sum of minAvailable over workloads managed by this generator", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.MinAvailable }, + ), + metrics.NewField( + "current_total_replicas", + "Current aggregated sum of total replicas over workloads managed by this generator.", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.TotalReplicas }, + ), + metrics.NewField( + "current_aggregated_available_replicas", + "Current aggregated sum of available replicas over workloads managed by this generator.", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.AggregatedAvailableReplicas }, + ), + metrics.NewField( + "current_estimated_available_replicas", + "Current estimated sum of available replicas over workloads managed by this generator.", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.EstimatedAvailableReplicas }, + ), + metrics.NewField( + "sum_available_proportion_ppm", + "Sum of the proportion of aggregated available replicas, saturated at 1, rounded to nearest ppm (parts-per-million).", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.SumAvailableProportionPpm }, + ), + metrics.NewField( + "avg_available_proportion_ppm", + "Unweighted average of service availability for every PodProtector (0 to 1).", + metrics.FloatGauge(), + func(status MonitorWorkloads) float64 { + return float64(status.SumAvailableProportionPpm) / 1e6 / float64(status.NumWorkloads) + }, + ), + metrics.NewField( + "sum_latency_millis", + "Sum of the .status.summary.maxLatencyMillis field over workloads managed by this generator.", + metrics.Int64Gauge(), + func(status MonitorWorkloads) int64 { return status.SumLatencyMillis }, + ), + ) + return Observer{ + InterpretProtectors: func(context.Context, InterpretProtectors) {}, StartReconcile: func(ctx context.Context, arg StartReconcile) (context.Context, context.CancelFunc) { ctx = context.WithValue(ctx, reconcileGvrKey{}, ReconcileGvr{ Group: arg.Group, @@ -120,7 +178,9 @@ func ProvideMetrics() component.Declared[Observer] { }) return ctx, util.NoOp }, - InterpretProtectors: func(context.Context, InterpretProtectors) {}, + MonitorWorkloads: func(ctx context.Context, _ util.Empty, getter func() MonitorWorkloads) { + metrics.Repeating(ctx, deps, monitorWorkloadsHandle.With(util.Empty{}), getter) + }, } }, ) diff --git a/generator/observer/observer.go b/generator/observer/observer.go index 4f9acaf..5c09765 100644 --- a/generator/observer/observer.go +++ b/generator/observer/observer.go @@ -40,6 +40,8 @@ type Observer struct { SyncProtector o11y.ObserveScopeFunc[*podseidonv1a1.PodProtector] DeleteProtector o11y.ObserveScopeFunc[*podseidonv1a1.PodProtector] CleanSourceFinalizer o11y.ObserveScopeFunc[StartReconcile] + + MonitorWorkloads o11y.MonitorFunc[util.Empty, MonitorWorkloads] } func (Observer) ComponentName() string { return "generator" } @@ -86,3 +88,47 @@ const ( ActionDeleteProtector Action = "DeleteProtector" ActionError Action = "Error" ) + +type MonitorWorkloads struct { + // Number of workload objects managed by this generator. + NumWorkloads int + + // Sum of minAvailable over workloads managed by this generator. + MinAvailable int64 + + // Sum of total aggregated replicas over workloads managed by this generator. + TotalReplicas int64 + + // Sum of available aggregated replicas over workloads managed by this generator. + AggregatedAvailableReplicas int64 + + // Sum of available estimated replicas over workloads managed by this generator. + EstimatedAvailableReplicas int64 + + // Sum of the proportion of aggregated available replicas, saturated at 1, rounded to nearest ppm (parts-per-million). + // When divided by NumWorkloads, this is the unweighted average of service availability for every PodProtector. + SumAvailableProportionPpm int64 + + // Sum of the .status.summary.maxLatencyMillis field over workloads managed by this generator. + SumLatencyMillis int64 +} + +func (dest *MonitorWorkloads) Add(delta MonitorWorkloads) { + dest.NumWorkloads += delta.NumWorkloads + dest.MinAvailable += delta.MinAvailable + dest.TotalReplicas += delta.TotalReplicas + dest.AggregatedAvailableReplicas += delta.AggregatedAvailableReplicas + dest.EstimatedAvailableReplicas += delta.EstimatedAvailableReplicas + dest.SumAvailableProportionPpm += delta.SumAvailableProportionPpm + dest.SumLatencyMillis += delta.SumLatencyMillis +} + +func (dest *MonitorWorkloads) Subtract(delta MonitorWorkloads) { + dest.NumWorkloads -= delta.NumWorkloads + dest.MinAvailable -= delta.MinAvailable + dest.TotalReplicas -= delta.TotalReplicas + dest.AggregatedAvailableReplicas -= delta.AggregatedAvailableReplicas + dest.EstimatedAvailableReplicas -= delta.EstimatedAvailableReplicas + dest.SumAvailableProportionPpm -= delta.SumAvailableProportionPpm + dest.SumLatencyMillis -= delta.SumLatencyMillis +} diff --git a/util/component/component.go b/util/component/component.go index 040262f..231ac8a 100644 --- a/util/component/component.go +++ b/util/component/component.go @@ -62,6 +62,7 @@ import ( "flag" "fmt" "net/http" + "reflect" "sync/atomic" "k8s.io/apimachinery/pkg/util/sets" @@ -161,8 +162,8 @@ func DepPtr[Api any](requests *DepRequests, base Declared[Api]) Dep[Api] { typedApi, ok := api.(func() Api) if !ok { panic(fmt.Sprintf( - "Components of types %T and %T declared the same name %q", - comp, base, comp.manifest().Name, + "Components of types %T and %T declared the same name %q with incompatible APIs %T and %v", + comp, base, comp.manifest().Name, util.Type[Api]().Out(0), reflect.TypeOf(api).Out(0), )) } diff --git a/util/kube/leaderelection.go b/util/kube/leaderelection.go index d21f220..ba8b883 100644 --- a/util/kube/leaderelection.go +++ b/util/kube/leaderelection.go @@ -331,10 +331,10 @@ func (e *Elector) HasElected() bool { } // A mock elector that always gets ready immediately. -func MockReadyElector() *Elector { +func MockReadyElector(ctx context.Context) *Elector { return &Elector{ electedCh: util.ClosedChan[util.Empty](), - electionCloseCh: nil, + electionCloseCh: ptr.To(ctx.Done()), electionErr: ptr.To(error(nil)), } } diff --git a/util/o11y/metrics/api.go b/util/o11y/metrics/api.go index d4971f9..b6414b4 100644 --- a/util/o11y/metrics/api.go +++ b/util/o11y/metrics/api.go @@ -41,7 +41,7 @@ func Register[Value any, Tags any]( } type Handle[Tags any, Value any] struct { - metricType Type[Value] + metricType Emitter[Value] tagsDesc TagsDesc[Tags] } @@ -57,7 +57,7 @@ func (handle Handle[Tags, Value]) With(tags Tags) TaggedHandle[Value] { } type TaggedHandle[Value any] struct { - metricType Type[Value] + metricType Emitter[Value] tagValues []string } @@ -67,6 +67,10 @@ func (handle TaggedHandle[Value]) Emit(value Value) { type Type[Value any] interface { InitCollector(name string, help string, tagKeys []string) prometheus.Collector + Emitter[Value] +} + +type Emitter[Value any] interface { Emit(tagValues []string, value Value) } @@ -95,6 +99,10 @@ func (ty *typeGauge[Value]) Emit(tags []string, value Value) { func IntGauge() Type[int] { return &typeGauge[int]{typeConv: intToFloat64, collector: nil} } +func Int32Gauge() Type[int32] { return &typeGauge[int32]{typeConv: int32ToFloat64, collector: nil} } + +func Int64Gauge() Type[int64] { return &typeGauge[int64]{typeConv: int64ToFloat64, collector: nil} } + func FloatGauge() Type[float64] { return &typeGauge[float64]{typeConv: util.Identity[float64], collector: nil} } @@ -298,3 +306,7 @@ func (def reflectTags[T]) TagValues(instance T) []string { } func intToFloat64(i int) float64 { return float64(i) } + +func int32ToFloat64(i int32) float64 { return float64(i) } + +func int64ToFloat64(i int64) float64 { return float64(i) } diff --git a/util/o11y/metrics/multifield.go b/util/o11y/metrics/multifield.go new file mode 100644 index 0000000..57b9c4b --- /dev/null +++ b/util/o11y/metrics/multifield.go @@ -0,0 +1,86 @@ +// Copyright 2024 The Podseidon Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "fmt" + + "github.com/prometheus/client_golang/prometheus" +) + +func RegisterMultiField[Value any, Tags any]( + registry *prometheus.Registry, + namePrefix string, + helpPrefix string, + tagsDesc TagsDesc[Tags], + fields ...AnyField[Value], +) Handle[Tags, Value] { + tagKeys := tagsDesc.TagKeys() + for _, field := range fields { + registry.MustRegister(field.InitFieldCollector(namePrefix, helpPrefix, tagKeys)) + } + + return Handle[Tags, Value]{ + metricType: multiFieldType[Value](fields), + tagsDesc: tagsDesc, + } +} + +type Field[StructType any, FieldType any] struct { + Name string + Help string + MetricType Type[FieldType] + Extractor func(StructType) FieldType +} + +func NewField[StructType any, FieldType any]( + name string, + help string, + metricType Type[FieldType], + extractor func(StructType) FieldType, +) AnyField[StructType] { + return Field[StructType, FieldType]{ + Name: name, + Help: help, + MetricType: metricType, + Extractor: extractor, + } +} + +// A type-erased interface for Field[StructType, *]. +type AnyField[StructType any] interface { + InitFieldCollector(namePrefix string, helpPrefix string, tagKeys []string) prometheus.Collector + EmitField(tagValues []string, value StructType) +} + +func (field Field[StructType, FieldType]) InitFieldCollector(namePrefix string, helpPrefix string, tagKeys []string) prometheus.Collector { + return field.MetricType.InitCollector( + fmt.Sprintf("%s_%s", namePrefix, field.Name), + fmt.Sprintf("%s: %s", helpPrefix, field.Help), + tagKeys, + ) +} + +func (field Field[StructType, FieldType]) EmitField(tagValues []string, value StructType) { + field.MetricType.Emit(tagValues, field.Extractor(value)) +} + +type multiFieldType[Value any] []AnyField[Value] + +func (fields multiFieldType[Value]) Emit(tagValues []string, value Value) { + for _, field := range fields { + field.EmitField(tagValues, value) + } +} diff --git a/util/util/math.go b/util/util/math.go new file mode 100644 index 0000000..59afe57 --- /dev/null +++ b/util/util/math.go @@ -0,0 +1,37 @@ +// Copyright 2024 The Podseidon Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import "golang.org/x/exp/constraints" + +// Returns the nearest integer to {numerator / divisor} with half-positive rounding. +func RoundedIntDiv[T constraints.Integer](numerator, divisor T) T { + if divisor < 0 { + divisor = -divisor + } + + quotient := numerator / divisor + remainder := numerator % divisor + + if numerator > 0 && remainder*2 >= divisor { + quotient++ + } + + if numerator < 0 && (divisor+remainder)*2 < divisor { + quotient-- + } + + return quotient +} diff --git a/util/util/math_test.go b/util/util/math_test.go new file mode 100644 index 0000000..3cc6ab9 --- /dev/null +++ b/util/util/math_test.go @@ -0,0 +1,62 @@ +// Copyright 2024 The Podseidon Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package util_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/exp/constraints" + + "github.com/kubewharf/podseidon/util/util" +) + +func testRoundedIntDiv[T constraints.Integer](t *testing.T, numerator T, divisor T, expect T) { + t.Helper() + assert.Equal(t, expect, util.RoundedIntDiv(numerator, divisor)) +} + +func TestRoundDownIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, 14, 10, 1) +} + +func TestRoundUpIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, 16, 10, 2) +} + +func TestRoundHalfUpIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, 15, 10, 2) +} + +func TestRoundNegativeHalfUpIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, -5, 10, 0) + testRoundedIntDiv(t, -15, 10, -1) +} + +func TestRoundNegativeUpIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, -4, 10, 0) + testRoundedIntDiv(t, -14, 10, -1) +} + +func TestRoundNegativeDownIntDiv(t *testing.T) { + t.Parallel() + testRoundedIntDiv(t, -6, 10, -1) + testRoundedIntDiv(t, -16, 10, -2) +}