Skip to content

Commit

Permalink
Bug fix for inter-node traffic
Browse files Browse the repository at this point in the history
In this commit, we do:

1. Set ReadyToSend to true for flows don’t need correlation.
2. Set areCorrelatedFieldsFilled to true for flows don’t belong to inter-node traffic. For flows need to do correlation, its areCorrelatedFieldsFilled will be set to true once the correlation job is finished.

Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
Yun-Tang Hsu committed Dec 15, 2023
1 parent 837050b commit 0ac4515
Showing 1 changed file with 22 additions and 11 deletions.
33 changes: 22 additions & 11 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,20 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent

currTime := time.Now()
aggregationRecord, exist := a.flowKeyRecordMap[*flowKey]
if exist {
if flowTypeIE, _, exist := aggregationRecord.Record.GetInfoElementWithValue("flowType"); exist {
flowType = flowTypeIE.GetUnsigned8Value()
} else {
klog.Warning("FlowType does not exist in current record.")

Check warning on line 350 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L350

Added line #L350 was not covered by tests
}
prevCorrelationRequired := isCorrelationRequired(flowType, aggregationRecord.Record)
if prevCorrelationRequired && !correlationRequired {
delete(a.flowKeyRecordMap, *flowKey)
exist = false

Check warning on line 355 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L354-L355

Added lines #L354 - L355 were not covered by tests
} else if !prevCorrelationRequired && correlationRequired {
return nil

Check warning on line 357 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L357

Added line #L357 was not covered by tests
}
}
if exist {
if correlationRequired {
// Do correlation of records if record belongs to inter-node flow and
Expand Down Expand Up @@ -408,17 +422,10 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
waitForReadyToSendRetries: 0,
isIPv4: isIPv4,
}

if !correlationRequired {
aggregationRecord.ReadyToSend = true
// If no correlation is required for an Inter-Node record, K8s metadata is
// expected to be not completely filled. For Intra-Node flows and ToExternal
// flows, areCorrelatedFieldsFilled is set to true by default.
if flowType == registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = false
} else {
aggregationRecord.areCorrelatedFieldsFilled = true
}
// For intra-node and external traffic, areCorrelatedFieldsFilled is always true.
// For inter-node with allow np/anp action, its areExternalFieldsFilled will be set to true once the correlation job is finished.
if flowType != registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = true
}
aggregationRecord.areExternalFieldsFilled = false
// Push the record to the priority queue.
Expand All @@ -432,6 +439,10 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
pqItem.inactiveExpireTime = currTime.Add(a.inactiveExpiryTimeout)
heap.Push(&a.expirePriorityQueue, pqItem)
}
// For flows that do not need correlation, ReadyToSend should always be true.
if !correlationRequired {
aggregationRecord.ReadyToSend = true
}
a.flowKeyRecordMap[*flowKey] = aggregationRecord
return nil
}
Expand Down

0 comments on commit 0ac4515

Please sign in to comment.