From ffcd1814666645a5d7a644911ecf6e2b7d8db3f5 Mon Sep 17 00:00:00 2001 From: Michael Wolf Date: Wed, 1 May 2024 14:52:27 -0700 Subject: [PATCH] [Auditbeat][add_session_metadata processor] Fix more potential enrichment failures (#39243) Fix two more cases that could cause unenriched processes in the add_session_metadata processor. It was possible for auditd events to arrive before the ebpf event added processes to the process DB; now the enrichment will wait for the process to be inserted into the DB if it is not already there before enrichment is run on it. Also stop attempting to enrich failed syscall events, and stop modifying the DB based on them. Changes: With the ebpf backend, when an event is processed, wait for the process to be added to the DB before enriching, if it's not already in the DB when the event is received. Do not enrich failed syscall auditd events. Since failed syscalls don't actually cause a process to be created, they should not be enriched, or inserted into the process DB. Remove scrapeAncestors from DB. The intention of this was to fill in missed processes, but now processes should not be missed with ebpf, and it is ineffective with procfs, as the process will most likely have already ended. This was causing DB inconsistencies when run on failed syscall events, and I haven't ever seen any cases where it's helpful now. 
--- CHANGELOG.next.asciidoc | 3 +- .../sessionmd/add_session_metadata.go | 19 ++++- .../processors/sessionmd/processdb/db.go | 39 ++------- .../provider/ebpf_provider/ebpf_provider.go | 80 ++++++++++++++++++- .../procfs_provider/procfs_provider.go | 21 ++--- .../procfs_provider/procfs_provider_test.go | 10 +-- .../processors/sessionmd/provider/provider.go | 2 +- 7 files changed, 114 insertions(+), 60 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 58ce7ac0f65..68eb43677ea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -94,8 +94,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* - Set field types to correctly match ECS in sessionmd processor {issue}38955[38955] {pull}38994[38994] -- Keep process info on exited processes, to avoid failing to enrich events in sessionmd processor {pull}39173[39173] - +- Fix failing to enrich process events in sessionmd processor {issue}38955[38955] {pull}39173[39173] {pull}39243[39243] - Prevent scenario of losing children-related file events in a directory for recursive fsnotify backend of auditbeat file integrity module {pull}39133[39133] diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index ff9fa54e556..766e9623b9e 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -96,13 +96,24 @@ func New(cfg *cfg.C) (beat.Processor, error) { } func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) { - _, err := ev.GetValue(p.config.PIDField) + pi, err := ev.GetValue(p.config.PIDField) if err != nil { // Do not attempt to enrich events without PID; it's not a supported event return ev, nil //nolint:nilerr // Running on events without PID is expected } - err = p.provider.UpdateDB(ev) + // Do not enrich failed syscalls, as there was no actual process 
change related to it + v, err := ev.GetValue("auditd.result") + if err == nil && v == "fail" { + return ev, nil + } + + pid, err := pidToUInt32(pi) + if err != nil { + return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error + } + + err = p.provider.UpdateDB(ev, pid) if err != nil { return ev, err } @@ -136,7 +147,9 @@ func (p *addSessionMetadata) enrich(ev *beat.Event) (*beat.Event, error) { fullProcess, err := p.db.GetProcess(pid) if err != nil { - return nil, fmt.Errorf("pid %v not found in db: %w", pid, err) + e := fmt.Errorf("pid %v not found in db: %w", pid, err) + p.logger.Errorf("%v", e) + return nil, e } processMap := fullProcess.ToMap() diff --git a/x-pack/auditbeat/processors/sessionmd/processdb/db.go b/x-pack/auditbeat/processors/sessionmd/processdb/db.go index 2c7c228e2c1..b8c624abe00 100644 --- a/x-pack/auditbeat/processors/sessionmd/processdb/db.go +++ b/x-pack/auditbeat/processors/sessionmd/processdb/db.go @@ -238,7 +238,6 @@ func (db *DB) InsertFork(fork types.ProcessForkEvent) { pid := fork.ChildPIDs.Tgid ppid := fork.ParentPIDs.Tgid - db.scrapeAncestors(db.processes[pid]) if entry, ok := db.processes[ppid]; ok { entry.PIDs = pidInfoFromProto(fork.ChildPIDs) @@ -282,7 +281,6 @@ func (db *DB) InsertExec(exec types.ProcessExecEvent) { } db.processes[exec.PIDs.Tgid] = proc - db.scrapeAncestors(proc) entryLeaderPID := db.evaluateEntryLeader(proc) if entryLeaderPID != nil { db.entryLeaderRelationships[exec.PIDs.Tgid] = *entryLeaderPID @@ -568,6 +566,14 @@ func setSameAsProcess(process *types.Process) { } } +func (db *DB) HasProcess(pid uint32) bool { + db.mutex.RLock() + defer db.mutex.RUnlock() + + _, ok := db.processes[pid] + return ok +} + func (db *DB) GetProcess(pid uint32) (types.Process, error) { db.mutex.RLock() defer db.mutex.RUnlock() @@ -585,8 +591,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillParent(&ret, parent) break } - db.logger.Debugf("failed to find %d in DB 
(parent of %d), attempting to scrape", process.PIDs.Ppid, pid) - db.scrapeAncestors(process) } } @@ -596,8 +600,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillGroupLeader(&ret, groupLeader) break } - db.logger.Debugf("failed to find %d in DB (group leader of %d), attempting to scrape", process.PIDs.Pgid, pid) - db.scrapeAncestors(process) } } @@ -607,8 +609,6 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) { fillSessionLeader(&ret, sessionLeader) break } - db.logger.Debugf("failed to find %d in DB (session leader of %d), attempting to scrape", process.PIDs.Sid, pid) - db.scrapeAncestors(process) } } @@ -712,29 +712,6 @@ func getTTYType(major uint16, minor uint16) TTYType { return TTYUnknown } -func (db *DB) scrapeAncestors(proc Process) { - for _, pid := range []uint32{proc.PIDs.Pgid, proc.PIDs.Ppid, proc.PIDs.Sid} { - if _, exists := db.processes[pid]; pid == 0 || exists { - continue - } - procInfo, err := db.procfs.GetProcess(pid) - if err != nil { - db.logger.Debugf("couldn't get %v from procfs: %w", pid, err) - continue - } - p := Process{ - PIDs: pidInfoFromProto(procInfo.PIDs), - Creds: credInfoFromProto(procInfo.Creds), - CTTY: ttyDevFromProto(procInfo.CTTY), - Argv: procInfo.Argv, - Cwd: procInfo.Cwd, - Env: procInfo.Env, - Filename: procInfo.Filename, - } - db.insertProcess(p) - } -} - func (db *DB) Close() { close(db.stopChan) } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go index 2b9b540e037..f1b8bae0b67 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/ebpf_provider/ebpf_provider.go @@ -9,6 +9,7 @@ package ebpf_provider import ( "context" "fmt" + "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/ebpf" @@ -151,7 +152,80 @@ func NewProvider(ctx context.Context, 
logger *logp.Logger, db *processdb.DB) (pr return &p, nil } -func (s prvdr) UpdateDB(ev *beat.Event) error { - // no-op for ebpf, DB is updated from pushed ebpf events - return nil +const ( + maxWaitLimit = 200 * time.Millisecond // Maximum time a single UpdateDB call will wait for a process + combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration + backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time + resetDuration = 5 * time.Second // After this amount of time with no backoffs, combinedWait will be reset +) + +var ( + combinedWait = 0 * time.Millisecond + inBackoff = false + backoffStart = time.Now() + since = time.Now() + backoffSkipped = 0 +) + +// UpdateDB doesn't actually update the DB with ebpf, as process events are pushed to the DB by the above goroutine. +// It does try to sync the processor and ebpf events, so that the process is in the process DB before continuing. +// +// It's possible that the event to enrich arrives before the process is inserted into the DB. In that case, this +// will block continuing the enrichment until the process is seen (or maxWaitLimit is reached and an error is returned). +// +// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during +// which it will continue without waiting for missing processes to arrive, so the processor doesn't become overly backed-up +// waiting for these processes, at the cost of possibly not enriching some processes. 
+func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { + if s.db.HasProcess(pid) { + return nil + } + + now := time.Now() + if inBackoff { + if now.Sub(backoffStart) > backoffDuration { + s.logger.Warnf("ended backoff, skipped %d processes", backoffSkipped) + inBackoff = false + combinedWait = 0 * time.Millisecond + } else { + backoffSkipped += 1 + return nil + } + } else { + if combinedWait > combinedWaitLimit { + s.logger.Warn("starting backoff") + inBackoff = true + backoffStart = now + backoffSkipped = 0 + return nil + } + // maintain a moving window of time for the delays we track + if now.Sub(since) > resetDuration { + since = now + combinedWait = 0 * time.Millisecond + } + } + + start := now + nextWait := 5 * time.Millisecond + for { + waited := time.Since(start) + if s.db.HasProcess(pid) { + s.logger.Debugf("got process that was missing after %v", waited) + combinedWait = combinedWait + waited + return nil + } + if waited >= maxWaitLimit { + e := fmt.Errorf("process %v was not seen after %v", pid, waited) + s.logger.Warnf("%v", e) + combinedWait = combinedWait + waited + return e + } + time.Sleep(nextWait) + if nextWait*2+waited > maxWaitLimit { + nextWait = maxWaitLimit - waited + } else { + nextWait = nextWait * 2 + } + } } diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go index 2f99dd72b1f..6525b860b6d 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider.go @@ -41,16 +41,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB, rea } // UpdateDB will update the process DB with process info from procfs or the event itself -func (s prvdr) UpdateDB(ev *beat.Event) error { - pi, err := ev.Fields.GetValue(s.pidField) - if err != nil { - return fmt.Errorf("event not supported, no 
pid") - } - pid, ok := pi.(int) - if !ok { - return fmt.Errorf("pid field not int") - } - +func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error { syscall, err := ev.GetValue(syscallField) if err != nil { return fmt.Errorf("event not supported, no syscall data") @@ -59,7 +50,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { switch syscall { case "execveat", "execve": pe := types.ProcessExecEvent{} - proc_info, err := s.reader.GetProcess(uint32(pid)) + proc_info, err := s.reader.GetProcess(pid) if err == nil { pe.PIDs = proc_info.PIDs pe.Creds = proc_info.Creds @@ -72,7 +63,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { s.logger.Warnf("couldn't get process info from proc for pid %v: %w", pid, err) // If process info couldn't be taken from procfs, populate with as much info as // possible from the event - pe.PIDs.Tgid = uint32(pid) + pe.PIDs.Tgid = pid var intr interface{} var i int var ok bool @@ -106,7 +97,7 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { case "exit_group": pe := types.ProcessExitEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), + Tgid: pid, }, } s.db.InsertExit(pe) @@ -122,8 +113,8 @@ func (s prvdr) UpdateDB(ev *beat.Event) error { if result == "success" { setsid_ev := types.ProcessSetsidEvent{ PIDs: types.PIDInfo{ - Tgid: uint32(pid), - Sid: uint32(pid), + Tgid: pid, + Sid: pid, }, } s.db.InsertSetsid(setsid_ev) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go index 6fd333c4711..c438efcfe1a 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/procfs_provider/procfs_provider_test.go @@ -124,7 +124,7 @@ func TestExecveEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) 
+ err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -234,7 +234,7 @@ func TestExecveatEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -317,7 +317,7 @@ func TestSetSidEvent(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -399,7 +399,7 @@ func TestSetSidEventFailed(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) @@ -470,7 +470,7 @@ func TestSetSidSessionLeaderNotScraped(t *testing.T) { provider, err := NewProvider(context.TODO(), &logger, db, reader, "process.pid") require.Nil(t, err, "error creating provider") - err = provider.UpdateDB(&event) + err = provider.UpdateDB(&event, expected.PIDs.Tgid) require.Nil(t, err) actual, err := db.GetProcess(pid) diff --git a/x-pack/auditbeat/processors/sessionmd/provider/provider.go b/x-pack/auditbeat/processors/sessionmd/provider/provider.go index e3fa1547806..6452eb9e2bf 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/provider.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/provider.go @@ -11,5 +11,5 @@ import ( ) type Provider interface { - UpdateDB(*beat.Event) error + UpdateDB(*beat.Event, uint32) error }