Implements Flow Devices statistics collection and storage

Flows sent via nProbe (proxy mode) carry information on the exporter
address (EXPORTER_IPV4_ADDRESS) and on the exporter interfaces
(INPUT_SNMP and OUTPUT_SNMP). This implementation attempts at saving
this information to allow comparisons with raw SNMP data.
This commit is contained in:
Simone Mainardi 2017-03-27 23:58:50 +02:00
parent ff87d8922d
commit 13613fc2da
10 changed files with 113 additions and 50 deletions

View file

@ -38,8 +38,8 @@ class Host : public GenericHost {
FrequentStringItems *top_sites;
char * old_sites;
bool blacklisted_host, drop_all_host_traffic, dump_host_traffic, dhcpUpdated, host_label_set;
int16_t local_network_id, deviceIfIdx;
u_int32_t deviceIP;
u_int32_t host_quota_mb;
int16_t local_network_id;
float latitude, longitude;
IpAddress ip;
Mutex *m;
@ -195,7 +195,6 @@ class Host : public GenericHost {
void setDumpTrafficPolicy(bool new_policy);
void loadAlertPrefs(void);
void getPeerBytes(lua_State* vm, u_int32_t peer_key);
inline u_int16_t getDeviceIfIdx() { return(deviceIfIdx); }
inline void incIngressNetworkStats(int16_t networkId, u_int64_t num_bytes) { if(networkStats) networkStats->incIngress(num_bytes); };
inline void incEgressNetworkStats(int16_t networkId, u_int64_t num_bytes) { if(networkStats) networkStats->incEgress(num_bytes); };
inline void incInnerNetworkStats(int16_t networkId, u_int64_t num_bytes) { if(networkStats) networkStats->incInner(num_bytes); };

View file

@ -38,7 +38,7 @@ class Paginator;
#ifdef NTOPNG_PRO
class L7Policer;
class SNMPStats;
class FlowInterfacesStats;
#endif
typedef struct {
@ -72,7 +72,7 @@ class NetworkInterface {
#ifdef NTOPNG_PRO
L7Policer *policer;
FlowProfiles *flow_profiles, *shadow_flow_profiles;
SNMPStats *snmp_stats;
FlowInterfacesStats *flow_interfaces_stats;
#endif
EthStats ethStats;
LocalTrafficStats localStats;
@ -382,8 +382,8 @@ class NetworkInterface {
#ifdef NTOPNG_PRO
void refreshL7Rules();
void refreshShapers();
inline L7Policer* getL7Policer() { return(policer); }
inline SNMPStats* getSNMPStats() { return(snmp_stats); }
inline L7Policer* getL7Policer() { return(policer); }
inline FlowInterfacesStats* getFlowInterfacesStats() { return(flow_interfaces_stats); }
#endif
inline HostPools* getHostPools() { return(host_pools); }
@ -447,8 +447,18 @@ class NetworkInterface {
void getFlowsStatus(lua_State *vm);
void startDBLoop() { if(db) db->startDBLoop(); };
inline bool createDBSchema() {if(db) {return db->createDBSchema();} return false;};
inline void getFlowDevices(lua_State *vm) { if(interfaceStats) interfaceStats->luaDeviceList(vm); else lua_newtable(vm); };
inline void getFlowDeviceInfo(lua_State *vm, u_int32_t deviceIP) { if(interfaceStats) interfaceStats->luaDeviceInfo(vm, deviceIP); else lua_newtable(vm); };
inline void getFlowDevices(lua_State *vm) {
if(flow_interfaces_stats) flow_interfaces_stats->luaDeviceList(vm); else lua_newtable(vm);
};
inline void getFlowDeviceInfo(lua_State *vm, u_int32_t deviceIP) {
if(flow_interfaces_stats) flow_interfaces_stats->luaDeviceInfo(vm, deviceIP); else lua_newtable(vm);
};
inline void getSFlowDevices(lua_State *vm) {
if(interfaceStats) interfaceStats->luaDeviceList(vm); else lua_newtable(vm);
};
inline void getSFlowDeviceInfo(lua_State *vm, u_int32_t deviceIP) {
if(interfaceStats) interfaceStats->luaDeviceInfo(vm, deviceIP); else lua_newtable(vm);
};
int luaEvalFlow(Flow *f, const LuaCallback cb);
inline void forceLuaInterpreterReload() { reloadLuaInterpreter = true; };
inline virtual bool isView() { return(false); };

View file

@ -156,6 +156,7 @@ using namespace std;
#include "FlowProfiles.h"
#include "CounterTrend.h"
#include "LRUMacIP.h"
#include "FlowInterfacesStats.h"
#ifdef HAVE_LDAP
#include "LdapAuthenticator.h"
#endif
@ -223,7 +224,6 @@ using namespace std;
#include "FrequentStringItems.h"
#include "FrequentNumericItems.h"
#include "HostPoolStats.h"
#include "SNMPstats.h"
#ifdef HAVE_NETFILTER
#include "NetfilterInterface.h"
#endif

View file

@ -315,25 +315,25 @@ callback_utils.foreachInterface(ifnames, verbose, function(_ifname, ifstats)
end
end -- for
-- Create RRDs for flow devices
-- Create RRDs for flow and sFlow devices
if(tostring(flow_devices_rrd_creation) == "1") then
local flowdevs = interface.getFlowDevices()
local flowdevs = interface.getSFlowDevices()
for flow_device_ip,_ in pairs(flowdevs) do
local ports = interface.getFlowDeviceInfo(flow_device_ip)
local ports = interface.getSFlowDeviceInfo(flow_device_ip)
if(verbose) then
print ("["..__FILE__()..":"..__LINE__().."] Processing flow device "..flow_device_ip.."\n")
print ("["..__FILE__()..":"..__LINE__().."] Processing sFlow device "..flow_device_ip.."\n")
end
for port_idx,port_value in pairs(ports) do
local base = dirs.workingdir .. "/" .. ifstats.id .. "/rrd/flowdevs/".. flow_device_ip
local base = dirs.workingdir .. "/" .. ifstats.id .. "/rrd/sflow_devices/".. flow_device_ip
base = fixPath(base)
if(not(ntop.exists(base))) then ntop.mkdir(base) end
name = fixPath(base .. "/"..port_idx..".rrd")
createRRDcounter(name, 300, verbose)
str = "N:".. tolongint(port_value.ifInOctets) .. ":" .. tolongint(port_value.ifOutOctets)
str = "N:".. tolongint(port_value.ifOutOctets) .. ":" .. tolongint(port_value.ifInOctets)
ntop.rrd_update(name, str)
if(verbose) then
@ -341,6 +341,32 @@ callback_utils.foreachInterface(ifnames, verbose, function(_ifname, ifstats)
end
end
end
local flowdevs = interface.getFlowDevices() -- Flow, not sFlow here
for flow_device_ip,_ in pairs(flowdevs) do
local ports = interface.getFlowDeviceInfo(flow_device_ip)
if(verbose) then
print ("["..__FILE__()..":"..__LINE__().."] Processing flow device "..flow_device_ip.."\n")
end
for port_idx,port_value in pairs(ports) do
local base = getRRDName(ifstats.id, "flow_device:"..flow_device_ip, port_idx)
if(not(ntop.exists(base))) then ntop.mkdir(base) end
local name = getRRDName(ifstats.id, "flow_device:"..flow_device_ip, port_idx.."/bytes.rrd")
createRRDcounter(name, 300, verbose)
str = "N:".. tolongint(port_value["bytes.out_bytes"]) .. ":" .. tolongint(port_value["bytes.in_bytes"])
ntop.rrd_update(name, str)
if(verbose) then
print ("["..__FILE__()..":"..__LINE__().."] Processing flow device "..flow_device_ip.." / port "..port_idx.." ["..name.."]\n")
end
end
end
end
-- Save host activity stats only if flow activities are actually enabled

View file

@ -411,11 +411,6 @@ if((page == "overview") or (page == nil)) then
end
end
if host.deviceIfIdx ~= nil and host.deviceIfIdx ~= 0 and ntop.isPro() then
print("<tr><th>Device IP / Port Index</th><td colspan=2><A HREF='"..ntop.getHttpPrefix().."/lua/pro/flow_device_info.lua?ip="..host.deviceIP.."&ifIdx=".. host.deviceIfIdx.."'>".. host.deviceIP .."</A>@"..host.deviceIfIdx.."</td></tr>\n")
end
print("<tr><th>IP Address</th><td colspan=1>" .. host["ip"])
historicalProtoHostHref(getInterfaceId(ifname), host["ip"], nil, nil, nil)

View file

@ -261,7 +261,8 @@ end
if(info["version.enterprise_edition"] == true) then
if ifs["type"] == "zmq" then
print('<li><a href="'..ntop.getHttpPrefix()..'/lua/pro/enterprise/flowdevices_stats.lua">sFlow/NetFlow</a></li>')
print('<li><a href="'..ntop.getHttpPrefix()..'/lua/pro/enterprise/flowdevices_stats.lua">Flow Devices</a></li>')
print('<li><a href="'..ntop.getHttpPrefix()..'/lua/pro/enterprise/flowdevices_stats.lua?sflow_filter=All">sFlow</a></li>')
end
print('<li><a href="'..ntop.getHttpPrefix()..'/lua/pro/enterprise/snmpdevices_stats.lua">SNMP</a></li>')
end

View file

@ -199,6 +199,9 @@ function getRRDName(ifid, host_or_network, rrdFile)
host_or_network = string.gsub(host_or_network, 'snmp:', '')
-- snmpstats are ntopng-wide so ifid is ignored
rrdname = fixPath(dirs.workingdir .. "/snmpstats/")
elseif host_or_network ~= nil and string.starts(host_or_network, 'flow_device:') then
host_or_network = string.gsub(host_or_network, 'flow_device:', '')
rrdname = fixPath(dirs.workingdir .. "/" .. ifid .. "/flow_devices/")
elseif host_or_network ~= nil and string.starts(host_or_network, 'asn:') then
host_or_network = string.gsub(host_or_network, 'asn:', '')
rrdname = fixPath(dirs.workingdir .. "/" .. ifid .. "/asnstats/")
@ -1095,6 +1098,7 @@ function singlerrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json
touchRRD(rrdname)
--io.write(prefixLabel.."\n")
if(prefixLabel == "Bytes" or string.starts(rrdFile, 'categories/')) then
prefixLabel = "Traffic"
end
@ -1129,7 +1133,7 @@ function singlerrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json
-- Pretty printing for flowdevs/a.b.c.d/e.rrd
local elems = split(prefixLabel, "/")
if((elems[#elems] ~= nil) and (#elems > 1)) then
if(not host:starts('flow_device:') and (elems[#elems] ~= nil) and (#elems > 1)) then
prefixLabel = "Port "..elems[#elems]
port_mode = true
end

View file

@ -134,7 +134,7 @@ void Host::initialize(u_int8_t _mac[6], u_int16_t _vlanId, bool init_all) {
mac->incUses();
drop_all_host_traffic = false, dump_host_traffic = false, dhcpUpdated = false,
deviceIP = 0, deviceIfIdx = 0, num_resolve_attempts = 0;
num_resolve_attempts = 0;
max_new_flows_sec_threshold = CONST_MAX_NEW_FLOWS_SECOND;
max_num_syn_sec_threshold = CONST_MAX_NUM_SYN_PER_SECOND;
max_num_active_flows = CONST_MAX_NUM_HOST_ACTIVE_FLOWS, good_low_flow_detected = false;
@ -487,8 +487,6 @@ void Host::lua(lua_State* vm, AddressTree *ptree,
if(host_details) {
lua_push_int_table_entry(vm, "totalActivity", duration);
lua_push_str_table_entry(vm, "deviceIP", Utils::intoaV4(deviceIP, buf, sizeof(buf)));
lua_push_int_table_entry(vm, "deviceIfIdx", deviceIfIdx);
if(info) lua_push_str_table_entry(vm, "info", info);
lua_push_float_table_entry(vm, "latitude", latitude);
lua_push_float_table_entry(vm, "longitude", longitude);
@ -792,8 +790,6 @@ json_object* Host::getJSONObject() {
if(latitude) json_object_object_add(my_object, "latitude", json_object_new_double(latitude));
if(longitude) json_object_object_add(my_object, "longitude", json_object_new_double(longitude));
json_object_object_add(my_object, "ip", ip.getJSONObject());
if(deviceIfIdx) json_object_object_add(my_object, "device_if_idx", json_object_new_int(deviceIfIdx));
if(deviceIP) json_object_object_add(my_object, "device_ip", json_object_new_int(deviceIP));
json_object_object_add(my_object, "localHost", json_object_new_boolean(localHost));
json_object_object_add(my_object, "systemHost", json_object_new_boolean(systemHost));
json_object_object_add(my_object, "is_blacklisted", json_object_new_boolean(blacklisted_host));
@ -908,8 +904,6 @@ bool Host::deserialize(char *json_str, char *key) {
if(json_object_object_get_ex(o, "os", &obj)) { snprintf(os, sizeof(os), "%s", json_object_get_string(obj)); }
if(json_object_object_get_ex(o, "trafficCategory", &obj)){ snprintf(trafficCategory, sizeof(trafficCategory), "%s", json_object_get_string(obj)); }
if(json_object_object_get_ex(o, "vlan_id", &obj)) vlan_id = json_object_get_int(obj);
if(json_object_object_get_ex(o, "device_if_idx", &obj)) deviceIfIdx = json_object_get_int(obj);
if(json_object_object_get_ex(o, "device_ip", &obj)) deviceIP = json_object_get_int(obj);
if(json_object_object_get_ex(o, "latitude", &obj)) latitude = (float)json_object_get_double(obj);
if(json_object_object_get_ex(o, "longitude", &obj)) longitude = (float)json_object_get_double(obj);
if(json_object_object_get_ex(o, "ip", &obj)) { ip.deserialize(obj); }

View file

@ -381,8 +381,8 @@ static int ntop_interface_get_snmp_stats(lua_State* vm) {
ntop->getTrace()->traceEvent(TRACE_DEBUG, "%s() called", __FUNCTION__);
if(ntop_interface && ntop_interface->getSNMPStats()) {
ntop_interface->getSNMPStats()->lua(vm);
if(ntop_interface && ntop_interface->getFlowInterfacesStats()) {
ntop_interface->getFlowInterfacesStats()->lua(vm);
return(CONST_LUA_OK);
} else
return(CONST_LUA_ERROR);
@ -1677,7 +1677,7 @@ static int ntop_get_grouped_interface_host(lua_State* vm) {
/* ****************************************** */
static int ntop_getflowdevices(lua_State* vm) {
static int ntop_get_flow_devices(lua_State* vm) {
NetworkInterface *ntop_interface = getCurrentInterface(vm);
ntop->getTrace()->traceEvent(TRACE_DEBUG, "%s() called", __FUNCTION__);
@ -1692,7 +1692,7 @@ static int ntop_getflowdevices(lua_State* vm) {
/* ****************************************** */
static int ntop_getflowdeviceinfo(lua_State* vm) {
static int ntop_get_flow_device_info(lua_State* vm) {
NetworkInterface *ntop_interface = getCurrentInterface(vm);
char *device_ip;
@ -1713,6 +1713,42 @@ static int ntop_getflowdeviceinfo(lua_State* vm) {
/* ****************************************** */
static int ntop_getsflowdevices(lua_State* vm) {
NetworkInterface *ntop_interface = getCurrentInterface(vm);
ntop->getTrace()->traceEvent(TRACE_DEBUG, "%s() called", __FUNCTION__);
if(!ntop_interface)
return(CONST_LUA_ERROR);
else {
ntop_interface->getSFlowDevices(vm);
return(CONST_LUA_OK);
}
}
/* ****************************************** */
static int ntop_getsflowdeviceinfo(lua_State* vm) {
NetworkInterface *ntop_interface = getCurrentInterface(vm);
char *device_ip;
ntop->getTrace()->traceEvent(TRACE_DEBUG, "%s() called", __FUNCTION__);
if(ntop_lua_check(vm, __FUNCTION__, 1, LUA_TSTRING)) return(CONST_LUA_ERROR);
device_ip = (char*)lua_tostring(vm, 1);
if(!ntop_interface)
return(CONST_LUA_ERROR);
else {
in_addr_t addr = inet_addr(device_ip);
ntop_interface->getSFlowDeviceInfo(vm, ntohl(addr));
return(CONST_LUA_OK);
}
}
/* ****************************************** */
static int ntop_interface_load_host_alert_prefs(lua_State* vm) {
NetworkInterface *ntop_interface = getCurrentInterface(vm);
char *host_ip;
@ -5614,9 +5650,13 @@ static const luaL_Reg ntop_interface_reg[] = {
/* DB */
{ "execSQLQuery", ntop_interface_exec_sql_query },
/* Flows */
{ "getFlowDevices", ntop_getflowdevices },
{ "getFlowDeviceInfo", ntop_getflowdeviceinfo },
/* Flow Devices */
{ "getFlowDevices", ntop_get_flow_devices },
{ "getFlowDeviceInfo", ntop_get_flow_device_info },
/* sFlow */
{ "getSFlowDevices", ntop_getsflowdevices },
{ "getSFlowDeviceInfo", ntop_getsflowdeviceinfo },
/* New generation alerts */
{ "getCachedNumAlerts", ntop_interface_get_cached_num_alerts },

View file

@ -174,7 +174,7 @@ NetworkInterface::NetworkInterface(const char *name,
if(flow_profiles) flow_profiles->loadProfiles();
shadow_flow_profiles = NULL;
snmp_stats = NULL; /* Lazy, instantiated on demand */
flow_interfaces_stats = NULL; /* Lazy, instantiated on demand */
#endif
loadDumpPrefs();
@ -958,19 +958,13 @@ void NetworkInterface::processFlow(ZMQ_Flow *zflow) {
}
#ifdef NTOPNG_PRO
if(ntop->getPrefs()->is_flow_device_port_rrd_creation_enabled() && ntop->getPro()->has_valid_license()) {
if(!snmp_stats)
snmp_stats = new SNMPStats();
// if(ntop->getPrefs()->is_flow_device_port_rrd_creation_enabled() && ntop->getPro()->has_valid_license()) {
if(!flow_interfaces_stats)
flow_interfaces_stats = new FlowInterfacesStats();
if(snmp_stats) {
snmp_stats->incStats(zflow->deviceIP, zflow->inIndex, 0, zflow->in_bytes + zflow->out_bytes);
snmp_stats->incStats(zflow->deviceIP, zflow->outIndex, zflow->in_bytes + zflow->out_bytes, 0);
}
} else {
if(snmp_stats) {
delete snmp_stats;
snmp_stats = NULL;
}
if(flow_interfaces_stats) {
flow_interfaces_stats->incStats(zflow->deviceIP, zflow->inIndex, zflow->in_bytes, zflow->out_bytes);
flow_interfaces_stats->incStats(zflow->deviceIP, zflow->outIndex, zflow->out_bytes, zflow->in_bytes);
}
#endif