diff --git a/include/ParserInterface.h b/include/ParserInterface.h index 17293fda29..b7586fb112 100755 --- a/include/ParserInterface.h +++ b/include/ParserInterface.h @@ -51,7 +51,7 @@ class ParserInterface : public NetworkInterface { u_int8_t parseCounter(char *payload, int payload_size, u_int8_t source_id, void *data); virtual void setRemoteStats(ZMQ_RemoteStats *zrs); - + u_int32_t getNumDroppedPackets() { return zmq_remote_stats ? zmq_remote_stats->sflow_pkt_sample_drops : 0; }; virtual void lua(lua_State* vm); }; diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index 375d7ecc38..9d84528e18 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -195,7 +195,7 @@ typedef struct zmq_remote_stats { u_int64_t remote_bytes, remote_pkts, num_flow_exports; u_int32_t remote_ifspeed, remote_time, avg_bps, avg_pps; u_int32_t remote_lifetime_timeout, remote_idle_timeout; - u_int32_t export_queue_too_long, too_many_flows, elk_flow_drops; + u_int32_t export_queue_too_long, too_many_flows, elk_flow_drops, sflow_pkt_sample_drops; } ZMQ_RemoteStats; struct vm_ptree { diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index b5d19d3d8a..f8715c9826 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -1224,7 +1224,7 @@ void NetworkInterface::processFlow(ZMQ_Flow *zflow) { src2dst_direction ? zflow->core.outIndex : zflow->core.inIndex)) { static bool flow_device_already_set = false; if(!flow_device_already_set) { - ntop->getTrace()->traceEvent(TRACE_WARNING, "A flow has been seen from multiple exprters or from " + ntop->getTrace()->traceEvent(TRACE_WARNING, "A flow has been seen from multiple exporters or from " "multiple IN/OUT interfaces. Check exporters configuration."); flow_device_already_set = true; } diff --git a/src/ParserInterface.cpp b/src/ParserInterface.cpp index fb4515ad9d..abc34d9038 100755 --- a/src/ParserInterface.cpp +++ b/src/ParserInterface.cpp @@ -492,7 +492,8 @@ int ParserInterface::getKeyId(char *sym) { /* **************************************************** */ -u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t source_id, void *data) { +u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, + u_int8_t source_id, void *data) { json_object *o; enum json_tokener_error jerr = json_tokener_success; NetworkInterface * iface = (NetworkInterface*)data; @@ -544,15 +545,21 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s if(json_object_object_get_ex(o, "drops", &w)) { if(json_object_object_get_ex(w, "export_queue_too_long", &z)) zrs->export_queue_too_long = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "too_many_flows", &z)) zrs->too_many_flows = (u_int32_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "elk_flow_drops", &z)) zrs->elk_flow_drops = (u_int32_t)json_object_get_int64(z); + + if(json_object_object_get_ex(w, "sflow_pkt_sample_drops", &z)) + zrs->sflow_pkt_sample_drops = (u_int32_t)json_object_get_int64(z); } if(json_object_object_get_ex(o, "zmq", &w)) { if(json_object_object_get_ex(w, "num_flow_exports", &z)) zrs->num_flow_exports = (u_int64_t)json_object_get_int64(z); + if(json_object_object_get_ex(w, "num_exporters", &z)) zrs->num_exporters = (u_int8_t)json_object_get_int(z); } @@ -578,8 +585,10 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s static_cast(iface)->setRemoteStats(zrs); if(flowHashing) { FlowHashing *current, *tmp; + HASH_ITER(hh, flowHashing, current, tmp) { ZMQ_RemoteStats *zrscopy = (ZMQ_RemoteStats*)malloc(sizeof(ZMQ_RemoteStats)); + if(zrscopy) memcpy(zrscopy, zrs, sizeof(ZMQ_RemoteStats)); @@ -591,7 +600,7 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s json_object_put(o); } else { // if o != NULL - if(!once){ + if(!once) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid message received: " "your nProbe sender is outdated, data encrypted, invalid JSON, or oom?"); @@ -968,7 +977,7 @@ u_int8_t ParserInterface::parseFlow(char *payload, int payload_size, u_int8_t so } } else { // if o != NULL - if(!once){ + if(!once) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, data encrypted or invalid JSON?"); ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", @@ -1036,7 +1045,7 @@ u_int8_t ParserInterface::parseCounter(char *payload, int payload_size, u_int8_t json_object_put(o); } else { // if o != NULL - if(!once){ + if(!once) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, data encrypted or invalid JSON?"); ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", @@ -1073,6 +1082,7 @@ void ParserInterface::setRemoteStats(ZMQ_RemoteStats *zrs) { if(zmq_remote_stats_shadow) free(zmq_remote_stats_shadow); zmq_remote_stats_shadow = zmq_remote_stats; zmq_remote_stats = zrs; + /* * Don't override ethStats here, these stats are properly updated * inside NetworkInterface::processFlow for ZMQ interfaces. @@ -1087,6 +1097,7 @@ void ParserInterface::setRemoteStats(ZMQ_RemoteStats *zrs) { void ParserInterface::lua(lua_State* vm) { ZMQ_RemoteStats *zrs = zmq_remote_stats; + NetworkInterface::lua(vm); if(zrs) {