Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[add_session_metadata processor] Enrich events with user and group names #39537

Merged
merged 8 commits into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add procfs backend to the `add_session_metadata` processor. {pull}38799[38799]
- Add process.entity_id, process.group.name and process.group.id in add_process_metadata processor. Make fim module with kprobes backend to always add an appropriately configured add_process_metadata processor to enrich file events {pull}38776[38776]
- Reduce data size for add_session_metadata processor by removing unneeded fields {pull}39500[39500]
- Enrich process events with user and group names, with add_session_metadata processor {pull}39537[39537]

*Auditbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (p *addSessionMetadata) Run(ev *beat.Event) (*beat.Event, error) {
return ev, nil //nolint:nilerr // Running on events with a different PID type is not a processor error
}

err = p.provider.UpdateDB(ev, pid)
err = p.provider.SyncDB(ev, pid)
if err != nil {
return ev, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,14 @@ var (
Pgid: uint32(100),
Sid: uint32(40),
},
Creds: types.CredInfo{
Ruid: 0,
Euid: 0,
Suid: 0,
Rgid: 0,
Egid: 0,
Sgid: 0,
},
CWD: "/",
Filename: "/bin/ls",
},
Expand Down Expand Up @@ -78,12 +86,24 @@ var (
"pid": uint32(100),
"parent": mapstr.M{
"pid": uint32(50),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
"session_leader": mapstr.M{
"pid": uint32(40),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
"group_leader": mapstr.M{
"pid": uint32(100),
"user": mapstr.M{
"id": "0",
"name": "root",
},
},
},
},
Expand Down Expand Up @@ -318,33 +338,35 @@ var (

func TestEnrich(t *testing.T) {
for _, tt := range enrichTests {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
require.Nil(t, err)
t.Run(tt.testName, func(t *testing.T) {
reader := procfs.NewMockReader()
db, err := processdb.NewDB(reader, *logger)
require.Nil(t, err)

for _, ev := range tt.mockProcesses {
db.InsertExec(ev)
}
s := addSessionMetadata{
logger: logger,
db: db,
config: tt.config,
}
for _, ev := range tt.mockProcesses {
db.InsertExec(ev)
}
s := addSessionMetadata{
logger: logger,
db: db,
config: tt.config,
}

// avoid taking address of loop variable
i := tt.input
actual, err := s.enrich(&i)
if tt.expect_error {
require.Error(t, err, "%s: error unexpectedly nil", tt.testName)
} else {
require.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
require.NotNil(t, actual, "%s: returned nil event", tt.testName)
// avoid taking address of loop variable
i := tt.input
actual, err := s.enrich(&i)
if tt.expect_error {
require.Error(t, err, "%s: error unexpectedly nil", tt.testName)
} else {
require.Nil(t, err, "%s: enrich error: %w", tt.testName, err)
require.NotNil(t, actual, "%s: returned nil event", tt.testName)

//Validate output
if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" {
t.Errorf("field mismatch:\n%s", diff)
//Validate output
if diff := cmp.Diff(tt.expected.Fields, actual.Fields, ignoreMissingFrom(tt.expected.Fields)); diff != "" {
t.Errorf("field mismatch:\n%s", diff)
}
}
}
})
}
}

Expand All @@ -364,8 +386,10 @@ func ignoreMissingFrom(m mapstr.M) cmp.Option {
// Note: This validates test code only
func TestFilter(t *testing.T) {
for _, tt := range filterTests {
if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected {
t.Errorf("%s: unexpected comparator result", tt.testName)
}
t.Run(tt.testName, func(t *testing.T) {
if eq := cmp.Equal(tt.mx, tt.my, ignoreMissingFrom(tt.mx)); eq != tt.expected {
t.Errorf("%s: unexpected comparator result", tt.testName)
}
})
}
}
62 changes: 52 additions & 10 deletions x-pack/auditbeat/processors/sessionmd/processdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ type DB struct {
procfs procfs.Reader
stopChan chan struct{}
removalCandidates rcHeap
namesCache *namesCache
}

func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
Expand All @@ -204,6 +205,7 @@ func NewDB(reader procfs.Reader, logger logp.Logger) (*DB, error) {
procfs: reader,
stopChan: make(chan struct{}),
removalCandidates: make(rcHeap, 0),
namesCache: newNamesCache(),
}
db.startReaper()
return &db, nil
Expand Down Expand Up @@ -430,7 +432,7 @@ func interactiveFromTTY(tty types.TTYDev) bool {
return TTYUnknown != getTTYType(tty.Major, tty.Minor)
}

func fullProcessFromDBProcess(p Process) types.Process {
func (db *DB) fullProcessFromDBProcess(p Process) types.Process {
reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(p.PIDs.StartTimeNS)
interactive := interactiveFromTTY(p.CTTY)

Expand All @@ -447,7 +449,15 @@ func fullProcessFromDBProcess(p Process) types.Process {
euid := p.Creds.Euid
egid := p.Creds.Egid
ret.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := db.namesCache.getUserName(ret.User.ID)
if ok {
ret.User.Name = username
}
ret.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := db.namesCache.getGroupName(ret.Group.ID)
if ok {
ret.Group.Name = groupname
}
ret.Thread.Capabilities.Permitted, _ = capabilities.FromUint64(p.Creds.CapPermitted)
ret.Thread.Capabilities.Effective, _ = capabilities.FromUint64(p.Creds.CapEffective)
ret.TTY.CharDevice.Major = p.CTTY.Major
Expand All @@ -457,7 +467,7 @@ func fullProcessFromDBProcess(p Process) types.Process {
return ret
}

func fillParent(process *types.Process, parent Process) {
func (db *DB) fillParent(process *types.Process, parent Process) {
reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(parent.PIDs.StartTimeNS)

interactive := interactiveFromTTY(parent.CTTY)
Expand All @@ -471,10 +481,18 @@ func fillParent(process *types.Process, parent Process) {
process.Parent.WorkingDirectory = parent.Cwd
process.Parent.Interactive = &interactive
process.Parent.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := db.namesCache.getUserName(process.Parent.User.ID)
if ok {
process.Parent.User.Name = username
}
process.Parent.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := db.namesCache.getGroupName(process.Parent.Group.ID)
if ok {
process.Parent.Group.Name = groupname
}
}

func fillGroupLeader(process *types.Process, groupLeader Process) {
func (db *DB) fillGroupLeader(process *types.Process, groupLeader Process) {
reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(groupLeader.PIDs.StartTimeNS)

interactive := interactiveFromTTY(groupLeader.CTTY)
Expand All @@ -488,10 +506,18 @@ func fillGroupLeader(process *types.Process, groupLeader Process) {
process.GroupLeader.WorkingDirectory = groupLeader.Cwd
process.GroupLeader.Interactive = &interactive
process.GroupLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := db.namesCache.getUserName(process.GroupLeader.User.ID)
if ok {
process.GroupLeader.User.Name = username
}
process.GroupLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := db.namesCache.getGroupName(process.GroupLeader.Group.ID)
if ok {
process.GroupLeader.Group.Name = groupname
}
}

func fillSessionLeader(process *types.Process, sessionLeader Process) {
func (db *DB) fillSessionLeader(process *types.Process, sessionLeader Process) {
reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(sessionLeader.PIDs.StartTimeNS)

interactive := interactiveFromTTY(sessionLeader.CTTY)
Expand All @@ -505,10 +531,18 @@ func fillSessionLeader(process *types.Process, sessionLeader Process) {
process.SessionLeader.WorkingDirectory = sessionLeader.Cwd
process.SessionLeader.Interactive = &interactive
process.SessionLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := db.namesCache.getUserName(process.SessionLeader.User.ID)
if ok {
process.SessionLeader.User.Name = username
}
process.SessionLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := db.namesCache.getGroupName(process.SessionLeader.Group.ID)
if ok {
process.SessionLeader.Group.Name = groupname
}
}

func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Process) {
func (db *DB) fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Process) {
reducedPrecisionStartTime := timeutils.ReduceTimestampPrecision(entryLeader.PIDs.StartTimeNS)

interactive := interactiveFromTTY(entryLeader.CTTY)
Expand All @@ -522,7 +556,15 @@ func fillEntryLeader(process *types.Process, entryType EntryType, entryLeader Pr
process.EntryLeader.WorkingDirectory = entryLeader.Cwd
process.EntryLeader.Interactive = &interactive
process.EntryLeader.User.ID = strconv.FormatUint(uint64(euid), 10)
username, ok := db.namesCache.getUserName(process.EntryLeader.User.ID)
if ok {
process.EntryLeader.User.Name = username
}
process.EntryLeader.Group.ID = strconv.FormatUint(uint64(egid), 10)
groupname, ok := db.namesCache.getGroupName(process.EntryLeader.Group.ID)
if ok {
process.EntryLeader.Group.Name = groupname
}

process.EntryLeader.EntryMeta.Type = string(entryType)
}
Expand Down Expand Up @@ -583,12 +625,12 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
return types.Process{}, errors.New("process not found")
}

ret := fullProcessFromDBProcess(process)
ret := db.fullProcessFromDBProcess(process)

if process.PIDs.Ppid != 0 {
for i := 0; i < retryCount; i++ {
if parent, ok := db.processes[process.PIDs.Ppid]; ok {
fillParent(&ret, parent)
db.fillParent(&ret, parent)
break
}
}
Expand All @@ -597,7 +639,7 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
if process.PIDs.Pgid != 0 {
for i := 0; i < retryCount; i++ {
if groupLeader, ok := db.processes[process.PIDs.Pgid]; ok {
fillGroupLeader(&ret, groupLeader)
db.fillGroupLeader(&ret, groupLeader)
break
}
}
Expand All @@ -606,7 +648,7 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
if process.PIDs.Sid != 0 {
for i := 0; i < retryCount; i++ {
if sessionLeader, ok := db.processes[process.PIDs.Sid]; ok {
fillSessionLeader(&ret, sessionLeader)
db.fillSessionLeader(&ret, sessionLeader)
break
}
}
Expand All @@ -615,7 +657,7 @@ func (db *DB) GetProcess(pid uint32) (types.Process, error) {
if entryLeaderPID, foundEntryLeaderPID := db.entryLeaderRelationships[process.PIDs.Tgid]; foundEntryLeaderPID {
if entryLeader, foundEntryLeader := db.processes[entryLeaderPID]; foundEntryLeader {
// if there is an entry leader then there is a matching member in the entryLeaders table
fillEntryLeader(&ret, db.entryLeaders[entryLeaderPID], entryLeader)
db.fillEntryLeader(&ret, db.entryLeaders[entryLeaderPID], entryLeader)
} else {
db.logger.Debugf("failed to find entry leader entry %d for %d (%s)", entryLeaderPID, pid, db.processes[pid].Filename)
}
Expand Down
75 changes: 75 additions & 0 deletions x-pack/auditbeat/processors/sessionmd/processdb/names.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build linux

package processdb

import (
"os/user"
"sync"
)

type cval struct {
name string
found bool
}

type namesCache struct {
mutex sync.RWMutex
users map[string]cval
groups map[string]cval
}

// newNamesCache will return a new namesCache, which can be used to get mappings
// of user and group IDs to names.
func newNamesCache() *namesCache {
u := namesCache{
users: make(map[string]cval),
groups: make(map[string]cval),
}
return &u
}

// getUserName will return the name associated with the user ID, if it exists
func (u *namesCache) getUserName(id string) (string, bool) {
u.mutex.Lock()
defer u.mutex.Unlock()

val, ok := u.users[id]
if ok {
return val.name, val.found
}
user, err := user.LookupId(id)
cval := cval{}
if err != nil {
cval.name = ""
cval.found = false
} else {
cval.name = user.Username
cval.found = true
}
return cval.name, cval.found
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I get this correctly before we return it we should save cval in the map right? That said, I think that the cache sure helps with the performance (especially if we are talking about LDAP-based users) but on the other hand it can affect the reliability of the data, e.g.

$ sudo useradd jane
$ sudo su jane
$ id
uid=1000(jane) gid=1001(jane) groups=1001(jane)
$ exit
$ sudo usermod -l new_jane jane
$ sudo su new_jane
$ id
uid=1000(new_jane) gid=1001(jane) groups=1001(jane)

I just renamed a user but the Id remains the same so better to search for it every-time?!?

Copy link
Contributor Author

@mjwolf mjwolf May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed all the caching here, it turns out it has no benefit. I've also tried changing the username as the benchmarking ran, and it returned the new username immediately.

}

// getGroupName will return the name associated with the group ID, if it exists
func (u *namesCache) getGroupName(id string) (string, bool) {
u.mutex.Lock()
defer u.mutex.Unlock()

val, ok := u.groups[id]
if ok {
return val.name, val.found
}
group, err := user.LookupGroupId(id)
cval := cval{}
if err != nil {
cval.name = ""
cval.found = false
} else {
cval.name = group.Name
cval.found = true
}
return cval.name, cval.found
mjwolf marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,9 @@ func NewProvider(ctx context.Context, logger *logp.Logger, db *processdb.DB) (pr
}

const (
maxWaitLimit = 200 * time.Millisecond // Maximum time UpdateDB will wait for process
combinedWaitLimit = 2 * time.Second // Multiple UpdateDB calls will wait up to this amount within resetDuration
backoffDuration = 10 * time.Second // UpdateDB will stop waiting for processes for this time
maxWaitLimit = 200 * time.Millisecond // Maximum time SyncDB will wait for process
combinedWaitLimit = 2 * time.Second // Multiple SyncDB calls will wait up to this amount within resetDuration
backoffDuration = 10 * time.Second // SyncDB will stop waiting for processes for this time
resetDuration = 5 * time.Second // After this amount of times with no backoffs, the combinedWait will be reset
)

Expand All @@ -176,7 +176,7 @@ var (
// If for some reason a lot of time has been spent waiting for missing processes, this also has a backoff timer during
// which it will continue without waiting for missing events to arrive, so the processor doesn't become overly backed-up
// waiting for these processes, at the cost of possibly not enriching some processes.
func (s prvdr) UpdateDB(ev *beat.Event, pid uint32) error {
func (s prvdr) SyncDB(ev *beat.Event, pid uint32) error {
if s.db.HasProcess(pid) {
return nil
}
Expand Down
Loading
Loading