diff --git a/CHANGELOG.md b/CHANGELOG.md
index d777414afce..f2044274f5e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
 - Add `ManualReader` struct in `go.opentelemetry.io/otel/sdk/metric`. (#4244)
 - Add `PeriodicReader` struct in `go.opentelemetry.io/otel/sdk/metric`. (#4244)
+- Add support for exponential histogram aggregations.
+  A histogram can be configured as an exponential histogram using a view with `go.opentelemetry.io/otel/sdk/metric/aggregation.Base2ExponentialHistogram` as the aggregation. (#4245)
 - Add `Exporter` struct in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc`. (#4272)
 - Add `Exporter` struct in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#4272)
 - OTLP Metrics Exporter now supports the `OTEL_EXPORTER_OTLP_METRICS_TEMPORALITY_PREFERENCE` environment variable. (#4287)
diff --git a/sdk/metric/aggregation/aggregation.go b/sdk/metric/aggregation/aggregation.go
index f1c215069f4..850e309ea37 100644
--- a/sdk/metric/aggregation/aggregation.go
+++ b/sdk/metric/aggregation/aggregation.go
@@ -164,3 +164,58 @@ func (h ExplicitBucketHistogram) Copy() Aggregation {
 		NoMinMax: h.NoMinMax,
 	}
 }
+
+// Base2ExponentialHistogram is an aggregation that summarizes a set of
+// measurements as a histogram with bucket widths that grow exponentially.
+type Base2ExponentialHistogram struct {
+	// MaxSize is the maximum number of buckets to use for the histogram.
+	MaxSize int32
+	// MaxScale is the maximum resolution scale to use for the histogram.
+	//
+	// MaxScale has a maximum value of 20. Using a value of 20 means the
+	// maximum number of buckets that can fit within the range of a
+	// signed 32-bit integer index could be used.
+	//
+	// MaxScale has a minimum value of -10. Using a value of -10 means only
+	// two buckets will be used.
+	MaxScale int32
+
+	// NoMinMax indicates whether to not record the min and max of the
+	// distribution. By default, these extrema are recorded.
+	//
+	// Recording these extrema for cumulative data is expected to have little
+	// value: they will represent the entire life of the instrument instead of
+	// just the current collection cycle. It is recommended to set this to true
+	// for that type of data to avoid computing the low-value extrema.
+	NoMinMax bool
+}
+
+var _ Aggregation = Base2ExponentialHistogram{}
+
+// private attempts to ensure no user-defined Aggregation is allowed. The
+// OTel specification does not allow user-defined Aggregation currently.
+func (e Base2ExponentialHistogram) private() {}
+
+// Copy returns a deep copy of the Aggregation.
+func (e Base2ExponentialHistogram) Copy() Aggregation {
+	return e
+}
+
+const (
+	expoMaxScale = 20
+	expoMinScale = -10
+)
+
+// errExpoHist is returned by misconfigured Base2ExponentialHistograms.
+var errExpoHist = fmt.Errorf("%w: exponential histogram", errAgg)
+
+// Err returns an error for any misconfigured Aggregation.
+func (e Base2ExponentialHistogram) Err() error {
+	if e.MaxScale > expoMaxScale {
+		return fmt.Errorf("%w: max scale %d is greater than maximum scale %d", errExpoHist, e.MaxScale, expoMaxScale)
+	}
+	if e.MaxSize <= 0 {
+		return fmt.Errorf("%w: max size %d is less than or equal to zero", errExpoHist, e.MaxSize)
+	}
+	return nil
+}
diff --git a/sdk/metric/aggregation/aggregation_test.go b/sdk/metric/aggregation/aggregation_test.go
index 0f2846d3a76..15bc37f2500 100644
--- a/sdk/metric/aggregation/aggregation_test.go
+++ b/sdk/metric/aggregation/aggregation_test.go
@@ -55,6 +55,34 @@ func TestAggregationErr(t *testing.T) {
 			Boundaries: []float64{0, 1, 2, 1, 3, 4},
 		}.Err(), errAgg)
 	})
+
+	t.Run("ExponentialHistogramOperation", func(t *testing.T) {
+		assert.NoError(t, Base2ExponentialHistogram{
+			MaxSize:  160,
+			MaxScale: 20,
+		}.Err())
+
+		assert.NoError(t, Base2ExponentialHistogram{
+			MaxSize:  1,
+			NoMinMax: true,
+		}.Err())
+
+		assert.NoError(t, Base2ExponentialHistogram{
+			MaxSize:  1024,
+			MaxScale: -3,
+		}.Err())
+	})
+
+	t.Run("InvalidExponentialHistogramOperation", func(t *testing.T) {
+		// MaxSize must be greater than 0.
+		assert.ErrorIs(t, Base2ExponentialHistogram{}.Err(), errAgg)
+
+		// MaxScale must be <= 20.
+		assert.ErrorIs(t, Base2ExponentialHistogram{
+			MaxSize:  1,
+			MaxScale: 30,
+		}.Err(), errAgg)
+	})
 }
 
 func TestExplicitBucketHistogramDeepCopy(t *testing.T) {
diff --git a/sdk/metric/internal/aggregate/aggregate.go b/sdk/metric/internal/aggregate/aggregate.go
index a6cceb2c253..6dd531d1cbb 100644
--- a/sdk/metric/internal/aggregate/aggregate.go
+++ b/sdk/metric/internal/aggregate/aggregate.go
@@ -112,6 +112,18 @@ func (b Builder[N]) ExplicitBucketHistogram(cfg aggregation.ExplicitBucketHistog
 	}
 }
 
+// ExponentialBucketHistogram returns a histogram aggregate function input and
+// output.
+func (b Builder[N]) ExponentialBucketHistogram(cfg aggregation.Base2ExponentialHistogram, noSum bool) (Measure[N], ComputeAggregation) {
+	h := newExponentialHistogram[N](cfg, noSum)
+	switch b.Temporality {
+	case metricdata.DeltaTemporality:
+		return b.filter(h.measure), h.delta
+	default:
+		return b.filter(h.measure), h.cumulative
+	}
+}
+
 // reset ensures s has capacity and sets it length. If the capacity of s too
 // small, a new slice is returned with the specified capacity and length.
 func reset[T any](s []T, length, capacity int) []T {
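Reviewer note: the `Err` method added in `aggregation.go` above is the only configuration validation for the new aggregation, so it is worth seeing how a caller of the public `aggregation` package would exercise it. A minimal, self-contained sketch (values taken from the tests in this diff; not part of the change):

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

func main() {
	// A valid configuration: Err returns nil.
	ok := aggregation.Base2ExponentialHistogram{MaxSize: 160, MaxScale: 20}
	fmt.Println(ok.Err()) // <nil>

	// A scale above the supported maximum of 20 is rejected.
	bad := aggregation.Base2ExponentialHistogram{MaxSize: 1, MaxScale: 30}
	fmt.Println(bad.Err() != nil) // true
}
```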
diff --git a/sdk/metric/internal/aggregate/exponential_histogram.go b/sdk/metric/internal/aggregate/exponential_histogram.go
new file mode 100644
index 00000000000..8434500f0df
--- /dev/null
+++ b/sdk/metric/internal/aggregate/exponential_histogram.go
@@ -0,0 +1,497 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package aggregate // import "go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
+
+import (
+	"context"
+	"errors"
+	"math"
+	"sync"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/sdk/metric/aggregation"
+	"go.opentelemetry.io/otel/sdk/metric/metricdata"
+)
+
+const (
+	expoMaxScale = 20
+	expoMinScale = -10
+
+	smallestNonZeroNormalFloat64 = 0x1p-1022
+
+	// These redefine the math constants with a type, so the compiler won't
+	// coerce them into an int on 32 bit platforms.
+	maxInt64 int64 = math.MaxInt64
+	minInt64 int64 = math.MinInt64
+)
+
+// expoHistogramValues summarizes a set of measurements as expoHistogramDataPoints using
+// dynamically scaled buckets.
+type expoHistogramValues[N int64 | float64] struct {
+	noSum    bool
+	noMinMax bool
+	maxSize  int
+	maxScale int
+
+	values   map[attribute.Set]*expoHistogramDataPoint[N]
+	valuesMu sync.Mutex
+}
+
+func newExpoHistValues[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramValues[N] {
+	return &expoHistogramValues[N]{
+		noSum:    noSum,
+		noMinMax: noMinMax,
+		maxSize:  maxSize,
+		maxScale: maxScale,
+
+		values: make(map[attribute.Set]*expoHistogramDataPoint[N]),
+	}
+}
+
+// measure records the measurement, scoped by attr, and aggregates it into an
+// aggregation.
+func (e *expoHistogramValues[N]) measure(_ context.Context, value N, attr attribute.Set) {
+	e.valuesMu.Lock()
+	defer e.valuesMu.Unlock()
+
+	v, ok := e.values[attr]
+	if !ok {
+		v = newExpoHistogramDataPoint[N](e.maxSize, e.maxScale, e.noMinMax, e.noSum)
+		e.values[attr] = v
+	}
+	v.record(value)
+}
+
+// expoHistogramDataPoint is a single data point in an exponential histogram.
+type expoHistogramDataPoint[N int64 | float64] struct {
+	count uint64
+	min   N
+	max   N
+	sum   N
+
+	maxSize  int
+	noMinMax bool
+	noSum    bool
+
+	scale int
+
+	posBuckets expoBuckets
+	negBuckets expoBuckets
+	zeroCount  uint64
+}
+
+func newExpoHistogramDataPoint[N int64 | float64](maxSize, maxScale int, noMinMax, noSum bool) *expoHistogramDataPoint[N] {
+	f := math.MaxFloat64
+	max := N(f) // if N is int64, max will overflow to -9223372036854775808
+	min := N(-f)
+	if N(maxInt64) > N(f) {
+		max = N(maxInt64)
+		min = N(minInt64)
+	}
+	return &expoHistogramDataPoint[N]{
+		min:      max,
+		max:      min,
+		maxSize:  maxSize,
+		noMinMax: noMinMax,
+		noSum:    noSum,
+		scale:    maxScale,
+	}
+}
+
+// record adds a new measurement to the histogram. It will rescale the buckets if needed.
+func (p *expoHistogramDataPoint[N]) record(v N) {
+	p.count++
+
+	if !p.noMinMax {
+		if v < p.min {
+			p.min = v
+		}
+		if v > p.max {
+			p.max = v
+		}
+	}
+	if !p.noSum {
+		p.sum += v
+	}
+
+	absV := math.Abs(float64(v))
+
+	if absV == 0.0 {
+		p.zeroCount++
+		return
+	}
+
+	bin := getBin(absV, p.scale)
+
+	bucket := &p.posBuckets
+	if v < 0 {
+		bucket = &p.negBuckets
+	}
+
+	// If the new bin would make the counts larger than maxSize, we need to
+	// downscale current measurements.
+	if scaleDelta := scaleChange(bin, bucket.startBin, len(bucket.counts), p.maxSize); scaleDelta > 0 {
+		if p.scale-scaleDelta < expoMinScale {
+			// With a scale of -10 there are only two buckets for the whole
+			// range of float64 values. This can only happen if there is a max
+			// size of 1.
+			otel.Handle(errors.New("exponential histogram scale underflow"))
+			return
+		}
+		// Downscale
+		p.scale -= scaleDelta
+		p.posBuckets.downscale(scaleDelta)
+		p.negBuckets.downscale(scaleDelta)
+
+		bin = getBin(absV, p.scale)
+	}
+
+	bucket.record(bin)
+}
+
+// getBin returns the bin of the bucket that the value v should be recorded
+// into at the given scale.
+func getBin(v float64, scale int) int {
+	frac, exp := math.Frexp(v)
+	if scale <= 0 {
+		// Because of the choice of fraction, exp is always one power of two
+		// higher than we want.
+		correction := 1
+		if frac == .5 {
+			// If v is an exact power of two the frac will be .5 and the exp
+			// will be one higher than we want.
+			correction = 2
+		}
+		return (exp - correction) >> (-scale)
+	}
+	return exp<<scale + int(math.Log(frac)*scaleFactors[scale]) - 1
+}
+
+// scaleFactors are constants used in calculating the logarithm index. They
+// are equivalent to 2^index/log(2).
+var scaleFactors = [21]float64{
+	math.Ldexp(math.Log2E, 0),
+	math.Ldexp(math.Log2E, 1),
+	math.Ldexp(math.Log2E, 2),
+	math.Ldexp(math.Log2E, 3),
+	math.Ldexp(math.Log2E, 4),
+	math.Ldexp(math.Log2E, 5),
+	math.Ldexp(math.Log2E, 6),
+	math.Ldexp(math.Log2E, 7),
+	math.Ldexp(math.Log2E, 8),
+	math.Ldexp(math.Log2E, 9),
+	math.Ldexp(math.Log2E, 10),
+	math.Ldexp(math.Log2E, 11),
+	math.Ldexp(math.Log2E, 12),
+	math.Ldexp(math.Log2E, 13),
+	math.Ldexp(math.Log2E, 14),
+	math.Ldexp(math.Log2E, 15),
+	math.Ldexp(math.Log2E, 16),
+	math.Ldexp(math.Log2E, 17),
+	math.Ldexp(math.Log2E, 18),
+	math.Ldexp(math.Log2E, 19),
+	math.Ldexp(math.Log2E, 20),
+}
+
+// scaleChange returns the magnitude of the scale change needed to fit bin in
+// the bucket. If no scale change is needed 0 is returned.
+func scaleChange(bin, startBin, length, maxSize int) int {
+	if length == 0 {
+		// No need to rescale if there are no buckets.
+		return 0
+	}
+
+	low := startBin
+	high := bin
+	if startBin >= bin {
+		low = bin
+		high = startBin + length - 1
+	}
+
+	count := 0
+	for high-low >= maxSize {
+		low = low >> 1
+		high = high >> 1
+		count++
+		if count > expoMaxScale-expoMinScale {
+			return count
+		}
+	}
+	return count
+}
+
+// expoBuckets is a set of buckets in an exponential histogram.
+type expoBuckets struct {
+	startBin int
+	counts   []uint64
+}
+
+// record increments the count for the given bin, and expands the buckets if needed.
+// Size changes must be done before calling this function.
+func (b *expoBuckets) record(bin int) {
+	if len(b.counts) == 0 {
+		b.counts = []uint64{1}
+		b.startBin = bin
+		return
+	}
+
+	endBin := b.startBin + len(b.counts) - 1
+
+	// if the new bin is inside the current range
+	if bin >= b.startBin && bin <= endBin {
+		b.counts[bin-b.startBin]++
+		return
+	}
+	// if the new bin is before the current start add spaces to the counts
+	if bin < b.startBin {
+		origLen := len(b.counts)
+		newLength := endBin - bin + 1
+		shift := b.startBin - bin
+
+		if newLength > cap(b.counts) {
+			b.counts = append(b.counts, make([]uint64, newLength-len(b.counts))...)
+		}
+
+		copy(b.counts[shift:origLen+shift], b.counts[:])
+		b.counts = b.counts[:newLength]
+		for i := 1; i < shift; i++ {
+			b.counts[i] = 0
+		}
+		b.startBin = bin
+		b.counts[0] = 1
+		return
+	}
+	// if the new bin is after the current end add spaces to the end
+	if bin > endBin {
+		if bin-b.startBin < cap(b.counts) {
+			b.counts = b.counts[:bin-b.startBin+1]
+			for i := endBin + 1 - b.startBin; i < len(b.counts); i++ {
+				b.counts[i] = 0
+			}
+			b.counts[bin-b.startBin] = 1
+			return
+		}
+
+		end := make([]uint64, bin-b.startBin-len(b.counts)+1)
+		b.counts = append(b.counts, end...)
+		b.counts[bin-b.startBin] = 1
+	}
+}
+
+// downscale shrinks a bucket by a factor of 2^delta. It will sum counts into
+// the correct lower resolution bucket.
+func (b *expoBuckets) downscale(delta int) {
+	// Example
+	// delta = 2
+	// Original offset: -6
+	// Counts: [ 3,  1,  2,  3,  4,  5, 6, 7, 8, 9, 10]
+	// bins:    -6  -5, -4, -3, -2, -1, 0, 1, 2, 3,  4
+	// new bins:-2, -2, -1, -1, -1, -1, 0, 0, 0, 0,  1
+	// new Offset: -2
+	// new Counts: [4, 14, 30, 10]
+
+	if len(b.counts) <= 1 || delta < 1 {
+		b.startBin = b.startBin >> delta
+		return
+	}
+
+	steps := 1 << delta
+	offset := b.startBin % steps
+	offset = (offset + steps) % steps // to make offset positive
+	for i := 1; i < len(b.counts); i++ {
+		idx := i + offset
+		if idx%steps == 0 {
+			b.counts[idx/steps] = b.counts[i]
+			continue
+		}
+		b.counts[idx/steps] += b.counts[i]
+	}
+
+	lastIdx := (len(b.counts) - 1 + offset) / steps
+	b.counts = b.counts[:lastIdx+1]
+	b.startBin = b.startBin >> delta
+}
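Reviewer note: the bin arithmetic above is easiest to sanity-check in isolation. Below is a minimal, self-contained sketch of the `scale <= 0` branch of the unexported `getBin` helper — a standalone copy for illustration, not an import of the SDK internals:

```go
package main

import (
	"fmt"
	"math"
)

// bin mirrors the scale <= 0 branch of getBin above: map a positive value to
// its bucket index at a given non-positive scale.
func bin(v float64, scale int) int {
	frac, exp := math.Frexp(v)
	correction := 1
	if frac == 0.5 {
		// Exact powers of two report an exponent one higher than wanted.
		correction = 2
	}
	return (exp - correction) >> (-scale)
}

func main() {
	// At scale 0, bucket n holds (2^n, 2^(n+1)]: 4 is the top of bucket 1,
	// while 5 falls into bucket 2.
	fmt.Println(bin(4, 0), bin(5, 0)) // 1 2

	// At scale -1, bucket n holds (4^n, 4^(n+1)]: 4 moves down into
	// bucket 0, and 5 lands in bucket 1.
	fmt.Println(bin(4, -1), bin(5, -1)) // 0 1
}
```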
+
+// newExponentialHistogram returns an Aggregator that summarizes a set of
+// measurements as an exponential histogram. Each histogram is scoped by
+// attributes and the aggregation cycle the measurements were made in.
+func newExponentialHistogram[N int64 | float64](cfg aggregation.Base2ExponentialHistogram, noSum bool) *expoHistogram[N] {
+	return &expoHistogram[N]{
+		expoHistogramValues: newExpoHistValues[N](
+			int(cfg.MaxSize),
+			int(cfg.MaxScale),
+			cfg.NoMinMax,
+			noSum,
+		),
+		start: now(),
+	}
+}
+
+// expoHistogram summarizes a set of measurements as a histogram with
+// exponentially defined buckets.
+type expoHistogram[N int64 | float64] struct {
+	*expoHistogramValues[N]
+
+	start time.Time
+}
+
+func (e *expoHistogram[N]) delta(dest *metricdata.Aggregation) int {
+	t := now()
+
+	// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+	// In that case, use the zero-value h and hope for better alignment next cycle.
+	h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+	h.Temporality = metricdata.DeltaTemporality
+
+	e.valuesMu.Lock()
+	defer e.valuesMu.Unlock()
+
+	n := len(e.values)
+	hDPts := reset(h.DataPoints, n, n)
+
+	var i int
+	for a, b := range e.values {
+		hDPts[i].Attributes = a
+		hDPts[i].StartTime = e.start
+		hDPts[i].Time = t
+		hDPts[i].Count = b.count
+		hDPts[i].Scale = int32(b.scale)
+		hDPts[i].ZeroCount = b.zeroCount
+		hDPts[i].ZeroThreshold = 0.0
+
+		hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+		hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+		copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+		hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+		hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+		copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts)
+
+		if !e.noSum {
+			hDPts[i].Sum = b.sum
+		}
+		if !e.noMinMax {
+			hDPts[i].Min = metricdata.NewExtrema(b.min)
+			hDPts[i].Max = metricdata.NewExtrema(b.max)
+		}
+
+		delete(e.values, a)
+		i++
+	}
+	e.start = t
+	h.DataPoints = hDPts
+	*dest = h
+	return n
+}
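Reviewer note: `delta` and `cumulative` follow the SDK's `ComputeAggregation` contract — the caller passes the same `*metricdata.Aggregation` across collection cycles so the data-point slices can be reused. A test-style sketch of that calling pattern (hypothetical test in package `aggregate`, mirroring how the tests in this diff use the `Builder` wiring from `aggregate.go` above):

```go
package aggregate

import (
	"context"
	"testing"

	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestDeltaReuseSketch(t *testing.T) {
	in, out := Builder[int64]{
		Temporality: metricdata.DeltaTemporality,
	}.ExponentialBucketHistogram(aggregation.Base2ExponentialHistogram{MaxSize: 4, MaxScale: 20}, false)

	var dest metricdata.Aggregation // reused across collection cycles
	for cycle := 0; cycle < 2; cycle++ {
		in(context.Background(), 4, *attribute.EmptySet())
		if n := out(&dest); n != 1 { // one attribute set -> one data point
			t.Fatalf("cycle %d: want 1 data point, got %d", cycle, n)
		}
		// After the first cycle dest holds a metricdata.ExponentialHistogram[int64],
		// so subsequent collections recycle its DataPoints slice.
	}
}
```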
+
+func (e *expoHistogram[N]) cumulative(dest *metricdata.Aggregation) int {
+	t := now()
+
+	// If *dest is not a metricdata.ExponentialHistogram, memory reuse is missed.
+	// In that case, use the zero-value h and hope for better alignment next cycle.
+	h, _ := (*dest).(metricdata.ExponentialHistogram[N])
+	h.Temporality = metricdata.CumulativeTemporality
+
+	e.valuesMu.Lock()
+	defer e.valuesMu.Unlock()
+
+	n := len(e.values)
+	hDPts := reset(h.DataPoints, n, n)
+
+	var i int
+	for a, b := range e.values {
+		hDPts[i].Attributes = a
+		hDPts[i].StartTime = e.start
+		hDPts[i].Time = t
+		hDPts[i].Count = b.count
+		hDPts[i].Scale = int32(b.scale)
+		hDPts[i].ZeroCount = b.zeroCount
+		hDPts[i].ZeroThreshold = 0.0
+
+		hDPts[i].PositiveBucket.Offset = int32(b.posBuckets.startBin)
+		hDPts[i].PositiveBucket.Counts = reset(hDPts[i].PositiveBucket.Counts, len(b.posBuckets.counts), len(b.posBuckets.counts))
+		copy(hDPts[i].PositiveBucket.Counts, b.posBuckets.counts)
+
+		hDPts[i].NegativeBucket.Offset = int32(b.negBuckets.startBin)
+		hDPts[i].NegativeBucket.Counts = reset(hDPts[i].NegativeBucket.Counts, len(b.negBuckets.counts), len(b.negBuckets.counts))
+		copy(hDPts[i].NegativeBucket.Counts, b.negBuckets.counts)
+
+		if !e.noSum {
+			hDPts[i].Sum = b.sum
+		}
+		if !e.noMinMax {
+			hDPts[i].Min = metricdata.NewExtrema(b.min)
+			hDPts[i].Max = metricdata.NewExtrema(b.max)
+		}
+
+		i++
+		// TODO (#3006): This will use an unbounded amount of memory if there
+		// are unbounded number of attribute sets being aggregated. Attribute
+		// sets that become "stale" need to be forgotten so this will not
+		// overload the system.
+	}
+
+	h.DataPoints = hDPts
+	*dest = h
+	return n
+}
diff --git a/sdk/metric/internal/aggregate/exponential_histogram_test.go b/sdk/metric/internal/aggregate/exponential_histogram_test.go
new file mode 100644
index 00000000000..de949359ffd
--- /dev/null
+++ b/sdk/metric/internal/aggregate/exponential_histogram_test.go
@@ -0,0 +1,905 @@
+// Copyright The OpenTelemetry Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package aggregate + +import ( + "context" + "fmt" + "math" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/sdk/metric/aggregation" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +type noErrorHandler struct{ t *testing.T } + +func (h *noErrorHandler) Handle(e error) { + require.NoError(h.t, e) +} + +func withHandler(t *testing.T) func() { + t.Helper() + h := &noErrorHandler{t: t} + original := global.GetErrorHandler() + global.SetErrorHandler(h) + return func() { global.SetErrorHandler(original) } +} + +func TestExpoHistogramDataPointRecord(t *testing.T) { + t.Run("float64", testExpoHistogramDataPointRecord[float64]) + t.Run("float64 MinMaxSum", testExpoHistogramDataPointRecordMinMaxSum[float64]) + t.Run("float64-2", testExpoHistogramDataPointRecordFloat64) + t.Run("int64", testExpoHistogramDataPointRecord[int64]) + t.Run("int64 MinMaxSum", testExpoHistogramDataPointRecordMinMaxSum[int64]) +} + +// TODO: This can be defined in the test after we drop support for go1.19. +type expoHistogramDataPointRecordTestCase[N int64 | float64] struct { + maxSize int + values []N + expectedBuckets expoBuckets + expectedScale int +} + +func testExpoHistogramDataPointRecord[N int64 | float64](t *testing.T) { + testCases := []expoHistogramDataPointRecordTestCase[N]{ + { + maxSize: 4, + values: []N{2, 4, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 1, 1}, + }, + expectedScale: 0, + }, + { + maxSize: 4, + values: []N{4, 4, 4, 2, 16, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{1, 4, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{1, 2, 4}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{1, 4, 2}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{2, 4, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{2, 1, 4}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{4, 1, 2}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []N{4, 2, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + + counts: []uint64{1, 2}, + }, + expectedScale: -1, + }, + } + for _, tt := range testCases { + t.Run(fmt.Sprint(tt.values), func(t *testing.T) { + restore := withHandler(t) + defer restore() + + dp := newExpoHistogramDataPoint[N](tt.maxSize, 20, false, false) + for _, v := range tt.values { + dp.record(v) + dp.record(-v) + } + + assert.Equal(t, tt.expectedBuckets, dp.posBuckets, "positive buckets") + assert.Equal(t, tt.expectedBuckets, dp.negBuckets, "negative buckets") + assert.Equal(t, tt.expectedScale, dp.scale, "scale") + }) + } +} + +// TODO: This can be defined in the test after we drop support for go1.19. 
+type expectedMinMaxSum[N int64 | float64] struct { + min N + max N + sum N + count uint +} +type expoHistogramDataPointRecordMinMaxSumTestCase[N int64 | float64] struct { + values []N + expected expectedMinMaxSum[N] +} + +func testExpoHistogramDataPointRecordMinMaxSum[N int64 | float64](t *testing.T) { + testCases := []expoHistogramDataPointRecordMinMaxSumTestCase[N]{ + { + values: []N{2, 4, 1}, + expected: expectedMinMaxSum[N]{1, 4, 7, 3}, + }, + { + values: []N{4, 4, 4, 2, 16, 1}, + expected: expectedMinMaxSum[N]{1, 16, 31, 6}, + }, + } + + for _, tt := range testCases { + t.Run(fmt.Sprint(tt.values), func(t *testing.T) { + restore := withHandler(t) + defer restore() + + dp := newExpoHistogramDataPoint[N](4, 20, false, false) + for _, v := range tt.values { + dp.record(v) + } + + assert.Equal(t, tt.expected.max, dp.max) + assert.Equal(t, tt.expected.min, dp.min) + assert.Equal(t, tt.expected.sum, dp.sum) + }) + } +} + +func testExpoHistogramDataPointRecordFloat64(t *testing.T) { + type TestCase struct { + maxSize int + values []float64 + expectedBuckets expoBuckets + expectedScale int + } + + testCases := []TestCase{ + { + maxSize: 4, + values: []float64{2, 2, 2, 1, 8, 0.5}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 3, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{1, 0.5, 2}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{1, 2, 0.5}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{2, 0.5, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{2, 1, 0.5}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{0.5, 1, 2}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + { + maxSize: 2, + values: []float64{0.5, 2, 1}, + expectedBuckets: expoBuckets{ + startBin: -1, + counts: []uint64{2, 1}, + }, + expectedScale: -1, + }, + } + for _, tt := range testCases { + t.Run(fmt.Sprint(tt.values), func(t *testing.T) { + restore := withHandler(t) + defer restore() + + dp := newExpoHistogramDataPoint[float64](tt.maxSize, 20, false, false) + for _, v := range tt.values { + dp.record(v) + dp.record(-v) + } + + assert.Equal(t, tt.expectedBuckets, dp.posBuckets) + assert.Equal(t, tt.expectedBuckets, dp.negBuckets) + assert.Equal(t, tt.expectedScale, dp.scale) + }) + } +} + +func TestExponentialHistogramDataPointRecordLimits(t *testing.T) { + // These bins are calculated from the following formula: + // floor( log2( value) * 2^20 ) using an arbitrary precision calculator. 
+ + fdp := newExpoHistogramDataPoint[float64](4, 20, false, false) + fdp.record(math.MaxFloat64) + + if fdp.posBuckets.startBin != 1073741823 { + t.Errorf("Expected startBin to be 1073741823, got %d", fdp.posBuckets.startBin) + } + + fdp = newExpoHistogramDataPoint[float64](4, 20, false, false) + fdp.record(math.SmallestNonzeroFloat64) + + if fdp.posBuckets.startBin != -1126170625 { + t.Errorf("Expected startBin to be -1126170625, got %d", fdp.posBuckets.startBin) + } + + idp := newExpoHistogramDataPoint[int64](4, 20, false, false) + idp.record(math.MaxInt64) + + if idp.posBuckets.startBin != 66060287 { + t.Errorf("Expected startBin to be 66060287, got %d", idp.posBuckets.startBin) + } +} + +func TestExpoBucketDownscale(t *testing.T) { + tests := []struct { + name string + bucket *expoBuckets + scale int + want *expoBuckets + }{ + { + name: "Empty bucket", + bucket: &expoBuckets{}, + scale: 3, + want: &expoBuckets{}, + }, + { + name: "1 size bucket", + bucket: &expoBuckets{ + startBin: 50, + counts: []uint64{7}, + }, + scale: 4, + want: &expoBuckets{ + startBin: 3, + counts: []uint64{7}, + }, + }, + { + name: "zero scale", + bucket: &expoBuckets{ + startBin: 50, + counts: []uint64{7, 5}, + }, + scale: 0, + want: &expoBuckets{ + startBin: 50, + counts: []uint64{7, 5}, + }, + }, + { + name: "aligned bucket scale 1", + bucket: &expoBuckets{ + startBin: 0, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, + scale: 1, + want: &expoBuckets{ + startBin: 0, + counts: []uint64{3, 7, 11}, + }, + }, + { + name: "aligned bucket scale 2", + bucket: &expoBuckets{ + startBin: 0, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, + scale: 2, + want: &expoBuckets{ + startBin: 0, + counts: []uint64{10, 11}, + }, + }, + { + name: "aligned bucket scale 3", + bucket: &expoBuckets{ + startBin: 0, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, + scale: 3, + want: &expoBuckets{ + startBin: 0, + counts: []uint64{21}, + }, + }, + { + name: "unaligned bucket scale 1", + bucket: &expoBuckets{ + startBin: 5, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, // This is equivalent to [0,0,0,0,0,1,2,3,4,5,6] + scale: 1, + want: &expoBuckets{ + startBin: 2, + counts: []uint64{1, 5, 9, 6}, + }, // This is equivalent to [0,0,1,5,9,6] + }, + { + name: "unaligned bucket scale 2", + bucket: &expoBuckets{ + startBin: 7, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, // This is equivalent to [0,0,0,0,0,0,0,1,2,3,4,5,6] + scale: 2, + want: &expoBuckets{ + startBin: 1, + counts: []uint64{1, 14, 6}, + }, // This is equivalent to [0,1,14,6] + }, + { + name: "unaligned bucket scale 3", + bucket: &expoBuckets{ + startBin: 3, + counts: []uint64{1, 2, 3, 4, 5, 6}, + }, // This is equivalent to [0,0,0,1,2,3,4,5,6] + scale: 3, + want: &expoBuckets{ + startBin: 0, + counts: []uint64{15, 6}, + }, // This is equivalent to [0,15,6] + }, + { + name: "unaligned bucket scale 1", + bucket: &expoBuckets{ + startBin: 1, + counts: []uint64{1, 0, 1}, + }, + scale: 1, + want: &expoBuckets{ + startBin: 0, + counts: []uint64{1, 1}, + }, + }, + { + name: "negative startBin", + bucket: &expoBuckets{ + startBin: -1, + counts: []uint64{1, 0, 3}, + }, + scale: 1, + want: &expoBuckets{ + startBin: -1, + counts: []uint64{1, 3}, + }, + }, + { + name: "negative startBin 2", + bucket: &expoBuckets{ + startBin: -4, + counts: []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, + }, + scale: 1, + want: &expoBuckets{ + startBin: -2, + counts: []uint64{3, 7, 11, 15, 19}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.bucket.downscale(tt.scale) + + assert.Equal(t, 
tt.want, tt.bucket)
+		})
+	}
+}
+
+func TestExpoBucketRecord(t *testing.T) {
+	tests := []struct {
+		name   string
+		bucket *expoBuckets
+		bin    int
+		want   *expoBuckets
+	}{
+		{
+			name:   "Empty Bucket creates first count",
+			bucket: &expoBuckets{},
+			bin:    -5,
+			want: &expoBuckets{
+				startBin: -5,
+				counts:   []uint64{1},
+			},
+		},
+		{
+			name: "Bin is in the bucket",
+			bucket: &expoBuckets{
+				startBin: 3,
+				counts:   []uint64{1, 2, 3, 4, 5, 6},
+			},
+			bin: 5,
+			want: &expoBuckets{
+				startBin: 3,
+				counts:   []uint64{1, 2, 4, 4, 5, 6},
+			},
+		},
+		{
+			name: "Bin is before the start of the bucket",
+			bucket: &expoBuckets{
+				startBin: 1,
+				counts:   []uint64{1, 2, 3, 4, 5, 6},
+			},
+			bin: -2,
+			want: &expoBuckets{
+				startBin: -2,
+				counts:   []uint64{1, 0, 0, 1, 2, 3, 4, 5, 6},
+			},
+		},
+		{
+			name: "Bin is after the end of the bucket",
+			bucket: &expoBuckets{
+				startBin: -2,
+				counts:   []uint64{1, 2, 3, 4, 5, 6},
+			},
+			bin: 4,
+			want: &expoBuckets{
+				startBin: -2,
+				counts:   []uint64{1, 2, 3, 4, 5, 6, 1},
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tt.bucket.record(tt.bin)
+
+			assert.Equal(t, tt.want, tt.bucket)
+		})
+	}
+}
+
+func TestScaleChange(t *testing.T) {
+	type args struct {
+		bin      int
+		startBin int
+		length   int
+		maxSize  int
+	}
+	tests := []struct {
+		name string
+		args args
+		want int
+	}{
+		{
+			name: "if length is 0, no rescale is needed",
+			// [] -> [5] Length 1
+			args: args{
+				bin:      5,
+				startBin: 0,
+				length:   0,
+				maxSize:  4,
+			},
+			want: 0,
+		},
+		{
+			name: "if bin is between start, and the end, no rescale needed",
+			// [-1, ..., 8] Length 10 -> [-1, ..., 5, ..., 8] Length 10
+			args: args{
+				bin:      5,
+				startBin: -1,
+				length:   10,
+				maxSize:  20,
+			},
+			want: 0,
+		},
+		{
+			name: "if len([bin,... end]) > maxSize, rescale needed",
+			// [8,9,10] Length 3 -> [5, ..., 10] Length 6
+			args: args{
+				bin:      5,
+				startBin: 8,
+				length:   3,
+				maxSize:  5,
+			},
+			want: 1,
+		},
+		{
+			name: "if len([start, ..., bin]) > maxSize, rescale needed",
+			// [2,3,4] Length 3 -> [2, ..., 7] Length 6
+			args: args{
+				bin:      7,
+				startBin: 2,
+				length:   3,
+				maxSize:  5,
+			},
+			want: 1,
+		},
+		{
+			name: "if len([start, ..., bin]) is much larger than maxSize, multiple rescales are needed",
+			// [2,3,4] Length 3 -> [2, ..., 13] Length 12
+			args: args{
+				bin:      13,
+				startBin: 2,
+				length:   3,
+				maxSize:  5,
+			},
+			want: 2,
+		},
+		{
+			name: "It should not hang if it will never be able to rescale",
+			args: args{
+				bin:      1,
+				startBin: -1,
+				length:   1,
+				maxSize:  1,
+			},
+			want: 31,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := scaleChange(tt.args.bin, tt.args.startBin, tt.args.length, tt.args.maxSize)
+			if got != tt.want {
+				t.Errorf("scaleChange() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func BenchmarkPrepend(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		agg := newExpoHistogramDataPoint[float64](1024, 20, false, false)
+		n := math.MaxFloat64
+		for j := 0; j < 1024; j++ {
+			agg.record(n)
+			n = n / 2
+		}
+	}
+}
+
+func BenchmarkAppend(b *testing.B) {
+	for i := 0; i < b.N; i++ {
+		agg := newExpoHistogramDataPoint[float64](1024, 20, false, false)
+		n := smallestNonZeroNormalFloat64
+		for j := 0; j < 1024; j++ {
+			agg.record(n)
+			n = n * 2
+		}
+	}
+}
+
+var expoHistConf = aggregation.Base2ExponentialHistogram{
+	MaxSize:  160,
+	MaxScale: 20,
+}
+
+func BenchmarkExponentialHistogram(b *testing.B) {
+	b.Run("Int64/Cumulative", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) {
+		return Builder[int64]{
+			Temporality: metricdata.CumulativeTemporality,
}.ExponentialBucketHistogram(expoHistConf, false) + })) + b.Run("Int64/Delta", benchmarkAggregate(func() (Measure[int64], ComputeAggregation) { + return Builder[int64]{ + Temporality: metricdata.DeltaTemporality, + }.ExponentialBucketHistogram(expoHistConf, false) + })) + b.Run("Float64/Cumulative", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.CumulativeTemporality, + }.ExponentialBucketHistogram(expoHistConf, false) + })) + b.Run("Float64/Delta", benchmarkAggregate(func() (Measure[float64], ComputeAggregation) { + return Builder[float64]{ + Temporality: metricdata.DeltaTemporality, + }.ExponentialBucketHistogram(expoHistConf, false) + })) +} + +func TestSubNormal(t *testing.T) { + want := &expoHistogramDataPoint[float64]{ + maxSize: 4, + count: 3, + min: math.SmallestNonzeroFloat64, + max: math.SmallestNonzeroFloat64, + sum: 3 * math.SmallestNonzeroFloat64, + + scale: 20, + posBuckets: expoBuckets{ + startBin: -1126170625, + counts: []uint64{3}, + }, + } + + ehdp := newExpoHistogramDataPoint[float64](4, 20, false, false) + ehdp.record(math.SmallestNonzeroFloat64) + ehdp.record(math.SmallestNonzeroFloat64) + ehdp.record(math.SmallestNonzeroFloat64) + + assert.Equal(t, want, ehdp) +} + +func TestExponentialHistogramAggregation(t *testing.T) { + t.Run("Int64", testExponentialHistogramAggregation[int64]) + t.Run("Float64", testExponentialHistogramAggregation[float64]) +} + +// TODO: This can be defined in the test after we drop support for go1.19. +type exponentialHistogramAggregationTestCase[N int64 | float64] struct { + name string + build func() (Measure[N], ComputeAggregation) + input [][]N + want metricdata.ExponentialHistogram[N] + wantCount int +} + +func testExponentialHistogramAggregation[N int64 | float64](t *testing.T) { + cfg := aggregation.Base2ExponentialHistogram{ + MaxSize: 4, + MaxScale: 20, + } + + tests := []exponentialHistogramAggregationTestCase[N]{ + { + name: "Delta Single", + build: func() (Measure[N], ComputeAggregation) { + return Builder[N]{ + Temporality: metricdata.DeltaTemporality, + }.ExponentialBucketHistogram(cfg, false) + }, + input: [][]N{ + {4, 4, 4, 2, 16, 1}, + }, + want: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.DeltaTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + }, + }, + wantCount: 1, + }, + { + name: "Cumulative Single", + build: func() (Measure[N], ComputeAggregation) { + return Builder[N]{ + Temporality: metricdata.CumulativeTemporality, + }.ExponentialBucketHistogram(cfg, false) + }, + input: [][]N{ + {4, 4, 4, 2, 16, 1}, + }, + want: metricdata.ExponentialHistogram[N]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{ + { + Count: 6, + Min: metricdata.NewExtrema[N](1), + Max: metricdata.NewExtrema[N](16), + Sum: 31, + Scale: -1, + PositiveBucket: metricdata.ExponentialBucket{ + Offset: -1, + Counts: []uint64{1, 4, 1}, + }, + }, + }, + }, + wantCount: 1, + }, + { + name: "Delta Multiple", + build: func() (Measure[N], ComputeAggregation) { + return Builder[N]{ + Temporality: metricdata.DeltaTemporality, + }.ExponentialBucketHistogram(cfg, false) + }, + input: [][]N{ + {2, 3, 8}, + {4, 4, 4, 2, 16, 1}, + }, + want: metricdata.ExponentialHistogram[N]{ + 
Temporality: metricdata.DeltaTemporality,
+				DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+					{
+						Count: 6,
+						Min:   metricdata.NewExtrema[N](1),
+						Max:   metricdata.NewExtrema[N](16),
+						Sum:   31,
+						Scale: -1,
+						PositiveBucket: metricdata.ExponentialBucket{
+							Offset: -1,
+							Counts: []uint64{1, 4, 1},
+						},
+					},
+				},
+			},
+			wantCount: 1,
+		},
+		{
+			name: "Cumulative Multiple",
+			build: func() (Measure[N], ComputeAggregation) {
+				return Builder[N]{
+					Temporality: metricdata.CumulativeTemporality,
+				}.ExponentialBucketHistogram(cfg, false)
+			},
+			input: [][]N{
+				{2, 3, 8},
+				{4, 4, 4, 2, 16, 1},
+			},
+			want: metricdata.ExponentialHistogram[N]{
+				Temporality: metricdata.CumulativeTemporality,
+				DataPoints: []metricdata.ExponentialHistogramDataPoint[N]{
+					{
+						Count: 9,
+						Min:   metricdata.NewExtrema[N](1),
+						Max:   metricdata.NewExtrema[N](16),
+						Sum:   44,
+						Scale: -1,
+						PositiveBucket: metricdata.ExponentialBucket{
+							Offset: -1,
+							Counts: []uint64{1, 6, 2},
+						},
+					},
+				},
+			},
+			wantCount: 1,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			restore := withHandler(t)
+			defer restore()
+			in, out := tt.build()
+			ctx := context.Background()
+
+			var got metricdata.Aggregation
+			var count int
+			for _, n := range tt.input {
+				for _, v := range n {
+					in(ctx, v, *attribute.EmptySet())
+				}
+				count = out(&got)
+			}
+
+			metricdatatest.AssertAggregationsEqual(t, tt.want, got, metricdatatest.IgnoreTimestamp())
+			assert.Equal(t, tt.wantCount, count)
+		})
+	}
+}
+
+func FuzzGetBin(f *testing.F) {
+	values := []float64{
+		2.0,
+		0x1p35,
+		0x1.0000000000001p35,
+		0x1.fffffffffffffp34,
+		0x1p300,
+		0x1.0000000000001p300,
+		0x1.fffffffffffffp299,
+	}
+	scales := []int{0, 15, -5}
+
+	for _, s := range scales {
+		for _, v := range values {
+			f.Add(v, s)
+		}
+	}
+
+	f.Fuzz(func(t *testing.T, v float64, scale int) {
+		// getBin only works on positive values.
+		if math.Signbit(v) {
+			v = v * -1
+		}
+		// getBin doesn't work on zero.
+		if v == 0.0 {
+			t.Skip("skipping test for zero")
+		}
+
+		// getBin is only used with scales in the range -10 to 20.
+		scale = (scale%31+31)%31 - 10
+
+		got := getBin(v, scale)
+		if v <= lowerBound(got, scale) {
+			t.Errorf("v=%x scale=%d had bin %d, but was below lower bound %x", v, scale, got, lowerBound(got, scale))
+		}
+		if v > lowerBound(got+1, scale) {
+			t.Errorf("v=%x scale=%d had bin %d, but was above upper bound %x", v, scale, got, lowerBound(got+1, scale))
+		}
+	})
+}
+
+func lowerBound(index int, scale int) float64 {
+	// The lowerBound of the index of math.SmallestNonzeroFloat64 at any scale
+	// is always rounded down to 0.0.
+	// For example lowerBound(getBin(math.SmallestNonzeroFloat64, 7), 7) == 0.0
+	// 2 ^ (index * 2 ^ (-scale))
+	return math.Exp2(math.Ldexp(float64(index), -scale))
+}
diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go
index 9b9483fad89..fd28a4afc15 100644
--- a/sdk/metric/pipeline.go
+++ b/sdk/metric/pipeline.go
@@ -446,6 +446,17 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
 			noSum = true
 		}
 		meas, comp = b.ExplicitBucketHistogram(a, noSum)
+	case aggregation.Base2ExponentialHistogram:
+		var noSum bool
+		switch kind {
+		case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge:
+			// The sum should not be collected for any instrument that can make
+			// negative measurements:
+			// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations
+			noSum = true
+		}
+		meas, comp = b.ExponentialBucketHistogram(a, noSum)
+
 	default:
 		err = errUnknownAggregation
 	}
@@ -459,16 +470,16 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr
 // | Instrument Kind          | Drop | LastValue | Sum | Histogram | Exponential Histogram |
 // |--------------------------|------|-----------|-----|-----------|-----------------------|
 // | Counter                  | ✓    |           | ✓   | ✓         | ✓                     |
-// | UpDownCounter            | ✓    |           | ✓   | ✓         |                       |
+// | UpDownCounter            | ✓    |           | ✓   | ✓         | ✓                     |
 // | Histogram                | ✓    |           | ✓   | ✓         | ✓                     |
-// | Observable Counter       | ✓    |           | ✓   | ✓         |                       |
-// | Observable UpDownCounter | ✓    |           | ✓   | ✓         |                       |
-// | Observable Gauge         | ✓    | ✓         |     | ✓         |                       |.
+// | Observable Counter       | ✓    |           | ✓   | ✓         | ✓                     |
+// | Observable UpDownCounter | ✓    |           | ✓   | ✓         | ✓                     |
+// | Observable Gauge         | ✓    | ✓         |     | ✓         | ✓                     |.
 func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error {
 	switch agg.(type) {
 	case aggregation.Default:
 		return nil
-	case aggregation.ExplicitBucketHistogram:
+	case aggregation.ExplicitBucketHistogram, aggregation.Base2ExponentialHistogram:
 		switch kind {
 		case InstrumentKindCounter,
 			InstrumentKindUpDownCounter,
diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go
index 5969392c6cb..52fedd12471 100644
--- a/sdk/metric/pipeline_registry_test.go
+++ b/sdk/metric/pipeline_registry_test.go
@@ -623,6 +623,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindCounter,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "SyncCounter and ExponentialHistogram",
+			kind: InstrumentKindCounter,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "SyncUpDownCounter and Drop",
 			kind: InstrumentKindUpDownCounter,
@@ -644,6 +649,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindUpDownCounter,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "SyncUpDownCounter and ExponentialHistogram",
+			kind: InstrumentKindUpDownCounter,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "SyncHistogram and Drop",
 			kind: InstrumentKindHistogram,
@@ -665,6 +675,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindHistogram,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "SyncHistogram and ExponentialHistogram",
+			kind: InstrumentKindHistogram,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "ObservableCounter and Drop",
 			kind: InstrumentKindObservableCounter,
@@ -686,6 +701,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindObservableCounter,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "ObservableCounter and ExponentialHistogram",
+			kind: InstrumentKindObservableCounter,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "ObservableUpDownCounter and Drop",
 			kind: InstrumentKindObservableUpDownCounter,
@@ -707,6 +727,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindObservableUpDownCounter,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "ObservableUpDownCounter and ExponentialHistogram",
+			kind: InstrumentKindObservableUpDownCounter,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "ObservableGauge and Drop",
 			kind: InstrumentKindObservableGauge,
@@ -728,6 +753,11 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			kind: InstrumentKindObservableGauge,
 			agg:  aggregation.ExplicitBucketHistogram{},
 		},
+		{
+			name: "ObservableGauge and ExponentialHistogram",
+			kind: InstrumentKindObservableGauge,
+			agg:  aggregation.Base2ExponentialHistogram{},
+		},
 		{
 			name: "unknown kind with Sum should error",
 			kind: undefinedInstrument,
@@ -746,6 +776,12 @@ func TestIsAggregatorCompatible(t *testing.T) {
 			agg:  aggregation.ExplicitBucketHistogram{},
 			want: errIncompatibleAggregation,
 		},
+		{
+			name: "unknown kind with ExponentialHistogram should error",
+			kind: undefinedInstrument,
+			agg:  aggregation.Base2ExponentialHistogram{},
+			want: errIncompatibleAggregation,
+		},
 	}
 
 	for _, tt := range testCases {
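Reviewer note: to round out the review, here is a minimal end-to-end sketch of the feature the CHANGELOG entry describes — switching a histogram instrument to the base-2 exponential aggregation through a view. It assumes only the public `sdk/metric` view API as it stands at this change (`metric.NewView`, `metric.Stream.Aggregation`); it is an illustration, not part of the diff:

```go
package main

import (
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/metric/aggregation"
)

func main() {
	// Match every histogram instrument and replace its default
	// explicit-bucket aggregation with the exponential one.
	view := metric.NewView(
		metric.Instrument{Kind: metric.InstrumentKindHistogram},
		metric.Stream{
			Aggregation: aggregation.Base2ExponentialHistogram{
				MaxSize:  160,
				MaxScale: 20,
			},
		},
	)

	// Readers registered on this provider will now collect exponential
	// histogram data points for histogram instruments.
	_ = metric.NewMeterProvider(metric.WithView(view))
}
```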