Skip to content

Commit

Permalink
[Auditbeat] Use a separate netlink socket for control to avoid data c…
Browse files Browse the repository at this point in the history
…ongestion. (#41207)
  • Loading branch information
nicholasberlin authored Oct 16, 2024
1 parent c1fc9a9 commit ac81fbd
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 19 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]

*Auditbeat*


- Request status from a separate socket to avoid data congestion {pull}41207[41207]

*Filebeat*

Expand Down
16 changes: 13 additions & 3 deletions auditbeat/module/auditd/audit_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func init() {
type MetricSet struct {
mb.BaseMetricSet
config Config
control *libaudit.AuditClient
client *libaudit.AuditClient
log *logp.Logger
kernelLost struct {
Expand All @@ -107,9 +108,14 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
_, _, kernel, _ := kernelVersion()
log.Infof("auditd module is running as euid=%v on kernel=%v", os.Geteuid(), kernel)

control, err := libaudit.NewAuditClient(nil)
if err != nil {
return nil, fmt.Errorf("failed to create audit control client: %w", err)
}

client, err := newAuditClient(&config, log)
if err != nil {
return nil, fmt.Errorf("failed to create audit client: %w", err)
return nil, fmt.Errorf("failed to create audit data client: %w", err)
}

reassemblerGapsMetric.Set(0)
Expand All @@ -119,6 +125,7 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

return &MetricSet{
BaseMetricSet: base,
control: control,
client: client,
config: config,
log: log,
Expand Down Expand Up @@ -168,10 +175,13 @@ func closeAuditClient(client *libaudit.AuditClient, log *logp.Logger) {
// kernel until the reporter's done channel is closed.
func (ms *MetricSet) Run(reporter mb.PushReporterV2) {
defer closeAuditClient(ms.client, ms.log)
defer ms.control.Close()

// Don't attempt to change configuration if audit rules are locked (enabled == 2).
// Will result in EPERM.
status, err := ms.client.GetStatus()
// Will result in EPERM. Also, ensure that another socket is used to determine the
// status, because audit data can already buffering for ms.client. Which can lead
// to an ENOBUFS error bubbling up.
status, err := ms.control.GetStatus()
if err != nil {
err = fmt.Errorf("failed to get audit status before adding rules: %w", err)
reporter.Error(err)
Expand Down
46 changes: 34 additions & 12 deletions auditbeat/module/auditd/audit_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,12 @@ var (
func TestImmutable(t *testing.T) {
logp.TestingSetup()

// Create a mock netlink client that provides the expected responses.
mock := NewMock().
// Create mocks of netlink client and control that provide the expected responses.
controlMock := NewMock().
// Get Status response for initClient
returnACK().returnStatus().
returnACK().returnStatus()

mock := NewMock().
// Send expected ACKs for initialization
// With one extra for SetImmutable
returnACK().returnStatus().returnACK().returnACK().
Expand All @@ -91,7 +93,13 @@ func TestImmutable(t *testing.T) {
config["immutable"] = true

ms := mbtest.NewPushMetricSetV2WithRegistry(t, config, ab.Registry)
auditMetricSet := ms.(*MetricSet)
auditMetricSet, ok := ms.(*MetricSet)
if !ok {
t.Fatalf("Expected *MetricSet but got %T", ms)
}

auditMetricSet.control.Close()
auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock}
auditMetricSet.client.Close()
auditMetricSet.client = &libaudit.AuditClient{Netlink: mock}

Expand All @@ -110,10 +118,12 @@ func TestImmutable(t *testing.T) {
func TestData(t *testing.T) {
logp.TestingSetup()

// Create a mock netlink client that provides the expected responses.
mock := NewMock().
// Create mocks of netlink client and control that provide the expected responses.
controlMock := NewMock().
// Get Status response for initClient
returnACK().returnStatus().
returnACK().returnStatus()

mock := NewMock().
// Send expected ACKs for initialization
returnACK().returnStatus().returnACK().returnACK().
returnACK().returnACK().returnACK().
Expand All @@ -124,7 +134,12 @@ func TestData(t *testing.T) {

// Replace the default AuditClient with a mock.
ms := mbtest.NewPushMetricSetV2WithRegistry(t, getConfig(), ab.Registry)
auditMetricSet := ms.(*MetricSet)
auditMetricSet, ok := ms.(*MetricSet)
if !ok {
t.Fatalf("Expected *MetricSet but got %T", ms)
}
auditMetricSet.control.Close()
auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock}
auditMetricSet.client.Close()
auditMetricSet.client = &libaudit.AuditClient{Netlink: mock}

Expand All @@ -143,10 +158,12 @@ func TestData(t *testing.T) {
func TestLoginType(t *testing.T) {
logp.TestingSetup()

// Create a mock netlink client that provides the expected responses.
mock := NewMock().
// Create mocks of netlink client and control that provide the expected responses.
controlMock := NewMock().
// Get Status response for initClient
returnACK().returnStatus().
returnACK().returnStatus()

mock := NewMock().
// Send expected ACKs for initialization
returnACK().returnStatus().returnACK().returnACK().
returnACK().returnACK().returnACK().
Expand All @@ -157,7 +174,12 @@ func TestLoginType(t *testing.T) {

// Replace the default AuditClient with a mock.
ms := mbtest.NewPushMetricSetV2WithRegistry(t, getConfig(), ab.Registry)
auditMetricSet := ms.(*MetricSet)
auditMetricSet, ok := ms.(*MetricSet)
if !ok {
t.Fatalf("Expected *MetricSet but got %T", ms)
}
auditMetricSet.control.Close()
auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock}
auditMetricSet.client.Close()
auditMetricSet.client = &libaudit.AuditClient{Netlink: mock}

Expand Down
13 changes: 10 additions & 3 deletions auditbeat/module/auditd/golden_files_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,11 @@ func TestGoldenFiles(t *testing.T) {
if err != nil {
t.Fatalf("error reading log file '%s': %v", file, err)
}
mock := NewMock().
// Create mocks of netlink client and control that provide the expected responses.
controlMock := NewMock().
// Get Status response for initClient
returnACK().returnStatus().
returnACK().returnStatus()
mock := NewMock().
// Send expected ACKs for initialization
returnACK().returnStatus().returnACK().returnACK().
returnACK().returnACK().returnACK().
Expand All @@ -203,7 +205,12 @@ func TestGoldenFiles(t *testing.T) {
returnMessage(terminator)

ms := mbtest.NewPushMetricSetV2WithRegistry(t, configForGolden(), ab.Registry)
auditMetricSet := ms.(*MetricSet)
auditMetricSet, ok := ms.(*MetricSet)
if !ok {
t.Fatalf("Expected *MetricSet but got %T", ms)
}
auditMetricSet.control.Close()
auditMetricSet.control = &libaudit.AuditClient{Netlink: controlMock}
auditMetricSet.client.Close()
auditMetricSet.client = &libaudit.AuditClient{Netlink: mock}
mbEvents := runTerminableReporter(fileTimeout, ms, isTestEvent)
Expand Down

0 comments on commit ac81fbd

Please sign in to comment.