Merge branch 'main' into poc/es_state_store
aleksmaus authored Jan 7, 2025
2 parents 904d8e8 + 4ba7d1c commit 02d684a
Showing 18 changed files with 1,141 additions and 56 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
@@ -383,7 +383,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and capped the retry wait time at the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduced ignore_older and start_timestamp filters for the AWS S3 input. {pull}41804[41804]

*Auditbeat*

@@ -124,6 +124,16 @@
# Controls deletion of objects after backing them up
#delete_after_backup: false

# Ignore bucket entries older than the given timespan.
# The timespan is measured from the current time back to the object's last modified timestamp.
# This is disabled by default (value 0) and can be set to a duration such as "48h" or "2h30m".
#ignore_older: 0

# Accept bucket entries whose last modified timestamp is newer than the given timestamp.
# Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format; the default is empty.
# For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
#start_timestamp:

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
28 changes: 28 additions & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
@@ -178,6 +178,8 @@ Node pipeline.
The `aws-s3` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

NOTE: For time durations, valid time units are "ns", "us" (or "µs"), "ms", "s", "m", and "h". For example, "2h".

[float]
==== `api_timeout`

@@ -690,6 +692,32 @@ This option can only be used together with the backup functionality.
[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

[float]
==== `ignore_older`

This parameter specifies a time duration (for example, 30m, 2h, or 48h) within which bucket entries are accepted for processing.
By default, this feature is disabled, so any entry in the bucket can be processed.
It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps reduce memory usage.

When defined, bucket entries are processed only if their last modified timestamp falls within the specified time duration, relative to the current time.
However, when `start_timestamp` is set, the initial processing includes all bucket entries newer than that timestamp, even those outside this duration.

NOTE: Bucket entries that are older than the defined duration and have failed processing will not be re-processed.
It is recommended to configure a sufficiently long duration, based on your use case and current settings, to avoid conflicts with the `bucket_list_interval` setting.
This also ensures that subsequent runs can include and re-process objects that failed due to unavoidable errors.

[float]
==== `start_timestamp`

Accepts a timestamp in the YYYY-MM-DDTHH:MM:SSZ format, which defines the point from which bucket entries are accepted for processing.
By default, this is disabled, allowing all entries in the bucket to be processed.

This parameter is useful when configuring the input for the first time, especially if you want to ingest logs starting from a specific time.
The timestamp can also be set to a future time.
You can combine this property with an `ignore_older` duration to reduce memory usage by limiting the number of tracked bucket entries.

NOTE: It is recommended to update this value when reconfiguring or restarting Filebeat.

[float]
=== AWS Permissions

45 changes: 44 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
@@ -20,6 +20,7 @@ The websocket streaming input supports:
** Basic
** Bearer
** Custom
** OAuth2.0

NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment, so reconnection only occurs on input restart.

@@ -113,7 +114,7 @@ This will include any sensitive or secret information kept in the `state` object.

==== Authentication

The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, a custom auth configuration, and OAuth2-based authentication. Unlike REST inputs, Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token, and custom auth contains any combination of custom header and value. These token/key values are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

@@ -166,6 +167,48 @@ filebeat.inputs:
token_url: https://api.crowdstrike.com/oauth2/token
----

==== Websocket OAuth2.0

The `websocket` streaming input supports OAuth2.0 authentication. The `auth` configuration field is used to specify the OAuth2.0 configuration. These values are not exposed to the `state` object.

The `auth` configuration field has the following subfields:

- `client_id`: The client ID to use for OAuth2.0 authentication.
- `client_secret`: The client secret to use for OAuth2.0 authentication.
- `token_url`: The token URL to use for OAuth2.0 authentication.
- `scopes`: The scopes to use for OAuth2.0 authentication.
- `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication.
- `auth_style`: The authentication style to use for OAuth2.0 authentication. If left unset, the style will be automatically detected.
- `token_expiry_buffer`: Minimum valid time remaining before attempting an OAuth2 token renewal. The default value is `2m`.

**Explanations for `auth_style` and `token_expiry_buffer`:**

- `auth_style`: Determines how sensitive values such as the `client_id` and `client_secret` are sent in the token request. This field is optional; if no value is provided, the style is inferred automatically and applied appropriately. It supports the following values:

* `in_header`: The `client_id` and `client_secret` are sent in the header as a base64-encoded `Authorization` header.
* `in_params`: The `client_id` and `client_secret` are sent in the request body along with the other OAuth2 parameters.

- `token_expiry_buffer`: A safety net that ensures the token is renewed before it expires, leaving at least this much validity remaining. This field is optional and defaults to `2m`.

NOTE: We recommend leaving the `auth_style` configuration field unset (so the style is inferred automatically) for most scenarios, except where manual intervention is required.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
  auth:
    client_id: a23fcea2643868ef1a41565a1a8a1c7c
    client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
    token_url: https://api.sample-url.com/oauth2/token
    scopes: ["read", "write"]
    endpoint_params:
      param1: value1
      param2: value2
    auth_style: in_params
    token_expiry_buffer: 5m
  url: wss://localhost:443/_stream
----

[[input-state-streaming]]
==== Input state

10 changes: 10 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
@@ -3051,6 +3051,16 @@ filebeat.inputs:
# Controls deletion of objects after backing them up
#delete_after_backup: false

# Ignore bucket entries older than the given timespan.
# The timespan is measured from the current time back to the object's last modified timestamp.
# This is disabled by default (value 0) and can be set to a duration such as "48h" or "2h30m".
#ignore_older: 0

# Accept bucket entries whose last modified timestamp is newer than the given timestamp.
# Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format; the default is empty.
# For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset).
#start_timestamp:

#------------------------------ AWS CloudWatch input --------------------------------
# Beta: Config options for AWS CloudWatch input
#- type: aws-cloudwatch
34 changes: 22 additions & 12 deletions x-pack/filebeat/input/awss3/config.go
@@ -27,24 +27,26 @@ import (

type config struct {
	APITimeout         time.Duration        `config:"api_timeout"`
	AccessPointARN     string               `config:"access_point_arn"`
	AWSConfig          awscommon.ConfigAWS  `config:",inline"`
	BackupConfig       backupConfig         `config:",inline"`
	BucketARN          string               `config:"bucket_arn"`
	BucketListInterval time.Duration        `config:"bucket_list_interval"`
	BucketListPrefix   string               `config:"bucket_list_prefix"`
	FileSelectors      []fileSelectorConfig `config:"file_selectors"`
	IgnoreOlder        time.Duration        `config:"ignore_older"`
	NonAWSBucketName   string               `config:"non_aws_bucket_name"`
	NumberOfWorkers    int                  `config:"number_of_workers"`
	PathStyle          bool                 `config:"path_style"`
	ProviderOverride   string               `config:"provider"`
	QueueURL           string               `config:"queue_url"`
	ReaderConfig       readerConfig         `config:",inline"` // Reader options to apply when no file_selectors are used.
	RegionName         string               `config:"region"`
	SQSMaxReceiveCount int                  `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it.
	SQSScript          *scriptConfig        `config:"sqs.notification_parsing_script"`
	SQSWaitTime        time.Duration        `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning.
	StartTimestamp     string               `config:"start_timestamp"`
	VisibilityTimeout  time.Duration        `config:"visibility_timeout"`
}

func defaultConfig() config {
@@ -142,6 +144,13 @@ func (c *config) Validate() error {
}
}

	if c.StartTimestamp != "" {
		_, err := time.Parse(time.RFC3339, c.StartTimestamp)
		if err != nil {
			return fmt.Errorf("invalid input for start_timestamp: %w", err)
		}
	}

return nil
}

@@ -295,6 +304,7 @@ func (c config) sqsConfigModifier(o *sqs.Options) {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
//nolint:staticcheck // not changing through this PR
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)
}
}
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/awss3/config_test.go
@@ -596,6 +596,39 @@ func TestConfig(t *testing.T) {
expectedErr: "backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop",
expectedCfg: nil,
},
		{
			name:     "validate ignore_older and start_timestamp configurations",
			s3Bucket: s3Bucket,
			config: mapstr.M{
				"bucket_arn":      s3Bucket,
				"ignore_older":    "24h",
				"start_timestamp": "2024-11-20T20:00:00Z",
			},
			expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config {
				c := makeConfig(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket)
				c.IgnoreOlder = 24 * time.Hour
				c.StartTimestamp = "2024-11-20T20:00:00Z"
				return c
			},
		},
		{
			name:     "ignore_older only accepts valid duration - unit valid with ParseDuration",
			s3Bucket: s3Bucket,
			config: mapstr.M{
				"bucket_arn":   s3Bucket,
				"ignore_older": "24D",
			},
			expectedErr: "time: unknown unit \"D\" in duration \"24D\" accessing 'ignore_older'",
		},
		{
			name:     "start_timestamp accepts a valid timestamp of format - YYYY-MM-DDTHH:MM:SSZ",
			s3Bucket: s3Bucket,
			config: mapstr.M{
				"bucket_arn":      s3Bucket,
				"start_timestamp": "2024-11-20 20:20:00",
			},
			expectedErr: "invalid input for start_timestamp: parsing time \"2024-11-20 20:20:00\" as \"2006-01-02T15:04:05Z07:00\": cannot parse \" 20:20:00\" as \"T\" accessing config",
		},
}

for _, tc := range testCases {
1 change: 1 addition & 0 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
@@ -352,6 +352,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
		s3ObjectHandler: s3EventHandlerFactory,
		states:          states,
		provider:        "provider",
		filterProvider:  newFilterProvider(&config),
	}

	s3Poller.run(ctx)
123 changes: 123 additions & 0 deletions x-pack/filebeat/input/awss3/s3_filters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package awss3

import (
"sync"
"time"

"github.com/elastic/elastic-agent-libs/logp"
)

const (
filterOldestTime = "oldestTimeFilter"
filterStartTime = "startTimeFilter"
)

// filterProvider exposes filters that needs to be applied on derived state.
// Once configured, retrieve filter applier using getApplierFunc.
type filterProvider struct {
cfg *config

staticFilters []filter
once sync.Once
}

func newFilterProvider(cfg *config) *filterProvider {
fp := &filterProvider{
cfg: cfg,
}

// derive static filters
if cfg.StartTimestamp != "" {
// note - errors should not occur as this has validated prior reaching here
parse, _ := time.Parse(time.RFC3339, cfg.StartTimestamp)
fp.staticFilters = append(fp.staticFilters, newStartTimestampFilter(parse))
}

return fp
}

// getApplierFunc returns aggregated filters valid at the time of retrival.
// Applier return true if state is valid for processing according to the underlying filter collection.
func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool {
filters := map[string]filter{}

if f.cfg.IgnoreOlder != 0 {
timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now())
filters[timeFilter.getID()] = timeFilter
}

for _, f := range f.staticFilters {
filters[f.getID()] = f
}

f.once.Do(func() {
// Ignore the oldest time filter once for initial startup only if start timestamp filter is defined
// This allows users to ingest desired data from start time onwards.
if filters[filterStartTime] != nil {
delete(filters, filterOldestTime)
}
})

return func(log *logp.Logger, s state) bool {
for _, f := range filters {
if !f.isValid(s) {
log.Debugf("skipping processing of object '%s' by filter '%s'", s.Key, f.getID())
return false
}
}

return true
}
}

// filter defines the fileter implementation contract
type filter interface {
getID() string
isValid(objState state) (valid bool)
}

// startTimestampFilter - filter out entries based on provided start time stamp, compared to filtering object's last modified times stamp.
type startTimestampFilter struct {
id string
timeStart time.Time
}

func newStartTimestampFilter(start time.Time) *startTimestampFilter {
return &startTimestampFilter{
id: filterStartTime,
timeStart: start,
}
}

func (s startTimestampFilter) isValid(objState state) bool {
return s.timeStart.Before(objState.LastModified)
}

func (s startTimestampFilter) getID() string {
return s.id
}

// oldestTimeFilter - filter out entries based on calculated oldest modified time, compared to filtering object's last modified times stamp.
type oldestTimeFilter struct {
id string
timeOldest time.Time
}

func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter {
return &oldestTimeFilter{
id: filterOldestTime,
timeOldest: now.Add(-1 * timespan),
}
}

func (s oldestTimeFilter) isValid(objState state) bool {
return s.timeOldest.Before(objState.LastModified)
}

func (s oldestTimeFilter) getID() string {
return s.id
}