diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index abdf0fab35b3..33ab90c83b86 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -264,6 +264,7 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period, w.periodically(tick, func() error { nTimeout-- nPeriod-- + nActiveTimeout-- debugf("worker tick, nTimeout=%v, nPeriod=%v, nActiveTimeout=%v", nTimeout, nPeriod, nActiveTimeout) handleTimeout := nTimeout == 0 @@ -294,11 +295,11 @@ type flowsProcessor struct { } func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport, handleActiveTimeout bool) { - if !checkTimeout && !handleReports { + if !checkTimeout && !handleReports && !handleActiveTimeout { return } - debugf("exec tick, timeout=%v, report=%v", checkTimeout, handleReports) + debugf("exec tick, timeout=%v, report=%v, activeTimeout=%v", checkTimeout, handleReports, handleActiveTimeout) // get counter names snapshot if reports must be generated fw.counters.mutex.Lock() @@ -320,22 +321,25 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe for flow := table.flows.head; flow != nil; flow = next { next = flow.next endReason := FlowActive - var endFlow bool debugf("handle flow: %v, %v", flow.id.flowIDMeta, flow.id.flowID) reportFlow := handleReports isOver := lastReport - if checkTimeout || handleActiveTimeout { - endReason, endFlow = shouldEndFlow(flow, fw, ts, handleActiveTimeout) - if endFlow { - debugf("kill flow") - - reportFlow = true - flow.kill() // mark flow as killed - isOver = true - table.remove(flow) + idleTimeout := checkTimeout && (ts.Sub(flow.ts) > fw.timeout) + if handleActiveTimeout || idleTimeout { + if handleActiveTimeout { + endReason = ActiveTimeout + } else { + endReason = IdleTimeout } + // End flow irrespective of when the last traffic was seen on the flow + debugf("kill flow") + + reportFlow = true + flow.kill() // mark flow as killed + isOver = true + table.remove(flow) } if reportFlow {