diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index 679394beb5..90d3dbf85f 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -95,6 +95,7 @@ class NetworkInterface : public NetworkInterfaceAlertableEntity { std::atomic num_active_alerted_flows_error; /* Counts all flow alerts with severity >= error */ + std::atomic num_active_probes; /* Count active ZMQ probes */ u_int32_t num_host_dropped_alerts, num_flow_dropped_alerts, num_other_dropped_alerts, last_purge_idle; u_int64_t num_written_alerts, num_alerts_queries, score_as_cli, score_as_srv; @@ -1201,6 +1202,10 @@ class NetworkInterface : public NetworkInterfaceAlertableEntity { }; #endif + void incNumActiveProbes(); + void decNumActiveProbes(); + u_int64_t getNumActiveProbes() const; + void incNumAlertedFlows(Flow *f, AlertLevel severity); void decNumAlertedFlows(Flow *f, AlertLevel severity); virtual u_int64_t getNumActiveAlertedFlows() const; diff --git a/include/Ntop.h b/include/Ntop.h index 29f0a167aa..c0f585a9e8 100644 --- a/include/Ntop.h +++ b/include/Ntop.h @@ -786,6 +786,7 @@ class Ntop { return (kafkaClient.sendMessage(kafka_broker_info, msg, msg_len)); } #endif + u_int64_t getNumActiveProbes() const; }; extern Ntop *ntop; diff --git a/include/ZMQCollectorInterface.h b/include/ZMQCollectorInterface.h index 97b8017d4f..33b2bad3cf 100644 --- a/include/ZMQCollectorInterface.h +++ b/include/ZMQCollectorInterface.h @@ -28,15 +28,11 @@ class LuaEngine; -typedef struct { - char *endpoint; - void *socket; -} zmq_subscriber; - class ZMQCollectorInterface : public ZMQParserInterface { private: void *context; - std::map source_id_last_msg_id; + std::map active_probes; + time_t last_active_probes_check; bool is_collector; u_int16_t num_subscribers; zmq_subscriber subscriber[MAX_ZMQ_SUBSCRIBERS]; @@ -45,6 +41,7 @@ class ZMQCollectorInterface : public ZMQParserInterface { #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) char *findInterfaceEncryptionKeys(char *public_key, char *secret_key, int public_key_len, int secret_key_len); #endif + void checkIdleProbes(time_t now); public: ZMQCollectorInterface(const char *_endpoint); diff --git a/include/ntop_defines.h b/include/ntop_defines.h index 720a29b005..094792276e 100644 --- a/include/ntop_defines.h +++ b/include/ntop_defines.h @@ -178,6 +178,8 @@ #define MAX_ZMQ_SUBSCRIBERS UNLIMITED_MAX_ZMQ_SUBSCRIBERS #endif +#define ZMQ_PROBE_EXPIRATION_TIME 10 /* seconds */ + #define MAX_INTERFACE_NAME_LEN 512 #define MAX_USER_NETS_VAL_LEN 255 #define NUM_HOSTS_RESOLVED_BITS 2 << 19 /* ~1 million */ diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index f66d728bf2..699b35bb48 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -254,6 +254,16 @@ struct zmq_msg_hdr_v2 { u_int32_t msg_id, source_id; }; +typedef struct { + char *endpoint; + void *socket; +} zmq_subscriber; + +typedef struct { + u_int32_t last_seen; + u_int32_t last_msg_id; +} zmq_probe; + typedef u_int8_t dump_mac_t[DUMP_MAC_SIZE]; typedef char macstr_t[MACSTR_SIZE]; diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 2c6b77ceea..4ca2433090 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -310,6 +310,7 @@ void NetworkInterface::init(const char *interface_name) { tot_num_anomalies.local_hosts = tot_num_anomalies.remote_hosts = 0; num_active_alerted_flows_notice = 0, num_active_alerted_flows_warning = 0, num_active_alerted_flows_error = 0; + num_active_probes = 0; is_view = false; viewed_by = NULL; @@ -10125,6 +10126,24 @@ u_int64_t NetworkInterface::getNumActiveAlertedFlows() const { /* *************************************** */ +void NetworkInterface::incNumActiveProbes() { + num_active_probes++; +} + +/* *************************************** */ + +void NetworkInterface::decNumActiveProbes() { + num_active_probes--; +} + +/* *************************************** */ + +u_int64_t NetworkInterface::getNumActiveProbes() const { + return num_active_probes; +} + +/* *************************************** */ + bool NetworkInterface::dequeueFlowFromCompanion(ParsedFlow **f) { if (!companionQueue[next_compq_remove_idx]) { *f = NULL; diff --git a/src/Ntop.cpp b/src/Ntop.cpp index 3c9b3af080..c47ba56c8a 100644 --- a/src/Ntop.cpp +++ b/src/Ntop.cpp @@ -3717,6 +3717,17 @@ bool Ntop::broadcastControlMessage(char *msg) { /* ******************************************* */ +u_int64_t Ntop::getNumActiveProbes() const { + u_int64_t n = 0; + + for (int i = 0; i < num_defined_interfaces; i++) + n += iface[i]->getNumActiveProbes(); + + return n; +} + +/* ******************************************* */ + #ifndef WIN32 /* ******************************************* */ @@ -4010,3 +4021,6 @@ bool Ntop::createPcapInterface(const char *path, int *iface_id) { /* ******************************************* */ void Ntop::incBlacklisHits(std::string listname) { blStats.incHits(listname); } + +/* ******************************************* */ + diff --git a/src/ZMQCollectorInterface.cpp b/src/ZMQCollectorInterface.cpp index 8a4479ff77..f53d0b67cf 100644 --- a/src/ZMQCollectorInterface.cpp +++ b/src/ZMQCollectorInterface.cpp @@ -242,6 +242,23 @@ void ZMQCollectorInterface::checkPointCounters(bool drops_only) { /* **************************************************** */ +void ZMQCollectorInterface::checkIdleProbes(time_t now) { + map::iterator p; + + /* Loop through active flows to find idle ones to be removed */ + for (p = active_probes.begin(); p != active_probes.end(); p++) { + zmq_probe *probe = p->second; + if (now > probe->last_seen + ZMQ_PROBE_EXPIRATION_TIME) { + //ntop->getTrace()->traceEvent(TRACE_NORMAL, "Check Idle Probes - expired probe removed"); + active_probes.erase(p->first); /* expired found - remove */ + decNumActiveProbes(); + free(probe); + } + } +} + +/* **************************************************** */ + void ZMQCollectorInterface::collect_flows() { struct zmq_msg_hdr_v0 h0; struct zmq_msg_hdr_v1 *h = @@ -294,19 +311,21 @@ void ZMQCollectorInterface::collect_flows() { if ((rc == 0) || (now >= next_purge_idle) || (zmq_max_num_polls_before_purge == 0)) { + checkIdleProbes(now); purgeIdle(now); next_purge_idle = now + FLOW_PURGE_FREQUENCY; zmq_max_num_polls_before_purge = MAX_ZMQ_POLLS_BEFORE_PURGE; } } while (rc == 0); - for (int subscriber_id = 0; subscriber_id < num_subscribers; - subscriber_id++) { - u_int32_t msg_id = 0, last_msg_id; - u_int32_t source_id = 0; - u_int32_t publisher_version = 0; + for (int subscriber_id = 0; subscriber_id < num_subscribers; subscriber_id++) { if (items[subscriber_id].revents & ZMQ_POLLIN) { + u_int32_t msg_id = 0, current_msg_id = 0; + u_int32_t source_id = 0; + u_int32_t publisher_version = 0; + zmq_probe *probe = NULL; + size = zmq_recv(items[subscriber_id].socket, &h0, sizeof(h0), 0); if (size == sizeof(struct zmq_msg_hdr_v0)) { @@ -358,43 +377,43 @@ void ZMQCollectorInterface::collect_flows() { ntop->getTrace()->traceEvent(TRACE_NORMAL, "[topic: %s]", h->url); #endif - /* Read last message ID for the current source ID */ - if (source_id_last_msg_id.find(source_id) != - source_id_last_msg_id.end()) { - last_msg_id = source_id_last_msg_id[source_id]; + if (active_probes.find(source_id) != active_probes.end()) { + /* Found - read last message ID for the current source ID */ + + probe = active_probes[source_id]; #if 0 ntop->getTrace()->traceEvent(TRACE_NORMAL, "[subscriber_id: %u][message source: %u]" "[msg_id: %u][last_msg_id: %u][lost: %i]", - subscriber_id, source_id, msg_id, last_msg_id, msg_id - last_msg_id - 1); + subscriber_id, source_id, msg_id, probe->last_msg_id, msg_id - probe->last_msg_id - 1); #endif #if 0 fprintf(stdout, "."); fflush(stdout); #endif - if (msg_id == (last_msg_id + 1)) { + if (msg_id == (probe->last_msg_id + 1)) { /* No drop */ } else { #ifdef MSG_ID_DEBUG ntop->getTrace()->traceEvent(TRACE_NORMAL, "DROP [msg_id: %u][last_msg_id: %u]", - msg_id, last_msg_id); + msg_id, probe->last_msg_id); #endif - if (msg_id < last_msg_id) { - /* Start over (just reset source_id_last_msg_id) */ + if (msg_id < probe->last_msg_id) { + /* Start over (just reset active_probes) */ #ifdef MSG_ID_DEBUG ntop->getTrace()->traceEvent( TRACE_NORMAL, "ROLLBACK [subscriber_id: " "%u][msg_id=%u][last=%u][tot_msgs=%u][drops=%u]", - subscriber_id, msg_id, last_msg_id, recvStats.zmq_msg_rcvd, + subscriber_id, msg_id, probe->last_msg_id, recvStats.zmq_msg_rcvd, recvStats.zmq_msg_drops); #endif } else { /* Compute delta (this message ID - last message ID) */ - int32_t diff = msg_id - last_msg_id; + int32_t diff = msg_id - probe->last_msg_id; if (diff > 1) { /* Lost message detected */ @@ -404,7 +423,7 @@ void ZMQCollectorInterface::collect_flows() { TRACE_NORMAL, "DROP [subscriber_id: " "%u][msg_id=%u][last=%u][tot_msgs=%u][drops=%u][+%u]", - subscriber_id, msg_id, last_msg_id, recvStats.zmq_msg_rcvd, + subscriber_id, msg_id, probe->last_msg_id, recvStats.zmq_msg_rcvd, recvStats.zmq_msg_drops, diff - 1); #endif } @@ -412,8 +431,7 @@ void ZMQCollectorInterface::collect_flows() { } } - /* Store last message ID for the current source ID */ - source_id_last_msg_id[source_id] = msg_id; + current_msg_id = msg_id; if (recvStats.zmq_msg_drops > 0) { /* @@ -500,10 +518,28 @@ void ZMQCollectorInterface::collect_flows() { } #if defined(NTOPNG_PRO) && !defined(HAVE_NEDGE) - if (ntop->getPro()->handleProbeMessage(h, uncompressed, uncompressed_len, source_id, msg_id)) { + if (ntop->getPro()->handleProbeMessage(probe, h, uncompressed, uncompressed_len, source_id, msg_id)) { /* Handled - nothing to do */ - } else /* Process the message */ + goto recv_next; + } #endif + + /* Allocate probe info if it's the first time we see it */ + if (probe == NULL) { + probe = (zmq_probe *) calloc(1, sizeof(zmq_probe)); + if (probe != NULL) { + active_probes[source_id] = probe; + incNumActiveProbes(); + } + } + + /* Store last message ID for the current source ID */ + if (probe != NULL) { + probe->last_seen = now; + probe->last_msg_id = current_msg_id; + } + + /* Process the message */ switch (h->url[0]) { case 'e': /* event */ recvStats.num_events++; @@ -565,6 +601,7 @@ void ZMQCollectorInterface::collect_flows() { /* ntop->getTrace()->traceEvent(TRACE_INFO, "[%s] %s", h->url, * uncompressed); */ + recv_next: #ifdef HAVE_ZLIB if (compressed /* only if the traffic was actually compressed */) if (uncompressed) free(uncompressed); diff --git a/src/ZMQParserInterface.cpp b/src/ZMQParserInterface.cpp index c503ee71ff..6c44806c45 100644 --- a/src/ZMQParserInterface.cpp +++ b/src/ZMQParserInterface.cpp @@ -325,13 +325,6 @@ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size, if (o) { json_object *w, *z; -#if defined(NTOPNG_PRO) && !defined(HAVE_NEDGE) - if (ntop->getPro()->handleProbeEvent(o, source_id, msg_id)) { - json_object_put(o); - return 0; - } -#endif - zrs.source_id = source_id; if (json_object_object_get_ex(o, "bytes", &w))