From 86ef638dbc9854dbd31729ea1a12f4afc331f14f Mon Sep 17 00:00:00 2001 From: Aleksandr Maus Date: Wed, 15 May 2024 08:24:45 -0400 Subject: [PATCH] Osquerybeat: Add action responses data stream (#39143) * Add action responses data stream * Add missing copyright header * Make linter happy * Linting * Fix the compatibility with older stack --- x-pack/osquerybeat/beater/action_handler.go | 4 + x-pack/osquerybeat/beater/osquerybeat.go | 2 +- .../beater/resetable_action_handler.go | 8 +- .../beater/resetable_action_handler_test.go | 12 +- x-pack/osquerybeat/cmd/root.go | 89 ++++++++- x-pack/osquerybeat/cmd/root_test.go | 98 +++++++++ .../cmd/testdata/osquerycfg/legacy.in.json | 51 +++++ .../cmd/testdata/osquerycfg/legacy.out.json | 77 +++++++ .../osquerycfg/legacy_with_osquery.in.json | 56 ++++++ .../osquerycfg/legacy_with_osquery.out.json | 82 ++++++++ .../testdata/osquerycfg/two_streams.in.json | 104 ++++++++++ .../testdata/osquerycfg/two_streams.out.json | 122 +++++++++++ .../two_streams_with_osquery.in.json | 109 ++++++++++ .../two_streams_with_osquery.out.json | 127 ++++++++++++ x-pack/osquerybeat/internal/config/config.go | 7 +- x-pack/osquerybeat/internal/pub/publisher.go | 189 +++++++++++++----- .../internal/pub/publisher_test.go | 90 +++++++++ 17 files changed, 1173 insertions(+), 54 deletions(-) create mode 100644 x-pack/osquerybeat/cmd/root_test.go create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.out.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.in.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.out.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.in.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.out.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.in.json create mode 100644 x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.out.json diff --git a/x-pack/osquerybeat/beater/action_handler.go b/x-pack/osquerybeat/beater/action_handler.go index c4650ee9f165..a2a86bdf8dc2 100644 --- a/x-pack/osquerybeat/beater/action_handler.go +++ b/x-pack/osquerybeat/beater/action_handler.go @@ -21,6 +21,10 @@ var ( ErrNoQueryExecutor = errors.New("no query executor configures") ) +type actionResultPublisher interface { + PublishActionResult(req map[string]interface{}, res map[string]interface{}) +} + type publisher interface { Publish(index, actionID, responseID string, meta map[string]interface{}, hits []map[string]interface{}, ecsm ecs.Mapping, reqData interface{}) } diff --git a/x-pack/osquerybeat/beater/osquerybeat.go b/x-pack/osquerybeat/beater/osquerybeat.go index bb82525a5d59..b4fe30a47d88 100644 --- a/x-pack/osquerybeat/beater/osquerybeat.go +++ b/x-pack/osquerybeat/beater/osquerybeat.go @@ -177,7 +177,7 @@ func (bt *osquerybeat) Run(b *beat.Beat) error { } // Set reseable action handler - rah := newResetableActionHandler(bt.log) + rah := newResetableActionHandler(bt.pub, bt.log) defer rah.Clear() g, ctx := errgroup.WithContext(ctx) diff --git a/x-pack/osquerybeat/beater/resetable_action_handler.go b/x-pack/osquerybeat/beater/resetable_action_handler.go index 1b6bb20e1db4..a7daba113130 100644 --- a/x-pack/osquerybeat/beater/resetable_action_handler.go +++ b/x-pack/osquerybeat/beater/resetable_action_handler.go @@ -31,6 +31,8 @@ var ( // // The lifetime of this should the a scope of the beat Run type resetableActionHandler struct { + pub actionResultPublisher + log *logp.Logger ah client.Action @@ -43,8 +45,9 @@ type resetableActionHandler struct { type optionFunc func(a *resetableActionHandler) -func newResetableActionHandler(log *logp.Logger, opts ...optionFunc) *resetableActionHandler { +func newResetableActionHandler(pub actionResultPublisher, log *logp.Logger, opts ...optionFunc) *resetableActionHandler { a := &resetableActionHandler{ + pub: pub, log: log, timeout: defaultTimeout, } @@ -69,6 +72,9 @@ func (a *resetableActionHandler) Execute(ctx context.Context, req map[string]int res = renderResult(res, err) err = nil } + if a.pub != nil { + a.pub.PublishActionResult(req, res) + } }() res, err = a.execute(ctx, req) diff --git a/x-pack/osquerybeat/beater/resetable_action_handler_test.go b/x-pack/osquerybeat/beater/resetable_action_handler_test.go index eafeb758f679..d44985b3ac76 100644 --- a/x-pack/osquerybeat/beater/resetable_action_handler_test.go +++ b/x-pack/osquerybeat/beater/resetable_action_handler_test.go @@ -40,6 +40,15 @@ func (a *mockActionHandler) Name() string { return "osquery" } +type mockActionResultPublisher struct { + req, res map[string]interface{} +} + +func (p *mockActionResultPublisher) PublishActionResult(req map[string]interface{}, res map[string]interface{}) { + p.req = req + p.res = res +} + func TestResetableActionHandler(t *testing.T) { ctx, cn := context.WithCancel(context.Background()) defer cn() @@ -78,7 +87,8 @@ func TestResetableActionHandler(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - rah := newResetableActionHandler(log, resetableActionHandlerWithTimeout(testActionHandlerTimeout)) + pub := &mockActionResultPublisher{} + rah := newResetableActionHandler(pub, log, resetableActionHandlerWithTimeout(testActionHandlerTimeout)) defer rah.Clear() if tc.ah != nil { diff --git a/x-pack/osquerybeat/cmd/root.go b/x-pack/osquerybeat/cmd/root.go index 73584ec06f12..9c02433169ef 100644 --- a/x-pack/osquerybeat/cmd/root.go +++ b/x-pack/osquerybeat/cmd/root.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/spf13/cobra" + "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -81,6 +82,92 @@ func genVerifyCmd(_ instance.Settings) *cobra.Command { } func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + // For the older stack there were no streams, creating one + if len(rawIn.GetStreams()) == 0 { + return osquerybeatCfgNoStreams(rawIn, agentInfo) + } + return osquerybeatCfgFromStreams(rawIn, agentInfo) +} + +func osquerybeatCfgFromStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { + + streams := make([]*proto.Stream, 0, len(rawIn.Streams)) + + // Attach osquery configuration to the osquery_manager.result stream and set it as a first stream + for _, stream := range rawIn.Streams { + if stream.DataStream != nil && stream.DataStream.Dataset == config.DefaultDataset { + if stream.Source == nil { + // If for any reason the stream source is missing completely, use datastream source as before + stream.Source = rawIn.Source + } else { + // Set osquery configuration value + fieldsSrc := rawIn.Source.Fields + fieldsDst := stream.Source.Fields + var osqVal *structpb.Value + if fieldsSrc != nil { + osqVal = fieldsSrc["osquery"] + } + if osqVal != nil { + fieldsDst["osquery"] = osqVal + } + // Setting id to the source because it is being picked up from there in shared management.CreateInputsFromStreams + vId, ok := fieldsDst["id"] + shouldSet := false + if !ok || vId == nil { + shouldSet = true + } else { + if _, ok := vId.GetKind().(*structpb.Value_NullValue); ok { + shouldSet = true + } + } + if shouldSet { + fieldsDst["id"] = structpb.NewStringValue(rawIn.Id) + } + } + streams = append([]*proto.Stream{stream}, streams...) + continue + } + streams = append(streams, stream) + } + rawIn.Streams = streams + + streamList, err := management.CreateInputsFromStreams(rawIn, "logs", agentInfo) + if err != nil { + return nil, fmt.Errorf("error creating input list from raw expected config: %w", err) + } + + var ns string + if rawIn.DataStream != nil { + ns = rawIn.DataStream.Namespace + if ns == "" { + ns = config.DefaultNamespace + } + } + + for iter := range streamList { + if _, ok := streamList[iter]["type"]; !ok { + streamList[iter]["type"] = rawIn.Type + } + if v, ok := streamList[iter]["data_stream"]; ok { + if m, ok := v.(map[string]interface{}); ok { + if _, ok := m["namespace"]; !ok { + m["namespace"] = ns + } + } + } + } + + // format for the reloadable list needed by the cm.Reload() method + configList, err := management.CreateReloadConfigFromInputs(streamList) + if err != nil { + return nil, fmt.Errorf("error creating config for reloader: %w", err) + } + + return configList, nil +} + +// This is needed for compatibility with the legacy implementation where kibana set empty streams array [] into the policy +func osquerybeatCfgNoStreams(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo) ([]*reload.ConfigWithMeta, error) { // Convert to streams, osquerybeat doesn't use streams streams := make([]*proto.Stream, 1) @@ -113,7 +200,7 @@ func osquerybeatCfg(rawIn *proto.UnitExpectedConfig, agentInfo *client.AgentInfo modules[iter]["type"] = "log" } - // format for the reloadable list needed bythe cm.Reload() method + // format for the reloadable list needed by the cm.Reload() method configList, err := management.CreateReloadConfigFromInputs(modules) if err != nil { return nil, fmt.Errorf("error creating config for reloader: %w", err) diff --git a/x-pack/osquerybeat/cmd/root_test.go b/x-pack/osquerybeat/cmd/root_test.go new file mode 100644 index 000000000000..5d0df4df0a6d --- /dev/null +++ b/x-pack/osquerybeat/cmd/root_test.go @@ -0,0 +1,98 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package cmd + +import ( + "encoding/json" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/google/go-cmp/cmp" + + "github.com/elastic/beats/v7/libbeat/common/reload" + + "github.com/elastic/elastic-agent-client/v7/pkg/client" + "github.com/elastic/elastic-agent-client/v7/pkg/proto" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +func TestOsquerybeatCfg(t *testing.T) { + matches, err := filepath.Glob("testdata/osquerycfg/*.in.json") + if err != nil { + t.Fatal(err) + } + + for _, match := range matches { + dir := filepath.Dir(match) + key := strings.TrimSuffix(filepath.Base(match), `.in.json`) + + out := filepath.Join(dir, key+".out.json") + t.Run(key, func(in, out string) func(t *testing.T) { + return func(t *testing.T) { + var rawIn proto.UnitExpectedConfig + err := readRawIn(in, &rawIn) + if err != nil { + t.Fatal(err) + } + + want, err := readOut(out) + if err != nil { + t.Fatal(err) + } + + cfg, err := osquerybeatCfg(&rawIn, &client.AgentInfo{ID: "abc7d0a8-ce04-4663-95da-ff6d537c268f", Version: "8.13.1"}) + if err != nil { + t.Fatal(err) + } + got, err := cfgToArrMap(cfg) + if err != nil { + t.Fatal(err) + } + + diff := cmp.Diff(want, got) + if diff != "" { + t.Fatal(diff) + } + } + }(match, out)) + } +} + +func readRawIn(filename string, rawIn *proto.UnitExpectedConfig) error { + b, err := os.ReadFile(filename) + if err != nil { + return err + } + err = json.Unmarshal(b, rawIn) + return err +} + +func readOut(filename string) (cfg []map[string]interface{}, err error) { + b, err := os.ReadFile(filename) + if err != nil { + return nil, err + } + err = json.Unmarshal(b, &cfg) + if err != nil { + return nil, err + } + return cfg, err +} + +func cfgToArrMap(cfg []*reload.ConfigWithMeta) ([]map[string]interface{}, error) { + res := make([]map[string]interface{}, 0, len(cfg)) + for _, c := range cfg { + var m mapstr.M + err := c.Config.Unpack(&m) + if err != nil { + return nil, err + } + res = append(res, map[string]interface{}(m)) + } + return res, nil +} diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json new file mode 100644 index 000000000000..f358b4fdf6e7 --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.in.json @@ -0,0 +1,51 @@ +{ + "source": { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 2 + }, + "revision": 1, + "streams": [ + ], + "type": "osquery" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "type": "osquery", + "name": "osquery_manager-1", + "revision": 1, + "meta": { + "source": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "package": { + "source": { + "name": "osquery_manager", + "version": "1.12.1" + }, + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "data_stream": { + "source": { + "namespace": "default" + }, + "namespace": "default" + }, + "streams": [ + ] +} \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.out.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.out.json new file mode 100644 index 000000000000..2ec760a08f81 --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy.out.json @@ -0,0 +1,77 @@ +[ + { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.result-default", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 2 + }, + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "stream_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "revision": 1, + "streams": [], + "type": "log" + } +] \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.in.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.in.json new file mode 100644 index 000000000000..c3bb5d4e3802 --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.in.json @@ -0,0 +1,56 @@ +{ + "source": { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "osquery": { + "options": { + "host_identifier": "hostname" + } + }, + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 3 + }, + "revision": 2, + "streams": [ + ], + "type": "osquery" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "type": "osquery", + "name": "osquery_manager-1", + "revision": 2, + "meta": { + "source": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "package": { + "source": { + "name": "osquery_manager", + "version": "1.12.1" + }, + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "data_stream": { + "source": { + "namespace": "default" + }, + "namespace": "default" + }, + "streams": [ + ] +} \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.out.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.out.json new file mode 100644 index 000000000000..7568395785ea --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/legacy_with_osquery.out.json @@ -0,0 +1,82 @@ +[ + { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.result-default", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "osquery": { + "options": { + "host_identifier": "hostname" + } + }, + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 3 + }, + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "stream_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "revision": 2, + "streams": [], + "type": "log" + } +] \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.in.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.in.json new file mode 100644 index 000000000000..8081cb2c4845 --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.in.json @@ -0,0 +1,104 @@ +{ + "source": { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 2 + }, + "revision": 1, + "streams": [ + { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "query": null + }, + { + "data_stream": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "id": null, + "query": null + } + ], + "type": "osquery" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "type": "osquery", + "name": "osquery_manager-1", + "revision": 1, + "meta": { + "source": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "package": { + "source": { + "name": "osquery_manager", + "version": "1.12.1" + }, + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "data_stream": { + "source": { + "namespace": "default" + }, + "namespace": "default" + }, + "streams": [ + { + "source": { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "query": null + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "data_stream": { + "source": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "dataset": "osquery_manager.action.responses", + "type": "logs" + } + }, + { + "source": { + "data_stream": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "id": null, + "query": null + }, + "data_stream": { + "source": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "dataset": "osquery_manager.result", + "type": "logs" + } + } + ] +} \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.out.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.out.json new file mode 100644 index 000000000000..b691078a3f61 --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams.out.json @@ -0,0 +1,122 @@ +[ + { + "data_stream": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.result-default", + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "type": "osquery" + }, + { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "namespace": "default", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.action.responses-default", + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.action.responses", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.action.responses" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "stream_id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "type": "osquery" + } +] \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.in.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.in.json new file mode 100644 index 000000000000..f6703263e6ad --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.in.json @@ -0,0 +1,109 @@ +{ + "source": { + "data_stream": { + "namespace": "default" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "meta": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "name": "osquery_manager-1", + "osquery": { + "options": { + "host_identifier": "hostname" + } + }, + "package_policy_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "policy": { + "revision": 3 + }, + "revision": 2, + "streams": [ + { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "query": null + }, + { + "data_stream": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "id": null, + "query": null + } + ], + "type": "osquery" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "type": "osquery", + "name": "osquery_manager-1", + "revision": 2, + "meta": { + "source": { + "package": { + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "package": { + "source": { + "name": "osquery_manager", + "version": "1.12.1" + }, + "name": "osquery_manager", + "version": "1.12.1" + } + }, + "data_stream": { + "source": { + "namespace": "default" + }, + "namespace": "default" + }, + "streams": [ + { + "source": { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "query": null + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "data_stream": { + "source": { + "dataset": "osquery_manager.action.responses", + "type": "logs" + }, + "dataset": "osquery_manager.action.responses", + "type": "logs" + } + }, + { + "source": { + "data_stream": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "id": null, + "query": null + }, + "data_stream": { + "source": { + "dataset": "osquery_manager.result", + "type": "logs" + }, + "dataset": "osquery_manager.result", + "type": "logs" + } + } + ] +} \ No newline at end of file diff --git a/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.out.json b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.out.json new file mode 100644 index 000000000000..aa4a70a74efe --- /dev/null +++ b/x-pack/osquerybeat/cmd/testdata/osquerycfg/two_streams_with_osquery.out.json @@ -0,0 +1,127 @@ +[ + { + "data_stream": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "id": "74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.result-default", + "osquery": { + "options": { + "host_identifier": "hostname" + } + }, + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.result" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "type": "osquery" + }, + { + "data_stream": { + "dataset": "osquery_manager.action.responses", + "namespace": "default", + "type": "logs" + }, + "id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c", + "index": "logs-osquery_manager.action.responses-default", + "processors": [ + { + "add_fields": { + "fields": { + "input_id": "74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.action.responses", + "namespace": "default", + "type": "logs" + }, + "target": "data_stream" + } + }, + { + "add_fields": { + "fields": { + "dataset": "osquery_manager.action.responses" + }, + "target": "event" + } + }, + { + "add_fields": { + "fields": { + "stream_id": "osquery-osquery_manager.action.responses-74c7d0a8-ce04-4663-95da-ff6d537c268c" + }, + "target": "@metadata" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f", + "snapshot": false, + "version": "8.13.1" + }, + "target": "elastic_agent" + } + }, + { + "add_fields": { + "fields": { + "id": "abc7d0a8-ce04-4663-95da-ff6d537c268f" + }, + "target": "agent" + } + } + ], + "type": "osquery" + } +] \ No newline at end of file diff --git a/x-pack/osquerybeat/internal/config/config.go b/x-pack/osquerybeat/internal/config/config.go index 0d23af8186fc..ec873206de75 100644 --- a/x-pack/osquerybeat/internal/config/config.go +++ b/x-pack/osquerybeat/internal/config/config.go @@ -22,9 +22,10 @@ import ( // query: select * from usb_devices const ( - DefaultNamespace = "default" - DefaultDataset = "osquery_manager.result" - DefaultType = "logs" + DefaultNamespace = "default" + DefaultDataset = "osquery_manager.result" + DefaultType = "logs" + DefaultActionResponsesDataset = "osquery_manager.action.responses" ) var datastreamPrefix = fmt.Sprintf("%s-%s-", DefaultType, DefaultDataset) diff --git a/x-pack/osquerybeat/internal/pub/publisher.go b/x-pack/osquerybeat/internal/pub/publisher.go index 6c49ef060170..d336a42515f9 100644 --- a/x-pack/osquerybeat/internal/pub/publisher.go +++ b/x-pack/osquerybeat/internal/pub/publisher.go @@ -26,8 +26,13 @@ type Publisher struct { b *beat.Beat log *logp.Logger - mx sync.Mutex + mx sync.Mutex + + // client for osquery_manager.result client beat.Client + + // client for osquery_manager.action.responses + actionResponsesClient beat.Client } func New(b *beat.Beat, log *logp.Logger) *Publisher { @@ -45,27 +50,64 @@ func (p *Publisher) Configure(inputs []config.InputConfig) error { p.mx.Lock() defer p.mx.Unlock() - processors, err := p.processorsForInputsConfig(inputs) - if err != nil { - return err - } + // Setup configuration pointers to the clients and corresponding default datasets + + // The osquery_manager.result is always first + if len(inputs) > 0 { + processors, err := p.processorsForInputConfig(inputs[0], config.DefaultDataset) + if err != nil { + return err + } + + p.log.Debugf("Connect publisher for %s with processors: %d", config.DefaultDataset, len(processors.All())) + // Connect publisher + client, err := p.b.Publisher.ConnectWith(beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: processors, + }, + }) + if err != nil { + return err + } + + // Swap client + oldclient := p.client + p.client = client + if oldclient != nil { + oldclient.Close() + } - p.log.Debugf("Connect publisher with processors: %d", len(processors.All())) - // Connect publisher - client, err := p.b.Publisher.ConnectWith(beat.ClientConfig{ - Processing: beat.ProcessingConfig{ - Processor: processors, - }, - }) - if err != nil { - return err } - // Swap client - oldclient := p.client - p.client = client - if oldclient != nil { - oldclient.Close() + // Attach remaining DefaultActionResultsDataset if present + if len(inputs) > 1 { + processors, err := p.processorsForInputConfig(inputs[1], config.DefaultActionResponsesDataset) + if err != nil { + return err + } + + p.log.Debugf("Connect publisher for %s with processors: %d", config.DefaultActionResponsesDataset, len(processors.All())) + // Connect publisher + client, err := p.b.Publisher.ConnectWith(beat.ClientConfig{ + Processing: beat.ProcessingConfig{ + Processor: processors, + }, + }) + if err != nil { + return err + } + + // Swap client + oldclient := p.actionResponsesClient + p.actionResponsesClient = client + if oldclient != nil { + oldclient.Close() + } + } else { + if p.actionResponsesClient != nil { + p.actionResponsesClient.Close() + p.actionResponsesClient = nil + } } return nil } @@ -91,40 +133,93 @@ func (p *Publisher) Close() { } } -func (p *Publisher) processorsForInputsConfig(inputs []config.InputConfig) (procs *processors.Processors, err error) { +func (p *Publisher) PublishActionResult(req map[string]interface{}, res map[string]interface{}) { + p.mx.Lock() + defer p.mx.Unlock() + + if p.actionResponsesClient == nil { + p.log.Info("Action responses stream is not configured. Action response is dropped.") + return + } + + fields := actionResultToEvent(req, res) + event := beat.Event{ + Timestamp: time.Now(), + Fields: fields, + } + + p.log.Debugf("Action response event is sent, fields: %#v", fields) + + p.actionResponsesClient.Publish(event) +} + +func actionResultToEvent(req, res map[string]interface{}) map[string]interface{} { + m := make(map[string]interface{}, 8) + + copyKey := func(key string, src, dst map[string]interface{}) { + if v, ok := src[key]; ok { + dst[key] = v + } + } + + copyKey("started_at", res, m) + copyKey("completed_at", res, m) + copyKey("error", res, m) + + if v, ok := res["count"]; ok { + m["action_response"] = map[string]interface{}{ + "osquery": map[string]interface{}{ + "count": v, + }, + } + } + + if v, ok := req["id"]; ok { + m["action_id"] = v + } + + if v, ok := req["input_type"]; ok { + m["action_input_type"] = v + } + + if v, ok := req["data"]; ok { + m["action_data"] = v + } + + return m +} + +func (p *Publisher) processorsForInputConfig(inCfg config.InputConfig, defaultDataset string) (procs *processors.Processors, err error) { procs = processors.NewList(nil) // Use only first input processor // Every input will have a processor that adds the elastic_agent info, we need only one // Not expecting other processors at the moment and this needs to work for 7.13 - for _, input := range inputs { - if len(input.Processors) > 0 { - // Attach the data_stream processor. This will append the data_stream attributes to the events. - // This is needed for the proper logstash auto-discovery of the destination datastream for the results. - ds := add_data_stream.DataStream{ - Namespace: input.Datastream.Namespace, - Dataset: input.Datastream.Dataset, - Type: input.Datastream.Type, - } - if ds.Namespace == "" { - ds.Namespace = config.DefaultNamespace - } - if ds.Dataset == "" { - ds.Dataset = config.DefaultDataset - } - if ds.Type == "" { - ds.Type = config.DefaultType - } - - procs.AddProcessor(add_data_stream.New(ds)) - - userProcs, err := processors.New(input.Processors) - if err != nil { - return nil, err - } - procs.AddProcessors(*userProcs) - break + if len(inCfg.Processors) > 0 { + // Attach the data_stream processor. This will append the data_stream attributes to the events. + // This is needed for the proper logstash auto-discovery of the destination datastream for the results. + ds := add_data_stream.DataStream{ + Namespace: inCfg.Datastream.Namespace, + Dataset: inCfg.Datastream.Dataset, + Type: inCfg.Datastream.Type, + } + if ds.Namespace == "" { + ds.Namespace = config.DefaultNamespace + } + if ds.Dataset == "" { + ds.Dataset = defaultDataset + } + if ds.Type == "" { + ds.Type = config.DefaultType + } + + procs.AddProcessor(add_data_stream.New(ds)) + + userProcs, err := processors.New(inCfg.Processors) + if err != nil { + return nil, err } + procs.AddProcessors(*userProcs) } return procs, nil } diff --git a/x-pack/osquerybeat/internal/pub/publisher_test.go b/x-pack/osquerybeat/internal/pub/publisher_test.go index 488516bb01a7..4c34b667ff8b 100644 --- a/x-pack/osquerybeat/internal/pub/publisher_test.go +++ b/x-pack/osquerybeat/internal/pub/publisher_test.go @@ -5,6 +5,7 @@ package pub import ( + "encoding/json" "testing" "time" @@ -112,3 +113,92 @@ func TestHitToEvent(t *testing.T) { } } } + +func TestActionResultToEvent(t *testing.T) { + + tests := []struct { + name string + req, res map[string]interface{} + want map[string]interface{} + }{ + { + name: "successful", + req: toMap(t, `{ + "data": { + "id": "a72d65d8-200a-4b43-8dbd-7bc0e9ce8e65", + "query": "select * from osquery_info" + }, + "id": "5c433f88-ab0d-41e2-af76-6ff16ae3ced8", + "input_type": "osquery", + "type": "INPUT_ACTION" + }`), + res: toMap(t, `{ + "completed_at": "2024-04-18T19:39:39.740162Z", + "count": 1, + "started_at": "2024-04-18T19:39:39.532125Z" + } `), + // "agent_id": "bf3d6036-2260-4bbf-94a3-5ccce0d75d9e", + want: toMap(t, `{ + "completed_at": "2024-04-18T19:39:39.740162Z", + "action_response": { + "osquery": { + "count": 1 + } + }, + "action_id": "5c433f88-ab0d-41e2-af76-6ff16ae3ced8", + "started_at": "2024-04-18T19:39:39.532125Z", + "action_input_type": "osquery", + "action_data": { + "id": "a72d65d8-200a-4b43-8dbd-7bc0e9ce8e65", + "query": "select * from osquery_info" + } + }`), + }, + { + name: "error", + req: toMap(t, `{ + "data": { + "id": "08995ee8-5182-423e-9527-552736411010", + "query": "select * from osquery_foo" + }, + "id": "70539d80-4082-41e9-aff4-fbb877dd752b", + "input_type": "osquery", + "type": "INPUT_ACTION" + }`), + res: toMap(t, `{ + "completed_at": "2024-04-20T14:56:34.87195Z", + "error": "query failed, code: 1, message: no such table: osquery_foo", + "started_at": "2024-04-20T14:56:34.87195Z" + }`), + // "agent_id": "bf3d6036-2260-4bbf-94a3-5ccce0d75d9e", + want: toMap(t, `{ + "completed_at": "2024-04-20T14:56:34.87195Z", + "action_id": "70539d80-4082-41e9-aff4-fbb877dd752b", + "started_at": "2024-04-20T14:56:34.87195Z", + "action_input_type": "osquery", + "error": "query failed, code: 1, message: no such table: osquery_foo", + "action_data": { + "id": "08995ee8-5182-423e-9527-552736411010", + "query": "select * from osquery_foo" + } + }`), + }, + } + + for _, tc := range tests { + got := actionResultToEvent(tc.req, tc.res) + diff := cmp.Diff(tc.want, got) + if diff != "" { + t.Error(diff) + } + } +} + +func toMap(t *testing.T, s string) map[string]interface{} { + var m map[string]interface{} + err := json.Unmarshal([]byte(s), &m) + if err != nil { + t.Fatal(err) + } + return m +}