From 7ec5ac5301c60a110350c99c88ce43e372f0db52 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Sat, 11 Nov 2023 21:52:35 +1030 Subject: [PATCH] x-pack/filebeat/input/cel: log transaction ID in CEL evaluations (#37065) Emit a previous transaction ID before starting CEL evaluation and the final ID after completing the work. Also write transaction IDs to CEL debug log call if available. --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/cel/config.go | 2 +- x-pack/filebeat/input/cel/input.go | 38 ++++++++++++------- .../input/internal/httplog/roundtripper.go | 13 +++++++ 4 files changed, 40 insertions(+), 14 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 462af8725a1e..9863ff6db2da 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -201,6 +201,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d - Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950] - Add network processor in addition to interface based direction resolution. {pull}37023[37023] - Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999] +- Make CEL input log current transaction ID when request tracing is turned on. {pull}37065[37065] *Auditbeat* diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go index 5a62a3ad572c..94b41190fa6c 100644 --- a/x-pack/filebeat/input/cel/config.go +++ b/x-pack/filebeat/input/cel/config.go @@ -89,7 +89,7 @@ func (c config) Validate() error { if len(c.Regexps) != 0 { patterns = map[string]*regexp.Regexp{".": nil} } - _, err = newProgram(context.Background(), c.Program, root, client, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel")) + _, err = newProgram(context.Background(), c.Program, root, client, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil) if err != nil { return fmt.Errorf("failed to check program: %w", err) } diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 03d716e0c6c8..b65f0ae8d100 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -132,7 +132,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id) } - client, err := newClient(ctx, cfg, log) + client, trace, err := newClient(ctx, cfg, log) if err != nil { return err } @@ -151,7 +151,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p Password: cfg.Auth.Basic.Password, } } - prg, err := newProgram(ctx, cfg.Program, root, client, limiter, auth, patterns, cfg.XSDs, log) + prg, err := newProgram(ctx, cfg.Program, root, client, limiter, auth, patterns, cfg.XSDs, log, trace) if err != nil { return err } @@ -227,6 +227,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p } // Process a set of event requests. + if trace != nil { + log.Debugw("previous transaction", "transaction.id", trace.TxID()) + } log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact}) metrics.executions.Add(1) start := i.now() @@ -240,6 +243,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p log.Errorw("failed evaluation", "error", err) } metrics.celProcessingTime.Update(time.Since(start).Nanoseconds()) + if trace != nil { + log.Debugw("final transaction", "transaction.id", trace.TxID()) + } // On exit, state is expected to be in the shape: // @@ -680,13 +686,13 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger) return limit, true } -func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, error) { +func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, *httplog.LoggingRoundTripper, error) { if !wantClient(cfg) { - return nil, nil + return nil, nil, nil } c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...) if err != nil { - return nil, err + return nil, nil, err } if cfg.Auth.Digest.isEnabled() { @@ -702,6 +708,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, } } + var trace *httplog.LoggingRoundTripper if cfg.Resource.Tracer != nil { w := zapcore.AddSync(cfg.Resource.Tracer) go func() { @@ -716,7 +723,8 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, ) traceLogger := zap.New(core) - c.Transport = httplog.NewLoggingRoundTripper(c.Transport, traceLogger) + trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger) + c.Transport = trace } c.CheckRedirect = checkRedirect(cfg.Resource, log) @@ -736,12 +744,12 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, if cfg.Auth.OAuth2.isEnabled() { authClient, err := cfg.Auth.OAuth2.client(ctx, c) if err != nil { - return nil, err + return nil, nil, err } - return authClient, nil + return authClient, trace, nil } - return c, nil + return c, trace, nil } func wantClient(cfg config) bool { @@ -877,7 +885,7 @@ var ( } ) -func newProgram(ctx context.Context, src, root string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger) (cel.Program, error) { +func newProgram(ctx context.Context, src, root string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, error) { xml, err := lib.XML(nil, xsd) if err != nil { return nil, fmt.Errorf("failed to build xml type hints: %w", err) @@ -891,7 +899,7 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi lib.Strings(), lib.Time(), lib.Try(), - lib.Debug(debug(log)), + lib.Debug(debug(log, trace)), lib.File(mimetypes), lib.MIME(mimetypes), lib.Regexp(patterns), @@ -923,14 +931,18 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi return prg, nil } -func debug(log *logp.Logger) func(string, any) { +func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, any) { log = log.Named("cel_debug") return func(tag string, value any) { level := "DEBUG" if _, ok := value.(error); ok { level = "ERROR" } - log.Debugw(level, "tag", tag, "value", value) + if trace == nil { + log.Debugw(level, "tag", tag, "value", value) + } else { + log.Debugw(level, "tag", tag, "value", value, "transaction.id", trace.TxID()) + } } } diff --git a/x-pack/filebeat/input/internal/httplog/roundtripper.go b/x-pack/filebeat/input/internal/httplog/roundtripper.go index 78e872efa665..bf350aadc374 100644 --- a/x-pack/filebeat/input/internal/httplog/roundtripper.go +++ b/x-pack/filebeat/input/internal/httplog/roundtripper.go @@ -167,10 +167,23 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err return resp, err } +// TxID returns the current transaction.id value. If rt is nil, the empty string is returned. +func (rt *LoggingRoundTripper) TxID() string { + if rt == nil { + return "" + } + count := rt.txIDCounter.Load() + return rt.formatTxID(count) +} + // nextTxID returns the next transaction.id value. It increments the internal // request counter. func (rt *LoggingRoundTripper) nextTxID() string { count := rt.txIDCounter.Inc() + return rt.formatTxID(count) +} + +func (rt *LoggingRoundTripper) formatTxID(count uint64) string { return rt.txBaseID + "-" + strconv.FormatUint(count, 10) }