Skip to content

Commit

Permalink
x-pack/filebeat/input/cel: log transaction ID in CEL evaluations (#37065
Browse files Browse the repository at this point in the history
)

Emit a previous transaction ID before starting CEL evaluation and the final ID
after completing the work. Also write transaction IDs to CEL debug log call if
available.
  • Loading branch information
efd6 authored Nov 11, 2023
1 parent 65a7833 commit 7ec5ac5
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will d
- Use filestream input with file_identity.fingerprint as default for hints autodiscover. {issue}35984[35984] {pull}36950[36950]
- Add network processor in addition to interface based direction resolution. {pull}37023[37023]
- Add setup option `--force-enable-module-filesets`, that will act as if all filesets have been enabled in a module during setup. {issue}30915[30915] {pull}99999[99999]
- Make CEL input log current transaction ID when request tracing is turned on. {pull}37065[37065]

*Auditbeat*

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c config) Validate() error {
if len(c.Regexps) != 0 {
patterns = map[string]*regexp.Regexp{".": nil}
}
_, err = newProgram(context.Background(), c.Program, root, client, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"))
_, err = newProgram(context.Background(), c.Program, root, client, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil)
if err != nil {
return fmt.Errorf("failed to check program: %w", err)
}
Expand Down
38 changes: 25 additions & 13 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
cfg.Resource.Tracer.Filename = strings.ReplaceAll(cfg.Resource.Tracer.Filename, "*", id)
}

client, err := newClient(ctx, cfg, log)
client, trace, err := newClient(ctx, cfg, log)
if err != nil {
return err
}
Expand All @@ -151,7 +151,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
Password: cfg.Auth.Basic.Password,
}
}
prg, err := newProgram(ctx, cfg.Program, root, client, limiter, auth, patterns, cfg.XSDs, log)
prg, err := newProgram(ctx, cfg.Program, root, client, limiter, auth, patterns, cfg.XSDs, log, trace)
if err != nil {
return err
}
Expand Down Expand Up @@ -227,6 +227,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
}

// Process a set of event requests.
if trace != nil {
log.Debugw("previous transaction", "transaction.id", trace.TxID())
}
log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact})
metrics.executions.Add(1)
start := i.now()
Expand All @@ -240,6 +243,9 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
log.Errorw("failed evaluation", "error", err)
}
metrics.celProcessingTime.Update(time.Since(start).Nanoseconds())
if trace != nil {
log.Debugw("final transaction", "transaction.id", trace.TxID())
}

// On exit, state is expected to be in the shape:
//
Expand Down Expand Up @@ -680,13 +686,13 @@ func getLimit(which string, rateLimit map[string]interface{}, log *logp.Logger)
return limit, true
}

func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, error) {
func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, *httplog.LoggingRoundTripper, error) {
if !wantClient(cfg) {
return nil, nil
return nil, nil, nil
}
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...)
if err != nil {
return nil, err
return nil, nil, err
}

if cfg.Auth.Digest.isEnabled() {
Expand All @@ -702,6 +708,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
}
}

var trace *httplog.LoggingRoundTripper
if cfg.Resource.Tracer != nil {
w := zapcore.AddSync(cfg.Resource.Tracer)
go func() {
Expand All @@ -716,7 +723,8 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
)
traceLogger := zap.New(core)

c.Transport = httplog.NewLoggingRoundTripper(c.Transport, traceLogger)
trace = httplog.NewLoggingRoundTripper(c.Transport, traceLogger)
c.Transport = trace
}

c.CheckRedirect = checkRedirect(cfg.Resource, log)
Expand All @@ -736,12 +744,12 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
if cfg.Auth.OAuth2.isEnabled() {
authClient, err := cfg.Auth.OAuth2.client(ctx, c)
if err != nil {
return nil, err
return nil, nil, err
}
return authClient, nil
return authClient, trace, nil
}

return c, nil
return c, trace, nil
}

func wantClient(cfg config) bool {
Expand Down Expand Up @@ -877,7 +885,7 @@ var (
}
)

func newProgram(ctx context.Context, src, root string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger) (cel.Program, error) {
func newProgram(ctx context.Context, src, root string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, error) {
xml, err := lib.XML(nil, xsd)
if err != nil {
return nil, fmt.Errorf("failed to build xml type hints: %w", err)
Expand All @@ -891,7 +899,7 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi
lib.Strings(),
lib.Time(),
lib.Try(),
lib.Debug(debug(log)),
lib.Debug(debug(log, trace)),
lib.File(mimetypes),
lib.MIME(mimetypes),
lib.Regexp(patterns),
Expand Down Expand Up @@ -923,14 +931,18 @@ func newProgram(ctx context.Context, src, root string, client *http.Client, limi
return prg, nil
}

func debug(log *logp.Logger) func(string, any) {
func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, any) {
log = log.Named("cel_debug")
return func(tag string, value any) {
level := "DEBUG"
if _, ok := value.(error); ok {
level = "ERROR"
}
log.Debugw(level, "tag", tag, "value", value)
if trace == nil {
log.Debugw(level, "tag", tag, "value", value)
} else {
log.Debugw(level, "tag", tag, "value", value, "transaction.id", trace.TxID())
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions x-pack/filebeat/input/internal/httplog/roundtripper.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,23 @@ func (rt *LoggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, err
return resp, err
}

// TxID returns the current transaction.id value. If rt is nil, the empty string is returned.
func (rt *LoggingRoundTripper) TxID() string {
if rt == nil {
return ""
}
count := rt.txIDCounter.Load()
return rt.formatTxID(count)
}

// nextTxID returns the next transaction.id value. It increments the internal
// request counter.
func (rt *LoggingRoundTripper) nextTxID() string {
count := rt.txIDCounter.Inc()
return rt.formatTxID(count)
}

func (rt *LoggingRoundTripper) formatTxID(count uint64) string {
return rt.txBaseID + "-" + strconv.FormatUint(count, 10)
}

Expand Down

0 comments on commit 7ec5ac5

Please sign in to comment.