Skip to content

Commit

Permalink
Address loki.process config reloads
Browse files Browse the repository at this point in the history
  • Loading branch information
thampiotr committed Oct 3, 2024
1 parent 67409b5 commit e588bfb
Show file tree
Hide file tree
Showing 20 changed files with 259 additions and 177 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ Main (unreleased)
- Fixes a bug where cloudwatch S3 metrics are reported as `0`

- Fixed a bug in `import.git` which caused a `"non-fast-forward update"` error message. (@ptodev)

- Fixed an issue with `loki.process` where `stage.luhn` and `stage.timestamp` would not apply
default configuration settings correctly (@thampiotr)

- Fixed an issue with `loki.process` where configuration could be reloaded even if there
were no changes. (@ptodev, @thampiotr)

### Other changes

Expand Down
44 changes: 23 additions & 21 deletions internal/component/loki/process/stages/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (

"github.com/alecthomas/units"
"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

const (
Expand All @@ -37,57 +38,57 @@ type DropConfig struct {
Expression string `alloy:"expression,attr,optional"`
OlderThan time.Duration `alloy:"older_than,attr,optional"`
LongerThan units.Base2Bytes `alloy:"longer_than,attr,optional"`
regex *regexp.Regexp
}

// validateDropConfig validates the DropConfig for the dropStage
func validateDropConfig(cfg *DropConfig) error {
func validateDropConfig(cfg *DropConfig) (*regexp.Regexp, error) {
if cfg == nil ||
(cfg.Source == "" && cfg.Expression == "" && cfg.OlderThan == emptyDuration && cfg.LongerThan == emptySize) {

return errors.New(ErrDropStageEmptyConfig)
return nil, errors.New(ErrDropStageEmptyConfig)
}
if cfg.DropReason == "" {
cfg.DropReason = defaultDropReason
}
if cfg.Value != "" && cfg.Expression != "" {
return errors.New(ErrDropStageInvalidConfig)
return nil, errors.New(ErrDropStageInvalidConfig)
}
if cfg.Separator == "" {
cfg.Separator = defaultSeparator
}
if cfg.Value != "" && cfg.Source == "" {
return errors.New(ErrDropStageNoSourceWithValue)
return nil, errors.New(ErrDropStageNoSourceWithValue)
}
var (
expr *regexp.Regexp
err error
)
if cfg.Expression != "" {
expr, err := regexp.Compile(cfg.Expression)
if err != nil {
return fmt.Errorf(ErrDropStageInvalidRegex, err)
if expr, err = regexp.Compile(cfg.Expression); err != nil {
return nil, fmt.Errorf(ErrDropStageInvalidRegex, err)
}
cfg.regex = expr
}
// The first step to exclude `value` and fully replace it with the `expression`.
// It will simplify code and less confusing for the end-user on which option to choose.
if cfg.Value != "" {
expr, err := regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value)))
expr, err = regexp.Compile(fmt.Sprintf("^%s$", regexp.QuoteMeta(cfg.Value)))
if err != nil {
return fmt.Errorf(ErrDropStageInvalidRegex, err)
return nil, fmt.Errorf(ErrDropStageInvalidRegex, err)
}
cfg.regex = expr
}
return nil
return expr, nil
}

// newDropStage creates a DropStage from config
func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Registerer) (Stage, error) {
err := validateDropConfig(&config)
regex, err := validateDropConfig(&config)
if err != nil {
return nil, err
}

return &dropStage{
logger: log.With(logger, "component", "stage", "type", "drop"),
cfg: &config,
regex: regex,
dropCount: getDropCountMetric(registerer),
}, nil
}
Expand All @@ -96,6 +97,7 @@ func newDropStage(logger log.Logger, config DropConfig, registerer prometheus.Re
type dropStage struct {
logger log.Logger
cfg *DropConfig
regex *regexp.Regexp
dropCount *prometheus.CounterVec
}

Expand Down Expand Up @@ -144,7 +146,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
return false
}
}
if m.cfg.Source != "" && m.cfg.regex == nil {
if m.cfg.Source != "" && m.regex == nil {
var match bool
match = true
for _, src := range splitSource(m.cfg.Source) {
Expand All @@ -165,8 +167,8 @@ func (m *dropStage) shouldDrop(e Entry) bool {
}
}

if m.cfg.Source == "" && m.cfg.regex != nil {
if !m.cfg.regex.MatchString(e.Line) {
if m.cfg.Source == "" && m.regex != nil {
if !m.regex.MatchString(e.Line) {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
Expand All @@ -178,7 +180,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
}
}

if m.cfg.Source != "" && m.cfg.regex != nil {
if m.cfg.Source != "" && m.regex != nil {
var extractedData []string
for _, src := range splitSource(m.cfg.Source) {
if e, ok := e.Extracted[src]; ok {
Expand All @@ -192,7 +194,7 @@ func (m *dropStage) shouldDrop(e Entry) bool {
extractedData = append(extractedData, s)
}
}
if !m.cfg.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) {
if !m.regex.MatchString(strings.Join(extractedData, m.cfg.Separator)) {
// Not a match to the regex, don't drop
if Debug {
level.Debug(m.logger).Log("msg", "line will not be dropped, the provided regular expression did not match the log line")
Expand Down
7 changes: 4 additions & 3 deletions internal/component/loki/process/stages/drop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ import (
"time"

"github.com/alecthomas/units"
"github.com/grafana/alloy/internal/util"
dskit "github.com/grafana/dskit/server"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/grafana/alloy/internal/util"
)

// Not all these are tested but are here to make sure the different types marshal without error
Expand Down Expand Up @@ -411,7 +412,7 @@ func Test_dropStage_Process(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := validateDropConfig(tt.config)
_, err := validateDropConfig(tt.config)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -465,7 +466,7 @@ func Test_validateDropConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
if _, err := validateDropConfig(tt.config); ((err != nil) && (err.Error() != tt.wantErr.Error())) || (err == nil && tt.wantErr != nil) {
t.Errorf("validateDropConfig() error = %v, wantErr = %v", err, tt.wantErr)
}
})
Expand Down
4 changes: 2 additions & 2 deletions internal/component/loki/process/stages/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"reflect"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/jmespath/go-jmespath"
"github.com/oschwald/geoip2-golang"
"github.com/oschwald/maxminddb-golang"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

var (
ErrEmptyGeoIPStageConfig = errors.New("geoip stage config cannot be empty")
ErrEmptyDBPathGeoIPStageConfig = errors.New("db path cannot be empty")
ErrEmptySourceGeoIPStageConfig = errors.New("source cannot be empty")
ErrEmptyDBTypeGeoIPStageConfig = errors.New("db type should be either city or asn")
Expand Down
36 changes: 20 additions & 16 deletions internal/component/loki/process/stages/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/alloy/internal/runtime/logging/level"
"github.com/prometheus/common/model"

"github.com/grafana/alloy/internal/runtime/logging/level"
)

const (
Expand All @@ -22,53 +23,56 @@ type LabelsConfig struct {
}

// validateLabelsConfig validates the Label stage configuration
func validateLabelsConfig(c LabelsConfig) error {
func validateLabelsConfig(c LabelsConfig) (map[string]string, error) {
// We must not mutate the c.Values, create a copy with changes we need.
ret := map[string]string{}
if c.Values == nil {
return errors.New(ErrEmptyLabelStageConfig)
return nil, errors.New(ErrEmptyLabelStageConfig)
}
for labelName, labelSrc := range c.Values {
if !model.LabelName(labelName).IsValid() {
return fmt.Errorf(ErrInvalidLabelName, labelName)
return nil, fmt.Errorf(ErrInvalidLabelName, labelName)
}
// If no label source was specified, use the key name
if labelSrc == nil || *labelSrc == "" {
lName := labelName
c.Values[labelName] = &lName
ret[labelName] = labelName
} else {
ret[labelName] = *labelSrc
}
}
return nil
return ret, nil
}

// newLabelStage creates a new label stage to set labels from extracted data
func newLabelStage(logger log.Logger, configs LabelsConfig) (Stage, error) {
err := validateLabelsConfig(configs)
labelsConfig, err := validateLabelsConfig(configs)
if err != nil {
return nil, err
}
return toStage(&labelStage{
cfgs: configs,
logger: logger,
labelsConfig: labelsConfig,
logger: logger,
}), nil
}

// labelStage sets labels from extracted data
type labelStage struct {
cfgs LabelsConfig
logger log.Logger
labelsConfig map[string]string
logger log.Logger
}

// Process implements Stage
func (l *labelStage) Process(labels model.LabelSet, extracted map[string]interface{}, _ *time.Time, _ *string) {
processLabelsConfigs(l.logger, extracted, l.cfgs, func(labelName model.LabelName, labelValue model.LabelValue) {
processLabelsConfigs(l.logger, extracted, l.labelsConfig, func(labelName model.LabelName, labelValue model.LabelValue) {
labels[labelName] = labelValue
})
}

type labelsConsumer func(labelName model.LabelName, labelValue model.LabelValue)

func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, configs LabelsConfig, consumer labelsConsumer) {
for lName, lSrc := range configs.Values {
if lValue, ok := extracted[*lSrc]; ok {
func processLabelsConfigs(logger log.Logger, extracted map[string]interface{}, labelsConfig map[string]string, consumer labelsConsumer) {
for lName, lSrc := range labelsConfig {
if lValue, ok := extracted[lSrc]; ok {
s, err := getString(lValue)
if err != nil {
if Debug {
Expand Down
28 changes: 13 additions & 15 deletions internal/component/loki/process/stages/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,8 @@ func TestLabelsPipelineWithMissingKey_Labels(t *testing.T) {
}

var (
lv1 = "lv1"
lv2c = "l2"
lv3 = ""
lv3c = "l3"
lv1 = "lv1"
lv3 = ""
)

var emptyLabelsConfig = LabelsConfig{nil}
Expand All @@ -84,19 +82,19 @@ func TestLabels(t *testing.T) {
tests := map[string]struct {
config LabelsConfig
err error
expectedCfgs LabelsConfig
expectedCfgs map[string]string
}{
"missing config": {
config: emptyLabelsConfig,
err: errors.New(ErrEmptyLabelStageConfig),
expectedCfgs: emptyLabelsConfig,
expectedCfgs: nil,
},
"invalid label name": {
config: LabelsConfig{
Values: map[string]*string{"#*FDDS*": nil},
},
err: fmt.Errorf(ErrInvalidLabelName, "#*FDDS*"),
expectedCfgs: emptyLabelsConfig,
expectedCfgs: nil,
},
"label value is set from name": {
config: LabelsConfig{Values: map[string]*string{
Expand All @@ -105,18 +103,18 @@ func TestLabels(t *testing.T) {
"l3": &lv3,
}},
err: nil,
expectedCfgs: LabelsConfig{Values: map[string]*string{
"l1": &lv1,
"l2": &lv2c,
"l3": &lv3c,
}},
expectedCfgs: map[string]string{
"l1": "lv1",
"l2": "l2",
"l3": "l3",
},
},
}
for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
err := validateLabelsConfig(test.config)
actual, err := validateLabelsConfig(test.config)
if (err != nil) != (test.err != nil) {
t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err)
return
Expand All @@ -125,8 +123,8 @@ func TestLabels(t *testing.T) {
t.Errorf("validateLabelsConfig() expected error = %v, actual error = %v", test.err, err)
return
}
if test.expectedCfgs.Values != nil {
assert.Equal(t, test.expectedCfgs, test.config)
if test.expectedCfgs != nil {
assert.Equal(t, test.expectedCfgs, actual)
}
})
}
Expand Down
5 changes: 3 additions & 2 deletions internal/component/loki/process/stages/luhn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type LuhnFilterConfig struct {
}

// validateLuhnFilterConfig validates the LuhnFilterConfig.
func validateLuhnFilterConfig(c LuhnFilterConfig) error {
func validateLuhnFilterConfig(c *LuhnFilterConfig) error {
if c.Replacement == "" {
c.Replacement = "**REDACTED**"
}
Expand All @@ -32,7 +32,8 @@ func validateLuhnFilterConfig(c LuhnFilterConfig) error {

// newLuhnFilterStage creates a new LuhnFilterStage.
func newLuhnFilterStage(config LuhnFilterConfig) (Stage, error) {
if err := validateLuhnFilterConfig(config); err != nil {
err := validateLuhnFilterConfig(&config)
if err != nil {
return nil, err
}
return toStage(&luhnFilterStage{
Expand Down
Loading

0 comments on commit e588bfb

Please sign in to comment.