Skip to content

Commit

Permalink
[Filebeat] ETW input - add basic metrics (elastic#38017)
Browse files Browse the repository at this point in the history
Add input metrics to for ETW.

Remove the very verbose debug message that was in the hot path for
processing each event.

I think more metrics taken from the ETW session itself should be added
separately (e.g. events lost, buffers lost). This lays the ground works needed
to expose those metrics.

Closes elastic#38007
  • Loading branch information
andrewkroh authored Mar 6, 2024
1 parent 00e38f5 commit ae312c5
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 22 deletions.
100 changes: 79 additions & 21 deletions x-pack/filebeat/docs/inputs/input-etw.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,29 @@

beta[]

https://learn.microsoft.com/en-us/windows/win32/etw/event-tracing-portal[Event Tracing for Windows] is a powerful logging and tracing mechanism built into the Windows operating system. It provides a detailed view of application and system behavior, performance issues, and runtime diagnostics. Trace events contain an event header and provider-defined data that describes the current state of an application or operation. You can use the events to debug an application and perform capacity and performance analysis.

The ETW input can interact with ETW in three distinct ways: it can create a new session to capture events from user-mode providers, attach to an already existing session to collect ongoing event data, or read events from a pre-recorded .etl file. This functionality enables the module to adapt to different scenarios, such as real-time event monitoring or analyzing historical data.

This input currently supports manifest-based, MOF (classic) and TraceLogging providers while WPP providers are not supported. https://learn.microsoft.com/en-us/windows/win32/etw/about-event-tracing#types-of-providers[Here] you can find more information about the available types of providers.

It has been tested in every Windows versions supported by Filebeat, starting from Windows 8.1 and Windows Server 2016. In addition, administrative privileges are required in order to control event tracing sessions.
https://learn.microsoft.com/en-us/windows/win32/etw/event-tracing-portal[Event
Tracing for Windows] is a powerful logging and tracing mechanism built into the
Windows operating system. It provides a detailed view of application and system
behavior, performance issues, and runtime diagnostics. Trace events contain an
event header and provider-defined data that describes the current state of an
application or operation. You can use the events to debug an application and
perform capacity and performance analysis.

The ETW input can interact with ETW in three distinct ways: it can create a new
session to capture events from user-mode providers, attach to an already
existing session to collect ongoing event data, or read events from a
pre-recorded .etl file. This functionality enables the module to adapt to
different scenarios, such as real-time event monitoring or analyzing historical
data.

This input currently supports manifest-based, MOF (classic) and TraceLogging
providers while WPP providers are not supported.
https://learn.microsoft.com/en-us/windows/win32/etw/about-event-tracing#types-of-providers[Here]
you can find more information about the available types of providers.

It has been tested in the Windows versions supported by {beatname_uc}, starting
from Windows 10 and Windows Server 2016. In addition, administrative privileges
are required to control event tracing sessions.

Example configurations:

Expand All @@ -35,7 +51,7 @@ Read from a provider by name:
match_all_keyword: 0
----

Same provider can be defined by its GUID:
Read from a provider by its GUID:
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
Expand All @@ -49,7 +65,7 @@ Same provider can be defined by its GUID:
match_all_keyword: 0
----

Read from a current session:
Read from an existing session:
["source","yaml",subs="attributes"]
----
{beatname_lc}.inputs:
Expand All @@ -69,7 +85,9 @@ Read from a .etl file:
file: "C\Windows\System32\Winevt\Logs\Logfile.etl"
----

NOTE: Examples shown above are mutually exclusive, since the options `provider.name`, `provider.guid`, `session` and `file` cannot be present at the same time. Nevertheless, it is a requirement that one of them appears.
NOTE: Examples shown above are mutually exclusive, the options
`provider.name`, `provider.guid`, `session` and `file` cannot be present at the
same time. Nevertheless, it is a requirement that one of them is present.

Multiple providers example:
["source","yaml",subs="attributes"]
Expand All @@ -95,50 +113,90 @@ Multiple providers example:

==== Configuration options

The `ETW` input supports the following configuration options.
The `etw` input supports the following configuration options plus the
<<{beatname_lc}-input-{type}-common-options>> described later.

[float]
==== `file`

Specifies the path to an .etl file for reading ETW events. This file format is commonly used for storing ETW event logs.
Specifies the path to an .etl file for reading ETW events. This file format is
commonly used for storing ETW event logs.

[float]
==== `provider.guid`

Identifies the GUID of an ETW provider. To see available providers, use the command `logman query providers`.
Identifies the GUID of an ETW provider. To see available providers, use the
command `logman query providers`.

[float]
==== `provider.name`

Specifies the name of the ETW provider. Available providers can be listed using `logman query providers`.
Specifies the name of the ETW provider. Available providers can be listed using
`logman query providers`.

[float]
==== `session_name`

When specified a provider, a new session is created. It sets the name for a new ETW session associated with the provider. If not provided, the default is the provider ID prefixed with 'Elastic-'.
When specifying a provider, a new session is created. This controls the name for
the new ETW session it will create. If not specified, the session will be named
using the provider ID prefixed by 'Elastic-'.

[float]
==== `trace_level`

Defines the filtering level for events based on severity. Valid options include critical, error, warning, informational, and verbose.
Defines the filtering level for events based on severity. Valid options include
critical, error, warning, informational, and verbose.

[float]
==== `match_any_keyword`

An 8-byte bitmask used for filtering events from specific provider subcomponents based on keyword matching. Any matching keyword will enable the event to be written. Default value is `0xfffffffffffffffff` so it matches every available keyword.
An 8-byte bitmask used for filtering events from specific provider subcomponents
based on keyword matching. Any matching keyword will enable the event to be
written. Default value is `0xfffffffffffffffff` so it matches every available
keyword.

Run `logman query providers "<provider.name>"` to list the available keywords for a specific provider.
Run `logman query providers "<provider.name>"` to list the available keywords
for a specific provider.

[float]
==== `match_all_keyword`

Similar to MatchAnyKeyword, this 8-byte bitmask filters events that match all specified keyword bits. Default value is `0` to let every event pass.
Similar to MatchAnyKeyword, this 8-byte bitmask filters events that match all
specified keyword bits. Default value is `0` to let every event pass.

Run `logman query providers "<provider.name>"` to list the available keywords for a specific provider.
Run `logman query providers "<provider.name>"` to list the available keywords
for a specific provider.

[float]
==== `session`

Names an existing ETW session to read from. Existing sessions can be listed using `logman query -ets`.
Names an existing ETW session to read from. Existing sessions can be listed
using `logman query -ets`.

[id="{beatname_lc}-input-{type}-common-options"]
include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[]

[float]
=== Metrics

This input exposes metrics under the <<http-endpoint, HTTP monitoring endpoint>>.
These metrics are exposed under the `/inputs/` path. They can be used to
observe the activity of the input.

You must assign a unique `id` to the input to expose metrics.

[options="header"]
|=======
| Metric | Description
| `session` | Name of the ETW session.
| `received_events_total` | Total number of events received.
| `discarded_events_total` | Total number of discarded events.
| `errors_total` | Total number of errors.
| `source_lag_time` | Histogram of the difference between timestamped event's creation and reading.
| `arrival_period` | Histogram of the elapsed time between event notification callbacks.
| `processing_time` | Histogram of the elapsed time between event notification callback and publication to the internal queue.
|=======

Histogram metrics are aggregated over the previous 1024 events.

:type!:
65 changes: 64 additions & 1 deletion x-pack/filebeat/input/etw/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@ import (
stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/feature"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/beats/v7/x-pack/libbeat/reader/etw"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"

"github.com/rcrowley/go-metrics"
"golang.org/x/sync/errgroup"
"golang.org/x/sys/windows"
)
Expand Down Expand Up @@ -65,6 +69,7 @@ func (op *realSessionOperator) stopSession(session *etw.Session) error {
// etwInput struct holds the configuration and state for the ETW input
type etwInput struct {
log *logp.Logger
metrics *inputMetrics
config config
etwSession *etw.Session
publisher stateless.Publisher
Expand Down Expand Up @@ -109,6 +114,8 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
}
e.etwSession.Callback = e.consumeEvent
e.publisher = publisher
e.metrics = newInputMetrics(e.etwSession.Name, ctx.ID)
defer e.metrics.unregister()

// Set up logger with session information
e.log = ctx.Logger.With("session", e.etwSession.Name)
Expand Down Expand Up @@ -149,6 +156,7 @@ func (e *etwInput) Run(ctx input.Context, publisher stateless.Publisher) error {
e.log.Debug("starting ETW consumer")
defer e.log.Debug("stopped ETW consumer")
if err = e.operator.startConsumer(e.etwSession); err != nil {
e.metrics.errors.Inc()
return fmt.Errorf("failed running ETW consumer: %w", err)
}
return nil
Expand Down Expand Up @@ -239,28 +247,83 @@ func convertFileTimeToGoTime(fileTime64 uint64) time.Time {
func (e *etwInput) consumeEvent(record *etw.EventRecord) uintptr {
if record == nil {
e.log.Error("received null event record")
e.metrics.errors.Inc()
return 1
}

e.log.Debugf("received event with ID %d and user-data length %d", record.EventHeader.EventDescriptor.Id, record.UserDataLength)
start := time.Now()
defer func() {
elapsed := time.Since(start)
e.metrics.processingTime.Update(elapsed.Nanoseconds())
}()

data, err := etw.GetEventProperties(record)
if err != nil {
e.log.Errorw("failed to read event properties", "error", err)
e.metrics.errors.Inc()
e.metrics.dropped.Inc()
return 1
}

evt := buildEvent(data, record.EventHeader, e.etwSession, e.config)
e.publisher.Publish(evt)

e.metrics.events.Inc()
e.metrics.sourceLag.Update(start.Sub(evt.Timestamp).Nanoseconds())
if !e.metrics.lastCallback.IsZero() {
e.metrics.arrivalPeriod.Update(start.Sub(e.metrics.lastCallback).Nanoseconds())
}
e.metrics.lastCallback = start

return 0
}

// Close stops the ETW session and logs the outcome.
func (e *etwInput) Close() {
if err := e.operator.stopSession(e.etwSession); err != nil {
e.log.Error("failed to shutdown ETW session")
e.metrics.errors.Inc()
return
}
e.log.Info("successfully shutdown")
}

// inputMetrics handles event log metric reporting.
type inputMetrics struct {
unregister func()

lastCallback time.Time

name *monitoring.String // name of the etw session being read
events *monitoring.Uint // total number of events received
dropped *monitoring.Uint // total number of discarded events
errors *monitoring.Uint // total number of errors
sourceLag metrics.Sample // histogram of the difference between timestamped event's creation and reading
arrivalPeriod metrics.Sample // histogram of the elapsed time between callbacks.
processingTime metrics.Sample // histogram of the elapsed time between event callback receipt and publication.
}

// newInputMetrics returns an input metric for windows ETW.
// If id is empty, a nil inputMetric is returned.
func newInputMetrics(session, id string) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, nil)
out := &inputMetrics{
unregister: unreg,
name: monitoring.NewString(reg, "session"),
events: monitoring.NewUint(reg, "received_events_total"),
dropped: monitoring.NewUint(reg, "discarded_events_total"),
errors: monitoring.NewUint(reg, "errors_total"),
sourceLag: metrics.NewUniformSample(1024),
arrivalPeriod: metrics.NewUniformSample(1024),
processingTime: metrics.NewUniformSample(1024),
}
out.name.Set(session)
_ = adapter.NewGoMetrics(reg, "source_lag_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.sourceLag))
_ = adapter.NewGoMetrics(reg, "arrival_period", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.arrivalPeriod))
_ = adapter.NewGoMetrics(reg, "processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.processingTime))

return out
}
5 changes: 5 additions & 0 deletions x-pack/filebeat/input/etw/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func Test_RunEtwInput_NewSessionError(t *testing.T) {
MatchAllKeyword: 0,
},
operator: mockOperator,
metrics: newInputMetrics("", ""),
}

// Run test
Expand Down Expand Up @@ -131,6 +132,7 @@ func Test_RunEtwInput_AttachToExistingSessionError(t *testing.T) {
MatchAllKeyword: 0,
},
operator: mockOperator,
metrics: newInputMetrics("", ""),
}

// Run test
Expand Down Expand Up @@ -175,6 +177,7 @@ func Test_RunEtwInput_CreateRealtimeSessionError(t *testing.T) {
MatchAllKeyword: 0,
},
operator: mockOperator,
metrics: newInputMetrics("", ""),
}

// Run test
Expand Down Expand Up @@ -231,6 +234,7 @@ func Test_RunEtwInput_StartConsumerError(t *testing.T) {
MatchAllKeyword: 0,
},
operator: mockOperator,
metrics: newInputMetrics("", ""),
}

// Run test
Expand Down Expand Up @@ -287,6 +291,7 @@ func Test_RunEtwInput_Success(t *testing.T) {
MatchAllKeyword: 0,
},
operator: mockOperator,
metrics: newInputMetrics("", ""),
}

// Run test
Expand Down

0 comments on commit ae312c5

Please sign in to comment.