Skip to content

Commit

Permalink
adding comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kvalliyurnatt committed Feb 27, 2024
1 parent 7b17b39 commit 61bd5ce
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
7 changes: 3 additions & 4 deletions packetbeat/docs/packetbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ value is true.
==== `timeout`

Timeout configures the lifetime of a flow. If no packets have been received for
a flow within the timeout time window or if `enable_active_flow_timeout` , the flow is killed and reported. The
a flow within the timeout time window, the flow is killed and reported. The
default value is 30s.

[float]
Expand All @@ -462,10 +462,9 @@ disabled, flows are still reported once being timed out. The default value is
10s.

[float]
==== `enable_active_flow_timeout`
==== `active_timeout`

When `enable_active_flow_timeout` is set to true, flow is killed and reported every time the timeout period is hit. For eg. if the `timeout` is set
to 30s, the flow will be killed and reported every 30s.
ActiveTimeout configures the lifetime of a flow. When we hit the active timeout seconds after the flow was created, the flow is killed and reported. The default value is -1. If set to -1, the active timeout is disabled.

[float]
[[packetbeat-configuration-flows-fields]]
Expand Down
24 changes: 17 additions & 7 deletions packetbeat/flows/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,12 @@ func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Dur
ticksActiveTimeout := -1
ticksPeriod := -1

// If activeTimeout is set, we need to calculate the tick for the worker
// If ActiveTimeout is set, we need to calculate the tick for the worker
// The tick will be gcd of timeout and activeTimeout
// example timeout is 30 and activeTimeout is 60, then tick will be 30
// so the worker is going to try to run process every 30seconds
// ticksTimeout will be 1 and ticksActiveTimeout will be 2
// so we will checkTimeout at every tick and checkActiveTimeout at every 2 ticks
// TODO: I think these two if conditions can maybe be represented in a better way
if activeTimeout > 0 {
tick = gcd(timeout, activeTimeout)
Expand All @@ -193,6 +198,10 @@ func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Dur
}
}

// If period is set, we need to calculate the tick for the worker based on the period as well
// If period is 10, timeout is 30 and ative timeout is 60, then tick will be 10 (gcd of all 3)
// ticksTimeout will be 3, ticksPeriod will be 1 and ticksActiveTimeout will be 6
// So we will report flow at every tick, check for timeout every 3 ticks and check for active timeout every 6 ticks
if period > 0 {
tick = gcd(tick, period)
if tick < time.Second {
Expand All @@ -209,6 +218,7 @@ func getTicksAndTimeouts(timeout, period, activeTimeout time.Duration) (time.Dur
ticksPeriod = 1
}

// If activeTImeout is set, we need to calculate the tick for the worker based on the activeTimeout as well
if activeTimeout > 0 {
ticksActiveTimeout = int(activeTimeout / tick)
if ticksActiveTimeout == 0 {
Expand Down Expand Up @@ -309,16 +319,16 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe
var next *biFlow
for flow := table.flows.head; flow != nil; flow = next {
next = flow.next
killReason := FlowActive
var killFlow bool
endReason := FlowActive
var endFlow bool

debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID)

reportFlow := handleReports
isOver := lastReport
if checkTimeout {
killReason, killFlow = shouldEndFlow(flow, fw, ts, handleActiveTimeout)
if killFlow {
if checkTimeout || handleActiveTimeout {
endReason, endFlow = shouldEndFlow(flow, fw, ts, handleActiveTimeout)
if endFlow {
debugf("kill flow")

reportFlow = true
Expand All @@ -330,7 +340,7 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe

if reportFlow {
debugf("report flow")
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames, killReason)
fw.report(w, ts, flow, isOver, intNames, uintNames, floatNames, endReason)
}
}
}
Expand Down

0 comments on commit 61bd5ce

Please sign in to comment.