From de44947e21d14f0385e53d4b91fa0c923bbe0076 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Thu, 3 Oct 2024 14:05:52 +0100 Subject: [PATCH] Address loki.process config reloads (#1809) --- CHANGELOG.md | 8 +- .../component/loki/process/stages/drop.go | 44 +++++----- .../loki/process/stages/drop_test.go | 7 +- .../component/loki/process/stages/geoip.go | 4 +- .../component/loki/process/stages/labels.go | 36 +++++---- .../loki/process/stages/labels_test.go | 28 +++---- .../component/loki/process/stages/luhn.go | 4 +- .../loki/process/stages/luhn_test.go | 80 +++++++++++++++++++ .../component/loki/process/stages/match.go | 12 ++- .../loki/process/stages/match_test.go | 38 ++++++--- .../component/loki/process/stages/metric.go | 23 +----- .../loki/process/stages/multiline.go | 25 +++--- .../loki/process/stages/multiline_test.go | 9 ++- .../component/loki/process/stages/pipeline.go | 24 +----- .../component/loki/process/stages/replace.go | 8 +- .../loki/process/stages/static_labels.go | 9 ++- .../process/stages/structured_metadata.go | 16 ++-- .../component/loki/process/stages/template.go | 6 +- .../loki/process/stages/timestamp.go | 8 +- .../loki/process/stages/timestamp_test.go | 48 +++++++---- 20 files changed, 259 insertions(+), 178 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ab99b56e7..93c787da79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ v1.4.2 - Update yet-another-cloudwatch-exporter from v0.60.0 vo v0.61.0: (@morremeyer) - Fixes a bug where cloudwatch S3 metrics are reported as `0` - + - Issue 1687 - otelcol.exporter.awss3 fails to configure (@cydergoth) - Fix parsing of the Level configuration attribute in debug_metrics config block - Ensure "optional" debug_metrics config block really is optional @@ -22,6 +22,12 @@ v1.4.2 - Update windows_exporter from v0.27.2 vo v0.27.3: (@jkroepke) - Fixes a bug where scraping Windows service crashes alloy +- Fixed an issue with `loki.process` 
where `stage.luhn` and `stage.timestamp` would not apply + default configuration settings correctly (@thampiotr) + +- Fixed an issue with `loki.process` where configuration could be reloaded even if there + were no changes. (@ptodev, @thampiotr) + v1.4.1 ----------------- diff --git a/internal/component/loki/process/stages/drop.go b/internal/component/loki/process/stages/drop.go index a6c2ca2da5..67b23754f4 100644 --- a/internal/component/loki/process/stages/drop.go +++ b/internal/component/loki/process/stages/drop.go @@ -10,8 +10,9 @@ import ( "github.com/alecthomas/units" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/client_golang/prometheus" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) const ( @@ -37,50 +38,49 @@ type DropConfig struct { Expression string `alloy:"expression,attr,optional"` OlderThan time.Duration `alloy:"older_than,attr,optional"` LongerThan units.Base2Bytes `alloy:"longer_than,attr,optional"` - regex *regexp.Regexp } // validateDropConfig validates the DropConfig for the dropStage -func validateDropConfig(cfg *DropConfig) error { +func validateDropConfig(cfg *DropConfig) (*regexp.Regexp, error) { if cfg == nil || (cfg.Source == "" && cfg.Expression == "" && cfg.OlderThan == emptyDuration && cfg.LongerThan == emptySize) { - - return errors.New(ErrDropStageEmptyConfig) + return nil, errors.New(ErrDropStageEmptyConfig) } if cfg.DropReason == "" { cfg.DropReason = defaultDropReason } if cfg.Value != "" && cfg.Expression != "" { - return errors.New(ErrDropStageInvalidConfig) + return nil, errors.New(ErrDropStageInvalidConfig) } if cfg.Separator == "" { cfg.Separator = defaultSeparator } if cfg.Value != "" && cfg.Source == "" { - return errors.New(ErrDropStageNoSourceWithValue) + return nil, errors.New(ErrDropStageNoSourceWithValue) } + var ( + expr *regexp.Regexp + err error + ) if cfg.Expression != "" { - expr, err := regexp.Compile(cfg.Expression) - if err != nil { - 
return fmt.Errorf(ErrDropStageInvalidRegex, err) + if expr, err = regexp.Compile(cfg.Expression); err != nil { + return nil, fmt.Errorf(ErrDropStageInvalidRegex, err) } - cfg.regex = expr } // The first step to exclude `value` and fully replace it with the `expression`. // It will simplify code and less confusing for the end-user on which option to choose. if cfg.Value != "" { - expr, err := regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value))) + expr, err = regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value))) if err != nil { - return fmt.Errorf(ErrDropStageInvalidRegex, err) + return nil, fmt.Errorf(ErrDropStageInvalidRegex, err) } - cfg.regex = expr } - return nil + return expr, nil } // newDropStage creates a DropStage from config func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Registerer) (Stage, error) { - err := validateDropConfig(&config) + regex, err := validateDropConfig(&config) if err != nil { return nil, err } @@ -88,6 +88,7 @@ func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Re return &dropStage{ logger: log.With(logger, "component", "stage", "type", "drop"), cfg: &config, + regex: regex, dropCount: getDropCountMetric(registerer), }, nil } @@ -96,6 +97,7 @@ func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Re type dropStage struct { logger log.Logger cfg *DropConfig + regex *regexp.Regexp dropCount *prometheus.CounterVec } @@ -144,7 +146,7 @@ func (m *dropStage) shouldDrop(e Entry) bool { return false } } - if m.cfg.Source != "" && m.cfg.regex == nil { + if m.cfg.Source != "" && m.regex == nil { var match bool match = true for _, src := range splitSource(m.cfg.Source) { @@ -165,8 +167,8 @@ func (m *dropStage) shouldDrop(e Entry) bool { } } - if m.cfg.Source == "" && m.cfg.regex != nil { - if !m.cfg.regex.MatchString(e.Line) { + if m.cfg.Source == "" && m.regex != nil { + if !m.regex.MatchString(e.Line) { // Not a match to the regex, don't drop 
if Debug { level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line") @@ -178,7 +180,7 @@ func (m *dropStage) shouldDrop(e Entry) bool { } } - if m.cfg.Source != "" && m.cfg.regex != nil { + if m.cfg.Source != "" && m.regex != nil { var extractedData []string for _, src := range splitSource(m.cfg.Source) { if e, ok := e.Extracted[src]; ok { @@ -192,7 +194,7 @@ func (m *dropStage) shouldDrop(e Entry) bool { extractedData = append(extractedData, s) } } - if !m.cfg.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) { + if !m.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) { // Not a match to the regex, don't drop if Debug { level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line") diff --git a/internal/component/loki/process/stages/drop_test.go b/internal/component/loki/process/stages/drop_test.go index 736ddfe5d1..e77293f45d 100644 --- a/internal/component/loki/process/stages/drop_test.go +++ b/internal/component/loki/process/stages/drop_test.go @@ -7,12 +7,13 @@ import ( "time" "github.com/alecthomas/units" - "github.com/grafana/alloy/internal/util" dskit "github.com/grafana/dskit/server" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/util" ) // Not all these are tested but are here to make sure the different types marshal without error @@ -411,7 +412,7 @@ func Test_dropStage_Process(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - err := validateDropConfig(tt.config) + _, err := validateDropConfig(tt.config) if err != nil { t.Error(err) } @@ -465,7 +466,7 @@ func Test_validateDropConfig(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if err := validateDropConfig(tt.config); ((err != nil) && 
(err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) { + if _, err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) { t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr) } }) diff --git a/internal/component/loki/process/stages/geoip.go b/internal/component/loki/process/stages/geoip.go index 47994acf7c..02bebe7d71 100644 --- a/internal/component/loki/process/stages/geoip.go +++ b/internal/component/loki/process/stages/geoip.go @@ -7,15 +7,15 @@ import ( "reflect" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/jmespath/go-jmespath" "github.com/oschwald/geoip2-golang" "github.com/oschwald/maxminddb-golang" "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) var ( - ErrEmptyGeoIPStageConfig = errors.New("geoip stage config cannot be empty") ErrEmptyDBPathGeoIPStageConfig = errors.New("db path cannot be empty") ErrEmptySourceGeoIPStageConfig = errors.New("source cannot be empty") ErrEmptyDBTypeGeoIPStageConfig = errors.New("db type should be either city or asn") diff --git a/internal/component/loki/process/stages/labels.go b/internal/component/loki/process/stages/labels.go index e47517c329..4764017184 100644 --- a/internal/component/loki/process/stages/labels.go +++ b/internal/component/loki/process/stages/labels.go @@ -7,8 +7,9 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) const ( @@ -22,53 +23,56 @@ type LabelsConfig struct { } // validateLabelsConfig validates the Label stage configuration -func validateLabelsConfig(c LabelsConfig) error { +func validateLabelsConfig(c LabelsConfig) (map[string]string, error) { + // We must not mutate the c.Values, create a copy with changes we need. 
+ ret := map[string]string{} if c.Values == nil { - return errors.New(ErrEmptyLabelStageConfig) + return nil, errors.New(ErrEmptyLabelStageConfig) } for labelName, labelSrc := range c.Values { if !model.LabelName(labelName).IsValid() { - return fmt.Errorf(ErrInvalidLabelName, labelName) + return nil, fmt.Errorf(ErrInvalidLabelName, labelName) } // If no label source was specified, use the key name if labelSrc == nil || *labelSrc == "" { - lName := labelName - c.Values[labelName] = &lName + ret[labelName] = labelName + } else { + ret[labelName] = *labelSrc } } - return nil + return ret, nil } // newLabelStage creates a new label stage to set labels from extracted data func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) { - err := validateLabelsConfig(configs) + labelsConfig, err := validateLabelsConfig(configs) if err != nil { return nil, err } return toStage(&labelStage{ - cfgs: configs, - logger: logger, + labelsConfig: labelsConfig, + logger: logger, }), nil } // labelStage sets labels from extracted data type labelStage struct { - cfgs LabelsConfig - logger log.Logger + labelsConfig map[string]string + logger log.Logger } // Process implements Stage func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) { - processLabelsConfigs(l.logger, extracted, l.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) { labels[labelName] = labelValue }) } type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue) -func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, configs LabelsConfig, consumer labelsConsumer) { - for lName, lSrc := range configs.Values { - if lValue, ok := extracted[*lSrc]; ok { +func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, labelsConfig map[string]string, consumer 
labelsConsumer) { + for lName, lSrc := range labelsConfig { + if lValue, ok := extracted[lSrc]; ok { s, err := getString(lValue) if err != nil { if Debug { diff --git a/internal/component/loki/process/stages/labels_test.go b/internal/component/loki/process/stages/labels_test.go index 201ab06b99..8cb0ac7cf1 100644 --- a/internal/component/loki/process/stages/labels_test.go +++ b/internal/component/loki/process/stages/labels_test.go @@ -72,10 +72,8 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) { } var ( - lv1 = "lv1" - lv2c = "l2" - lv3 = "" - lv3c = "l3" + lv1 = "lv1" + lv3 = "" ) var emptyLabelsConfig = LabelsConfig{nil} @@ -84,19 +82,19 @@ func TestLabels(t *testing.T) { tests := map[string]struct { config LabelsConfig err error - expectedCfgs LabelsConfig + expectedCfgs map[string]string }{ "missing config": { config: emptyLabelsConfig, err: errors.New(ErrEmptyLabelStageConfig), - expectedCfgs: emptyLabelsConfig, + expectedCfgs: nil, }, "invalid label name": { config: LabelsConfig{ Values: map[string]*string{"#*FDDS*": nil}, }, err: fmt.Errorf(ErrInvalidLabelName, "#*FDDS*"), - expectedCfgs: emptyLabelsConfig, + expectedCfgs: nil, }, "label value is set from name": { config: LabelsConfig{Values: map[string]*string{ @@ -105,18 +103,18 @@ func TestLabels(t *testing.T) { "l3": &lv3, }}, err: nil, - expectedCfgs: LabelsConfig{Values: map[string]*string{ - "l1": &lv1, - "l2": &lv2c, - "l3": &lv3c, - }}, + expectedCfgs: map[string]string{ + "l1": lv1, + "l2": "l2", + "l3": "l3", + }, }, } for name, test := range tests { test := test t.Run(name, func(t *testing.T) { t.Parallel() - err := validateLabelsConfig(test.config) + actual, err := validateLabelsConfig(test.config) if (err != nil) != (test.err != nil) { t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err) return @@ -125,8 +123,8 @@ func TestLabels(t *testing.T) { t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err) return } - 
if test.expectedCfgs.Values != nil { - assert.Equal(t, test.expectedCfgs, test.config) + if test.expectedCfgs != nil { + assert.Equal(t, test.expectedCfgs, actual) } }) } diff --git a/internal/component/loki/process/stages/luhn.go b/internal/component/loki/process/stages/luhn.go index 14cf8b6fef..3640af0c5c 100644 --- a/internal/component/loki/process/stages/luhn.go +++ b/internal/component/loki/process/stages/luhn.go @@ -17,7 +17,7 @@ type LuhnFilterConfig struct { } // validateLuhnFilterConfig validates the LuhnFilterConfig. -func validateLuhnFilterConfig(c LuhnFilterConfig) error { +func validateLuhnFilterConfig(c *LuhnFilterConfig) error { if c.Replacement == "" { c.Replacement = "**REDACTED**" } @@ -32,7 +32,7 @@ func validateLuhnFilterConfig(c LuhnFilterConfig) error { // newLuhnFilterStage creates a new LuhnFilterStage. func newLuhnFilterStage(config LuhnFilterConfig) (Stage, error) { - if err := validateLuhnFilterConfig(config); err != nil { + if err := validateLuhnFilterConfig(&config); err != nil { return nil, err } return toStage(&luhnFilterStage{ diff --git a/internal/component/loki/process/stages/luhn_test.go b/internal/component/loki/process/stages/luhn_test.go index ef618aa863..9ac622ce44 100644 --- a/internal/component/loki/process/stages/luhn_test.go +++ b/internal/component/loki/process/stages/luhn_test.go @@ -2,6 +2,8 @@ package stages import ( "testing" + + "github.com/stretchr/testify/require" ) // Test cases for the Luhn algorithm validation @@ -52,3 +54,81 @@ func TestReplaceLuhnValidNumbers(t *testing.T) { } } } + +func TestValidateConfig(t *testing.T) { + source := ".*" + emptySource := "" + cases := []struct { + name string + input LuhnFilterConfig + expected LuhnFilterConfig + errorContainsStr string + }{ + { + name: "successful validation", + input: LuhnFilterConfig{ + Replacement: "ABC", + Source: &source, + MinLength: 10, + }, + expected: LuhnFilterConfig{ + Replacement: "ABC", + Source: &source, + MinLength: 10, + }, + }, + { + name: 
"nil source", + input: LuhnFilterConfig{ + Replacement: "ABC", + Source: nil, + MinLength: 10, + }, + expected: LuhnFilterConfig{ + Replacement: "ABC", + Source: nil, + MinLength: 10, + }, + }, + { + name: "empty source error", + input: LuhnFilterConfig{ + Replacement: "ABC", + Source: &emptySource, + MinLength: 11, + }, + expected: LuhnFilterConfig{ + Replacement: "ABC", + Source: &emptySource, + MinLength: 11, + }, + errorContainsStr: "empty source", + }, + { + name: "defaults update", + input: LuhnFilterConfig{ + Replacement: "", + Source: &source, + MinLength: -10, + }, + expected: LuhnFilterConfig{ + Replacement: "**REDACTED**", + Source: &source, + MinLength: 13, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + err := validateLuhnFilterConfig(&c.input) + if c.errorContainsStr == "" { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, c.errorContainsStr) + } + require.Equal(t, c.expected, c.input) + }) + } + +} diff --git a/internal/component/loki/process/stages/match.go b/internal/component/loki/process/stages/match.go index 4f07a976da..8cb71e2e27 100644 --- a/internal/component/loki/process/stages/match.go +++ b/internal/component/loki/process/stages/match.go @@ -13,13 +13,11 @@ import ( // Configuration errors. 
var ( - ErrEmptyMatchStageConfig = errors.New("match stage config cannot be empty") - ErrPipelineNameRequired = errors.New("match stage pipeline name can be omitted but cannot be an empty string") - ErrSelectorRequired = errors.New("selector statement required for match stage") - ErrMatchRequiresStages = errors.New("match stage requires at least one additional stage to be defined in '- stages'") - ErrSelectorSyntax = errors.New("invalid selector syntax for match stage") - ErrStagesWithDropLine = errors.New("match stage configured to drop entries cannot contains stages") - ErrUnknownMatchAction = errors.New("match stage action should be 'keep' or 'drop'") + ErrSelectorRequired = errors.New("selector statement required for match stage") + ErrMatchRequiresStages = errors.New("match stage requires at least one additional stage to be defined in '- stages'") + ErrSelectorSyntax = errors.New("invalid selector syntax for match stage") + ErrStagesWithDropLine = errors.New("match stage configured to drop entries cannot contains stages") + ErrUnknownMatchAction = errors.New("match stage action should be 'keep' or 'drop'") MatchActionKeep = "keep" MatchActionDrop = "drop" diff --git a/internal/component/loki/process/stages/match_test.go b/internal/component/loki/process/stages/match_test.go index d1f5e05027..3a67c064be 100644 --- a/internal/component/loki/process/stages/match_test.go +++ b/internal/component/loki/process/stages/match_test.go @@ -5,9 +5,11 @@ import ( "testing" "time" - "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/util" ) var testMatchAlloy = ` @@ -191,19 +193,26 @@ func TestValidateMatcherConfig(t *testing.T) { emptyStages := []StageConfig{} defaultStage := []StageConfig{{MatchConfig: &MatchConfig{}}} tests := []struct { - name string - cfg *MatchConfig - wantErr bool + name string + cfg *MatchConfig + 
wantErr bool + expected *MatchConfig }{ - {"pipeline name required", &MatchConfig{}, true}, - {"selector required", &MatchConfig{Selector: ""}, true}, - {"nil stages without dropping", &MatchConfig{PipelineName: "", Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: nil}, true}, - {"empty stages without dropping", &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: emptyStages}, true}, - {"stages with dropping", &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: defaultStage}, true}, - {"empty stages dropping", &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: emptyStages}, false}, - {"stages without dropping", &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: defaultStage}, false}, - {"bad selector", &MatchConfig{Selector: `{app="foo}`, Action: MatchActionKeep, Stages: defaultStage}, true}, - {"bad action", &MatchConfig{Selector: `{app="foo}`, Action: "nope", Stages: emptyStages}, true}, + {name: "pipeline name required", cfg: &MatchConfig{}, wantErr: true}, + {name: "selector required", cfg: &MatchConfig{Selector: ""}, wantErr: true}, + {name: "nil stages without dropping", cfg: &MatchConfig{PipelineName: "", Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: nil}, wantErr: true}, + {name: "empty stages without dropping", cfg: &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: emptyStages}, wantErr: true}, + {name: "stages with dropping", cfg: &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: defaultStage}, wantErr: true}, + {name: "empty stages dropping", cfg: &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionDrop, Stages: emptyStages}}, + {name: "stages without dropping", cfg: &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: defaultStage}}, + {name: "bad selector", cfg: &MatchConfig{Selector: `{app="foo}`, Action: MatchActionKeep, Stages: defaultStage}, wantErr: true}, + {name: "bad action", cfg: 
&MatchConfig{Selector: `{app="foo}`, Action: "nope", Stages: emptyStages}, wantErr: true}, + { + name: "sets default action to keep", + cfg: &MatchConfig{Selector: `{app="foo"}`, Stages: defaultStage}, + wantErr: false, + expected: &MatchConfig{Selector: `{app="foo"}`, Action: MatchActionKeep, Stages: defaultStage}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -212,6 +221,9 @@ func TestValidateMatcherConfig(t *testing.T) { t.Errorf("validateMatcherConfig() error = %v, wantErr %v", err, tt.wantErr) return } + if tt.expected != nil { + require.Equal(t, tt.expected, tt.cfg) + } }) } } diff --git a/internal/component/loki/process/stages/metric.go b/internal/component/loki/process/stages/metric.go index 632e918b16..1d4a0cf936 100644 --- a/internal/component/loki/process/stages/metric.go +++ b/internal/component/loki/process/stages/metric.go @@ -1,7 +1,6 @@ package stages import ( - "errors" "fmt" "math" "reflect" @@ -9,31 +8,19 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component/loki/process/metric" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/component/loki/process/metric" + "github.com/grafana/alloy/internal/runtime/logging/level" ) // Metric types. const ( - MetricTypeCounter = "counter" - MetricTypeGauge = "gauge" - MetricTypeHistogram = "histogram" - defaultMetricsPrefix = "loki_process_custom_" ) -// Configuration errors. -var ( - ErrEmptyMetricsStageConfig = errors.New("empty metric stage configuration") - ErrMetricsStageInvalidType = errors.New("invalid metric type: must be one of 'counter', 'gauge', or 'histogram'") - ErrInvalidIdleDur = errors.New("max_idle_duration could not be parsed as a time.Duration") - ErrSubSecIdleDur = errors.New("max_idle_duration less than 1s not allowed") -) - // MetricConfig is a single metrics configuration. 
-// TODO(@tpaschalis) Rework once Alloy squashing is implemented. type MetricConfig struct { Counter *metric.CounterConfig `alloy:"counter,block,optional"` Gauge *metric.GaugeConfig `alloy:"gauge,block,optional"` @@ -106,7 +93,6 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus } return &metricStage{ logger: logger, - cfg: config, metrics: metrics, }, nil } @@ -114,7 +100,6 @@ func newMetricStage(logger log.Logger, config MetricsConfig, registry prometheus // metricStage creates and updates prometheus metrics based on extracted pipeline data type metricStage struct { logger log.Logger - cfg MetricsConfig metrics map[string]cfgCollector } @@ -132,7 +117,7 @@ func (m *metricStage) Run(in chan Entry) chan Entry { } // Process implements Stage -func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { +func (m *metricStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, entry *string) { for name, cc := range m.metrics { // There is a special case for counters where we count even if there is no match in the extracted map. if c, ok := cc.collector.(*metric.Counters); ok { diff --git a/internal/component/loki/process/stages/multiline.go b/internal/component/loki/process/stages/multiline.go index f754909754..99c9416e93 100644 --- a/internal/component/loki/process/stages/multiline.go +++ b/internal/component/loki/process/stages/multiline.go @@ -18,9 +18,8 @@ import ( // Configuration errors. 
var ( - ErrMultilineStageEmptyConfig = errors.New("multiline stage config must define `firstline` regular expression") - ErrMultilineStageInvalidRegex = errors.New("multiline stage first line regex compilation error") - ErrMultilineStageInvalidMaxWaitTime = errors.New("multiline stage `max_wait_time` parse error") + ErrMultilineStageEmptyConfig = errors.New("multiline stage config must define `firstline` regular expression") + ErrMultilineStageInvalidRegex = errors.New("multiline stage first line regex compilation error") ) // MultilineConfig contains the configuration for a Multiline stage. @@ -28,7 +27,6 @@ type MultilineConfig struct { Expression string `alloy:"firstline,attr"` MaxLines uint64 `alloy:"max_lines,attr,optional"` MaxWaitTime time.Duration `alloy:"max_wait_time,attr,optional"` - regex *regexp.Regexp } // DefaultMultilineConfig applies the default values on @@ -51,24 +49,24 @@ func (args *MultilineConfig) Validate() error { return nil } -func validateMultilineConfig(cfg *MultilineConfig) error { +func validateMultilineConfig(cfg MultilineConfig) (*regexp.Regexp, error) { if cfg.Expression == "" { - return ErrMultilineStageEmptyConfig + return nil, ErrMultilineStageEmptyConfig } expr, err := regexp.Compile(cfg.Expression) if err != nil { - return fmt.Errorf("%v: %w", ErrMultilineStageInvalidRegex, err) + return nil, fmt.Errorf("%v: %w", ErrMultilineStageInvalidRegex, err) } - cfg.regex = expr - return nil + return expr, nil } // multilineStage matches lines to determine whether the following lines belong to a block and should be collapsed type multilineStage struct { logger log.Logger cfg MultilineConfig + regex *regexp.Regexp } // multilineState captures the internal state of a running multiline stage. 
@@ -80,7 +78,7 @@ type multilineState struct { // newMultilineStage creates a MulitlineStage from config func newMultilineStage(logger log.Logger, config MultilineConfig) (Stage, error) { - err := validateMultilineConfig(&config) + regex, err := validateMultilineConfig(config) if err != nil { return nil, err } @@ -88,6 +86,7 @@ func newMultilineStage(logger log.Logger, config MultilineConfig) (Stage, error) return &multilineStage{ logger: log.With(logger, "component", "stage", "type", "multiline"), cfg: config, + regex: regex, }, nil } @@ -96,7 +95,7 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { go func() { defer close(out) - streams := make(map[model.Fingerprint](chan Entry)) + streams := make(map[model.Fingerprint]chan Entry) wg := new(sync.WaitGroup) for e := range in { @@ -104,7 +103,7 @@ func (m *multilineStage) Run(in chan Entry) chan Entry { s, ok := streams[key] if !ok { // Pass through entries until we hit first start line. - if !m.cfg.regex.MatchString(e.Line) { + if !m.regex.MatchString(e.Line) { level.Debug(m.logger).Log("msg", "pass through entry", "stream", key) out <- e continue @@ -152,7 +151,7 @@ func (m *multilineStage) runMultiline(in chan Entry, out chan Entry, wg *sync.Wa return } - isFirstLine := m.cfg.regex.MatchString(e.Line) + isFirstLine := m.regex.MatchString(e.Line) if isFirstLine { level.Debug(m.logger).Log("msg", "flush multiline block because new start line", "block", state.buffer.String(), "stream", e.Labels.FastFingerprint()) m.flush(out, state) diff --git a/internal/component/loki/process/stages/multiline_test.go b/internal/component/loki/process/stages/multiline_test.go index dda55f08c6..bdad1743cc 100644 --- a/internal/component/loki/process/stages/multiline_test.go +++ b/internal/component/loki/process/stages/multiline_test.go @@ -17,11 +17,12 @@ import ( func TestMultilineStageProcess(t *testing.T) { logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second} - err 
:= validateMultilineConfig(&mcfg) + regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) stage := &multilineStage{ cfg: mcfg, + regex: regex, logger: logger, } @@ -44,11 +45,12 @@ func TestMultilineStageProcess(t *testing.T) { func TestMultilineStageMultiStreams(t *testing.T) { logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 3 * time.Second} - err := validateMultilineConfig(&mcfg) + regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) stage := &multilineStage{ cfg: mcfg, + regex: regex, logger: logger, } @@ -84,11 +86,12 @@ func TestMultilineStageMultiStreams(t *testing.T) { func TestMultilineStageMaxWaitTime(t *testing.T) { logger := util.TestAlloyLogger(t) mcfg := MultilineConfig{Expression: "^START", MaxWaitTime: 100 * time.Millisecond} - err := validateMultilineConfig(&mcfg) + regex, err := validateMultilineConfig(mcfg) require.NoError(t, err) stage := &multilineStage{ cfg: mcfg, + regex: regex, logger: logger, } diff --git a/internal/component/loki/process/stages/pipeline.go b/internal/component/loki/process/stages/pipeline.go index 043e16cce6..e6583b4514 100644 --- a/internal/component/loki/process/stages/pipeline.go +++ b/internal/component/loki/process/stages/pipeline.go @@ -6,16 +6,16 @@ import ( "sync" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/component/common/loki" "github.com/prometheus/client_golang/prometheus" "golang.org/x/time/rate" + + "github.com/grafana/alloy/internal/component/common/loki" ) // StageConfig defines a single stage in a processing pipeline. // We define these as pointers types so we can use reflection to check that // exactly one is set. 
type StageConfig struct { - //TODO(thampiotr): sync these with new stages CRIConfig *CRIConfig `alloy:"cri,block,optional"` DecolorizeConfig *DecolorizeConfig `alloy:"decolorize,block,optional"` DockerConfig *DockerConfig `alloy:"docker,block,optional"` @@ -86,24 +86,8 @@ func RunWith(input chan Entry, process func(e Entry) Entry) chan Entry { return out } -// RunWithSkip same as RunWith, except it skip sending it to output channel, if `process` functions returns `skip` true. -func RunWithSkip(input chan Entry, process func(e Entry) (Entry, bool)) chan Entry { - out := make(chan Entry) - go func() { - defer close(out) - for e := range input { - ee, skip := process(e) - if skip { - continue - } - out <- ee - } - }() - - return out -} - -// RunWithSkiporSendMany same as RunWithSkip, except it can either skip sending it to output channel, if `process` functions returns `skip` true. Or send many entries. +// RunWithSkipOrSendMany same as RunWith, except it handles sending multiple entries at the same time and it will skip +// sending the batch to the output channel if the `process` function returns `skip` true. 
func RunWithSkipOrSendMany(input chan Entry, process func(e Entry) ([]Entry, bool)) chan Entry { out := make(chan Entry) go func() { diff --git a/internal/component/loki/process/stages/replace.go b/internal/component/loki/process/stages/replace.go index ed34927a73..a4c2ddfd2e 100644 --- a/internal/component/loki/process/stages/replace.go +++ b/internal/component/loki/process/stages/replace.go @@ -2,7 +2,6 @@ package stages import ( "bytes" - "errors" "fmt" "reflect" "regexp" @@ -10,14 +9,9 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/common/model" -) -// Config Errors -var ( - ErrEmptyReplaceStageConfig = errors.New("empty replace stage configuration") - ErrEmptyReplaceStageSource = errors.New("empty source in replace stage") + "github.com/grafana/alloy/internal/runtime/logging/level" ) func init() { diff --git a/internal/component/loki/process/stages/static_labels.go b/internal/component/loki/process/stages/static_labels.go index 8ffaaa56f1..d3f372844a 100644 --- a/internal/component/loki/process/stages/static_labels.go +++ b/internal/component/loki/process/stages/static_labels.go @@ -7,8 +7,9 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/common/model" + + "github.com/grafana/alloy/internal/runtime/logging/level" ) // ErrEmptyStaticLabelStageConfig error returned if the config is empty. @@ -26,7 +27,7 @@ func newStaticLabelsStage(logger log.Logger, config StaticLabelsConfig) (Stage, } return toStage(&staticLabelStage{ - Config: config, + config: config, logger: logger, }), nil } @@ -45,13 +46,13 @@ func validateLabelStaticConfig(c StaticLabelsConfig) error { // staticLabelStage implements Stage. type staticLabelStage struct { - Config StaticLabelsConfig + config StaticLabelsConfig logger log.Logger } // Process implements Stage. 
func (l *staticLabelStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) { - for lName, lSrc := range l.Config.Values { + for lName, lSrc := range l.config.Values { if lSrc == nil || *lSrc == "" { continue } diff --git a/internal/component/loki/process/stages/structured_metadata.go b/internal/component/loki/process/stages/structured_metadata.go index a9a3efcb84..211f6c63a4 100644 --- a/internal/component/loki/process/stages/structured_metadata.go +++ b/internal/component/loki/process/stages/structured_metadata.go @@ -8,19 +8,19 @@ import ( ) func newStructuredMetadataStage(logger log.Logger, configs LabelsConfig) (Stage, error) { - err := validateLabelsConfig(configs) + labelsConfig, err := validateLabelsConfig(configs) if err != nil { return nil, err } return &structuredMetadataStage{ - cfgs: configs, - logger: logger, + labelsConfig: labelsConfig, + logger: logger, }, nil } type structuredMetadataStage struct { - cfgs LabelsConfig - logger log.Logger + labelsConfig map[string]string + logger log.Logger } func (s *structuredMetadataStage) Name() string { @@ -34,7 +34,7 @@ func (*structuredMetadataStage) Cleanup() { func (s *structuredMetadataStage) Run(in chan Entry) chan Entry { return RunWith(in, func(e Entry) Entry { - processLabelsConfigs(s.logger, e.Extracted, s.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) { + processLabelsConfigs(s.logger, e.Extracted, s.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) { e.StructuredMetadata = append(e.StructuredMetadata, logproto.LabelAdapter{Name: string(labelName), Value: string(labelValue)}) }) return s.extractFromLabels(e) @@ -45,8 +45,8 @@ func (s *structuredMetadataStage) extractFromLabels(e Entry) Entry { labels := e.Labels foundLabels := []model.LabelName{} - for lName, lSrc := range s.cfgs.Values { - labelKey := model.LabelName(*lSrc) + for lName, lSrc := range s.labelsConfig { + labelKey := model.LabelName(lSrc) if 
lValue, ok := labels[labelKey]; ok { e.StructuredMetadata = append(e.StructuredMetadata, logproto.LabelAdapter{Name: lName, Value: string(lValue)}) foundLabels = append(foundLabels, labelKey) diff --git a/internal/component/loki/process/stages/template.go b/internal/component/loki/process/stages/template.go index 00ace665d9..3a81a56023 100644 --- a/internal/component/loki/process/stages/template.go +++ b/internal/component/loki/process/stages/template.go @@ -13,16 +13,16 @@ import ( "github.com/Masterminds/sprig/v3" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" "github.com/prometheus/common/model" + "github.com/grafana/alloy/internal/runtime/logging/level" + "golang.org/x/crypto/sha3" ) // Config Errors. var ( - ErrEmptyTemplateStageConfig = errors.New("template stage config cannot be empty") - ErrTemplateSourceRequired = errors.New("template source value is required") + ErrTemplateSourceRequired = errors.New("template source value is required") ) var extraFunctionMap = template.FuncMap{ diff --git a/internal/component/loki/process/stages/timestamp.go b/internal/component/loki/process/stages/timestamp.go index a2dc46c5d5..8f8b61e0c7 100644 --- a/internal/component/loki/process/stages/timestamp.go +++ b/internal/component/loki/process/stages/timestamp.go @@ -5,13 +5,13 @@ import ( "fmt" "reflect" "time" + _ "time/tzdata" // embed timezone data "github.com/go-kit/log" - "github.com/grafana/alloy/internal/runtime/logging/level" lru "github.com/hashicorp/golang-lru" "github.com/prometheus/common/model" - _ "time/tzdata" // embed timezone data + "github.com/grafana/alloy/internal/runtime/logging/level" ) // Config errors. 
@@ -53,7 +53,7 @@ type TimestampConfig struct { type parser func(string) (time.Time, error) -func validateTimestampConfig(cfg TimestampConfig) (parser, error) { +func validateTimestampConfig(cfg *TimestampConfig) (parser, error) { if cfg.Source == "" { return nil, ErrTimestampSourceRequired } @@ -99,7 +99,7 @@ func validateTimestampConfig(cfg TimestampConfig) (parser, error) { // newTimestampStage creates a new timestamp extraction pipeline stage. func newTimestampStage(logger log.Logger, config TimestampConfig) (Stage, error) { - parser, err := validateTimestampConfig(config) + parser, err := validateTimestampConfig(&config) if err != nil { return nil, err } diff --git a/internal/component/loki/process/stages/timestamp_test.go b/internal/component/loki/process/stages/timestamp_test.go index 6e6c2cb56a..f1dff22267 100644 --- a/internal/component/loki/process/stages/timestamp_test.go +++ b/internal/component/loki/process/stages/timestamp_test.go @@ -8,11 +8,12 @@ import ( "time" "github.com/go-kit/log" - "github.com/grafana/alloy/internal/util" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/grafana/alloy/internal/util" ) var testTimestampAlloy = ` @@ -75,9 +76,10 @@ func TestTimestampValidation(t *testing.T) { config *TimestampConfig // Note the error text validation is a little loose as it only validates with strings.HasPrefix // this is to work around different errors related to timezone loading on different systems - err error - testString string - expectedTime time.Time + err error + testString string + expectedTime time.Time + expectedConfig *TimestampConfig }{ "missing source": { config: &TimestampConfig{}, @@ -106,6 +108,18 @@ func TestTimestampValidation(t *testing.T) { testString: "2012-11-01T22:08:41-04:00", expectedTime: time.Date(2012, 11, 01, 22, 8, 41, 0, time.FixedZone("", -4*60*60)), }, + "sets default action on failure": 
{ + config: &TimestampConfig{ + Source: "source1", + Format: time.RFC3339, + }, + err: nil, + expectedConfig: &TimestampConfig{ + Source: "source1", + Format: time.RFC3339, + ActionOnFailure: "fudge", + }, + }, "custom format with year": { config: &TimestampConfig{ Source: "source1", @@ -166,23 +180,26 @@ func TestTimestampValidation(t *testing.T) { test := test t.Run(name, func(t *testing.T) { t.Parallel() - parser, err := validateTimestampConfig(*test.config) + parser, err := validateTimestampConfig(test.config) if (err != nil) != (test.err != nil) { - t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err) + t.Errorf("validateTimestampConfig() expected error = %v, actual error = %v", test.err, err) return } if (err != nil) && !strings.HasPrefix(err.Error(), test.err.Error()) { - t.Errorf("validateOutputConfig() expected error = %v, actual error = %v", test.err, err) + t.Errorf("validateTimestampConfig() expected error = %v, actual error = %v", test.err, err) return } if test.testString != "" { ts, err := parser(test.testString) if err != nil { - t.Errorf("validateOutputConfig() unexpected error parsing test time: %v", err) + t.Errorf("validateTimestampConfig() unexpected error parsing test time: %v", err) return } assert.Equal(t, test.expectedTime.UnixNano(), ts.UnixNano()) } + if test.expectedConfig != nil { + assert.Equal(t, test.expectedConfig, test.config) + } }) } } @@ -328,9 +345,8 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) { }, "should fudge the timestamp based on the last known value on timestamp parsing failure": { config: TimestampConfig{ - Source: "time", - Format: time.RFC3339Nano, - ActionOnFailure: TimestampActionOnFailureFudge, + Source: "time", + Format: time.RFC3339Nano, }, inputEntries: []inputEntry{ {timestamp: time.Unix(1, 0), extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.400000000Z"}}, @@ -345,9 +361,8 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) { }, 
"should fudge the timestamp based on the last known value for the right file target": { config: TimestampConfig{ - Source: "time", - Format: time.RFC3339Nano, - ActionOnFailure: TimestampActionOnFailureFudge, + Source: "time", + Format: time.RFC3339Nano, }, inputEntries: []inputEntry{ {timestamp: time.Unix(1, 0), labels: model.LabelSet{"filename": "/1.log"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.400000000Z"}}, @@ -366,9 +381,8 @@ func TestTimestampStage_ProcessActionOnFailure(t *testing.T) { }, "should keep the input timestamp if unable to fudge because there's no known valid timestamp yet": { config: TimestampConfig{ - Source: "time", - Format: time.RFC3339Nano, - ActionOnFailure: TimestampActionOnFailureFudge, + Source: "time", + Format: time.RFC3339Nano, }, inputEntries: []inputEntry{ {timestamp: time.Unix(1, 0), labels: model.LabelSet{"filename": "/1.log"}, extracted: map[string]interface{}{"time": "2019-10-01T01:02:03.400000000Z"}},