add condition field to dedupe logs processor
colelaven committed Sep 25, 2024
1 parent 3535635 commit 2c8c961
Showing 9 changed files with 277 additions and 35 deletions.
43 changes: 36 additions & 7 deletions processor/logdeduplicationprocessor/README.md
@@ -6,7 +6,7 @@ This processor is used to deduplicate logs by detecting identical logs over a ra

## How It Works
1. The user configures the log deduplication processor in the desired logs pipeline.
2. All logs sent to the processor and aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes.
2. All logs are sent to the processor. Each log for which `condition` evaluates true is aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes.
3. After the interval, the processor emits a single log with the count of logs that were deduplicated. The emitted log will have the same body, resource attributes, severity, and log attributes as the original log. The emitted log will also have the following new attributes:

- `log_count`: The count of logs that were deduplicated over the interval. The name of the attribute is configurable via the `log_count_attribute` parameter.
@@ -16,13 +16,17 @@ This processor is used to deduplicate logs by detecting identical logs over a ra
**Note**: The `ObservedTimestamp` and `Timestamp` of the emitted log will be the time that the aggregated log was emitted and will not be the same as the `ObservedTimestamp` and `Timestamp` of the original logs.
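
The steps above can be sketched in plain Go. This is a minimal illustration for a single interval, not the processor's implementation: the `record` type, `key`, and `dedupe` are hypothetical names, resource attributes are left out for brevity, and passing non-matching records through unchanged is an assumption.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// record is a simplified, hypothetical stand-in for a log record.
type record struct {
	Body       string
	Severity   string
	Attributes map[string]string
}

// key builds an identity string from the fields used for matching.
func key(r record) string {
	names := make([]string, 0, len(r.Attributes))
	for k := range r.Attributes {
		names = append(names, k)
	}
	sort.Strings(names) // map iteration order is random, so sort for a stable key

	var b strings.Builder
	fmt.Fprintf(&b, "%q|%q", r.Body, r.Severity)
	for _, k := range names {
		fmt.Fprintf(&b, "|%q=%q", k, r.Attributes[k])
	}
	return b.String()
}

// dedupe counts identical records that satisfy cond. Records that do not
// satisfy cond are returned untouched (an assumption made for this sketch).
func dedupe(in []record, cond func(record) bool) (map[string]int, []record) {
	counts := make(map[string]int)
	var passthrough []record
	for _, r := range in {
		if !cond(r) {
			passthrough = append(passthrough, r)
			continue
		}
		counts[key(r)]++
	}
	return counts, passthrough
}

func main() {
	logs := []record{
		{Body: "disk full", Severity: "ERROR", Attributes: map[string]string{"ID": "1"}},
		{Body: "disk full", Severity: "ERROR", Attributes: map[string]string{"ID": "1"}},
		{Body: "disk full", Severity: "ERROR", Attributes: map[string]string{"ID": "2"}},
	}
	cond := func(r record) bool { return r.Attributes["ID"] == "1" }

	counts, rest := dedupe(logs, cond)
	// One distinct matching record seen twice, one record passed through.
	fmt.Println(len(counts), counts[key(logs[0])], len(rest)) // prints: 1 2 1
}
```

In this sketch the count map plays the role of the `log_count` attribute: at the end of the interval, each key would be emitted once with its count.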

## Configuration
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. Valid values listed [here](../../docs/timezone.md) |
| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. |
| condition | string | `true` | An [OTTL] expression that selects which log records are deduplicated. All paths in the [log context] and all [converters] are available. |
| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. |
| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. Valid values listed [here](../../docs/timezone.md) |
| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).<br><br>**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. |

[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.109.0/pkg/ottl#readme
[converters]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/ottlfuncs/README.md#converters
[log context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/contexts/ottllog/README.md
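
The `exclude_fields` rule of `.`-delimited paths with `\` as an escape can be illustrated with a small Go sketch. It shows one way such a path could be split, under the assumption that only `\.` is treated as an escape; `splitField` is a hypothetical helper, not a function from this processor.

```go
package main

import (
	"fmt"
	"strings"
)

// splitField splits a field path on unescaped dots. The sequence `\.` yields
// a literal dot; any other backslash is kept as-is (an assumption).
func splitField(path string) []string {
	var parts []string
	var cur strings.Builder
	escaped := false
	for _, r := range path {
		switch {
		case escaped:
			if r != '.' {
				cur.WriteRune('\\') // keep backslashes that do not escape a dot
			}
			cur.WriteRune(r)
			escaped = false
		case r == '\\':
			escaped = true
		case r == '.':
			parts = append(parts, cur.String())
			cur.Reset()
		default:
			cur.WriteRune(r)
		}
	}
	if escaped {
		cur.WriteRune('\\') // trailing backslash with nothing to escape
	}
	return append(parts, cur.String())
}

func main() {
	fmt.Printf("%q\n", splitField(`attributes.host\.name`)) // ["attributes" "host.name"]
	fmt.Printf("%q\n", splitField(`body.nested.key`))       // ["body" "nested" "key"]
}
```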

### Example Config
The following config is an example configuration for the log deduplication processor. It is configured with an aggregation interval of `60 seconds`, a timezone of `America/Los_Angeles`, and a log count attribute of `dedup_count`. It has no fields being excluded.
@@ -73,3 +77,28 @@ service:
      processors: [logdedup]
      exporters: [googlecloud]
```


### Example Config with Condition
The following example config deduplicates only log records whose `ID` attribute equals `1`:

```yaml
receivers:
  filelog:
    include: [./example/*.log]
processors:
  logdedup:
    condition: (attributes["ID"] == 1)
    interval: 60s
    log_count_attribute: dedup_count
    timezone: 'America/Los_Angeles'
exporters:
  googlecloud:
service:
  pipelines:
    logs:
      receivers: [filelog]
      processors: [logdedup]
      exporters: [googlecloud]
```
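
One thing worth checking with a condition like this is the attribute's type. The sketch below is plain Go, not OTTL, and it assumes the comparison is type-strict, so the integer literal `1` would not match the string `"1"`. The function name `idEqualsOne` is hypothetical, and the type-strictness assumption should be verified against the OTTL documentation before relying on it.

```go
package main

import "fmt"

// idEqualsOne mimics the example condition under the assumption that an
// integer literal only matches an integer-typed attribute value.
func idEqualsOne(attrs map[string]any) bool {
	id, ok := attrs["ID"].(int64)
	return ok && id == 1
}

func main() {
	fmt.Println(idEqualsOne(map[string]any{"ID": int64(1)})) // true
	fmt.Println(idEqualsOne(map[string]any{"ID": "1"}))      // false: string, not integer
	fmt.Println(idEqualsOne(map[string]any{}))               // false: attribute missing
}
```

If the `ID` attribute arrives as a string, a condition such as `attributes["ID"] == "1"` may be the safer form.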
5 changes: 5 additions & 0 deletions processor/logdeduplicationprocessor/config.go
@@ -35,6 +35,9 @@ const (
	// defaultTimezone is the default timezone
	defaultTimezone = "UTC"

	// defaultCondition is the default condition
	defaultCondition = "true"

	// bodyField is the name of the body field
	bodyField = "body"

@@ -55,6 +58,7 @@ type Config struct {
	Interval          time.Duration `mapstructure:"interval"`
	Timezone          string        `mapstructure:"timezone"`
	ExcludeFields     []string      `mapstructure:"exclude_fields"`
	Condition         string        `mapstructure:"condition"`
}

// createDefaultConfig returns the default config for the processor.
@@ -64,6 +68,7 @@ func createDefaultConfig() component.Config {
		Interval:          defaultInterval,
		Timezone:          defaultTimezone,
		ExcludeFields:     []string{},
		Condition:         defaultCondition,
	}
}

1 change: 1 addition & 0 deletions processor/logdeduplicationprocessor/config_test.go
@@ -101,6 +101,7 @@ func TestValidateConfig(t *testing.T) {
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				Condition:         defaultCondition,
				ExcludeFields:     []string{"body.thing", "attributes.otherthing"},
			},
			expectedErr: nil,
8 changes: 7 additions & 1 deletion processor/logdeduplicationprocessor/factory.go
@@ -18,6 +18,7 @@ import (
	"context"
	"fmt"

	"github.com/observiq/bindplane-agent/expr"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/processor"
@@ -47,5 +48,10 @@ func createLogsProcessor(_ context.Context, params processor.Settings, cfg compo
		return nil, fmt.Errorf("invalid config type: %+v", cfg)
	}

	return newProcessor(processorCfg, consumer, params.Logger)
	condition, err := expr.NewOTTLLogRecordCondition(processorCfg.Condition, params.TelemetrySettings)
	if err != nil {
		return nil, fmt.Errorf("invalid condition: %w", err)
	}

	return newProcessor(processorCfg, condition, consumer, params.Logger)
}
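
The factory change compiles the condition once at construction time and wraps any failure with `%w`, so a bad expression is rejected when the processor is created, before any logs are processed. A minimal standard-library sketch of that pattern follows. `compileCondition` is a hypothetical toy parser that accepts only `true` or `key == value`; it stands in for the real OTTL compilation and does not reflect its API.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

var errEmptyCondition = errors.New("condition is empty")

// condition reports whether a record's attributes match.
type condition func(attrs map[string]string) bool

// compileCondition is a hypothetical parser: it accepts `true` or `key == value`.
func compileCondition(expr string) (condition, error) {
	expr = strings.TrimSpace(expr)
	if expr == "" {
		return nil, errEmptyCondition
	}
	if expr == "true" {
		return func(map[string]string) bool { return true }, nil
	}
	k, v, ok := strings.Cut(expr, "==")
	if !ok {
		return nil, fmt.Errorf("unsupported expression %q", expr)
	}
	k, v = strings.TrimSpace(k), strings.TrimSpace(v)
	return func(attrs map[string]string) bool { return attrs[k] == v }, nil
}

type processor struct {
	cond condition
}

// newProcessor fails fast on a bad condition and wraps the cause with %w,
// so callers can still inspect the underlying error with errors.Is.
func newProcessor(expr string) (*processor, error) {
	cond, err := compileCondition(expr)
	if err != nil {
		return nil, fmt.Errorf("invalid condition: %w", err)
	}
	return &processor{cond: cond}, nil
}

func main() {
	_, err := newProcessor("x")
	fmt.Println(err) // invalid condition: unsupported expression "x"

	_, err = newProcessor("")
	fmt.Println(errors.Is(err, errEmptyCondition)) // true

	p, _ := newProcessor("ID == 1")
	fmt.Println(p.cond(map[string]string{"ID": "1"})) // true
}
```

Because the wrapped message starts with `invalid condition`, a test that asserts on that substring, as the factory test in this commit does, keeps passing regardless of the underlying parser error.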
19 changes: 18 additions & 1 deletion processor/logdeduplicationprocessor/factory_test.go
@@ -21,6 +21,7 @@ import (
	"github.com/stretchr/testify/require"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/processor"
	"go.uber.org/zap"
)

func TestNewProcessorFactory(t *testing.T) {
@@ -46,12 +47,28 @@ func TestCreateLogsProcessor(t *testing.T) {
			cfg:         nil,
			expectedErr: "invalid config type",
		},
		{
			name: "invalid condition",
			cfg: &Config{
				LogCountAttribute: defaultLogCountAttribute,
				Interval:          defaultInterval,
				Timezone:          defaultTimezone,
				ExcludeFields:     []string{},
				Condition:         "x",
			},
			expectedErr: "invalid condition",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			f := NewFactory()
			p, err := f.CreateLogsProcessor(context.Background(), processor.Settings{}, tc.cfg, nil)
			set := processor.Settings{
				TelemetrySettings: component.TelemetrySettings{
					Logger: zap.NewNop(),
				},
			}
			p, err := f.CreateLogsProcessor(context.Background(), set, tc.cfg, nil)
			if tc.expectedErr == "" {
				require.NoError(t, err)
				require.IsType(t, &logDedupProcessor{}, p)
17 changes: 17 additions & 0 deletions processor/logdeduplicationprocessor/go.mod
@@ -3,6 +3,8 @@ module github.com/observiq/bindplane-agent/processor/logdeduplicationprocessor
go 1.22.6

require (
github.com/observiq/bindplane-agent/expr v1.61.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.109.0
@@ -20,35 +22,50 @@ require (
)

require (
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/antonmedv/expr v1.15.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/elastic/go-grok v0.3.1 // indirect
github.com/elastic/lunes v0.1.0 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/iancoleman/strcase v0.3.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 // indirect
github.com/prometheus/client_golang v1.20.2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.57.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect
go.opentelemetry.io/collector/config/configtelemetry v0.109.0 // indirect
go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect
go.opentelemetry.io/collector/semconv v0.109.0 // indirect
go.opentelemetry.io/otel v1.29.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.51.0 // indirect
go.opentelemetry.io/otel/metric v1.29.0 // indirect
go.opentelemetry.io/otel/sdk v1.29.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.29.0 // indirect
go.opentelemetry.io/otel/trace v1.29.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/observiq/bindplane-agent/expr => ../../expr