-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for active timeout #37834
Changes from all commits
899ddc4
a39cfcb
c9f8b7a
a50ca74
99a2470
eb311df
d250c28
48ce020
d97d5fe
2008a45
ff6c6c2
7b17b39
61bd5ce
ee09f2f
aaabb56
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,23 @@ type worker struct { | |
run func(*worker) | ||
} | ||
|
||
// flowEndReason describes why a flow was reported as terminated, or that it
// is still active.
type flowEndReason int

const (
	// FlowActive means the flow is still active.
	FlowActive flowEndReason = iota
	// IdleTimeout means the flow was terminated because it was considered to be idle.
	IdleTimeout
	// ActiveTimeout means the flow was terminated for reporting purposes while it was still active.
	ActiveTimeout
	// EndOfFlowDetected means the flow was terminated because the Metering Process detected
	// signals indicating the end of the flow, for example, the TCP FIN flag.
	EndOfFlowDetected
)

// String returns the name of the end reason as declared in the const block.
// Values outside the declared range yield "Unknown" rather than panicking
// with an index-out-of-range error, so String is always safe to call from
// logging and event-building code.
func (f flowEndReason) String() string {
	switch f {
	case FlowActive:
		return "FlowActive"
	case IdleTimeout:
		return "IdleTimeout"
	case ActiveTimeout:
		return "ActiveTimeout"
	case EndOfFlowDetected:
		return "EndOfFlowDetected"
	default:
		return "Unknown"
	}
}
|
||
// newWorker returns a handle to a worker to run fn. | ||
func newWorker(fn func(w *worker)) *worker { | ||
return &worker{ | ||
|
@@ -127,7 +144,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) { | |
// reporting will be done at flow lifetime end. | ||
// Flows are published via the pub Reporter after being enriched with process information | ||
// by watcher. | ||
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) { | ||
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period, activeTimeout time.Duration) (*worker, error) { | ||
if timeout < time.Second { | ||
return nil, ErrInvalidTimeout | ||
} | ||
|
@@ -136,11 +153,59 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe | |
return nil, ErrInvalidPeriod | ||
} | ||
|
||
tick, ticksTimeout, ticksPeriod, ticksActiveTimeout := getTicksAndTimeouts(timeout, period, activeTimeout) | ||
|
||
debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v, activeTimeout=%v, ticksActiveTO=%v", | ||
timeout, period, tick, ticksTimeout, ticksPeriod, activeTimeout, ticksActiveTimeout) | ||
|
||
defaultBatchSize := 1024 | ||
processor := &flowsProcessor{ | ||
table: table, | ||
watcher: watcher, | ||
counters: counters, | ||
timeout: timeout, | ||
} | ||
processor.spool.init(pub, defaultBatchSize) | ||
|
||
return makeWorker(processor, tick, ticksTimeout, ticksPeriod, ticksActiveTimeout, 10) | ||
} | ||
|
||
func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Duration, int, int, int) { | ||
tick := timeout | ||
ticksTimeout := 1 | ||
ticksActiveTimeout := -1 | ||
ticksPeriod := -1 | ||
|
||
// If ActiveTimeout is set, we need to calculate the tick for the worker | ||
// The tick will be gcd of timeout and activeTimeout | ||
// example timeout is 30 and activeTimeout is 60, then tick will be 30 | ||
// so the worker is going to try to run process every 30 seconds | ||
// ticksTimeout will be 1 and ticksActiveTimeout will be 2 | ||
// so we will checkTimeout at every tick and checkActiveTimeout at every 2 ticks | ||
// TODO: I think these two if conditions can maybe be represented in a better way | ||
if activeTimeout > 0 { | ||
kvalliyurnatt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
tick = gcd(timeout, activeTimeout) | ||
if tick < time.Second { | ||
tick = time.Second | ||
} | ||
|
||
ticksTimeout = int(timeout / tick) | ||
if ticksTimeout == 0 { | ||
ticksTimeout = 1 | ||
} | ||
|
||
ticksActiveTimeout = int(activeTimeout / tick) | ||
if ticksActiveTimeout == 0 { | ||
ticksActiveTimeout = 1 | ||
} | ||
} | ||
|
||
// If period is set, we need to calculate the tick for the worker based on the period as well | ||
// If period is 10, timeout is 30 and active timeout is 60, then tick will be 10 (gcd of all 3) | ||
// ticksTimeout will be 3, ticksPeriod will be 1 and ticksActiveTimeout will be 6 | ||
// So we will report flow at every tick, check for timeout every 3 ticks and check for active timeout every 6 ticks | ||
if period > 0 { | ||
tick = gcd(timeout, period) | ||
tick = gcd(tick, period) | ||
if tick < time.Second { | ||
tick = time.Second | ||
} | ||
|
@@ -154,21 +219,17 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe | |
if ticksPeriod == 0 { | ||
ticksPeriod = 1 | ||
} | ||
} | ||
|
||
debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v", | ||
timeout, period, tick, ticksTimeout, ticksPeriod) | ||
|
||
defaultBatchSize := 1024 | ||
processor := &flowsProcessor{ | ||
table: table, | ||
watcher: watcher, | ||
counters: counters, | ||
timeout: timeout, | ||
// If activeTimeout is set, we need to calculate the tick for the worker based on the activeTimeout as well | ||
if activeTimeout > 0 { | ||
kvalliyurnatt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
ticksActiveTimeout = int(activeTimeout / tick) | ||
if ticksActiveTimeout == 0 { | ||
ticksActiveTimeout = 1 | ||
} | ||
} | ||
} | ||
processor.spool.init(pub, defaultBatchSize) | ||
|
||
return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10) | ||
return tick, ticksTimeout, ticksPeriod, ticksActiveTimeout | ||
} | ||
|
||
// gcd returns the greatest common divisor of a and b. | ||
|
@@ -182,9 +243,9 @@ func gcd(a, b time.Duration) time.Duration { | |
// makeWorker returns a worker that runs processor.execute each tick. Each timeout'th tick, | ||
// the worker will check flow timeouts and each period'th tick, the worker will report flow | ||
// events to be published. | ||
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64) (*worker, error) { | ||
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period, activeTimeout int, align int64) (*worker, error) { | ||
return newWorker(func(w *worker) { | ||
defer processor.execute(w, false, true, true) | ||
defer processor.execute(w, false, true, true, false) | ||
|
||
if align > 0 { | ||
// Wait until the current time rounded up to nearest align seconds. | ||
|
@@ -198,12 +259,15 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i | |
|
||
nTimeout := timeout | ||
nPeriod := period | ||
nActiveTimeout := activeTimeout | ||
reportPeriodically := period > 0 | ||
enableActiveFlowTimeout := activeTimeout > 0 | ||
debugf("start flows worker loop") | ||
w.periodically(tick, func() error { | ||
nTimeout-- | ||
nPeriod-- | ||
debugf("worker tick, nTimeout=%v, nPeriod=%v", nTimeout, nPeriod) | ||
nActiveTimeout-- | ||
debugf("worker tick, nTimeout=%v, nPeriod=%v, nActiveTimeout=%v", nTimeout, nPeriod, nActiveTimeout) | ||
|
||
handleTimeout := nTimeout == 0 | ||
if handleTimeout { | ||
|
@@ -213,8 +277,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i | |
if nPeriod <= 0 { | ||
nPeriod = period | ||
} | ||
handleActiveTimeout := enableActiveFlowTimeout && nActiveTimeout == 0 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tick will be the gcd of period, timeout and activeTimeout. At each tick we decrement these values, and when any of them reaches 0 we either report flows, check for timeout or check for active timeout. Now if, say, activeTimeout is not set, or period is set to -1, then tick = timeout; we will never decrement nPeriod or nActiveTimeout during the ticks, so they will never reach 0 and we will never check for period or active timeout. I added unit tests around this. Maybe that helps a little bit. |
||
if nActiveTimeout <= 0 { | ||
nActiveTimeout = activeTimeout | ||
} | ||
|
||
processor.execute(w, handleTimeout, handleReports, false) | ||
processor.execute(w, handleTimeout, handleReports, false, handleActiveTimeout) | ||
return nil | ||
}) | ||
}), nil | ||
|
@@ -228,12 +296,12 @@ type flowsProcessor struct { | |
timeout time.Duration | ||
} | ||
|
||
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) { | ||
if !checkTimeout && !handleReports { | ||
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, handleActiveTimeout bool) { | ||
if !checkTimeout && !handleReports && !handleActiveTimeout { | ||
return | ||
} | ||
|
||
debugf("exec tick, timeout=%v, report=%v", checkTimeout, handleReports) | ||
debugf("exec tick, timeout=%v, report=%v, activeTimeout=%v", checkTimeout, handleReports, handleActiveTimeout) | ||
|
||
// get counter names snapshot if reports must be generated | ||
fw.counters.mutex.Lock() | ||
|
@@ -254,40 +322,49 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe | |
var next *biFlow | ||
for flow := table.flows.head; flow != nil; flow = next { | ||
next = flow.next | ||
endReason := FlowActive | ||
|
||
debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID) | ||
|
||
reportFlow := handleReports | ||
isOver := lastReport | ||
if checkTimeout { | ||
if ts.Sub(flow.ts) > fw.timeout { | ||
debugf("kill flow") | ||
|
||
reportFlow = true | ||
flow.kill() // mark flow as killed | ||
isOver = true | ||
table.remove(flow) | ||
idleTimeout := checkTimeout && (ts.Sub(flow.ts) > fw.timeout) | ||
if handleActiveTimeout || idleTimeout { | ||
if handleActiveTimeout { | ||
endReason = ActiveTimeout | ||
} else { | ||
endReason = IdleTimeout | ||
} | ||
// End flow irrespective of when the last traffic was seen on the flow | ||
debugf("kill flow") | ||
|
||
reportFlow = true | ||
flow.kill() // mark flow as killed | ||
isOver = true | ||
table.remove(flow) | ||
} else if lastReport { | ||
// End of flow was detected | ||
endReason = EndOfFlowDetected | ||
} | ||
|
||
if reportFlow { | ||
debugf("report flow") | ||
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames) | ||
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames, endReason) | ||
} | ||
} | ||
} | ||
|
||
fw.spool.flush() | ||
} | ||
|
||
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) { | ||
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) | ||
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) { | ||
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, endReason) | ||
|
||
debugf("add event: %v", event) | ||
fw.spool.publish(event) | ||
} | ||
|
||
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event { | ||
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) beat.Event { | ||
timestamp := ts | ||
|
||
event := mapstr.M{ | ||
|
@@ -309,6 +386,9 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve | |
"id": common.NetString(f.id.Serialize()), | ||
"final": isOver, | ||
} | ||
if endReason != FlowActive { | ||
flow["kill_reason"] = endReason.String() | ||
} | ||
fields := mapstr.M{ | ||
"event": event, | ||
"flow": flow, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we implement the logic to add a reason to events when the end of flow is detected (i.e. TCP FIN)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added a reason, but I'm not 100% sure if this is what you had in mind. Please take a look and let me know. Thanks