Skip to content

Commit

Permalink
Fix buggy sflow code. Fixes #506
Browse files Browse the repository at this point in the history
  • Loading branch information
phaag committed Feb 3, 2024
1 parent a20249c commit 9db4215
Showing 1 changed file with 27 additions and 25 deletions.
52 changes: 27 additions & 25 deletions src/sflow/sflow_nfdump.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,17 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
}

uint32_t recordSize = BaseRecordSize;
if (sample->gotIPV6 && ExtensionsEnabled[EXipv6FlowID]) {
recordSize += EXipv6FlowSize;
}

if (sample->gotIPV4 && ExtensionsEnabled[EXipv4FlowID]) {
int isV4 = sample->ipsrc.type == SFLADDRESSTYPE_IP_V4;
if (isV4 && ExtensionsEnabled[EXipv4FlowID]) {
recordSize += EXipv4FlowSize;
}

if (sample->mpls_num_labels > 0 && ExtensionsEnabled[EXmplsLabelID]) {
recordSize += EXmplsLabelSize;
int isV6 = sample->ipsrc.type == SFLADDRESSTYPE_IP_V6;
if (isV6 && ExtensionsEnabled[EXipv6FlowID]) {
recordSize += EXipv6FlowSize;
}
dbg_printf("IPv4: %u, IPv6: %u\n", isV4, isV6);

if (sample->nextHop.type == SFLADDRESSTYPE_IP_V4 && ExtensionsEnabled[EXipNextHopV4ID]) {
recordSize += EXipNextHopV4Size;
Expand Down Expand Up @@ -306,12 +306,12 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
}
}

if (fs->sa_family == AF_INET6 && ExtensionsEnabled[EXipReceivedV6ID]) {
recordSize += EXipReceivedV6Size;
}
if (fs->sa_family == AF_INET && ExtensionsEnabled[EXipReceivedV4ID]) {
recordSize += EXipReceivedV4Size;
}
if (fs->sa_family == AF_INET6 && ExtensionsEnabled[EXipReceivedV6ID]) {
recordSize += EXipReceivedV6Size;
}

recordSize += sizeof(recordHeaderV3_t);
if (!CheckBufferSpace(fs->nffile, recordSize)) {
Expand Down Expand Up @@ -340,7 +340,13 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
genericFlow->inBytes = sample->meanSkipCount * sample->sampledPacketSize;
genericFlow->srcTos = sample->dcd_ipTos;

if (sample->gotIPV6 && ExtensionsEnabled[EXipv6FlowID]) {
if (isV4 && ExtensionsEnabled[EXipv4FlowID]) {
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
ipv4Flow->srcAddr = ntohl(sample->ipsrc.address.ip_v4.addr);
ipv4Flow->dstAddr = ntohl(sample->ipdst.address.ip_v4.addr);
}

if (isV6 && ExtensionsEnabled[EXipv6FlowID]) {
PushExtension(recordHeader, EXipv6Flow, ipv6Flow);
SetFlag(recordHeader->flags, V3_FLAG_IPV6_ADDR);

Expand All @@ -356,11 +362,6 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
u = (uint64_t *)&(b[8]);
ipv6Flow->dstAddr[1] = ntohll(*u);
}
if (sample->gotIPV4 && ExtensionsEnabled[EXipv4FlowID]) {
PushExtension(recordHeader, EXipv4Flow, ipv4Flow);
ipv4Flow->srcAddr = ntohl(sample->dcd_srcIP.s_addr);
ipv4Flow->dstAddr = ntohl(sample->dcd_dstIP.s_addr);
}

if (ExtensionsEnabled[EXflowMiscID]) {
PushExtension(recordHeader, EXflowMisc, flowMisc);
Expand Down Expand Up @@ -399,9 +400,9 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
}
if (sample->bgp_nextHop.type == SFLADDRESSTYPE_IP_V6 && ExtensionsEnabled[EXbgpNextHopV6ID]) {
uint64_t *addr = (void *)sample->bgp_nextHop.address.ip_v6.addr;
PushExtension(recordHeader, EXipReceivedV6, ipNextHopV6);
ipNextHopV6->ip[0] = ntohll(addr[0]);
ipNextHopV6->ip[1] = ntohll(addr[1]);
PushExtension(recordHeader, EXbgpNextHopV6, bgpNextHopV6);
bgpNextHopV6->ip[0] = ntohll(addr[0]);
bgpNextHopV6->ip[1] = ntohll(addr[1]);
}

if (ExtensionsEnabled[EXmacAddrID]) {
Expand Down Expand Up @@ -455,17 +456,17 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
}

// add router IP
if (fs->sa_family == PF_INET && ExtensionsEnabled[EXipReceivedV4ID]) {
PushExtension(recordHeader, EXipReceivedV4, ipReceivedV4);
ipReceivedV4->ip = fs->ip.V4;
dbg_printf("Add IPv4 route IP extension\n");
}
if (fs->sa_family == PF_INET6 && ExtensionsEnabled[EXipReceivedV6ID]) {
PushExtension(recordHeader, EXipReceivedV6, ipReceivedV6);
ipReceivedV6->ip[0] = fs->ip.V6[0];
ipReceivedV6->ip[1] = fs->ip.V6[1];
dbg_printf("Add IPv6 route IP extension\n");
}
if (fs->sa_family == PF_INET && ExtensionsEnabled[EXipReceivedV4ID]) {
PushExtension(recordHeader, EXipReceivedV4, ipReceivedV4);
ipReceivedV4->ip = fs->ip.V4;
dbg_printf("Add IPv4 route IP extension\n");
}

// update first_seen, last_seen
if (genericFlow->msecFirst < fs->msecFirst) // the very first time stamp need to be set
Expand Down Expand Up @@ -520,8 +521,9 @@ void StoreSflowRecord(SFSample *sample, FlowSource_t *fs) {
fs->nffile->block_header->NumRecords++;
fs->nffile->block_header->size += recordHeader->size;

dbg_assert(recordHeader->size == recordSize);
dbg_printf("Record size: Header: %u, calc: %u\n", recordHeader->size, recordSize);
dbg_assert(recordHeader->size <= recordSize);

fs->nffile->buff_ptr += recordSize;
fs->nffile->buff_ptr += recordHeader->size;

} // End of StoreSflowRecord

0 comments on commit 9db4215

Please sign in to comment.