Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for active timeout #37834

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ type Flows struct {
KeepNull bool `config:"keep_null"`
// Index is used to overwrite the index where flows are published
Index string `config:"index"`
// EnableActiveFlowTimeout kills and reports a flow once the timeout is reached, irrespective of when traffic was last seen on the flow.
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
EnableActiveFlowTimeout bool `config:"enable_active_flow_timeout"`
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
}

type ProtocolCommon struct {
Expand Down
8 changes: 7 additions & 1 deletion packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ value is true.
==== `timeout`

Timeout configures the lifetime of a flow. If no packets have been received for
a flow within the timeout time window, the flow is killed and reported. The
a flow within the timeout time window, or if `enable_active_flow_timeout` is set to true and the flow has existed for at least the timeout period, the flow is killed and reported. The
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
default value is 30s.

[float]
Expand All @@ -461,6 +461,12 @@ in time. Periodical reporting can be disabled by setting the value to -1. If
disabled, flows are still reported once being timed out. The default value is
10s.

[float]
==== `enable_active_flow_timeout`
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved

When `enable_active_flow_timeout` is set to true, a flow is killed and reported every time the timeout period elapses, even if traffic is still being seen on the flow. For example, if the `timeout` is set
to 30s, the flow is killed and reported every 30s.

[float]
[[packetbeat-configuration-flows-fields]]
==== `fields`
Expand Down
2 changes: 1 addition & 1 deletion packetbeat/flows/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow

counter := &counterReg{}

worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period)
worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, config.EnableActiveFlowTimeout)
if err != nil {
logp.Err("failed to configure flows processing intervals: %v", err)
return nil, err
Expand Down
3 changes: 2 additions & 1 deletion packetbeat/flows/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func TestFlowsCounting(t *testing.T) {
10*time.Millisecond,
1,
-1,
0)
0,
false)
if err != nil {
t.Fatalf("Failed to create flow worker: %v", err)
}
Expand Down
63 changes: 52 additions & 11 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,21 @@ type worker struct {
run func(*worker)
}

// flowKillReason identifies why a flow was killed, if it was killed at all.
type flowKillReason int
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved

const (
// Flow was not killed
NoKill flowKillReason = iota
// The Flow was terminated because it was considered to be idle.
IdleTimeout
// The Flow was terminated for reporting purposes while it was still active.
ActiveTimeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we implement the logic to add a reason to events when the end of flow is detected (i.e. TCP FIN)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a reason, but not a 100% sure if this is what you had in mind. Please take a look and let me know. Thanks

)

// String returns the human-readable name of the kill reason.
//
// Values outside the declared set of reasons yield "Unknown" instead of
// panicking with an index-out-of-range error, so the method is always safe to
// call from logging and event-building code.
func (f flowKillReason) String() string {
	names := [...]string{"NoKill", "IdleTimeout", "ActiveTimeout"}
	if f < 0 || int(f) >= len(names) {
		return "Unknown"
	}
	return names[f]
}

// newWorker returns a handle to a worker to run fn.
func newWorker(fn func(w *worker)) *worker {
return &worker{
Expand Down Expand Up @@ -127,7 +142,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) {
// reporting will be done at flow lifetime end.
// Flows are published via the pub Reporter after being enriched with process information
// by watcher.
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) {
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableActiveFlowTimeout bool) (*worker, error) {
if timeout < time.Second {
return nil, ErrInvalidTimeout
}
Expand Down Expand Up @@ -168,7 +183,7 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe
}
processor.spool.init(pub, defaultBatchSize)

return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10)
return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10, enableActiveFlowTimeout)
}

// gcd returns the greatest common divisor of a and b.
Expand All @@ -182,9 +197,9 @@ func gcd(a, b time.Duration) time.Duration {
// makeWorker returns a worker that runs processor.execute each tick. Each timeout'th tick,
// the worker will check flow timeouts and each period'th tick, the worker will report flow
// events to be published.
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64) (*worker, error) {
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64, enableActiveFlowTimeout bool) (*worker, error) {
return newWorker(func(w *worker) {
defer processor.execute(w, false, true, true)
defer processor.execute(w, false, true, true, false)

if align > 0 {
// Wait until the current time rounded up to nearest align seconds.
Expand Down Expand Up @@ -214,7 +229,7 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i
nPeriod = period
}

processor.execute(w, handleTimeout, handleReports, false)
processor.execute(w, handleTimeout, handleReports, false, enableActiveFlowTimeout)
return nil
})
}), nil
Expand All @@ -228,7 +243,7 @@ type flowsProcessor struct {
timeout time.Duration
}

func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) {
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, enableActiveFlowTimeout bool) {
if !checkTimeout && !handleReports {
return
}
Expand All @@ -254,13 +269,16 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
var next *biFlow
for flow := table.flows.head; flow != nil; flow = next {
next = flow.next
killReason := NoKill
var killFlow bool

debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID)

reportFlow := handleReports
isOver := lastReport
if checkTimeout {
if ts.Sub(flow.ts) > fw.timeout {
killReason, killFlow = shouldKillFlow(flow, fw, ts, enableActiveFlowTimeout)
if killFlow {
debugf("kill flow")

reportFlow = true
Expand All @@ -272,22 +290,42 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe

if reportFlow {
debugf("report flow")
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames)
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames, killReason)
}
}
}

fw.spool.flush()
}

func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames)
func shouldKillFlow(flow *biFlow, fw *flowsProcessor, ts time.Time, activeFlowTimeout bool) (flowKillReason, bool) {
if ts.Sub(flow.ts) > fw.timeout {
debugf("Killing flow because no traffic was seen since %v, flowid: %s", flow.ts, common.NetString(flow.id.Serialize()))
return IdleTimeout, true
}

if !activeFlowTimeout {
// Return NoKill because we do not kill the flow in this case
return NoKill, false
}

// Kill flow only when the flow duration is atleast timeout seconds. This prevents having very small flows.
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
if ts.Sub(flow.createTS) >= fw.timeout {
debugf("Killing flow because active flow timeout is enabled, flowid: %s", common.NetString(flow.id.Serialize()))
return ActiveTimeout, true
}

return NoKill, false
}

// report builds the event describing flow at time ts via createEvent and hands
// it to the processor's spool for publishing.
//
// isOver, the counter name slices and killReason are forwarded to createEvent
// unchanged. The worker w is accepted but not used by this method.
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string, killReason flowKillReason) {
	ev := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, killReason)
	debugf("add event: %v", ev)
	fw.spool.publish(ev)
}

func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event {
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, killReason flowKillReason) beat.Event {
timestamp := ts

event := mapstr.M{
Expand All @@ -309,6 +347,9 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve
"id": common.NetString(f.id.Serialize()),
"final": isOver,
}
if killReason != NoKill {
flow["kill_reason"] = killReason.String()
}
fields := mapstr.M{
"event": event,
"flow": flow,
Expand Down
121 changes: 116 additions & 5 deletions packetbeat/flows/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ import (
"testing"
"time"

"github.com/elastic/go-lookslike/isdef"

"github.com/elastic/go-lookslike"
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/packetbeat/procs"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/go-lookslike"
"github.com/elastic/go-lookslike/isdef"
)

// Use `go test -data` to update sample event files.
Expand Down Expand Up @@ -65,7 +65,7 @@ func TestCreateEvent(t *testing.T) {
}
bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}}
bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}}
event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil)
event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, NoKill)

// Validate the contents of the event.
validate := lookslike.MustCompile(map[string]interface{}{
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestCreateEvent(t *testing.T) {

// Write the event to disk if -data is used.
if *dataFlag {
event.Fields.Put("@timestamp", common.Time(end)) //nolint:errcheck // Never fails.
event.Fields.Put("@timestamp", common.Time(end))
output, err := json.MarshalIndent(&event.Fields, "", " ")
if err != nil {
t.Fatal(err)
Expand All @@ -127,3 +127,114 @@ func TestCreateEvent(t *testing.T) {
}
}
}

func Test_shouldKillFlow(t *testing.T) {
logp.TestingSetup()

// Build biflow event.
vlan := uint16(171)
mac1 := []byte{1, 2, 3, 4, 5, 6}
mac2 := []byte{6, 5, 4, 3, 2, 1}
ip1 := []byte{203, 0, 113, 3}
ip2 := []byte{198, 51, 100, 2}
port1 := uint16(38901)
port2 := uint16(80)

id := newFlowID()
id.AddEth(mac1, mac2)
id.AddVLan(vlan)
id.AddIPv4(ip1, ip2)
id.AddTCP(port1, port2)
processor := &flowsProcessor{}

type args struct {
flow *biFlow
flowProcessorTimeout time.Duration
currentTime time.Time
activeTimeoutEnabled bool
}
tests := []struct {
kvalliyurnatt marked this conversation as resolved.
Show resolved Hide resolved
name string
args args
want flowKillReason
flowKilled bool
}{
{
name: "Test active timeout",
args: args{
flow: &biFlow{
id: id.rawFlowID,
killed: 1,
dir: flowDirForward,
createTS: time.Now(),
ts: time.Now().Add(40 * time.Second),
},
flowProcessorTimeout: 30 * time.Second,
currentTime: time.Now().Add(50 * time.Second),
activeTimeoutEnabled: true,
},
want: ActiveTimeout,
flowKilled: true,
},
{
name: "Test active timeout when flow duration is less than timeout",
args: args{
flow: &biFlow{
id: id.rawFlowID,
killed: 1,
dir: flowDirForward,
createTS: time.Now(),
ts: time.Now().Add(10 * time.Second),
},
flowProcessorTimeout: 30 * time.Second,
currentTime: time.Now().Add(20 * time.Second),
activeTimeoutEnabled: true,
},
want: NoKill,
flowKilled: false,
},
{
name: "Test idle timeout",
args: args{
flow: &biFlow{
id: id.rawFlowID,
killed: 1,
dir: flowDirForward,
createTS: time.Now(),
ts: time.Now(),
},
flowProcessorTimeout: 30 * time.Second,
currentTime: time.Now().Add(30 * time.Second),
activeTimeoutEnabled: false,
},
want: IdleTimeout,
flowKilled: true,
},
{
name: "Test flow should not be killed",
args: args{
flow: &biFlow{
id: id.rawFlowID,
killed: 1,
dir: flowDirForward,
createTS: time.Now(),
ts: time.Now().Add(40 * time.Second),
},
flowProcessorTimeout: 30 * time.Second,
currentTime: time.Now().Add(50 * time.Second),
activeTimeoutEnabled: false,
},
want: NoKill,
flowKilled: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
processor.timeout = tt.args.flowProcessorTimeout
killReason, killFlow := shouldKillFlow(tt.args.flow, processor, tt.args.currentTime, tt.args.activeTimeoutEnabled)

assert.Equal(t, tt.flowKilled, killFlow)
assert.Equal(t, tt.want, killReason)
})
}
}
Loading