Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix for inter-node traffic #339

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 18 additions & 10 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,22 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
klog.Warning("FlowType does not exist in current record.")
}
correlationRequired := isCorrelationRequired(flowType, record)

currTime := time.Now()
aggregationRecord, exist := a.flowKeyRecordMap[*flowKey]
if exist {
if flowTypeIE, _, exist := aggregationRecord.Record.GetInfoElementWithValue("flowType"); exist {
flowType = flowTypeIE.GetUnsigned8Value()
} else {
klog.Warning("FlowType does not exist in current record.")
}
prevCorrelationRequired := isCorrelationRequired(flowType, aggregationRecord.Record)
if prevCorrelationRequired && !correlationRequired {
delete(a.flowKeyRecordMap, *flowKey)
exist = false
} else if !prevCorrelationRequired && correlationRequired {
return nil
}
}
currTime := time.Now()
if exist {
if correlationRequired {
// Do correlation of records if record belongs to inter-node flow and
Expand Down Expand Up @@ -411,14 +424,9 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent

if !correlationRequired {
aggregationRecord.ReadyToSend = true
// If no correlation is required for an Inter-Node record, K8s metadata is
// expected to be not completely filled. For Intra-Node flows and ToExternal
// flows, areCorrelatedFieldsFilled is set to true by default.
if flowType == registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = false
} else {
aggregationRecord.areCorrelatedFieldsFilled = true
}
}
if flowType != registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like areCorrelatedFieldsFilled is only being used in UT? If that's the case, do we still want to keep it? maybe I missed something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be set to true in line#355. And areCorrelatedFieldsFilled will be used in the flowaggregator.go in Antrea repo as well.

}
aggregationRecord.areExternalFieldsFilled = false
// Push the record to the priority queue.
Expand Down
Loading