diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index 7978ad2a5f..2cfe63041e 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -487,10 +487,7 @@ class NetworkInterface { bool passShaperPacket(int a_shaper_id, int b_shaper_id, struct pcap_pkthdr *h); void initL7Policer(); #endif - void setRemoteStats(char *name, char *address, u_int32_t speedMbit, - char *remoteProbeAddress, char *remoteProbePublicAddress, - u_int64_t remBytes, u_int64_t remPkts, u_int32_t remote_time, - u_int32_t last_pps, u_int32_t last_bps); + void setRemoteStats(ZMQ_RemoteStats *zrs); void getFlowsStatus(lua_State *vm); void startDBLoop() { if(db) db->startDBLoop(); }; inline bool createDBSchema() {if(db) {return db->createDBSchema();} return false;}; diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index 8dcc5126d0..41c6ea600b 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -175,6 +175,15 @@ typedef struct zmq_flow { ProcessInfo src_process, dst_process; } ZMQ_Flow; +typedef struct zmq_remote_stats { + char remote_ifname[32], remote_ifaddress[64]; + char remote_probe_address[64], remote_probe_public_address[64]; + u_int64_t remote_bytes, remote_pkts; + u_int32_t remote_ifspeed, remote_time, avg_bps, avg_pps; + u_int32_t remote_lifetime_timeout, remote_idle_timeout; + u_int32_t export_queue_too_long, too_many_flows, elk_flow_drops; +} ZMQ_RemoteStats; + struct vm_ptree { lua_State* vm; AddressTree *ptree; diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 04e35a63ed..b295033210 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -5091,37 +5091,31 @@ u_int32_t NetworkInterface::getCheckPointNumPacketDrops() { /* **************************************** */ -void NetworkInterface::setRemoteStats(char *name, char *address, u_int32_t speedMbit, - char *remoteProbeAddress, char *remoteProbePublicAddress, - u_int64_t remBytes, u_int64_t remPkts, - u_int32_t remTime, u_int32_t last_pps, u_int32_t last_bps) { - if(name) setRemoteIfname(name); - if(address) setRemoteIfIPaddr(address); - if(remoteProbeAddress) setRemoteProbeAddr(remoteProbeAddress); - if(remoteProbePublicAddress) setRemoteProbePublicAddr(remoteProbePublicAddress); - ifSpeed = speedMbit, last_pkt_rcvd = 0, last_pkt_rcvd_remote = remTime, - last_remote_pps = last_pps, last_remote_bps = last_bps; +void NetworkInterface::setRemoteStats(ZMQ_RemoteStats *zrs) { + if(!zrs) return; + + if(zrs->remote_ifname) setRemoteIfname(zrs->remote_ifname); + if(zrs->remote_ifaddress) setRemoteIfIPaddr(zrs->remote_ifaddress); + if(zrs->remote_probe_address) setRemoteProbeAddr(zrs->remote_probe_address); + if(zrs->remote_probe_public_address) setRemoteProbePublicAddr(zrs->remote_probe_public_address); + + ifSpeed = zrs->remote_ifspeed, last_pkt_rcvd = 0, last_pkt_rcvd_remote = zrs->remote_time, + last_remote_pps = zrs->avg_pps, last_remote_bps = zrs->avg_bps; if((zmq_initial_pkts == 0) /* ntopng has been restarted */ - || (remBytes < zmq_initial_bytes) /* nProbe has been restarted */ + || (zrs->remote_bytes < zmq_initial_bytes) /* nProbe has been restarted */ ) { /* Start over */ - zmq_initial_bytes = remBytes, zmq_initial_pkts = remPkts; - } else { - remBytes -= zmq_initial_bytes, remPkts -= zmq_initial_pkts; - - ntop->getTrace()->traceEvent(TRACE_INFO, "[%s][bytes=%u/%u (%d)][pkts=%u/%u (%d)]", - ifname, remBytes, ethStats.getNumBytes(), remBytes-ethStats.getNumBytes(), - remPkts, ethStats.getNumPackets(), remPkts-ethStats.getNumPackets()); - /* - * Don't override ethStats here, these stats are properly updated - * inside NetworkInterface::processFlow for ZMQ interfaces. - * Overriding values here may cause glitches and non-strictly-increasing counters - * yielding negative rates. - ethStats.setNumBytes(remBytes), ethStats.setNumPackets(remPkts); - * - */ + zmq_initial_bytes = zrs->remote_bytes, zmq_initial_pkts = zrs->remote_pkts; } + /* + * Don't override ethStats here, these stats are properly updated + * inside NetworkInterface::processFlow for ZMQ interfaces. + * Overriding values here may cause glitches and non-strictly-increasing counters + * yielding negative rates. + ethStats.setNumBytes(zrs->remote_bytes), ethStats.setNumPackets(zrs->remote_pkts); + * + */ } /* **************************************** */ diff --git a/src/ParserInterface.cpp b/src/ParserInterface.cpp index 559e3e8b0e..43a6bdd113 100755 --- a/src/ParserInterface.cpp +++ b/src/ParserInterface.cpp @@ -479,6 +479,8 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s json_object *o; enum json_tokener_error jerr = json_tokener_success; NetworkInterface * iface = (NetworkInterface*)data; + ZMQ_RemoteStats zrs; + memset((void*)&zrs, 0, sizeof(zrs)); // payload[payload_size] = '\0'; @@ -486,52 +488,73 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s o = json_tokener_parse_verbose(payload, &jerr); if(o != NULL) { - struct json_object_iterator it = json_object_iter_begin(o); - struct json_object_iterator itEnd = json_object_iter_end(o); - char remote_ifname[32] = { 0 }, remote_ifaddress[64] = { 0 }; - char remote_probe_address[64] = { 0 }, remote_probe_public_address[64] = { 0 }; - u_int64_t remote_bytes = 0, remote_pkts = 0; - u_int32_t remote_ifspeed = 0, remote_time = 0, avg_bps = 0, avg_pps = 0; + json_object *w, *z; - while(!json_object_iter_equal(&it, &itEnd)) { - const char *key = json_object_iter_peek_name(&it); - json_object *v = json_object_iter_peek_value(&it); - const char *value = json_object_get_string(v); + if(json_object_object_get_ex(o, "time", &w)) zrs.remote_time = (u_int32_t)json_object_get_int64(w); + if(json_object_object_get_ex(o, "bytes", &w)) zrs.remote_bytes = (u_int64_t)json_object_get_int64(w); + if(json_object_object_get_ex(o, "packets", &w)) zrs.remote_pkts = (u_int64_t)json_object_get_int64(w); - if((key != NULL) && (value != NULL)) { - /* - Example - { "if.name": "en0", "if.speed": 1000, "if.ip": "fe80::c62c:3ff:fe06:49fe%en0", "probe.ip": "192.168.1.5", "time" : 1456595814, "bytes": 18505, "packets": 85 } - */ - if(!strcmp(key, "if.name")) snprintf(remote_ifname, sizeof(remote_ifname), "%s", value); - else if(!strcmp(key, "if.ip")) snprintf(remote_ifaddress, sizeof(remote_ifaddress), "%s", value); - else if(!strcmp(key, "if.speed")) remote_ifspeed = atol(value); - else if(!strcmp(key, "probe.ip")) snprintf(remote_probe_address, sizeof(remote_probe_address), "%s", value); - else if(!strcmp(key, "probe.public_ip")) snprintf(remote_probe_public_address, sizeof(remote_probe_public_address), "%s", value); - else if(!strcmp(key, "bytes")) remote_bytes = atol(value); - else if(!strcmp(key, "packets")) remote_pkts = atol(value); - else if(!strcmp(key, "time")) remote_time = atol(value); /* Format 1461424017.299 . */ - else if(!strcmp(key, "avg.bps")) avg_bps = atol(value); - else if(!strcmp(key, "avg.pps")) avg_pps = atol(value); + if(json_object_object_get_ex(o, "iface", &w)) { + if(json_object_object_get_ex(w, "name", &z)) + snprintf(zrs.remote_ifname, sizeof(zrs.remote_ifname), "%s", json_object_get_string(z)); + if(json_object_object_get_ex(w, "speed", &z)) + zrs.remote_ifspeed = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "ip", &z)) + snprintf(zrs.remote_ifaddress, sizeof(zrs.remote_ifaddress), "%s", json_object_get_string(z)); + } - /* Move to the next element */ - json_object_iter_next(&it); - } - } // while json_object_iter_equal + if(json_object_object_get_ex(o, "probe", &w)) { + if(json_object_object_get_ex(w, "public_ip", &z)) + snprintf(zrs.remote_probe_public_address, sizeof(zrs.remote_probe_public_address), "%s", json_object_get_string(z)); + if(json_object_object_get_ex(w, "ip", &z)) + snprintf(zrs.remote_probe_address, sizeof(zrs.remote_probe_address), "%s", json_object_get_string(z)); + } + + if(json_object_object_get_ex(o, "avg", &w)) { + if(json_object_object_get_ex(w, "bps", &z)) + zrs.avg_bps = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "pps", &z)) + zrs.avg_pps = (u_int32_t)json_object_get_int64(z); + } + + if(json_object_object_get_ex(o, "timeout", &w)) { + if(json_object_object_get_ex(w, "lifetime", &z)) + zrs.remote_lifetime_timeout = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "idle", &z)) + zrs.remote_idle_timeout = (u_int32_t)json_object_get_int64(z); + } + + if(json_object_object_get_ex(o, "drops", &w)) { + if(json_object_object_get_ex(w, "export_queue_too_long", &z)) + zrs.export_queue_too_long = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "too_many_flows", &z)) + zrs.too_many_flows = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "elk_flow_drops", &z)) + zrs.elk_flow_drops = (u_int32_t)json_object_get_int64(z); + } + +#ifdef ZMQ_EVENT_DEBUG + ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event parsed " + "[iface: {name: %s, speed: %u, ip:%s}]" + "[probe: {public_ip: %s, ip: %s}]" + "[avg: {bps: %u, pps: %u}]" + "[remote: {time: %u, bytes: %u, packets: %u, idle_timeout: %u, lifetime_timeout:%u}]", + zrs.remote_ifname, zrs.remote_ifspeed, zrs.remote_ifaddress, + zrs.remote_probe_public_address, zrs.remote_probe_address, + zrs.avg_bps, zrs.avg_pps, + zrs.remote_time, (u_int32_t)zrs.remote_bytes, (u_int32_t)zrs.remote_pkts, + zrs.remote_idle_timeout, zrs.remote_lifetime_timeout); +#endif /* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps); */ /* Process Flow */ - iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed, - remote_probe_address, remote_probe_public_address, - remote_bytes, remote_pkts, remote_time, avg_pps, avg_bps); + iface->setRemoteStats(&zrs); if(flowHashing) { FlowHashing *current, *tmp; HASH_ITER(hh, flowHashing, current, tmp) { - current->iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed, - remote_probe_address, remote_probe_public_address, - remote_bytes, remote_pkts, remote_time, avg_pps, avg_bps); + current->iface->setRemoteStats(&zrs); } }