diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 44f5ecc3325..1935e2478b9 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -695,7 +695,7 @@ include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] [float] ==== `ignore_older` -The parameter specifies the time duration during which bucket entries are accepted for processing. +The parameter specifies the time duration (ex:- 30m, 2h or 48h) during which bucket entries are accepted for processing. By default, this feature is disabled, allowing any entry in the bucket to be processed. It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps to reduce the memory usage. diff --git a/x-pack/filebeat/input/awss3/s3_filters.go b/x-pack/filebeat/input/awss3/s3_filters.go index df9fbfedd80..a3471658603 100644 --- a/x-pack/filebeat/input/awss3/s3_filters.go +++ b/x-pack/filebeat/input/awss3/s3_filters.go @@ -46,7 +46,7 @@ func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool { filters := map[string]filter{} if f.cfg.IgnoreOlder != 0 { - timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder) + timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now()) filters[timeFilter.getID()] = timeFilter } @@ -107,11 +107,10 @@ type oldestTimeFilter struct { timeOldest time.Time } -func newOldestTimeFilter(timespan time.Duration) *oldestTimeFilter { - oldest := time.Now().Add(-1 * timespan) +func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter { return &oldestTimeFilter{ id: filterOldestTime, - timeOldest: oldest, + timeOldest: now.Add(-1 * timespan), } } diff --git a/x-pack/filebeat/input/awss3/s3_filters_test.go b/x-pack/filebeat/input/awss3/s3_filters_test.go index 096acea6f77..0350b4952cd 100644 --- a/x-pack/filebeat/input/awss3/s3_filters_test.go +++ b/x-pack/filebeat/input/awss3/s3_filters_test.go @@ -142,7 +142,7 @@ func Test_oldestTimeFilter(t *testing.T) { duration := time.Duration(1) * time.Second entry := newState("bucket", "key", "eTag", time.Now()) - oldTimeFilter := newOldestTimeFilter(duration) + oldTimeFilter := newOldestTimeFilter(duration, time.Now()) assert.Equal(t, filterOldestTime, oldTimeFilter.getID()) assert.True(t, oldTimeFilter.isValid(entry)) @@ -171,7 +171,7 @@ func Test_oldestTimeFilter(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - timeFilter := newOldestTimeFilter(test.duration) + timeFilter := newOldestTimeFilter(test.duration, time.Now()) assert.Equal(t, test.result, timeFilter.isValid(test.input)) }) } diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index 31715c4d08f..ad20579aae4 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -202,7 +202,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) defer close(workChan) bucketName := getBucketNameFromARN(in.config.getBucketARN()) - applyFilter := in.filterProvider.getApplierFunc() + isStateValid := in.filterProvider.getApplierFunc() errorBackoff := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120) circuitBreaker := 0 @@ -235,7 +235,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects)) for _, object := range page.Contents { state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified) - if !applyFilter(in.log, state) { + if !isStateValid(in.log, state) { continue }