/* * * (C) 2013-24 - ntop.org * * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */ #include "ntop_includes.h" #ifndef HAVE_NEDGE #define VLAN_HASH_KEY "ntopng.vlan.%d.cache" /* **************************************************** */ /* IMPORTANT: keep it in sync with flow_fields_description part of * flow_utils.lua */ ZMQParserInterface::ZMQParserInterface(const char *endpoint, const char *custom_interface_type) : ParserInterface(endpoint, custom_interface_type) { if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__); zmq_initial_bytes = 0, zmq_initial_pkts = 0; zmq_remote_stats = zmq_remote_stats_shadow = NULL; memset(&last_zmq_remote_stats_update, 0, sizeof(last_zmq_remote_stats_update)); zmq_remote_initial_exported_flows = 0; remote_lifetime_timeout = remote_idle_timeout = 0; once = false, is_sampled_traffic = false; flow_max_idle = ntop->getPrefs()->get_pkt_ifaces_flow_max_idle(); #ifdef NTOPNG_PRO custom_app_maps = NULL; #endif polling_start_time = 0; updateFlowMaxIdle(); memset(&recvStats, 0, sizeof(recvStats)); memset(&recvStatsCheckpoint, 0, sizeof(recvStatsCheckpoint)); /* Populate defaults for @NTOPNG@ nProbe templates. No need to populate all the fields as nProbe will sent them periodically. This minimum set is required for backward compatibility. */ addMapping("IN_SRC_MAC", IN_SRC_MAC); addMapping("OUT_SRC_MAC", OUT_SRC_MAC); addMapping("IN_DST_MAC", IN_DST_MAC); addMapping("OUT_DST_MAC", OUT_DST_MAC); addMapping("SRC_VLAN", SRC_VLAN); addMapping("DST_VLAN", DST_VLAN); addMapping("DOT1Q_SRC_VLAN", DOT1Q_SRC_VLAN); addMapping("DOT1Q_DST_VLAN", DOT1Q_DST_VLAN); addMapping("INPUT_SNMP", INPUT_SNMP); addMapping("OUTPUT_SNMP", OUTPUT_SNMP); addMapping("IPV4_SRC_ADDR", IPV4_SRC_ADDR); addMapping("IPV4_DST_ADDR", IPV4_DST_ADDR); addMapping("SRC_TOS", SRC_TOS); addMapping("DST_TOS", DST_TOS); addMapping("L4_SRC_PORT", L4_SRC_PORT); addMapping("L4_DST_PORT", L4_DST_PORT); addMapping("IPV6_SRC_ADDR", IPV6_SRC_ADDR); addMapping("IPV6_DST_ADDR", IPV6_DST_ADDR); addMapping("IP_PROTOCOL_VERSION", IP_PROTOCOL_VERSION); addMapping("PROTOCOL", PROTOCOL); addMapping("L7_PROTO", L7_PROTO, NTOP_PEN); addMapping("L7_PROTO_NAME", L7_PROTO_NAME, NTOP_PEN); addMapping("L7_INFO", L7_INFO, NTOP_PEN); addMapping("L7_CONFIDENCE", L7_CONFIDENCE, NTOP_PEN); addMapping("L7_ERROR_CODE", L7_ERROR_CODE, NTOP_PEN); addMapping("IN_BYTES", IN_BYTES); addMapping("IN_PKTS", IN_PKTS); addMapping("OUT_BYTES", OUT_BYTES); addMapping("OUT_PKTS", OUT_PKTS); addMapping("FIRST_SWITCHED", FIRST_SWITCHED); addMapping("LAST_SWITCHED", LAST_SWITCHED); addMapping("EXPORTER_IPV4_ADDRESS", EXPORTER_IPV4_ADDRESS); addMapping("EXPORTER_IPV6_ADDRESS", EXPORTER_IPV6_ADDRESS); addMapping("TOTAL_FLOWS_EXP", TOTAL_FLOWS_EXP); addMapping("NPROBE_IPV4_ADDRESS", NPROBE_IPV4_ADDRESS, NTOP_PEN); addMapping("NPROBE_INSTANCE_NAME", NPROBE_INSTANCE_NAME, NTOP_PEN); addMapping("TCP_FLAGS", TCP_FLAGS); addMapping("INITIATOR_PKTS", INITIATOR_PKTS); addMapping("INITIATOR_OCTETS", INITIATOR_OCTETS); addMapping("RESPONDER_PKTS", RESPONDER_PKTS); addMapping("RESPONDER_OCTETS", RESPONDER_OCTETS); addMapping("SAMPLING_INTERVAL", SAMPLING_INTERVAL); addMapping("DIRECTION", DIRECTION); addMapping("POST_NAT_SRC_IPV4_ADDR", POST_NAT_SRC_IPV4_ADDR); addMapping("POST_NAT_DST_IPV4_ADDR", POST_NAT_DST_IPV4_ADDR); addMapping("POST_NAT_SRC_TRANSPORT_PORT", POST_NAT_SRC_TRANSPORT_PORT); addMapping("POST_NAT_DST_TRANSPORT_PORT", POST_NAT_DST_TRANSPORT_PORT); addMapping("OBSERVATION_POINT_ID", OBSERVATION_POINT_ID); addMapping("INGRESS_VRFID", INGRESS_VRFID); addMapping("IPV4_SRC_MASK", IPV4_SRC_MASK); addMapping("IPV4_DST_MASK", IPV4_DST_MASK); addMapping("IPV4_NEXT_HOP", IPV4_NEXT_HOP); addMapping("SRC_AS", SRC_AS); addMapping("DST_AS", DST_AS); addMapping("BGP_NEXT_ADJACENT_ASN", BGP_NEXT_ADJACENT_ASN); addMapping("BGP_PREV_ADJACENT_ASN", BGP_PREV_ADJACENT_ASN); addMapping("FLOW_END_REASON", FLOW_END_REASON); addMapping("OOORDER_IN_PKTS", OOORDER_IN_PKTS, NTOP_PEN); addMapping("OOORDER_OUT_PKTS", OOORDER_OUT_PKTS, NTOP_PEN); addMapping("RETRANSMITTED_IN_PKTS", RETRANSMITTED_IN_PKTS, NTOP_PEN); addMapping("RETRANSMITTED_OUT_PKTS", RETRANSMITTED_OUT_PKTS, NTOP_PEN); addMapping("DNS_QUERY", DNS_QUERY, NTOP_PEN); addMapping("DNS_QUERY_TYPE", DNS_QUERY_TYPE, NTOP_PEN); addMapping("DNS_RET_CODE", DNS_RET_CODE, NTOP_PEN); addMapping("HTTP_URL", HTTP_URL, NTOP_PEN); addMapping("HTTP_SITE", HTTP_SITE, NTOP_PEN); addMapping("HTTP_RET_CODE", HTTP_RET_CODE, NTOP_PEN); addMapping("HTTP_METHOD", HTTP_METHOD, NTOP_PEN); addMapping("HTTP_USER_AGENT", HTTP_USER_AGENT, NTOP_PEN); addMapping("TLS_SERVER_NAME", TLS_SERVER_NAME, NTOP_PEN); addMapping("TLS_CIPHER", TLS_CIPHER, NTOP_PEN); addMapping("SSL_UNSAFE_CIPHER", SSL_UNSAFE_CIPHER, NTOP_PEN); addMapping("JA3C_HASH", JA3C_HASH, NTOP_PEN); addMapping("JA3S_HASH", JA3S_HASH, NTOP_PEN); addMapping("JA4C_HASH", JA4C_HASH, NTOP_PEN); addMapping("BITTORRENT_HASH", BITTORRENT_HASH, NTOP_PEN); addMapping("SRC_FRAGMENTS", SRC_FRAGMENTS, NTOP_PEN); addMapping("DST_FRAGMENTS", DST_FRAGMENTS, NTOP_PEN); addMapping("CLIENT_NW_LATENCY_MS", CLIENT_NW_LATENCY_MS, NTOP_PEN); addMapping("SERVER_NW_LATENCY_MS", SERVER_NW_LATENCY_MS, NTOP_PEN); addMapping("L7_PROTO_RISK", L7_PROTO_RISK, NTOP_PEN); addMapping("L7_PROTO_RISK_NAME", L7_PROTO_RISK_NAME, NTOP_PEN ); addMapping("FLOW_VERDICT", FLOW_VERDICT, NTOP_PEN); addMapping("L7_RISK_INFO", L7_RISK_INFO, NTOP_PEN); addMapping("FLOW_SOURCE", FLOW_SOURCE, NTOP_PEN); addMapping("SMTP_MAIL_FROM", SMTP_MAIL_FROM, NTOP_PEN); addMapping("SMTP_RCPT_TO", SMTP_RCPT_TO, NTOP_PEN); addMapping("UNIQUE_SOURCE_ID", UNIQUE_SOURCE_ID, NTOP_PEN); /* eBPF / Process */ addMapping("SRC_PROC_PID", SRC_PROC_PID, NTOP_PEN); addMapping("SRC_PROC_NAME", SRC_PROC_NAME, NTOP_PEN); addMapping("SRC_PROC_UID", SRC_PROC_UID, NTOP_PEN); addMapping("SRC_PROC_USER_NAME", SRC_PROC_USER_NAME, NTOP_PEN); addMapping("SRC_FATHER_PROC_PID", SRC_FATHER_PROC_PID, NTOP_PEN); addMapping("SRC_FATHER_PROC_NAME", SRC_FATHER_PROC_NAME, NTOP_PEN); addMapping("SRC_FATHER_PROC_PKG_NAME", SRC_FATHER_PROC_PKG_NAME, NTOP_PEN); addMapping("SRC_FATHER_PROC_UID", SRC_FATHER_PROC_UID, NTOP_PEN); addMapping("SRC_FATHER_PROC_USER_NAME", SRC_FATHER_PROC_USER_NAME, NTOP_PEN); addMapping("SRC_PROC_ACTUAL_MEMORY", SRC_PROC_ACTUAL_MEMORY, NTOP_PEN); addMapping("SRC_PROC_PEAK_MEMORY", SRC_PROC_PEAK_MEMORY, NTOP_PEN); addMapping("SRC_PROC_AVERAGE_CPU_LOAD", SRC_PROC_AVERAGE_CPU_LOAD, NTOP_PEN); addMapping("SRC_PROC_NUM_PAGE_FAULTS", SRC_PROC_NUM_PAGE_FAULTS, NTOP_PEN); addMapping("SRC_PROC_PCTG_IOWAIT", SRC_PROC_PCTG_IOWAIT, NTOP_PEN); addMapping("SRC_PROC_PKG_NAME", SRC_PROC_PKG_NAME, NTOP_PEN); addMapping("SRC_PROC_CMDLINE", SRC_PROC_CMDLINE, NTOP_PEN); addMapping("SRC_PROC_CONTAINER_ID", SRC_PROC_CONTAINER_ID, NTOP_PEN); addMapping("DST_PROC_PID", DST_PROC_PID, NTOP_PEN); addMapping("DST_PROC_NAME", DST_PROC_NAME, NTOP_PEN); addMapping("DST_PROC_UID", DST_PROC_UID, NTOP_PEN); addMapping("DST_PROC_USER_NAME", DST_PROC_USER_NAME, NTOP_PEN); addMapping("DST_FATHER_PROC_PID", DST_FATHER_PROC_PID, NTOP_PEN); addMapping("DST_FATHER_PROC_NAME", DST_FATHER_PROC_NAME, NTOP_PEN); addMapping("DST_FATHER_PROC_PKG_NAME", DST_FATHER_PROC_PKG_NAME, NTOP_PEN); addMapping("DST_FATHER_PROC_UID", DST_FATHER_PROC_UID, NTOP_PEN); addMapping("DST_FATHER_PROC_USER_NAME", DST_FATHER_PROC_USER_NAME, NTOP_PEN); addMapping("DST_PROC_ACTUAL_MEMORY", DST_PROC_ACTUAL_MEMORY, NTOP_PEN); addMapping("DST_PROC_PEAK_MEMORY", DST_PROC_PEAK_MEMORY, NTOP_PEN); addMapping("DST_PROC_AVERAGE_CPU_LOAD", DST_PROC_AVERAGE_CPU_LOAD, NTOP_PEN); addMapping("DST_PROC_NUM_PAGE_FAULTS", DST_PROC_NUM_PAGE_FAULTS, NTOP_PEN); addMapping("DST_PROC_PCTG_IOWAIT", DST_PROC_PCTG_IOWAIT, NTOP_PEN); addMapping("DST_PROC_PKG_NAME", DST_PROC_PKG_NAME, NTOP_PEN); addMapping("DST_PROC_CMDLINE", DST_PROC_CMDLINE, NTOP_PEN); addMapping("DST_PROC_CONTAINER_ID", DST_PROC_CONTAINER_ID, NTOP_PEN); /* sFlow Counter Fields */ addCounterMapping("deviceIP", SFLOW_DEVICE_IP); addCounterMapping("samplesGenerated", SFLOW_SAMPLES_GENERATED); addCounterMapping("ifIndex", SFLOW_IF_INDEX); addCounterMapping("ifName", SFLOW_IF_NAME); addCounterMapping("ifType", SFLOW_IF_TYPE); addCounterMapping("ifSpeed", SFLOW_IF_SPEED); addCounterMapping("ifDirection", SFLOW_IF_DIRECTION); addCounterMapping("ifAdminStatus", SFLOW_IF_ADMIN_STATUS); addCounterMapping("ifOperStatus", SFLOW_IF_OPER_STATUS); addCounterMapping("ifInOctets", SFLOW_IF_IN_OCTETS); addCounterMapping("ifInPackets", SFLOW_IF_IN_PACKETS); addCounterMapping("ifInErrors", SFLOW_IF_IN_ERRORS); addCounterMapping("ifOutOctets", SFLOW_IF_OUT_OCTETS); addCounterMapping("ifOutPackets", SFLOW_IF_OUT_PACKETS); addCounterMapping("ifOutErrors", SFLOW_IF_OUT_ERRORS); addCounterMapping("ifPromiscuousMode", SFLOW_IF_PROMISCUOUS_MODE); if(ntop->getPrefs()->is_edr_mode()) loadVLANMappings(); } /* **************************************************** */ ZMQParserInterface::~ZMQParserInterface() { map::iterator it; if (zmq_remote_stats) free(zmq_remote_stats); if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow); #ifdef NTOPNG_PRO if (custom_app_maps) delete (custom_app_maps); #endif for (it = source_id_last_zmq_remote_stats.begin(); it != source_id_last_zmq_remote_stats.end(); ++it) delete (it->second); source_id_last_zmq_remote_stats.clear(); } /* **************************************************** */ /* these mappings are sent by nprobe via zmq at startup and collected via zmqparserinterface::parsetemplate() */ void ZMQParserInterface::addMapping(const char *sym, u_int32_t num, u_int32_t pen, const char *descr) { string label(sym); labels_map_t::iterator it; pen_value_t cur_pair = make_pair(pen, num); if ((it = labels_map.find(label)) == labels_map.end()) labels_map.insert(make_pair(label, cur_pair)); else it->second.first = pen, it->second.second = num; if (descr) { descriptions_map_t::iterator dit; if ((dit = descriptions_map.find(cur_pair)) == descriptions_map.end()) descriptions_map.insert(make_pair(cur_pair, descr)); } } /* **************************************************** */ bool ZMQParserInterface::getKeyId(char *sym, u_int32_t sym_len, u_int32_t *const pen, u_int32_t *const field) const { u_int32_t cur_pen, cur_field; string label(sym); labels_map_t::const_iterator it; bool is_num, is_dotted; *pen = UNKNOWN_PEN, *field = UNKNOWN_FLOW_ELEMENT; is_num = Utils::isNumber(sym, sym_len, &is_dotted); if (is_num && is_dotted) { if (sscanf(sym, "%u.%u", &cur_pen, &cur_field) != 2) return false; *pen = cur_pen, *field = cur_field; } else if (is_num) { cur_field = atoi(sym); *pen = 0, *field = cur_field; } else if ((it = labels_map.find(label)) != labels_map.end()) { *pen = it->second.first, *field = it->second.second; } else { return false; } return true; } /* **************************************************** */ void ZMQParserInterface::addCounterMapping(const char *sym, u_int32_t id) { string label(sym); counters_map_t::iterator it; if ((it = counters_map.find(label)) == counters_map.end()) counters_map.insert(make_pair(label, id)); } /* **************************************************** */ bool ZMQParserInterface::getCounterId(char *sym, u_int32_t sym_len, u_int32_t *id) const { string label(sym); counters_map_t::const_iterator it; bool is_num, is_dotted; is_num = Utils::isNumber(sym, sym_len, &is_dotted); if (is_num) { *id = atoi(sym); } else if ((it = counters_map.find(label)) != counters_map.end()) { *id = it->second; } else { return false; } return true; } /* **************************************************** */ const char *ZMQParserInterface::getKeyDescription(u_int32_t pen, u_int32_t field) const { descriptions_map_t::const_iterator it; if ((it = descriptions_map.find(make_pair(pen, field))) != descriptions_map.end()) return it->second.c_str(); return NULL; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { json_object *o; enum json_tokener_error jerr = json_tokener_success; nProbeStats zrs; /* Do not instantiate, automatically cleaned outside of the scope */ const u_int32_t max_timeout = 600, min_timeout = 60; if (polling_start_time == 0) polling_start_time = (u_int32_t)time(NULL); //#define TRACE_EXPORTERS #ifdef TRACE_EXPORTERS ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msg_id: %u] %s", msg_id, payload); #endif o = json_tokener_parse_verbose(payload, &jerr); if (o) { json_object *w, *z; zrs.source_id = source_id; if (json_object_object_get_ex(o, "bytes", &w)) zrs.remote_bytes = (u_int64_t)json_object_get_int64(w); if (json_object_object_get_ex(o, "packets", &w)) zrs.remote_pkts = (u_int64_t)json_object_get_int64(w); if (json_object_object_get_ex(o, "iface", &w)) { if (json_object_object_get_ex(w, "name", &z)) snprintf(zrs.remote_ifname, sizeof(zrs.remote_ifname), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "speed", &z)) zrs.remote_ifspeed = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "ip", &z)) snprintf(zrs.remote_ifaddress, sizeof(zrs.remote_ifaddress), "%s", json_object_get_string(z)); } if (json_object_object_get_ex(o, "mode", &w)) snprintf(zrs.mode, sizeof(zrs.mode), "%s", json_object_get_string(w)); if (json_object_object_get_ex(o, "probe", &w)) { if (json_object_object_get_ex(w, "public_ip", &z)) snprintf(zrs.remote_probe_public_address, sizeof(zrs.remote_probe_public_address), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "uuid", &z)) snprintf(zrs.uuid, sizeof(zrs.uuid), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "uuid_num", &z) /* uuid_num (old) == unique_source_id (new) */ || json_object_object_get_ex(w, "unique_source_id", &z)) { zrs.uuid_num = (u_int32_t)json_object_get_int64(z); } if (json_object_object_get_ex(w, "ip", &z)) snprintf(zrs.remote_probe_address, sizeof(zrs.remote_probe_address), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "version", &z)) snprintf(zrs.remote_probe_version, sizeof(zrs.remote_probe_version), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "osname", &z)) snprintf(zrs.remote_probe_os, sizeof(zrs.remote_probe_os), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "license", &z)) snprintf(zrs.remote_probe_license, sizeof(zrs.remote_probe_license), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "edition", &z)) snprintf(zrs.remote_probe_edition, sizeof(zrs.remote_probe_edition), "%s", json_object_get_string(z)); if (json_object_object_get_ex(w, "maintenance", &z)) snprintf(zrs.remote_probe_maintenance, sizeof(zrs.remote_probe_maintenance), "%s", json_object_get_string(z)); } if (json_object_object_get_ex(o, "time", &w)) { int32_t time_delta; zrs.local_time = (u_int32_t)time(NULL); zrs.remote_time = (u_int32_t)json_object_get_int64(w); /* nProbe remote time */ time_delta = (int32_t)zrs.local_time - zrs.remote_time; /* Skip the check for the first few seconds as we might receive old messages from the probe */ if ((msg_id != 0) /* Skip message with drops to avoid miscalculations */ && ((zrs.local_time - polling_start_time) > 5)) { if (abs(time_delta) >= 10) { ntop->getTrace()->traceEvent(TRACE_NORMAL, "Remote probe clock drift %u sec " "detected (local: %u remote: %u [%s])", abs(time_delta), zrs.local_time, zrs.remote_time, zrs.remote_probe_address); } } else zrs.remote_time = zrs.local_time; /* Avoid clock drift messages during the grace period */ } if (json_object_object_get_ex(o, "avg", &w)) { if (json_object_object_get_ex(w, "bps", &z)) zrs.avg_bps = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "pps", &z)) zrs.avg_pps = (u_int32_t)json_object_get_int64(z); } if (json_object_object_get_ex(o, "timeout", &w)) { if (json_object_object_get_ex(w, "lifetime", &z)) { zrs.remote_lifetime_timeout = (u_int32_t)json_object_get_int64(z); if (zrs.remote_lifetime_timeout > max_timeout) zrs.remote_lifetime_timeout = max_timeout; } if (json_object_object_get_ex(w, "idle", &z)) { zrs.remote_idle_timeout = (u_int32_t)json_object_get_int64(z); zrs.remote_idle_timeout *= 2; /* Double the idle timeout for NetFlow */ if (zrs.remote_idle_timeout > max_timeout) zrs.remote_idle_timeout = max_timeout; if (zrs.remote_idle_timeout < min_timeout) zrs.remote_idle_timeout = min_timeout; } if (json_object_object_get_ex(w, "collected_lifetime", &z)) zrs.remote_collected_lifetime_timeout = (u_int32_t)json_object_get_int64(z); } if (json_object_object_get_ex(o, "drops", &w)) { if (json_object_object_get_ex(w, "export_queue_full", &z)) zrs.export_queue_full = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "too_many_flows", &z)) zrs.too_many_flows = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "elk_flow_drops", &z)) zrs.elk_flow_drops = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "sflow_pkt_sample_drops", &z)) zrs.sflow_pkt_sample_drops = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "flow_collection_drops", &z)) zrs.flow_collection_drops = (u_int32_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "flow_collection_udp_socket_drops", &z)) zrs.flow_collection_udp_socket_drops = (u_int32_t)json_object_get_int64(z); } if (json_object_object_get_ex(o, "flow_collection", &w)) { if (json_object_object_get_ex(w, "nf_ipfix_flows", &z)) zrs.flow_collection.nf_ipfix_flows = (u_int64_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "collection_port", &z)) zrs.flow_collection.collection_port = (u_int64_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "sflow_samples", &z)) zrs.flow_collection.sflow_samples = (u_int64_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "exporters", &z)) { json_object_object_foreach(z, key, val) { //ntop->getTrace()->traceEvent(TRACE_NORMAL, "Exporter: %s", key); u_int32_t ip = ntohl(inet_addr(key)); ExporterStats exp_stats; json_object *x; memset(&exp_stats, 0, sizeof(exp_stats)); if (json_object_object_get_ex(val, "time_last_used", &x)) exp_stats.time_last_used = (u_int32_t)json_object_get_int64(x); if (json_object_object_get_ex(val, "num_sflow_flows", &x)) exp_stats.num_sflow_flows = (u_int32_t)json_object_get_int64(x); if (json_object_object_get_ex(val, "num_netflow_ipfix_flows", &x)) exp_stats.num_netflow_flows = (u_int32_t)json_object_get_int64(x); if (json_object_object_get_ex(val, "num_drops", &x)) exp_stats.num_drops = (u_int32_t)json_object_get_int64(x); if (json_object_object_get_ex(val, "unique_source_id", &x)) exp_stats.unique_source_id = (u_int32_t)json_object_get_int64(x); zrs.exportersStats[ip] = exp_stats; } } } if (json_object_object_get_ex(o, "zmq", &w)) { if (json_object_object_get_ex(w, "num_flow_exports", &z)) zrs.num_flow_exports = (u_int64_t)json_object_get_int64(z); if (json_object_object_get_ex(w, "num_exporters", &z)) zrs.num_exporters = (u_int8_t)json_object_get_int(z); } #ifdef ZMQ_EVENT_DEBUG ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event parsed " "[iface: {name: %s, speed: %u, ip: %s}]" "[probe: {public_ip: %s, ip: %s, version: %s, os: %s, license: %s, " "edition: %s, maintenance: %s}]" "[avg: {bps: %u, pps: %u}]" "[remote: {time: %u, bytes: %u, packets: %u, idle_timeout: %u, " "lifetime_timeout: %u," " collected_lifetime_timeout: %u }]" "[zmq: {num_exporters: %u, num_flow_exports: %u}]", zrs.remote_ifname, zrs.remote_ifspeed, zrs.remote_ifaddress, zrs.remote_probe_version, zrs.remote_probe_os, zrs.remote_probe_license, zrs.remote_probe_edition, zrs.remote_probe_maintenance, zrs.remote_probe_public_address, zrs.remote_probe_address, zrs.avg_bps, zrs.avg_pps, zrs.remote_time, (u_int32_t)zrs.remote_bytes, (u_int32_t)zrs.remote_pkts, zrs.remote_idle_timeout, zrs.remote_lifetime_timeout, zrs.remote_collected_lifetime_timeout, zrs.num_exporters, zrs.num_flow_exports); #endif remote_lifetime_timeout = zrs.remote_lifetime_timeout, remote_idle_timeout = zrs.remote_idle_timeout; /* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps); */ /* Process Flow */ setRemoteStats(&zrs); for (std::map::iterator it = flowHashing.begin(); it != flowHashing.end(); ++it) { ZMQParserInterface *z = (ZMQParserInterface *)it->second; z->setRemoteStats(&zrs); } /* Dispose memory */ json_object_put(o); } else { // if o != NULL if (!once) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid message received: " "your nProbe sender is outdated, data " "encrypted, invalid JSON, or oom?"); ntop->getTrace()->traceEvent( TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", json_tokener_error_desc(jerr), payload_size, payload); } once = true; if (o) json_object_put(o); return -1; } return 0; } /* **************************************************** */ bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow, u_int32_t field, ParsedValue *value) { IpAddress ip_aux; /* used to check empty IPs */ switch (field) { case IN_SRC_MAC: case OUT_SRC_MAC: /* Format 00:00:00:00:00:00 */ Utils::parseMac(flow->src_mac, value->string); break; case IN_DST_MAC: case OUT_DST_MAC: Utils::parseMac(flow->dst_mac, value->string); break; case SRC_TOS: flow->src_tos = value->int_num; break; case DST_TOS: flow->dst_tos = value->int_num; break; case IPV4_SRC_ADDR: case IPV6_SRC_ADDR: /* The following check prevents an empty ip address (e.g., ::) to to overwrite another valid ip address already set. This can happen for example when nProbe is configured (-T) to export both %IPV4_SRC_ADDR and the %IPV6_SRC_ADDR. In that cases nProbe can export a valid ipv4 and an empty ipv6. Without the check, the empty v6 address may overwrite the non empty v4. */ if (flow->src_ip.isEmpty()) { if (value->string) flow->src_ip.set((char *)value->string); else flow->src_ip.set(ntohl(value->int_num)); } else { ip_aux.set((char *)value->string); if (!ip_aux.isEmpty() && !ntop->getPrefs()->do_override_src_with_post_nat_src()) /* tried to overwrite a non-empty IP with another non-empty IP */ ntop->getTrace()->traceEvent( TRACE_WARNING, "Attempt to set source ip multiple times. " "Check exported fields"); } /* Pre-Post nat IPs are only supported for IPv4 */ if (flow->src_ip.isIPv4()) { flow->setPreNATSrcIp(flow->src_ip.get_ipv4()); } break; case IP_PROTOCOL_VERSION: flow->version = value->int_num; break; case IPV4_DST_ADDR: case IPV6_DST_ADDR: if (flow->dst_ip.isEmpty()) { if (value->string) flow->dst_ip.set((char *)value->string); else flow->dst_ip.set(ntohl(value->int_num)); } else { ip_aux.set((char *)value->string); if (!ip_aux.isEmpty() && !ntop->getPrefs()->do_override_dst_with_post_nat_dst()) ntop->getTrace()->traceEvent(TRACE_WARNING, "Attempt to set destination ip multiple times. " "Check exported fields"); } /* Pre-Post nat IPs are only supported for IPv4 */ if (flow->dst_ip.isIPv4()) { flow->setPreNATDstIp(flow->dst_ip.get_ipv4()); } break; case L4_SRC_PORT: if (!flow->src_port) { if (value->string) flow->src_port = atoi(value->string); else flow->src_port = ntohs((u_int32_t)value->int_num); flow->setPreNATSrcPort(flow->src_port); } break; case L4_DST_PORT: if (!flow->dst_port) { if (value->string) flow->dst_port = atoi(value->string); else flow->dst_port = ntohs((u_int32_t)value->int_num); flow->setPreNATDstPort(flow->dst_port); } break; case SRC_VLAN: case DST_VLAN: if((flow->vlan_id = value->int_num) > 4095) /* Sanity check */ flow->vlan_id = 0; break; case DOT1Q_SRC_VLAN: case DOT1Q_DST_VLAN: if (flow->vlan_id == 0) { /* as those fields are the outer vlans in q-in-q we set the vlan_id only if there is no inner vlan value set */ flow->vlan_id = value->int_num; } break; case PROTOCOL: if (value->string) flow->l4_proto = atoi(value->string); else flow->l4_proto = value->int_num; break; case TCP_FLAGS: flow->tcp.tcp_flags = value->int_num; break; case INITIATOR_PKTS: flow->absolute_packet_octet_counters = true; /* Don't break */ case IN_PKTS: if (value->string != NULL) flow->in_pkts = atol(value->string); else flow->in_pkts = value->int_num; break; case INITIATOR_OCTETS: flow->absolute_packet_octet_counters = true; /* Don't break */ case IN_BYTES: if (value->string != NULL) flow->in_bytes = atol(value->string); else flow->in_bytes = value->int_num; break; case RESPONDER_PKTS: flow->absolute_packet_octet_counters = true; /* Don't break */ case OUT_PKTS: if (value->string != NULL) flow->out_pkts = atol(value->string); else flow->out_pkts = value->int_num; break; case RESPONDER_OCTETS: flow->absolute_packet_octet_counters = true; /* Don't break */ case OUT_BYTES: if (value->string != NULL) flow->out_bytes = atol(value->string); else flow->out_bytes = value->int_num; break; case FIRST_SWITCHED: if (value->string != NULL) flow->first_switched = atoi(value->string); else flow->first_switched = value->int_num; break; case LAST_SWITCHED: if (value->string != NULL) flow->last_switched = atoi(value->string); else flow->last_switched = value->int_num; break; case SAMPLING_INTERVAL: #if 0 /* Ignore it as nProbe as already implemented upscale */ flow->pkt_sampling_rate = value->int_num; #endif break; case DIRECTION: if (value->string != NULL) flow->direction = atoi(value->string); else flow->direction = value->int_num; break; case EXPORTER_IPV4_ADDRESS: if (value->string != NULL) { /* Format: a.b.c.d, possibly overrides NPROBE_IPV4_ADDRESS */ u_int32_t ip = ntohl(inet_addr(value->string)); if (ip) { flow->exporter_device_ip = ip; if(ntop->getPrefs()->is_edr_mode()) { char buf[32], ipb[24]; std::unordered_map::iterator it = cloud_flow_exporters.find(ip); if(it == cloud_flow_exporters.end()) { cloud_flow_exporters[ip] = true; snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(ip, ipb, sizeof(ipb))); ntop->addLocalCloudAddress(buf); /* Re-evaluate IPVx_SRC_ADDR/IPVx_DST_ADDR */ flow->src_ip.checkIP(); flow->dst_ip.checkIP(); } } } } break; case EXPORTER_IPV6_ADDRESS: if (value->string != NULL && strlen(value->string) > 0) inet_pton(AF_INET6, value->string, &flow->exporter_device_ipv6); break; case FLOW_END_REASON: if (value->string) flow->setEndReason(value->string); break; case TOTAL_FLOWS_EXP: /* if(value->string != NULL) total_flows_exp = atol(value->string); else total_flows_exp = value->int_num; ntop->getTrace()->traceEvent(TRACE_INFO, "Total Exported Flows %u", total_flows_exp); */ break; case INPUT_SNMP: flow->inIndex = value->int_num; break; case OUTPUT_SNMP: flow->outIndex = value->int_num; break; case OBSERVATION_POINT_ID: flow->observationPointId = value->int_num; break; case POST_NAT_SRC_IPV4_ADDR: /* Alwais set src_ip_addr_post_nat, however switch the src_ip only if preference is set*/ if (value->string) { IpAddress tmp; tmp.set(value->string); if (!tmp.isEmpty()) { flow->setPostNATSrcIp(tmp.get_ipv4()); } if (ntop->getPrefs()->do_override_src_with_post_nat_src()) { if (!tmp.isEmpty()) { flow->src_ip.set((char *)value->string); } } } else if (value->int_num) { if (ntop->getPrefs()->do_override_src_with_post_nat_src()) { flow->src_ip.set(ntohl(value->int_num)); } flow->setPostNATSrcIp(ntohl(value->int_num)); } break; case POST_NAT_DST_IPV4_ADDR: /* Alwais set dst_ip_addr_post_nat, however switch the dst_ip only if preference is set*/ if (value->string) { IpAddress tmp; tmp.set(value->string); if (!tmp.isEmpty()) { flow->setPostNATDstIp(tmp.get_ipv4()); } if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) { if (!tmp.isEmpty()) { flow->dst_ip.set((char *)value->string); } } } else if (value->int_num) { if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) { flow->dst_ip.set(ntohl(value->int_num)); } flow->setPostNATDstIp(ntohl(value->int_num)); } break; case POST_NAT_SRC_TRANSPORT_PORT: if (ntop->getPrefs()->do_override_src_with_post_nat_src() && (value->int_num != 0)) flow->src_port = htons((u_int16_t)value->int_num); if (value->int_num != 0) flow->setPostNATSrcPort(htons((u_int16_t)value->int_num)); break; case POST_NAT_DST_TRANSPORT_PORT: if (ntop->getPrefs()->do_override_dst_with_post_nat_dst() && (value->int_num != 0)) flow->dst_port = htons((u_int16_t)value->int_num); if (value->int_num != 0) flow->setPostNATDstPort(htons((u_int16_t)value->int_num)); break; case INGRESS_VRFID: flow->vrfId = value->int_num; break; case IPV4_SRC_MASK: case IPV4_DST_MASK: if (value->int_num != 0) return false; break; case IPV4_NEXT_HOP: if (value->string && strcmp(value->string, "0.0.0.0")) return false; break; case SRC_AS: flow->src_as = value->int_num; break; case DST_AS: flow->dst_as = value->int_num; break; case BGP_NEXT_ADJACENT_ASN: flow->next_adjacent_as = value->int_num; break; case BGP_PREV_ADJACENT_ASN: flow->prev_adjacent_as = value->int_num; break; default: ntop->getTrace()->traceEvent(TRACE_INFO, "Skipping no-PEN flow fieldId %u", field); return false; } return true; } /* **************************************************** */ bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow, u_int32_t field, ParsedValue *value) { /* Check for backward compatibility to handle cases like field = 123 * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */ if (field < NTOP_BASE_ID) field += NTOP_BASE_ID; /* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[field %d][%s]", field, value->string ? value->string : ""); */ switch (field) { case L7_PROTO: if (value->string) { if (!strchr(value->string, '.')) { /* Old behaviour, only the app protocol */ flow->l7_proto.app_protocol = atoi(value->string); } else { char *proto_dot; flow->l7_proto.master_protocol = (u_int16_t)strtoll(value->string, &proto_dot, 10); flow->l7_proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10); } } else { flow->l7_proto.app_protocol = value->int_num; } #if 0 ntop->getTrace()->traceEvent(TRACE_NORMAL, "[value: %s][master: %u][app: %u]", value->string ? value->string : "(int)", flow->l7_proto.master_protocol, flow->l7_proto.app_protocol); #endif break; case NPROBE_INSTANCE_NAME: if(ntop->getPrefs()->is_edr_mode() && ntop->getPrefs()->addVLANCloudToExporters()) { u_int16_t vlan_id = findVLANMapping((char*)value->string); flow->vlan_id = vlan_id; } break; case L7_PROTO_NAME: break; case L7_INFO: if (value->string && value->string[0] && value->string[0] != '\n') flow->setL7Info(value->string); break; case L7_CONFIDENCE: flow->setConfidence((ndpi_confidence_t)((value->int_num < NDPI_CONFIDENCE_MAX) ? value->int_num : NDPI_CONFIDENCE_UNKNOWN)); break; case L7_ERROR_CODE: flow->setL7ErrorCode(value->int_num); break; case OOORDER_IN_PKTS: flow->tcp.ooo_in_pkts = value->int_num; break; case OOORDER_OUT_PKTS: flow->tcp.ooo_out_pkts = value->int_num; break; case RETRANSMITTED_IN_PKTS: flow->tcp.retr_in_pkts = value->int_num; break; case RETRANSMITTED_OUT_PKTS: flow->tcp.retr_out_pkts = value->int_num; break; /* TODO add lost in/out to nProbe and here */ case CLIENT_NW_LATENCY_MS: { float client_nw_latency; client_nw_latency = value->double_num; flow->tcp.clientNwLatency.tv_sec = client_nw_latency / 1e3; flow->tcp.clientNwLatency.tv_usec = 1e3 * (client_nw_latency - flow->tcp.clientNwLatency.tv_sec * 1e3); } break; case SERVER_NW_LATENCY_MS: { float server_nw_latency; server_nw_latency = value->double_num; flow->tcp.serverNwLatency.tv_sec = server_nw_latency / 1e3; flow->tcp.serverNwLatency.tv_usec = 1e3 * (server_nw_latency - flow->tcp.serverNwLatency.tv_sec * 1e3); } break; case CLIENT_TCP_FLAGS: flow->tcp.client_tcp_flags = value->int_num; flow->tcp.tcp_flags |= flow->tcp.client_tcp_flags; break; case SERVER_TCP_FLAGS: flow->tcp.server_tcp_flags = value->int_num; flow->tcp.tcp_flags |= flow->tcp.server_tcp_flags; break; case APPL_LATENCY_MS: flow->tcp.applLatencyMsec = value->double_num; break; case TCP_WIN_MAX_IN: flow->tcp.in_window = value->int_num; break; case TCP_WIN_MAX_OUT: flow->tcp.out_window = value->int_num; break; case DNS_QUERY: if (value->string && value->string[0] && value->string[0] != '\n') flow->setDNSQuery(value->string); break; case DNS_QUERY_TYPE: flow->setDNSQueryType(value->string ? atoi(value->string) : value->int_num); break; case DNS_RET_CODE: flow->setDNSRetCode(value->string ? atoi(value->string) : value->int_num); break; case HTTP_URL: if (value->string && value->string[0] && value->string[0] != '\n') flow->setHTTPurl(value->string); break; case HTTP_USER_AGENT: if (value->string && value->string[0] && value->string[0] != '\n') flow->setHTTPuserAgent(value->string); break; case HTTP_SITE: if (value->string && value->string[0] && value->string[0] != '\n') flow->setHTTPsite(value->string); break; case HTTP_RET_CODE: flow->setHTTPRetCode(value->string ? atoi(value->string) : value->int_num); break; case HTTP_METHOD: if (value->string && value->string[0] && value->string[0] != '\n') flow->setHTTPMethod(ndpi_http_str2method(value->string, strlen(value->string))); break; case TLS_SERVER_NAME: if (value->string && value->string[0] && value->string[0] != '\n') flow->setTLSserverName(value->string); break; case JA3C_HASH: if (value->string && value->string[0]) flow->setJA3cHash(value->string); break; case JA3S_HASH: if (value->string && value->string[0]) flow->setJA3sHash(value->string); break; case JA4C_HASH: if (value->string && value->string[0]) flow->setJA4cHash(value->string); break; case TLS_CIPHER: flow->setTLSCipher(value->int_num); break; case SSL_UNSAFE_CIPHER: flow->setTLSUnsafeCipher(value->int_num); break; case L7_PROTO_RISK: flow->setRisk((ndpi_risk)value->int_num); break; case L7_PROTO_RISK_NAME: flow->setRiskName(value->string); break; case FLOW_VERDICT: flow->setFlowVerdict(value->int_num); break; case L7_RISK_INFO: if (value->string && value->string[0]) flow->setRiskInfo(value->string); break; case FLOW_SOURCE: { FlowSource s; if((value->int_num < packet_to_flow) || (value->int_num > collected_sflow)) s = packet_to_flow; /* Default value */ else s = static_cast(value->int_num); flow->setFlowSource(s); } break; case BITTORRENT_HASH: if (value->string && value->string[0] && value->string[0] != '\n') flow->setBittorrentHash(value->string); break; case NPROBE_IPV4_ADDRESS: if (value->string) { flow->nprobe_ip = ntohl(inet_addr(value->string)); if(flow->exporter_device_ip == 0 && (flow->exporter_device_ip = ntohl(inet_addr(value->string)))) return false; } break; case UNIQUE_SOURCE_ID: flow->unique_source_id = value->int_num; break; case SRC_FRAGMENTS: flow->in_fragments = value->int_num; break; case DST_FRAGMENTS: flow->out_fragments = value->int_num; break; case SRC_PROC_PID: flow->src_process_info.pid = value->int_num; break; case SRC_FATHER_PROC_PID: flow->src_process_info.father_pid = value->int_num; break; case SRC_PROC_NAME: if (value->string && value->string[0]) { flow->setParsedProcessInfo(); flow->src_process_info.process_name = strdup(value->string); #if 0 ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SRC] %s (%u)", flow->src_process_info.process_name, ntohs(flow->src_port)); #endif } break; case SRC_FATHER_PROC_NAME: if (value->string && value->string[0]) { flow->setParsedProcessInfo(); flow->src_process_info.father_process_name = strdup(value->string); } break; case SRC_PROC_PKG_NAME: if (value->string && value->string[0]) flow->src_process_info.pkg_name = strdup(value->string); break; case SRC_FATHER_PROC_PKG_NAME: if (value->string && value->string[0]) flow->src_process_info.father_pkg_name = strdup(value->string); break; case SRC_PROC_CMDLINE: if (value->string && value->string[0]) flow->src_process_info.cmd_line = strdup(value->string); break; case SRC_PROC_UID: flow->src_process_info.uid = value->int_num; break; case SRC_FATHER_PROC_UID: flow->src_process_info.father_uid = value->int_num; break; case SRC_PROC_USER_NAME: if (value->string && value->string[0]) flow->src_process_info.uid_name = strdup(value->string); break; case SRC_FATHER_PROC_USER_NAME: if (value->string && value->string[0]) flow->src_process_info.father_uid_name = strdup(value->string); break; case SRC_PROC_CONTAINER_ID: if (value->string && value->string[0]) { flow->setParsedContainerInfo(); flow->src_container_info.id = strdup(value->string); } break; case DST_PROC_PID: flow->dst_process_info.pid = value->int_num; break; case DST_FATHER_PROC_PID: flow->dst_process_info.father_pid = value->int_num; break; case DST_PROC_NAME: if (value->string && value->string[0]) { flow->setParsedProcessInfo(); flow->dst_process_info.process_name = strdup(value->string); #if 0 ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DST] %s (%u)", flow->dst_process_info.process_name, ntohs(flow->dst_port)); #endif } break; case DST_FATHER_PROC_NAME: if (value->string && value->string[0]) { flow->setParsedProcessInfo(); flow->dst_process_info.father_process_name = strdup(value->string); } break; case DST_PROC_PKG_NAME: if (value->string && value->string[0]) flow->dst_process_info.pkg_name = strdup(value->string); break; case DST_FATHER_PROC_PKG_NAME: if (value->string && value->string[0]) flow->dst_process_info.father_pkg_name = strdup(value->string); break; case DST_PROC_CMDLINE: if (value->string && value->string[0]) flow->dst_process_info.cmd_line = strdup(value->string); break; case DST_PROC_UID: flow->dst_process_info.uid = value->int_num; break; case DST_FATHER_PROC_UID: flow->dst_process_info.father_uid = value->int_num; break; case DST_PROC_USER_NAME: if (value->string && value->string[0]) flow->dst_process_info.uid_name = strdup(value->string); break; case DST_FATHER_PROC_USER_NAME: if (value->string && value->string[0]) flow->dst_process_info.father_uid_name = strdup(value->string); break; case DST_PROC_CONTAINER_ID: if (value->string && value->string[0]) { flow->setParsedContainerInfo(); flow->dst_container_info.id = strdup(value->string); } break; case SMTP_RCPT_TO: if(value->string && value->string[0]) flow->setSMTPRcptTo(value->string); break; case SMTP_MAIL_FROM: if(value->string && value->string[0]) flow->setSMTPMailFrom(value->string); break; case DHCP_CLIENT_NAME: if(value->string && value->string[0]) flow->setDHCPClientName(value->string); break; case SIP_CALL_ID: if(value->string && value->string[0]) flow->setSIPCallId(value->string); break; default: return false; } return true; } /* **************************************************** */ bool ZMQParserInterface::matchPENZeroField(ParsedFlow *const flow, u_int32_t field, ParsedValue *value) { IpAddress ip_aux; /* used to check empty IPs */ switch (field) { case IN_SRC_MAC: case OUT_SRC_MAC: { u_int8_t mac[6]; Utils::parseMac(mac, value->string); return (memcmp(flow->src_mac, mac, sizeof(mac)) == 0); } case IN_DST_MAC: case OUT_DST_MAC: { u_int8_t mac[6]; Utils::parseMac(mac, value->string); return (memcmp(flow->dst_mac, mac, sizeof(mac)) == 0); } case SRC_TOS: if (value->string) return (flow->src_tos == atoi(value->string)); else return (flow->src_tos == value->int_num); case DST_TOS: if (value->string) return (flow->dst_tos == atoi(value->string)); else return (flow->dst_tos == value->int_num); case IPV4_SRC_ADDR: case IPV6_SRC_ADDR: { IpAddress ip; if (value->string) ip.set((char *)value->string); else ip.set(ntohl(value->int_num)); return (flow->src_ip.compare(&ip) == 0); } case IP_PROTOCOL_VERSION: if (value->string) return (flow->version == atoi(value->string)); else return (flow->version == value->int_num); case IPV4_DST_ADDR: case IPV6_DST_ADDR: { IpAddress ip; if (value->string) ip.set((char *)value->string); else ip.set(ntohl(value->int_num)); return (flow->dst_ip.compare(&ip) == 0); } case L4_SRC_PORT: if (value->string) return (flow->src_port == htons((u_int32_t)atoi(value->string))); else return (flow->src_port == htons((u_int32_t)value->int_num)); case L4_DST_PORT: if (value->string) return (flow->dst_port == htons((u_int32_t)atoi(value->string))); else return (flow->dst_port == htons((u_int32_t)value->int_num)); case SRC_VLAN: case DST_VLAN: case DOT1Q_SRC_VLAN: case DOT1Q_DST_VLAN: if (value->string) return (flow->vlan_id == atoi(value->string)); else return (flow->vlan_id == value->int_num); case PROTOCOL: if (value->string) return (flow->l4_proto == atoi(value->string)); else return (flow->l4_proto == value->int_num); case DIRECTION: if (value->string) return (flow->direction == atoi(value->string)); else return (flow->direction == value->int_num); case FLOW_END_REASON: if (value->string) { if (flow->getEndReason()) return (!strcmp(value->string, flow->getEndReason())); } case EXPORTER_IPV4_ADDRESS: return (flow->exporter_device_ip == ntohl(inet_addr(value->string))); case EXPORTER_IPV6_ADDRESS: if (value->string != NULL && strlen(value->string) > 0) { struct ndpi_in6_addr ipv6; if (inet_pton(AF_INET6, value->string, &ipv6) <= 0) return false; return (memcmp(&flow->exporter_device_ipv6, &ipv6, sizeof(flow->exporter_device_ipv6)) == 0); } case INPUT_SNMP: if (value->string) return (flow->inIndex == (u_int32_t)atoi(value->string)); else return (flow->inIndex == value->int_num); case OUTPUT_SNMP: if (value->string) return (flow->outIndex == (u_int32_t)atoi(value->string)); else return (flow->outIndex == value->int_num); case OBSERVATION_POINT_ID: if (value->string) return (flow->observationPointId == atoi(value->string)); else return (flow->observationPointId == value->int_num); case INGRESS_VRFID: if (value->string) return (flow->vrfId == (u_int)atoi(value->string)); else return (flow->vrfId == value->int_num); case SRC_AS: if (value->string) return (flow->src_as == (u_int32_t)atoi(value->string)); else return (flow->src_as == value->int_num); case DST_AS: if (value->string) return (flow->dst_as == (u_int32_t)atoi(value->string)); else return (flow->dst_as == value->int_num); case BGP_NEXT_ADJACENT_ASN: if (value->string) return (flow->next_adjacent_as == (u_int32_t)atoi(value->string)); else return (flow->next_adjacent_as == value->int_num); case BGP_PREV_ADJACENT_ASN: if (value->string) return (flow->prev_adjacent_as == (u_int32_t)atoi(value->string)); else return (flow->prev_adjacent_as == value->int_num); default: ntop->getTrace()->traceEvent(TRACE_INFO, "Skipping no-PEN flow fieldId %u", field); break; } return false; } /* **************************************************** */ bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow, u_int32_t field, ParsedValue *value) { /* Check for backward compatibility to handle cases like field = 123 * (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */ if (field < NTOP_BASE_ID) field += NTOP_BASE_ID; switch (field) { case L7_PROTO: { ndpi_proto l7_proto = {0}; if (value->string) { if (!strchr(value->string, '.')) { /* Old behaviour, only the app protocol */ l7_proto.app_protocol = atoi(value->string); } else { char *proto_dot; l7_proto.master_protocol = (u_int16_t)strtoll(value->string, &proto_dot, 10); l7_proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10); } } else { l7_proto.app_protocol = value->int_num; } return (flow->l7_proto.app_protocol == l7_proto.app_protocol); } case L7_PROTO_NAME: if (value->string) { /* This lookup should be optimized */ u_int16_t app_protocol = ndpi_get_proto_by_name(get_ndpi_struct(), value->string); return (flow->l7_proto.app_protocol == app_protocol); } else return false; case L7_INFO: if (value->string && flow->getL7Info()) return (strcmp(flow->getL7Info(), value->string) == 0); else return false; case L7_ERROR_CODE: return (flow->getL7ErrorCode() == value->int_num); case DNS_QUERY: if (value->string && flow->getDNSQuery()) return (strcmp(flow->getDNSQuery(), value->string) == 0); else return false; case DNS_QUERY_TYPE: if (value->string) return (flow->getDNSQueryType() == atoi(value->string)); else return (flow->getDNSQueryType() == value->int_num); case HTTP_URL: if (value->string && flow->getHTTPurl()) return (strcmp(flow->getHTTPurl(), value->string) == 0); else return false; case HTTP_USER_AGENT: if (value->string && flow->getHTTPuserAgent()) return (strcmp(flow->getHTTPuserAgent(), value->string) == 0); else return false; case HTTP_SITE: if (value->string && flow->getHTTPsite()) return (strcmp(flow->getHTTPsite(), value->string) == 0); else return false; case TLS_SERVER_NAME: if (value->string && flow->getTLSserverName()) return (strcmp(flow->getTLSserverName(), value->string) == 0); else return false; case NPROBE_IPV4_ADDRESS: return (flow->nprobe_ip == ntohl(inet_addr(value->string))); case UNIQUE_SOURCE_ID: return (flow->unique_source_id == value->int_num); case SMTP_MAIL_FROM: if (value->string && flow->getSMTPMailFrom()) return (strcmp(flow->getSMTPMailFrom(), value->string) == 0); else return false; case SMTP_RCPT_TO: if (value->string && flow->getSMTPRcptTo()) return (strcmp(flow->getSMTPRcptTo(), value->string) == 0); else return false; default: break; } ntop->getTrace()->traceEvent( TRACE_WARNING, "Field %u not supported by flow filtering", field); return false; } /* **************************************************** */ bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key, ParsedValue *value) { u_int32_t pen, key_id; bool res; if (!getKeyId((char *)key, strlen(key), &pen, &key_id)) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Field %s not supported by flow filtering", key); return false; } switch (pen) { case 0: /* No PEN */ res = matchPENZeroField(flow, key_id, value); break; case NTOP_PEN: res = matchPENNtopField(flow, key_id, value); break; case UNKNOWN_PEN: default: ntop->getTrace()->traceEvent( TRACE_WARNING, "Field %s not supported by flow filtering", key); res = false; break; } return res; } /* **************************************************** */ bool ZMQParserInterface::parseNProbeAgentField( ParsedFlow *const flow, const char *key, ParsedValue *value, json_object *const jvalue) { bool ret = false; json_object *obj; if (!strncmp(key, "timestamp", 9)) { u_int32_t seconds, nanoseconds /* nanoseconds not currently used */; if (sscanf(value->string, "%u.%u", &seconds, &nanoseconds) == 2) { flow->first_switched = flow->last_switched = seconds; ret = true; } } else if (!strncmp(key, "IPV4_LOCAL_ADDR", 15) || !strncmp(key, "IPV6_LOCAL_ADDR", 15)) { flow->src_ip.set(value->string); /* FIX: do not always assume Local == Client */ ret = true; } else if (!strncmp(key, "IPV4_REMOTE_ADDR", 16) || !strncmp(key, "IPV6_REMOTE_ADDR", 16)) { flow->dst_ip.set(value->string); /* FIX: do not always assume Remote == Server */ ret = true; } else if (!strncmp(key, "L4_LOCAL_PORT", 13)) { flow->src_port = htons((u_int32_t)value->int_num); ret = true; } else if (!strncmp(key, "L4_REMOTE_PORT", 14)) { flow->dst_port = htons((u_int32_t)value->int_num); ret = true; } else if (!strncmp(key, "INTERFACE_NAME", 7) && strlen(key) == 14) { flow->ifname = (char *)json_object_get_string(jvalue); ret = true; } else if (strlen(key) >= 14 && !strncmp(&key[strlen(key) - 14], "FATHER_PROCESS", 14)) { if (json_object_object_get_ex(jvalue, "PID", &obj)) flow->src_process_info.father_pid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "UID", &obj)) flow->src_process_info.father_uid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "UID_NAME", &obj)) flow->src_process_info.father_uid_name = strdup((char *)json_object_get_string(obj)); if (json_object_object_get_ex(jvalue, "GID", &obj)) flow->src_process_info.father_gid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj)) flow->src_process_info.actual_memory = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj)) flow->src_process_info.peak_memory = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj)) flow->src_process_info.father_process_name = strdup((char *)json_object_get_string(obj)); if (!flow->process_info_set) flow->process_info_set = true; ret = true; // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Father Process [pid: %u][uid: // %u][gid: %u][path: %s]", // flow->src_process_info.father_pid, // flow->src_process_info.father_uid, // flow->src_process_info.father_gid, // flow->src_process_info.father_process_name); } else if (strlen(key) >= 7 && !strncmp(&key[strlen(key) - 7], "PROCESS", 7)) { if (json_object_object_get_ex(jvalue, "PID", &obj)) flow->src_process_info.pid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "UID", &obj)) flow->src_process_info.uid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "UID_NAME", &obj)) flow->src_process_info.uid_name = strdup((char *)json_object_get_string(obj)); if (json_object_object_get_ex(jvalue, "GID", &obj)) flow->src_process_info.gid = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj)) flow->src_process_info.actual_memory = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj)) flow->src_process_info.peak_memory = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj)) flow->src_process_info.process_name = strdup((char *)json_object_get_string(obj)); if (!flow->process_info_set) flow->process_info_set = true; ret = true; // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Process [pid: %u][uid: // %u][gid: %u][size/peak vm: %u/%u][path: %s]", // flow->src_process_info.pid, // flow->src_process_info.uid, flow->src_process_info.gid, // flow->src_process_info.actual_memory, flow->src_process_info.peak_memory, // flow->src_process_info.process_name); } else if (strlen(key) >= 9 && !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) { if ((ret = parseContainerInfo(jvalue, &flow->src_container_info))) flow->container_info_set = true; } else if (!strncmp(key, "TCP", 3) && strlen(key) == 3) { if (json_object_object_get_ex(jvalue, "CONN_STATE", &obj)) flow->src_tcp_info.conn_state = Utils::tcpStateStr2State(json_object_get_string(obj)); if (json_object_object_get_ex(jvalue, "SEGS_IN", &obj)) flow->src_tcp_info.in_segs = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "SEGS_OUT", &obj)) flow->src_tcp_info.out_segs = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "UNACK_SEGMENTS", &obj)) flow->src_tcp_info.unacked_segs = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "RETRAN_PKTS", &obj)) flow->src_tcp_info.retx_pkts = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "LOST_PKTS", &obj)) flow->src_tcp_info.lost_pkts = (u_int32_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "RTT", &obj)) flow->src_tcp_info.rtt = json_object_get_double(obj); if (json_object_object_get_ex(jvalue, "RTT_VARIANCE", &obj)) flow->src_tcp_info.rtt_var = json_object_get_double(obj); if (json_object_object_get_ex(jvalue, "BYTES_RCVD", &obj)) flow->out_bytes = flow->src_tcp_info.rcvd_bytes = (u_int64_t)json_object_get_int64(obj); if (json_object_object_get_ex(jvalue, "BYTES_ACKED", &obj)) flow->in_bytes = flow->src_tcp_info.sent_bytes = (u_int64_t)json_object_get_int64(obj); if (!flow->tcp_info_set) flow->tcp_info_set = true; flow->absolute_packet_octet_counters = true; ret = true; // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TCP INFO [conn state: // %s][rcvd_bytes: %u][retx_pkts: %u][lost_pkts: %u]" // "[in_segs: %u][out_segs: %u][unacked_segs: %u]" // "[rtt: %f][rtt_var: %f]", // Utils::tcpState2StateStr(flow->src_tcp_info.conn_state), // flow->src_tcp_info.rcvd_bytes, // flow->src_tcp_info.retx_pkts, // flow->src_tcp_info.lost_pkts, // flow->src_tcp_info.in_segs, // flow->src_tcp_info.out_segs, // flow->src_tcp_info.unacked_segs, // flow->src_tcp_info.rtt, // flow->src_tcp_info.rtt_var); } else if ((!strncmp(key, "TCP_EVENT_TYPE", 14) && strlen(key) == 14) || (!strncmp(key, "UDP_EVENT_TYPE", 14) && strlen(key) == 14)) { flow->event_type = Utils::eBPFEventStr2Event(value->string); // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event Type [type: %s]", // Utils::eBPFEvent2EventStr(flow->event_type)); } return ret; } /* **************************************************** */ bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) { bool invalid_flow = false; bool rc = false; if (flow->vlan_id && ntop->getPrefs()->do_ignore_vlans()) flow->vlan_id = 0; /* Some flow exporters write in host ports the ICMP type/code. hence we need to make sure such fields are zeroed in order to avoid odd values in the web GUI */ if ((flow->l4_proto == IPPROTO_ICMP) || (flow->l4_proto == IPPROTO_ICMPV6)) flow->src_port = flow->dst_port = 0; /* Handle zero IPv4/IPv6 discrepancies */ if (!flow->hasParsedeBPF()) { if (flow->src_ip.getVersion() != flow->dst_ip.getVersion()) { if (flow->dst_ip.isIPv4() && flow->src_ip.isIPv6() && flow->src_ip.isEmpty()) flow->src_ip.setVersion(4); else if (flow->src_ip.isIPv4() && flow->dst_ip.isIPv6() && flow->dst_ip.isEmpty()) flow->dst_ip.setVersion(4); else if (flow->dst_ip.isIPv6() && flow->src_ip.isIPv4() && flow->src_ip.isEmpty()) flow->src_ip.setVersion(6); else if (flow->src_ip.isIPv6() && flow->dst_ip.isIPv4() && flow->dst_ip.isEmpty()) flow->dst_ip.setVersion(6); else { invalid_flow = true; ntop->getTrace()->traceEvent(TRACE_WARNING, "IP version mismatch: client:%d server:%d - flow will be ignored", flow->src_ip.getVersion(), flow->dst_ip.getVersion()); } } } if (!invalid_flow) { if (flow->hasParsedeBPF()) { /* Direction already reliable when the event is an accept or a connect. Heuristic is only used in the other cases. */ #if 0 /* Disabled as Flow::setParsedeBPFInfo() now supports directions */ if(flow->event_type != ebpf_event_type_tcp_accept && flow->event_type != ebpf_event_type_tcp_connect && ntohs(flow->src_port) < ntohs(flow->dst_port)) flow->swap(); #endif } else { /* NOTE: keep in sync with Flow::check_swap() */ if(flow->src_ip.isBroadMulticastAddress() || flow->dst_ip.isBroadMulticastAddress()) ; /* Ignore non-unicast IP addresses */ else { u_int16_t cli_port = ntohs(flow->src_port), srv_port = ntohs(flow->dst_port); bool do_swap = (cli_port < srv_port) ? true : false; if(do_swap) { if(flow->l4_proto == IPPROTO_TCP) { /* Don't swap if the client has sent a SYN. Unfortunately as TCP flags are in OR we cannot see if this is a initiator or a responder so better to be conservative rather than swapping wrongly See also https://github.com/ntop/ntopng/issues/1978 */ if((flow->tcp.client_tcp_flags & TH_SYN) == TH_SYN) do_swap = false; } else if(flow->l4_proto == IPPROTO_UDP) { #if 1 /* We disable UDP swap that might be wrong in particular for probing attempts */ do_swap = false; #else if((cli_port > 32768) && (srv_port > 32768)) do_swap = false; /* Don't do anything: this might be RTP or similar */ #endif } if(do_swap) flow->swap(); } } } if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1; /* Process Flow */ INTERFACE_PROFILING_SECTION_ENTER("processFlow", 30); rc = processFlow(flow); INTERFACE_PROFILING_SECTION_EXIT(30); } if (!rc) recvStats.num_dropped_flows++; return rc; } /* **************************************************** */ int ZMQParserInterface::parseSingleJSONFlow(json_object *o, u_int32_t source_id) { ParsedFlow flow; struct json_object_iterator it = json_object_iter_begin(o); struct json_object_iterator itEnd = json_object_iter_end(o); int ret = 0; /* Reset data */ flow.source_id = source_id; flow.direction = UNKNOWN_FLOW_DIRECTION; while (!json_object_iter_equal(&it, &itEnd)) { const char *key = json_object_iter_peek_name(&it); json_object *jvalue = json_object_iter_peek_value(&it); json_object *additional_o = NULL; enum json_type type = json_object_get_type(jvalue); ParsedValue value = {0}; bool add_to_additional_fields = false; switch (type) { case json_type_int: value.int_num = json_object_get_int64(jvalue); value.double_num = value.int_num; break; case json_type_double: value.double_num = json_object_get_double(jvalue); break; case json_type_boolean: value.boolean = json_object_get_boolean(jvalue); break; case json_type_string: value.string = json_object_get_string(jvalue); if (strcmp(key, "json") == 0) additional_o = json_tokener_parse(value.string); break; case json_type_object: /* This is handled by parseNProbeAgentField or addAdditionalField */ break; case json_type_array: /* This is handled by parseNProbeAgentField or addAdditionalField */ break; default: ntop->getTrace()->traceEvent( TRACE_WARNING, "JSON type %u not supported [key: %s]\n", type, key); break; } if ((key != NULL) && (jvalue != NULL)) { u_int32_t pen, key_id; bool res; getKeyId((char *)key, strlen(key), &pen, &key_id); switch (pen) { case 0: /* No PEN */ res = parsePENZeroField(&flow, key_id, &value); if (res) break; /* Dont'break when res == false for backward compatibility: attempt to * parse Zero-PEN as Ntop-PEN */ case NTOP_PEN: res = parsePENNtopField(&flow, key_id, &value); break; case UNKNOWN_PEN: default: res = false; break; } if (!res) { switch (key_id) { case 0: // json additional object added by Flow::serialize() if (additional_o != NULL) { struct json_object_iterator additional_it = json_object_iter_begin(additional_o); struct json_object_iterator additional_itEnd = json_object_iter_end(additional_o); while ( !json_object_iter_equal(&additional_it, &additional_itEnd)) { const char *additional_key = json_object_iter_peek_name(&additional_it); json_object *additional_v = json_object_iter_peek_value(&additional_it); const char *additional_value = json_object_get_string(additional_v); if ((additional_key != NULL) && (additional_value != NULL)) { // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional // field: %s", additional_key); flow.addAdditionalField( additional_key, json_object_new_string(additional_value)); } json_object_iter_next(&additional_it); } } break; case UNKNOWN_FLOW_ELEMENT: /* Attempt to parse it as an nProbe mini field */ if (parseNProbeAgentField(&flow, key, &value, jvalue)) { if (!flow.hasParsedeBPF()) { flow.setParsedeBPF(); flow.absolute_packet_octet_counters = true; } break; } default: #ifdef NTOPNG_PRO if (custom_app_maps || (custom_app_maps = new (std::nothrow) CustomAppMaps())) custom_app_maps->checkCustomApp(key, &value, &flow); #endif ntop->getTrace()->traceEvent( TRACE_DEBUG, "Not handled ZMQ field %u/%s", key_id, key); add_to_additional_fields = true; break; } /* switch */ } if (add_to_additional_fields) { // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s", // key); flow.addAdditionalField(key, json_object_get(jvalue)); } if (additional_o) json_object_put(additional_o); } /* if */ /* Move to the next element */ json_object_iter_next(&it); } // while json_object_iter_equal if (preprocessFlow(&flow)) ret = 1; return ret; } /* **************************************************** */ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, u_int32_t source_id) { ndpi_serialization_type kt, et; ParsedFlow flow; int ret = 0, rc; bool recordFound = false; /* Reset data */ flow.source_id = source_id; flow.direction = UNKNOWN_FLOW_DIRECTION; INTERFACE_PROFILING_SECTION_ENTER("Decode TLV", 9); // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Processing TLV record"); while ((et = ndpi_deserialize_get_item_type(deserializer, &kt)) != ndpi_serialization_unknown) { ParsedValue value = {0}; u_int32_t pen = 0, key_id = 0; u_int32_t v32 = 0; int32_t i32 = 0; float f = 0; u_int64_t v64 = 0; int64_t i64 = 0; ndpi_string key, vs; char key_str[64]; u_int8_t vbkp = 0; bool add_to_additional_fields = false; bool key_is_string = false, value_is_string = false; // ntop->getTrace()->traceEvent(TRACE_NORMAL, "TLV key type = %u value type = %u", kt, et); if (et == ndpi_serialization_end_of_record) { ndpi_deserialize_next(deserializer); goto end_of_record; } recordFound = true; switch (kt) { case ndpi_serialization_uint32: ndpi_deserialize_key_uint32(deserializer, &key_id); break; case ndpi_serialization_string: ndpi_deserialize_key_string(deserializer, &key); key_is_string = true; break; default: ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV key type %u: please update both ntopng and " "nprobe to the same version", kt); ret = -1; goto error; } switch (et) { case ndpi_serialization_uint32: ndpi_deserialize_value_uint32(deserializer, &v32); value.double_num = value.int_num = v32; break; case ndpi_serialization_uint64: ndpi_deserialize_value_uint64(deserializer, &v64); value.double_num = value.int_num = v64; break; case ndpi_serialization_int32: ndpi_deserialize_value_int32(deserializer, &i32); value.double_num = value.int_num = i32; break; case ndpi_serialization_int64: ndpi_deserialize_value_int64(deserializer, &i64); value.double_num = value.int_num = i64; break; case ndpi_serialization_float: ndpi_deserialize_value_float(deserializer, &f); value.double_num = f; break; case ndpi_serialization_string: ndpi_deserialize_value_string(deserializer, &vs); value.string = vs.str; value_is_string = true; break; default: ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n", et); ret = -1; goto error; } if (key_is_string) { u_int8_t kbkp = key.str[key.str_len]; key.str[key.str_len] = '\0'; snprintf(key_str, sizeof(key_str), "%s", key.str); getKeyId(key.str, key.str_len, &pen, &key_id); key.str[key.str_len] = kbkp; } if (value_is_string) { /* Adding '\0' to the end of the string, backing up the character */ vbkp = vs.str[vs.str_len]; vs.str[vs.str_len] = '\0'; } switch (pen) { case 0: /* No PEN */ rc = parsePENZeroField(&flow, key_id, &value); if (rc) break; /* Dont'break when rc == false for backward compatibility: attempt to * parse Zero-PEN as Ntop-PEN */ case NTOP_PEN: rc = parsePENNtopField(&flow, key_id, &value); break; case UNKNOWN_PEN: default: rc = false; break; } if (!key_is_string) { if (pen) snprintf(key_str, sizeof(key_str), "%u.%u", pen, key_id); else snprintf(key_str, sizeof(key_str), "%u", key_id); } #if 0 if(ntop->getTrace()->get_trace_level() >= TRACE_LEVEL_DEBUG) { switch(et) { case ndpi_serialization_uint32: case ndpi_serialization_uint64: case ndpi_serialization_int32: case ndpi_serialization_int64: ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %lld", key_str, key_id, pen, value.int_num); break; case ndpi_serialization_float: ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %.3f", key_str, key_id, pen, value.double_num); break; case ndpi_serialization_string: ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %s", key_str, key_id, pen, value.string); break; default: ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: -", key_str, key_id, pen); break; } } #endif if (!rc) { /* Not handled */ switch (key_id) { case 0: // json additional object added by Flow::serialize() if (strcmp(key_str, "json") == 0 && value_is_string) { json_object *additional_o = json_tokener_parse(vs.str); if (additional_o) { struct json_object_iterator additional_it = json_object_iter_begin(additional_o); struct json_object_iterator additional_itEnd = json_object_iter_end(additional_o); while ( !json_object_iter_equal(&additional_it, &additional_itEnd)) { const char *additional_key = json_object_iter_peek_name(&additional_it); json_object *additional_v = json_object_iter_peek_value(&additional_it); const char *additional_value = json_object_get_string(additional_v); if ((additional_key != NULL) && (additional_value != NULL)) { // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional // field: %s", additional_key); flow.addAdditionalField( additional_key, json_object_new_string(additional_value)); } json_object_iter_next(&additional_it); } json_object_put(additional_o); } } break; case UNKNOWN_FLOW_ELEMENT: #if 0 // TODO /* Attempt to parse it as an nProbe mini field */ if(parseNProbeAgentField(&flow, key_str, &value)) { if(!flow.hasParsedeBPF()) { flow->setParsedeBPF(); flow.absolute_packet_octet_counters = true; } break; } #endif default: #ifdef NTOPNG_PRO if (custom_app_maps || (custom_app_maps = new (std::nothrow) CustomAppMaps())) custom_app_maps->checkCustomApp(key_str, &value, &flow); #endif ntop->getTrace()->traceEvent( TRACE_DEBUG, "Not handled ZMQ field %u.%u", pen, key_id); add_to_additional_fields = true; break; } /* switch */ } if (add_to_additional_fields) { // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s // (Key-ID: %u PEN: %u)", key_str, key_id, pen); #if 1 flow.addAdditionalField(deserializer); #else flow.addAdditionalField( key_str, value_is_string ? json_object_new_string(value.string) : json_object_new_int64(value.int_num)); #endif } /* Restoring backed up character at the end of the string in place of '\0' */ if (value_is_string) vs.str[vs.str_len] = vbkp; /* Move to the next element */ ndpi_deserialize_next(deserializer); } /* while */ end_of_record: if (recordFound) { INTERFACE_PROFILING_SECTION_EXIT(9); /* Closes Decode TLV */ INTERFACE_PROFILING_SECTION_ENTER("processFlow", 10); if (preprocessFlow(&flow)) ret = 1; INTERFACE_PROFILING_SECTION_EXIT(10); } error: return ret; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id) { json_object *f = NULL; enum json_tokener_error jerr = json_tokener_success; #ifndef NTOPNG_PRO /* nProbe exports flows in TLV so this code will be removed in the future Leaving here for old nProbes that will be discontinued soon */ return(0); #endif #if 0 ntop->getTrace()->traceEvent(TRACE_NORMAL, "JSON: '%s' [len=%lu]", payload, strlen(payload)); printf("\n\n%s\n\n", payload); #endif if (payload) f = json_tokener_parse_verbose(payload, &jerr); if (f != NULL) { int n = 0, rc; if (json_object_get_type(f) == json_type_array) { /* Flow array */ int id, num_elements = json_object_array_length(f); for (id = 0; id < num_elements; id++) { rc = parseSingleJSONFlow(json_object_array_get_idx(f, id), source_id); if (rc > 0) n++; } } else { rc = parseSingleJSONFlow(f, source_id); if (rc > 0) n++; } json_object_put(f); return n; } else { // if o != NULL if (!once) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, data " "encrypted or invalid JSON?"); ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", json_tokener_error_desc(jerr), payload_size, payload); } once = true; return 0; } return 0; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseTLVFlow(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { ndpi_deserializer deserializer; ndpi_serialization_type kt; int n = 0, rc; rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload, payload_size); if (rc == -1) return 0; if (ndpi_deserialize_get_format(&deserializer) != ndpi_serialization_format_tlv) { if (!once) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Invalid TLV message: the TLV generated by your probe does not match " "the version supported " "by ntopng, please update both the probe and ntopng to the latest " "version available"); once = true; } return 0; } while (ndpi_deserialize_get_item_type(&deserializer, &kt) != ndpi_serialization_unknown) { rc = parseSingleTLVFlow(&deserializer, source_id); if (rc < 0) break; else if (rc > 0) n++; } return n; } /* **************************************************** */ bool ZMQParserInterface::parseContainerInfo( json_object *jo, ContainerInfo *const container_info) { json_object *obj, *obj2; /* Keep in sync with ZMQParserInterface::freeContainerInfo and * ParsedeBPF::~ParsedeBPF */ if (json_object_object_get_ex(jo, "ID", &obj)) container_info->id = strdup((char *)json_object_get_string(obj)); if (json_object_object_get_ex(jo, "K8S", &obj)) { if (json_object_object_get_ex(obj, "POD", &obj2)) container_info->data.k8s.pod = strdup((char *)json_object_get_string(obj2)); if (json_object_object_get_ex(obj, "NS", &obj2)) container_info->data.k8s.ns = strdup((char *)json_object_get_string(obj2)); container_info->data_type = container_info_data_type_k8s; } else if (json_object_object_get_ex(jo, "DOCKER", &obj)) { container_info->data_type = container_info_data_type_k8s; } else container_info->data_type = container_info_data_type_unknown; if (obj) { if (json_object_object_get_ex(obj, "NAME", &obj2)) container_info->name = strdup((char *)json_object_get_string(obj2)); } return true; } /* **************************************************** */ void ZMQParserInterface::freeContainerInfo( ContainerInfo *const container_info) { if (container_info->id) free(container_info->id); if (container_info->name) free(container_info->name); if (container_info->data.k8s.pod) free(container_info->data.k8s.pod); if (container_info->data.k8s.ns) free(container_info->data.k8s.ns); } /* **************************************************** */ u_int8_t ZMQParserInterface::parseJSONCounter(const char *payload, int payload_size) { json_object *o; enum json_tokener_error jerr = json_tokener_success; sFlowInterfaceStats stats; //ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); memset(&stats, 0, sizeof(stats)); o = json_tokener_parse_verbose(payload, &jerr); if (o != NULL) { struct json_object_iterator it = json_object_iter_begin(o); struct json_object_iterator itEnd = json_object_iter_end(o); /* Reset data */ memset(&stats, 0, sizeof(stats)); while (!json_object_iter_equal(&it, &itEnd)) { const char *key = json_object_iter_peek_name(&it); json_object *v = json_object_iter_peek_value(&it); const char *value = json_object_get_string(v); if ((key != NULL) && (value != NULL)) { if (!strcmp(key, "deviceIP")) stats.deviceIP = (u_int32_t)json_object_get_int64(v); // ntohl(inet_addr(value)); else if (!strcmp(key, "samplesGenerated")) stats.samplesGenerated = (u_int32_t)json_object_get_int64(v); else if (!strcmp(key, "ifIndex")) stats.ifIndex = (u_int32_t)json_object_get_int64(v); else if (!strcmp(key, "ifName")) stats.ifName = (char *)json_object_get_string(v); else if (!strcmp(key, "ifType")) stats.ifType = (u_int32_t)json_object_get_int64(v); else if (!strcmp(key, "ifSpeed")) stats.ifSpeed = (u_int32_t)json_object_get_int64(v); else if (!strcmp(key, "ifDirection")) stats.ifFullDuplex = (!strcmp(value, "Full")) ? true : false; else if (!strcmp(key, "ifAdminStatus")) stats.ifAdminStatus = (!strcmp(value, "Up")) ? true : false; else if (!strcmp(key, "ifOperStatus")) stats.ifOperStatus = (!strcmp(value, "Up")) ? true : false; else if (!strcmp(key, "ifInOctets")) stats.ifInOctets = json_object_get_int64(v); else if (!strcmp(key, "ifInPackets")) stats.ifInPackets = json_object_get_int64(v); else if (!strcmp(key, "ifInErrors")) stats.ifInErrors = json_object_get_int64(v); else if (!strcmp(key, "ifOutOctets")) stats.ifOutOctets = json_object_get_int64(v); else if (!strcmp(key, "ifOutPackets")) stats.ifOutPackets = json_object_get_int64(v); else if (!strcmp(key, "ifOutErrors")) stats.ifOutErrors = json_object_get_int64(v); else if (!strcmp(key, "ifPromiscuousMode")) stats.ifPromiscuousMode = (((u_int32_t)json_object_get_int64(v)) == 1) ? true : false; else if (strlen(key) >= 9 && !strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) { if (parseContainerInfo(v, &stats.container_info)) stats.container_info_set = true; } } /* if */ /* Move to the next element */ json_object_iter_next(&it); } // while json_object_iter_equal /* Process Flow */ processInterfaceStats(&stats); freeContainerInfo(&stats.container_info); json_object_put(o); } else { // if o != NULL if (!once) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, data " "encrypted or invalid JSON?"); ntop->getTrace()->traceEvent( TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", json_tokener_error_desc(jerr), payload_size, payload); } once = true; return -1; } return 0; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseTLVCounter(const char *payload, int payload_size) { sFlowInterfaceStats stats; ndpi_deserializer deserializer; ndpi_serialization_type kt, et; int rc, ret = -1; memset(&stats, 0, sizeof(stats)); rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload, payload_size); if (rc == -1) { goto error; } if (ndpi_deserialize_get_format(&deserializer) != ndpi_serialization_format_tlv) { if (!once) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Invalid TLV message: the TLV generated by your probe does not match " "the version supported " "by ntopng, please update both the probe and ntopng to the latest " "version available"); once = true; } goto error; } while ((et = ndpi_deserialize_get_item_type(&deserializer, &kt)) != ndpi_serialization_unknown) { /* Key */ bool key_is_string = false; ndpi_string key; u_int32_t key_id = 0; switch (kt) { case ndpi_serialization_uint32: ndpi_deserialize_key_uint32(&deserializer, &key_id); break; case ndpi_serialization_string: ndpi_deserialize_key_string(&deserializer, &key); key_is_string = true; break; default: ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV key type %u: " "please update both ntopng and the probe to the same version", kt); goto error; } if (key_is_string) { u_int8_t kbkp = key.str[key.str_len]; key.str[key.str_len] = '\0'; bool found = getCounterId(key.str, key.str_len, &key_id); if (!found) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %s\n", key.str); key.str[key.str_len] = kbkp; goto error; } key.str[key.str_len] = kbkp; } /* Value */ ParsedValue value = {0}; bool value_is_string = false; ndpi_string vs; u_int32_t v32 = 0; u_int64_t v64 = 0; switch (et) { case ndpi_serialization_uint32: ndpi_deserialize_value_uint32(&deserializer, &v32); value.int_num = v32; break; case ndpi_serialization_uint64: ndpi_deserialize_value_uint64(&deserializer, &v64); value.int_num = v64; break; case ndpi_serialization_string: ndpi_deserialize_value_string(&deserializer, &vs); value.string = vs.str; value_is_string = true; break; default: ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n", et); goto error; } u_int8_t vbkp; if (value_is_string) { /* Adding '\0' to the end of the string, backing up the character */ vbkp = vs.str[vs.str_len]; vs.str[vs.str_len] = '\0'; } switch (key_id) { case SFLOW_DEVICE_IP: if (value_is_string) stats.deviceIP = ntohl(inet_addr((char *)value.string)); else stats.deviceIP = (u_int32_t)value.int_num; break; case SFLOW_SAMPLES_GENERATED: stats.samplesGenerated = (u_int32_t)value.int_num; break; case SFLOW_IF_INDEX: stats.ifIndex = (u_int32_t)value.int_num; break; case SFLOW_IF_NAME: stats.ifName = strdup((char *)value.string); break; case SFLOW_IF_TYPE: stats.ifType = (u_int32_t)value.int_num; break; case SFLOW_IF_SPEED: stats.ifSpeed = (u_int32_t)value.int_num; break; case SFLOW_IF_DIRECTION: stats.ifFullDuplex = (!strcmp(value.string, "Full")) ? true : false; break; case SFLOW_IF_ADMIN_STATUS: stats.ifAdminStatus = (!strcmp(value.string, "Up")) ? true : false; break; case SFLOW_IF_OPER_STATUS: stats.ifOperStatus = (!strcmp(value.string, "Up")) ? true : false; break; case SFLOW_IF_IN_OCTETS: stats.ifInOctets = value.int_num; break; case SFLOW_IF_IN_PACKETS: stats.ifInPackets = value.int_num; break; case SFLOW_IF_IN_ERRORS: stats.ifInErrors = value.int_num; break; case SFLOW_IF_OUT_OCTETS: stats.ifOutOctets = value.int_num; break; case SFLOW_IF_OUT_PACKETS: stats.ifOutPackets = value.int_num; break; case SFLOW_IF_OUT_ERRORS: stats.ifOutErrors = value.int_num; break; case SFLOW_IF_PROMISCUOUS_MODE: stats.ifPromiscuousMode = (value.int_num == 1) ? true : false; break; default: ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %u\n", key_id); break; } /* Restoring backed up character at the end of the string in place of '\0' */ if (value_is_string) vs.str[vs.str_len] = vbkp; ndpi_deserialize_next(&deserializer); } /* Process Flow */ processInterfaceStats(&stats); ret = 0; if (stats.ifName) free(stats.ifName); error: return ret; } /* **************************************************** */ /* * Minimum set of fields expected by ntopng * NOTE: * - the following fields may or may not appear depending on the traffic: * "IPV4_SRC_ADDR", "IPV4_DST_ADDR", "IPV6_SRC_ADDR", "IPV6_DST_ADDR" * - some fields may not appear when nprobe runs with --collector-passthrough * "L7_PROTO" */ static std::string mandatory_template_fields[] = { "FIRST_SWITCHED", "LAST_SWITCHED", "L4_SRC_PORT", "L4_DST_PORT", "IP_PROTOCOL_VERSION", "PROTOCOL", "IN_BYTES", "IN_PKTS", "OUT_BYTES", "OUT_PKTS"}; u_int8_t ZMQParserInterface::parseTemplate(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { /* The format that is currently defined for templates is a JSON as follows: [{"12/Apr/2023 23:53:04 [util.cPEN":0,"field":1,"len":4,"format":"formatted_uint","name":"IN_BYTES","descr":"Incoming flow bytes (src->dst)"},{"PEN":0,"field":2,"len":4,"format":"formatted_uint","name":"IN_PKTS","descr":"Incoming flow packets (src->dst)"},] */ // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); ZMQ_Template zmq_template; json_object *obj, *w, *z; enum json_tokener_error jerr = json_tokener_success; memset(&zmq_template, 0, sizeof(zmq_template)); obj = json_tokener_parse_verbose(payload, &jerr); if (obj) { if (json_object_get_type(obj) == json_type_array) { int i, num_elements = json_object_array_length(obj); std::set mandatory_fields( mandatory_template_fields, mandatory_template_fields + sizeof(mandatory_template_fields) / sizeof(mandatory_template_fields[0])); for (i = 0; i < num_elements; i++) { memset(&zmq_template, 0, sizeof(zmq_template)); w = json_object_array_get_idx(obj, i); if (json_object_object_get_ex(w, "PEN", &z)) zmq_template.pen = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "field", &z)) zmq_template.field = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "format", &z)) zmq_template.format = json_object_get_string(z); if (json_object_object_get_ex(w, "name", &z)) zmq_template.name = json_object_get_string(z); if (json_object_object_get_ex(w, "descr", &z)) zmq_template.descr = json_object_get_string(z); if (zmq_template.name) { addMapping(zmq_template.name, zmq_template.field, zmq_template.pen, zmq_template.descr); mandatory_fields.erase(zmq_template.name); } // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Template [PEN: %u][field: // %u][format: %s][name: %s][descr: %s]", // zmq_template.pen, zmq_template.field, // zmq_template.format, zmq_template.name, zmq_template.descr) ; } if (mandatory_fields.size() > 0) { static bool template_warning_sent = 0; if (!template_warning_sent) { std::set::iterator it; ntop->getTrace()->traceEvent( TRACE_WARNING, "Some mandatory fields are missing in the ZMQ template:"); template_warning_sent = true; for (it = mandatory_fields.begin(); it != mandatory_fields.end(); ++it) { ntop->getTrace()->traceEvent(TRACE_WARNING, "\t%s", (*it).c_str()); } } } } json_object_put(obj); } else { // if o != NULL if (!once) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, data " "encrypted or invalid JSON?"); ntop->getTrace()->traceEvent( TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", json_tokener_error_desc(jerr), payload_size, payload); } once = true; return -1; } return 0; } /* **************************************************** */ void ZMQParserInterface::setFieldMap( const ZMQ_FieldMap *const field_map) const { char hname[CONST_MAX_LEN_REDIS_KEY], key[32]; snprintf(hname, sizeof(hname), CONST_FIELD_MAP_CACHE_KEY, get_id(), field_map->pen); snprintf(key, sizeof(key), "%u", field_map->field); ntop->getRedis()->hashSet(hname, key, field_map->map); } /* **************************************************** */ void ZMQParserInterface::setFieldValueMap( const ZMQ_FieldValueMap *const field_value_map) const { char hname[CONST_MAX_LEN_REDIS_KEY], key[32]; snprintf(hname, sizeof(hname), CONST_FIELD_VALUE_MAP_CACHE_KEY, get_id(), field_value_map->pen, field_value_map->field); snprintf(key, sizeof(key), "%u", field_value_map->value); ntop->getRedis()->hashSet(hname, key, field_value_map->map); } /* **************************************************** */ u_int8_t ZMQParserInterface::parseOptionFieldMap(json_object *const jo) { int arraylen = json_object_array_length(jo); json_object *w, *z; ZMQ_FieldMap field_map; memset(&field_map, 0, sizeof(field_map)); for (int i = 0; i < arraylen; i++) { w = json_object_array_get_idx(jo, i); if (json_object_object_get_ex(w, "PEN", &z)) field_map.pen = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "field", &z)) { field_map.field = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "map", &z)) { field_map.map = json_object_to_json_string(z); setFieldMap(&field_map); #ifdef CUSTOM_APP_DEBUG ntop->getTrace()->traceEvent( TRACE_NORMAL, "Option FieldMap [PEN: %u][field: %u][map: %s]", field_map.pen, field_map.field, field_map.map); #endif } } } return 0; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseOptionFieldValueMap( json_object *const w) { json_object *z; ZMQ_FieldValueMap field_value_map; memset(&field_value_map, 0, sizeof(field_value_map)); if (json_object_object_get_ex(w, "PEN", &z)) field_value_map.pen = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "field", &z)) { field_value_map.field = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "value", &z)) { field_value_map.value = (u_int32_t)json_object_get_int(z); if (json_object_object_get_ex(w, "map", &z)) { field_value_map.map = json_object_to_json_string(z); setFieldValueMap(&field_value_map); #ifdef CUSTOM_APP_DEBUG ntop->getTrace()->traceEvent( TRACE_NORMAL, "Option FieldValueMap [PEN: %u][field: %u][value: %u][map: %s]", field_value_map.pen, field_value_map.field, field_value_map.value, field_value_map.map); #endif } } } return 0; } /* **************************************************** */ u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { enum json_tokener_error jerr = json_tokener_success; json_object *o = json_tokener_parse_verbose(payload, &jerr); if (o != NULL) { json_object *z; ListeningPorts pinfo; u_int16_t vlan_id = 0; if(ntop->getPrefs()->is_edr_mode() && ntop->getPrefs()->addVLANCloudToExporters()) { if(json_object_object_get_ex(o, "instance-name", &z)) vlan_id = findVLANMapping(json_object_get_string(z)); } /* Parse port information */ if (json_object_object_get_ex(o, "listening-ports", &z)) { enum json_type o_type = json_object_get_type(z); if (o_type == json_type_object) { pinfo.parsePorts(z); /* Parse list of IP addresses */ if (json_object_object_get_ex(o, "ip-addresses", &z)) { enum json_type o_type = json_object_get_type(z); if (o_type == json_type_array) { for (u_int i = 0; i < (u_int)json_object_array_length(z); i++) { Host *h = NULL; json_object *host = json_object_array_get_idx(z, i); const char *ip_addr = json_object_get_string(host); // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Received listening // ports for %s", ip_addr); // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); /* Assign list of ports to the host */ h = getHost((char *)ip_addr, vlan_id, 0 /* observationPointId */, true /* inline */); if (h) { h->setListeningPorts(pinfo); } else { // ntop->getTrace()->traceEvent(TRACE_INFO, "Unable to find host %s", (char *)ip_addr); } } } } } } json_object_put(o); /* Free memory */ } return (0); } /* **************************************************** */ u_int8_t ZMQParserInterface::parseSNMPIntefaces(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { enum json_tokener_error jerr = json_tokener_success; json_object *f = json_tokener_parse_verbose(payload, &jerr); if (f != NULL) { struct json_object_iterator it = json_object_iter_begin(f); struct json_object_iterator itEnd = json_object_iter_end(f); while (!json_object_iter_equal(&it, &itEnd)) { const char *key = json_object_iter_peek_name(&it); json_object *value = json_object_iter_peek_value(&it); const char *rsp = json_object_to_json_string(value); char redis_key[64]; /* Saving 'short' interface names Use lua/modules/snmp_mappings.lua to access them from Lua // */ snprintf(redis_key, sizeof(redis_key), "cachedexporters.%s.ifnames", key); ntop->getRedis()->set(redis_key, rsp); ntop->getTrace()->traceEvent(TRACE_INFO, "[JSON] %s = %s", redis_key, rsp); /* Move to the next element */ json_object_iter_next(&it); } // while json_object_iter_equal json_object_put(f); /* Free memory */ return (0); } else return (-1); } /* **************************************************** */ u_int8_t ZMQParserInterface::parseOption(const char *payload, int payload_size, u_int32_t source_id, u_int32_t msg_id, void *data) { /* The format that is currently defined for options is a JSON as follows: char opt[] = " "{\"PEN\":8741, \"field\": 22, \"value\":1, \"map\":{\"name\":\"Skype\"}}," "{\"PEN\":8741, \"field\": 22, \"value\":3, \"map\":{\"name\":\"Winni\"}}"; parseOption(opt, strlen(opt), source_id, this); */ // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); json_object *o; enum json_tokener_error jerr = json_tokener_success; o = json_tokener_parse_verbose(payload, &jerr); if (o != NULL) { parseOptionFieldValueMap(o); json_object_put(o); } else { // if o != NULL if (!once) { ntop->getTrace()->traceEvent( TRACE_WARNING, "Invalid message received: your nProbe sender is outdated, " "data encrypted or invalid JSON?"); ntop->getTrace()->traceEvent( TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s", json_tokener_error_desc(jerr), payload_size, payload); } once = true; return -1; } return 0; } /* **************************************** */ u_int32_t ZMQParserInterface::periodicStatsUpdateFrequency() const { nProbeStats *zrs = zmq_remote_stats; u_int32_t update_freq; u_int32_t update_freq_min = ntop->getPrefs()->get_housekeeping_frequency(); if (zrs) update_freq = min_val(max_val(zrs->remote_lifetime_timeout, zrs->remote_idle_timeout), zrs->remote_collected_lifetime_timeout); else update_freq = update_freq_min; return max_val(update_freq, update_freq_min); } /* **************************************** */ void ZMQParserInterface::setRemoteStats(nProbeStats *zrs) { nProbeStats *last_zrs, *cumulative_zrs; map::iterator it; u_int32_t last_time = getTimeLastPktRcvdRemote(); struct timeval now; gettimeofday(&now, NULL); /* Store stats for the current exporter */ lock.wrlock(__FILE__, __LINE__); if (source_id_last_zmq_remote_stats.find(zrs->source_id) == source_id_last_zmq_remote_stats.end()) { last_zrs = new (std::nothrow) nProbeStats(); if (!last_zrs) { lock.unlock(__FILE__, __LINE__); return; } source_id_last_zmq_remote_stats[zrs->source_id] = last_zrs; } else last_zrs = source_id_last_zmq_remote_stats[zrs->source_id]; *last_zrs = *zrs; lock.unlock(__FILE__, __LINE__); if (Utils::msTimevalDiff(&now, &last_zmq_remote_stats_update) < 1000) { /* Do not update cumulative stats more frequently than once per second. * Note: this also avoids concurrent access (use after free) of shadow */ return; } /* Sum stats from all exporters */ cumulative_zrs = new (std::nothrow) nProbeStats(); if (!cumulative_zrs) return; lock.wrlock(__FILE__, __LINE__); /* Need write lock due to (*) */ for (it = source_id_last_zmq_remote_stats.begin(); it != source_id_last_zmq_remote_stats.end();) { nProbeStats *zrs_i = it->second; if (last_time > MAX_HASH_ENTRY_IDLE && zrs_i->remote_time < last_time - MAX_HASH_ENTRY_IDLE /* sec */) { /* Do not account inactive exporters, release them */ // ntop->getTrace()->traceEvent(TRACE_NORMAL, "Erased %s [local_time: // %u][last_time: %u]", zrs_i->remote_ifname, zrs_i->local_time, // last_time); delete (zrs_i); source_id_last_zmq_remote_stats.erase(it++); /* (*) */ } else { cumulative_zrs->num_exporters += zrs_i->num_exporters; cumulative_zrs->remote_bytes += zrs_i->remote_bytes; cumulative_zrs->remote_pkts += zrs_i->remote_pkts; cumulative_zrs->num_flow_exports += zrs_i->num_flow_exports; cumulative_zrs->remote_ifspeed = max_val(cumulative_zrs->remote_ifspeed, zrs_i->remote_ifspeed); cumulative_zrs->remote_time = max_val(cumulative_zrs->remote_time, zrs_i->remote_time); cumulative_zrs->local_time = max_val(cumulative_zrs->local_time, zrs_i->local_time); cumulative_zrs->avg_bps += zrs_i->avg_bps; cumulative_zrs->avg_pps += zrs_i->avg_pps; cumulative_zrs->remote_lifetime_timeout = max_val(cumulative_zrs->remote_lifetime_timeout, zrs_i->remote_lifetime_timeout); cumulative_zrs->remote_collected_lifetime_timeout = max_val(cumulative_zrs->remote_collected_lifetime_timeout, zrs_i->remote_collected_lifetime_timeout); cumulative_zrs->remote_idle_timeout = max_val( cumulative_zrs->remote_idle_timeout, zrs_i->remote_idle_timeout); cumulative_zrs->export_queue_full += zrs_i->export_queue_full; cumulative_zrs->too_many_flows += zrs_i->too_many_flows; cumulative_zrs->elk_flow_drops += zrs_i->elk_flow_drops; cumulative_zrs->sflow_pkt_sample_drops += zrs_i->sflow_pkt_sample_drops; cumulative_zrs->flow_collection_drops += zrs_i->flow_collection_drops; cumulative_zrs->flow_collection_udp_socket_drops += zrs_i->flow_collection_udp_socket_drops; cumulative_zrs->flow_collection.nf_ipfix_flows += zrs_i->flow_collection.nf_ipfix_flows; cumulative_zrs->flow_collection.sflow_samples += zrs_i->flow_collection.sflow_samples; ++it; } } lock.unlock(__FILE__, __LINE__); ifSpeed = cumulative_zrs->remote_ifspeed; last_pkt_rcvd = 0; last_pkt_rcvd_remote = cumulative_zrs->remote_time; last_remote_pps = cumulative_zrs->avg_pps; last_remote_bps = cumulative_zrs->avg_bps; if (cumulative_zrs->flow_collection.sflow_samples > 0) is_sampled_traffic = true; /* Recalculate the flow max idle according to the timeouts received */ flow_max_idle = min_val(cumulative_zrs->remote_lifetime_timeout, cumulative_zrs->remote_collected_lifetime_timeout) + 10 /* Safe margin */; updateFlowMaxIdle(); if ((zmq_initial_pkts == 0) /* ntopng has been restarted */ || (cumulative_zrs->remote_bytes < zmq_initial_bytes) /* nProbe has been restarted */ ) { /* Start over */ zmq_initial_bytes = cumulative_zrs->remote_bytes, zmq_initial_pkts = cumulative_zrs->remote_pkts; } if (zmq_remote_initial_exported_flows == 0 /* ntopng has been restarted */ || cumulative_zrs->num_flow_exports < zmq_remote_initial_exported_flows) /* nProbe has been restarted */ zmq_remote_initial_exported_flows = cumulative_zrs->num_flow_exports; /* Swap values */ if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow); zmq_remote_stats_shadow = zmq_remote_stats; zmq_remote_stats = cumulative_zrs; memcpy(&last_zmq_remote_stats_update, &now, sizeof(now)); /* * Don't override ethStats here, these stats are properly updated * inside NetworkInterface::processFlow for ZMQ interfaces. * Overriding values here may cause glitches and non-strictly-increasing counters * yielding negative rates. ethStats.setNumBytes(cumulative_zrs->remote_bytes), ethStats.setNumPackets(cumulative_zrs->remote_pkts); * */ } /* **************************************************** */ #ifdef NTOPNG_PRO bool ZMQParserInterface::getCustomAppDetails(u_int32_t remapped_app_id, u_int32_t *const pen, u_int32_t *const app_field, u_int32_t *const app_id) { return custom_app_maps && custom_app_maps->getCustomAppDetails(remapped_app_id, pen, app_field, app_id); } #endif /* **************************************************** */ void ZMQParserInterface::probeLuaStats(lua_State *vm) { std::map::iterator it; lua_newtable(vm); lock.rdlock(__FILE__, __LINE__); for (it = source_id_last_zmq_remote_stats.begin(); it != source_id_last_zmq_remote_stats.end(); ++it) { nProbeStats *zrs = it->second; lua_newtable(vm); // ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s (%u)", zrs->remote_ifname, // it->first); lua_push_str_table_entry(vm, "remote.name", zrs->remote_ifname); lua_push_str_table_entry(vm, "remote.if_addr", zrs->remote_ifaddress); lua_push_uint64_table_entry(vm, "remote.ifspeed", zrs->remote_ifspeed); lua_push_str_table_entry(vm, "probe.ip", zrs->remote_probe_address); lua_push_str_table_entry(vm, "probe.uuid", zrs->uuid); lua_push_uint64_table_entry(vm, "probe.uuid_num", zrs->uuid_num); lua_push_str_table_entry(vm, "probe.public_ip", zrs->remote_probe_public_address); lua_push_str_table_entry(vm, "probe.probe_version", zrs->remote_probe_version); lua_push_str_table_entry(vm, "probe.probe_os", zrs->remote_probe_os); lua_push_str_table_entry(vm, "probe.probe_license", zrs->remote_probe_license); lua_push_str_table_entry(vm, "probe.probe_edition", zrs->remote_probe_edition); lua_push_str_table_entry(vm, "probe.mode", zrs->mode); lua_push_str_table_entry(vm, "probe.probe_maintenance", zrs->remote_probe_maintenance); lua_push_uint64_table_entry(vm, "drops.export_queue_full", zrs->export_queue_full); lua_push_uint64_table_entry(vm, "drops.too_many_flows", zrs->too_many_flows); lua_push_uint64_table_entry(vm, "drops.elk_flow_drops", zrs->elk_flow_drops); lua_push_uint64_table_entry(vm, "drops.sflow_pkt_sample_drops", zrs->sflow_pkt_sample_drops); lua_push_uint64_table_entry(vm, "drops.flow_collection_drops", zrs->flow_collection_drops); lua_push_uint64_table_entry(vm, "drops.flow_collection_udp_socket_drops", zrs->flow_collection_udp_socket_drops); lua_push_uint64_table_entry(vm, "flow_collection.nf_ipfix_flows", zrs->flow_collection.nf_ipfix_flows); lua_push_uint64_table_entry(vm, "flow_collection.collection_port", zrs->flow_collection.collection_port); lua_push_uint64_table_entry(vm, "flow_collection.sflow_samples", zrs->flow_collection.sflow_samples); lua_push_uint64_table_entry(vm, "zmq.num_flow_exports", zrs->num_flow_exports); lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters); exporterLuaStats(vm, zrs); /* ************************************* */ if (zrs) { lua_push_uint64_table_entry(vm, "probe.remote_time", zrs->remote_time); /* remote time when last event has been sent */ lua_push_uint64_table_entry(vm, "probe.local_time", zrs->local_time); /* local time when last event has been received */ if(zrs->num_flow_exports < zmq_remote_initial_exported_flows) zmq_remote_initial_exported_flows = zrs->num_flow_exports; /* nProbe has been reset */ lua_push_uint64_table_entry(vm, "zmq.num_flow_exports", zrs->num_flow_exports - zmq_remote_initial_exported_flows); lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters); if (zrs->export_queue_full > 0) lua_push_uint64_table_entry(vm, "zmq.drops.export_queue_full", zrs->export_queue_full); if (zrs->flow_collection_drops) lua_push_uint64_table_entry(vm, "zmq.drops.flow_collection_drops", zrs->flow_collection_drops); if (zrs->flow_collection_udp_socket_drops) lua_push_uint64_table_entry(vm, "zmq.drops.flow_collection_udp_socket_drops", zrs->flow_collection_udp_socket_drops); lua_push_uint64_table_entry(vm, "timeout.lifetime", zrs->remote_lifetime_timeout); lua_push_uint64_table_entry(vm, "timeout.collected_lifetime", zrs->remote_collected_lifetime_timeout); lua_push_uint64_table_entry(vm, "timeout.idle", zrs->remote_idle_timeout); } lua_pushstring(vm, std::to_string(it->first).c_str() /* The source_id as string (can't use integers or Lua will think it's an array ) */); lua_insert(vm, -2); lua_settable(vm, -3); } lua_rawseti(vm, -2, get_id()); /* Here the Interface ID is added because in case of View Interfaces * this field could be the same for different interfaces */ lock.unlock(__FILE__, __LINE__); } /* **************************************************** */ void ZMQParserInterface::lua(lua_State *vm, bool fullStats) { NetworkInterface::lua(vm, fullStats); lua_newtable(vm); probeLuaStats(vm); lua_pushstring(vm, "probes"); lua_insert(vm, -2); lua_settable(vm, -3); } /* **************************************************** */ void ZMQParserInterface::exporterLuaStats(lua_State *vm, nProbeStats *zrs) { std::map::iterator it; lua_newtable(vm); for (it = zrs->exportersStats.begin(); it != zrs->exportersStats.end(); ++it) { lua_newtable(vm); char buf[32], ipb[24]; snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(it->first, ipb, sizeof(ipb))); lua_push_uint64_table_entry(vm, "time_last_used", it->second.time_last_used); lua_push_uint64_table_entry(vm, "num_netflow_flows", it->second.num_netflow_flows); lua_push_uint64_table_entry(vm, "num_sflow_flows", it->second.num_sflow_flows); lua_push_uint64_table_entry(vm, "num_drops", it->second.num_drops); lua_push_uint64_table_entry(vm, "unique_source_id", it->second.unique_source_id); lua_pushstring(vm, buf); lua_insert(vm, -2); lua_settable(vm, -3); } lua_pushstring(vm, "exporters"); lua_insert(vm, -2); lua_settable(vm, -3); } /* **************************************************** */ void ZMQParserInterface::loadVLANMappings() { char **keys, **values, buf[64]; int rc; Redis *redis = ntop->getRedis(); top_vlan_id = 0; snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id()); rc = redis->hashGetAll(buf, &keys, &values); if(rc > 0) { for (int i = 0; i < rc; i++) { if(values[i] && keys[i]) { u_int16_t v = atoi(values[i]); if(v > top_vlan_id) top_vlan_id = v; name_to_vlan[keys[i]] = v; } if (values[i]) free(values[i]); if (keys[i]) free(keys[i]); } free(keys); free(values); } } /* **************************************************** */ u_int16_t ZMQParserInterface::findVLANMapping(std::string name) { std::unordered_map::iterator it = name_to_vlan.find(name); if(it != name_to_vlan.end()) return(it->second); else if(top_vlan_id < 4095) { char value[16], buf[64]; u_int16_t id = ++top_vlan_id; Redis *redis = ntop->getRedis(); if(id >= 4096) return(0 /* too many vlans */); snprintf(value, sizeof(value), "%u", id); snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id()); redis->hashSet(buf, name.c_str(), value); /* Add VLAN mapping */ redis->hashSet(NTOPNG_VLAN_ALIASES, value, name.c_str()); name_to_vlan[name] = id; ntop->getTrace()->traceEvent(TRACE_NORMAL, "Added %s = %d", name.c_str(), id); return(id); } else return(0); } /* **************************************************** */ #endif