diff --git a/httpdocs/dist b/httpdocs/dist index c7db2d8b12..5b24ef1ceb 160000 --- a/httpdocs/dist +++ b/httpdocs/dist @@ -1 +1 @@ -Subproject commit c7db2d8b12ef525dfcffdf7bd9b82960df73411d +Subproject commit 5b24ef1cebd930da69eabd4b4b711395753dc357 diff --git a/include/Flow.h b/include/Flow.h index 0950eff5f6..9b432a3a9a 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -1256,6 +1256,9 @@ inline float get_goodput_bytes_thpt() const { return (goodput_bytes_thpt); }; inline u_int32_t getFlowDeviceInIndex() { return flow_device.in_index; }; inline u_int32_t getFlowDeviceOutIndex() { return flow_device.out_index; }; + inline void setFlowDeviceInIndex(u_int32_t idx) { if(idx != 0) flow_device.in_index = idx; }; + inline void setFlowDeviceOutIndex(u_int32_t idx) { if(idx != 0) flow_device.out_index = idx; }; + inline const u_int16_t getScore() const { return (flow_score); }; #ifdef HAVE_NEDGE diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index c5976085ba..c0aba1f2f9 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -6039,8 +6039,7 @@ int NetworkInterface::sortFlows(u_int32_t *begin_slot, bool walk_all, p->deviceIpFilter(&deviceIP), p->inIndexFilter(&inIndex), p->outIndexFilter(&outIndex); - ntop->getTrace()->traceEvent( - TRACE_NORMAL, "[Device IP] %s / [In Idx] %u / [Out Idx] %u", + ntop->getTrace()->traceEvent(TRACE_NORMAL, "[Device IP] %s / [In Idx] %u / [Out Idx] %u", Utils::intoaV4(deviceIP, buf, sizeof(buf)), inIndex, outIndex); } diff --git a/src/ParserInterface.cpp b/src/ParserInterface.cpp index 8f20ed1d7b..26574ca132 100644 --- a/src/ParserInterface.cpp +++ b/src/ParserInterface.cpp @@ -87,36 +87,6 @@ bool ParserInterface::processFlow(ParsedFlow *zflow) { if ((zflow->vlan_id == 0) && ntop->getPrefs()->do_simulate_vlans()) zflow->vlan_id = rand() % SIMULATE_VLANS_MAX_VALUE; -#ifdef NTOPNG_PRO - if ((unique_source_id != 0) && (!isSubInterface())) { - if (!flow_interfaces_stats->checkExporters(unique_source_id, zflow->inIndex, zflow->outIndex, - zflow->exporter_device_ip, zflow->nprobe_ip)) { - static bool shown = false; - - if (!shown) { - ntop->getTrace()->traceEvent(TRACE_NORMAL, "Flow dropped due to limits to the license"); - - ntop->getTrace()->traceEvent(TRACE_NORMAL, - "Exporters: %d/%d max | Exporter Interfaces: %d/%d max", - ntop->getNumFlowExporters(), get_max_num_flow_exporters(), - ntop->getNumFlowExportersInterfaces(), - get_max_num_flow_exporters_interfaces()); - - ntop->getTrace()->traceEvent(TRACE_NORMAL, - "Discarded [unique_source_id: %u];device_ip: %u][probe_ip: " - "%u][iface: %u->%u]", - unique_source_id, zflow->exporter_device_ip, zflow->nprobe_ip, - zflow->inIndex, zflow->outIndex); - - ntop->getRedis()->set(EXPORTERS_EXCEEDED_LIMITS_KEY, "1"); - shown = true; - } - - return false; - } - } -#endif - if (!isSubInterface()) { bool processed = false; @@ -184,8 +154,7 @@ bool ParserInterface::processFlow(ParsedFlow *zflow) { switch (flowHashingMode) { case flowhashing_probe_ip: - vIface = - getDynInterface((u_int64_t)zflow->exporter_device_ip, true); + vIface = getDynInterface((u_int64_t)zflow->exporter_device_ip, true); break; case flowhashing_iface_idx: @@ -204,9 +173,7 @@ bool ParserInterface::processFlow(ParsedFlow *zflow) { case flowhashing_probe_ip_and_ingress_iface_idx: // ntop->getTrace()->traceEvent(TRACE_NORMAL, "[IP: %u][inIndex: // %u]", zflow->exporter_device_ip, zflow->inIndex); - vIface = getDynInterface( - (((u_int64_t)zflow->exporter_device_ip) << 32) + zflow->inIndex, - true); + vIface = getDynInterface((((u_int64_t)zflow->exporter_device_ip) << 32) + zflow->inIndex, true); break; case flowhashing_vrfid: @@ -271,6 +238,66 @@ bool ParserInterface::processFlow(ParsedFlow *zflow) { INTERFACE_PROFILING_SECTION_EXIT(0); + if(flow) { + /* Fix interaface Id (if zero) */ + + if(zflow->inIndex != 0) { + if(src2dst_direction) + flow->setFlowDeviceInIndex(zflow->inIndex); + else + flow->setFlowDeviceOutIndex(zflow->inIndex); + } + + if(zflow->outIndex != 0) { + if(src2dst_direction) + flow->setFlowDeviceOutIndex(zflow->outIndex); + else + flow->setFlowDeviceInIndex(zflow->outIndex); + } + } + +#ifdef NTOPNG_PRO + if ((unique_source_id != 0) && (!isSubInterface())) { + +#if 0 + ntop->getTrace()->traceEvent(TRACE_NORMAL, "unique_source_id=%u, inIndex=%u, outIndex=%u, exporter_device_ip=%u, nprobe_ip=%u [%u / %u]", + unique_source_id, + flow ? flow->getFlowDeviceInIndex() : zflow->inIndex, + flow ? flow->getFlowDeviceOutIndex() : zflow->outIndex, + zflow->exporter_device_ip, zflow->nprobe_ip, + zflow->inIndex, zflow->outIndex); +#endif + + if (!flow_interfaces_stats->checkExporters(unique_source_id, + flow ? flow->getFlowDeviceInIndex() : zflow->inIndex, + flow ? flow->getFlowDeviceOutIndex() : zflow->outIndex, + zflow->exporter_device_ip, zflow->nprobe_ip)) { + static bool shown = false; + + if (!shown) { + ntop->getTrace()->traceEvent(TRACE_NORMAL, "Flow dropped due to license limitations"); + + ntop->getTrace()->traceEvent(TRACE_NORMAL, + "Exporters: %d/%d max | Exporter Interfaces: %d/%d max", + ntop->getNumFlowExporters(), get_max_num_flow_exporters(), + ntop->getNumFlowExportersInterfaces(), + get_max_num_flow_exporters_interfaces()); + + ntop->getTrace()->traceEvent(TRACE_NORMAL, + "Discarded [unique_source_id: %u];device_ip: %u][probe_ip: " + "%u][iface: %u->%u]", + unique_source_id, zflow->exporter_device_ip, zflow->nprobe_ip, + zflow->inIndex, zflow->outIndex); + + ntop->getRedis()->set(EXPORTERS_EXCEEDED_LIMITS_KEY, "1"); + shown = true; + } + + return false; + } + } +#endif + if (flow == NULL) return (false); if (zflow->absolute_packet_octet_counters) { diff --git a/src/ZMQCollectorInterface.cpp b/src/ZMQCollectorInterface.cpp index 5a853be700..962c522471 100644 --- a/src/ZMQCollectorInterface.cpp +++ b/src/ZMQCollectorInterface.cpp @@ -103,8 +103,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI if (zmq_bind(subscriber[num_subscribers].socket, e) != 0) { zmq_close(subscriber[num_subscribers].socket); zmq_ctx_destroy(context); - ntop->getTrace()->traceEvent( - TRACE_ERROR, + ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to bind to ZMQ endpoint %s [collector]: %s (%d)", e, strerror(errno), errno); free(tmp); @@ -114,8 +113,7 @@ ZMQCollectorInterface::ZMQCollectorInterface(const char *_endpoint) : ZMQParserI if (zmq_connect(subscriber[num_subscribers].socket, e) != 0) { zmq_close(subscriber[num_subscribers].socket); zmq_ctx_destroy(context); - ntop->getTrace()->traceEvent( - TRACE_ERROR, + ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to connect to ZMQ endpoint %s [probe]: %s (%d)", e, strerror(errno), errno); free(tmp); @@ -154,8 +152,7 @@ ZMQCollectorInterface::~ZMQCollectorInterface() { if (n > 0) { for (u_int i = 0; i < INTERFACE_PROFILING_NUM_SECTIONS; i++) { if (INTERFACE_PROFILING_SECTION_LABEL(i) != NULL) - ntop->getTrace()->traceEvent( - TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i, + ntop->getTrace()->traceEvent(TRACE_NORMAL, "[PROFILING] Section #%d '%s': AVG %llu ticks", i, INTERFACE_PROFILING_SECTION_LABEL(i), INTERFACE_PROFILING_SECTION_AVG(i, n)); ntop->getTrace()->traceEvent(TRACE_NORMAL, @@ -183,7 +180,8 @@ ZMQCollectorInterface::~ZMQCollectorInterface() { /* **************************************************** */ #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 1, 0) -char *ZMQCollectorInterface::findInterfaceEncryptionKeys(char *public_key, char *secret_key, int public_key_len, int secret_key_len) { +char *ZMQCollectorInterface::findInterfaceEncryptionKeys(char *public_key, char *secret_key, + int public_key_len, int secret_key_len) { char public_key_path[PATH_MAX], secret_key_path[PATH_MAX]; bool rc = false; @@ -193,7 +191,8 @@ char *ZMQCollectorInterface::findInterfaceEncryptionKeys(char *public_key, char ntop->get_working_dir(), get_id()); snprintf(secret_key_path, sizeof(secret_key_path), "%s/%d/key.priv", ntop->get_working_dir(), get_id()); - rc = ZMQUtils::readEncryptionKeysFromFile(public_key_path, secret_key_path, public_key, secret_key, public_key_len, secret_key_len); + rc = ZMQUtils::readEncryptionKeysFromFile(public_key_path, secret_key_path, + public_key, secret_key, public_key_len, secret_key_len); } if (!rc) { diff --git a/src/ZMQParserInterface.cpp b/src/ZMQParserInterface.cpp index dbafae2e5d..9e865a16fc 100644 --- a/src/ZMQParserInterface.cpp +++ b/src/ZMQParserInterface.cpp @@ -2010,6 +2010,8 @@ bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) { if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1; + if(flow->nprobe_ip == 0) flow->nprobe_ip = flow->exporter_device_ip; + /* Process Flow */ INTERFACE_PROFILING_SECTION_ENTER("processFlow", 30); @@ -2102,8 +2104,7 @@ int ZMQParserInterface::parseSingleJSONFlow(json_object *o, struct json_object_iterator additional_itEnd = json_object_iter_end(additional_o); - while ( - !json_object_iter_equal(&additional_it, &additional_itEnd)) { + while (!json_object_iter_equal(&additional_it, &additional_itEnd)) { const char *additional_key = json_object_iter_peek_name(&additional_it); json_object *additional_v = json_object_iter_peek_value(&additional_it); @@ -2322,8 +2323,7 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, struct json_object_iterator additional_itEnd = json_object_iter_end(additional_o); - while ( - !json_object_iter_equal(&additional_it, &additional_itEnd)) { + while (!json_object_iter_equal(&additional_it, &additional_itEnd)) { const char *additional_key = json_object_iter_peek_name(&additional_it); json_object *additional_v = json_object_iter_peek_value(&additional_it); const char *additional_value = json_object_get_string(additional_v);