Skip to content

Commit

Permalink
review changes - improve naming, change signature and improve documen…
Browse files Browse the repository at this point in the history
…tation

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
  • Loading branch information
Kavindu-Dodan committed Jan 2, 2025
1 parent da508e0 commit ce2786a
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 9 deletions.
2 changes: 1 addition & 1 deletion x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -695,7 +695,7 @@ include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
[float]
==== `ignore_older`

The parameter specifies the time duration during which bucket entries are accepted for processing.
The parameter specifies the time duration (ex:- 30m, 2h or 48h) during which bucket entries are accepted for processing.
By default, this feature is disabled, allowing any entry in the bucket to be processed.
It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps to reduce the memory usage.

Expand Down
7 changes: 3 additions & 4 deletions x-pack/filebeat/input/awss3/s3_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool {
filters := map[string]filter{}

if f.cfg.IgnoreOlder != 0 {
timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder)
timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now())
filters[timeFilter.getID()] = timeFilter
}

Expand Down Expand Up @@ -107,11 +107,10 @@ type oldestTimeFilter struct {
timeOldest time.Time
}

func newOldestTimeFilter(timespan time.Duration) *oldestTimeFilter {
oldest := time.Now().Add(-1 * timespan)
func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter {
return &oldestTimeFilter{
id: filterOldestTime,
timeOldest: oldest,
timeOldest: now.Add(-1 * timespan),
}
}

Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func Test_oldestTimeFilter(t *testing.T) {
duration := time.Duration(1) * time.Second
entry := newState("bucket", "key", "eTag", time.Now())

oldTimeFilter := newOldestTimeFilter(duration)
oldTimeFilter := newOldestTimeFilter(duration, time.Now())

assert.Equal(t, filterOldestTime, oldTimeFilter.getID())
assert.True(t, oldTimeFilter.isValid(entry))
Expand Down Expand Up @@ -171,7 +171,7 @@ func Test_oldestTimeFilter(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
timeFilter := newOldestTimeFilter(test.duration)
timeFilter := newOldestTimeFilter(test.duration, time.Now())
assert.Equal(t, test.result, timeFilter.isValid(test.input))
})
}
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
defer close(workChan)
bucketName := getBucketNameFromARN(in.config.getBucketARN())

applyFilter := in.filterProvider.getApplierFunc()
isStateValid := in.filterProvider.getApplierFunc()

errorBackoff := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120)
circuitBreaker := 0
Expand Down Expand Up @@ -235,7 +235,7 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
for _, object := range page.Contents {
state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified)
if !applyFilter(in.log, state) {
if !isStateValid(in.log, state) {
continue
}

Expand Down

0 comments on commit ce2786a

Please sign in to comment.