Skip to content

Commit

Permalink
Bug fix for inter-node traffic
Browse files Browse the repository at this point in the history
In this commit, we do:

1. Set ReadyToSend to true for flows don’t need correlation.
2. Set areCorrelatedFieldsFilled to true for flows don’t belong to inter-node traffic. For flows need to do correlation, its areCorrelatedFieldsFilled will be set to true once the correlation job is finished.

Signed-off-by: Yun-Tang Hsu <[email protected]>
  • Loading branch information
Yun-Tang Hsu committed Dec 16, 2023
1 parent 837050b commit f369d83
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,9 +340,21 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent
klog.Warning("FlowType does not exist in current record.")
}
correlationRequired := isCorrelationRequired(flowType, record)

currTime := time.Now()
aggregationRecord, exist := a.flowKeyRecordMap[*flowKey]
if exist {
if flowTypeIE, _, exist := aggregationRecord.Record.GetInfoElementWithValue("flowType"); exist {
flowType = flowTypeIE.GetUnsigned8Value()
} else {

Check failure on line 347 in pkg/intermediate/aggregate.go

View workflow job for this annotation

GitHub Actions / Golangci-lint

SA9003: empty branch (staticcheck)
}
prevCorrelationRequired := isCorrelationRequired(flowType, aggregationRecord.Record)
if prevCorrelationRequired && !correlationRequired {
delete(a.flowKeyRecordMap, *flowKey)
exist = false

Check warning on line 352 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L351-L352

Added lines #L351 - L352 were not covered by tests
} else if !prevCorrelationRequired && correlationRequired {
return nil

Check warning on line 354 in pkg/intermediate/aggregate.go

View check run for this annotation

Codecov / codecov/patch

pkg/intermediate/aggregate.go#L354

Added line #L354 was not covered by tests
}
}
currTime := time.Now()
if exist {
if correlationRequired {
// Do correlation of records if record belongs to inter-node flow and
Expand Down Expand Up @@ -411,14 +423,9 @@ func (a *AggregationProcess) addOrUpdateRecordInMap(flowKey *FlowKey, record ent

if !correlationRequired {
aggregationRecord.ReadyToSend = true
// If no correlation is required for an Inter-Node record, K8s metadata is
// expected to be not completely filled. For Intra-Node flows and ToExternal
// flows, areCorrelatedFieldsFilled is set to true by default.
if flowType == registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = false
} else {
aggregationRecord.areCorrelatedFieldsFilled = true
}
}
if flowType != registry.FlowTypeInterNode {
aggregationRecord.areCorrelatedFieldsFilled = true
}
aggregationRecord.areExternalFieldsFilled = false
// Push the record to the priority queue.
Expand Down

0 comments on commit f369d83

Please sign in to comment.