Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve octetDeltaCount calculation #251

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -518,7 +517,6 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
76 changes: 46 additions & 30 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"github.com/vmware/go-ipfix/pkg/util"
)

var (
Expand Down Expand Up @@ -471,6 +472,24 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
existingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(element)
switch ieWithValue.Element.Name {
case "flowEndSeconds":
srcPodName, _, _ := existingRecord.GetInfoElementWithValue("sourcePodName")
dstPodName, _, _ := existingRecord.GetInfoElementWithValue("destinationPodName")
octetDeltaCount, _, _ := incomingRecord.GetInfoElementWithValue("octetDeltaCount")
octetTotalCount, _, _ := incomingRecord.GetInfoElementWithValue("octetTotalCount")
if srcPodName.Value.(string) == "perftest-a" {
klog.Info(srcPodName.Value.(string))
klog.Info("FLOWENDSECONDS: ", ieWithValue.Value)
klog.Info("OCTETDELTACOUNT: ", octetDeltaCount.Value.(uint64))
klog.Info("OCTETTOTALCOUNT: ", octetTotalCount.Value.(uint64))
klog.Info("\n\n")
}
if dstPodName.Value.(string) == "perftest-c" {
klog.Info(dstPodName.Value.(string))
klog.Info("FLOWENDSECONDS: ", ieWithValue.Value)
klog.Info("OCTETDELTACOUNT: ", octetDeltaCount.Value.(uint64))
klog.Info("OCTETTOTALCOUNT: ", octetTotalCount.Value.(uint64))
klog.Info("\n\n")
}
// Update flow end timestamp if it is latest.
if isLatest {
existingIeWithValue.Value = ieWithValue.Value
Expand Down Expand Up @@ -506,54 +525,51 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
statsElementList := a.aggregateElements.StatsElements
antreaSourceStatsElements := a.aggregateElements.AggregatedSourceStatsElements
antreaDestinationStatsElements := a.aggregateElements.AggregatedDestinationStatsElements
var srcDeltaVal, dstDeltaVal uint64
for i, element := range statsElementList {
isDelta := false
if strings.Contains(element, "Delta") {
isDelta = true
}
isDelta := strings.Contains(element, "Delta")
if ieWithValue, _, exist := incomingRecord.GetInfoElementWithValue(element); exist {
existingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(element)
// Update the corresponding element in existing record.
if !isDelta {
if existingIeWithValue.Value.(uint64) < ieWithValue.Value.(uint64) {
existingIeWithValue.Value = ieWithValue.Value
}
} else {
// We are simply adding the delta stats now. We expect delta stats to be
// reset after sending the record from flowKeyMap in aggregation process.
// Delta stats from source and destination nodes are added, so we will have
// two times the stats approximately.
// For delta stats, it is better to use source and destination specific
// stats.
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
return err
}
// Update the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, index, _ = existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
srcExistingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
srcExistingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
srcExistingIeWithValue.Value = srcExistingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
srcDeltaVal = srcExistingIeWithValue.Value.(uint64)
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
if err := existingRecord.SetInfoElementWithValue(index, *srcExistingIeWithValue); err != nil {
return err
}
}
// Update the corresponding destination element in antreaStatsElement list.
if fillDstStats {
existingIeWithValue, index, _ = existingRecord.GetInfoElementWithValue(antreaDestinationStatsElements[i])
dstExistingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(antreaDestinationStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
dstExistingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
dstExistingIeWithValue.Value = dstExistingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
dstDeltaVal = dstExistingIeWithValue.Value.(uint64)
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
if err := existingRecord.SetInfoElementWithValue(index, *dstExistingIeWithValue); err != nil {
return err
}
}
// Update the corresponding common element in statsElement list.
commonExistingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(element)
if !isDelta {
commonExistingIeWithValue.Value = util.MaxUint64(commonExistingIeWithValue.Value.(uint64), ieWithValue.Value.(uint64))
} else {
if fillSrcStats {
commonExistingIeWithValue.Value = util.MaxUint64(commonExistingIeWithValue.Value.(uint64), srcDeltaVal)
}
if fillDstStats {
commonExistingIeWithValue.Value = util.MaxUint64(commonExistingIeWithValue.Value.(uint64), dstDeltaVal)
}
}
if err := existingRecord.SetInfoElementWithValue(index, *commonExistingIeWithValue); err != nil {
return err
}
} else {
return fmt.Errorf("element with name %v in statsElements not present in the incoming record", element)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"github.com/vmware/go-ipfix/pkg/util"
)

var (
Expand Down Expand Up @@ -918,7 +919,7 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equalf(t, latestRecord.Value, ieWithValue.Value, "values should be equal for element %v", e)
} else {
prevRecord, _, _ := srcRecordLatest.GetInfoElementWithValue(e)
assert.Equalf(t, prevRecord.Value.(uint64)+latestRecord.Value.(uint64), ieWithValue.Value, "values should be equal for element %v", e)
assert.Equalf(t, util.MaxUint64(prevRecord.Value.(uint64), latestRecord.Value.(uint64)), ieWithValue.Value, "values should be equal for element %v", e)
}
}
for i, e := range antreaSourceStatsElementList {
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/collector_intermediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) {
case "packetTotalCount":
assert.Equal(t, uint64(1000), element.Value)
case "packetDeltaCount":
assert.Equal(t, uint64(1000), element.Value)
assert.Equal(t, uint64(500), element.Value)
case "destinationClusterIPv4":
assert.Equal(t, net.IP{10, 0, 0, 3}, element.Value)
case "destinationClusterIPv6":
assert.Equal(t, net.IP{0x20, 0x1, 0x0, 0x0, 0x32, 0x38, 0xbb, 0xbb, 0x0, 0x63, 0x0, 0x0, 0x0, 0x0, 0xaa, 0xaa}, element.Value)
case "destinationServicePort":
assert.Equal(t, uint16(4739), element.Value)
case "reversePacketDeltaCount":
assert.Equal(t, uint64(350), element.Value)
assert.Equal(t, uint64(200), element.Value)
case "reversePacketTotalCount":
assert.Equal(t, uint64(400), element.Value)
case "packetTotalCountFromSourceNode":
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ func Decode(buffer io.Reader, byteOrder binary.ByteOrder, outputs ...interface{}
}
return nil
}

func MaxUint64(num1, num2 uint64) uint64 {
if num1 >= num2 {
return num1
}
return num2
}