From c197fe93057875323a2e4f07af6d5707de52f1fd Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Wed, 19 Jul 2023 11:52:11 -0400 Subject: [PATCH] Metric SDK: Sum duplicate async observations regardless of filtering (#4289) * Metric SDK: Remove the distinction between filtered and unfiltered attributes. --- CHANGELOG.md | 1 + sdk/metric/internal/aggregate/aggregator.go | 19 --- sdk/metric/internal/aggregate/filter.go | 43 ------- sdk/metric/internal/aggregate/filter_test.go | 89 -------------- sdk/metric/internal/aggregate/sum.go | 82 ++----------- sdk/metric/internal/aggregate/sum_test.go | 116 +++++-------------- sdk/metric/meter_test.go | 3 - 7 files changed, 45 insertions(+), 308 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 75d463e4eb4..fd2e673fb42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm The `AttributeKeys` fields allows users to specify an allow-list of attributes allowed to be recorded for a view. This change is made to ensure compatibility with the OpenTelemetry specification. (#4288) - If an attribute set is omitted from an async callback, the previous value will no longer be exported. (#4290) +- If an attribute set is Observed multiple times in an async callback, the values will be summed instead of the last observation winning. (#4289) - Allow the explicit bucket histogram aggregation to be used for the up-down counter, observable counter, observable up-down counter, and observable gauge in the `go.opentelemetry.io/otel/sdk/metric` package. (#4332) - Restrict `Meter`s in `go.opentelemetry.io/otel/sdk/metric` to only register and collect instruments it created. (#4333) diff --git a/sdk/metric/internal/aggregate/aggregator.go b/sdk/metric/internal/aggregate/aggregator.go index 03d814d9b4a..fac0dfd901a 100644 --- a/sdk/metric/internal/aggregate/aggregator.go +++ b/sdk/metric/internal/aggregate/aggregator.go @@ -38,22 +38,3 @@ type aggregator[N int64 | float64] interface { // measurements made and ends an aggregation cycle. Aggregation() metricdata.Aggregation } - -// precomputeAggregator is an Aggregator that receives values to aggregate that -// have been pre-computed by the caller. -type precomputeAggregator[N int64 | float64] interface { - // The Aggregate method of the embedded Aggregator is used to record - // pre-computed measurements, scoped by attributes that have not been - // filtered by an attribute filter. - aggregator[N] - - // aggregateFiltered records measurements scoped by attributes that have - // been filtered by an attribute filter. - // - // Pre-computed measurements of filtered attributes need to be recorded - // separate from those that haven't been filtered so they can be added to - // the non-filtered pre-computed measurements in a collection cycle and - // then resets after the cycle (the non-filtered pre-computed measurements - // are not reset). - aggregateFiltered(N, attribute.Set) -} diff --git a/sdk/metric/internal/aggregate/filter.go b/sdk/metric/internal/aggregate/filter.go index 782c28a85df..ea471149e75 100644 --- a/sdk/metric/internal/aggregate/filter.go +++ b/sdk/metric/internal/aggregate/filter.go @@ -27,9 +27,6 @@ func newFilter[N int64 | float64](agg aggregator[N], fn attribute.Filter) aggreg if fn == nil { return agg } - if fa, ok := agg.(precomputeAggregator[N]); ok { - return newPrecomputedFilter(fa, fn) - } return &filter[N]{ filter: fn, aggregator: agg, @@ -59,43 +56,3 @@ func (f *filter[N]) Aggregate(measurement N, attr attribute.Set) { func (f *filter[N]) Aggregation() metricdata.Aggregation { return f.aggregator.Aggregation() } - -// precomputedFilter is an aggregator that applies attribute filter when -// Aggregating for pre-computed Aggregations. The pre-computed Aggregations -// need to operate normally when no attribute filtering is done (for sums this -// means setting the value), but when attribute filtering is done it needs to -// be added to any set value. -type precomputedFilter[N int64 | float64] struct { - filter attribute.Filter - aggregator precomputeAggregator[N] -} - -// newPrecomputedFilter returns a precomputedFilter Aggregator that wraps agg -// with the attribute filter fn. -// -// This should not be used to wrap a non-pre-computed Aggregator. Use a -// precomputedFilter instead. -func newPrecomputedFilter[N int64 | float64](agg precomputeAggregator[N], fn attribute.Filter) *precomputedFilter[N] { - return &precomputedFilter[N]{ - filter: fn, - aggregator: agg, - } -} - -// Aggregate records the measurement, scoped by attr, and aggregates it -// into an aggregation. -func (f *precomputedFilter[N]) Aggregate(measurement N, attr attribute.Set) { - fAttr, _ := attr.Filter(f.filter) - if fAttr.Equals(&attr) { - // No filtering done. - f.aggregator.Aggregate(measurement, fAttr) - } else { - f.aggregator.aggregateFiltered(measurement, fAttr) - } -} - -// Aggregation returns an Aggregation, for all the aggregated -// measurements made and ends an aggregation cycle. -func (f *precomputedFilter[N]) Aggregation() metricdata.Aggregation { - return f.aggregator.Aggregation() -} diff --git a/sdk/metric/internal/aggregate/filter_test.go b/sdk/metric/internal/aggregate/filter_test.go index e348e7582ea..b6544e3706b 100644 --- a/sdk/metric/internal/aggregate/filter_test.go +++ b/sdk/metric/internal/aggregate/filter_test.go @@ -15,8 +15,6 @@ package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" import ( - "fmt" - "strings" "sync" "testing" @@ -196,90 +194,3 @@ func TestFilterConcurrent(t *testing.T) { testFilterConcurrent[float64](t) }) } - -func TestPrecomputedFilter(t *testing.T) { - t.Run("Int64", testPrecomputedFilter[int64]()) - t.Run("Float64", testPrecomputedFilter[float64]()) -} - -func testPrecomputedFilter[N int64 | float64]() func(t *testing.T) { - return func(t *testing.T) { - agg := newTestFilterAgg[N]() - f := newFilter[N](agg, testAttributeFilter) - require.IsType(t, &precomputedFilter[N]{}, f) - - var ( - powerLevel = attribute.Int("power-level", 9000) - user = attribute.String("user", "Alice") - admin = attribute.Bool("admin", true) - ) - a := attribute.NewSet(powerLevel) - key := a - f.Aggregate(1, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(0), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel, user) - f.Aggregate(2, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(2), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel, user, admin) - f.Aggregate(3, a) - assert.Equal(t, N(1), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(powerLevel) - f.Aggregate(2, a) - assert.Equal(t, N(2), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - - a = attribute.NewSet(user) - f.Aggregate(3, a) - assert.Equal(t, N(2), agg.values[key].measured, str(a)) - assert.Equal(t, N(5), agg.values[key].filtered, str(a)) - assert.Equal(t, N(3), agg.values[*attribute.EmptySet()].filtered, str(a)) - - _ = f.Aggregation() - assert.Equal(t, 1, agg.aggregationN, "failed to propagate Aggregation") - } -} - -func str(a attribute.Set) string { - iter := a.Iter() - out := make([]string, 0, iter.Len()) - for iter.Next() { - kv := iter.Attribute() - out = append(out, fmt.Sprintf("%s:%#v", kv.Key, kv.Value.AsInterface())) - } - return strings.Join(out, ",") -} - -type testFilterAgg[N int64 | float64] struct { - values map[attribute.Set]precomputedValue[N] - aggregationN int -} - -func newTestFilterAgg[N int64 | float64]() *testFilterAgg[N] { - return &testFilterAgg[N]{ - values: make(map[attribute.Set]precomputedValue[N]), - } -} - -func (a *testFilterAgg[N]) Aggregate(val N, attr attribute.Set) { - v := a.values[attr] - v.measured = val - a.values[attr] = v -} - -// nolint: unused // Used to agg filtered. -func (a *testFilterAgg[N]) aggregateFiltered(val N, attr attribute.Set) { - v := a.values[attr] - v.filtered += val - a.values[attr] = v -} - -func (a *testFilterAgg[N]) Aggregation() metricdata.Aggregation { - a.aggregationN++ - return nil -} diff --git a/sdk/metric/internal/aggregate/sum.go b/sdk/metric/internal/aggregate/sum.go index 14af6273cbd..594068c4354 100644 --- a/sdk/metric/internal/aggregate/sum.go +++ b/sdk/metric/internal/aggregate/sum.go @@ -150,63 +150,6 @@ func (s *cumulativeSum[N]) Aggregation() metricdata.Aggregation { return out } -// precomputedValue is the recorded measurement value for a set of attributes. -type precomputedValue[N int64 | float64] struct { - // measured is the last value measured for a set of attributes that were - // not filtered. - measured N - // filtered is the sum of values from measurements that had their - // attributes filtered. - filtered N -} - -// precomputedMap is the storage for precomputed sums. -type precomputedMap[N int64 | float64] struct { - sync.Mutex - values map[attribute.Set]precomputedValue[N] -} - -func newPrecomputedMap[N int64 | float64]() *precomputedMap[N] { - return &precomputedMap[N]{ - values: make(map[attribute.Set]precomputedValue[N]), - } -} - -// Aggregate records value with the unfiltered attributes attr. -// -// If a previous measurement was made for the same attribute set: -// -// - If that measurement's attributes were not filtered, this value overwrite -// that value. -// - If that measurement's attributes were filtered, this value will be -// recorded along side that value. -func (s *precomputedMap[N]) Aggregate(value N, attr attribute.Set) { - s.Lock() - v := s.values[attr] - v.measured = value - s.values[attr] = v - s.Unlock() -} - -// aggregateFiltered records value with the filtered attributes attr. -// -// If a previous measurement was made for the same attribute set: -// -// - If that measurement's attributes were not filtered, this value will be -// recorded along side that value. -// - If that measurement's attributes were filtered, this value will be -// added to it. -// -// This method should not be used if attr have not been reduced by an attribute -// filter. -func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // nolint: unused // Used to agg filtered. - s.Lock() - v := s.values[attr] - v.filtered += value - s.values[attr] = v - s.Unlock() -} - // newPrecomputedDeltaSum returns an Aggregator that summarizes a set of // pre-computed sums. Each sum is scoped by attributes and the aggregation // cycle the measurements were made in. @@ -218,17 +161,17 @@ func (s *precomputedMap[N]) aggregateFiltered(value N, attr attribute.Set) { // // The output Aggregation will report recorded values as delta temporality. func newPrecomputedDeltaSum[N int64 | float64](monotonic bool) aggregator[N] { return &precomputedDeltaSum[N]{ - precomputedMap: newPrecomputedMap[N](), - reported: make(map[attribute.Set]N), - monotonic: monotonic, - start: now(), + valueMap: newValueMap[N](), + reported: make(map[attribute.Set]N), + monotonic: monotonic, + start: now(), } } // precomputedDeltaSum summarizes a set of pre-computed sums recorded over all // aggregation cycles as the delta of these sums. type precomputedDeltaSum[N int64 | float64] struct { - *precomputedMap[N] + *valueMap[N] reported map[attribute.Set]N @@ -263,15 +206,14 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { DataPoints: make([]metricdata.DataPoint[N], 0, len(s.values)), } for attr, value := range s.values { - v := value.measured + value.filtered - delta := v - s.reported[attr] + delta := value - s.reported[attr] out.DataPoints = append(out.DataPoints, metricdata.DataPoint[N]{ Attributes: attr, StartTime: s.start, Time: t, Value: delta, }) - newReported[attr] = v + newReported[attr] = value // Unused attribute sets do not report. delete(s.values, attr) } @@ -294,15 +236,15 @@ func (s *precomputedDeltaSum[N]) Aggregation() metricdata.Aggregation { // temporality. func newPrecomputedCumulativeSum[N int64 | float64](monotonic bool) aggregator[N] { return &precomputedCumulativeSum[N]{ - precomputedMap: newPrecomputedMap[N](), - monotonic: monotonic, - start: now(), + valueMap: newValueMap[N](), + monotonic: monotonic, + start: now(), } } // precomputedCumulativeSum directly records and reports a set of pre-computed sums. type precomputedCumulativeSum[N int64 | float64] struct { - *precomputedMap[N] + *valueMap[N] monotonic bool start time.Time @@ -337,7 +279,7 @@ func (s *precomputedCumulativeSum[N]) Aggregation() metricdata.Aggregation { Attributes: attr, StartTime: s.start, Time: t, - Value: value.measured + value.filtered, + Value: value, }) // Unused attribute sets do not report. delete(s.values, attr) diff --git a/sdk/metric/internal/aggregate/sum_test.go b/sdk/metric/internal/aggregate/sum_test.go index e128459b1d0..0843bcf8429 100644 --- a/sdk/metric/internal/aggregate/sum_test.go +++ b/sdk/metric/internal/aggregate/sum_test.go @@ -37,6 +37,7 @@ func testSum[N int64 | float64](t *testing.T) { MeasurementN: defaultMeasurements, CycleN: defaultCycles, } + totalMeasurements := defaultGoroutines * defaultMeasurements t.Run("Delta", func(t *testing.T) { incr, mono := monoIncr[N](), true @@ -60,21 +61,21 @@ func testSum[N int64 | float64](t *testing.T) { t.Run("PreComputedDelta", func(t *testing.T) { incr, mono := monoIncr[N](), true - eFunc := preDeltaExpecter[N](incr, mono) + eFunc := preDeltaExpecter[N](incr, mono, N(totalMeasurements)) t.Run("Monotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr[N](), false - eFunc = preDeltaExpecter[N](incr, mono) + eFunc = preDeltaExpecter[N](incr, mono, N(totalMeasurements)) t.Run("NonMonotonic", tester.Run(newPrecomputedDeltaSum[N](mono), incr, eFunc)) }) t.Run("PreComputedCumulative", func(t *testing.T) { incr, mono := monoIncr[N](), true - eFunc := preCumuExpecter[N](incr, mono) + eFunc := preCumuExpecter[N](incr, mono, N(totalMeasurements)) t.Run("Monotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) incr, mono = nonMonoIncr[N](), false - eFunc = preCumuExpecter[N](incr, mono) + eFunc = preCumuExpecter[N](incr, mono, N(totalMeasurements)) t.Run("NonMonotonic", tester.Run(newPrecomputedCumulativeSum[N](mono), incr, eFunc)) }) } @@ -103,26 +104,26 @@ func cumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { } } -func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { +func preDeltaExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { sum := metricdata.Sum[N]{Temporality: metricdata.DeltaTemporality, IsMonotonic: mono} last := make(map[attribute.Set]N) return func(int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { l := last[a] - sum.DataPoints = append(sum.DataPoints, point(a, N(v)-l)) + sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*(N(v)-l))) last[a] = N(v) } return sum } } -func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool) expectFunc { +func preCumuExpecter[N int64 | float64](incr setMap[N], mono bool, totalMeasurements N) expectFunc { sum := metricdata.Sum[N]{Temporality: metricdata.CumulativeTemporality, IsMonotonic: mono} return func(int) metricdata.Aggregation { sum.DataPoints = make([]metricdata.DataPoint[N], 0, len(incr)) for a, v := range incr { - sum.DataPoints = append(sum.DataPoints, point(a, N(v))) + sum.DataPoints = append(sum.DataPoints, point(a, totalMeasurements*N(v))) } return sum } @@ -167,118 +168,65 @@ func TestDeltaSumReset(t *testing.T) { func TestPreComputedDeltaSum(t *testing.T) { var mono bool agg := newPrecomputedDeltaSum[int64](mono) - require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + require.Implements(t, (*aggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) - got := agg.Aggregation() want := metricdata.Sum[int64]{ IsMonotonic: mono, Temporality: metricdata.DeltaTemporality, DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, } opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // No observation means no metric data - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + // No observation results in an empty aggregation, and causes previous + // observations to be forgotten. + metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) - got = agg.Aggregation() - // measured(+): 1, previous(-): 1, filtered(+): 1 + agg.Aggregate(1, attrs) + // measured(+): 1, previous(-): 0 want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - // No observation means no metric data - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // Override set value. + // Duplicate observations add agg.Aggregate(2, attrs) agg.Aggregate(5, attrs) - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - // measured(+): 5, previous(-): 0, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - agg.Aggregate(5, attrs) - got = agg.Aggregation() - // measured(+): 5, previous(-): 18, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Order should not affect measure. - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.Aggregate(7, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - // measured(+): 7, previous(-): 5, filtered(+): 13 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 15)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - agg.Aggregate(7, attrs) - got = agg.Aggregation() - // measured(+): 7, previous(-): 20, filtered(+): 0 - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, -13)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + agg.Aggregate(3, attrs) + agg.Aggregate(10, attrs) + // measured(+): 20, previous(-): 1 + want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 19)} + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) } func TestPreComputedCumulativeSum(t *testing.T) { var mono bool agg := newPrecomputedCumulativeSum[int64](mono) - require.Implements(t, (*precomputeAggregator[int64])(nil), agg) + require.Implements(t, (*aggregator[int64])(nil), agg) attrs := attribute.NewSet(attribute.String("key", "val")) agg.Aggregate(1, attrs) - got := agg.Aggregation() want := metricdata.Sum[int64]{ IsMonotonic: mono, Temporality: metricdata.CumulativeTemporality, DataPoints: []metricdata.DataPoint[int64]{point[int64](attrs, 1)}, } opt := metricdatatest.IgnoreTimestamp() - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) // Cumulative values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, nil, agg.Aggregation(), opt) - agg.(precomputeAggregator[int64]).aggregateFiltered(1, attrs) - got = agg.Aggregation() + agg.Aggregate(1, attrs) want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 1)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) - // Override set value. + // Duplicate measurements add agg.Aggregate(5, attrs) - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() + agg.Aggregate(3, attrs) + agg.Aggregate(10, attrs) want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 18)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) - - // Filtered values should not persist. - got = agg.Aggregation() - metricdatatest.AssertAggregationsEqual(t, nil, got, opt) - - // Order should not affect measure. - // Filtered should add. - agg.(precomputeAggregator[int64]).aggregateFiltered(3, attrs) - agg.Aggregate(7, attrs) - agg.(precomputeAggregator[int64]).aggregateFiltered(10, attrs) - got = agg.Aggregation() - want.DataPoints = []metricdata.DataPoint[int64]{point[int64](attrs, 20)} - metricdatatest.AssertAggregationsEqual(t, want, got, opt) + metricdatatest.AssertAggregationsEqual(t, want, agg.Aggregation(), opt) } func TestEmptySumNilAggregation(t *testing.T) { diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 185095e4f8d..8657ccc7135 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -960,9 +960,6 @@ func TestGlobalInstRegisterCallback(t *testing.T) { _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) assert.NoError(t, err) - _, err = preMtr.RegisterCallback(cb, preInt64Ctr, preFloat64Ctr, postInt64Ctr, postFloat64Ctr) - assert.NoError(t, err) - got := metricdata.ResourceMetrics{} err = rdr.Collect(context.Background(), &got) assert.NoError(t, err)