Skip to content

Commit

Permalink
improve octet count calculation
Browse files Browse the repository at this point in the history
Signed-off-by: heanlan <[email protected]>
  • Loading branch information
heanlan committed Sep 1, 2021
1 parent ff181f9 commit 6906339
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 27 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,6 @@ golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
Expand Down Expand Up @@ -518,7 +517,6 @@ golang.org/x/tools v0.0.0-20200212150539-ea181f53ac56/go.mod h1:TB2adYChydJhpapK
golang.org/x/tools v0.0.0-20200224181240-023911ca70b2/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjsvvwRTJJuXz8XRtIHtEnmAXLyFUw=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
53 changes: 31 additions & 22 deletions pkg/intermediate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"github.com/vmware/go-ipfix/pkg/util"
)

var (
Expand Down Expand Up @@ -514,46 +515,54 @@ func (a *AggregationProcess) aggregateRecords(incomingRecord, existingRecord ent
if ieWithValue, _, exist := incomingRecord.GetInfoElementWithValue(element); exist {
existingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(element)
// Update the corresponding element in existing record.
if !isDelta {
if existingIeWithValue.Value.(uint64) < ieWithValue.Value.(uint64) {
existingIeWithValue.Value = ieWithValue.Value
}
} else {
// We are simply adding the delta stats now. We expect delta stats to be
// reset after sending the record from flowKeyMap in aggregation process.
// Delta stats from source and destination nodes are added, so we will have
// two times the stats approximately.
// For delta stats, it is better to use source and destination specific
// stats.
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
if existingIeWithValue.Element.Name == "octetDeltaCount" {
klog.InfoS("octetDeltaCount", "existingIeWithValue", existingIeWithValue.Value)
klog.InfoS("octetDeltaCount", "ieWithValue", ieWithValue.Value)
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
return err
if existingIeWithValue.Element.Name == "octetTotalCount" {
klog.InfoS("octetTotalCount", "existingIeWithValue", existingIeWithValue.Value)
klog.InfoS("octetTotalCount", "ieWithValue", ieWithValue.Value)
}
if !isDelta {
existingIeWithValue.Value = util.MaxUint64(existingIeWithValue.Value.(uint64), ieWithValue.Value.(uint64))
}
// Update the corresponding source element in antreaStatsElement list.
if fillSrcStats {
existingIeWithValue, index, _ = existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
srcExistingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(antreaSourceStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
srcExistingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
srcExistingIeWithValue.Value = srcExistingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
existingIeWithValue.Value = util.MaxUint64(existingIeWithValue.Value.(uint64), srcExistingIeWithValue.Value.(uint64))
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
klog.Info(srcExistingIeWithValue.Element.Name, srcExistingIeWithValue.Value)
if err := existingRecord.SetInfoElementWithValue(index, *srcExistingIeWithValue); err != nil {
return err
}
}
// Update the corresponding destination element in antreaStatsElement list.
if fillDstStats {
existingIeWithValue, index, _ = existingRecord.GetInfoElementWithValue(antreaDestinationStatsElements[i])
dstExistingIeWithValue, index, _ := existingRecord.GetInfoElementWithValue(antreaDestinationStatsElements[i])
if !isDelta {
existingIeWithValue.Value = ieWithValue.Value
dstExistingIeWithValue.Value = ieWithValue.Value
} else {
existingIeWithValue.Value = existingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
dstExistingIeWithValue.Value = dstExistingIeWithValue.Value.(uint64) + ieWithValue.Value.(uint64)
existingIeWithValue.Value = util.MaxUint64(existingIeWithValue.Value.(uint64), dstExistingIeWithValue.Value.(uint64))
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
klog.Info(dstExistingIeWithValue.Element.Name, dstExistingIeWithValue.Value)
if err := existingRecord.SetInfoElementWithValue(index, *dstExistingIeWithValue); err != nil {
return err
}
}
if err := existingRecord.SetInfoElementWithValue(index, *existingIeWithValue); err != nil {
return err
}
if existingIeWithValue.Element.Name == "octetDeltaCount" {
klog.InfoS("octetDeltaCount", "value", existingIeWithValue.Value)
}
if existingIeWithValue.Element.Name == "octetTotalCount" {
klog.InfoS("octetTotalCount", "value", existingIeWithValue.Value)
}
} else {
return fmt.Errorf("element with name %v in statsElements not present in the incoming record", element)
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/intermediate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/vmware/go-ipfix/pkg/entities"
"github.com/vmware/go-ipfix/pkg/registry"
"github.com/vmware/go-ipfix/pkg/util"
)

var (
Expand Down Expand Up @@ -918,7 +919,7 @@ func runAggregationAndCheckResult(t *testing.T, ap *AggregationProcess, srcRecor
assert.Equalf(t, latestRecord.Value, ieWithValue.Value, "values should be equal for element %v", e)
} else {
prevRecord, _, _ := srcRecordLatest.GetInfoElementWithValue(e)
assert.Equalf(t, prevRecord.Value.(uint64)+latestRecord.Value.(uint64), ieWithValue.Value, "values should be equal for element %v", e)
assert.Equalf(t, util.MaxUint64(prevRecord.Value.(uint64), latestRecord.Value.(uint64)), ieWithValue.Value, "values should be equal for element %v", e)
}
}
for i, e := range antreaSourceStatsElementList {
Expand Down
4 changes: 2 additions & 2 deletions pkg/test/collector_intermediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,15 +187,15 @@ func testCollectorToIntermediate(t *testing.T, address net.Addr, isIPv6 bool) {
case "packetTotalCount":
assert.Equal(t, uint64(1000), element.Value)
case "packetDeltaCount":
assert.Equal(t, uint64(1000), element.Value)
assert.Equal(t, uint64(500), element.Value)
case "destinationClusterIPv4":
assert.Equal(t, net.IP{10, 0, 0, 3}, element.Value)
case "destinationClusterIPv6":
assert.Equal(t, net.IP{0x20, 0x1, 0x0, 0x0, 0x32, 0x38, 0xbb, 0xbb, 0x0, 0x63, 0x0, 0x0, 0x0, 0x0, 0xaa, 0xaa}, element.Value)
case "destinationServicePort":
assert.Equal(t, uint16(4739), element.Value)
case "reversePacketDeltaCount":
assert.Equal(t, uint64(350), element.Value)
assert.Equal(t, uint64(200), element.Value)
case "reversePacketTotalCount":
assert.Equal(t, uint64(400), element.Value)
case "packetTotalCountFromSourceNode":
Expand Down
7 changes: 7 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,10 @@ func Decode(buffer io.Reader, byteOrder binary.ByteOrder, outputs ...interface{}
}
return nil
}

func MaxUint64(num1, num2 uint64) uint64 {
if num1 >= num2 {
return num1
}
return num2
}

0 comments on commit 6906339

Please sign in to comment.