diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f4d7ef0ee757..2903959e0bd5 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -367,6 +367,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
 - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
 - Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
 - Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
+- Introduce `ignore_older` and `start_timestamp` filters for the AWS S3 input. {pull}41804[41804]
 
 *Auditbeat*
diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
index 4e966d594c57..84ae2d963176 100644
--- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
+++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl
@@ -124,6 +124,16 @@
   # Controls deletion of objects after backing them up
   #delete_after_backup: false
 
+  # Ignore bucket entries older than the given timespan.
+  # The timespan is calculated from the current time back to the object's last modified timestamp.
+  # This is disabled by default (value 0) and can be configured to a time duration like "48h" or "2h30m".
+  #ignore_older: 0
+
+  # Accept bucket entries whose last modified timestamp is newer than the given timestamp.
+  # Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format; the default is empty.
+  # For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
+  #start_timestamp:
+
 #------------------------------ AWS CloudWatch input --------------------------------
 # Beta: Config options for AWS CloudWatch input
 #- type: aws-cloudwatch
diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
index 9858d3022dbd..1e4d61b3f537 100644
--- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
+++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -178,6 +178,8 @@ Node pipeline.
 The `aws-s3` input supports the following configuration options plus the
 <<{beatname_lc}-input-{type}-common-options>> described later.
 
+NOTE: For time durations, valid time units are "ns", "us" (or "µs"), "ms", "s", "m", and "h". For example, "2h".
+
 [float]
 ==== `api_timeout`
 
@@ -690,6 +692,32 @@ This option can only be used together with the backup functionality.
 [id="{beatname_lc}-input-{type}-common-options"]
 include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]
 
+[float]
+==== `ignore_older`
+
+This parameter specifies the time duration (for example, 30m, 2h, or 48h) during which bucket entries are accepted for processing.
+By default, this feature is disabled, allowing any entry in the bucket to be processed.
+It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps reduce memory usage.
+
+When defined, bucket entries are processed only if their last modified timestamp falls within the specified time duration, relative to the current time.
+However, when `start_timestamp` is set, the initial run also processes all bucket entries newer than that timestamp, even those outside this duration.
+
+NOTE: Bucket entries that are older than the defined duration and have failed processing will not be re-processed.
+It is recommended to configure a sufficiently long duration, based on your use case and current settings, to avoid conflicts with the `bucket_list_interval` property.
+Additionally, this ensures that subsequent runs can include and re-process objects that failed due to transient errors.
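+
+For example, to accept only entries modified within the last 48 hours (a minimal sketch; the bucket ARN is illustrative):
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: aws-s3
+  bucket_arn: 'arn:aws:s3:::my-bucket'
+  ignore_older: 48h
+----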
+
+[float]
+==== `start_timestamp`
+
+Accepts a timestamp in the YYYY-MM-DDTHH:MM:SSZ format, which defines the point from which bucket entries are accepted for processing.
+By default, this is disabled, allowing all entries in the bucket to be processed.
+
+This parameter is useful when configuring the input for the first time, especially if you want to ingest logs starting from a specific time.
+The timestamp can also be set to a future time; in that case, ingestion starts only for objects modified after that point.
+You can combine this property with an `ignore_older` duration to improve memory usage by reducing the number of tracked bucket entries.
+
+NOTE: It is recommended to update this value when updating or restarting Filebeat.
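+
+A sketch combining both options (the bucket ARN and timestamps are illustrative): the first run ingests every entry modified after `start_timestamp`, and subsequent runs track only entries newer than the `ignore_older` duration.
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: aws-s3
+  bucket_arn: 'arn:aws:s3:::my-bucket'
+  start_timestamp: '2024-11-20T20:00:00Z'
+  ignore_older: 48h
+----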
+
 [float]
 === AWS Permissions
 
diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml
index ae0871a6e3da..86c0c3ad4b67 100644
--- a/x-pack/filebeat/filebeat.reference.yml
+++ b/x-pack/filebeat/filebeat.reference.yml
@@ -3050,6 +3050,16 @@ filebeat.inputs:
   # Controls deletion of objects after backing them up
   #delete_after_backup: false
 
+  # Ignore bucket entries older than the given timespan.
+  # The timespan is calculated from the current time back to the object's last modified timestamp.
+  # This is disabled by default (value 0) and can be configured to a time duration like "48h" or "2h30m".
+  #ignore_older: 0
+
+  # Accept bucket entries whose last modified timestamp is newer than the given timestamp.
+  # Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format; the default is empty.
+  # For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
+  #start_timestamp:
+
 #------------------------------ AWS CloudWatch input --------------------------------
 # Beta: Config options for AWS CloudWatch input
 #- type: aws-cloudwatch
diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go
index 843061ae3c3e..bc62ed9f8717 100644
--- a/x-pack/filebeat/input/awss3/config.go
+++ b/x-pack/filebeat/input/awss3/config.go
@@ -27,24 +27,26 @@ import (
 
 type config struct {
 	APITimeout         time.Duration        `config:"api_timeout"`
-	VisibilityTimeout  time.Duration        `config:"visibility_timeout"`
-	SQSWaitTime        time.Duration        `config:"sqs.wait_time"`         // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
-	SQSMaxReceiveCount int                  `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
-	SQSScript          *scriptConfig        `config:"sqs.notification_parsing_script"`
-	QueueURL           string               `config:"queue_url"`
-	RegionName         string               `config:"region"`
-	BucketARN          string               `config:"bucket_arn"`
+	AWSConfig          awscommon.ConfigAWS  `config:",inline"`
 	AccessPointARN     string               `config:"access_point_arn"`
-	NonAWSBucketName   string               `config:"non_aws_bucket_name"`
+	BackupConfig       backupConfig         `config:",inline"`
+	BucketARN          string               `config:"bucket_arn"`
 	BucketListInterval time.Duration        `config:"bucket_list_interval"`
 	BucketListPrefix   string               `config:"bucket_list_prefix"`
-	NumberOfWorkers    int                  `config:"number_of_workers"`
-	AWSConfig          awscommon.ConfigAWS  `config:",inline"`
 	FileSelectors      []fileSelectorConfig `config:"file_selectors"`
-	ReaderConfig       readerConfig         `config:",inline"` // Reader options to apply when no file_selectors are used.
+	IgnoreOlder        time.Duration        `config:"ignore_older"`
+	NonAWSBucketName   string               `config:"non_aws_bucket_name"`
+	NumberOfWorkers    int                  `config:"number_of_workers"`
 	PathStyle          bool                 `config:"path_style"`
 	ProviderOverride   string               `config:"provider"`
-	BackupConfig       backupConfig         `config:",inline"`
+	QueueURL           string               `config:"queue_url"`
+	ReaderConfig       readerConfig         `config:",inline"` // Reader options to apply when no file_selectors are used.
+	RegionName         string               `config:"region"`
+	SQSMaxReceiveCount int                  `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
+	SQSScript          *scriptConfig        `config:"sqs.notification_parsing_script"`
+	SQSWaitTime        time.Duration        `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
+	StartTimestamp     string               `config:"start_timestamp"`
+	VisibilityTimeout  time.Duration        `config:"visibility_timeout"`
 }
 
 func defaultConfig() config {
@@ -142,6 +144,13 @@ func (c *config) Validate() error {
 		}
 	}
 
+	if c.StartTimestamp != "" {
+		_, err := time.Parse(time.RFC3339, c.StartTimestamp)
+		if err != nil {
+			return fmt.Errorf("invalid input for start_timestamp: %w", err)
+		}
+	}
+
 	return nil
 }
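As validated above, `start_timestamp` must be an RFC 3339 string. A standalone sketch (illustrative values, mirroring the config tests below) of what `time.Parse(time.RFC3339, ...)` accepts and rejects:

package main

import (
	"fmt"
	"time"
)

func main() {
	for _, ts := range []string{
		"2024-11-20T20:00:00Z",      // accepted: UTC
		"2024-11-20T22:30:00+02:30", // accepted: zone offset
		"2024-11-20 20:20:00",       // rejected: missing 'T' separator and zone
	} {
		if _, err := time.Parse(time.RFC3339, ts); err != nil {
			fmt.Printf("invalid input for start_timestamp: %v\n", err)
			continue
		}
		fmt.Printf("accepted %q\n", ts)
	}
}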
@@ -295,6 +304,7 @@ func (c config) sqsConfigModifier(o *sqs.Options) {
 		o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
 	}
 	if c.AWSConfig.Endpoint != "" {
+		//nolint:staticcheck // not changed in this PR
 		o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)
 	}
 }
diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go
index d791271ba6ef..50ccf47c1e80 100644
--- a/x-pack/filebeat/input/awss3/config_test.go
+++ b/x-pack/filebeat/input/awss3/config_test.go
@@ -596,6 +596,39 @@ func TestConfig(t *testing.T) {
 			expectedErr: "backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
 			expectedCfg: nil,
 		},
+		{
+			name:     "validate ignore_older and start_timestamp configurations",
+			s3Bucket: s3Bucket,
+			config: mapstr.M{
+				"bucket_arn":      s3Bucket,
+				"ignore_older":    "24h",
+				"start_timestamp": "2024-11-20T20:00:00Z",
+			},
+			expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config {
+				c := makeConfig(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket)
+				c.IgnoreOlder = 24 * time.Hour
+				c.StartTimestamp = "2024-11-20T20:00:00Z"
+				return c
+			},
+		},
+		{
+			name:     "ignore_older only accepts durations valid for ParseDuration",
+			s3Bucket: s3Bucket,
+			config: mapstr.M{
+				"bucket_arn":   s3Bucket,
+				"ignore_older": "24D",
+			},
+			expectedErr: "time: unknown unit \"D\" in duration \"24D\" accessing 'ignore_older'",
+		},
+		{
+			name:     "start_timestamp rejects timestamps not in YYYY-MM-DDTHH:MM:SSZ format",
+			s3Bucket: s3Bucket,
+			config: mapstr.M{
+				"bucket_arn":      s3Bucket,
+				"start_timestamp": "2024-11-20 20:20:00",
+			},
+			expectedErr: "invalid input for start_timestamp: parsing time \"2024-11-20 20:20:00\" as \"2006-01-02T15:04:05Z07:00\": cannot parse \" 20:20:00\" as \"T\" accessing config",
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go
index e47da893bbed..d8b148ec9ecf 100644
--- a/x-pack/filebeat/input/awss3/input_benchmark_test.go
+++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -349,6 +349,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
 		s3ObjectHandler: s3EventHandlerFactory,
 		states:          states,
 		provider:        "provider",
+		filterProvider:  newFilterProvider(&config),
 	}
 
 	s3Poller.run(ctx)
diff --git a/x-pack/filebeat/input/awss3/s3_filters.go b/x-pack/filebeat/input/awss3/s3_filters.go
new file mode 100644
index 000000000000..a34716586038
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/s3_filters.go
@@ -0,0 +1,123 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"sync"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+const (
+	filterOldestTime = "oldestTimeFilter"
+	filterStartTime  = "startTimeFilter"
+)
+
+// filterProvider exposes filters that need to be applied to a derived state.
+// Once configured, retrieve the filter applier using getApplierFunc.
+type filterProvider struct {
+	cfg *config
+
+	staticFilters []filter
+	once          sync.Once
+}
+
+func newFilterProvider(cfg *config) *filterProvider {
+	fp := &filterProvider{
+		cfg: cfg,
+	}
+
+	// derive static filters
+	if cfg.StartTimestamp != "" {
+		// note: errors should not occur here because the config has already been validated
+		parse, _ := time.Parse(time.RFC3339, cfg.StartTimestamp)
+		fp.staticFilters = append(fp.staticFilters, newStartTimestampFilter(parse))
+	}
+
+	return fp
+}
+
+// getApplierFunc returns the aggregated filters valid at the time of retrieval.
+// The applier returns true if a state is valid for processing according to the underlying filter collection.
+func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool {
+	filters := map[string]filter{}
+
+	if f.cfg.IgnoreOlder != 0 {
+		timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now())
+		filters[timeFilter.getID()] = timeFilter
+	}
+
+	for _, f := range f.staticFilters {
+		filters[f.getID()] = f
+	}
+
+	f.once.Do(func() {
+		// Skip the oldest time filter on initial startup only, if the start timestamp filter is defined.
+		// This allows users to ingest the desired data from the start time onwards.
+		if filters[filterStartTime] != nil {
+			delete(filters, filterOldestTime)
+		}
+	})
+
+	return func(log *logp.Logger, s state) bool {
+		for _, f := range filters {
+			if !f.isValid(s) {
+				log.Debugf("skipping processing of object '%s' by filter '%s'", s.Key, f.getID())
+				return false
+			}
+		}
+
+		return true
+	}
+}
+
+// filter defines the filter implementation contract.
+type filter interface {
+	getID() string
+	isValid(objState state) (valid bool)
+}
+
+// startTimestampFilter filters out entries whose last modified timestamp is older than the provided start timestamp.
+type startTimestampFilter struct {
+	id        string
+	timeStart time.Time
+}
+
+func newStartTimestampFilter(start time.Time) *startTimestampFilter {
+	return &startTimestampFilter{
+		id:        filterStartTime,
+		timeStart: start,
+	}
+}
+
+func (s startTimestampFilter) isValid(objState state) bool {
+	return s.timeStart.Before(objState.LastModified)
+}
+
+func (s startTimestampFilter) getID() string {
+	return s.id
+}
+
+// oldestTimeFilter filters out entries whose last modified timestamp is older than the calculated oldest accepted time.
+type oldestTimeFilter struct {
+	id         string
+	timeOldest time.Time
+}
+
+func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter {
+	return &oldestTimeFilter{
+		id:         filterOldestTime,
+		timeOldest: now.Add(-1 * timespan),
+	}
+}
+
+func (s oldestTimeFilter) isValid(objState state) bool {
+	return s.timeOldest.Before(objState.LastModified)
+}
+
+func (s oldestTimeFilter) getID() string {
+	return s.id
+}
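A minimal sketch of how the poller consumes these filters (names match the code above; the logger, bucket, and object values are illustrative, and the function itself is hypothetical):

// Sketch only: assumes it sits in the awss3 package next to the types above.
func exampleApplyFilters(log *logp.Logger, cfg config) {
	provider := newFilterProvider(&cfg)

	// Retrieve a fresh applier for each listing run; on the very first
	// retrieval the ignore_older filter is dropped when start_timestamp
	// is set, so the backlog after the start time is ingested once.
	isStateValid := provider.getApplierFunc()

	s := newState("bucket", "key", "eTag", time.Now())
	if isStateValid(log, s) {
		// forward the state to the object processing pipeline
	}
}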
diff --git a/x-pack/filebeat/input/awss3/s3_filters_test.go b/x-pack/filebeat/input/awss3/s3_filters_test.go
new file mode 100644
index 000000000000..0350b4952cdd
--- /dev/null
+++ b/x-pack/filebeat/input/awss3/s3_filters_test.go
@@ -0,0 +1,179 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package awss3
+
+import (
+	"testing"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func Test_filterProvider(t *testing.T) {
+	t.Run("Configuration check", func(t *testing.T) {
+		cfg := config{
+			StartTimestamp: "2024-11-26T21:00:00Z",
+			IgnoreOlder:    10 * time.Minute,
+		}
+
+		fProvider := newFilterProvider(&cfg)
+
+		assert.Equal(t, 1, len(fProvider.staticFilters))
+		assert.Equal(t, filterStartTime, fProvider.staticFilters[0].getID())
+	})
+
+	logger := logp.NewLogger("test-logger")
+
+	tests := []struct {
+		name                string
+		cfg                 *config
+		inputState          state
+		runFilterForCount   int
+		expectFilterResults []bool
+	}{
+		{
+			name: "Simple run - all valid result",
+			cfg: &config{
+				StartTimestamp: "2024-11-26T21:00:00Z",
+				IgnoreOlder:    10 * time.Minute,
+			},
+			inputState:          newState("bucket", "key", "eTag", time.Now()),
+			runFilterForCount:   1,
+			expectFilterResults: []bool{true},
+		},
+		{
+			name: "Simple run - all invalid result",
+			cfg: &config{
+				StartTimestamp: "2024-11-26T21:00:00Z",
+			},
+			inputState:          newState("bucket", "key", "eTag", time.Unix(0, 0)),
+			runFilterForCount:   1,
+			expectFilterResults: []bool{false},
+		},
+		{
+			name:                "Simple run - no filters hence valid result",
+			cfg:                 &config{},
+			inputState:          newState("bucket", "key", "eTag", time.Now()),
+			runFilterForCount:   1,
+			expectFilterResults: []bool{true},
+		},
+		{
+			name: "Single filter - ignore old invalid result",
+			cfg: &config{
+				IgnoreOlder: 1 * time.Minute,
+			},
+			inputState:          newState("bucket", "key", "eTag", time.Unix(time.Now().Add(-2*time.Minute).Unix(), 0)),
+			runFilterForCount:   1,
+			expectFilterResults: []bool{false},
+		},
+		{
+			name: "Combined filters - ignore old won't affect first run if timestamp is given but will affect thereafter",
+			cfg: &config{
+				StartTimestamp: "2024-11-26T21:00:00Z",
+				IgnoreOlder:    10 * time.Minute,
+			},
+			inputState:          newState("bucket", "key", "eTag", time.Unix(1732654860, 0)), // 2024-11-26T21:01:00Z in epoch
+			runFilterForCount:   3,
+			expectFilterResults: []bool{true, false, false},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			provider := newFilterProvider(test.cfg)
+			results := make([]bool, 0, test.runFilterForCount)
+
+			for i := 0; i < test.runFilterForCount; i++ {
+				applierFunc := provider.getApplierFunc()
+				results = append(results, applierFunc(logger, test.inputState))
+			}
+
+			assert.Equal(t, test.expectFilterResults, results)
+		})
+	}
+}
+
+func Test_startTimestampFilter(t *testing.T) {
+	t.Run("Configuration check", func(t *testing.T) {
+		entry := newState("bucket", "key", "eTag", time.Now())
+
+		oldTimeFilter := newStartTimestampFilter(time.Now().Add(-2 * time.Minute))
+
+		assert.Equal(t, filterStartTime, oldTimeFilter.getID())
+		assert.True(t, oldTimeFilter.isValid(entry))
+	})
+
+	tests := []struct {
+		name      string
+		timeStamp time.Time
+		input     state
+		result    bool
+	}{
+		{
+			name:      "State valid",
+			timeStamp: time.Now().Add(-10 * time.Minute),
+			input:     newState("bucket", "key", "eTag", time.Now()),
+			result:    true,
+		},
+		{
+			name:      "State invalid",
+			timeStamp: time.Now(),
+			input:     newState("bucket", "key", "eTag", time.Now().Add(-10*time.Minute)),
+			result:    false,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			timeFilter := newStartTimestampFilter(test.timeStamp)
+			assert.Equal(t, test.result, timeFilter.isValid(test.input))
+		})
+	}
+}
+
+func Test_oldestTimeFilter(t *testing.T) {
+	t.Run("configuration check", func(t *testing.T) {
+		duration := time.Duration(1) * time.Second
+		entry := newState("bucket", "key", "eTag", time.Now())
+
+		oldTimeFilter := newOldestTimeFilter(duration, time.Now())
+
+		assert.Equal(t, filterOldestTime, oldTimeFilter.getID())
+		assert.True(t, oldTimeFilter.isValid(entry))
+	})
+
+	tests := []struct {
+		name     string
+		duration time.Duration
+		input    state
+		result   bool
+	}{
+		{
+			name:     "State valid",
+			duration: time.Duration(1) * time.Minute,
+			input:    newState("bucket", "key", "eTag", time.Now()),
+			result:   true,
+		},
+		{
+			name:     "State invalid",
+			duration: time.Duration(1) * time.Minute,
+			input:    newState("bucket", "key", "eTag", time.Now().Add(-10*time.Minute)),
+			result:   false,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			timeFilter := newOldestTimeFilter(test.duration, time.Now())
+			assert.Equal(t, test.result, timeFilter.isValid(test.input))
+		})
+	}
+}
diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go
index c2d3b8446cc4..ad20579aae49 100644
--- a/x-pack/filebeat/input/awss3/s3_input.go
+++ b/x-pack/filebeat/input/awss3/s3_input.go
@@ -36,6 +36,7 @@ type s3PollerInput struct {
 	metrics         *inputMetrics
 	s3ObjectHandler s3ObjectHandlerFactory
 	states          *states
+	filterProvider  *filterProvider
 }
 
 func newS3PollerInput(
@@ -43,11 +44,11 @@ func newS3PollerInput(
 	awsConfig awssdk.Config,
 	store beater.StateStore,
 ) (v2.Input, error) {
-	return &s3PollerInput{
-		config:    config,
-		awsConfig: awsConfig,
-		store:     store,
+	return &s3PollerInput{
+		config:         config,
+		awsConfig:      awsConfig,
+		store:          store,
+		filterProvider: newFilterProvider(&config),
 	}, nil
 }
@@ -199,9 +200,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state)
 
 // These IDs are intended to be used for state clean-up.
 func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) (knownStateIDSlice []string, ok bool) {
 	defer close(workChan)
-
 	bucketName := getBucketNameFromARN(in.config.getBucketARN())
+	isStateValid := in.filterProvider.getApplierFunc()
+
 	errorBackoff := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120)
 	circuitBreaker := 0
 	paginator := in.s3.ListObjectsPaginator(bucketName, in.config.BucketListPrefix)
@@ -233,10 +235,14 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state)
 		in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects))
 		for _, object := range page.Contents {
 			state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified)
-			knownStateIDSlice = append(knownStateIDSlice, state.ID())
+			if !isStateValid(in.log, state) {
+				continue
+			}
+
+			// add to known states only if valid for processing
+			knownStateIDSlice = append(knownStateIDSlice, state.ID())
 			if in.states.IsProcessed(state) {
-				in.log.Debugw("skipping state.", "state", state)
+				in.log.Debugw("skipping state; already processed.", "state", state)
 				continue
 			}
diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go
index 2f79cb44a48c..5518a1808e1d 100644
--- a/x-pack/filebeat/input/awss3/s3_test.go
+++ b/x-pack/filebeat/input/awss3/s3_test.go
@@ -9,13 +9,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/elastic-agent-libs/logp"
+
 	"github.com/aws/aws-sdk-go-v2/aws"
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 	"github.com/aws/aws-sdk-go-v2/service/s3/types"
 	"github.com/stretchr/testify/require"
 	"go.uber.org/mock/gomock"
-
-	"github.com/elastic/elastic-agent-libs/logp"
 )
 
 func TestS3Poller(t *testing.T) {
@@ -130,21 +131,24 @@ func TestS3Poller(t *testing.T) {
 		s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{})
 		states, err := newStates(nil, store, listPrefix)
 		require.NoError(t, err, "states creation must succeed")
+
+		cfg := config{
+			NumberOfWorkers:    numberOfWorkers,
+			BucketListInterval: pollInterval,
+			BucketARN:          bucket,
+			BucketListPrefix:   listPrefix,
+			RegionName:         "region",
+		}
 		poller := &s3PollerInput{
-			log: logp.NewLogger(inputName),
-			config: config{
-				NumberOfWorkers:    numberOfWorkers,
-				BucketListInterval: pollInterval,
-				BucketARN:          bucket,
-				BucketListPrefix:   listPrefix,
-				RegionName:         "region",
-			},
+			log:             logp.NewLogger(inputName),
+			config:          cfg,
 			s3:              mockAPI,
 			pipeline:        pipeline,
 			s3ObjectHandler: s3ObjProc,
 			states:          states,
 			provider:        "provider",
 			metrics:         newInputMetrics("", nil, 0),
+			filterProvider:  newFilterProvider(&cfg),
 		}
 		poller.runPoll(ctx)
 	})
@@ -268,6 +272,14 @@ func TestS3Poller(t *testing.T) {
 		s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{})
 		states, err := newStates(nil, store, listPrefix)
 		require.NoError(t, err, "states creation must succeed")
+
+		cfg := config{
+			NumberOfWorkers:    numberOfWorkers,
+			BucketListInterval: pollInterval,
+			BucketARN:          bucket,
+			BucketListPrefix:   "key",
+			RegionName:         "region",
+		}
 		poller := &s3PollerInput{
 			log: logp.NewLogger(inputName),
 			config: config{
@@ -283,15 +295,254 @@ func TestS3Poller(t *testing.T) {
 			states:          states,
 			provider:        "provider",
 			metrics:         newInputMetrics("", nil, 0),
+			filterProvider:  newFilterProvider(&cfg),
 		}
 		poller.run(ctx)
 	})
 }
 
-func TestS3ReaderLoop(t *testing.T) {
-
-}
-
-func TestS3WorkerLoop(t *testing.T) {
-
+func Test_S3StateHandling(t *testing.T) {
+	bucket := "bucket"
+	logger := logp.NewLogger(inputName)
+	fixedTimeNow := time.Now()
+
+	tests := []struct {
+		name           string
+		s3Objects      []types.Object
+		config         *config
+		initStates     []state
+		runPollFor     int
+		expectStateIDs []string
+	}{
+		{
+			name: "State unchanged - registry backed state",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+			},
+			initStates:     []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))},         // 2024-11-26T12:00:00Z
+			runPollFor:     1,
+			expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))},         // 2024-11-26T12:00:00Z
+		},
+		{
+			name: "State cleanup - remove existing registry entry based on ignore older filter",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				IgnoreOlder:        1 * time.Second,
+			},
+			initStates:     []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z
+			runPollFor:     1,
+			expectStateIDs: []string{},
+		},
+		{
+			name: "State cleanup - remove existing registry entry based on timestamp filter",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				StartTimestamp:     "2024-11-27T12:00:00Z",
+			},
+			initStates:     []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z
+			runPollFor:     1,
+			expectStateIDs: []string{},
+		},
+		{
+			name: "State updated - no filters",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+			},
+			runPollFor:     1,
+			expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z
+		},
+		{
+			name: "State updated - ignore old filter",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(fixedTimeNow),
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				IgnoreOlder:        1 * time.Hour,
+			},
+			runPollFor:     1,
+			expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", fixedTimeNow)},
+		},
+		{
+			name: "State updated - timestamp filter",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(fixedTimeNow),
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				StartTimestamp:     "2024-11-26T12:00:00Z",
+			},
+			runPollFor:     1,
+			expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", fixedTimeNow)},
+		},
+		{
+			name: "State updated - combined filters of ignore old and timestamp entry exist after first run",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				IgnoreOlder:        1 * time.Hour,
+				StartTimestamp:     "2024-11-20T12:00:00Z",
+			},
+			// run once to validate initial coverage of entries until the start timestamp
+			runPollFor:     1,
+			expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z
+		},
+		{
+			name: "State updated - combined filters of ignore old and timestamp remove entry after second run",
+			s3Objects: []types.Object{
+				{
+					Key:          aws.String("obj-A"),
+					ETag:         aws.String("etag-A"),
+					LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z
+				},
+			},
+			config: &config{
+				NumberOfWorkers:    1,
+				BucketListInterval: 1 * time.Second,
+				BucketARN:          bucket,
+				IgnoreOlder:        1 * time.Hour,
+				StartTimestamp:     "2024-11-20T12:00:00Z",
+			},
+			// run twice to validate removal by the ignore older filter
+			runPollFor:     2,
+			expectStateIDs: []string{},
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// given - setup and mocks
+			ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
+			defer cancel()
+
+			ctrl, ctx := gomock.WithContext(ctx, t)
+			defer ctrl.Finish()
+
+			mockS3API := NewMockS3API(ctrl)
+			mockS3Pager := NewMockS3Pager(ctrl)
+			mockObjHandler := NewMockS3ObjectHandlerFactory(ctrl)
+			mockS3ObjectHandler := NewMockS3ObjectHandler(ctrl)
+
+			gomock.InOrder(
+				mockS3API.EXPECT().
+					ListObjectsPaginator(gomock.Eq(bucket), "").
+					AnyTimes().
+					DoAndReturn(func(_, _ string) s3Pager {
+						return mockS3Pager
+					}),
+			)
+
+			for i := 0; i < test.runPollFor; i++ {
+				mockS3Pager.EXPECT().HasMorePages().Times(1).DoAndReturn(func() bool { return true })
+				mockS3Pager.EXPECT().HasMorePages().Times(1).DoAndReturn(func() bool { return false })
+			}
+
+			mockS3Pager.EXPECT().
+				NextPage(gomock.Any()).
+				Times(test.runPollFor).
+				DoAndReturn(func(_ context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) {
+					return &s3.ListObjectsV2Output{Contents: test.s3Objects}, nil
+				})
+
+			mockObjHandler.EXPECT().Create(gomock.Any(), gomock.Any()).AnyTimes().Return(mockS3ObjectHandler)
+			mockS3ObjectHandler.EXPECT().ProcessS3Object(gomock.Any(), gomock.Any()).AnyTimes().
+				DoAndReturn(func(log *logp.Logger, eventCallback func(e beat.Event)) error {
+					eventCallback(beat.Event{})
+					return nil
+				})
+
+			store := openTestStatestore()
+			s3States, err := newStates(logger, store, "")
+			require.NoError(t, err, "States creation must succeed")
+
+			// Note: add init states as if we had derived them from the registry
+			for _, st := range test.initStates {
+				err := s3States.AddState(st)
+				require.NoError(t, err, "State add should not error")
+			}
+
+			poller := &s3PollerInput{
+				log:             logger,
+				config:          *test.config,
+				s3:              mockS3API,
+				pipeline:        newFakePipeline(),
+				s3ObjectHandler: mockObjHandler,
+				states:          s3States,
+				metrics:         newInputMetrics("state-test: "+test.name, nil, 0),
+				filterProvider:  newFilterProvider(test.config),
+			}
+
+			// when - run polling for the desired number of iterations
+			for i := 0; i < test.runPollFor; i++ {
+				poller.runPoll(ctx)
+				<-time.After(500 * time.Millisecond)
+			}
+
+			// then - state must only contain the expected state IDs
+			require.Equal(t, len(test.expectStateIDs), len(s3States.states))
+			for _, id := range test.expectStateIDs {
+				if s3States.states[id] == nil {
+					t.Errorf("state with ID %s should exist", id)
+				}
+			}
+		})
+	}
 }