Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace internal aggregate Aggregator with Measure/ComputeAggregation and a Builder #4304

Merged
merged 15 commits into from
Jul 17, 2023
Merged
32 changes: 16 additions & 16 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ type streamID struct {
}

type int64Inst struct {
aggregators []aggregate.Aggregator[int64]
measures []aggregate.Measure[int64]

embedded.Int64Counter
embedded.Int64UpDownCounter
Expand All @@ -219,13 +219,13 @@ func (i *int64Inst) aggregate(ctx context.Context, val int64, s attribute.Set) {
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
for _, in := range i.measures {
in(ctx, val, s)
}
}

type float64Inst struct {
aggregators []aggregate.Aggregator[float64]
measures []aggregate.Measure[float64]

embedded.Float64Counter
embedded.Float64UpDownCounter
Expand All @@ -250,8 +250,8 @@ func (i *float64Inst) aggregate(ctx context.Context, val float64, s attribute.Se
if err := ctx.Err(); err != nil {
return
}
for _, agg := range i.aggregators {
agg.Aggregate(val, s)
for _, in := range i.measures {
in(ctx, val, s)
}
}

Expand All @@ -277,9 +277,9 @@ var _ metric.Float64ObservableCounter = float64Observable{}
var _ metric.Float64ObservableUpDownCounter = float64Observable{}
var _ metric.Float64ObservableGauge = float64Observable{}

func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[float64]) float64Observable {
func newFloat64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[float64]) float64Observable {
return float64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
observable: newObservable(scope, kind, name, desc, u, meas),
}
}

Expand All @@ -296,20 +296,20 @@ var _ metric.Int64ObservableCounter = int64Observable{}
var _ metric.Int64ObservableUpDownCounter = int64Observable{}
var _ metric.Int64ObservableGauge = int64Observable{}

func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[int64]) int64Observable {
func newInt64Observable(scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[int64]) int64Observable {
return int64Observable{
observable: newObservable(scope, kind, name, desc, u, agg),
observable: newObservable(scope, kind, name, desc, u, meas),
}
}

type observable[N int64 | float64] struct {
metric.Observable
observablID[N]

aggregators []aggregate.Aggregator[N]
measures []aggregate.Measure[N]
}

func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, agg []aggregate.Aggregator[N]) *observable[N] {
func newObservable[N int64 | float64](scope instrumentation.Scope, kind InstrumentKind, name, desc, u string, meas []aggregate.Measure[N]) *observable[N] {
return &observable[N]{
observablID: observablID[N]{
name: name,
Expand All @@ -318,14 +318,14 @@ func newObservable[N int64 | float64](scope instrumentation.Scope, kind Instrume
unit: u,
scope: scope,
},
aggregators: agg,
measures: meas,
}
}

// observe records the val for the set of attrs.
func (o *observable[N]) observe(val N, s attribute.Set) {
for _, agg := range o.aggregators {
agg.Aggregate(val, s)
for _, in := range o.measures {
in(context.Background(), val, s)
}
}

Expand All @@ -336,7 +336,7 @@ var errEmptyAgg = errors.New("no aggregators for observable instrument")
// no-op because it does not have any aggregators. Also, an error is returned
// if scope defines a Meter other than the one o was created by.
func (o *observable[N]) registerable(scope instrumentation.Scope) error {
if len(o.aggregators) == 0 {
if len(o.measures) == 0 {
return errEmptyAgg
}
if scope != o.scope {
Expand Down
41 changes: 31 additions & 10 deletions sdk/metric/instrument_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func BenchmarkInstrument(b *testing.B) {
Expand All @@ -32,11 +33,21 @@ func BenchmarkInstrument(b *testing.B) {
}

b.Run("instrumentImpl/aggregate", func(b *testing.B) {
inst := int64Inst{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

build.Temporality = metricdata.DeltaTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

inst := int64Inst{measures: meas}
ctx := context.Background()

b.ReportAllocs()
Expand All @@ -47,11 +58,21 @@ func BenchmarkInstrument(b *testing.B) {
})

b.Run("observable/observe", func(b *testing.B) {
o := observable[int64]{aggregators: []aggregate.Aggregator[int64]{
aggregate.NewLastValue[int64](),
aggregate.NewCumulativeSum[int64](true),
aggregate.NewDeltaSum[int64](true),
}}
build := aggregate.Builder[int64]{}
var meas []aggregate.Measure[int64]

in, _ := build.LastValue()
meas = append(meas, in)

build.Temporality = metricdata.CumulativeTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

build.Temporality = metricdata.DeltaTemporality
in, _ = build.Sum(true)
meas = append(meas, in)

o := observable[int64]{measures: meas}

b.ReportAllocs()
b.ResetTimer()
Expand Down
127 changes: 127 additions & 0 deletions sdk/metric/internal/aggregate/aggregate.go
pellared marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"

import (
"context"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// Measure receives measurements to be aggregated.
type Measure[N int64 | float64] func(context.Context, N, attribute.Set)

// ComputeAggregation stores the aggregate of measurements into dest and
// returns the number of aggregate data-points output.
type ComputeAggregation func(dest *metricdata.Aggregation) int

// Builder builds an aggregate function.
type Builder[N int64 | float64] struct {
// Temporality is the temporality used for the returned aggregate function.
//
// If this is not provided a default of cumulative will be used (except for
// the last-value aggregate function where delta is the only appropriate
// temporality).
Temporality metricdata.Temporality
// Filter is the attribute filter the aggregate function will use on the
// input of measurements.
Filter attribute.Filter
}

func (b Builder[N]) input(agg aggregator[N]) Measure[N] {
if b.Filter != nil {
agg = newFilter[N](agg, b.Filter)
}
return func(_ context.Context, n N, a attribute.Set) {
agg.Aggregate(n, a)
}
}

// LastValue returns a last-value aggregate function input and output.
//
// The Builder.Temporality is ignored and delta is use always.
func (b Builder[N]) LastValue() (Measure[N], ComputeAggregation) {
// Delta temporality is the only temporality that makes semantic sense for
// a last-value aggregate.
lv := newLastValue[N]()

return b.input(lv), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = lv.Aggregation()

gData, _ := (*dest).(metricdata.Gauge[N])
return len(gData.DataPoints)
}
}

// PrecomputedSum returns a sum aggregate function input and output. The
// arguments passed to the input are expected to be the precomputed sum values.
func (b Builder[N]) PrecomputedSum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
s = newPrecomputedDeltaSum[N](monotonic)
default:
s = newPrecomputedCumulativeSum[N](monotonic)
}

return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()

sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
}
}

// Sum returns a sum aggregate function input and output.
func (b Builder[N]) Sum(monotonic bool) (Measure[N], ComputeAggregation) {
var s aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
s = newDeltaSum[N](monotonic)
default:
s = newCumulativeSum[N](monotonic)
}

return b.input(s), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = s.Aggregation()

sData, _ := (*dest).(metricdata.Sum[N])
return len(sData.DataPoints)
}
}

// ExplicitBucketHistogram returns a histogram aggregate function input and
// output.
func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistogram) (Measure[N], ComputeAggregation) {
var h aggregator[N]
switch b.Temporality {
case metricdata.DeltaTemporality:
h = newDeltaHistogram[N](cfg)
default:
h = newCumulativeHistogram[N](cfg)
}
return b.input(h), func(dest *metricdata.Aggregation) int {
// TODO (#4220): optimize memory reuse here.
*dest = h.Aggregation()

hData, _ := (*dest).(metricdata.Histogram[N])
return len(hData.DataPoints)
}
}
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
// override the default time.Now function.
var now = time.Now

// Aggregator forms an aggregation from a collection of recorded measurements.
// aggregator forms an aggregation from a collection of recorded measurements.
//
// Aggregators need to be comparable so they can be de-duplicated by the SDK
// when it creates them for multiple views.
type Aggregator[N int64 | float64] interface {
type aggregator[N int64 | float64] interface {
// Aggregate records the measurement, scoped by attr, and aggregates it
// into an aggregation.
Aggregate(measurement N, attr attribute.Set)
Expand All @@ -45,7 +45,7 @@ type precomputeAggregator[N int64 | float64] interface {
// The Aggregate method of the embedded Aggregator is used to record
// pre-computed measurements, scoped by attributes that have not been
// filtered by an attribute filter.
Aggregator[N]
aggregator[N]

// aggregateFiltered records measurements scoped by attributes that have
// been filtered by an attribute filter.
Expand Down
6 changes: 3 additions & 3 deletions sdk/metric/internal/aggregate/aggregator_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *meter) Int64Counter(string, ...metric.Int64CounterOption) (metric.Int64
// temporality to used based on the Reader and View configuration. Assume
// here these are determined to be a cumulative sum.

aggregator := NewCumulativeSum[int64](true)
aggregator := newCumulativeSum[int64](true)
count := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand All @@ -54,7 +54,7 @@ func (p *meter) Int64UpDownCounter(string, ...metric.Int64UpDownCounterOption) (
// configuration. Assume here these are determined to be a last-value
// aggregation (the temporality does not affect the produced aggregations).

aggregator := NewLastValue[int64]()
aggregator := newLastValue[int64]()
upDownCount := inst{aggregateFunc: aggregator.Aggregate}

p.aggregations = append(p.aggregations, aggregator.Aggregation())
Expand All @@ -71,7 +71,7 @@ func (p *meter) Int64Histogram(string, ...metric.Int64HistogramOption) (metric.I
// Assume here these are determined to be a delta explicit-bucket
// histogram.

aggregator := NewDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
aggregator := newDeltaHistogram[int64](aggregation.ExplicitBucketHistogram{
Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000},
NoMinMax: false,
})
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/internal/aggregate/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ type aggregatorTester[N int64 | float64] struct {
CycleN int
}

func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
func (at *aggregatorTester[N]) Run(a aggregator[N], incr setMap[N], eFunc expectFunc) func(*testing.T) {
m := at.MeasurementN * at.GoroutineN
return func(t *testing.T) {
t.Run("Comparable", func(t *testing.T) {
assert.NotPanics(t, func() {
_ = map[Aggregator[N]]struct{}{a: {}}
_ = map[aggregator[N]]struct{}{a: {}}
})
})

Expand Down Expand Up @@ -117,7 +117,7 @@ func (at *aggregatorTester[N]) Run(a Aggregator[N], incr setMap[N], eFunc expect

var bmarkResults metricdata.Aggregation

func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggregator[N], count int) {
func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() aggregator[N], count int) {
attrs := make([]attribute.Set, count)
for i := range attrs {
attrs[i] = attribute.NewSet(attribute.Int("value", i))
Expand All @@ -137,7 +137,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
})

b.Run("Aggregations", func(b *testing.B) {
aggs := make([]Aggregator[N], b.N)
aggs := make([]aggregator[N], b.N)
for n := range aggs {
a := factory()
for _, attr := range attrs {
Expand All @@ -155,7 +155,7 @@ func benchmarkAggregatorN[N int64 | float64](b *testing.B, factory func() Aggreg
})
}

func benchmarkAggregator[N int64 | float64](factory func() Aggregator[N]) func(*testing.B) {
func benchmarkAggregator[N int64 | float64](factory func() aggregator[N]) func(*testing.B) {
counts := []int{1, 10, 100}
return func(b *testing.B) {
for _, n := range counts {
Expand Down
Loading