From 1eb14a52c9cc51b9fa6fc0191ee53ea98c7c9cee Mon Sep 17 00:00:00 2001 From: Peter Haag Date: Sun, 19 Jan 2025 15:33:15 +0100 Subject: [PATCH] Make sure aggregated records are written properly to disk with -w. --- src/libnffile/nfxV3.h | 11 +++++++++ src/nfdump/nflowcache.c | 54 ++++++++++++++++++++++++++++++++++++++++- src/output/output_fmt.c | 12 ++++----- 3 files changed, 70 insertions(+), 7 deletions(-) diff --git a/src/libnffile/nfxV3.h b/src/libnffile/nfxV3.h index cc06214e..56e7f957 100755 --- a/src/libnffile/nfxV3.h +++ b/src/libnffile/nfxV3.h @@ -752,6 +752,17 @@ typedef struct EXipInfo_s { h->numElements++; \ h->size += s; +#define PushExtensionID(h, x, v) \ + { \ + elementHeader_t *elementHeader = (elementHeader_t *)((void *)h + h->size); \ + elementHeader->type = x; \ + elementHeader->length = extensionTable[x].size; \ + } \ + void *v = (void *)((void *)h + h->size + sizeof(elementHeader_t)); \ + memset(v, 0, extensionTable[x].size - sizeof(elementHeader_t)); \ + h->size += extensionTable[x].size; \ + h->numElements++; + #define ExtensionLength(ext) (((elementHeader_t *)((void *)ext - sizeof(elementHeader_t)))->length - sizeof(elementHeader_t)) #define EXTENSION(s) {s##ID, s##Size, #s} diff --git a/src/nfdump/nflowcache.c b/src/nfdump/nflowcache.c index 9f82df57..5ac44e73 100755 --- a/src/nfdump/nflowcache.c +++ b/src/nfdump/nflowcache.c @@ -1639,6 +1639,56 @@ static inline void PrintSortList(SortElement_t *SortList, uint64_t maxindex, out } // End of PrintSortList +// in case of custom aggregation, we need to rebuild the record +// with only those elements aggregated +static inline void RebuildRecord(void *buffPtr, recordHeaderV3_t *recordHeaderV3, uint64_t i) { + if (aggregateInfo[0] >= 0) { + // custom user aggregation - create new record + void *newExtensionList[MAXLISTSIZE] = {0}; + AddV3Header(buffPtr, newV3Record); + PushExtension(newV3Record, EXgenericFlow, newGenericFlow); + + // orig record + recordHandle_t recordHandle = {0}; + MapRecordHandle(&recordHandle, recordHeaderV3, i); + + // process record + // copy all custom aggregation elements + for (int j = 0; aggregateInfo[j] >= 0; j++) { + uint32_t tableIndex = aggregateInfo[j]; + uint16_t extID = aggregationTable[tableIndex].param.extID; + if (recordHandle.extensionList[extID]) { + // extension exists in orig record + void *newRecord = NULL; + if (newExtensionList[extID] == NULL) { + // add this extension to the output record as well + PushExtensionID(newV3Record, extID, extension); + newExtensionList[extID] = extension; + newRecord = extension; + } else { + newRecord = newExtensionList[extID]; + } + pointer_addr_t offset = aggregationTable[tableIndex].param.offset; + pointer_addr_t length = aggregationTable[tableIndex].param.length; + memcpy(newRecord + offset, recordHandle.extensionList[extID] + offset, length); + } + } + + // copy timestamps + EXgenericFlow_t *origGenericFlow = (EXgenericFlow_t *)recordHandle.extensionList[EXgenericFlowID]; + if (origGenericFlow && newGenericFlow) { + newGenericFlow->msecFirst = origGenericFlow->msecFirst; + newGenericFlow->msecLast = origGenericFlow->msecLast; + newGenericFlow->msecReceived = origGenericFlow->msecReceived; + newGenericFlow->inBytes = origGenericFlow->inBytes; + newGenericFlow->inPackets = origGenericFlow->inPackets; + } + } else { + // copy orig record + memcpy(buffPtr, (void *)recordHeaderV3, recordHeaderV3->size); + } +} // End of RebuildRecord + // export SortList - apply possible aggregation mask to zero out aggregated fields static inline void ExportSortList(SortElement_t *SortList, uint64_t maxindex, nffile_t *nffile, int GuessFlowDirection, int ascending) { dbg_printf("Enter %s\n", __func__); @@ -1665,7 +1715,9 @@ static inline void ExportSortList(SortElement_t *SortList, uint64_t maxindex, nf // write record void *buffPtr = GetCurrentCursor(dataBlock); - memcpy(buffPtr, (void *)recordHeaderV3, recordHeaderV3->size); + + // prepare record to export into new file + RebuildRecord(buffPtr, recordHeaderV3, i); // remap header to written memory recordHeaderV3 = (recordHeaderV3_t *)buffPtr; diff --git a/src/output/output_fmt.c b/src/output/output_fmt.c index d66aa04b..671be2fd 100644 --- a/src/output/output_fmt.c +++ b/src/output/output_fmt.c @@ -75,7 +75,7 @@ static int max_format_index = 0; static int long_v6 = 0; static int printPlain = 0; -static double duration = 0; +static uint64_t duration = 0; #define IP_STRING_LEN (INET6_ADDRSTRLEN) @@ -661,7 +661,7 @@ void fmt_record(FILE *stream, recordHandle_t *recordHandle, outputParams_t *outp duration = 0; if (genericFlow && genericFlow->msecFirst && genericFlow->msecLast) { if (genericFlow->msecLast >= genericFlow->msecFirst) { - duration = (genericFlow->msecLast - genericFlow->msecFirst) / 1000.0; + duration = (genericFlow->msecLast - genericFlow->msecFirst); } else { LogError("Record: %u Time error - last < first", recordHandle->flowCount); duration = 0; @@ -1308,7 +1308,7 @@ static void String_EventTime(FILE *stream, recordHandle_t *recordHandle) { static void String_Duration(FILE *stream, recordHandle_t *recordHandle) { if (printPlain) { - fprintf(stream, "%16.3f", duration); + fprintf(stream, "%16.3f", (double)duration / 1000.0); } else { char *s = DurationString(duration); fprintf(stream, "%s", s); @@ -1316,7 +1316,7 @@ static void String_Duration(FILE *stream, recordHandle_t *recordHandle) { } // End of String_Duration static void String_Duration_Seconds(FILE *stream, recordHandle_t *recordHandle) { - fprintf(stream, "%16.3f", duration); + fprintf(stream, "%16.3f", (double)duration / 1000.0); } // End of String_Duration_Seconds static void String_Protocol(FILE *stream, recordHandle_t *recordHandle) { @@ -2181,7 +2181,7 @@ static void String_bps(FILE *stream, recordHandle_t *recordHandle) { uint64_t bps = 0; if (duration) { - bps = ((inBytes << 3) / duration); // bits per second. ( >> 3 ) -> * 8 to convert octets into bits + bps = (1000 * (inBytes << 3) / duration); // bits per second. ( >> 3 ) -> * 8 to convert octets into bits } numStr bpsString; @@ -2196,7 +2196,7 @@ static void String_pps(FILE *stream, recordHandle_t *recordHandle) { uint64_t pps = 0; if (duration) { - pps = inPackets / duration; // packets per second + pps = 1000 * inPackets / duration; // packets per second } numStr ppsString;