Skip to content

Commit

Permalink
[filebeat][streaming] - Added OAuth2 support with auto token refresh …
Browse files Browse the repository at this point in the history
…for websockets (#42212)

* Added OAuth2 support with auto token refresh for websockets. A manual token refresh logic had to be implemented since the OAuth2 client and the websocket client are separate entities and cannot be clubbed together.

(cherry picked from commit 4244fa2)
  • Loading branch information
ShourieG authored and mergify[bot] committed Jan 7, 2025
1 parent 1564b54 commit 6dbc1eb
Show file tree
Hide file tree
Showing 8 changed files with 455 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- The Filestream input can automatically migrate state from files when changing the `file_identity` if the previous file identity was `native` (the default) or `path`. {issue}40197[40197] {pull}41762[41762]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*
Expand Down
45 changes: 44 additions & 1 deletion x-pack/filebeat/docs/inputs/input-streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The websocket streaming input supports:
** Basic
** Bearer
** Custom
** OAuth2.0

NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart.

Expand Down Expand Up @@ -113,7 +114,7 @@ This will include any sensitive or secret information kept in the `state` object

==== Authentication

The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.
The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, authentication via a custom auth config and OAuth2 based authentication. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively.

Example configurations with authentication:

Expand Down Expand Up @@ -166,6 +167,48 @@ filebeat.inputs:
token_url: https://api.crowdstrike.com/oauth2/token
----

==== Websocket OAuth2.0

The `websocket` streaming input supports OAuth2.0 authentication. The `auth` configuration field is used to specify the OAuth2.0 configuration. These values are not exposed to the `state` object.

The `auth` configuration field has the following subfields:

- `client_id`: The client ID to use for OAuth2.0 authentication.
- `client_secret`: The client secret to use for OAuth2.0 authentication.
- `token_url`: The token URL to use for OAuth2.0 authentication.
- `scopes`: The scopes to use for OAuth2.0 authentication.
- `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication.
- `auth_style`: The authentication style to use for OAuth2.0 authentication. If left unset, the style will be automatically detected.
- `token_expiry_buffer`: Minimum valid time remaining before attempting an OAuth2 token renewal. The default value is `2m`.

**Explanations for `auth_style` and `token_expiry_buffer`:**

- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default style value is automatically inferred and used appropriately if no value is provided. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. The `auth_style` configuration field supports the following configurable values:

* `in_header`: The `client_id` and `client_secret` is sent in the header as a base64 encoded `Authorization` header.
* `in_params`: The `client_id` and `client_secret` is sent in the request body along with the other OAuth2 parameters.

- `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The `token_expiry_buffer` is used as a safety net to ensure that the token does not expire before the input can refresh it. The `token_expiry_buffer` configuration field is optional. If the `token_expiry_buffer` configuration field is not set, the default value of `2m` is used.

NOTE: We recommend leaving the `auth_style` configuration field unset (automatically inferred internally) for most scenarios, except where manual intervention is required.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
- type: streaming
auth:
client_id: a23fcea2643868ef1a41565a1a8a1c7c
client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK
token_url: https://api.sample-url.com/oauth2/token
scopes: ["read", "write"]
endpoint_params:
param1: value1
param2: value2
auth_style: in_params
token_expiry_buffer: 5m
url: wss://localhost:443/_stream
----

[[input-state-streaming]]
==== Input state

Expand Down
47 changes: 42 additions & 5 deletions x-pack/filebeat/input/streaming/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,17 @@ import (
"regexp"
"time"

"golang.org/x/oauth2"

"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

const (
authStyleInHeader = "in_header"
authStyleInParams = "in_params"
)

type config struct {
// Type is the type of the stream being followed. The
// zero value indicates websocket.
Expand Down Expand Up @@ -85,11 +92,30 @@ type customAuthConfig struct {

type oAuth2Config struct {
// common oauth fields
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
EndpointParams map[string][]string `config:"endpoint_params"`
Scopes []string `config:"scopes"`
TokenURL string `config:"token_url"`
AuthStyle string `config:"auth_style"`
ClientID string `config:"client_id"`
ClientSecret string `config:"client_secret"`
EndpointParams url.Values `config:"endpoint_params"`
Scopes []string `config:"scopes"`
TokenExpiryBuffer time.Duration `config:"token_expiry_buffer" validate:"min=0"`
TokenURL string `config:"token_url"`
// accessToken is only used internally to set the initial headers via formHeader() if oauth2 is enabled
accessToken string
}

func (o oAuth2Config) isEnabled() bool {
return o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != ""
}

func (o oAuth2Config) getAuthStyle() oauth2.AuthStyle {
switch o.AuthStyle {
case authStyleInHeader:
return oauth2.AuthStyleInHeader
case authStyleInParams:
return oauth2.AuthStyleInParams
default:
return oauth2.AuthStyleAutoDetect
}
}

type urlConfig struct {
Expand Down Expand Up @@ -144,6 +170,12 @@ func (c config) Validate() error {
return errors.New("wait_min must be less than or equal to wait_max")
}
}

if c.Auth.OAuth2.isEnabled() {
if c.Auth.OAuth2.AuthStyle != authStyleInHeader && c.Auth.OAuth2.AuthStyle != authStyleInParams && c.Auth.OAuth2.AuthStyle != "" {
return fmt.Errorf("unsupported auth style: %s", c.Auth.OAuth2.AuthStyle)
}
}
return nil
}

Expand Down Expand Up @@ -173,6 +205,11 @@ func defaultConfig() config {
Transport: httpcommon.HTTPTransportSettings{
Timeout: 180 * time.Second,
},
Auth: authConfig{
OAuth2: oAuth2Config{
TokenExpiryBuffer: 2 * time.Minute,
},
},
Retry: &retry{
MaxAttempts: 5,
WaitMin: 1 * time.Second,
Expand Down
73 changes: 73 additions & 0 deletions x-pack/filebeat/input/streaming/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,79 @@ var configTests = []struct {
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_default",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_in_params",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_params",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "valid_authStyle_in_header",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_header",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "invalid_authStyle",
config: map[string]interface{}{
"auth": map[string]interface{}{
"auth_style": "in_query",
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("unsupported auth style: in_query accessing config"),
},
{
name: "valid_tokenExpiryBuffer",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
"token_expiry_buffer": "5m",
},
"url": "wss://localhost:443/v1/stream",
},
},
{
name: "invalid_tokenExpiryBuffer",
config: map[string]interface{}{
"auth": map[string]interface{}{
"client_id": "a_client_id",
"client_secret": "a_client_secret",
"token_url": "https://localhost:443/token",
"token_expiry_buffer": "-1s",
},
"url": "wss://localhost:443/v1/stream",
},
wantErr: fmt.Errorf("requires duration >= 0 accessing 'auth.token_expiry_buffer'"),
},
}

func TestConfig(t *testing.T) {
Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/streaming/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,12 +378,14 @@ func errorMessage(msg string) map[string]interface{} {
func formHeader(cfg config) map[string][]string {
header := make(map[string][]string)
switch {
case cfg.Auth.CustomAuth != nil:
header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value}
case cfg.Auth.OAuth2.accessToken != "":
header["Authorization"] = []string{"Bearer " + cfg.Auth.OAuth2.accessToken}
case cfg.Auth.BearerToken != "":
header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken}
case cfg.Auth.BasicToken != "":
header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken}
case cfg.Auth.CustomAuth != nil:
header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value}
}
return header
}
1 change: 0 additions & 1 deletion x-pack/filebeat/input/streaming/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, erro
if err := cfg.Unpack(&src.cfg); err != nil {
return nil, nil, err
}

if src.cfg.Program == "" {
// set default program
src.cfg.Program = `
Expand Down
Loading

0 comments on commit 6dbc1eb

Please sign in to comment.