Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for active timeout #37834

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,10 @@ type InterfaceConfig struct {
}

type Flows struct {
Enabled *bool `config:"enabled"`
Timeout string `config:"timeout"`
Enabled *bool `config:"enabled"`
Timeout string `config:"timeout"`
// Active Timeout kills flow after set time out period even if there traffic on the flow
ActiveTimeout string `config:"active_timeout"`
Period string `config:"period"`
EventMetadata mapstr.EventMetadata `config:",inline"`
Processors processors.PluginConfig `config:"processors"`
Expand Down
5 changes: 5 additions & 0 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ in time. Periodical reporting can be disabled by setting the value to -1. If
disabled, flows are still reported once being timed out. The default value is
10s.

[float]
==== `active_timeout`

ActiveTimeout configures the lifetime of a flow. When we hit the active timeout seconds after the flow was created, the flow is killed and reported. The default value is -1. If set to -1, the active timeout is disabled.

[float]
[[packetbeat-configuration-flows-fields]]
==== `fields`
Expand Down
10 changes: 9 additions & 1 deletion packetbeat/flows/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var debugf = logp.MakeDebug("flows")
const (
defaultTimeout = 30 * time.Second
defaultPeriod = 10 * time.Second
// By default we don't set any active timeout
defaultActiveTimeout = 0 * time.Second
)

// Flows holds and publishes network flow information for running processes.
Expand All @@ -59,6 +61,12 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow
return nil, err
}

activeTimeout, err := duration(config.ActiveTimeout, defaultActiveTimeout)
if err != nil {
logp.Err("failed to parse active flow timeout: %v", err)
return nil, err
}

period, err := duration(config.Period, defaultPeriod)
if err != nil {
logp.Err("failed to parse period: %v", err)
Expand All @@ -71,7 +79,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow

counter := &counterReg{}

worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period)
worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, activeTimeout)
if err != nil {
logp.Err("failed to configure flows processing intervals: %v", err)
return nil, err
Expand Down
1 change: 1 addition & 0 deletions packetbeat/flows/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func TestFlowsCounting(t *testing.T) {
10*time.Millisecond,
1,
-1,
2,
0)
if err != nil {
t.Fatalf("Failed to create flow worker: %v", err)
Expand Down
146 changes: 113 additions & 33 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ type worker struct {
run func(*worker)
}

type flowEndReason int

const (
// Flow is still active.
FlowActive flowEndReason = iota
// The Flow was terminated because it was considered to be idle.
IdleTimeout
// The Flow was terminated for reporting purposes while it was still active.
ActiveTimeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we implement the logic to add a reason to events when the end of flow is detected (i.e. TCP FIN)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a reason, but not a 100% sure if this is what you had in mind. Please take a look and let me know. Thanks

// The Flow was terminated because the Metering Process detected signals indicating the end of the Flow, for example, the TCP FIN flag.
EndOfFlowDetected
)

func (f flowEndReason) String() string {
return [...]string{"FlowActive", "IdleTimeout", "ActiveTimeout", "EndOfFlowDetected"}[f]
}

// newWorker returns a handle to a worker to run fn.
func newWorker(fn func(w *worker)) *worker {
return &worker{
Expand Down Expand Up @@ -127,7 +144,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) {
// reporting will be done at flow lifetime end.
// Flows are published via the pub Reporter after being enriched with process information
// by watcher.
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) {
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period, activeTimeout time.Duration) (*worker, error) {
if timeout < time.Second {
return nil, ErrInvalidTimeout
}
Expand All @@ -136,11 +153,59 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe
return nil, ErrInvalidPeriod
}

tick, ticksTimeout, ticksPeriod, ticksActiveTimeout := getTicksAndTimeouts(timeout, period, activeTimeout)

debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v, activeTimeout=%v, ticksActiveTO=%v",
timeout, period, tick, ticksTimeout, ticksPeriod, activeTimeout, ticksActiveTimeout)

defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
}
processor.spool.init(pub, defaultBatchSize)

return makeWorker(processor, tick, ticksTimeout, ticksPeriod, ticksActiveTimeout, 10)
}

func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Duration, int, int, int) {
tick := timeout
ticksTimeout := 1
ticksActiveTimeout := -1
ticksPeriod := -1

// If ActiveTimeout is set, we need to calculate the tick for the worker
// The tick will be gcd of timeout and activeTimeout
// example timeout is 30 and activeTimeout is 60, then tick will be 30
// so the worker is going to try to run process every 30seconds
// ticksTimeout will be 1 and ticksActiveTimeout will be 2
// so we will checkTimeout at every tick and checkActiveTimeout at every 2 ticks
// TODO: I think these two if conditions can maybe be represented in a better way
if activeTimeout > 0 {
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
tick = gcd(timeout, activeTimeout)
if tick < time.Second {
tick = time.Second
}

ticksTimeout = int(timeout / tick)
if ticksTimeout == 0 {
ticksTimeout = 1
}

ticksActiveTimeout = int(activeTimeout / tick)
if ticksActiveTimeout == 0 {
ticksActiveTimeout = 1
}
}

// If period is set, we need to calculate the tick for the worker based on the period as well
// If period is 10, timeout is 30 and ative timeout is 60, then tick will be 10 (gcd of all 3)
// ticksTimeout will be 3, ticksPeriod will be 1 and ticksActiveTimeout will be 6
// So we will report flow at every tick, check for timeout every 3 ticks and check for active timeout every 6 ticks
if period > 0 {
tick = gcd(timeout, period)
tick = gcd(tick, period)
if tick < time.Second {
tick = time.Second
}
Expand All @@ -154,21 +219,17 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe
if ticksPeriod == 0 {
ticksPeriod = 1
}
}

debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v",
timeout, period, tick, ticksTimeout, ticksPeriod)

defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
// If activeTImeout is set, we need to calculate the tick for the worker based on the activeTimeout as well
if activeTimeout > 0 {
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
ticksActiveTimeout = int(activeTimeout / tick)
if ticksActiveTimeout == 0 {
ticksActiveTimeout = 1
}
}
}
processor.spool.init(pub, defaultBatchSize)

return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10)
return tick, ticksTimeout, ticksPeriod, ticksActiveTimeout
}

// gcd returns the greatest common divisor of a and b.
Expand All @@ -182,9 +243,9 @@ func gcd(a, b time.Duration) time.Duration {
// makeWorker returns a worker that runs processor.execute each tick. Each timeout'th tick,
// the worker will check flow timeouts and each period'th tick, the worker will report flow
// events to be published.
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64) (*worker, error) {
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period, activeTimeout int, align int64) (*worker, error) {
return newWorker(func(w *worker) {
defer processor.execute(w, false, true, true)
defer processor.execute(w, false, true, true, false)

if align > 0 {
// Wait until the current time rounded up to nearest align seconds.
Expand All @@ -198,12 +259,15 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i

nTimeout := timeout
nPeriod := period
nActiveTimeout := activeTimeout
reportPeriodically := period > 0
enableActiveFlowTimeout := activeTimeout > 0
debugf("start flows worker loop")
w.periodically(tick, func() error {
nTimeout--
nPeriod--
debugf("worker tick, nTimeout=%v, nPeriod=%v", nTimeout, nPeriod)
nActiveTimeout--
debugf("worker tick, nTimeout=%v, nPeriod=%v, nActiveTimeout=%v", nTimeout, nPeriod, nActiveTimeout)

handleTimeout := nTimeout == 0
if handleTimeout {
Expand All @@ -213,8 +277,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i
if nPeriod <= 0 {
nPeriod = period
}
handleActiveTimeout := enableActiveFlowTimeout && nActiveTimeout == 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is nActiveTimeout and why might it be 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tick will be gcd of period, timeout and activetimeout
so if period = 10, timeout = 30 and activetimeout = 60
tick = 10
Initial nPeriod = 1
Initial ntimeout = 3
Initial nActiveTimeout = 6

At each tick we decrement these values and then when any of these values reach 0 we either report flow, check for timeout or check for active timeout.

Now if say activetimeout is not set, or period is set to -1

then tick = timeout
nperiod = -1
nActivetimeout = -1

we will never decrement nperiod or nActivetimeout during the ticks, they will never reach 0 so we will never check for period or active timeout.

I added unit tests around this. Maybe that helps a little bit

if nActiveTimeout <= 0 {
nActiveTimeout = activeTimeout
}

processor.execute(w, handleTimeout, handleReports, false)
processor.execute(w, handleTimeout, handleReports, false, handleActiveTimeout)
return nil
})
}), nil
Expand All @@ -228,12 +296,12 @@ type flowsProcessor struct {
timeout time.Duration
}

func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) {
if !checkTimeout && !handleReports {
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, handleActiveTimeout bool) {
if !checkTimeout && !handleReports && !handleActiveTimeout {
return
}

debugf("exec tick, timeout=%v, report=%v", checkTimeout, handleReports)
debugf("exec tick, timeout=%v, report=%v, activeTimeout=%v", checkTimeout, handleReports, handleActiveTimeout)

// get counter names snapshot if reports must be generated
fw.counters.mutex.Lock()
Expand All @@ -254,40 +322,49 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
var next *biFlow
for flow := table.flows.head; flow != nil; flow = next {
next = flow.next
endReason := FlowActive

debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID)

reportFlow := handleReports
isOver := lastReport
if checkTimeout {
if ts.Sub(flow.ts) > fw.timeout {
debugf("kill flow")

reportFlow = true
flow.kill() // mark flow as killed
isOver = true
table.remove(flow)
idleTimeout := checkTimeout && (ts.Sub(flow.ts) > fw.timeout)
if handleActiveTimeout || idleTimeout {
if handleActiveTimeout {
endReason = ActiveTimeout
} else {
endReason = IdleTimeout
}
// End flow irrespective of when the last traffic was seen on the flow
debugf("kill flow")

reportFlow = true
flow.kill() // mark flow as killed
isOver = true
table.remove(flow)
} else if lastReport {
// End of flow was detected
endReason = EndOfFlowDetected
}

if reportFlow {
debugf("report flow")
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames)
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames, endReason)
}
}
}

fw.spool.flush()
}

func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames)
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, endReason)

debugf("add event: %v", event)
fw.spool.publish(event)
}

func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event {
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) beat.Event {
timestamp := ts

event := mapstr.M{
Expand All @@ -309,6 +386,9 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve
"id": common.NetString(f.id.Serialize()),
"final": isOver,
}
if endReason != FlowActive {
flow["kill_reason"] = endReason.String()
}
fields := mapstr.M{
"event": event,
"flow": flow,
Expand Down
Loading
Loading