Skip to content

Commit

Permalink
[filebeat][websocket] - Added infinite & blanket retry options to web…
Browse files Browse the repository at this point in the history
…sockets and improved logging and retry logic (#42225)

* added blanket & infinite retry options and improved logging

(cherry picked from commit 177a47a)
  • Loading branch information
ShourieG authored and mergify[bot] committed Jan 7, 2025
1 parent 2cc3ff3 commit 6b3da92
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 19 deletions.
48 changes: 48 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,54 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add SSL and username support for Redis input, now the input includes support for Redis 6.0+. {pull}40111[40111]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]
- Document `winlog` input. {issue}40074[40074] {pull}40462[40462]
- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601]
- Disable event normalization for netflow input {pull}40635[40635]
- Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662]
- Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
- Added support for Microsoft Entra ID RBAC authentication. {issue}40434[40434] {pull}40879[40879]
- Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
- Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779]
- Add CSV decoder to awss3 input. {pull}40896[40896]
- Change request trace logging to include headers instead of complete request. {pull}41072[41072]
- Improved GCS input documentation. {pull}41143[41143]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
- Add CSV decoding capacity to gcs input {pull}40979[40979]
- Add support to source AWS cloudwatch logs from linked accounts. {pull}41188[41188]
- Jounrald input now supports filtering by facilities {pull}41061[41061]
- Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
- Improved Azure Blob Storage input documentation. {pull}41252[41252]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
- Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]
- Add support for Journald in the System module. {pull}41555[41555]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add `unifiedlogs` input for MacOS. {pull}41791[41791]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- The Filestream input now uses the `fingerprint` file identity by default. The state from files are automatically migrated if the previous file identity was `native` (the default) or `path`. If the `file_identity` is explicitly set, there is no change in behaviour. {issue}40197[40197] {pull}41762[41762]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*

Expand Down
14 changes: 13 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely
[float]
==== `retry`

The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries.
The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retires` and `infinite_retries` configuration options. These are set to `false` by default.

["source","yaml",subs="attributes"]
----
Expand All @@ -333,6 +333,8 @@ filebeat.inputs:
max_attempts: 5
wait_min: 1s
wait_max: 10s
blanket_retries: false
infinite_retries: false
----
[float]
==== `retry.max_attempts`
Expand All @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o

The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds.

[float]
==== `retry.blanket_retries`

Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying.

[float]
==== `retry.infinite_retries`

Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed.

[float]
=== `timeout`
Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds.
Expand Down
10 changes: 6 additions & 4 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,11 @@ type redact struct {
}

type retry struct {
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
MaxAttempts int `config:"max_attempts"`
WaitMin time.Duration `config:"wait_min"`
WaitMax time.Duration `config:"wait_max"`
BlanketRetries bool `config:"blanket_retries"`
InfiniteRetries bool `config:"infinite_retries"`
}

type authConfig struct {
Expand Down Expand Up @@ -136,7 +138,7 @@ func (c config) Validate() error {

if c.Retry != nil {
switch {
case c.Retry.MaxAttempts <= 0:
case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries:
return errors.New("max_attempts must be greater than zero")
case c.Retry.WaitMin > c.Retry.WaitMax:
return errors.New("wait_min must be less than or equal to wait_max")
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ var configTests = []struct {
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_retry_with_infinite",
config: map[string]interface{}{
"retry": map[string]interface{}{
"infinite_retries": true,
"max_attempts": 0,
"wait_min": "1s",
"wait_max": "2s",
},
"url": "wss://localhost:443/v1/stream",
},
},
}

func TestConfig(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/streaming/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ var inputTests = []struct {
"wait_max": "2s",
},
},
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"),
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"),
},
{
name: "single_event_tls",
Expand Down
43 changes: 30 additions & 13 deletions x-pack/filebeat/input/streaming/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error {
_, message, err := c.ReadMessage()
if err != nil {
s.metrics.errorsTotal.Inc()
if !isRetryableError(err) {
if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) {
s.log.Errorw("failed to read websocket data", "error", err)
return err
}
Expand Down Expand Up @@ -233,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log
}
if cfg.Retry != nil {
retryConfig := cfg.Retry
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
if !retryConfig.InfiniteRetries {
for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
continue
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode)
} else {
for attempt := 1; ; attempt++ {
conn, response, err = dialer.DialContext(ctx, url, headers)
if err == nil {
return conn, response, nil
}
//nolint:errorlint // it will never be a wrapped error at this point
if err == websocket.ErrBadHandshake {
log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode)
} else {
log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode)
}
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt)
waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt)
time.Sleep(waitTime)
}
return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err)
}

return dialer.DialContext(ctx, url, headers)
Expand Down

0 comments on commit 6b3da92

Please sign in to comment.