Skip to content

Commit

Permalink
feat: Add intervalprocessor component (#1484)
Browse files Browse the repository at this point in the history
* feat: Add intervalprocessor component

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>

* Apply suggestions from code review

Co-authored-by: Paulin Todev <paulin.todev@gmail.com>

* Update outdated error in testdata

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>

* Apply suggestions from code review

Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>

---------

Signed-off-by: Arthur Silva Sens <arthursens2005@gmail.com>
Co-authored-by: Paulin Todev <paulin.todev@gmail.com>
Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 27, 2024
1 parent e1d3892 commit 9b8b81b
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 6 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Main (unreleased)
### Features

- Added Datadog Exporter community component, enabling exporting of otel-formatted Metrics and traces to Datadog. (@polyrain)
- (_Experimental_) Add an `otelcol.processor.interval` component to aggregate metrics and periodically
forward the latest values to the next component in the pipeline.

### Enhancements

- Clustering peer resolution through `--cluster.join-addresses` flag has been
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/reference/compatibility/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ The following components, grouped by namespace, _export_ OpenTelemetry `otelcol.
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
Expand Down Expand Up @@ -339,6 +340,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol
- [otelcol.processor.discovery](../components/otelcol/otelcol.processor.discovery)
- [otelcol.processor.filter](../components/otelcol/otelcol.processor.filter)
- [otelcol.processor.groupbyattrs](../components/otelcol/otelcol.processor.groupbyattrs)
- [otelcol.processor.interval](../components/otelcol/otelcol.processor.interval)
- [otelcol.processor.k8sattributes](../components/otelcol/otelcol.processor.k8sattributes)
- [otelcol.processor.memory_limiter](../components/otelcol/otelcol.processor.memory_limiter)
- [otelcol.processor.probabilistic_sampler](../components/otelcol/otelcol.processor.probabilistic_sampler)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.processor.interval/
description: Learn about otelcol.processor.interval
title: otelcol.processor.interval
---

<span class="badge docs-labels__stage docs-labels__item">Experimental</span>

# otelcol.processor.interval

{{< docs/shared lookup="stability/experimental.md" source="alloy" version="<ALLOY_VERSION>" >}}

`otelcol.processor.interval` aggregates metrics and periodically forwards the latest values to the next component in the pipeline.
The processor supports aggregating the following metric types:

* Monotonically increasing, cumulative sums
* Monotonically increasing, cumulative histograms
* Monotonically increasing, cumulative exponential histograms

The following metric types will *not* be aggregated and will instead be passed, unchanged, to the next component in the pipeline:

* All delta metrics
* Non-monotonically increasing sums
* Gauges
* Summaries

{{< admonition type="warning" >}}
After exporting, any internal state is cleared. If no new metrics come in, the next interval will export nothing.
{{< /admonition >}}

{{< admonition type="note" >}}
`otelcol.processor.interval` is a wrapper over the upstream OpenTelemetry Collector `interval` processor.
Bug reports or feature requests will be redirected to the upstream repository, if necessary.
{{< /admonition >}}

## Usage

```alloy
otelcol.processor.interval "LABEL" {
output {
metrics = [...]
}
}
```

## Arguments

`otelcol.processor.interval` supports the following arguments:

Name | Type | Description | Default | Required
------------- | ---------- | ------------------------------------------------------------------- | ------- | --------
`interval` | `duration` | The interval in the processor should export the aggregated metrics. | `"60s"` | no

## Blocks

The following blocks are supported inside the definition of `otelcol.processor.interval`:

Hierarchy | Block | Description | Required
------------- | ----------------- | -------------------------------------------------------------------------- | --------
output | [output][] | Configures where to send received telemetry data. | yes
debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no

[output]: #output-block
[debug_metrics]: #debug_metrics-block

### output block

{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

### debug_metrics block

{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="<ALLOY_VERSION>" >}}

## Exported fields

The following fields are exported and can be referenced by other components:

Name | Type | Description
--------|--------------------|-----------------------------------------------------------------
`input` | `otelcol.Consumer` | A value that other components can use to send telemetry data to.

`input` accepts `otelcol.Consumer` data for metrics.

## Component health

`otelcol.processor.interval` is only reported as unhealthy if given an invalid configuration.

## Debug information

`otelcol.processor.interval` does not expose any component-specific debug information.

## Example

This example receives OTLP metrics and aggregates them for 30s before sending to the next exporter.

```alloy
otelcol.receiver.otlp "default" {
grpc { ... }
http { ... }
output {
metrics = [otelcol.processor.interval.default.input]
}
}
otelcol.processor.interval "default" {
interval = "30s"
output {
metrics = [otelcol.exporter.otlphttp.grafana_cloud.input]
}
}
otelcol.exporter.otlphttp "grafana_cloud" {
client {
endpoint = "https://otlp-gateway-prod-gb-south-0.grafana.net/otlp"
auth = otelcol.auth.basic.grafana_cloud.handler
}
}
otelcol.auth.basic "grafana_cloud" {
username = env("GRAFANA_CLOUD_USERNAME")
password = env("GRAFANA_CLOUD_API_KEY")
}
```

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 0 | test_metric | Cumulative | labelA: foo | 4.0 |
| 2 | test_metric | Cumulative | labelA: bar | 3.1 |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |
| 6 | test_metric | Cumulative | labelA: foo | 8.2 |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

The processor immediately passes the following metric to the next processor in the chain because it is a Delta metric.

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ------------ | ------------------------- | ----------------- | ----: |
| 4 | other_metric | Delta | fruitType: orange | 77.4 |


At the next `interval` (15s by default), the processor passed the following metrics to the next processor in the chain.

| Timestamp | Metric Name | Aggregation Temporarility | Attributes | Value |
| --------- | ----------- | ------------------------- | ----------- | ----: |
| 8 | test_metric | Cumulative | labelA: foo | 12.8 |
| 10 | test_metric | Cumulative | labelA: bar | 6.4 |

<!-- START GENERATED COMPATIBLE COMPONENTS -->

## Compatible components

`otelcol.processor.interval` can accept arguments from the following components:

- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters)

`otelcol.processor.interval` has exports that can be consumed by the following components:

- Components that consume [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-consumers)

{{< admonition type="note" >}}
Connecting some components may not be sensible or components may require further configuration to make the connection work correctly.
Refer to the linked documentation for more details.
{{< /admonition >}}

<!-- END GENERATED COMPATIBLE COMPONENTS -->
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ require (
github.com/go-openapi/validate v0.23.0 // indirect
github.com/go-redis/redis/v8 v8.11.5 // indirect
github.com/go-resty/resty/v2 v2.13.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 // indirect
github.com/go-viper/mapstructure/v2 v2.0.0 // indirect
github.com/go-zookeeper/zk v1.0.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.3 // indirect
Expand Down Expand Up @@ -656,6 +656,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/opencensus v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.105.0 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/opencontainers/runc v1.1.12 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,8 @@ github.com/go-test/deep v1.0.2-0.20181118220953-042da051cf31/go.mod h1:wGDj63lr6
github.com/go-test/deep v1.0.2/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/go-test/deep v1.1.0/go.mod h1:5C2ZWiW0ErCdrYzpqxLbTX7MG14M9iiw8DgHncVwcsE=
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1 h1:TQcrn6Wq+sKGkpyPvppOz99zsMBaUOKXq6HSv655U1c=
github.com/go-viper/mapstructure/v2 v2.0.0-alpha.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-viper/mapstructure/v2 v2.0.0 h1:dhn8MZ1gZ0mzeodTG3jt5Vj/o87xZKuNAprG2mQfMfc=
github.com/go-viper/mapstructure/v2 v2.0.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/go-zookeeper/zk v1.0.3 h1:7M2kwOsc//9VeeFiPtf+uSJlVpU66x9Ba5+8XK7/TDg=
github.com/go-zookeeper/zk v1.0.3/go.mod h1:nOB03cncLtlp4t+UAkGSV+9beXP/akpekBwL+UX1Qcw=
github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg=
Expand Down Expand Up @@ -1238,8 +1238,6 @@ github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc h1:GN2Lv3MGO7AS6PrR
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc/go.mod h1:+JKpmjMGhpgPL+rXZ5nsZieVzvarn86asRlBg4uNGnk=
github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3 h1:UPkAxuhlAcRmJT3/qd34OMTl+ZU7BLLfOO2+NXBlJpY=
github.com/grafana/smimesign v0.2.1-0.20220408144937-2a5adf3481d3/go.mod h1:iZiiwNT4HbtGRVqCQu7uJPEZCuEE5sfSSttcnePkDl4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15 h1:J4PmreN24XmbqMIKReAS/P1t7ND6NCAZApcbjBhedrY=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240701202215-1d847d62ed15/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548 h1:XwoNrPHEl07N7EIMt/WXlzGj0Q4CDEfa+6nrdnQGOG4=
github.com/grafana/snowflake-prometheus-exporter v0.0.0-20240813124544-9995e8354548/go.mod h1:DANNLd5vSKqHloqNX4yeS+9ZRI89dj8ySZeEWlI5UU4=
github.com/grafana/tail v0.0.0-20230510142333-77b18831edf0 h1:bjh0PVYSVVFxzINqPFYJmAmJNrWPgnVjuSdYJGHmtFU=
Expand Down Expand Up @@ -1968,6 +1966,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterproces
github.com/open-telemetry/opentelemetry-collector-contrib/processor/filterprocessor v0.105.0/go.mod h1:66cZFd4X8vQBTmvm1hPHxrSNHS474iUEsAVbYk9xQBU=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0 h1:OYsGaSC9G7pAVYKTd1+D0f7HTHcxuQfoEHyQy+a1NKk=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/groupbyattrsprocessor v0.105.0/go.mod h1:WCesGEakYveZYZH4o3cUTLt3UB7JxE+yDiiphRHoJoc=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0 h1:4EBgaDFaVQOaV0hpgNTrFQL8zjXSOglXz15gyUL/Kds=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.105.0/go.mod h1:C9PSzt0uqtTM9oPs+1H92PAzowI4yIhnzXvdpFJjX30=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0 h1:ScIwuYg6l79Ta+deOyZIADXrBlXSdeAZ7sp3MXhm7JY=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.105.0/go.mod h1:pranRmnWRkzDsn9a16BzSqX6HJ6XjjVVFmMhyZPEzt0=
github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.105.0 h1:mFAlBmDFELQJS8uj1M8csB/vQqjpq6W9/9k9izh9Hr4=
Expand Down
1 change: 1 addition & 0 deletions internal/component/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ import (
_ "github.com/grafana/alloy/internal/component/otelcol/processor/attributes" // Import otelcol.processor.attributes
_ "github.com/grafana/alloy/internal/component/otelcol/processor/batch" // Import otelcol.processor.batch
_ "github.com/grafana/alloy/internal/component/otelcol/processor/deltatocumulative" // Import otelcol.processor.deltatocumulative
_ "github.com/grafana/alloy/internal/component/otelcol/processor/interval" // Import otelcol.processor.interval
_ "github.com/grafana/alloy/internal/component/otelcol/processor/discovery" // Import otelcol.processor.discovery
_ "github.com/grafana/alloy/internal/component/otelcol/processor/filter" // Import otelcol.processor.filter
_ "github.com/grafana/alloy/internal/component/otelcol/processor/groupbyattrs" // Import otelcol.processor.groupbyattrs
Expand Down
88 changes: 88 additions & 0 deletions internal/component/otelcol/processor/interval/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Package interval provides an otelcol.processor.interval component.
package interval

import (
"fmt"
"time"

"github.com/grafana/alloy/internal/component"
"github.com/grafana/alloy/internal/component/otelcol"
otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config"
"github.com/grafana/alloy/internal/component/otelcol/processor"
"github.com/grafana/alloy/internal/featuregate"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
otelcomponent "go.opentelemetry.io/collector/component"
otelextension "go.opentelemetry.io/collector/extension"
)

func init() {
component.Register(component.Registration{
Name: "otelcol.processor.interval",
Stability: featuregate.StabilityExperimental,
Args: Arguments{},
Exports: otelcol.ConsumerExports{},
Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return processor.New(opts, intervalprocessor.NewFactory(), args.(Arguments))
},
})
}

type Arguments struct {
// The interval in which the processor should export the aggregated metrics. Default: 60s.
Interval time.Duration `alloy:"interval,attr,optional"`

// Output configures where to send processed data. Required.
Output *otelcol.ConsumerArguments `alloy:"output,block"`

// DebugMetrics configures component internal metrics. Optional.
DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"`
}

var (
_ processor.Arguments = Arguments{}
)

// DefaultArguments holds default settings for Arguments.
var DefaultArguments = Arguments{
Interval: 60 * time.Second,
}

// SetToDefault implements syntax.Defaulter.
func (args *Arguments) SetToDefault() {
*args = DefaultArguments
}

// Validate implements syntax.Validator.
func (args *Arguments) Validate() error {
if args.Interval <= 0 {
return fmt.Errorf("interval must be greater than 0")
}
return nil
}

// Convert implements processor.Arguments.
func (args Arguments) Convert() (otelcomponent.Config, error) {
return &intervalprocessor.Config{
Interval: args.Interval,
}, nil
}

// Extensions implements processor.Arguments.
func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension {
return nil
}

// Exporters implements processor.Arguments.
func (args Arguments) Exporters() map[otelcomponent.DataType]map[otelcomponent.ID]otelcomponent.Component {
return nil
}

// NextConsumers implements processor.Arguments.
func (args Arguments) NextConsumers() *otelcol.ConsumerArguments {
return args.Output
}

// DebugMetricsConfig implements processor.Arguments.
func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments {
return args.DebugMetrics
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package otelcolconvert

import (
"fmt"

"github.com/grafana/alloy/internal/component/otelcol"
"github.com/grafana/alloy/internal/component/otelcol/processor/interval"
"github.com/grafana/alloy/internal/converter/diag"
"github.com/grafana/alloy/internal/converter/internal/common"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor"
"go.opentelemetry.io/collector/component"
)

func init() {
converters = append(converters, intervalProcessorConverter{})
}

type intervalProcessorConverter struct{}

func (intervalProcessorConverter) Factory() component.Factory {
return intervalprocessor.NewFactory()
}

func (intervalProcessorConverter) InputComponentName() string {
return "otelcol.processor.interval"
}

func (intervalProcessorConverter) ConvertAndAppend(state *State, id component.InstanceID, cfg component.Config) diag.Diagnostics {
var diags diag.Diagnostics

label := state.AlloyComponentLabel()

args := toIntervalProcessor(state, id, cfg.(*intervalprocessor.Config))
block := common.NewBlockWithOverride([]string{"otelcol", "processor", "interval"}, label, args)

diags.Add(
diag.SeverityLevelInfo,
fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)),
)

state.Body().AppendBlock(block)
return diags
}

func toIntervalProcessor(state *State, id component.InstanceID, cfg *intervalprocessor.Config) *interval.Arguments {
var (
nextMetrics = state.Next(id, component.DataTypeMetrics)
nextLogs = state.Next(id, component.DataTypeLogs)
nextTraces = state.Next(id, component.DataTypeTraces)
)

return &interval.Arguments{
Interval: cfg.Interval,
Output: &otelcol.ConsumerArguments{
Metrics: ToTokenizedConsumers(nextMetrics),
Logs: ToTokenizedConsumers(nextLogs),
Traces: ToTokenizedConsumers(nextTraces),
},
DebugMetrics: common.DefaultValue[interval.Arguments]().DebugMetrics,
}
}
Loading

0 comments on commit 9b8b81b

Please sign in to comment.