diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c98371398..85c632eaf6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,9 @@ Main (unreleased) ### Features - Added Datadog Exporter community component, enabling exporting of otel-formatted Metrics and traces to Datadog. (@polyrain) +- (_Experimental_) Add an `otelcol.processor.interval` component to aggregate metrics and periodically + forward the latest values to the next component in the pipeline. + ### Enhancements - Clustering peer resolution through `--cluster.join-addresses` flag has been diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index 83a31453f7..6549abff99 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -302,6 +302,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol. - [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery) - [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter) - [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs) +- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval) - [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes) - [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter) - [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler) @@ -339,6 +340,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery) - [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter) - [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs) +- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval) - [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes) - [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter) - [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler) diff --git a/docs/sources/reference/components/otelcol/otelcol.processor.interval.md b/docs/sources/reference/components/otelcol/otelcol.processor.interval.md new file mode 100644 index 0000000000..bd4db2e324 --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.processor.interval.md @@ -0,0 +1,166 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.interval/ +description: Learn about otelcol.processor.interval +title: otelcol.processor.interval +--- + +Experimental + +# otelcol.processor.interval + +{{< docs/shared lookup="stability/experimental.md" source="alloy" version="" >}} + +`otelcol.processor.interval` aggregates metrics and periodically forwards the latest values to the next component in the pipeline. +The processor supports aggregating the following metric types: + +* Monotonically increasing, cumulative sums +* Monotonically increasing, cumulative histograms +* Monotonically increasing, cumulative exponential histograms + +The following metric types will *not* be aggregated and will instead be passed, unchanged, to the next component in the pipeline: + +* All delta metrics +* Non-monotonically increasing sums +* Gauges +* Summaries + +{{< admonition type="warning" >}} +After exporting, any internal state is cleared. If no new metrics come in, the next interval will export nothing. +{{< /admonition >}} + +{{< admonition type="note" >}} +`otelcol.processor.interval` is a wrapper over the upstream OpenTelemetry Collector `interval` processor. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +## Usage + +```alloy +otelcol.processor.interval "LABEL" { + output { + metrics = [...] + } +} +``` + +## Arguments + +`otelcol.processor.interval` supports the following arguments: + +Name | Type | Description | Default | Required +------------- | ---------- | ------------------------------------------------------------------- | ------- | -------- +`interval` | `duration` | The interval in the processor should export the aggregated metrics. | `"60s"` | no + +## Blocks + +The following blocks are supported inside the definition of `otelcol.processor.interval`: + +Hierarchy | Block | Description | Required +------------- | ----------------- | -------------------------------------------------------------------------- | -------- +output | [output][] | Configures where to send received telemetry data. | yes +debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no + +[output]: #output-block +[debug_metrics]: #debug_metrics-block + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +## Exported fields + +The following fields are exported and can be referenced by other components: + +Name | Type | Description +--------|--------------------|----------------------------------------------------------------- +`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to. + +`input` accepts `otelcol.Consumer` data for metrics. + +## Component health + +`otelcol.processor.interval` is only reported as unhealthy if given an invalid configuration. + +## Debug information + +`otelcol.processor.interval` does not expose any component-specific debug information. + +## Example + +This example receives OTLP metrics and aggregates them for 30s before sending to the next exporter. + +```alloy +otelcol.receiver.otlp "default" { + grpc { ... } + http { ... } + + output { + metrics = [otelcol.processor.interval.default.input] + } +} + +otelcol.processor.interval "default" { + interval = "30s" + output { + metrics = [otelcol.exporter.otlphttp.grafana_cloud.input] + } +} + +otelcol.exporter.otlphttp "grafana_cloud" { + client { + endpoint = "https://otlp-gateway-prod-gb-south-0.grafana.net/otlp" + auth = otelcol.auth.basic.grafana_cloud.handler + } +} + +otelcol.auth.basic "grafana_cloud" { + username = env("GRAFANA_CLOUD_USERNAME") + password = env("GRAFANA_CLOUD_API_KEY") +} +``` + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ------------ | ------------------------- | ----------------- | ----: | +| 0 | test_metric | Cumulative | labelA: foo | 4.0 | +| 2 | test_metric | Cumulative | labelA: bar | 3.1 | +| 4 | other_metric | Delta | fruitType: orange | 77.4 | +| 6 | test_metric | Cumulative | labelA: foo | 8.2 | +| 8 | test_metric | Cumulative | labelA: foo | 12.8 | +| 10 | test_metric | Cumulative | labelA: bar | 6.4 | + +The processor immediately passes the following metric to the next processor in the chain because it is a Delta metric. + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ------------ | ------------------------- | ----------------- | ----: | +| 4 | other_metric | Delta | fruitType: orange | 77.4 | + + +At the next `interval` (15s by default), the processor passed the following metrics to the next processor in the chain. + +| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value | +| --------- | ----------- | ------------------------- | ----------- | ----: | +| 8 | test_metric | Cumulative | labelA: foo | 12.8 | +| 10 | test_metric | Cumulative | labelA: bar | 6.4 | + + + +## Compatible components + +`otelcol.processor.interval` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters) + +`otelcol.processor.interval` has exports that can be consumed by the following components: + +- Components that consume [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-consumers) + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index dc9d0d345f..4acfe13a0f 100644 --- a/go.mod +++ b/go.mod @@ -492,7 +492,7 @@ require ( github.com/go-openapi/validate v0.23.0 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/go-resty/resty/v2 v2.13.1 // indirect - github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect + github.com/go-viper/mapstructure/v2 v2.0.0 // indirect github.com/go-zookeeper/zk v1.0.3 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/goccy/go-json v0.10.3 // indirect @@ -656,6 +656,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.105.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.105.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.105.0 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0 github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/opencontainers/runc v1.1.12 // indirect diff --git a/go.sum b/go.sum index 5cb332f75b..ddbc0fdf92 100644 --- a/go.sum +++ b/go.sum @@ -981,8 +981,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6 github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA= github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg= github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c= -github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc= +github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg= github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw= github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg= @@ -1238,8 +1238,6 @@ github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrR github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk= github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 h1:UPkAxuhlAcRmJT3/qd34OMTl+ZU7BLLfOO2+NXBlJpY= github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3/go.mod h1:iZiiwNT4HbtGRVqCQu7uJPEZCuEE5sfSSttcnePkDl4= -github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15 h1:J4PmreN24XmbqMIKReAS/P1t7ND6NCAZApcbjBhedrY= -github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4= github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548 h1:XwoNrPHEl07N7EIMt/WXlzGj0Q4CDEfa+6nrdnQGOG4= github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4= github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU= @@ -1968,6 +1966,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterproces github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0/go.mod h1:66cZFd4X8vQBTmvm1hPHxrSNHS474iUEsAVbYk9xQBU= github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0 h1:OYsGaSC9G7pAVYKTd1+D0f7HTHcxuQfoEHyQy+a1NKk= github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0/go.mod h1:WCesGEakYveZYZH4o3cUTLt3UB7JxE+yDiiphRHoJoc= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0 h1:4EBgaDFaVQOaV0hpgNTrFQL8zjXSOglXz15gyUL/Kds= +github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0/go.mod h1:C9PSzt0uqtTM9oPs+1H92PAzowI4yIhnzXvdpFJjX30= github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0 h1:ScIwuYg6l79Ta+deOyZIADXrBlXSdeAZ7sp3MXhm7JY= github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0/go.mod h1:pranRmnWRkzDsn9a16BzSqX6HJ6XjjVVFmMhyZPEzt0= github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0 h1:mFAlBmDFELQJS8uj1M8csB/vQqjpq6W9/9k9izh9Hr4= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 3c633f2505..bfdde5c5b3 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -81,6 +81,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes _ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch _ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative + _ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval _ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery _ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter _ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs diff --git a/internal/component/otelcol/processor/interval/interval.go b/internal/component/otelcol/processor/interval/interval.go new file mode 100644 index 0000000000..ae74efa01c --- /dev/null +++ b/internal/component/otelcol/processor/interval/interval.go @@ -0,0 +1,88 @@ +// Package interval provides an otelcol.processor.interval component. +package interval + +import ( + "fmt" + "time" + + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/processor" + "github.com/grafana/alloy/internal/featuregate" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.processor.interval", + Stability: featuregate.StabilityExperimental, + Args: Arguments{}, + Exports: otelcol.ConsumerExports{}, + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + return processor.New(opts, intervalprocessor.NewFactory(), args.(Arguments)) + }, + }) +} + +type Arguments struct { + // The interval in which the processor should export the aggregated metrics. Default: 60s. + Interval time.Duration `alloy:"interval,attr,optional"` + + // Output configures where to send processed data. Required. + Output *otelcol.ConsumerArguments `alloy:"output,block"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` +} + +var ( + _ processor.Arguments = Arguments{} +) + +// DefaultArguments holds default settings for Arguments. +var DefaultArguments = Arguments{ + Interval: 60 * time.Second, +} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = DefaultArguments +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + if args.Interval <= 0 { + return fmt.Errorf("interval must be greater than 0") + } + return nil +} + +// Convert implements processor.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + return &intervalprocessor.Config{ + Interval: args.Interval, + }, nil +} + +// Extensions implements processor.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements processor.Arguments. +func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements processor.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// DebugMetricsConfig implements processor.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/converter/internal/otelcolconvert/converter_intervalprocessor.go b/internal/converter/internal/otelcolconvert/converter_intervalprocessor.go new file mode 100644 index 0000000000..062b4e96a6 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_intervalprocessor.go @@ -0,0 +1,61 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/processor/interval" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor" + "go.opentelemetry.io/collector/component" +) + +func init() { + converters = append(converters, intervalProcessorConverter{}) +} + +type intervalProcessorConverter struct{} + +func (intervalProcessorConverter) Factory() component.Factory { + return intervalprocessor.NewFactory() +} + +func (intervalProcessorConverter) InputComponentName() string { + return "otelcol.processor.interval" +} + +func (intervalProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toIntervalProcessor(state, id, cfg.(*intervalprocessor.Config)) + block := common.NewBlockWithOverride([]string{"otelcol", "processor", "interval"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toIntervalProcessor(state *State, id component.InstanceID, cfg *intervalprocessor.Config) *interval.Arguments { + var ( + nextMetrics = state.Next(id, component.DataTypeMetrics) + nextLogs = state.Next(id, component.DataTypeLogs) + nextTraces = state.Next(id, component.DataTypeTraces) + ) + + return &interval.Arguments{ + Interval: cfg.Interval, + Output: &otelcol.ConsumerArguments{ + Metrics: ToTokenizedConsumers(nextMetrics), + Logs: ToTokenizedConsumers(nextLogs), + Traces: ToTokenizedConsumers(nextTraces), + }, + DebugMetrics: common.DefaultValue[interval.Arguments]().DebugMetrics, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/interval.alloy b/internal/converter/internal/otelcolconvert/testdata/interval.alloy new file mode 100644 index 0000000000..d6d5465dc1 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/interval.alloy @@ -0,0 +1,25 @@ +otelcol.receiver.otlp "default" { + grpc { + endpoint = "localhost:4317" + } + + http { + endpoint = "localhost:4318" + } + + output { + metrics = [otelcol.processor.interval.default.input] + } +} + +otelcol.processor.interval "default" { + output { + metrics = [otelcol.exporter.otlp.default.input] + } +} + +otelcol.exporter.otlp "default" { + client { + endpoint = "database:4317" + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/interval.yaml b/internal/converter/internal/otelcolconvert/testdata/interval.yaml new file mode 100644 index 0000000000..8d282b5904 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/interval.yaml @@ -0,0 +1,19 @@ +receivers: + otlp: + protocols: + grpc: + http: + +processors: + interval: + +exporters: + otlp: + endpoint: database:4317 + +service: + pipelines: + metrics: + receivers: [otlp] + processors: [interval] + exporters: [otlp] diff --git a/internal/converter/internal/otelcolconvert/testdata/otelcol_errors/corrupt_config.diags b/internal/converter/internal/otelcolconvert/testdata/otelcol_errors/corrupt_config.diags index 51268d8129..b22a0a514f 100644 --- a/internal/converter/internal/otelcolconvert/testdata/otelcol_errors/corrupt_config.diags +++ b/internal/converter/internal/otelcolconvert/testdata/otelcol_errors/corrupt_config.diags @@ -1 +1 @@ -(Critical) failed to get otelcol config: cannot unmarshal the configuration: 1 error(s) decoding:\n\n* '' has invalid keys: bad-key +(Critical) failed to get otelcol config: cannot unmarshal the configuration: decoding failed due to the following error(s):\n\n'' has invalid keys: bad-key