From ac81fbd70a9682b46626a66e0b45036a82204202 Mon Sep 17 00:00:00 2001 From: Nicholas Berlin <56366649+nicholasberlin@users.noreply.github.com> Date: Wed, 16 Oct 2024 14:17:44 -0400 Subject: [PATCH] [Auditbeat] Use a separate netlink socket for control to avoid data congestion. (#41207) --- CHANGELOG.next.asciidoc | 2 +- auditbeat/module/auditd/audit_linux.go | 16 +++++-- auditbeat/module/auditd/audit_linux_test.go | 46 +++++++++++++++----- auditbeat/module/auditd/golden_files_test.go | 13 ++++-- 4 files changed, 58 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 72ff8083fea3..0ac51f5990c5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -122,7 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Auditbeat* - +- Request status from a separate socket to avoid data congestion {pull}41207[41207] *Filebeat* diff --git a/auditbeat/module/auditd/audit_linux.go b/auditbeat/module/auditd/audit_linux.go index f627c0cbefd6..97f755ca4139 100644 --- a/auditbeat/module/auditd/audit_linux.go +++ b/auditbeat/module/auditd/audit_linux.go @@ -87,6 +87,7 @@ func init() { type MetricSet struct { mb.BaseMetricSet config Config + control *libaudit.AuditClient client *libaudit.AuditClient log *logp.Logger kernelLost struct { @@ -107,9 +108,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { _, _, kernel, _ := kernelVersion() log.Infof("auditd module is running as euid=%v on kernel=%v", os.Geteuid(), kernel) + control, err := libaudit.NewAuditClient(nil) + if err != nil { + return nil, fmt.Errorf("failed to create audit control client: %w", err) + } + client, err := newAuditClient(&config, log) if err != nil { - return nil, fmt.Errorf("failed to create audit client: %w", err) + return nil, fmt.Errorf("failed to create audit data client: %w", err) } reassemblerGapsMetric.Set(0) @@ -119,6 +125,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return &MetricSet{ BaseMetricSet: base, + control: control, client: client, config: config, log: log, @@ -168,10 +175,13 @@ func closeAuditClient(client *libaudit.AuditClient, log *logp.Logger) { // kernel until the reporter's done channel is closed. func (ms *MetricSet) Run(reporter mb.PushReporterV2) { defer closeAuditClient(ms.client, ms.log) + defer ms.control.Close() // Don't attempt to change configuration if audit rules are locked (enabled == 2). - // Will result in EPERM. - status, err := ms.client.GetStatus() + // Will result in EPERM. Also, ensure that another socket is used to determine the + // status, because audit data can already buffering for ms.client. Which can lead + // to an ENOBUFS error bubbling up. + status, err := ms.control.GetStatus() if err != nil { err = fmt.Errorf("failed to get audit status before adding rules: %w", err) reporter.Error(err) diff --git a/auditbeat/module/auditd/audit_linux_test.go b/auditbeat/module/auditd/audit_linux_test.go index 9f9950d1050e..f0358b7a71c5 100644 --- a/auditbeat/module/auditd/audit_linux_test.go +++ b/auditbeat/module/auditd/audit_linux_test.go @@ -75,10 +75,12 @@ var ( func TestImmutable(t *testing.T) { logp.TestingSetup() - // Create a mock netlink client that provides the expected responses. - mock := NewMock(). + // Create mocks of netlink client and control that provide the expected responses. + controlMock := NewMock(). // Get Status response for initClient - returnACK().returnStatus(). + returnACK().returnStatus() + + mock := NewMock(). // Send expected ACKs for initialization // With one extra for SetImmutable returnACK().returnStatus().returnACK().returnACK(). @@ -91,7 +93,13 @@ func TestImmutable(t *testing.T) { config["immutable"] = true ms := mbtest.NewPushMetricSetV2WithRegistry(t, config, ab.Registry) - auditMetricSet := ms.(*MetricSet) + auditMetricSet, ok := ms.(*MetricSet) + if !ok { + t.Fatalf("Expected *MetricSet but got %T", ms) + } + + auditMetricSet.control.Close() + auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock} auditMetricSet.client.Close() auditMetricSet.client = &libaudit.AuditClient{Netlink: mock} @@ -110,10 +118,12 @@ func TestImmutable(t *testing.T) { func TestData(t *testing.T) { logp.TestingSetup() - // Create a mock netlink client that provides the expected responses. - mock := NewMock(). + // Create mocks of netlink client and control that provide the expected responses. + controlMock := NewMock(). // Get Status response for initClient - returnACK().returnStatus(). + returnACK().returnStatus() + + mock := NewMock(). // Send expected ACKs for initialization returnACK().returnStatus().returnACK().returnACK(). returnACK().returnACK().returnACK(). @@ -124,7 +134,12 @@ func TestData(t *testing.T) { // Replace the default AuditClient with a mock. ms := mbtest.NewPushMetricSetV2WithRegistry(t, getConfig(), ab.Registry) - auditMetricSet := ms.(*MetricSet) + auditMetricSet, ok := ms.(*MetricSet) + if !ok { + t.Fatalf("Expected *MetricSet but got %T", ms) + } + auditMetricSet.control.Close() + auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock} auditMetricSet.client.Close() auditMetricSet.client = &libaudit.AuditClient{Netlink: mock} @@ -143,10 +158,12 @@ func TestData(t *testing.T) { func TestLoginType(t *testing.T) { logp.TestingSetup() - // Create a mock netlink client that provides the expected responses. - mock := NewMock(). + // Create mocks of netlink client and control that provide the expected responses. + controlMock := NewMock(). // Get Status response for initClient - returnACK().returnStatus(). + returnACK().returnStatus() + + mock := NewMock(). // Send expected ACKs for initialization returnACK().returnStatus().returnACK().returnACK(). returnACK().returnACK().returnACK(). @@ -157,7 +174,12 @@ func TestLoginType(t *testing.T) { // Replace the default AuditClient with a mock. ms := mbtest.NewPushMetricSetV2WithRegistry(t, getConfig(), ab.Registry) - auditMetricSet := ms.(*MetricSet) + auditMetricSet, ok := ms.(*MetricSet) + if !ok { + t.Fatalf("Expected *MetricSet but got %T", ms) + } + auditMetricSet.control.Close() + auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock} auditMetricSet.client.Close() auditMetricSet.client = &libaudit.AuditClient{Netlink: mock} diff --git a/auditbeat/module/auditd/golden_files_test.go b/auditbeat/module/auditd/golden_files_test.go index 096d53d1b903..a121b9371dc6 100644 --- a/auditbeat/module/auditd/golden_files_test.go +++ b/auditbeat/module/auditd/golden_files_test.go @@ -191,9 +191,11 @@ func TestGoldenFiles(t *testing.T) { if err != nil { t.Fatalf("error reading log file '%s': %v", file, err) } - mock := NewMock(). + // Create mocks of netlink client and control that provide the expected responses. + controlMock := NewMock(). // Get Status response for initClient - returnACK().returnStatus(). + returnACK().returnStatus() + mock := NewMock(). // Send expected ACKs for initialization returnACK().returnStatus().returnACK().returnACK(). returnACK().returnACK().returnACK(). @@ -203,7 +205,12 @@ func TestGoldenFiles(t *testing.T) { returnMessage(terminator) ms := mbtest.NewPushMetricSetV2WithRegistry(t, configForGolden(), ab.Registry) - auditMetricSet := ms.(*MetricSet) + auditMetricSet, ok := ms.(*MetricSet) + if !ok { + t.Fatalf("Expected *MetricSet but got %T", ms) + } + auditMetricSet.control.Close() + auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock} auditMetricSet.client.Close() auditMetricSet.client = &libaudit.AuditClient{Netlink: mock} mbEvents := runTerminableReporter(fileTimeout, ms, isTestEvent)