Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unnecessary loki.process config reloads #1809

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Main (unreleased)
- Fixes a bug where cloudwatch S3 metrics are reported as `0`

- Fixed a bug in `import.git` which caused a `"non-fast-forward update"` error message. (@ptodev)

- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply
default configuration settings correctly (@thampiotr)

- Fixed an issue with `loki.process` where configuration could be reloaded even if there
were no changes. (@ptodev, @thampiotr)

### Other changes

Expand Down
44 changes: 23 additions & 21 deletions internal/component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

const (
Expand All @@ -37,57 +38,57 @@ type DropConfig struct {
Expression string `alloy:"expression,attr,optional"`
OlderThan time.Duration `alloy:"older_than,attr,optional"`
LongerThan units.Base2Bytes `alloy:"longer_than,attr,optional"`
regex *regexp.Regexp
}

// validateDropConfig validates the DropConfig for the dropStage
func validateDropConfig(cfg *DropConfig) error {
func validateDropConfig(cfg *DropConfig) (*regexp.Regexp, error) {
if cfg == nil ||
(cfg.Source == "" && cfg.Expression == "" && cfg.OlderThan == emptyDuration && cfg.LongerThan == emptySize) {

return errors.New(ErrDropStageEmptyConfig)
return nil, errors.New(ErrDropStageEmptyConfig)
}
if cfg.DropReason == "" {
cfg.DropReason = defaultDropReason
}
if cfg.Value != "" && cfg.Expression != "" {
return errors.New(ErrDropStageInvalidConfig)
return nil, errors.New(ErrDropStageInvalidConfig)
}
if cfg.Separator == "" {
cfg.Separator = defaultSeparator
}
if cfg.Value != "" && cfg.Source == "" {
return errors.New(ErrDropStageNoSourceWithValue)
return nil, errors.New(ErrDropStageNoSourceWithValue)
}
var (
expr *regexp.Regexp
err error
)
if cfg.Expression != "" {
expr, err := regexp.Compile(cfg.Expression)
if err != nil {
return fmt.Errorf(ErrDropStageInvalidRegex, err)
if expr, err = regexp.Compile(cfg.Expression); err != nil {
return nil, fmt.Errorf(ErrDropStageInvalidRegex, err)
}
cfg.regex = expr
}
// The first step to exclude `value` and fully replace it with the `expression`.
// It will simplify code and less confusing for the end-user on which option to choose.
if cfg.Value != "" {
expr, err := regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value)))
expr, err = regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value)))
if err != nil {
return fmt.Errorf(ErrDropStageInvalidRegex, err)
return nil, fmt.Errorf(ErrDropStageInvalidRegex, err)
}
cfg.regex = expr
}
return nil
return expr, nil
}

// newDropStage creates a DropStage from config
func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Registerer) (Stage, error) {
err := validateDropConfig(&config)
regex, err := validateDropConfig(&config)
if err != nil {
return nil, err
}

return &dropStage{
logger: log.With(logger, "component", "stage", "type", "drop"),
cfg: &config,
regex: regex,
dropCount: getDropCountMetric(registerer),
}, nil
}
Expand All @@ -96,6 +97,7 @@ func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Re
type dropStage struct {
logger log.Logger
cfg *DropConfig
regex *regexp.Regexp
dropCount *prometheus.CounterVec
}

Expand Down Expand Up @@ -144,7 +146,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
return false
}
}
if m.cfg.Source != "" && m.cfg.regex == nil {
if m.cfg.Source != "" && m.regex == nil {
var match bool
match = true
for _, src := range splitSource(m.cfg.Source) {
Expand All @@ -165,8 +167,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
}
}

if m.cfg.Source == "" && m.cfg.regex != nil {
if !m.cfg.regex.MatchString(e.Line) {
if m.cfg.Source == "" && m.regex != nil {
if !m.regex.MatchString(e.Line) {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
Expand All @@ -178,7 +180,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
}
}

if m.cfg.Source != "" && m.cfg.regex != nil {
if m.cfg.Source != "" && m.regex != nil {
var extractedData []string
for _, src := range splitSource(m.cfg.Source) {
if e, ok := e.Extracted[src]; ok {
Expand All @@ -192,7 +194,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
extractedData = append(extractedData, s)
}
}
if !m.cfg.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) {
if !m.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"time"

"github.com/alecthomas/units"
"github.com/grafana/alloy/internal/util"
dskit "github.com/grafana/dskit/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/util"
)

// Not all these are tested but are here to make sure the different types marshal without error
Expand Down Expand Up @@ -411,7 +412,7 @@ func Test_dropStage_Process(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateDropConfig(tt.config)
_, err := validateDropConfig(tt.config)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -465,7 +466,7 @@ func Test_validateDropConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
if _, err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
Expand Down
4 changes: 2 additions & 2 deletions internal/component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"reflect"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/jmespath/go-jmespath"
"github.com/oschwald/geoip2-golang"
"github.com/oschwald/maxminddb-golang"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

var (
ErrEmptyGeoIPStageConfig = errors.New("geoip stage config cannot be empty")
ErrEmptyDBPathGeoIPStageConfig = errors.New("db path cannot be empty")
ErrEmptySourceGeoIPStageConfig = errors.New("source cannot be empty")
ErrEmptyDBTypeGeoIPStageConfig = errors.New("db type should be either city or asn")
Expand Down
36 changes: 20 additions & 16 deletions internal/component/loki/process/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

const (
Expand All @@ -22,53 +23,56 @@ type LabelsConfig struct {
}

// validateLabelsConfig validates the Label stage configuration
func validateLabelsConfig(c LabelsConfig) error {
func validateLabelsConfig(c LabelsConfig) (map[string]string, error) {
// We must not mutate the c.Values, create a copy with changes we need.
ret := map[string]string{}
if c.Values == nil {
return errors.New(ErrEmptyLabelStageConfig)
return nil, errors.New(ErrEmptyLabelStageConfig)
}
for labelName, labelSrc := range c.Values {
if !model.LabelName(labelName).IsValid() {
return fmt.Errorf(ErrInvalidLabelName, labelName)
return nil, fmt.Errorf(ErrInvalidLabelName, labelName)
}
// If no label source was specified, use the key name
if labelSrc == nil || *labelSrc == "" {
lName := labelName
c.Values[labelName] = &lName
ret[labelName] = labelName
} else {
ret[labelName] = *labelSrc
}
}
return nil
return ret, nil
}

// newLabelStage creates a new label stage to set labels from extracted data
func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) {
err := validateLabelsConfig(configs)
labelsConfig, err := validateLabelsConfig(configs)
if err != nil {
return nil, err
}
return toStage(&labelStage{
cfgs: configs,
logger: logger,
labelsConfig: labelsConfig,
logger: logger,
}), nil
}

// labelStage sets labels from extracted data
type labelStage struct {
cfgs LabelsConfig
logger log.Logger
labelsConfig map[string]string
logger log.Logger
}

// Process implements Stage
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
processLabelsConfigs(l.logger, extracted, l.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) {
labels[labelName] = labelValue
})
}

type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue)

func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, configs LabelsConfig, consumer labelsConsumer) {
for lName, lSrc := range configs.Values {
if lValue, ok := extracted[*lSrc]; ok {
func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, labelsConfig map[string]string, consumer labelsConsumer) {
for lName, lSrc := range labelsConfig {
if lValue, ok := extracted[lSrc]; ok {
s, err := getString(lValue)
if err != nil {
if Debug {
Expand Down
28 changes: 13 additions & 15 deletions internal/component/loki/process/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) {
}

var (
lv1 = "lv1"
lv2c = "l2"
lv3 = ""
lv3c = "l3"
lv1 = "lv1"
lv3 = ""
)

var emptyLabelsConfig = LabelsConfig{nil}
Expand All @@ -84,19 +82,19 @@ func TestLabels(t *testing.T) {
tests := map[string]struct {
config LabelsConfig
err error
expectedCfgs LabelsConfig
expectedCfgs map[string]string
}{
"missing config": {
config: emptyLabelsConfig,
err: errors.New(ErrEmptyLabelStageConfig),
expectedCfgs: emptyLabelsConfig,
expectedCfgs: nil,
},
"invalid label name": {
config: LabelsConfig{
Values: map[string]*string{"#*FDDS*": nil},
},
err: fmt.Errorf(ErrInvalidLabelName, "#*FDDS*"),
expectedCfgs: emptyLabelsConfig,
expectedCfgs: nil,
},
"label value is set from name": {
config: LabelsConfig{Values: map[string]*string{
Expand All @@ -105,18 +103,18 @@ func TestLabels(t *testing.T) {
"l3": &lv3,
}},
err: nil,
expectedCfgs: LabelsConfig{Values: map[string]*string{
"l1": &lv1,
"l2": &lv2c,
"l3": &lv3c,
}},
expectedCfgs: map[string]string{
"l1": lv1,
"l2": "l2",
"l3": "l3",
},
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
err := validateLabelsConfig(test.config)
actual, err := validateLabelsConfig(test.config)
if (err != nil) != (test.err != nil) {
t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err)
return
Expand All @@ -125,8 +123,8 @@ func TestLabels(t *testing.T) {
t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err)
return
}
if test.expectedCfgs.Values != nil {
assert.Equal(t, test.expectedCfgs, test.config)
if test.expectedCfgs != nil {
assert.Equal(t, test.expectedCfgs, actual)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions internal/component/loki/process/stages/luhn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type LuhnFilterConfig struct {
}

// validateLuhnFilterConfig validates the LuhnFilterConfig.
func validateLuhnFilterConfig(c LuhnFilterConfig) error {
func validateLuhnFilterConfig(c *LuhnFilterConfig) error {
if c.Replacement == "" {
c.Replacement = "**REDACTED**"
}
Expand All @@ -32,7 +32,7 @@ func validateLuhnFilterConfig(c LuhnFilterConfig) error {

// newLuhnFilterStage creates a new LuhnFilterStage.
func newLuhnFilterStage(config LuhnFilterConfig) (Stage, error) {
if err := validateLuhnFilterConfig(config); err != nil {
if err := validateLuhnFilterConfig(&config); err != nil {
return nil, err
}
return toStage(&luhnFilterStage{
Expand Down
Loading
Loading