diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index ea38e969a3d3..053b6a144c3c 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -121,6 +121,7 @@ CHANGELOG* /x-pack/filebeat/input/lumberjack/ @elastic/security-service-integrations /x-pack/filebeat/input/netflow/ @elastic/sec-deployment-and-devices /x-pack/filebeat/input/o365audit/ @elastic/security-service-integrations +/x-pack/filebeat/input/websocket/ @elastic/security-service-integrations /x-pack/filebeat/module/activemq @elastic/obs-infraobs-integrations /x-pack/filebeat/module/aws @elastic/obs-cloud-monitoring /x-pack/filebeat/module/awsfargate @elastic/obs-cloud-monitoring diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9e27962b7e55..48497df4957c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -174,6 +174,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d - Relax TCP/UDP metric polling expectations to improve metric collection. {pull}37714[37714] - Add support for PEM-based Okta auth in HTTPJSON. {pull}37772[37772] - Prevent complete loss of long request trace data. {issue}37826[37826] {pull}37836[37836] +- Added experimental version of the Websocket Input. {pull}37774[37774] - Add support for PEM-based Okta auth in CEL. {pull}37813[37813] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 573e544bb2e8..dba3c7e6ffc0 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -18447,6 +18447,38 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-------------------------------------------------------------------------------- +Dependency : github.com/gorilla/websocket +Version: v1.4.2 +Licence type (autodetected): BSD-2-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/gorilla/websocket@v1.4.2/LICENSE: + +Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + + Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + -------------------------------------------------------------------------------- Dependency : github.com/h2non/filetype Version: v1.1.1 @@ -41594,38 +41626,6 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. --------------------------------------------------------------------------------- -Dependency : github.com/gorilla/websocket -Version: v1.4.2 -Licence type (autodetected): BSD-2-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/gorilla/websocket@v1.4.2/LICENSE: - -Copyright (c) 2013 The Gorilla WebSocket Authors. All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - - Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - - Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - - -------------------------------------------------------------------------------- Dependency : github.com/hashicorp/cronexpr Version: v1.1.0 diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index faff00e7e3d2..0787e3660bf8 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -91,6 +91,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-tcp>> * <<{beatname_lc}-input-udp>> * <<{beatname_lc}-input-gcs>> +* <<{beatname_lc}-input-websocket>> include::multiline.asciidoc[] @@ -145,3 +146,5 @@ include::inputs/input-udp.asciidoc[] include::inputs/input-unix.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-gcs.asciidoc[] + +include::../../x-pack/filebeat/docs/inputs/input-websocket.asciidoc[] diff --git a/go.mod b/go.mod index ee391fb43d20..2739f86c33ac 100644 --- a/go.mod +++ b/go.mod @@ -213,6 +213,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 github.com/gorilla/handlers v1.5.1 github.com/gorilla/mux v1.8.0 + github.com/gorilla/websocket v1.4.2 github.com/icholy/digest v0.1.22 github.com/lestrrat-go/jwx/v2 v2.0.19 github.com/otiai10/copy v1.12.0 @@ -299,7 +300,6 @@ require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect github.com/googleapis/gnostic v0.5.5 // indirect - github.com/gorilla/websocket v1.4.2 // indirect github.com/hashicorp/cronexpr v1.1.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/x-pack/filebeat/docs/inputs/input-websocket.asciidoc b/x-pack/filebeat/docs/inputs/input-websocket.asciidoc new file mode 100644 index 000000000000..8ee2da2b42ad --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-websocket.asciidoc @@ -0,0 +1,272 @@ +[role="xpack"] + +:type: websocket +:mito_version: v1.8.0 +:mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version} + +[id="{beatname_lc}-input-{type}"] +=== Websocket Input +experimental[] + +The `websocket` input reads messages from a websocket server or api endpoint. This input uses the `CEL engine` and the `mito` library interally to parse and process the messages. Having support for `CEL` allows you to parse and process the messages in a more flexible way. It has many similarities with the `cel` input as to how the `CEL` programs are written but deviates in the way the messages are read and processed. The `websocket` input is a `streaming` input and can only be used to read messages from a websocket server or api endpoint. + +This input supports: + +* Auth +** Basic +** Bearer +** Custom + +NOTE: The `websocket` input as of now does not support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart. + +==== Execution + +The execution environment provided for the input includes includes the functions, macros, and global variables provided by the mito library. +A single JSON object is provided as an input accessible through a `state` variable. `state` contains a `response` map field and may contain arbitrary other fields configured via the input's `state` configuration. If the CEL program saves cursor states between executions of the program, the configured `state.cursor` value will be replaced by the saved cursor prior to execution. + +On start the `state` will be something like this: + +["source","json",subs="attributes"] +---- +{ + "response": { ... }, + "cursor": { ... }, + ... +} +---- +The `websocket` input creates a `response` field in the state map and attaches the websocket message to this field. All `CEL` programs written should act on this `response` field. Additional fields may be present at the root of the object and if the program tolerates it, the cursor value may be absent. Only the cursor is persisted over restarts, but all fields in state are retained between iterations of the processing loop except for the produced events array, see below. + +If the cursor is present the program should process or filter out responses based on its value. If cursor is not present all responses should be processed as per the program's logic. + +After completion of a program's execution it should return a single object with a structure looking like this: + +["source","json",subs="attributes"] +---- +{ + "events": [ <1> + {...}, + ... + ], + "cursor": [ <2> + {...}, + ... + ] +} +---- + +<1> The `events` field must be present, but may be empty or null. If it is not empty, it must only have objects as elements. +The field could be an array or a single object that will be treated as an array with a single element. This depends completely on the websocket server or api endpoint. The `events` field is the array of events to be published to the output. Each event must be a JSON object. + +<2> If `cursor` is present it must be either be a single object or an array with the same length as events; each element _i_ of the `cursor` will be the details for obtaining the events at and beyond event _i_ in the `events` array. If the `cursor` is a single object, it will be the details for obtaining events after the last event in the `events` array and will only be retained on successful publication of all the events in the `events` array. + + +Example configuration: + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +# Read and process simple websocket messages from a local websocket server +- type: websocket + url: ws://localhost:443/v1/stream + program: | + bytes(state.response).decode_json().as(inner_body,{ + "events": { + "message": inner_body.encode_json(), + } + }) +---- + +==== Debug state logging + +The Websocket input will log the complete state when logging at the DEBUG level before and after CEL evaluation. +This will include any sensitive or secret information kept in the `state` object, and so DEBUG level logging should not be used in production when sensitive information is retained in the `state` object. See <> configuration parameters for settings to exclude sensitive fields from DEBUG logs. + +==== Authentication +The Websocket input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively. + +Example configurations with authentication: + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + auth.basic_token: "dXNlcjpwYXNzd29yZA==" + url: wss://localhost:443/_stream +---- + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + auth.bearer_token: "dXNlcjpwYXNzd29yZA==" + url: wss://localhost:443/_stream +---- + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + auth.custom: + header: "x-api-key" + value: "dXNlcjpwYXNzd29yZA==" + url: wss://localhost:443/_stream +---- + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + auth.custom: + header: "Auth" + value: "Bearer dXNlcjpwYXNzd29yZA==" + url: wss://localhost:443/_stream +---- + +[[input-state-websocket]] +==== Input state + +The `websocket` input keeps a runtime state between every message received. This state can be accessed by the CEL program and may contain arbitrary objects. +The state must contain a `response` map and may contain any object the user wishes to store in it. All objects are stored at runtime, except `cursor`, which has values that are persisted between restarts. + +==== Configuration options + +The `websocket` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[[program-websocket]] +[float] +==== `program` + +The CEL program that is executed on each message received. This field should ideally be present but if not the default program given below is used. + +["source","yaml",subs="attributes"] +---- +program: | + bytes(state.response).decode_json().as(inner_body,{ + "events": { + "message": inner_body.encode_json(), + } + }) +---- + +[[state-websocket]] +[float] +==== `state` + +`state` is an optional object that is passed to the CEL program on the first execution. It is available to the executing program as the `state` variable. Except for the `state.cursor` field, `state` does not persist over restarts. + +[[cursor-websocket]] +[float] +==== `state.cursor` + +The cursor is an object available as `state.cursor` where arbitrary values may be stored. Cursor state is kept between input restarts and updated after each event of a request has been published. When a cursor is used the CEL program must either create a cursor state for each event that is returned by the program, or a single cursor that reflects the cursor for completion of the full set of events. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +# Read and process simple websocket messages from a local websocket server +- type: websocket + url: ws://localhost:443/v1/stream + program: | + bytes(state.response).as(body, { + "events": [body.decode_json().with({ + "last_requested_at": has(state.cursor) && has(state.cursor.last_requested_at) ? + state.cursor.last_requested_at + : + now + })], + "cursor": {"last_requested_at": now} + }) +---- + +[[regexp-websocket]] +[float] +==== `regexp` + +A set of named regular expressions that may be used during a CEL program's execution using the `regexp` extension library. The syntax used for the regular expressions is https://github.com/google/re2/wiki/Syntax[RE2]. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + # Define two regular expressions, 'products' and 'solutions' for use during CEL program execution. + regexp: + products: '(?i)(Elasticsearch|Beats|Logstash|Kibana)' + solutions: '(?i)(Search|Observability|Security)' +---- + +[[websocket-state-redact]] +[float] +==== `redact` + +During debug level logging, the `state` object and the resulting evaluation result are included in logs. This may result in leaking of secrets. In order to prevent this, fields may be redacted or deleted from the logged `state`. The `redact` configuration allows users to configure this field redaction behaviour. For safety reasons if the `redact` configuration is missing a warning is logged. + +In the case of no-required redaction an empty `redact.fields` configuration should be used to silence the logged warning. + +["source","yaml",subs="attributes"] +---- +- type: websocket + redact: + fields: ~ +---- + +As an example, if a user-constructed Basic Authentication request is used in a CEL program the password can be redacted like so + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: websocket + url: ws://localhost:443/_stream + state: + user: user@domain.tld + password: P@$$W0₹D + redact: + fields: + - password + delete: true +---- + +Note that fields under the `auth` configuration hierarchy are not exposed to the `state` and so do not need to be redacted. For this reason it is preferable to use these for authentication over the request construction shown above where possible. + +[float] +==== `redact.fields` + +This specifies fields in the `state` to be redacted prior to debug logging. Fields listed in this array will be either replaced with a `*` or deleted entirely from messages sent to debug logs. + +[float] +==== `redact.delete` + +This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced. + +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `url` | URL of the input resource. +| `cel_eval_errors` | Number of errors encountered during cel program evaluation. +| `errors_total` | Number of errors encountered over the life cycle of the input. +| `batches_received_total` | Number of event arrays received. +| `batches_published_total` | Number of event arrays published. +| `received_bytes_total` | Number of bytes received over the life cycle of the input. +| `events_received_total` | Number of events received. +| `events_published_total` | Number of events published. +| `cel_processing_time` | Histogram of the elapsed successful CEL program processing times in nanoseconds. +| `batch_processing_time` | Histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +|======= + +==== Developer tools + +A stand-alone CEL environment that implements the majority of the websocket input's Comment Expression Language functionality is available in the https://github.com/elastic/mito[Elastic Mito] repository. This tool may be used to help develop CEL programs to be used by the input. Installation is available from source by running `go install github.com/elastic/mito/cmd/mito@latest` and requires a Go toolchain. + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +NOTE: The `websocket` input is currently tagged as experimental and might have bugs and other issues. Please report any issues on the https://github.com/elastic/beats[Github] repository. + +:type!: \ No newline at end of file diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index d396d4635a1d..5b55cecc56e8 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -22,6 +22,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/lumberjack" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit" "github.com/elastic/beats/v7/x-pack/filebeat/input/shipper" + "github.com/elastic/beats/v7/x-pack/filebeat/input/websocket" "github.com/elastic/elastic-agent-libs/logp" ) @@ -39,5 +40,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 awscloudwatch.Plugin(), lumberjack.Plugin(), shipper.Plugin(log, store), + websocket.Plugin(log, store), } } diff --git a/x-pack/filebeat/input/websocket/cel.go b/x-pack/filebeat/input/websocket/cel.go new file mode 100644 index 000000000000..11c2e7ad8f13 --- /dev/null +++ b/x-pack/filebeat/input/websocket/cel.go @@ -0,0 +1,99 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "regexp" + + "github.com/google/cel-go/cel" + "github.com/google/cel-go/checker/decls" + + "github.com/elastic/beats/v7/libbeat/version" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/useragent" + "github.com/elastic/mito/lib" +) + +var ( + // mimetypes holds supported MIME type mappings. + mimetypes = map[string]interface{}{ + "application/gzip": func(r io.Reader) (io.Reader, error) { return gzip.NewReader(r) }, + "application/x-ndjson": lib.NDJSON, + "application/zip": lib.Zip, + "text/csv; header=absent": lib.CSVNoHeader, + "text/csv; header=present": lib.CSVHeader, + "text/csv;header=absent": lib.CSVNoHeader, + "text/csv;header=present": lib.CSVHeader, + } +) + +func regexpsFromConfig(cfg config) (map[string]*regexp.Regexp, error) { + if len(cfg.Regexps) == 0 { + return nil, nil + } + patterns := make(map[string]*regexp.Regexp) + for name, expr := range cfg.Regexps { + var err error + patterns[name], err = regexp.Compile(expr) + if err != nil { + return nil, err + } + } + return patterns, nil +} + +// The Filebeat user-agent is provided to the program as useragent. +var userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) + +func newProgram(ctx context.Context, src, root string, patterns map[string]*regexp.Regexp, log *logp.Logger) (cel.Program, *cel.Ast, error) { + opts := []cel.EnvOption{ + cel.Declarations(decls.NewVar(root, decls.Dyn)), + cel.OptionalTypes(cel.OptionalTypesVersion(lib.OptionalTypesVersion)), + lib.Collections(), + lib.Crypto(), + lib.JSON(nil), + lib.Strings(), + lib.Time(), + lib.Try(), + lib.Debug(debug(log)), + lib.MIME(mimetypes), + lib.Regexp(patterns), + lib.Globals(map[string]interface{}{ + "useragent": userAgent, + }), + } + + env, err := cel.NewEnv(opts...) + if err != nil { + return nil, nil, fmt.Errorf("failed to create env: %w", err) + } + + ast, iss := env.Compile(src) + if iss.Err() != nil { + return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err()) + } + + prg, err := env.Program(ast) + if err != nil { + return nil, nil, fmt.Errorf("failed program instantiation: %w", err) + } + return prg, ast, nil +} + +func debug(log *logp.Logger) func(string, any) { + log = log.Named("websocket_debug") + return func(tag string, value any) { + level := "DEBUG" + if _, ok := value.(error); ok { + level = "ERROR" + } + + log.Debugw(level, "tag", tag, "value", value) + } +} diff --git a/x-pack/filebeat/input/websocket/config.go b/x-pack/filebeat/input/websocket/config.go new file mode 100644 index 000000000000..1a961f3c1625 --- /dev/null +++ b/x-pack/filebeat/input/websocket/config.go @@ -0,0 +1,105 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "context" + "fmt" + "net/url" + "regexp" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type config struct { + // Program is the CEL program to be run for each polling. + Program string `config:"program"` + // Regexps is the set of regular expression to be made + // available to the program. + Regexps map[string]string `config:"regexp"` + // State is the initial state to be provided to the + // program. If it has a cursor field, that field will + // be overwritten by any stored cursor, but will be + // available if no stored cursor exists. + State map[string]interface{} `config:"state"` + // Auth is the authentication config for connection. + Auth authConfig `config:"auth"` + // URL is the websocket url to connect to. + URL *urlConfig `config:"url" validate:"required"` + // Redact is the debug log state redaction configuration. + Redact *redact `config:"redact"` +} + +type redact struct { + // Fields indicates which fields to apply redaction to prior + // to logging. + Fields []string `config:"fields"` + // Delete indicates that fields should be completely deleted + // before logging rather than redaction with a "*". + Delete bool `config:"delete"` +} + +type authConfig struct { + // Custom auth config to use for authentication. + CustomAuth *customAuthConfig `config:"custom"` + // Baerer token to use for authentication. + BearerToken string `config:"bearer_token"` + // Basic auth token to use for authentication. + BasicToken string `config:"basic_token"` +} + +type customAuthConfig struct { + // Custom auth config to use for authentication. + Header string `config:"header"` + Value string `config:"value"` +} +type urlConfig struct { + *url.URL +} + +func (u *urlConfig) Unpack(in string) error { + parsed, err := url.Parse(in) + if err != nil { + return err + } + u.URL = parsed + return nil +} + +func (c config) Validate() error { + if c.Redact == nil { + logp.L().Named("input.websocket").Warn("missing recommended 'redact' configuration: " + + "see documentation for details: https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-input-websocket.html#_redact") + } + _, err := regexpsFromConfig(c) + if err != nil { + return fmt.Errorf("failed to check regular expressions: %w", err) + } + + var patterns map[string]*regexp.Regexp + if len(c.Regexps) != 0 { + patterns = map[string]*regexp.Regexp{".": nil} + } + if c.Program != "" { + _, _, err = newProgram(context.Background(), c.Program, root, patterns, logp.L().Named("input.websocket")) + if err != nil { + return fmt.Errorf("failed to check program: %w", err) + } + } + err = checkURLScheme(c.URL) + if err != nil { + return err + } + return nil +} + +func checkURLScheme(url *urlConfig) error { + switch url.Scheme { + case "ws", "wss": + return nil + default: + return fmt.Errorf("unsupported scheme: %s", url.Scheme) + } +} diff --git a/x-pack/filebeat/input/websocket/config_test.go b/x-pack/filebeat/input/websocket/config_test.go new file mode 100644 index 000000000000..3cb9c910cde0 --- /dev/null +++ b/x-pack/filebeat/input/websocket/config_test.go @@ -0,0 +1,121 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +var configTests = []struct { + name string + config map[string]interface{} + wantErr error +}{ + { + name: "invalid_url_scheme", + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "url": "http://localhost:8080", + }, + wantErr: fmt.Errorf("unsupported scheme: http accessing config"), + }, + { + name: "missing_url", + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + wantErr: fmt.Errorf("missing required field accessing 'url'"), + }, + { + name: "invalid_program", + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? + [inner_body] + : + null, + })`, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("failed to check program: failed compilation: ERROR: :3:79: found no matching overload for '_?_:_' applied to '(bool, list(dyn), null)'\n | \"events\": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? \n | ..............................................................................^ accessing config"), + }, + { + name: "invalid_regexps", + config: map[string]interface{}{ + "regexp": map[string]interface{}{ + "products": "(?i)(xq>)d+)", + "solutions": "(?)(Sws>(d+)", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("failed to check regular expressions: error parsing regexp: unexpected ): `(?i)(xq>)d+)` accessing config"), + }, + { + name: "valid_regexps", + config: map[string]interface{}{ + "regexp": map[string]interface{}{ + "products": "(?i)(Elasticsearch|Beats|Logstash|Kibana)", + "solutions": "(?i)(Search|Observability|Security)", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "valid_config", + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "url": "wss://localhost:443/v1/stream", + "regexp": map[string]interface{}{ + "products": "(?i)(Elasticsearch|Beats|Logstash|Kibana)", + "solutions": "(?i)(Search|Observability|Security)", + }, + "state": map[string]interface{}{ + "cursor": map[string]int{ + "last_updated": 1502908200, + }, + }, + }, + }, +} + +func TestConfig(t *testing.T) { + logp.TestingSetup() + for _, test := range configTests { + t.Run(test.name, func(t *testing.T) { + cfg := conf.MustNewConfigFrom(test.config) + conf := config{} + // Make sure we pass the redact requirement. + conf.Redact = &redact{} + err := cfg.Unpack(&conf) + + switch { + case err == nil && test.wantErr != nil: + t.Fatalf("expected error unpacking config: %v", test.wantErr) + case err != nil && test.wantErr == nil: + t.Fatalf("unexpected error unpacking config: %v", err) + case err != nil && test.wantErr != nil: + assert.EqualError(t, err, test.wantErr.Error()) + default: + // no error + } + }) + } +} diff --git a/x-pack/filebeat/input/websocket/input.go b/x-pack/filebeat/input/websocket/input.go new file mode 100644 index 000000000000..c48ce177931c --- /dev/null +++ b/x-pack/filebeat/input/websocket/input.go @@ -0,0 +1,379 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "context" + "errors" + "fmt" + "reflect" + "time" + + "github.com/google/cel-go/cel" + "github.com/gorilla/websocket" + "google.golang.org/protobuf/types/known/structpb" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/go-concert/ctxtool" + "github.com/elastic/mito/lib" +) + +type input struct { + time func() time.Time + cfg config +} + +const ( + inputName string = "websocket" + root string = "state" +) + +func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Experimental, + Deprecated: false, + Info: "Websocket Input", + Doc: "Collect data from websocket api endpoints", + Manager: NewInputManager(log, store), + } +} + +func (input) Name() string { return inputName } + +func (input) Test(src inputcursor.Source, _ v2.TestContext) error { + return nil +} + +// Run starts the input and blocks as long as websocket connections are alive. It will return on +// context cancellation or type invalidity errors, any other error will be retried. +func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error { + var cursor map[string]interface{} + if !crsr.IsNew() { // Allow the user to bootstrap the program if needed. + err := crsr.Unpack(&cursor) + if err != nil { + return err + } + } + return input{}.run(env, src.(*source), cursor, pub) +} + +func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, pub inputcursor.Publisher) error { + cfg := src.cfg + i.cfg = cfg + log := env.Logger.With("input_url", cfg.URL) + + metrics := newInputMetrics(env.ID) + defer metrics.Close() + metrics.url.Set(cfg.URL.String()) + metrics.errorsTotal.Set(0) + + ctx := ctxtool.FromCanceller(env.Cancelation) + + patterns, err := regexpsFromConfig(cfg) + if err != nil { + metrics.errorsTotal.Inc() + return err + } + + prg, ast, err := newProgram(ctx, cfg.Program, root, patterns, log) + if err != nil { + metrics.errorsTotal.Inc() + return err + } + var state map[string]interface{} + if cfg.State == nil { + state = make(map[string]interface{}) + } else { + state = cfg.State + } + if cursor != nil { + state["cursor"] = cursor + } + + // websocket client + headers := formHeader(cfg) + url := cfg.URL.String() + c, resp, err := websocket.DefaultDialer.DialContext(ctx, url, headers) + if resp != nil && resp.Body != nil { + log.Debugw("websocket connection response", "body", resp.Body) + resp.Body.Close() + } + if err != nil { + metrics.errorsTotal.Inc() + log.Errorw("failed to establish websocket connection", "error", err) + return err + } + defer c.Close() + + done := make(chan error) + + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + metrics.errorsTotal.Inc() + if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { + log.Errorw("websocket connection closed", "error", err) + } else { + log.Errorw("failed to read websocket data", "error", err) + } + done <- err + return + } + metrics.receivedBytesTotal.Add(uint64(len(message))) + state["response"] = message + log.Debugw("received websocket message", logp.Namespace("websocket"), string(message)) + err = i.processAndPublishData(ctx, metrics, prg, ast, state, cursor, pub, log) + if err != nil { + metrics.errorsTotal.Inc() + log.Errorw("failed to process and publish data", "error", err) + done <- err + return + } + } + }() + + // blocks until done is closed, context is cancelled or an error is received + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// processAndPublishData processes the data in state, updates the cursor and publishes it to the publisher. +// the CEL program here only executes a single time, since the websocket connection is persistent and events are received and processed in real time. +func (i *input) processAndPublishData(ctx context.Context, metrics *inputMetrics, prg cel.Program, ast *cel.Ast, + state map[string]interface{}, cursor map[string]interface{}, pub inputcursor.Publisher, log *logp.Logger) error { + goodCursor := cursor + log.Debugw("cel engine state before eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: i.cfg.Redact}) + start := i.now().In(time.UTC) + state, err := evalWith(ctx, prg, ast, state, start) + log.Debugw("cel engine state after eval", logp.Namespace("websocket"), "state", redactor{state: state, cfg: i.cfg.Redact}) + if err != nil { + metrics.celEvalErrors.Add(1) + switch { + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): + return err + default: + metrics.errorsTotal.Inc() + } + log.Errorw("failed evaluation", "error", err) + } + metrics.celProcessingTime.Update(time.Since(start).Nanoseconds()) + + e, ok := state["events"] + if !ok { + log.Errorw("unexpected missing events from evaluation") + } + var events []interface{} + switch e := e.(type) { + case []interface{}: + if len(e) == 0 { + return nil + } + events = e + case map[string]interface{}: + if e == nil { + return nil + } + log.Debugw("single event object returned by evaluation", "event", e) + events = []interface{}{e} + default: + return fmt.Errorf("unexpected type returned for evaluation events: %T", e) + } + + // We have a non-empty batch of events to process. + metrics.batchesReceived.Add(1) + metrics.eventsReceived.Add(uint64(len(events))) + + // Drop events from state. If we fail during the publication, + // we will reprocess these events. + delete(state, "events") + + // Get cursors if they exist. + var ( + cursors []interface{} + singleCursor bool + ) + if c, ok := state["cursor"]; ok { + cursors, ok = c.([]interface{}) + if ok { + if len(cursors) != len(events) { + log.Errorw("unexpected cursor list length", "cursors", len(cursors), "events", len(events)) + // But try to continue. + if len(cursors) < len(events) { + cursors = nil + } + } + } else { + cursors = []interface{}{c} + singleCursor = true + } + } + // Drop old cursor from state. This will be replaced with + // the current cursor object below; it is an array now. + delete(state, "cursor") + + start = time.Now() + var hadPublicationError bool + for i, e := range events { + event, ok := e.(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected type returned for evaluation events: %T", e) + } + var pubCursor interface{} + if cursors != nil { + if singleCursor { + // Only set the cursor for publication at the last event + // when a single cursor object has been provided. + if i == len(events)-1 { + goodCursor = cursor + cursor, ok = cursors[0].(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[0]) + } + pubCursor = cursor + } + } else { + goodCursor = cursor + cursor, ok = cursors[i].(map[string]interface{}) + if !ok { + return fmt.Errorf("unexpected type returned for evaluation cursor element: %T", cursors[i]) + } + pubCursor = cursor + } + } + // Publish the event. + err = pub.Publish(beat.Event{ + Timestamp: time.Now(), + Fields: event, + }, pubCursor) + if err != nil { + hadPublicationError = true + metrics.errorsTotal.Inc() + log.Errorw("error publishing event", "error", err) + cursors = nil // We are lost, so retry with this event's cursor, + continue // but continue with the events that we have without + // advancing the cursor. This allows us to potentially publish the + // events we have now, with a fallback to the last guaranteed + // correctly published cursor. + } + if i == 0 { + metrics.batchesPublished.Add(1) + } + metrics.eventsPublished.Add(1) + + err = ctx.Err() + if err != nil { + return err + } + } + // calculate batch processing time + metrics.batchProcessingTime.Update(time.Since(start).Nanoseconds()) + + // Advance the cursor to the final state if there was no error during + // publications. This is needed to transition to the next set of events. + if !hadPublicationError { + goodCursor = cursor + } + + // Replace the last known good cursor. + state["cursor"] = goodCursor + + switch { + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): + metrics.errorsTotal.Inc() + log.Infof("input stopped because context was cancelled with: %v", err) + err = nil + } + return err +} + +func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) { + out, _, err := prg.ContextEval(ctx, map[string]interface{}{ + // Replace global program "now" with current time. This is necessary + // as the lib.Time now global is static at program instantiation time + // which will persist over multiple evaluations. The lib.Time behaviour + // is correct for mito where CEL program instances live for only a + // single evaluation. Rather than incurring the cost of creating a new + // cel.Program for each evaluation, shadow lib.Time's now with a new + // value for each eval. We retain the lib.Time now global for + // compatibility between CEL programs developed in mito with programs + // run in the input. + "now": now, + root: state, + }) + if err != nil { + err = lib.DecoratedError{AST: ast, Err: err} + } + if e := ctx.Err(); e != nil { + err = e + } + if err != nil { + state["events"] = errorMessage(fmt.Sprintf("failed eval: %v", err)) + clearWantMore(state) + return state, fmt.Errorf("failed eval: %w", err) + } + + v, err := out.ConvertToNative(reflect.TypeOf((*structpb.Struct)(nil))) + if err != nil { + state["events"] = errorMessage(fmt.Sprintf("failed proto conversion: %v", err)) + clearWantMore(state) + return state, fmt.Errorf("failed proto conversion: %w", err) + } + switch v := v.(type) { + case *structpb.Struct: + return v.AsMap(), nil + default: + // This should never happen. + errMsg := fmt.Sprintf("unexpected native conversion type: %T", v) + state["events"] = errorMessage(errMsg) + clearWantMore(state) + return state, errors.New(errMsg) + } +} + +// now is time.Now with a modifiable time source. +func (i input) now() time.Time { + if i.time == nil { + return time.Now() + } + return i.time() +} + +// clearWantMore sets the state to not request additional work in a periodic evaluation. +// It leaves state intact if there is no "want_more" element, and sets the element to false +// if there is. This is necessary instead of just doing delete(state, "want_more") as +// client CEL code may expect the want_more field to be present. +func clearWantMore(state map[string]interface{}) { + if _, ok := state["want_more"]; ok { + state["want_more"] = false + } +} + +func errorMessage(msg string) map[string]interface{} { + return map[string]interface{}{"error": map[string]interface{}{"message": msg}} +} + +func formHeader(cfg config) map[string][]string { + header := make(map[string][]string) + switch { + case cfg.Auth.CustomAuth != nil: + header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} + case cfg.Auth.BearerToken != "": + header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken} + case cfg.Auth.BasicToken != "": + header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken} + } + return header +} diff --git a/x-pack/filebeat/input/websocket/input_manager.go b/x-pack/filebeat/input/websocket/input_manager.go new file mode 100644 index 000000000000..49fca0b0a82c --- /dev/null +++ b/x-pack/filebeat/input/websocket/input_manager.go @@ -0,0 +1,71 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "github.com/elastic/go-concert/unison" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// inputManager wraps one stateless input manager +// and one cursor input manager. It will create one or the other +// based on the config that is passed. +type InputManager struct { + cursor *inputcursor.InputManager +} + +var _ v2.InputManager = InputManager{} + +func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager { + return InputManager{ + cursor: &inputcursor.InputManager{ + Logger: log, + StateStore: store, + Type: inputName, + Configure: cursorConfigure, + }, + } +} + +func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, error) { + src := &source{cfg: config{}} + if err := cfg.Unpack(&src.cfg); err != nil { + return nil, nil, err + } + + if src.cfg.Program == "" { + // set default program + src.cfg.Program = ` + bytes(state.response).decode_json().as(inner_body,{ + "events": { + "message": inner_body.encode_json(), + } + }) + ` + } + return []inputcursor.Source{src}, input{}, nil +} + +type source struct{ cfg config } + +func (s *source) Name() string { return s.cfg.URL.String() } + +// Init initializes both wrapped input managers. +func (m InputManager) Init(grp unison.Group, mode v2.Mode) error { + return m.cursor.Init(grp, mode) +} + +// Create creates a cursor input manager. +func (m InputManager) Create(cfg *conf.C) (v2.Input, error) { + config := config{} + if err := cfg.Unpack(&config); err != nil { + return nil, err + } + return m.cursor.Create(cfg) +} diff --git a/x-pack/filebeat/input/websocket/input_test.go b/x-pack/filebeat/input/websocket/input_test.go new file mode 100644 index 000000000000..fc98a2f0b46a --- /dev/null +++ b/x-pack/filebeat/input/websocket/input_test.go @@ -0,0 +1,597 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "reflect" + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +const ( + basicToken = "dXNlcjpwYXNz" + bearerToken = "BXNlcjpwYVVz" + customHeader = "X-Api-Key" + customValue = "my-api-key" +) + +// WebSocketHandler is a type for handling WebSocket messages. +type WebSocketHandler func(*testing.T, *websocket.Conn, []string) + +var inputTests = []struct { + name string + server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) + handler WebSocketHandler + config map[string]interface{} + response []string + time func() time.Time + persistCursor map[string]interface{} + want []map[string]interface{} + wantErr error +}{ + { + name: "single_event", + server: newWebSocketTestServer(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, + { + name: "multiple_events", + server: newWebSocketTestServer(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`, + `{ + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyX" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyX", + }, + }, + }, + { + name: "bad_cursor", + server: newWebSocketTestServer(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + "cursor":["What's next?"], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + }`}, + wantErr: fmt.Errorf("unexpected type returned for evaluation cursor element: %T", "What's next?"), + }, + { + name: "invalid_url_scheme", + server: invalidWebSocketTestServer(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + wantErr: fmt.Errorf("unsupported scheme: http accessing config"), + }, + { + name: "cursor_condition_check", + server: newWebSocketTestServer(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": has(state.cursor) && inner_body.ts > state.cursor.last_updated ? [inner_body] : [], + })`, + "state": map[string]interface{}{ + "cursor": map[string]int{ + "last_updated": 1502908200, + }, + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, + `{ + "pps": { + "agent": "example.proofpoint-1.com", + "cid": "mmeng_vxciml" + }, + "ts": 1503081000 + }`, + }, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint-1.com", + "cid": "mmeng_vxciml", + }, + "ts": float64(1503081000), + }, + }, + }, + { + name: "auth_basic_token", + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "auth": map[string]interface{}{ + "basic_token": basicToken, + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, + }, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": float64(1502908200), + }, + }, + }, + { + name: "auth_bearer_token", + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "auth": map[string]interface{}{ + "bearer_token": bearerToken, + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, + }, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": float64(1502908200), + }, + }, + }, + { + name: "auth_custom", + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + "auth": map[string]interface{}{ + "custom": map[string]interface{}{ + "header": customHeader, + "value": customValue, + }, + }, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, + }, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": float64(1502908200), + }, + }, + }, +} + +func TestInput(t *testing.T) { + // tests will ignore context cancelled errors, since they are expected + ctxCancelledError := fmt.Errorf("context canceled") + logp.TestingSetup() + for _, test := range inputTests { + t.Run(test.name, func(t *testing.T) { + if test.server != nil { + test.server(t, test.handler, test.config, test.response) + } + + cfg := conf.MustNewConfigFrom(test.config) + + conf := config{} + conf.Redact = &redact{} // Make sure we pass the redact requirement. + err := cfg.Unpack(&conf) + if err != nil { + if test.wantErr != nil { + assert.EqualError(t, err, test.wantErr.Error()) + return + } + t.Fatalf("unexpected error unpacking config: %v", err) + } + + name := input{}.Name() + if name != "websocket" { + t.Errorf(`unexpected input name: got:%q want:"websocket"`, name) + } + src := &source{conf} + err = input{}.Test(src, v2.TestContext{}) + if err != nil { + t.Fatalf("unexpected error running test: %v", err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Second) + defer cancel() + + v2Ctx := v2.Context{ + Logger: logp.NewLogger("websocket_test"), + ID: "test_id:" + test.name, + Cancelation: ctx, + } + var client publisher + client.done = func() { + if len(client.published) >= len(test.want) { + cancel() + } + } + + err = input{test.time, conf}.run(v2Ctx, src, test.persistCursor, &client) + if (fmt.Sprint(err) != fmt.Sprint(ctxCancelledError)) && (fmt.Sprint(err) != fmt.Sprint(test.wantErr)) { + t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr) + } + if test.wantErr != nil { + return + } + + if len(client.published) < len(test.want) { + t.Errorf("unexpected number of published events: got:%d want at least:%d", len(client.published), len(test.want)) + test.want = test.want[:len(client.published)] + } + client.published = client.published[:len(test.want)] + for i, got := range client.published { + if !reflect.DeepEqual(got.Fields, mapstr.M(test.want[i])) { + t.Errorf("unexpected result for event %d: got:- want:+\n%s", i, cmp.Diff(got.Fields, mapstr.M(test.want[i]))) + } + } + }) + } +} + +var _ inputcursor.Publisher = (*publisher)(nil) + +type publisher struct { + done func() + mu sync.Mutex + published []beat.Event + cursors []map[string]interface{} +} + +func (p *publisher) Publish(e beat.Event, cursor interface{}) error { + p.mu.Lock() + p.published = append(p.published, e) + if cursor != nil { + c, ok := cursor.(map[string]interface{}) + if !ok { + return fmt.Errorf("invalid cursor type for testing: %T", cursor) + } + p.cursors = append(p.cursors, c) + } + p.done() + p.mu.Unlock() + return nil +} + +func newWebSocketTestServer(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + // only set the resource URL if it is not already set + if config["url"] == nil { + config["url"] = "ws" + server.URL[4:] + } + t.Cleanup(server.Close) + } +} + +// invalidWebSocketTestServer returns a function that creates a WebSocket server with an invalid URL scheme. +func invalidWebSocketTestServer(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + return true + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + config["url"] = server.URL + t.Cleanup(server.Close) + } +} + +// webSocketTestServerWithAuth returns a function that creates a WebSocket server with authentication. This does not however simulate a TLS connection. +func webSocketTestServerWithAuth(serve func(http.Handler) *httptest.Server) func(*testing.T, WebSocketHandler, map[string]interface{}, []string) { + return func(t *testing.T, handler WebSocketHandler, config map[string]interface{}, response []string) { + server := serve(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + upgrader := websocket.Upgrader{ + CheckOrigin: func(r *http.Request) bool { + // check for auth token + authToken := r.Header.Get("Authorization") + if authToken == "" { + authToken = r.Header.Get(customHeader) + if authToken == "" { + return false + } + } + + switch { + case authToken == "Bearer "+bearerToken: + return true + case authToken == "Basic "+basicToken: + return true + case authToken == customValue: + return true + default: + return false + + } + }, + } + + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + t.Fatalf("error upgrading connection to WebSocket: %v", err) + return + } + + handler(t, conn, response) + })) + // only set the resource URL if it is not already set + if config["url"] == nil { + config["url"] = "ws" + server.URL[4:] + } + t.Cleanup(server.Close) + } +} + +// defaultHandler is a default handler for WebSocket connections. +func defaultHandler(t *testing.T, conn *websocket.Conn, response []string) { + for _, r := range response { + err := conn.WriteMessage(websocket.TextMessage, []byte(r)) + if err != nil { + t.Fatalf("error writing message to WebSocket: %v", err) + } + } +} diff --git a/x-pack/filebeat/input/websocket/metrics.go b/x-pack/filebeat/input/websocket/metrics.go new file mode 100644 index 000000000000..34e6a9620f93 --- /dev/null +++ b/x-pack/filebeat/input/websocket/metrics.go @@ -0,0 +1,55 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "github.com/rcrowley/go-metrics" + + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +// inputMetrics handles the input's metric reporting. +type inputMetrics struct { + unregister func() + url *monitoring.String // URL of the input resource + celEvalErrors *monitoring.Uint // number of errors encountered during cel program evaluation + batchesReceived *monitoring.Uint // number of event arrays received + errorsTotal *monitoring.Uint // number of errors encountered + receivedBytesTotal *monitoring.Uint // number of bytes received + eventsReceived *monitoring.Uint // number of events received + batchesPublished *monitoring.Uint // number of event arrays published + eventsPublished *monitoring.Uint // number of events published + celProcessingTime metrics.Sample // histogram of the elapsed successful cel program processing times in nanoseconds + batchProcessingTime metrics.Sample // histogram of the elapsed successful batch processing times in nanoseconds (time of receipt to time of ACK for non-empty batches). +} + +func newInputMetrics(id string) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &inputMetrics{ + unregister: unreg, + url: monitoring.NewString(reg, "url"), + celEvalErrors: monitoring.NewUint(reg, "cel_eval_errors"), + batchesReceived: monitoring.NewUint(reg, "batches_received_total"), + errorsTotal: monitoring.NewUint(reg, "errors_total"), + receivedBytesTotal: monitoring.NewUint(reg, "received_bytes_total"), + eventsReceived: monitoring.NewUint(reg, "events_received_total"), + batchesPublished: monitoring.NewUint(reg, "batches_published_total"), + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + celProcessingTime: metrics.NewUniformSample(1024), + batchProcessingTime: metrics.NewUniformSample(1024), + } + _ = adapter.NewGoMetrics(reg, "cel_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.celProcessingTime)) + _ = adapter.NewGoMetrics(reg, "batch_processing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.batchProcessingTime)) + + return out +} + +func (m *inputMetrics) Close() { + m.unregister() +} diff --git a/x-pack/filebeat/input/websocket/redact.go b/x-pack/filebeat/input/websocket/redact.go new file mode 100644 index 000000000000..86583f0691c1 --- /dev/null +++ b/x-pack/filebeat/input/websocket/redact.go @@ -0,0 +1,111 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "strings" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +// redactor implements lazy field redaction of sets of a mapstr.M. +type redactor struct { + state mapstr.M + cfg *redact +} + +// String renders the JSON corresponding to r.state after applying redaction +// operations. +func (r redactor) String() string { + if r.cfg == nil || len(r.cfg.Fields) == 0 { + return r.state.String() + } + c := make(mapstr.M, len(r.state)) + cloneMap(c, r.state) + for _, mask := range r.cfg.Fields { + if r.cfg.Delete { + walkMap(c, mask, func(parent mapstr.M, key string) { + delete(parent, key) + }) + continue + } + walkMap(c, mask, func(parent mapstr.M, key string) { + parent[key] = "*" + }) + } + return c.String() +} + +// cloneMap is an enhanced version of mapstr.M.Clone that handles cloning arrays +// within objects. Nested arrays are not handled. +func cloneMap(dst, src mapstr.M) { + for k, v := range src { + switch v := v.(type) { + case mapstr.M: + d := make(mapstr.M, len(v)) + dst[k] = d + cloneMap(d, v) + case map[string]interface{}: + d := make(map[string]interface{}, len(v)) + dst[k] = d + cloneMap(d, v) + case []mapstr.M: + a := make([]mapstr.M, 0, len(v)) + for _, m := range v { + d := make(mapstr.M, len(m)) + cloneMap(d, m) + a = append(a, d) + } + dst[k] = a + case []map[string]interface{}: + a := make([]map[string]interface{}, 0, len(v)) + for _, m := range v { + d := make(map[string]interface{}, len(m)) + cloneMap(d, m) + a = append(a, d) + } + dst[k] = a + default: + dst[k] = v + } + } +} + +// walkMap walks to all ends of the provided path in m and applies fn to the +// final element of each walk. Nested arrays are not handled. +func walkMap(m mapstr.M, path string, fn func(parent mapstr.M, key string)) { + key, rest, more := strings.Cut(path, ".") + v, ok := m[key] + if !ok { + return + } + if !more { + fn(m, key) + return + } + switch v := v.(type) { + case mapstr.M: + walkMap(v, rest, fn) + case map[string]interface{}: + walkMap(v, rest, fn) + case []mapstr.M: + for _, m := range v { + walkMap(m, rest, fn) + } + case []map[string]interface{}: + for _, m := range v { + walkMap(m, rest, fn) + } + case []interface{}: + for _, v := range v { + switch m := v.(type) { + case mapstr.M: + walkMap(m, rest, fn) + case map[string]interface{}: + walkMap(m, rest, fn) + } + } + } +} diff --git a/x-pack/filebeat/input/websocket/redact_test.go b/x-pack/filebeat/input/websocket/redact_test.go new file mode 100644 index 000000000000..c66db60d97b0 --- /dev/null +++ b/x-pack/filebeat/input/websocket/redact_test.go @@ -0,0 +1,148 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package websocket + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var redactorTests = []struct { + name string + state mapstr.M + cfg *redact + + wantOrig string + wantRedact string +}{ + { + name: "nil_redact", + state: mapstr.M{ + "auth": mapstr.M{ + "user": "fred", + "pass": "top_secret", + }, + "other": "data", + }, + cfg: nil, + wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + wantRedact: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + }, + { + name: "auth_no_delete", + state: mapstr.M{ + "auth": mapstr.M{ + "user": "fred", + "pass": "top_secret", + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"auth"}, + Delete: false, + }, + wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + wantRedact: `{"auth":"*","other":"data"}`, + }, + { + name: "auth_delete", + state: mapstr.M{ + "auth": mapstr.M{ + "user": "fred", + "pass": "top_secret", + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"auth"}, + Delete: true, + }, + wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + wantRedact: `{"other":"data"}`, + }, + { + name: "pass_no_delete", + state: mapstr.M{ + "auth": mapstr.M{ + "user": "fred", + "pass": "top_secret", + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"auth.pass"}, + Delete: false, + }, + wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + wantRedact: `{"auth":{"pass":"*","user":"fred"},"other":"data"}`, + }, + { + name: "pass_delete", + state: mapstr.M{ + "auth": mapstr.M{ + "user": "fred", + "pass": "top_secret", + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"auth.pass"}, + Delete: true, + }, + wantOrig: `{"auth":{"pass":"top_secret","user":"fred"},"other":"data"}`, + wantRedact: `{"auth":{"user":"fred"},"other":"data"}`, + }, + { + name: "multi_cursor_no_delete", + state: mapstr.M{ + "cursor": []mapstr.M{ + {"key": "val_one", "other": "data"}, + {"key": "val_two", "other": "data"}, + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"cursor.key"}, + Delete: false, + }, + wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`, + wantRedact: `{"cursor":[{"key":"*","other":"data"},{"key":"*","other":"data"}],"other":"data"}`, + }, + { + name: "multi_cursor_delete", + state: mapstr.M{ + "cursor": []mapstr.M{ + {"key": "val_one", "other": "data"}, + {"key": "val_two", "other": "data"}, + }, + "other": "data", + }, + cfg: &redact{ + Fields: []string{"cursor.key"}, + Delete: true, + }, + wantOrig: `{"cursor":[{"key":"val_one","other":"data"},{"key":"val_two","other":"data"}],"other":"data"}`, + wantRedact: `{"cursor":[{"other":"data"},{"other":"data"}],"other":"data"}`, + }, +} + +func TestRedactor(t *testing.T) { + for _, test := range redactorTests { + t.Run(test.name, func(t *testing.T) { + got := fmt.Sprint(redactor{state: test.state, cfg: test.cfg}) + orig := fmt.Sprint(test.state) + if orig != test.wantOrig { + t.Errorf("unexpected original state after redaction:\n--- got\n--- want\n%s", cmp.Diff(orig, test.wantOrig)) + } + if got != test.wantRedact { + t.Errorf("unexpected redaction:\n--- got\n--- want\n%s", cmp.Diff(got, test.wantRedact)) + } + }) + } +}