Skip to content

Commit

Permalink
[processor/interval] Support Gauges and Summaries (#34805)
Browse files Browse the repository at this point in the history
**Description:** Adds support for Gauges and Summaries 

**Link to tracking Issue:** #34803 

**Testing:** Unit tests were extended to cover the new behavior

**Documentation:** <Describe the documentation added.>

---------

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
  • Loading branch information
ArthurSens authored Aug 29, 2024
1 parent 38c4921 commit 3d5bb5f
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 31 deletions.
27 changes: 27 additions & 0 deletions .chloggen/intervalprocessor_gauge_summary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: processor/interval

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support for gauge and summary metrics.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34803]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Only the last value of a gauge or summary metric is reported in the interval processor, instead of all values.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 6 additions & 2 deletions processor/intervalprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,23 @@ The interval processor (`intervalprocessor`) aggregates metrics and periodically
* Monotonically increasing, cumulative sums
* Monotonically increasing, cumulative histograms
* Monotonically increasing, cumulative exponential histograms
* Gauges
* Summaries

The following metric types will *not* be aggregated, and will instead be passed, unchanged, to the next component in the pipeline:

* All delta metrics
* Non-monotonically increasing sums
* Gauges
* Summaries

> NOTE: Aggregating data over an interval is an inherently "lossy" process. For monotonically increasing, cumulative sums, histograms, and exponential histograms, you "lose" precision, but you don't lose overall data. But for non-monotonically increasing sums, gauges, and summaries, aggregation represents actual data loss. IE you could "lose" that a value increased and then decreased back to the original value. In most cases, this data "loss" is ok. However, if you would rather these values be passed through, and *not* aggregated, you can set that in the configuration
## Configuration

The following settings can be optionally configured:

* `interval`: The interval in which the processor should export the aggregated metrics. Default: 60s
* `gauge_pass_through`: Whether gauges should pass through as they are to the next component or be aggregated. Default: false
* `summary_pass_through`: Whether summaries should pass through as they are to the next component or be aggregated. Default: false

## Example of metric flows

Expand Down
8 changes: 7 additions & 1 deletion processor/intervalprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,14 @@ var _ component.Config = (*Config)(nil)

// Config defines the configuration for the processor.
type Config struct {
// Interval is the time
// Interval is the time interval at which the processor will aggregate metrics.
Interval time.Duration `mapstructure:"interval"`
// GaugePassThrough is a flag that determines whether gauge metrics should be passed through
// as they are or aggregated.
GaugePassThrough bool `mapstructure:"gauge_pass_through"`
// SummaryPassThrough is a flag that determines whether summary metrics should be passed through
// as they are or aggregated.
SummaryPassThrough bool `mapstructure:"summary_pass_through"`
}

// Validate checks whether the input configuration has all of the required fields for the processor.
Expand Down
4 changes: 3 additions & 1 deletion processor/intervalprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ func NewFactory() processor.Factory {

func createDefaultConfig() component.Config {
return &Config{
Interval: 60 * time.Second,
Interval: 60 * time.Second,
GaugePassThrough: false,
SummaryPassThrough: false,
}
}

Expand Down
3 changes: 0 additions & 3 deletions processor/intervalprocessor/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con

import (
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
)

type DataPointSlice[DP DataPoint[DP]] interface {
Expand All @@ -15,8 +14,6 @@ type DataPointSlice[DP DataPoint[DP]] interface {
}

type DataPoint[Self any] interface {
pmetric.NumberDataPoint | pmetric.HistogramDataPoint | pmetric.ExponentialHistogramDataPoint

Timestamp() pcommon.Timestamp
Attributes() pcommon.Map
CopyTo(dest Self)
Expand Down
29 changes: 25 additions & 4 deletions processor/intervalprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,11 @@ type Processor struct {
numberLookup map[identity.Stream]pmetric.NumberDataPoint
histogramLookup map[identity.Stream]pmetric.HistogramDataPoint
expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint

exportInterval time.Duration
exportInterval time.Duration
gaugePassThrough bool
summaryPassThrough bool

nextConsumer consumer.Metrics
}
Expand All @@ -59,8 +62,11 @@ func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics
numberLookup: map[identity.Stream]pmetric.NumberDataPoint{},
histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{},
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},

exportInterval: config.Interval,
exportInterval: config.Interval,
gaugePassThrough: config.GaugePassThrough,
summaryPassThrough: config.SummaryPassThrough,

nextConsumer: nextConsumer,
}
Expand Down Expand Up @@ -102,8 +108,22 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
rm.ScopeMetrics().RemoveIf(func(sm pmetric.ScopeMetrics) bool {
sm.Metrics().RemoveIf(func(m pmetric.Metric) bool {
switch m.Type() {
case pmetric.MetricTypeGauge, pmetric.MetricTypeSummary:
return false
case pmetric.MetricTypeSummary:
if p.summaryPassThrough {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
return true
case pmetric.MetricTypeGauge:
if p.gaugePassThrough {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
return true
case pmetric.MetricTypeSum:
// Check if we care about this value
sum := m.Sum()
Expand Down Expand Up @@ -202,6 +222,7 @@ func (p *Processor) exportMetrics() {
clear(p.numberLookup)
clear(p.histogramLookup)
clear(p.expHistogramLookup)
clear(p.summaryLookup)

return out
}()
Expand Down
32 changes: 18 additions & 14 deletions processor/intervalprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,29 @@ import (
func TestAggregation(t *testing.T) {
t.Parallel()

testCases := []string{
"basic_aggregation",
"non_monotonic_sums_are_passed_through",
"summaries_are_passed_through",
"histograms_are_aggregated",
"exp_histograms_are_aggregated",
"all_delta_metrics_are_passed_through",
testCases := []struct {
name string
passThrough bool
}{
{name: "basic_aggregation"},
{name: "histograms_are_aggregated"},
{name: "exp_histograms_are_aggregated"},
{name: "gauges_are_aggregated"},
{name: "summaries_are_aggregated"},
{name: "all_delta_metrics_are_passed_through"}, // Deltas are passed through even when aggregation is enabled
{name: "non_monotonic_sums_are_passed_through"}, // Non-monotonic sums are passed through even when aggregation is enabled
{name: "gauges_are_passed_through", passThrough: true},
{name: "summaries_are_passed_through", passThrough: true},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

config := &Config{Interval: time.Second}

var config *Config
for _, tc := range testCases {
testName := tc

t.Run(testName, func(t *testing.T) {
t.Parallel()
config = &Config{Interval: time.Second, GaugePassThrough: tc.passThrough, SummaryPassThrough: tc.passThrough}

t.Run(tc.name, func(t *testing.T) {
// next stores the results of the filter metric processor
next := &consumertest.MetricsSink{}

Expand All @@ -53,7 +56,7 @@ func TestAggregation(t *testing.T) {
)
require.NoError(t, err)

dir := filepath.Join("testdata", testName)
dir := filepath.Join("testdata", tc.name)

md, err := golden.ReadMetrics(filepath.Join(dir, "input.yaml"))
require.NoError(t, err)
Expand All @@ -75,6 +78,7 @@ func TestAggregation(t *testing.T) {
require.Empty(t, processor.numberLookup)
require.Empty(t, processor.histogramLookup)
require.Empty(t, processor.expHistogramLookup)
require.Empty(t, processor.summaryLookup)

// Exporting again should return nothing
processor.exportMetrics()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: test.gauge
gauge:
aggregationTemporality: 2
dataPoints:
- timeUnixNano: 50
asDouble: 345
attributes:
- key: aaa
value:
stringValue: bbb
- timeUnixNano: 20
asDouble: 258
attributes:
- key: aaa
value:
stringValue: bbb
# For interval processor point of view, only the last datapoint should be passed through.
- timeUnixNano: 80
asDouble: 178
attributes:
- key: aaa
value:
stringValue: bbb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
resourceMetrics: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: test.gauge
gauge:
aggregationTemporality: 2
dataPoints:
- timeUnixNano: 80
asDouble: 178
attributes:
- key: aaa
value:
stringValue: bbb
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ resourceMetrics:
attributes:
- key: aaa
value:
stringValue: bbb
stringValue: bbb
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,4 @@ resourceMetrics:
attributes:
- key: aaa
value:
stringValue: bbb
stringValue: bbb
Original file line number Diff line number Diff line change
@@ -1 +1 @@
resourceMetrics: []
resourceMetrics: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
resourceMetrics:
- schemaUrl: https://test-res-schema.com/schema
resource:
attributes:
- key: asdf
value:
stringValue: foo
scopeMetrics:
- schemaUrl: https://test-scope-schema.com/schema
scope:
name: MyTestInstrument
version: "1.2.3"
attributes:
- key: foo
value:
stringValue: bar
metrics:
- name: summary.test
summary:
dataPoints:
- timeUnixNano: 50
quantileValues:
- quantile: 0.25
value: 50
- quantile: 0.5
value: 20
- quantile: 0.75
value: 75
- quantile: 0.95
value: 10
attributes:
- key: aaa
value:
stringValue: bbb
- timeUnixNano: 20
quantileValues:
- quantile: 0.25
value: 40
- quantile: 0.5
value: 10
- quantile: 0.75
value: 60
- quantile: 0.95
value: 5
attributes:
- key: aaa
value:
stringValue: bbb
# Only last summary should pass through
- timeUnixNano: 80
quantileValues:
- quantile: 0.25
value: 80
- quantile: 0.5
value: 35
- quantile: 0.75
value: 90
- quantile: 0.95
value: 15
attributes:
- key: aaa
value:
stringValue: bbb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
resourceMetrics: []
Loading

0 comments on commit 3d5bb5f

Please sign in to comment.