Skip to content

Commit

Permalink
introduce active timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kvalliyurnatt committed Feb 23, 2024
1 parent ff6c6c2 commit 7b17b39
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 49 deletions.
8 changes: 4 additions & 4 deletions packetbeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,16 @@ type InterfaceConfig struct {
}

type Flows struct {
Enabled *bool `config:"enabled"`
Timeout string `config:"timeout"`
Enabled *bool `config:"enabled"`
Timeout string `config:"timeout"`
// Active Timeout kills flow after set time out period even if there traffic on the flow
ActiveTimeout string `config:"active_timeout"`
Period string `config:"period"`
EventMetadata mapstr.EventMetadata `config:",inline"`
Processors processors.PluginConfig `config:"processors"`
KeepNull bool `config:"keep_null"`
// Index is used to overwrite the index where flows are published
Index string `config:"index"`
// Enabling Active Flow Timeout will kill flow once the Timeout is reached irrespective of when traffic was last seen on the flow
EnableActiveFlowTimeout bool `config:"enable_active_flow_timeout"`
}

type ProtocolCommon struct {
Expand Down
10 changes: 9 additions & 1 deletion packetbeat/flows/flows.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var debugf = logp.MakeDebug("flows")
const (
defaultTimeout = 30 * time.Second
defaultPeriod = 10 * time.Second
// By default we don't set any active timeout
defaultActiveTimeout = 0 * time.Second
)

// Flows holds and publishes network flow information for running processes.
Expand All @@ -59,6 +61,12 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow
return nil, err
}

activeTimeout, err := duration(config.ActiveTimeout, defaultActiveTimeout)
if err != nil {
logp.Err("failed to parse active flow timeout: %v", err)
return nil, err
}

period, err := duration(config.Period, defaultPeriod)
if err != nil {
logp.Err("failed to parse period: %v", err)
Expand All @@ -71,7 +79,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow

counter := &counterReg{}

worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, config.EnableActiveFlowTimeout)
worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, activeTimeout)
if err != nil {
logp.Err("failed to configure flows processing intervals: %v", err)
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions packetbeat/flows/flows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ func TestFlowsCounting(t *testing.T) {
10*time.Millisecond,
1,
-1,
0,
false)
2,
0)
if err != nil {
t.Fatalf("Failed to create flow worker: %v", err)
}
Expand Down
115 changes: 78 additions & 37 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,19 @@ type worker struct {
run func(*worker)
}

type flowKillReason int
type flowEndReason int

const (
// Flow was not killed
NoKill flowKillReason = iota
// Flow is still active.
FlowActive flowEndReason = iota
// The Flow was terminated because it was considered to be idle.
IdleTimeout
// The Flow was terminated for reporting purposes while it was still active.
ActiveTimeout
)

func (f flowKillReason) String() string {
return [...]string{"NoKill", "IdleTimeout", "ActiveTimeout"}[f]
func (f flowEndReason) String() string {
return [...]string{"FlowActive", "IdleTimeout", "ActiveTimeout"}[f]
}

// newWorker returns a handle to a worker to run fn.
Expand Down Expand Up @@ -142,7 +142,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) {
// reporting will be done at flow lifetime end.
// Flows are published via the pub Reporter after being enriched with process information
// by watcher.
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableActiveFlowTimeout bool) (*worker, error) {
func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period, activeTimeout time.Duration) (*worker, error) {
if timeout < time.Second {
return nil, ErrInvalidTimeout
}
Expand All @@ -151,11 +151,50 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe
return nil, ErrInvalidPeriod
}

tick, ticksTimeout, ticksPeriod, ticksActiveTimeout := getTicksAndTimeouts(timeout, period, activeTimeout)

debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v, activeTimeout=%v, ticksActiveTO=%v",
timeout, period, tick, ticksTimeout, ticksPeriod, activeTimeout, ticksActiveTimeout)

defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
}
processor.spool.init(pub, defaultBatchSize)

return makeWorker(processor, tick, ticksTimeout, ticksPeriod, ticksActiveTimeout, 10)
}

func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Duration, int, int, int) {
tick := timeout
ticksTimeout := 1
ticksActiveTimeout := -1
ticksPeriod := -1

// If activeTimeout is set, we need to calculate the tick for the worker
// TODO: I think these two if conditions can maybe be represented in a better way
if activeTimeout > 0 {
tick = gcd(timeout, activeTimeout)
if tick < time.Second {
tick = time.Second
}

ticksTimeout = int(timeout / tick)
if ticksTimeout == 0 {
ticksTimeout = 1
}

ticksActiveTimeout = int(activeTimeout / tick)
if ticksActiveTimeout == 0 {
ticksActiveTimeout = 1
}
}

if period > 0 {
tick = gcd(timeout, period)
tick = gcd(tick, period)
if tick < time.Second {
tick = time.Second
}
Expand All @@ -169,21 +208,16 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe
if ticksPeriod == 0 {
ticksPeriod = 1
}
}

debugf("new flows worker. timeout=%v, period=%v, tick=%v, ticksTO=%v, ticksP=%v",
timeout, period, tick, ticksTimeout, ticksPeriod)

defaultBatchSize := 1024
processor := &flowsProcessor{
table: table,
watcher: watcher,
counters: counters,
timeout: timeout,
if activeTimeout > 0 {
ticksActiveTimeout = int(activeTimeout / tick)
if ticksActiveTimeout == 0 {
ticksActiveTimeout = 1
}
}
}
processor.spool.init(pub, defaultBatchSize)

return makeWorker(processor, tick, ticksTimeout, ticksPeriod, 10, enableActiveFlowTimeout)
return tick, ticksTimeout, ticksPeriod, ticksActiveTimeout
}

// gcd returns the greatest common divisor of a and b.
Expand All @@ -197,7 +231,7 @@ func gcd(a, b time.Duration) time.Duration {
// makeWorker returns a worker that runs processor.execute each tick. Each timeout'th tick,
// the worker will check flow timeouts and each period'th tick, the worker will report flow
// events to be published.
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period int, align int64, enableActiveFlowTimeout bool) (*worker, error) {
func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period, activeTimeout int, align int64) (*worker, error) {
return newWorker(func(w *worker) {
defer processor.execute(w, false, true, true, false)

Expand All @@ -213,12 +247,14 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i

nTimeout := timeout
nPeriod := period
nActiveTimeout := activeTimeout
reportPeriodically := period > 0
enableActiveFlowTimeout := activeTimeout > 0
debugf("start flows worker loop")
w.periodically(tick, func() error {
nTimeout--
nPeriod--
debugf("worker tick, nTimeout=%v, nPeriod=%v", nTimeout, nPeriod)
debugf("worker tick, nTimeout=%v, nPeriod=%v, nActiveTimeout=%v", nTimeout, nPeriod, nActiveTimeout)

handleTimeout := nTimeout == 0
if handleTimeout {
Expand All @@ -228,8 +264,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i
if nPeriod <= 0 {
nPeriod = period
}
handleActiveTimeout := enableActiveFlowTimeout && nActiveTimeout == 0
if nActiveTimeout <= 0 {
nActiveTimeout = activeTimeout
}

processor.execute(w, handleTimeout, handleReports, false, enableActiveFlowTimeout)
processor.execute(w, handleTimeout, handleReports, false, handleActiveTimeout)
return nil
})
}), nil
Expand All @@ -243,7 +283,7 @@ type flowsProcessor struct {
timeout time.Duration
}

func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, enableActiveFlowTimeout bool) {
func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, handleActiveTimeout bool) {
if !checkTimeout && !handleReports {
return
}
Expand All @@ -269,15 +309,15 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
var next *biFlow
for flow := table.flows.head; flow != nil; flow = next {
next = flow.next
killReason := NoKill
killReason := FlowActive
var killFlow bool

debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID)

reportFlow := handleReports
isOver := lastReport
if checkTimeout {
killReason, killFlow = shouldKillFlow(flow, fw, ts, enableActiveFlowTimeout)
killReason, killFlow = shouldEndFlow(flow, fw, ts, handleActiveTimeout)
if killFlow {
debugf("kill flow")

Expand All @@ -298,34 +338,35 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
fw.spool.flush()
}

func shouldKillFlow(flow *biFlow, fw *flowsProcessor, ts time.Time, activeFlowTimeout bool) (flowKillReason, bool) {
func shouldEndFlow(flow *biFlow, fw *flowsProcessor, ts time.Time, activeFlowTimeout bool) (flowEndReason, bool) {
if ts.Sub(flow.ts) > fw.timeout {
debugf("Killing flow because no traffic was seen since %v, flowid: %s", flow.ts, common.NetString(flow.id.Serialize()))
debugf("Ending flow because no traffic was seen since %v, flowid: %s", flow.ts, common.NetString(flow.id.Serialize()))
return IdleTimeout, true
}

if !activeFlowTimeout {
// Return NoKill because we do not kill the flow in this case
return NoKill, false
// Return FlowActive because we do not end the flow in this case
return FlowActive, false
}

// Kill flow only when the flow duration is at least timeout seconds. This prevents having very small flows.
// End flow only when the flow duration is at least timeout seconds. This prevents having very small flows.
// TDOO: Does this still apply ?
if ts.Sub(flow.createTS) >= fw.timeout {
debugf("Killing flow because active flow timeout is enabled, flowid: %s", common.NetString(flow.id.Serialize()))
debugf("Ending flow because active flow timeout is enabled, flowid: %s", common.NetString(flow.id.Serialize()))
return ActiveTimeout, true
}

return NoKill, false
return FlowActive, false
}

func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string, killReason flowKillReason) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, killReason)
func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) {
event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, endReason)

debugf("add event: %v", event)
fw.spool.publish(event)
}

func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, killReason flowKillReason) beat.Event {
func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, endReason flowEndReason) beat.Event {
timestamp := ts

event := mapstr.M{
Expand All @@ -347,8 +388,8 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve
"id": common.NetString(f.id.Serialize()),
"final": isOver,
}
if killReason != NoKill {
flow["kill_reason"] = killReason.String()
if endReason != FlowActive {
flow["kill_reason"] = endReason.String()
}
fields := mapstr.M{
"event": event,
Expand Down
Loading

0 comments on commit 7b17b39

Please sign in to comment.