Skip to content

Commit

Permalink
Make sure aggregated records are written properly to disk with -w.
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Jan 19, 2025
1 parent 5d43e46 commit 1eb14a5
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 7 deletions.
11 changes: 11 additions & 0 deletions src/libnffile/nfxV3.h
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,17 @@ typedef struct EXipInfo_s {
h->numElements++; \
h->size += s;

#define PushExtensionID(h, x, v) \
{ \
elementHeader_t *elementHeader = (elementHeader_t *)((void *)h + h->size); \
elementHeader->type = x; \
elementHeader->length = extensionTable[x].size; \
} \
void *v = (void *)((void *)h + h->size + sizeof(elementHeader_t)); \
memset(v, 0, extensionTable[x].size - sizeof(elementHeader_t)); \
h->size += extensionTable[x].size; \
h->numElements++;

#define ExtensionLength(ext) (((elementHeader_t *)((void *)ext - sizeof(elementHeader_t)))->length - sizeof(elementHeader_t))

#define EXTENSION(s) {s##ID, s##Size, #s}
Expand Down
54 changes: 53 additions & 1 deletion src/nfdump/nflowcache.c
Original file line number Diff line number Diff line change
Expand Up @@ -1639,6 +1639,56 @@ static inline void PrintSortList(SortElement_t *SortList, uint64_t maxindex, out

} // End of PrintSortList

// in case of custom aggregation, we need to rebuild the record
// with only those elements aggregated
static inline void RebuildRecord(void *buffPtr, recordHeaderV3_t *recordHeaderV3, uint64_t i) {
if (aggregateInfo[0] >= 0) {
// custom user aggregation - create new record
void *newExtensionList[MAXLISTSIZE] = {0};
AddV3Header(buffPtr, newV3Record);
PushExtension(newV3Record, EXgenericFlow, newGenericFlow);

// orig record
recordHandle_t recordHandle = {0};
MapRecordHandle(&recordHandle, recordHeaderV3, i);

// process record
// copy all custom aggregation elements
for (int j = 0; aggregateInfo[j] >= 0; j++) {
uint32_t tableIndex = aggregateInfo[j];
uint16_t extID = aggregationTable[tableIndex].param.extID;
if (recordHandle.extensionList[extID]) {
// extension exists in orig record
void *newRecord = NULL;
if (newExtensionList[extID] == NULL) {
// add this extension to the output record as well
PushExtensionID(newV3Record, extID, extension);
newExtensionList[extID] = extension;
newRecord = extension;
} else {
newRecord = newExtensionList[extID];
}
pointer_addr_t offset = aggregationTable[tableIndex].param.offset;
pointer_addr_t length = aggregationTable[tableIndex].param.length;
memcpy(newRecord + offset, recordHandle.extensionList[extID] + offset, length);
}
}

// copy timestamps
EXgenericFlow_t *origGenericFlow = (EXgenericFlow_t *)recordHandle.extensionList[EXgenericFlowID];
if (origGenericFlow && newGenericFlow) {
newGenericFlow->msecFirst = origGenericFlow->msecFirst;
newGenericFlow->msecLast = origGenericFlow->msecLast;
newGenericFlow->msecReceived = origGenericFlow->msecReceived;
newGenericFlow->inBytes = origGenericFlow->inBytes;
newGenericFlow->inPackets = origGenericFlow->inPackets;
}
} else {
// copy orig record
memcpy(buffPtr, (void *)recordHeaderV3, recordHeaderV3->size);
}
} // End of RebuildRecord

// export SortList - apply possible aggregation mask to zero out aggregated fields
static inline void ExportSortList(SortElement_t *SortList, uint64_t maxindex, nffile_t *nffile, int GuessFlowDirection, int ascending) {
dbg_printf("Enter %s\n", __func__);
Expand All @@ -1665,7 +1715,9 @@ static inline void ExportSortList(SortElement_t *SortList, uint64_t maxindex, nf

// write record
void *buffPtr = GetCurrentCursor(dataBlock);
memcpy(buffPtr, (void *)recordHeaderV3, recordHeaderV3->size);

// prepare record to export into new file
RebuildRecord(buffPtr, recordHeaderV3, i);

// remap header to written memory
recordHeaderV3 = (recordHeaderV3_t *)buffPtr;
Expand Down
12 changes: 6 additions & 6 deletions src/output/output_fmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ static int max_format_index = 0;

static int long_v6 = 0;
static int printPlain = 0;
static double duration = 0;
static uint64_t duration = 0;

#define IP_STRING_LEN (INET6_ADDRSTRLEN)

Expand Down Expand Up @@ -661,7 +661,7 @@ void fmt_record(FILE *stream, recordHandle_t *recordHandle, outputParams_t *outp
duration = 0;
if (genericFlow && genericFlow->msecFirst && genericFlow->msecLast) {
if (genericFlow->msecLast >= genericFlow->msecFirst) {
duration = (genericFlow->msecLast - genericFlow->msecFirst) / 1000.0;
duration = (genericFlow->msecLast - genericFlow->msecFirst);
} else {
LogError("Record: %u Time error - last < first", recordHandle->flowCount);
duration = 0;
Expand Down Expand Up @@ -1308,15 +1308,15 @@ static void String_EventTime(FILE *stream, recordHandle_t *recordHandle) {

static void String_Duration(FILE *stream, recordHandle_t *recordHandle) {
if (printPlain) {
fprintf(stream, "%16.3f", duration);
fprintf(stream, "%16.3f", (double)duration / 1000.0);
} else {
char *s = DurationString(duration);
fprintf(stream, "%s", s);
}
} // End of String_Duration

static void String_Duration_Seconds(FILE *stream, recordHandle_t *recordHandle) {
fprintf(stream, "%16.3f", duration);
fprintf(stream, "%16.3f", (double)duration / 1000.0);
} // End of String_Duration_Seconds

static void String_Protocol(FILE *stream, recordHandle_t *recordHandle) {
Expand Down Expand Up @@ -2181,7 +2181,7 @@ static void String_bps(FILE *stream, recordHandle_t *recordHandle) {

uint64_t bps = 0;
if (duration) {
bps = ((inBytes << 3) / duration); // bits per second. ( >> 3 ) -> * 8 to convert octets into bits
bps = (1000 * (inBytes << 3) / duration); // bits per second. ( >> 3 ) -> * 8 to convert octets into bits
}

numStr bpsString;
Expand All @@ -2196,7 +2196,7 @@ static void String_pps(FILE *stream, recordHandle_t *recordHandle) {

uint64_t pps = 0;
if (duration) {
pps = inPackets / duration; // packets per second
pps = 1000 * inPackets / duration; // packets per second
}

numStr ppsString;
Expand Down

0 comments on commit 1eb14a5

Please sign in to comment.