From 2c8c9618e1b71c03c043b77ddb2a66bd40a3e790 Mon Sep 17 00:00:00 2001 From: Cole Laven Date: Wed, 25 Sep 2024 14:36:00 -0400 Subject: [PATCH] add condition field to dedupe logs processor --- processor/logdeduplicationprocessor/README.md | 43 +++++-- processor/logdeduplicationprocessor/config.go | 5 + .../logdeduplicationprocessor/config_test.go | 1 + .../logdeduplicationprocessor/factory.go | 8 +- .../logdeduplicationprocessor/factory_test.go | 19 ++- processor/logdeduplicationprocessor/go.mod | 17 +++ processor/logdeduplicationprocessor/go.sum | 35 ++++++ .../logdeduplicationprocessor/processor.go | 75 ++++++++---- .../processor_test.go | 109 +++++++++++++++++- 9 files changed, 277 insertions(+), 35 deletions(-) diff --git a/processor/logdeduplicationprocessor/README.md b/processor/logdeduplicationprocessor/README.md index 5df7c288a..5b374d032 100644 --- a/processor/logdeduplicationprocessor/README.md +++ b/processor/logdeduplicationprocessor/README.md @@ -6,7 +6,7 @@ This processor is used to deduplicate logs by detecting identical logs over a ra ## How It Works 1. The user configures the log deduplication processor in the desired logs pipeline. -2. All logs sent to the processor and aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes. +2. All logs are sent to the processor. Each log for which `condition` evaluates true is aggregated over the configured `interval`. Logs are considered identical if they have the same body, resource attributes, severity, and log attributes. 3. After the interval, the processor emits a single log with the count of logs that were deduplicated. The emitted log will have the same body, resource attributes, severity, and log attributes as the original log. The emitted log will also have the following new attributes: - `log_count`: The count of logs that were deduplicated over the interval. The name of the attribute is configurable via the `log_count_attribute` parameter. @@ -16,13 +16,17 @@ This processor is used to deduplicate logs by detecting identical logs over a ra **Note**: The `ObservedTimestamp` and `Timestamp` of the emitted log will be the time that the aggregated log was emitted and will not be the same as the `ObservedTimestamp` and `Timestamp` of the original logs. ## Configuration -| Field | Type | Default | Description | -| --- | --- | --- | --- | -| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. | -| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. | -| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. Valid values listed [here](../../docs/timezone.md) | -| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).

**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. | +| Field | Type | Default | Description | +| --- | --- | --- | --- | +| interval | duration | `10s` | The interval at which logs are aggregated. The counter will reset after each interval. | +| condition | string | `true` | An [OTTL] expression used to match which log records to sample from. All paths in the [log context] are available to reference. All [converters] are available to use. | +| log_count_attribute | string | `log_count` | The name of the count attribute of deduplicated logs that will be added to the emitted aggregated log. | +| timezone | string | `UTC` | The timezone of the `first_observed_timestamp` and `last_observed_timestamp` timestamps on the emitted aggregated log. Valid values listed [here](../../docs/timezone.md) | +| exclude_fields | []string | `[]` | Fields to exclude from duplication matching. Fields can be excluded from the log `body` or `attributes`. These fields will not be present in the emitted aggregated log. Nested fields must be `.` delimited. If a field contains a `.` it can be escaped by using a `\` see [example config](#example-config-with-excluded-fields).

**Note**: The entire `body` cannot be excluded. If the body is a map then fields within it can be excluded. | +[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.109.0/pkg/ottl#readme +[converters]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/ottlfuncs/README.md#converters +[log context]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.109.0/pkg/ottl/contexts/ottllog/README.md ### Example Config The following config is an example configuration for the log deduplication processor. It is configured with an aggregation interval of `60 seconds`, a timezone of `America/Los_Angeles`, and a log count attribute of `dedup_count`. It has no fields being excluded. @@ -73,3 +77,28 @@ service: processors: [logdedup] exporters: [googlecloud] ``` + + +### Example Config with Condition +The following config is an example configuration that only performs the deduping process on telemetry where Attribute "ID" equals 1: + +```yaml +receivers: + filelog: + include: [./example/*.log] +processors: + logdedup: + condition: (attributes["ID"] == 1) + interval: 60s + log_count_attribute: dedup_count + timezone: 'America/Los_Angeles' +exporters: + googlecloud: + +service: + pipelines: + logs: + receivers: [filelog] + processors: [logdedup] + exporters: [googlecloud] +``` diff --git a/processor/logdeduplicationprocessor/config.go b/processor/logdeduplicationprocessor/config.go index c62b5f6f4..203c96f2f 100644 --- a/processor/logdeduplicationprocessor/config.go +++ b/processor/logdeduplicationprocessor/config.go @@ -35,6 +35,9 @@ const ( // defaultTimezone is the default timezone defaultTimezone = "UTC" + // defaultCondition is the default condition + defaultCondition = "true" + // bodyField is the name of the body field bodyField = "body" @@ -55,6 +58,7 @@ type Config struct { Interval time.Duration `mapstructure:"interval"` Timezone string `mapstructure:"timezone"` ExcludeFields []string `mapstructure:"exclude_fields"` + Condition string `mapstructure:"condition"` } // createDefaultConfig returns the default config for the processor. @@ -64,6 +68,7 @@ func createDefaultConfig() component.Config { Interval: defaultInterval, Timezone: defaultTimezone, ExcludeFields: []string{}, + Condition: defaultCondition, } } diff --git a/processor/logdeduplicationprocessor/config_test.go b/processor/logdeduplicationprocessor/config_test.go index 2820f7a3d..8d1d045fb 100644 --- a/processor/logdeduplicationprocessor/config_test.go +++ b/processor/logdeduplicationprocessor/config_test.go @@ -101,6 +101,7 @@ func TestValidateConfig(t *testing.T) { LogCountAttribute: defaultLogCountAttribute, Interval: defaultInterval, Timezone: defaultTimezone, + Condition: defaultCondition, ExcludeFields: []string{"body.thing", "attributes.otherthing"}, }, expectedErr: nil, diff --git a/processor/logdeduplicationprocessor/factory.go b/processor/logdeduplicationprocessor/factory.go index 67278c781..b3738ca15 100644 --- a/processor/logdeduplicationprocessor/factory.go +++ b/processor/logdeduplicationprocessor/factory.go @@ -18,6 +18,7 @@ import ( "context" "fmt" + "github.com/observiq/bindplane-agent/expr" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/processor" @@ -47,5 +48,10 @@ func createLogsProcessor(_ context.Context, params processor.Settings, cfg compo return nil, fmt.Errorf("invalid config type: %+v", cfg) } - return newProcessor(processorCfg, consumer, params.Logger) + condition, err := expr.NewOTTLLogRecordCondition(processorCfg.Condition, params.TelemetrySettings) + if err != nil { + return nil, fmt.Errorf("invalid condition: %w", err) + } + + return newProcessor(processorCfg, condition, consumer, params.Logger) } diff --git a/processor/logdeduplicationprocessor/factory_test.go b/processor/logdeduplicationprocessor/factory_test.go index a60092e15..c1664cee5 100644 --- a/processor/logdeduplicationprocessor/factory_test.go +++ b/processor/logdeduplicationprocessor/factory_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/processor" + "go.uber.org/zap" ) func TestNewProcessorFactory(t *testing.T) { @@ -46,12 +47,28 @@ func TestCreateLogsProcessor(t *testing.T) { cfg: nil, expectedErr: "invalid config type", }, + { + name: "invalid condition", + cfg: &Config{ + LogCountAttribute: defaultLogCountAttribute, + Interval: defaultInterval, + Timezone: defaultTimezone, + ExcludeFields: []string{}, + Condition: "x", + }, + expectedErr: "invalid condition", + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { f := NewFactory() - p, err := f.CreateLogsProcessor(context.Background(), processor.Settings{}, tc.cfg, nil) + set := processor.Settings{ + TelemetrySettings: component.TelemetrySettings{ + Logger: zap.NewNop(), + }, + } + p, err := f.CreateLogsProcessor(context.Background(), set, tc.cfg, nil) if tc.expectedErr == "" { require.NoError(t, err) require.IsType(t, &logDedupProcessor{}, p) diff --git a/processor/logdeduplicationprocessor/go.mod b/processor/logdeduplicationprocessor/go.mod index 91a72679b..df409a147 100644 --- a/processor/logdeduplicationprocessor/go.mod +++ b/processor/logdeduplicationprocessor/go.mod @@ -3,6 +3,8 @@ module github.com/observiq/bindplane-agent/processor/logdeduplicationprocessor go 1.22.6 require ( + github.com/observiq/bindplane-agent/expr v1.61.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 github.com/stretchr/testify v1.9.0 go.opentelemetry.io/collector/component v0.109.0 @@ -20,24 +22,35 @@ require ( ) require ( + github.com/alecthomas/participle/v2 v2.1.1 // indirect + github.com/antonmedv/expr v1.15.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/elastic/go-grok v0.3.1 // indirect + github.com/elastic/lunes v0.1.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/iancoleman/strcase v0.3.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/magefile/mage v1.15.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 // indirect github.com/prometheus/client_golang v1.20.2 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.57.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.109.0 // indirect go.opentelemetry.io/collector/consumer/consumerprofiles v0.109.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.109.0 // indirect + go.opentelemetry.io/collector/semconv v0.109.0 // indirect go.opentelemetry.io/otel v1.29.0 // indirect go.opentelemetry.io/otel/exporters/prometheus v0.51.0 // indirect go.opentelemetry.io/otel/metric v1.29.0 // indirect @@ -45,10 +58,14 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.29.0 // indirect go.opentelemetry.io/otel/trace v1.29.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/sys v0.24.0 // indirect golang.org/x/text v0.17.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect google.golang.org/grpc v1.66.0 // indirect google.golang.org/protobuf v1.34.2 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect ) + +replace github.com/observiq/bindplane-agent/expr => ../../expr diff --git a/processor/logdeduplicationprocessor/go.sum b/processor/logdeduplicationprocessor/go.sum index 62d83f4f9..be18a3c3d 100644 --- a/processor/logdeduplicationprocessor/go.sum +++ b/processor/logdeduplicationprocessor/go.sum @@ -1,3 +1,11 @@ +github.com/alecthomas/assert/v2 v2.3.0 h1:mAsH2wmvjsuvyBvAmCtm7zFsBlb8mIHx5ySLVdDZXL0= +github.com/alecthomas/assert/v2 v2.3.0/go.mod h1:pXcQ2Asjp247dahGEmsZ6ru0UVwnkhktn7S0bBDLxvQ= +github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= +github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= +github.com/alecthomas/repr v0.2.0 h1:HAzS41CIzNW5syS8Mf9UwXhNH1J9aix/BvDRf1Ml2Yk= +github.com/alecthomas/repr v0.2.0/go.mod h1:Fr0507jx4eOXV7AlPV6AVZLYrLIuIeSOWtW57eE/O/4= +github.com/antonmedv/expr v1.15.5 h1:y0Iz3cEwmpRz5/r3w4qQR0MfIqJGdGM1zbhD/v0G5Vg= +github.com/antonmedv/expr v1.15.5/go.mod h1:0E/6TxnOlRNp81GMzX9QfDPAmHo2Phg00y4JUv1ihsE= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -5,11 +13,17 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/elastic/go-grok v0.3.1 h1:WEhUxe2KrwycMnlvMimJXvzRa7DoByJB4PVUIE1ZD/U= +github.com/elastic/go-grok v0.3.1/go.mod h1:n38ls8ZgOboZRgKcjMY8eFeZFMmcL9n2lP0iHhIDk64= +github.com/elastic/lunes v0.1.0 h1:amRtLPjwkWtzDF/RKzcEPMvSsSseLDLW+bnhfNSLRe4= +github.com/elastic/lunes v0.1.0/go.mod h1:xGphYIt3XdZRtyWosHQTErsQTd4OP1p9wsbVoHelrd4= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= +github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= @@ -17,6 +31,12 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM= +github.com/hexops/gotextdiff v1.0.3/go.mod h1:pSWU5MAI3yDq+fZBTazCSJysOMbxWL1BSow5/V2vxeg= +github.com/iancoleman/strcase v0.3.0 h1:nTXanmYxhfFAMjZL34Ov6gkzEsSJZ5DbhxWjvSASxEI= +github.com/iancoleman/strcase v0.3.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -29,6 +49,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/magefile/mage v1.15.0 h1:BvGheCMAsG3bWUDbZ8AyXXpCNwU9u5CB6sM+HNb9HYg= +github.com/magefile/mage v1.15.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -36,6 +58,10 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0 h1:4VBRgtyh3hHSgAVGgs4bvNwJd0oUGyxVA3eQO2ujNsA= +github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.109.0/go.mod h1:9MGQCqxdCNBhdD+7QBZ6hH9HipXe5CajMafVKglD5f0= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0 h1:k7uHhrznH4dYvzbaCRz5VgFyHzhd1NGow1s6504r6tA= +github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.109.0/go.mod h1:LfwqmbImTBZW5psp7tCyZPHIPy3Imwexq+K/A1NAhEY= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0 h1:3kXFgdEEKw37ftdRC7SmXAiZuLahVavqOYRhlJVMLc8= github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.109.0/go.mod h1:HtaWI5WJKJkBhHz2R7Xb2n7R3fdBPhfKieYcQajNCTo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -54,6 +80,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 h1:SIKIoA4e/5Y9ZOl0DCe3eVMLPOQzJxgZpfdHHeauNTM= +github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6/go.mod h1:BUbeWZiieNxAuuADTBNb3/aeje6on3DhU3rpWsQSB1E= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= go.opentelemetry.io/collector v0.109.0 h1:ULnMWuwcy4ix1oP5RFFRcmpEbaU5YabW6nWcLMQQRo0= @@ -75,6 +103,8 @@ go.opentelemetry.io/collector/pdata/testdata v0.109.0 h1:gvIqy6juvqFET/6zi+zUOH1 go.opentelemetry.io/collector/pdata/testdata v0.109.0/go.mod h1:zRttU/F5QMQ6ZXBMXCoSVG3EORTZLTK+UUS0VoMoT44= go.opentelemetry.io/collector/processor v0.109.0 h1:Pgo9hib4ae1FSA47RB7TUUS26nConIlXcltzbxrjFg8= go.opentelemetry.io/collector/processor v0.109.0/go.mod h1:Td43GwGMRCXin5JM/zAzMtLieobHTVVrD4Y7jSvsMtg= +go.opentelemetry.io/collector/semconv v0.109.0 h1:6CStOFOVhdrzlHg51kXpcPHRKPh5RtV7z/wz+c1TG1g= +go.opentelemetry.io/collector/semconv v0.109.0/go.mod h1:zCJ5njhWpejR+A40kiEoeFm1xq1uzyZwMnRNX6/D82A= go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= go.opentelemetry.io/otel/exporters/prometheus v0.51.0 h1:G7uexXb/K3T+T9fNLCCKncweEtNEBMTO+46hKX5EdKw= @@ -96,6 +126,8 @@ go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -133,5 +165,8 @@ google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWn gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/processor/logdeduplicationprocessor/processor.go b/processor/logdeduplicationprocessor/processor.go index 4492e521f..0a7ade6e2 100644 --- a/processor/logdeduplicationprocessor/processor.go +++ b/processor/logdeduplicationprocessor/processor.go @@ -20,6 +20,8 @@ import ( "sync" "time" + "github.com/observiq/bindplane-agent/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/plog" @@ -28,17 +30,19 @@ import ( // logDedupProcessor is a logDedupProcessor that counts duplicate instances of logs. type logDedupProcessor struct { - emitInterval time.Duration - aggregator *logAggregator - remover *fieldRemover - consumer consumer.Logs - logger *zap.Logger - cancel context.CancelFunc - wg sync.WaitGroup - mux sync.Mutex + emitInterval time.Duration + condition *expr.OTTLCondition[ottllog.TransformContext] + conditionString string + aggregator *logAggregator + remover *fieldRemover + consumer consumer.Logs + logger *zap.Logger + cancel context.CancelFunc + wg sync.WaitGroup + mux sync.Mutex } -func newProcessor(cfg *Config, consumer consumer.Logs, logger *zap.Logger) (*logDedupProcessor, error) { +func newProcessor(cfg *Config, condition *expr.OTTLCondition[ottllog.TransformContext], consumer consumer.Logs, logger *zap.Logger) (*logDedupProcessor, error) { // This should not happen due to config validation but we check anyways. timezone, err := time.LoadLocation(cfg.Timezone) if err != nil { @@ -46,11 +50,13 @@ func newProcessor(cfg *Config, consumer consumer.Logs, logger *zap.Logger) (*log } return &logDedupProcessor{ - emitInterval: cfg.Interval, - aggregator: newLogAggregator(cfg.LogCountAttribute, timezone), - remover: newFieldRemover(cfg.ExcludeFields), - consumer: consumer, - logger: logger, + emitInterval: cfg.Interval, + condition: condition, + conditionString: cfg.Condition, + aggregator: newLogAggregator(cfg.LogCountAttribute, timezone), + remover: newFieldRemover(cfg.ExcludeFields), + consumer: consumer, + logger: logger, }, nil } @@ -91,7 +97,7 @@ func (p *logDedupProcessor) Shutdown(ctx context.Context) error { } // ConsumeLogs processes the logs. -func (p *logDedupProcessor) ConsumeLogs(_ context.Context, pl plog.Logs) error { +func (p *logDedupProcessor) ConsumeLogs(ctx context.Context, pl plog.Logs) error { p.mux.Lock() defer p.mux.Unlock() @@ -101,14 +107,39 @@ func (p *logDedupProcessor) ConsumeLogs(_ context.Context, pl plog.Logs) error { for j := 0; j < resourceLogs.ScopeLogs().Len(); j++ { scope := resourceLogs.ScopeLogs().At(j) logs := scope.LogRecords() - for k := 0; k < logs.Len(); k++ { - logRecord := logs.At(k) - // Remove excluded fields if any - p.remover.RemoveFields(logRecord) + logs.RemoveIf(func(logRecord plog.LogRecord) bool { + var match bool + if p.conditionString == "true" || p.conditionString == "" { + match = true + } else { + logCtx := ottllog.NewTransformContext( + logRecord, + scope.Scope(), + resourceLogs.Resource(), + scope, + resourceLogs, + ) + logMatch, err := p.condition.Match(ctx, logCtx) + match = err == nil && logMatch + } + // only aggregate logs that match condition + if match { + // Remove excluded fields if any + p.remover.RemoveFields(logRecord) - // Add the log to the aggregator - p.aggregator.Add(resourceAttrs, logRecord) - } + // Add the log to the aggregator + p.aggregator.Add(resourceAttrs, logRecord) + } + return match + }) + } + } + + // immediately consume any logs that didn't match the condition + if pl.LogRecordCount() > 0 { + err := p.consumer.ConsumeLogs(ctx, pl) + if err != nil { + p.logger.Error("failed to consume logs", zap.Error(err)) } } diff --git a/processor/logdeduplicationprocessor/processor_test.go b/processor/logdeduplicationprocessor/processor_test.go index f6f6abe3c..0067e3ecd 100644 --- a/processor/logdeduplicationprocessor/processor_test.go +++ b/processor/logdeduplicationprocessor/processor_test.go @@ -21,7 +21,10 @@ import ( "testing" "time" + "github.com/observiq/bindplane-agent/expr" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -53,6 +56,7 @@ func Test_newProcessor(t *testing.T) { LogCountAttribute: defaultLogCountAttribute, Interval: defaultInterval, Timezone: defaultTimezone, + Condition: defaultCondition, }, expected: &logDedupProcessor{ emitInterval: defaultInterval, @@ -71,7 +75,7 @@ func Test_newProcessor(t *testing.T) { tc.expected.logger = logger } - actual, err := newProcessor(tc.cfg, logsSink, logger) + actual, err := newProcessor(tc.cfg, &expr.OTTLCondition[ottllog.TransformContext]{}, logsSink, logger) if tc.expectedErr != nil { require.ErrorContains(t, err, tc.expectedErr.Error()) require.Nil(t, actual) @@ -100,7 +104,7 @@ func TestProcessorShutdownCtxError(t *testing.T) { } // Create a processor - p, err := newProcessor(cfg, logsSink, logger) + p, err := newProcessor(cfg, &expr.OTTLCondition[ottllog.TransformContext]{}, logsSink, logger) require.NoError(t, err) // We don't call p.Start as it can create a non-deterministic situation in Shutdown where we may not exit due to ctx error @@ -136,7 +140,7 @@ func TestShutdownBeforeStart(t *testing.T) { } // Create a processor - p, err := newProcessor(cfg, logsSink, logger) + p, err := newProcessor(cfg, &expr.OTTLCondition[ottllog.TransformContext]{}, logsSink, logger) require.NoError(t, err) require.NotPanics(t, func() { p.Shutdown(context.Background()) @@ -150,13 +154,14 @@ func TestProcessorConsume(t *testing.T) { LogCountAttribute: defaultLogCountAttribute, Interval: 1 * time.Second, Timezone: defaultTimezone, + Condition: defaultCondition, ExcludeFields: []string{ fmt.Sprintf("%s.remove_me", attributeField), }, } // Create a processor - p, err := newProcessor(cfg, logsSink, logger) + p, err := newProcessor(cfg, &expr.OTTLCondition[ottllog.TransformContext]{}, logsSink, logger) require.NoError(t, err) err = p.Start(context.Background(), componenttest.NewNopHost()) @@ -208,3 +213,99 @@ func TestProcessorConsume(t *testing.T) { err = p.Shutdown(context.Background()) require.NoError(t, err) } + +func TestProcessorConsumeCondition(t *testing.T) { + logsSink := &consumertest.LogsSink{} + logger := zap.NewNop() + cfg := &Config{ + LogCountAttribute: defaultLogCountAttribute, + Interval: 1 * time.Second, + Timezone: defaultTimezone, + Condition: `(attributes["ID"] == 1)`, + ExcludeFields: []string{ + fmt.Sprintf("%s.remove_me", attributeField), + }, + } + + condition, err := expr.NewOTTLLogRecordCondition(cfg.Condition, component.TelemetrySettings{Logger: logger}) + require.NoError(t, err) + + // Create a processor + p, err := newProcessor(cfg, condition, logsSink, logger) + require.NoError(t, err) + + err = p.Start(context.Background(), componenttest.NewNopHost()) + require.NoError(t, err) + + // Create plog payload + logRecord1 := generateTestLogRecord(t, "Body of the log1") + logRecord2 := generateTestLogRecord(t, "Body of the log1") + logRecord3 := generateTestLogRecord(t, "Body of the log2") + logRecord4 := generateTestLogRecord(t, "Body of the log2") + + // Differ by timestamps + logRecord1.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(time.Minute))) + logRecord2.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(2 * time.Minute))) + logRecord3.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(3 * time.Minute))) + logRecord4.SetTimestamp(pcommon.NewTimestampFromTime(time.Now().Add(4 * time.Minute))) + + // Set ID attributes to use for condition + logRecord1.Attributes().PutInt("ID", 1) + logRecord2.Attributes().PutInt("ID", 1) + logRecord3.Attributes().PutInt("ID", 2) + logRecord4.Attributes().PutInt("ID", 2) + + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + logRecord1.CopyTo(sl.LogRecords().AppendEmpty()) + logRecord3.CopyTo(sl.LogRecords().AppendEmpty()) + logRecord2.CopyTo(sl.LogRecords().AppendEmpty()) + logRecord4.CopyTo(sl.LogRecords().AppendEmpty()) + + // Consume the payload + err = p.ConsumeLogs(context.Background(), logs) + require.NoError(t, err) + + // Wait for the logs to be emitted + require.Eventually(t, func() bool { + return logsSink.LogRecordCount() > 2 + }, 3*time.Second, 200*time.Millisecond) + + allSinkLogs := logsSink.AllLogs() + require.Len(t, allSinkLogs, 2) + + consumedLogs := allSinkLogs[0] + require.Equal(t, 2, consumedLogs.LogRecordCount()) + + require.Equal(t, 1, consumedLogs.ResourceLogs().Len()) + consumedRl := consumedLogs.ResourceLogs().At(0) + require.Equal(t, 1, consumedRl.ScopeLogs().Len()) + consumedSl := consumedRl.ScopeLogs().At(0) + require.Equal(t, 2, consumedSl.LogRecords().Len()) + + for i := 0; i < consumedSl.LogRecords().Len(); i++ { + consumedLogRecord := consumedSl.LogRecords().At(i) + ID, ok := consumedLogRecord.Attributes().Get("ID") + require.True(t, ok) + require.Equal(t, int64(2), ID.Int()) + } + + dedupedLogs := allSinkLogs[1] + require.Equal(t, 1, dedupedLogs.LogRecordCount()) + + require.Equal(t, 1, dedupedLogs.ResourceLogs().Len()) + dedupedRl := dedupedLogs.ResourceLogs().At(0) + require.Equal(t, 1, dedupedRl.ScopeLogs().Len()) + dedupedSl := dedupedRl.ScopeLogs().At(0) + require.Equal(t, 1, dedupedSl.LogRecords().Len()) + dedupedLogRecord := dedupedSl.LogRecords().At(0) + + countVal, ok := dedupedLogRecord.Attributes().Get(cfg.LogCountAttribute) + require.True(t, ok) + require.Equal(t, int64(2), countVal.Int()) + + // Cleanup + err = p.Shutdown(context.Background()) + require.NoError(t, err) +}