Skip to content

Commit

Permalink
[aws] [s3] Introduce ignore_older & start_timestamp for S3 input allo…
Browse files Browse the repository at this point in the history
…wing better registry cleanups (#41817)

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* sort config entries

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* introduce ignore old and start timestamp configurations and document them

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* add filtering logic

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* filter tests

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* add component test for filtering and fix lint issues

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

# Conflicts:
#	x-pack/filebeat/input/awss3/s3_test.go

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* improve documentation

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

* review changes - improve naming, change signature and improve documentation

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>

---------

Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
(cherry picked from commit 4ba7d1c)
  • Loading branch information
Kavindu-Dodan authored and mergify[bot] committed Jan 7, 2025
1 parent fe44ff2 commit d315b12
Show file tree
Hide file tree
Showing 11 changed files with 687 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@
# Controls deletion of objects after backing them up
#delete_after_backup: false

# Ignore bucket entries older than the given timespan.
# Timespan is calculated from current time to processing object's last modified timestamp.
# This is disabled by default(value 0) and can be configured to a time duration like "48h" or "2h30m".
#ignore_older: 0

# Accept bucket entries with last modified timestamp newer than the given timestamp.
# Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format and default is empty.
# For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
#start_timestamp:

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
Expand Down
28 changes: 28 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ Node pipeline.
The `aws-s3` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

NOTE: For time durations, valid time units are - "ns", "us" (or "µs"), "ms", "s", "m", "h". For example, "2h"

[float]
==== `api_timeout`

Expand Down Expand Up @@ -690,6 +692,32 @@ This option can only be used together with the backup functionality.
[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

[float]
==== `ignore_older`

The parameter specifies the time duration (ex:- 30m, 2h or 48h) during which bucket entries are accepted for processing.
By default, this feature is disabled, allowing any entry in the bucket to be processed.
It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps to reduce the memory usage.

When defined, bucket entries are processed only if their last modified timestamp falls within the specified time duration, relative to the current time.
However, when the start_timestamp is set, the initial processing will include all bucket entries up to that timestamp.

NOTE: Bucket entries that are older than the defined duration and have failed processing will not be re-processed.
It is recommended to configure a sufficiently long duration based on your use case and current settings to avoid conflicts with the bucket_list_interval property.
Additionally, this ensures that subsequent runs can include and re-process objects that failed due to unavoidable errors.

[float]
==== `start_timestamp`

Accepts a timestamp in the YYYY-MM-DDTHH:MM:SSZ format, which defines the point from which bucket entries are accepted for processing.
By default, this is disabled, allowing all entries in the bucket to be processed.

This parameter is useful when configuring input for the first time, especially if you want to ingest logs starting from a specific time.
The timestamp can also be set to a future time, offering greater flexibility.
You can combine this property with ignore_older duration to improve memory usage by reducing tracked bucket entries.

NOTE: It is recommended to update this value when updating or restarting filebeat

[float]
=== AWS Permissions

Expand Down
10 changes: 10 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3050,6 +3050,16 @@ filebeat.inputs:
# Controls deletion of objects after backing them up
#delete_after_backup: false

# Ignore bucket entries older than the given timespan.
# Timespan is calculated from current time to processing object's last modified timestamp.
# This is disabled by default(value 0) and can be configured to a time duration like "48h" or "2h30m".
#ignore_older: 0

# Accept bucket entries with last modified timestamp newer than the given timestamp.
# Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format and default is empty.
# For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
#start_timestamp:

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
Expand Down
34 changes: 22 additions & 12 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,26 @@ import (

type config struct {
APITimeout time.Duration `config:"api_timeout"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
SQSScript *scriptConfig `config:"sqs.notification_parsing_script"`
QueueURL string `config:"queue_url"`
RegionName string `config:"region"`
BucketARN string `config:"bucket_arn"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
AccessPointARN string `config:"access_point_arn"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
BackupConfig backupConfig `config:",inline"`
BucketARN string `config:"bucket_arn"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
NumberOfWorkers int `config:"number_of_workers"`
AWSConfig awscommon.ConfigAWS `config:",inline"`
FileSelectors []fileSelectorConfig `config:"file_selectors"`
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
IgnoreOlder time.Duration `config:"ignore_older"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
NumberOfWorkers int `config:"number_of_workers"`
PathStyle bool `config:"path_style"`
ProviderOverride string `config:"provider"`
BackupConfig backupConfig `config:",inline"`
QueueURL string `config:"queue_url"`
ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used.
RegionName string `config:"region"`
SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
SQSScript *scriptConfig `config:"sqs.notification_parsing_script"`
SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
StartTimestamp string `config:"start_timestamp"`
VisibilityTimeout time.Duration `config:"visibility_timeout"`
}

func defaultConfig() config {
Expand Down Expand Up @@ -142,6 +144,13 @@ func (c *config) Validate() error {
}
}

if c.StartTimestamp != "" {
_, err := time.Parse(time.RFC3339, c.StartTimestamp)
if err != nil {
return fmt.Errorf("invalid input for start_timestamp: %w", err)
}
}

return nil
}

Expand Down Expand Up @@ -295,6 +304,7 @@ func (c config) sqsConfigModifier(o *sqs.Options) {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
//nolint:staticcheck // not changing through this PR
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)
}
}
Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/awss3/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,39 @@ func TestConfig(t *testing.T) {
expectedErr: "backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
expectedCfg: nil,
},
{
name: "validate ignore_older and start_timestamp configurations",
s3Bucket: s3Bucket,
config: mapstr.M{
"bucket_arn": s3Bucket,
"ignore_older": "24h",
"start_timestamp": "2024-11-20T20:00:00Z",
},
expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config {
c := makeConfig(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket)
c.IgnoreOlder = 24 * time.Hour
c.StartTimestamp = "2024-11-20T20:00:00Z"
return c
},
},
{
name: "ignore_older only accepts valid duration - unit valid with ParseDuration",
s3Bucket: s3Bucket,
config: mapstr.M{
"bucket_arn": s3Bucket,
"ignore_older": "24D",
},
expectedErr: "time: unknown unit \"D\" in duration \"24D\" accessing 'ignore_older'",
},
{
name: "start_timestamp accepts a valid timestamp of format - YYYY-MM-DDTHH:MM:SSZ",
s3Bucket: s3Bucket,
config: mapstr.M{
"bucket_arn": s3Bucket,
"start_timestamp": "2024-11-20 20:20:00",
},
expectedErr: "invalid input for start_timestamp: parsing time \"2024-11-20 20:20:00\" as \"2006-01-02T15:04:05Z07:00\": cannot parse \" 20:20:00\" as \"T\" accessing config",
},
}

for _, tc := range testCases {
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
s3ObjectHandler: s3EventHandlerFactory,
states: states,
provider: "provider",
filterProvider: newFilterProvider(&config),
}

s3Poller.run(ctx)
Expand Down
123 changes: 123 additions & 0 deletions x-pack/filebeat/input/awss3/s3_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"sync"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

const (
filterOldestTime = "oldestTimeFilter"
filterStartTime = "startTimeFilter"
)

// filterProvider exposes filters that needs to be applied on derived state.
// Once configured, retrieve filter applier using getApplierFunc.
type filterProvider struct {
cfg *config

staticFilters []filter
once sync.Once
}

func newFilterProvider(cfg *config) *filterProvider {
fp := &filterProvider{
cfg: cfg,
}

// derive static filters
if cfg.StartTimestamp != "" {
// note - errors should not occur as this has validated prior reaching here
parse, _ := time.Parse(time.RFC3339, cfg.StartTimestamp)
fp.staticFilters = append(fp.staticFilters, newStartTimestampFilter(parse))
}

return fp
}

// getApplierFunc returns aggregated filters valid at the time of retrival.
// Applier return true if state is valid for processing according to the underlying filter collection.
func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool {
filters := map[string]filter{}

if f.cfg.IgnoreOlder != 0 {
timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now())
filters[timeFilter.getID()] = timeFilter
}

for _, f := range f.staticFilters {
filters[f.getID()] = f
}

f.once.Do(func() {
// Ignore the oldest time filter once for initial startup only if start timestamp filter is defined
// This allows users to ingest desired data from start time onwards.
if filters[filterStartTime] != nil {
delete(filters, filterOldestTime)
}
})

return func(log *logp.Logger, s state) bool {
for _, f := range filters {
if !f.isValid(s) {
log.Debugf("skipping processing of object '%s' by filter '%s'", s.Key, f.getID())
return false
}
}

return true
}
}

// filter defines the fileter implementation contract
type filter interface {
getID() string
isValid(objState state) (valid bool)
}

// startTimestampFilter - filter out entries based on provided start time stamp, compared to filtering object's last modified times stamp.
type startTimestampFilter struct {
id string
timeStart time.Time
}

func newStartTimestampFilter(start time.Time) *startTimestampFilter {
return &startTimestampFilter{
id: filterStartTime,
timeStart: start,
}
}

func (s startTimestampFilter) isValid(objState state) bool {
return s.timeStart.Before(objState.LastModified)
}

func (s startTimestampFilter) getID() string {
return s.id
}

// oldestTimeFilter - filter out entries based on calculated oldest modified time, compared to filtering object's last modified times stamp.
type oldestTimeFilter struct {
id string
timeOldest time.Time
}

func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter {
return &oldestTimeFilter{
id: filterOldestTime,
timeOldest: now.Add(-1 * timespan),
}
}

func (s oldestTimeFilter) isValid(objState state) bool {
return s.timeOldest.Before(objState.LastModified)
}

func (s oldestTimeFilter) getID() string {
return s.id
}
Loading

0 comments on commit d315b12

Please sign in to comment.