Filestream returns error when an input with duplicated ID is created (elastic#41954)

* filestream: do not run input with duplicated ID

* wip

* wip

* Restore configuration files/remove test files

* Move reload retry logic to reload.go

* Fix isReloadable and improve tests

* Add changelog

* remove comments

* Fix build

* Update reloadInputs to use the new error types from cfgfile/list.go

* Fix more tests

* Add `allow_deprecated_id_duplication` flag

* Add non reloadable logic to autodiscover

* Add test to autodiscover not reloading common.ErrNonReloadable

* Add test for common.IsInputReloadable

* Update changelog

* address lint warnings

* Update notice to 2025

* Implement review suggestions

* Fix OTel API

* Fix flakiness on TestFilestreamMetadataUpdatedOnRename

For some reason this test became flaky. The root of the flakiness
is not in the test itself; it is in how a rename operation is detected.
Even though this test uses `os.Rename`, it does not seem to be an atomic
operation. https://www.man7.org/linux/man-pages/man2/rename.2.html
does not make it clear whether 'renameat' (used by `os.Rename`) is
atomic.

On a flaky execution, the file is actually perceived as removed
and then a new file is created, both with the same inode. This
happens on a system that does not reuse inodes as soon as they're
freed. Because the file is detected as removed, its state is also
removed. Then, when more data is added, only the offset of the new
data is tracked by the registry, causing the test to fail.

A workaround for this is to not remove the state when the file is
removed, hence `clean_removed: false` is set in the test config.

---------

Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
Co-authored-by: Julien Lind <julien.lind@elastic.co>
3 people authored Jan 4, 2025
1 parent ef29005 commit e3e2332
Showing 20 changed files with 530 additions and 56 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
@@ -53,7 +53,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because `fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]

*Heartbeat*


4 changes: 2 additions & 2 deletions filebeat/beater/crawler.go
@@ -79,14 +79,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
@@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

17 changes: 16 additions & 1 deletion filebeat/docs/inputs/input-filestream-file-options.asciidoc
@@ -19,12 +19,27 @@ supported.
===== `id`

A unique identifier for this filestream input. Each filestream input
must have a unique ID.
must have a unique ID. Filestream will not start inputs with duplicated IDs.

WARNING: Changing the input ID may cause data duplication because the
state of the files will be lost and they will be read from the
beginning again.

[float]
[[filestream-input-allow_deprecated_id_duplication]]
===== `allow_deprecated_id_duplication`

This setting allows {beatname_uc} to run multiple instances of the {type}
input with the same ID. It is intended to maintain backwards
compatibility with the behaviour prior to 9.0. It defaults to `false`
and its use is **not recommended** in new configurations.

This setting is per input, so make sure to enable it in all {type}
inputs that use duplicated IDs.

WARNING: Duplicated IDs will lead to data duplication and some input
instances will not produce any metrics.
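For example, a configuration opting two inputs that share an ID into the deprecated behaviour might look like this (the paths are illustrative; note the flag must appear on every input sharing the ID):

```yaml
filebeat.inputs:
  - type: filestream
    id: legacy-id
    allow_deprecated_id_duplication: true
    paths:
      - /var/log/app/*.log
  - type: filestream
    id: legacy-id
    allow_deprecated_id_duplication: true
    paths:
      - /var/log/other/*.log
```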

[float]
[[filestream-input-paths]]
===== `paths`
11 changes: 11 additions & 0 deletions filebeat/input/filestream/config.go
@@ -49,6 +49,10 @@ type config struct {
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
Rotation *conf.Namespace `config:"rotation"`
TakeOver bool `config:"take_over"`

// AllowIDDuplication is used by InputManager.Create
// (see internal/input-logfile/manager.go).
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
}

type closerConfig struct {
@@ -142,6 +146,13 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if c.AllowIDDuplication {
logp.L().Named("input.filestream").Warn(
"setting `allow_deprecated_id_duplication` will lead to data " +
"duplication and incomplete input metrics, it's use is " +
"highly discouraged.")
}

return nil
}

17 changes: 17 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
@@ -101,6 +101,23 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
// For some reason this test became flaky; the root of the flakiness
// is not in the test, it is in how a rename operation is detected.
// Even though this test uses `os.Rename`, it does not seem to be an atomic
// operation. https://www.man7.org/linux/man-pages/man2/rename.2.html
// does not make it clear whether 'renameat' (used by `os.Rename`) is
// atomic.
//
// On a flaky execution, the file is actually perceived as removed
// and then a new file is created, both with the same inode. This
// happens on a system that does not reuse inodes as soon as they're
// freed. Because the file is detected as removed, its state is also
// removed. Then when more data is added, only the offset of the new
// data is tracked by the registry, causing the test to fail.
//
// A workaround for this is to not remove the state when the file is
// removed, hence `clean_removed: false` is set here.
"clean_removed": false,
})

testline := []byte("log line\n")
47 changes: 38 additions & 9 deletions filebeat/input/filestream/internal/input-logfile/manager.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/go-concert/unison"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
@@ -155,26 +156,54 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
}

settings := struct {
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
}{CleanInactive: cim.DefaultCleanTimeout}
if err := config.Unpack(&settings); err != nil {
return nil, err
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat")
}

metricsID := settings.ID
cim.idsMux.Lock()
if _, exists := cim.ids[settings.ID]; exists {
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.", settings.ID)
metricsID = ""
duplicatedInput := map[string]any{}
unpackErr := config.Unpack(&duplicatedInput)
if unpackErr != nil {
duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error()
}

// Keep the old behaviour so users can upgrade to 9.0 without
// their inputs failing to start.
if settings.AllowIDDuplication {
cim.Logger.Errorf("filestream input with ID '%s' already exists, "+
"this will lead to data duplication, please use a different "+
"ID. Metrics collection has been disabled on this input. The "+
" input will start only because "+
"'allow_deprecated_id_duplication' is set to true",
settings.ID)
metricsID = ""
} else {
cim.Logger.Errorw(
fmt.Sprintf(
"filestream input ID '%s' is duplicated: input will NOT start",
settings.ID,
),
"input.cfg", conf.DebugString(config, true))

cim.idsMux.Unlock()
return nil, &common.ErrNonReloadable{
Err: fmt.Errorf(
"filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID",
settings.ID,
)}
}
}

// TODO: improve how inputs with empty IDs are tracked.
124 changes: 124 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/manager_test.go
@@ -19,13 +19,16 @@ package input_logfile

import (
"bytes"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/config"
@@ -42,6 +45,18 @@ func (s *testSource) Name() string {
return s.name
}

type noopProspector struct{}

func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error {
return nil
}

func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}

func (m noopProspector) Test() error {
return nil
}

func TestSourceIdentifier_ID(t *testing.T) {
testCases := map[string]struct {
userID string
@@ -198,6 +213,115 @@ func TestInputManager_Create(t *testing.T) {
assert.NotContains(t, buff.String(),
"already exists")
})

t.Run("does not start an input with duplicated ID", func(t *testing.T) {
tcs := []struct {
name string
id string
}{
{name: "ID is empty", id: ""},
{name: "non-empty ID", id: "non-empty-ID"},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/foo
`, tc.id))

// Create a different 2nd config with duplicated ID to ensure
// the ID itself is the only requirement to prevent the 2nd input
// from being created.
cfg2 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/bar
`, tc.id))

_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")

// Attempt to create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.Error(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
fmt.Sprintf("filestream input ID '%s' is duplicated:", tc.id))

// Assert the error contains the correct text
assert.Contains(t, err.Error(),
fmt.Sprintf("filestream input with ID '%s' already exists", tc.id))
})
}
})

t.Run("allow duplicated IDs setting", func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(`
type: filestream
id: duplicated-id
allow_deprecated_id_duplication: true
paths:
- /var/log/foo
`)

// Create a different 2nd config with duplicated ID to ensure
// the ID itself is the only requirement to prevent the 2nd input
// from being created.
cfg2 := config.MustNewConfigFrom(`
type: filestream
id: duplicated-id
allow_deprecated_id_duplication: true
paths:
- /var/log/bar
`)
_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")
// Create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.NoError(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
"filestream input with ID 'duplicated-id' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.",
"did not find the expected message about the duplicated input ID")
})
}

func newBufferLogger() (*logp.Logger, *bytes.Buffer) {
2 changes: 1 addition & 1 deletion filebeat/tests/integration/filestream_test.go
@@ -274,7 +274,7 @@ logging:
filebeat.WaitForLogs(
"Input 'filestream' starting",
10*time.Second,
"Filebeat did log a validation error")
"Filebeat did not log a validation error")
}

func TestFilestreamCanMigrateIdentity(t *testing.T) {
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
@@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}

if bt.config.ConfigMonitors.Enabled() {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunReloadableMonitors()
6 changes: 5 additions & 1 deletion libbeat/autodiscover/autodiscover.go
@@ -24,6 +24,7 @@ import (
"github.com/elastic/beats/v7/libbeat/autodiscover/meta"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/elastic-agent-autodiscover/bus"
conf "github.com/elastic/elastic-agent-libs/config"
@@ -169,7 +170,10 @@ func (a *Autodiscover) worker() {
updated = false

// On error, make sure the next run also updates because some runners were not properly loaded
retry = err != nil
retry = common.IsInputReloadable(err)
if err != nil && !retry {
a.logger.Errorw("all new inputs failed to start with a non-retriable error", "error", err)
}
if retry {
// The recoverable errors that can lead to retry are related
// to the harvester state, so we need to give the publishing
