Fix sflow counters collection from TLV

This commit is contained in:
Alfredo Cardigliano 2023-02-23 13:00:33 +01:00
parent 7965c8657d
commit d65a03076a
4 changed files with 71 additions and 59 deletions

View file

@ -58,6 +58,7 @@ InterfaceStatsHash::~InterfaceStatsHash() {
/* ************************************ */
bool InterfaceStatsHash::set(const sFlowInterfaceStats * const stats) {
sFlowInterfaceStats *head;
u_int32_t ifIndex = stats->ifIndex, deviceIP = stats->deviceIP;
const char * ifName = stats->ifName;
u_int32_t hash = (deviceIP + ifIndex + Utils::hashString(ifName)) % max_hash_size, num_runs = 0;
@ -67,38 +68,36 @@ bool InterfaceStatsHash::set(const sFlowInterfaceStats * const stats) {
if(!buckets[hash])
goto new_bucket;
else {
sFlowInterfaceStats *head;
head = (sFlowInterfaceStats*)buckets[hash];
head = (sFlowInterfaceStats*)buckets[hash];
while(head != NULL) {
if(head->deviceIP == deviceIP
&& head->ifIndex == ifIndex
&& ((!head->ifName && !ifName)
|| (head->ifName && ifName && !strcmp(head->ifName, ifName))))
break;
else {
/* Inplace hash */
hash = (hash + 1) % max_hash_size, num_runs++;
while(head != NULL) {
if(head->deviceIP == deviceIP
&& head->ifIndex == ifIndex
&& ((!head->ifName && !ifName)
|| (head->ifName && ifName && strcmp(head->ifName, ifName) == 0))) {
break;
} else {
/* Inplace hash */
hash = (hash + 1) % max_hash_size, num_runs++;
if(num_runs >= max_hash_size) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "Internal error: too many loops=%u", max_hash_size);
m.unlock(__FILE__, __LINE__);
return(false);
}
head = buckets[hash];
if(num_runs >= max_hash_size) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "Internal error: too many loops=%u", max_hash_size);
ret = false;
goto unlock;
}
head = buckets[hash];
}
}
if(head) {
/* Update values */
head->ifType = stats->ifType, head->ifSpeed = stats->ifSpeed,
head->ifFullDuplex = stats->ifFullDuplex, head->ifAdminStatus = stats->ifAdminStatus,
head->ifOperStatus = stats->ifOperStatus, head->ifPromiscuousMode = stats->ifPromiscuousMode;
if(head) {
/* Update values */
head->ifType = stats->ifType, head->ifSpeed = stats->ifSpeed,
head->ifFullDuplex = stats->ifFullDuplex, head->ifAdminStatus = stats->ifAdminStatus,
head->ifOperStatus = stats->ifOperStatus, head->ifPromiscuousMode = stats->ifPromiscuousMode;
if(stats->samplesGenerated > 0 && stats->samplesGenerated == head->samplesGenerated) {
if(stats->samplesGenerated > 0 && stats->samplesGenerated == head->samplesGenerated) {
/*
This is an update as `samplesGenerated`.
Ubiquiti routers when generating sFlow send two counters samples in two different packets
@ -106,40 +105,43 @@ bool InterfaceStatsHash::set(const sFlowInterfaceStats * const stats) {
and the second counter sample has data for the OUT direction.
In this case, we need to check and update values rather than overwriting them.
*/
head->ifInOctets += stats->ifInOctets, head->ifInPackets += stats->ifInPackets,
head->ifInErrors += stats->ifInErrors, head->ifOutOctets += stats->ifOutOctets,
head->ifOutPackets += stats->ifOutPackets, head->ifOutErrors += stats->ifOutErrors;
} else {
head->ifInOctets = stats->ifInOctets, head->ifInPackets = stats->ifInPackets,
head->ifInErrors = stats->ifInErrors, head->ifOutOctets = stats->ifOutOctets,
head->ifOutPackets = stats->ifOutPackets, head->ifOutErrors = stats->ifOutErrors;
}
head->samplesGenerated = stats->samplesGenerated;
head->ifInOctets += stats->ifInOctets, head->ifInPackets += stats->ifInPackets,
head->ifInErrors += stats->ifInErrors, head->ifOutOctets += stats->ifOutOctets,
head->ifOutPackets += stats->ifOutPackets, head->ifOutErrors += stats->ifOutErrors;
} else {
new_bucket:
buckets[hash] = (sFlowInterfaceStats*)malloc(sizeof(sFlowInterfaceStats));
if(buckets[hash]) {
memcpy(buckets[hash], stats, sizeof(sFlowInterfaceStats));
head->ifInOctets = stats->ifInOctets, head->ifInPackets = stats->ifInPackets,
head->ifInErrors = stats->ifInErrors, head->ifOutOctets = stats->ifOutOctets,
head->ifOutPackets = stats->ifOutPackets, head->ifOutErrors = stats->ifOutErrors;
}
if(buckets[hash]->ifName) buckets[hash]->ifName = strdup(buckets[hash]->ifName);
head->samplesGenerated = stats->samplesGenerated;
} else {
new_bucket:
buckets[hash] = (sFlowInterfaceStats*)malloc(sizeof(sFlowInterfaceStats));
if(buckets[hash]->container_info_set) {
if(buckets[hash]->container_info.id) buckets[hash]->container_info.id = strdup(buckets[hash]->container_info.id);
if(buckets[hash]->container_info.name) buckets[hash]->container_info.name = strdup(buckets[hash]->container_info.name);
if (!buckets[hash]) {
ret = false;
goto unlock;
}
if(buckets[hash]->container_info.data_type == container_info_data_type_k8s) {
if(buckets[hash]->container_info.data.k8s.pod) buckets[hash]->container_info.data.k8s.pod = strdup(buckets[hash]->container_info.data.k8s.pod);
if(buckets[hash]->container_info.data.k8s.ns) buckets[hash]->container_info.data.k8s.ns = strdup(buckets[hash]->container_info.data.k8s.ns);
} else if(buckets[hash]->container_info.data_type == container_info_data_type_docker)
;
}
memcpy(buckets[hash], stats, sizeof(sFlowInterfaceStats));
} else
ret = false;
if(buckets[hash]->ifName) buckets[hash]->ifName = strdup(buckets[hash]->ifName);
if(buckets[hash]->container_info_set) {
if(buckets[hash]->container_info.id) buckets[hash]->container_info.id = strdup(buckets[hash]->container_info.id);
if(buckets[hash]->container_info.name) buckets[hash]->container_info.name = strdup(buckets[hash]->container_info.name);
if(buckets[hash]->container_info.data_type == container_info_data_type_k8s) {
if(buckets[hash]->container_info.data.k8s.pod) buckets[hash]->container_info.data.k8s.pod = strdup(buckets[hash]->container_info.data.k8s.pod);
if(buckets[hash]->container_info.data.k8s.ns) buckets[hash]->container_info.data.k8s.ns = strdup(buckets[hash]->container_info.data.k8s.ns);
} else if(buckets[hash]->container_info.data_type == container_info_data_type_docker) {
}
}
}
unlock:
m.unlock(__FILE__, __LINE__);
return(ret);