From e7eda4e32898ba92c33d8269d580c3229c404e1d Mon Sep 17 00:00:00 2001 From: Yun-Tang Hsu Date: Tue, 12 Dec 2023 13:22:31 -0800 Subject: [PATCH] Bug fix for inter-node traffic MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we do: 1. Set ReadyToSend to true for flows don’t need correlation. 2. Set areCorrelatedFieldsFilled to true for flows don’t belong to inter-node traffic. For flows need to do correlation, its areCorrelatedFieldsFilled will be set to true once the correlation job is finished. Signed-off-by: Yun-Tang Hsu --- pkg/intermediate/aggregate.go | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/pkg/intermediate/aggregate.go b/pkg/intermediate/aggregate.go index d63188da..8b5d46b3 100644 --- a/pkg/intermediate/aggregate.go +++ b/pkg/intermediate/aggregate.go @@ -340,9 +340,22 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent klog.Warning("FlowType does not exist in current record.") } correlationRequired := isCorrelationRequired(flowType, record) - - currTime := time.Now() aggregationRecord, exist := a.flowKeyRecordMap[*flowKey] + if exist { + if flowTypeIE, _, exist := aggregationRecord.Record.GetInfoElementWithValue("flowType"); exist { + flowType = flowTypeIE.GetUnsigned8Value() + } else { + klog.Warning("FlowType does not exist in current record.") + } + prevCorrelationRequired := isCorrelationRequired(flowType, aggregationRecord.Record) + if prevCorrelationRequired && !correlationRequired { + delete(a.flowKeyRecordMap, *flowKey) + exist = false + } else if !prevCorrelationRequired && correlationRequired { + return nil + } + } + currTime := time.Now() if exist { if correlationRequired { // Do correlation of records if record belongs to inter-node flow and @@ -411,14 +424,9 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent if !correlationRequired { aggregationRecord.ReadyToSend = true - // If no correlation is required for an Inter-Node record, K8s metadata is - // expected to be not completely filled. For Intra-Node flows and ToExternal - // flows, areCorrelatedFieldsFilled is set to true by default. - if flowType == registry.FlowTypeInterNode { - aggregationRecord.areCorrelatedFieldsFilled = false - } else { - aggregationRecord.areCorrelatedFieldsFilled = true - } + } + if flowType != registry.FlowTypeInterNode { + aggregationRecord.areCorrelatedFieldsFilled = true } aggregationRecord.areExternalFieldsFilled = false // Push the record to the priority queue.