ntopng/src/ZMQParserInterface.cpp
2024-07-18 16:58:33 +02:00

3372 lines
113 KiB
C++

/*
*
* (C) 2013-24 - ntop.org
*
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*/
#include "ntop_includes.h"
#ifndef HAVE_NEDGE
#define VLAN_HASH_KEY "ntopng.vlan.%d.cache"
/* **************************************************** */
/* IMPORTANT: keep it in sync with flow_fields_description part of
* flow_utils.lua */
ZMQParserInterface::ZMQParserInterface(const char *endpoint,
const char *custom_interface_type)
: ParserInterface(endpoint, custom_interface_type) {
if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__);
zmq_initial_bytes = 0, zmq_initial_pkts = 0;
zmq_remote_stats = zmq_remote_stats_shadow = NULL;
memset(&last_zmq_remote_stats_update, 0,
sizeof(last_zmq_remote_stats_update));
zmq_remote_initial_exported_flows = 0;
remote_lifetime_timeout = remote_idle_timeout = 0;
once = false, is_sampled_traffic = false;
flow_max_idle = ntop->getPrefs()->get_pkt_ifaces_flow_max_idle();
#ifdef NTOPNG_PRO
custom_app_maps = NULL;
#endif
polling_start_time = 0;
updateFlowMaxIdle();
memset(&recvStats, 0, sizeof(recvStats));
memset(&recvStatsCheckpoint, 0, sizeof(recvStatsCheckpoint));
/*
Populate defaults for @NTOPNG@ nProbe templates. No need to populate
all the fields as nProbe will sent them periodically.
This minimum set is required for backward compatibility.
*/
addMapping("IN_SRC_MAC", IN_SRC_MAC);
addMapping("OUT_SRC_MAC", OUT_SRC_MAC);
addMapping("IN_DST_MAC", IN_DST_MAC);
addMapping("OUT_DST_MAC", OUT_DST_MAC);
addMapping("SRC_VLAN", SRC_VLAN);
addMapping("DST_VLAN", DST_VLAN);
addMapping("DOT1Q_SRC_VLAN", DOT1Q_SRC_VLAN);
addMapping("DOT1Q_DST_VLAN", DOT1Q_DST_VLAN);
addMapping("INPUT_SNMP", INPUT_SNMP);
addMapping("OUTPUT_SNMP", OUTPUT_SNMP);
addMapping("IPV4_SRC_ADDR", IPV4_SRC_ADDR);
addMapping("IPV4_DST_ADDR", IPV4_DST_ADDR);
addMapping("SRC_TOS", SRC_TOS);
addMapping("DST_TOS", DST_TOS);
addMapping("L4_SRC_PORT", L4_SRC_PORT);
addMapping("L4_DST_PORT", L4_DST_PORT);
addMapping("IPV6_SRC_ADDR", IPV6_SRC_ADDR);
addMapping("IPV6_DST_ADDR", IPV6_DST_ADDR);
addMapping("IP_PROTOCOL_VERSION", IP_PROTOCOL_VERSION);
addMapping("PROTOCOL", PROTOCOL);
addMapping("L7_PROTO", L7_PROTO, NTOP_PEN);
addMapping("L7_PROTO_NAME", L7_PROTO_NAME, NTOP_PEN);
addMapping("L7_INFO", L7_INFO, NTOP_PEN);
addMapping("L7_CONFIDENCE", L7_CONFIDENCE, NTOP_PEN);
addMapping("L7_ERROR_CODE", L7_ERROR_CODE, NTOP_PEN);
addMapping("IN_BYTES", IN_BYTES);
addMapping("IN_PKTS", IN_PKTS);
addMapping("OUT_BYTES", OUT_BYTES);
addMapping("OUT_PKTS", OUT_PKTS);
addMapping("FIRST_SWITCHED", FIRST_SWITCHED);
addMapping("LAST_SWITCHED", LAST_SWITCHED);
addMapping("EXPORTER_IPV4_ADDRESS", EXPORTER_IPV4_ADDRESS);
addMapping("EXPORTER_IPV6_ADDRESS", EXPORTER_IPV6_ADDRESS);
addMapping("TOTAL_FLOWS_EXP", TOTAL_FLOWS_EXP);
addMapping("NPROBE_IPV4_ADDRESS", NPROBE_IPV4_ADDRESS, NTOP_PEN);
addMapping("NPROBE_INSTANCE_NAME", NPROBE_INSTANCE_NAME, NTOP_PEN);
addMapping("TCP_FLAGS", TCP_FLAGS);
addMapping("INITIATOR_PKTS", INITIATOR_PKTS);
addMapping("INITIATOR_OCTETS", INITIATOR_OCTETS);
addMapping("RESPONDER_PKTS", RESPONDER_PKTS);
addMapping("RESPONDER_OCTETS", RESPONDER_OCTETS);
addMapping("SAMPLING_INTERVAL", SAMPLING_INTERVAL);
addMapping("DIRECTION", DIRECTION);
addMapping("POST_NAT_SRC_IPV4_ADDR", POST_NAT_SRC_IPV4_ADDR);
addMapping("POST_NAT_DST_IPV4_ADDR", POST_NAT_DST_IPV4_ADDR);
addMapping("POST_NAT_SRC_TRANSPORT_PORT", POST_NAT_SRC_TRANSPORT_PORT);
addMapping("POST_NAT_DST_TRANSPORT_PORT", POST_NAT_DST_TRANSPORT_PORT);
addMapping("OBSERVATION_POINT_ID", OBSERVATION_POINT_ID);
addMapping("INGRESS_VRFID", INGRESS_VRFID);
addMapping("IPV4_SRC_MASK", IPV4_SRC_MASK);
addMapping("IPV4_DST_MASK", IPV4_DST_MASK);
addMapping("IPV4_NEXT_HOP", IPV4_NEXT_HOP);
addMapping("SRC_AS", SRC_AS);
addMapping("DST_AS", DST_AS);
addMapping("BGP_NEXT_ADJACENT_ASN", BGP_NEXT_ADJACENT_ASN);
addMapping("BGP_PREV_ADJACENT_ASN", BGP_PREV_ADJACENT_ASN);
addMapping("FLOW_END_REASON", FLOW_END_REASON);
addMapping("OOORDER_IN_PKTS", OOORDER_IN_PKTS, NTOP_PEN);
addMapping("OOORDER_OUT_PKTS", OOORDER_OUT_PKTS, NTOP_PEN);
addMapping("RETRANSMITTED_IN_PKTS", RETRANSMITTED_IN_PKTS, NTOP_PEN);
addMapping("RETRANSMITTED_OUT_PKTS", RETRANSMITTED_OUT_PKTS, NTOP_PEN);
addMapping("DNS_QUERY", DNS_QUERY, NTOP_PEN);
addMapping("DNS_QUERY_TYPE", DNS_QUERY_TYPE, NTOP_PEN);
addMapping("DNS_RET_CODE", DNS_RET_CODE, NTOP_PEN);
addMapping("HTTP_URL", HTTP_URL, NTOP_PEN);
addMapping("HTTP_SITE", HTTP_SITE, NTOP_PEN);
addMapping("HTTP_RET_CODE", HTTP_RET_CODE, NTOP_PEN);
addMapping("HTTP_METHOD", HTTP_METHOD, NTOP_PEN);
addMapping("HTTP_USER_AGENT", HTTP_USER_AGENT, NTOP_PEN);
addMapping("TLS_SERVER_NAME", TLS_SERVER_NAME, NTOP_PEN);
addMapping("TLS_CIPHER", TLS_CIPHER, NTOP_PEN);
addMapping("SSL_UNSAFE_CIPHER", SSL_UNSAFE_CIPHER, NTOP_PEN);
addMapping("JA3C_HASH", JA3C_HASH, NTOP_PEN);
addMapping("JA3S_HASH", JA3S_HASH, NTOP_PEN);
addMapping("JA4C_HASH", JA4C_HASH, NTOP_PEN);
addMapping("BITTORRENT_HASH", BITTORRENT_HASH, NTOP_PEN);
addMapping("SRC_FRAGMENTS", SRC_FRAGMENTS, NTOP_PEN);
addMapping("DST_FRAGMENTS", DST_FRAGMENTS, NTOP_PEN);
addMapping("CLIENT_NW_LATENCY_MS", CLIENT_NW_LATENCY_MS, NTOP_PEN);
addMapping("SERVER_NW_LATENCY_MS", SERVER_NW_LATENCY_MS, NTOP_PEN);
addMapping("L7_PROTO_RISK", L7_PROTO_RISK, NTOP_PEN);
addMapping("L7_PROTO_RISK_NAME", L7_PROTO_RISK_NAME, NTOP_PEN );
addMapping("FLOW_VERDICT", FLOW_VERDICT, NTOP_PEN);
addMapping("L7_RISK_INFO", L7_RISK_INFO, NTOP_PEN);
addMapping("FLOW_SOURCE", FLOW_SOURCE, NTOP_PEN);
addMapping("SMTP_MAIL_FROM", SMTP_MAIL_FROM, NTOP_PEN);
addMapping("SMTP_RCPT_TO", SMTP_RCPT_TO, NTOP_PEN);
addMapping("UNIQUE_SOURCE_ID", UNIQUE_SOURCE_ID, NTOP_PEN);
/* eBPF / Process */
addMapping("SRC_PROC_PID", SRC_PROC_PID, NTOP_PEN);
addMapping("SRC_PROC_NAME", SRC_PROC_NAME, NTOP_PEN);
addMapping("SRC_PROC_UID", SRC_PROC_UID, NTOP_PEN);
addMapping("SRC_PROC_USER_NAME", SRC_PROC_USER_NAME, NTOP_PEN);
addMapping("SRC_FATHER_PROC_PID", SRC_FATHER_PROC_PID, NTOP_PEN);
addMapping("SRC_FATHER_PROC_NAME", SRC_FATHER_PROC_NAME, NTOP_PEN);
addMapping("SRC_FATHER_PROC_PKG_NAME", SRC_FATHER_PROC_PKG_NAME, NTOP_PEN);
addMapping("SRC_FATHER_PROC_UID", SRC_FATHER_PROC_UID, NTOP_PEN);
addMapping("SRC_FATHER_PROC_USER_NAME", SRC_FATHER_PROC_USER_NAME, NTOP_PEN);
addMapping("SRC_PROC_ACTUAL_MEMORY", SRC_PROC_ACTUAL_MEMORY, NTOP_PEN);
addMapping("SRC_PROC_PEAK_MEMORY", SRC_PROC_PEAK_MEMORY, NTOP_PEN);
addMapping("SRC_PROC_AVERAGE_CPU_LOAD", SRC_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
addMapping("SRC_PROC_NUM_PAGE_FAULTS", SRC_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
addMapping("SRC_PROC_PCTG_IOWAIT", SRC_PROC_PCTG_IOWAIT, NTOP_PEN);
addMapping("SRC_PROC_PKG_NAME", SRC_PROC_PKG_NAME, NTOP_PEN);
addMapping("SRC_PROC_CMDLINE", SRC_PROC_CMDLINE, NTOP_PEN);
addMapping("SRC_PROC_CONTAINER_ID", SRC_PROC_CONTAINER_ID, NTOP_PEN);
addMapping("DST_PROC_PID", DST_PROC_PID, NTOP_PEN);
addMapping("DST_PROC_NAME", DST_PROC_NAME, NTOP_PEN);
addMapping("DST_PROC_UID", DST_PROC_UID, NTOP_PEN);
addMapping("DST_PROC_USER_NAME", DST_PROC_USER_NAME, NTOP_PEN);
addMapping("DST_FATHER_PROC_PID", DST_FATHER_PROC_PID, NTOP_PEN);
addMapping("DST_FATHER_PROC_NAME", DST_FATHER_PROC_NAME, NTOP_PEN);
addMapping("DST_FATHER_PROC_PKG_NAME", DST_FATHER_PROC_PKG_NAME, NTOP_PEN);
addMapping("DST_FATHER_PROC_UID", DST_FATHER_PROC_UID, NTOP_PEN);
addMapping("DST_FATHER_PROC_USER_NAME", DST_FATHER_PROC_USER_NAME, NTOP_PEN);
addMapping("DST_PROC_ACTUAL_MEMORY", DST_PROC_ACTUAL_MEMORY, NTOP_PEN);
addMapping("DST_PROC_PEAK_MEMORY", DST_PROC_PEAK_MEMORY, NTOP_PEN);
addMapping("DST_PROC_AVERAGE_CPU_LOAD", DST_PROC_AVERAGE_CPU_LOAD, NTOP_PEN);
addMapping("DST_PROC_NUM_PAGE_FAULTS", DST_PROC_NUM_PAGE_FAULTS, NTOP_PEN);
addMapping("DST_PROC_PCTG_IOWAIT", DST_PROC_PCTG_IOWAIT, NTOP_PEN);
addMapping("DST_PROC_PKG_NAME", DST_PROC_PKG_NAME, NTOP_PEN);
addMapping("DST_PROC_CMDLINE", DST_PROC_CMDLINE, NTOP_PEN);
addMapping("DST_PROC_CONTAINER_ID", DST_PROC_CONTAINER_ID, NTOP_PEN);
/* sFlow Counter Fields */
addCounterMapping("deviceIP", SFLOW_DEVICE_IP);
addCounterMapping("samplesGenerated", SFLOW_SAMPLES_GENERATED);
addCounterMapping("ifIndex", SFLOW_IF_INDEX);
addCounterMapping("ifName", SFLOW_IF_NAME);
addCounterMapping("ifType", SFLOW_IF_TYPE);
addCounterMapping("ifSpeed", SFLOW_IF_SPEED);
addCounterMapping("ifDirection", SFLOW_IF_DIRECTION);
addCounterMapping("ifAdminStatus", SFLOW_IF_ADMIN_STATUS);
addCounterMapping("ifOperStatus", SFLOW_IF_OPER_STATUS);
addCounterMapping("ifInOctets", SFLOW_IF_IN_OCTETS);
addCounterMapping("ifInPackets", SFLOW_IF_IN_PACKETS);
addCounterMapping("ifInErrors", SFLOW_IF_IN_ERRORS);
addCounterMapping("ifOutOctets", SFLOW_IF_OUT_OCTETS);
addCounterMapping("ifOutPackets", SFLOW_IF_OUT_PACKETS);
addCounterMapping("ifOutErrors", SFLOW_IF_OUT_ERRORS);
addCounterMapping("ifPromiscuousMode", SFLOW_IF_PROMISCUOUS_MODE);
if(ntop->getPrefs()->is_edr_mode())
loadVLANMappings();
}
/* **************************************************** */
ZMQParserInterface::~ZMQParserInterface() {
map<u_int32_t, nProbeStats *>::iterator it;
if (zmq_remote_stats) free(zmq_remote_stats);
if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
#ifdef NTOPNG_PRO
if (custom_app_maps) delete (custom_app_maps);
#endif
for (it = source_id_last_zmq_remote_stats.begin();
it != source_id_last_zmq_remote_stats.end(); ++it)
delete (it->second);
source_id_last_zmq_remote_stats.clear();
}
/* **************************************************** */
/*
these mappings are sent by nprobe via zmq at startup and collected via
zmqparserinterface::parsetemplate()
*/
void ZMQParserInterface::addMapping(const char *sym, u_int32_t num,
u_int32_t pen, const char *descr) {
string label(sym);
labels_map_t::iterator it;
pen_value_t cur_pair = make_pair(pen, num);
if ((it = labels_map.find(label)) == labels_map.end())
labels_map.insert(make_pair(label, cur_pair));
else
it->second.first = pen, it->second.second = num;
if (descr) {
descriptions_map_t::iterator dit;
if ((dit = descriptions_map.find(cur_pair)) == descriptions_map.end())
descriptions_map.insert(make_pair(cur_pair, descr));
}
}
/* **************************************************** */
bool ZMQParserInterface::getKeyId(char *sym, u_int32_t sym_len,
u_int32_t *const pen,
u_int32_t *const field) const {
u_int32_t cur_pen, cur_field;
string label(sym);
labels_map_t::const_iterator it;
bool is_num, is_dotted;
*pen = UNKNOWN_PEN, *field = UNKNOWN_FLOW_ELEMENT;
is_num = Utils::isNumber(sym, sym_len, &is_dotted);
if (is_num && is_dotted) {
if (sscanf(sym, "%u.%u", &cur_pen, &cur_field) != 2) return false;
*pen = cur_pen, *field = cur_field;
} else if (is_num) {
cur_field = atoi(sym);
*pen = 0, *field = cur_field;
} else if ((it = labels_map.find(label)) != labels_map.end()) {
*pen = it->second.first, *field = it->second.second;
} else {
return false;
}
return true;
}
/* **************************************************** */
void ZMQParserInterface::addCounterMapping(const char *sym, u_int32_t id) {
string label(sym);
counters_map_t::iterator it;
if ((it = counters_map.find(label)) == counters_map.end())
counters_map.insert(make_pair(label, id));
}
/* **************************************************** */
bool ZMQParserInterface::getCounterId(char *sym, u_int32_t sym_len,
u_int32_t *id) const {
string label(sym);
counters_map_t::const_iterator it;
bool is_num, is_dotted;
is_num = Utils::isNumber(sym, sym_len, &is_dotted);
if (is_num) {
*id = atoi(sym);
} else if ((it = counters_map.find(label)) != counters_map.end()) {
*id = it->second;
} else {
return false;
}
return true;
}
/* **************************************************** */
const char *ZMQParserInterface::getKeyDescription(u_int32_t pen,
u_int32_t field) const {
descriptions_map_t::const_iterator it;
if ((it = descriptions_map.find(make_pair(pen, field))) !=
descriptions_map.end())
return it->second.c_str();
return NULL;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseEvent(const char *payload, int payload_size,
u_int32_t source_id, u_int32_t msg_id,
void *data) {
json_object *o;
enum json_tokener_error jerr = json_tokener_success;
nProbeStats zrs; /* Do not instantiate, automatically cleaned outside of the scope */
const u_int32_t max_timeout = 600, min_timeout = 60;
if (polling_start_time == 0) polling_start_time = (u_int32_t)time(NULL);
//ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msg_id: %u] %s", msg_id,
//payload);
o = json_tokener_parse_verbose(payload, &jerr);
if (o) {
json_object *w, *z;
zrs.source_id = source_id;
if (json_object_object_get_ex(o, "bytes", &w))
zrs.remote_bytes = (u_int64_t)json_object_get_int64(w);
if (json_object_object_get_ex(o, "packets", &w))
zrs.remote_pkts = (u_int64_t)json_object_get_int64(w);
if (json_object_object_get_ex(o, "iface", &w)) {
if (json_object_object_get_ex(w, "name", &z))
snprintf(zrs.remote_ifname, sizeof(zrs.remote_ifname), "%s",
json_object_get_string(z));
if (json_object_object_get_ex(w, "speed", &z))
zrs.remote_ifspeed = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "ip", &z))
snprintf(zrs.remote_ifaddress, sizeof(zrs.remote_ifaddress), "%s",
json_object_get_string(z));
}
if (json_object_object_get_ex(o, "probe", &w)) {
if (json_object_object_get_ex(w, "public_ip", &z))
snprintf(zrs.remote_probe_public_address,
sizeof(zrs.remote_probe_public_address), "%s",
json_object_get_string(z));
if (json_object_object_get_ex(w, "uuid", &z))
snprintf(zrs.uuid, sizeof(zrs.uuid),
"%s", json_object_get_string(z));
if (json_object_object_get_ex(w, "uuid_num", &z)) {
zrs.uuid_num = (u_int32_t)json_object_get_int64(z);
} if (json_object_object_get_ex(w, "ip", &z))
snprintf(zrs.remote_probe_address, sizeof(zrs.remote_probe_address),
"%s", json_object_get_string(z));
if (json_object_object_get_ex(w, "version", &z))
snprintf(zrs.remote_probe_version, sizeof(zrs.remote_probe_version),
"%s", json_object_get_string(z));
if (json_object_object_get_ex(w, "osname", &z))
snprintf(zrs.remote_probe_os, sizeof(zrs.remote_probe_os), "%s",
json_object_get_string(z));
if (json_object_object_get_ex(w, "license", &z))
snprintf(zrs.remote_probe_license, sizeof(zrs.remote_probe_license),
"%s", json_object_get_string(z));
if (json_object_object_get_ex(w, "edition", &z))
snprintf(zrs.remote_probe_edition, sizeof(zrs.remote_probe_edition),
"%s", json_object_get_string(z));
if (json_object_object_get_ex(w, "maintenance", &z))
snprintf(zrs.remote_probe_maintenance,
sizeof(zrs.remote_probe_maintenance), "%s",
json_object_get_string(z));
}
if (json_object_object_get_ex(o, "time", &w)) {
int32_t time_delta;
zrs.local_time = (u_int32_t)time(NULL);
zrs.remote_time =
(u_int32_t)json_object_get_int64(w); /* nProbe remote time */
time_delta = (int32_t)zrs.local_time - zrs.remote_time;
/*
Skip the check for the first few seconds
as we might receive old messages from the probe
*/
if ((msg_id != 0) /* Skip message with drops to avoid miscalculations */
&& ((zrs.local_time - polling_start_time) > 5)) {
if (abs(time_delta) >= 10) {
ntop->getTrace()->traceEvent(TRACE_NORMAL,
"Remote probe clock drift %u sec "
"detected (local: %u remote: %u [%s])",
abs(time_delta), zrs.local_time,
zrs.remote_time,
zrs.remote_probe_address);
}
} else
zrs.remote_time = zrs.local_time; /* Avoid clock drift messages during
the grace period */
}
if (json_object_object_get_ex(o, "avg", &w)) {
if (json_object_object_get_ex(w, "bps", &z))
zrs.avg_bps = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "pps", &z))
zrs.avg_pps = (u_int32_t)json_object_get_int64(z);
}
if (json_object_object_get_ex(o, "timeout", &w)) {
if (json_object_object_get_ex(w, "lifetime", &z)) {
zrs.remote_lifetime_timeout = (u_int32_t)json_object_get_int64(z);
if (zrs.remote_lifetime_timeout > max_timeout)
zrs.remote_lifetime_timeout = max_timeout;
}
if (json_object_object_get_ex(w, "idle", &z)) {
zrs.remote_idle_timeout = (u_int32_t)json_object_get_int64(z);
zrs.remote_idle_timeout *= 2; /* Double the idle timeout for NetFlow */
if (zrs.remote_idle_timeout > max_timeout)
zrs.remote_idle_timeout = max_timeout;
if (zrs.remote_idle_timeout < min_timeout)
zrs.remote_idle_timeout = min_timeout;
}
if (json_object_object_get_ex(w, "collected_lifetime", &z))
zrs.remote_collected_lifetime_timeout =
(u_int32_t)json_object_get_int64(z);
}
if (json_object_object_get_ex(o, "drops", &w)) {
if (json_object_object_get_ex(w, "export_queue_full", &z))
zrs.export_queue_full = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "too_many_flows", &z))
zrs.too_many_flows = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "elk_flow_drops", &z))
zrs.elk_flow_drops = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "sflow_pkt_sample_drops", &z))
zrs.sflow_pkt_sample_drops = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "flow_collection_drops", &z))
zrs.flow_collection_drops = (u_int32_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "flow_collection_udp_socket_drops", &z))
zrs.flow_collection_udp_socket_drops =
(u_int32_t)json_object_get_int64(z);
}
if (json_object_object_get_ex(o, "flow_collection", &w)) {
if (json_object_object_get_ex(w, "nf_ipfix_flows", &z))
zrs.flow_collection.nf_ipfix_flows =
(u_int64_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "sflow_samples", &z))
zrs.flow_collection.sflow_samples = (u_int64_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "exporters", &z)) {
json_object_object_foreach(z, key, val) {
//ntop->getTrace()->traceEvent(TRACE_NORMAL, "Exporter: %s", key);
u_int32_t ip = ntohl(inet_addr(key));
ExporterStats exp_stats = { 0 };
json_object *x;
if (json_object_object_get_ex(val, "time_last_used", &x))
exp_stats.time_last_used = (u_int32_t)json_object_get_int64(x);
if (json_object_object_get_ex(val, "num_sflow_flows", &x))
exp_stats.num_sflow_flows = (u_int32_t)json_object_get_int64(x);
if (json_object_object_get_ex(val, "num_netflow_ipfix_flows", &x))
exp_stats.num_netflow_flows = (u_int32_t)json_object_get_int64(x);
if (json_object_object_get_ex(val, "num_drops", &x))
exp_stats.num_drops = (u_int32_t)json_object_get_int64(x);
if (json_object_object_get_ex(val, "unique_source_id", &x))
exp_stats.unique_source_id = (u_int32_t)json_object_get_int64(x);
zrs.exportersStats[ip] = exp_stats;
}
}
}
if (json_object_object_get_ex(o, "zmq", &w)) {
if (json_object_object_get_ex(w, "num_flow_exports", &z))
zrs.num_flow_exports = (u_int64_t)json_object_get_int64(z);
if (json_object_object_get_ex(w, "num_exporters", &z))
zrs.num_exporters = (u_int8_t)json_object_get_int(z);
}
#ifdef ZMQ_EVENT_DEBUG
ntop->getTrace()->traceEvent(
TRACE_NORMAL,
"Event parsed "
"[iface: {name: %s, speed: %u, ip: %s}]"
"[probe: {public_ip: %s, ip: %s, version: %s, os: %s, license: %s, "
"edition: %s, maintenance: %s}]"
"[avg: {bps: %u, pps: %u}]"
"[remote: {time: %u, bytes: %u, packets: %u, idle_timeout: %u, "
"lifetime_timeout: %u,"
" collected_lifetime_timeout: %u }]"
"[zmq: {num_exporters: %u, num_flow_exports: %u}]",
zrs.remote_ifname, zrs.remote_ifspeed, zrs.remote_ifaddress,
zrs.remote_probe_version, zrs.remote_probe_os, zrs.remote_probe_license,
zrs.remote_probe_edition, zrs.remote_probe_maintenance,
zrs.remote_probe_public_address, zrs.remote_probe_address, zrs.avg_bps,
zrs.avg_pps, zrs.remote_time, (u_int32_t)zrs.remote_bytes,
(u_int32_t)zrs.remote_pkts, zrs.remote_idle_timeout,
zrs.remote_lifetime_timeout, zrs.remote_collected_lifetime_timeout,
zrs.num_exporters, zrs.num_flow_exports);
#endif
remote_lifetime_timeout = zrs.remote_lifetime_timeout,
remote_idle_timeout = zrs.remote_idle_timeout;
/* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps);
*/
/* Process Flow */
setRemoteStats(&zrs);
for (std::map<u_int64_t, NetworkInterface *>::iterator it =
flowHashing.begin();
it != flowHashing.end(); ++it) {
ZMQParserInterface *z = (ZMQParserInterface *)it->second;
z->setRemoteStats(&zrs);
}
/* Dispose memory */
json_object_put(o);
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: "
"your nProbe sender is outdated, data "
"encrypted, invalid JSON, or oom?");
ntop->getTrace()->traceEvent(
TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;
if (o) json_object_put(o);
return -1;
}
return 0;
}
/* **************************************************** */
bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
u_int32_t field,
ParsedValue *value) {
IpAddress ip_aux; /* used to check empty IPs */
switch (field) {
case IN_SRC_MAC:
case OUT_SRC_MAC:
/* Format 00:00:00:00:00:00 */
Utils::parseMac(flow->src_mac, value->string);
break;
case IN_DST_MAC:
case OUT_DST_MAC:
Utils::parseMac(flow->dst_mac, value->string);
break;
case SRC_TOS:
flow->src_tos = value->int_num;
break;
case DST_TOS:
flow->dst_tos = value->int_num;
break;
case IPV4_SRC_ADDR:
case IPV6_SRC_ADDR:
/*
The following check prevents an empty ip address (e.g., ::) to
to overwrite another valid ip address already set.
This can happen for example when nProbe is configured (-T) to export
both %IPV4_SRC_ADDR and the %IPV6_SRC_ADDR. In that cases nProbe can
export a valid ipv4 and an empty ipv6. Without the check, the empty
v6 address may overwrite the non empty v4.
*/
if (flow->src_ip.isEmpty()) {
if (value->string)
flow->src_ip.set((char *)value->string);
else
flow->src_ip.set(ntohl(value->int_num));
} else {
ip_aux.set((char *)value->string);
if (!ip_aux.isEmpty() &&
!ntop->getPrefs()->do_override_src_with_post_nat_src())
/* tried to overwrite a non-empty IP with another non-empty IP */
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Attempt to set source ip multiple times. "
"Check exported fields");
}
/* Pre-Post nat IPs are only supported for IPv4 */
if (flow->src_ip.isIPv4()) {
flow->setPreNATSrcIp(flow->src_ip.get_ipv4());
}
break;
case IP_PROTOCOL_VERSION:
flow->version = value->int_num;
break;
case IPV4_DST_ADDR:
case IPV6_DST_ADDR:
if (flow->dst_ip.isEmpty()) {
if (value->string)
flow->dst_ip.set((char *)value->string);
else
flow->dst_ip.set(ntohl(value->int_num));
} else {
ip_aux.set((char *)value->string);
if (!ip_aux.isEmpty() &&
!ntop->getPrefs()->do_override_dst_with_post_nat_dst())
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Attempt to set destination ip multiple times. "
"Check exported fields");
}
/* Pre-Post nat IPs are only supported for IPv4 */
if (flow->dst_ip.isIPv4()) {
flow->setPreNATDstIp(flow->dst_ip.get_ipv4());
}
break;
case L4_SRC_PORT:
if (!flow->src_port) {
if (value->string)
flow->src_port = atoi(value->string);
else
flow->src_port = ntohs((u_int32_t)value->int_num);
flow->setPreNATSrcPort(flow->src_port);
}
break;
case L4_DST_PORT:
if (!flow->dst_port) {
if (value->string)
flow->dst_port = atoi(value->string);
else
flow->dst_port = ntohs((u_int32_t)value->int_num);
flow->setPreNATDstPort(flow->dst_port);
}
break;
case SRC_VLAN:
case DST_VLAN:
flow->vlan_id = value->int_num;
break;
case DOT1Q_SRC_VLAN:
case DOT1Q_DST_VLAN:
if (flow->vlan_id == 0) {
/* as those fields are the outer vlans in q-in-q
we set the vlan_id only if there is no inner vlan
value set
*/
flow->vlan_id = value->int_num;
}
break;
case PROTOCOL:
if (value->string)
flow->l4_proto = atoi(value->string);
else
flow->l4_proto = value->int_num;
break;
case TCP_FLAGS:
flow->tcp.tcp_flags = value->int_num;
break;
case INITIATOR_PKTS:
flow->absolute_packet_octet_counters = true;
/* Don't break */
case IN_PKTS:
if (value->string != NULL)
flow->in_pkts = atol(value->string);
else
flow->in_pkts = value->int_num;
break;
case INITIATOR_OCTETS:
flow->absolute_packet_octet_counters = true;
/* Don't break */
case IN_BYTES:
if (value->string != NULL)
flow->in_bytes = atol(value->string);
else
flow->in_bytes = value->int_num;
break;
case RESPONDER_PKTS:
flow->absolute_packet_octet_counters = true;
/* Don't break */
case OUT_PKTS:
if (value->string != NULL)
flow->out_pkts = atol(value->string);
else
flow->out_pkts = value->int_num;
break;
case RESPONDER_OCTETS:
flow->absolute_packet_octet_counters = true;
/* Don't break */
case OUT_BYTES:
if (value->string != NULL)
flow->out_bytes = atol(value->string);
else
flow->out_bytes = value->int_num;
break;
case FIRST_SWITCHED:
if (value->string != NULL)
flow->first_switched = atoi(value->string);
else
flow->first_switched = value->int_num;
break;
case LAST_SWITCHED:
if (value->string != NULL)
flow->last_switched = atoi(value->string);
else
flow->last_switched = value->int_num;
break;
case SAMPLING_INTERVAL:
#if 0
/* Ignore it as nProbe as already implemented upscale */
flow->pkt_sampling_rate = value->int_num;
#endif
break;
case DIRECTION:
if (value->string != NULL)
flow->direction = atoi(value->string);
else
flow->direction = value->int_num;
break;
case EXPORTER_IPV4_ADDRESS:
if (value->string != NULL) {
/* Format: a.b.c.d, possibly overrides NPROBE_IPV4_ADDRESS */
u_int32_t ip = ntohl(inet_addr(value->string));
if (ip) {
flow->device_ip = ip;
if(ntop->getPrefs()->is_edr_mode()) {
char buf[32], ipb[24];
std::unordered_map<u_int32_t, bool>::iterator it = cloud_flow_exporters.find(ip);
if(it == cloud_flow_exporters.end()) {
cloud_flow_exporters[ip] = true;
snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(ip, ipb, sizeof(ipb)));
ntop->addLocalCloudAddress(buf);
/* Re-evaluate IPVx_SRC_ADDR/IPVx_DST_ADDR */
flow->src_ip.checkIP();
flow->dst_ip.checkIP();
}
}
}
}
break;
case EXPORTER_IPV6_ADDRESS:
if (value->string != NULL && strlen(value->string) > 0)
inet_pton(AF_INET6, value->string, &flow->device_ipv6);
break;
case FLOW_END_REASON:
if (value->string)
flow->setEndReason(value->string);
break;
case TOTAL_FLOWS_EXP:
/*
if(value->string != NULL)
total_flows_exp = atol(value->string);
else
total_flows_exp = value->int_num;
ntop->getTrace()->traceEvent(TRACE_INFO,
"Total Exported Flows %u", total_flows_exp);
*/
break;
case INPUT_SNMP:
flow->inIndex = value->int_num;
break;
case OUTPUT_SNMP:
flow->outIndex = value->int_num;
break;
case OBSERVATION_POINT_ID:
flow->observationPointId = value->int_num;
break;
case POST_NAT_SRC_IPV4_ADDR:
/* Alwais set src_ip_addr_post_nat, however switch the src_ip only if preference is set*/
if (value->string) {
IpAddress tmp;
tmp.set(value->string);
if (!tmp.isEmpty()) {
flow->setPostNATSrcIp(tmp.get_ipv4());
}
if (ntop->getPrefs()->do_override_src_with_post_nat_src()) {
if (!tmp.isEmpty()) {
flow->src_ip.set((char *)value->string);
}
}
} else if (value->int_num) {
if (ntop->getPrefs()->do_override_src_with_post_nat_src()) {
flow->src_ip.set(ntohl(value->int_num));
}
flow->setPostNATSrcIp(ntohl(value->int_num));
}
break;
case POST_NAT_DST_IPV4_ADDR:
/* Alwais set dst_ip_addr_post_nat, however switch the dst_ip only if preference is set*/
if (value->string) {
IpAddress tmp;
tmp.set(value->string);
if (!tmp.isEmpty()) {
flow->setPostNATDstIp(tmp.get_ipv4());
}
if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
if (!tmp.isEmpty()) {
flow->dst_ip.set((char *)value->string);
}
}
} else if (value->int_num) {
if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
flow->dst_ip.set(ntohl(value->int_num));
}
flow->setPostNATDstIp(ntohl(value->int_num));
}
break;
case POST_NAT_SRC_TRANSPORT_PORT:
if (ntop->getPrefs()->do_override_src_with_post_nat_src() &&
(value->int_num != 0))
flow->src_port = htons((u_int16_t)value->int_num);
if (value->int_num != 0)
flow->setPostNATSrcPort(htons((u_int16_t)value->int_num));
break;
case POST_NAT_DST_TRANSPORT_PORT:
if (ntop->getPrefs()->do_override_dst_with_post_nat_dst() &&
(value->int_num != 0))
flow->dst_port = htons((u_int16_t)value->int_num);
if (value->int_num != 0)
flow->setPostNATDstPort(htons((u_int16_t)value->int_num));
break;
case INGRESS_VRFID:
flow->vrfId = value->int_num;
break;
case IPV4_SRC_MASK:
case IPV4_DST_MASK:
if (value->int_num != 0) return false;
break;
case IPV4_NEXT_HOP:
if (value->string && strcmp(value->string, "0.0.0.0")) return false;
break;
case SRC_AS:
flow->src_as = value->int_num;
break;
case DST_AS:
flow->dst_as = value->int_num;
break;
case BGP_NEXT_ADJACENT_ASN:
flow->next_adjacent_as = value->int_num;
break;
case BGP_PREV_ADJACENT_ASN:
flow->prev_adjacent_as = value->int_num;
break;
default:
ntop->getTrace()->traceEvent(TRACE_INFO,
"Skipping no-PEN flow fieldId %u", field);
return false;
}
return true;
}
/* **************************************************** */
bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
u_int32_t field,
ParsedValue *value) {
/* Check for backward compatibility to handle cases like field = 123
* (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[field %d][%s]", field, value->string ? value->string : ""); */
switch (field) {
case L7_PROTO:
if (value->string) {
if (!strchr(value->string, '.')) {
/* Old behaviour, only the app protocol */
flow->l7_proto.app_protocol = atoi(value->string);
} else {
char *proto_dot;
flow->l7_proto.master_protocol = (u_int16_t)strtoll(value->string, &proto_dot, 10);
flow->l7_proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
}
} else {
flow->l7_proto.app_protocol = value->int_num;
}
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[value: %s][master: %u][app: %u]",
value->string ? value->string : "(int)",
flow->l7_proto.master_protocol,
flow->l7_proto.app_protocol);
#endif
break;
case NPROBE_INSTANCE_NAME:
if(ntop->getPrefs()->is_edr_mode()
&& ntop->getPrefs()->addVLANCloudToExporters()) {
u_int16_t vlan_id = findVLANMapping((char*)value->string);
flow->vlan_id = vlan_id;
}
break;
case L7_PROTO_NAME:
break;
case L7_INFO:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setL7Info(value->string);
break;
case L7_CONFIDENCE:
flow->setConfidence((ndpi_confidence_t)((value->int_num < NDPI_CONFIDENCE_MAX)
? value->int_num
: NDPI_CONFIDENCE_UNKNOWN));
break;
case L7_ERROR_CODE:
flow->setL7ErrorCode(value->int_num);
break;
case OOORDER_IN_PKTS:
flow->tcp.ooo_in_pkts = value->int_num;
break;
case OOORDER_OUT_PKTS:
flow->tcp.ooo_out_pkts = value->int_num;
break;
case RETRANSMITTED_IN_PKTS:
flow->tcp.retr_in_pkts = value->int_num;
break;
case RETRANSMITTED_OUT_PKTS:
flow->tcp.retr_out_pkts = value->int_num;
break;
/* TODO add lost in/out to nProbe and here */
case CLIENT_NW_LATENCY_MS: {
float client_nw_latency;
client_nw_latency = value->double_num;
flow->tcp.clientNwLatency.tv_sec = client_nw_latency / 1e3;
flow->tcp.clientNwLatency.tv_usec =
1e3 * (client_nw_latency - flow->tcp.clientNwLatency.tv_sec * 1e3);
} break;
case SERVER_NW_LATENCY_MS: {
float server_nw_latency;
server_nw_latency = value->double_num;
flow->tcp.serverNwLatency.tv_sec = server_nw_latency / 1e3;
flow->tcp.serverNwLatency.tv_usec =
1e3 * (server_nw_latency - flow->tcp.serverNwLatency.tv_sec * 1e3);
} break;
case CLIENT_TCP_FLAGS:
flow->tcp.client_tcp_flags = value->int_num;
flow->tcp.tcp_flags |= flow->tcp.client_tcp_flags;
break;
case SERVER_TCP_FLAGS:
flow->tcp.server_tcp_flags = value->int_num;
flow->tcp.tcp_flags |= flow->tcp.server_tcp_flags;
break;
case APPL_LATENCY_MS:
flow->tcp.applLatencyMsec = value->double_num;
break;
case TCP_WIN_MAX_IN:
flow->tcp.in_window = value->int_num;
break;
case TCP_WIN_MAX_OUT:
flow->tcp.out_window = value->int_num;
break;
case DNS_QUERY:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setDNSQuery(value->string);
break;
case DNS_QUERY_TYPE:
flow->setDNSQueryType(value->string ? atoi(value->string) : value->int_num);
break;
case DNS_RET_CODE:
flow->setDNSRetCode(value->string ? atoi(value->string) : value->int_num);
break;
case HTTP_URL:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setHTTPurl(value->string);
break;
case HTTP_USER_AGENT:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setHTTPuserAgent(value->string);
break;
case HTTP_SITE:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setHTTPsite(value->string);
break;
case HTTP_RET_CODE:
flow->setHTTPRetCode(value->string ? atoi(value->string) : value->int_num);
break;
case HTTP_METHOD:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setHTTPMethod(ndpi_http_str2method(value->string, strlen(value->string)));
break;
case TLS_SERVER_NAME:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setTLSserverName(value->string);
break;
case JA3C_HASH:
if (value->string && value->string[0])
flow->setJA3cHash(value->string);
break;
case JA3S_HASH:
if (value->string && value->string[0])
flow->setJA3sHash(value->string);
break;
case JA4C_HASH:
if (value->string && value->string[0])
flow->setJA4cHash(value->string);
break;
case TLS_CIPHER:
flow->setTLSCipher(value->int_num);
break;
case SSL_UNSAFE_CIPHER:
flow->setTLSUnsafeCipher(value->int_num);
break;
case L7_PROTO_RISK:
flow->setRisk((ndpi_risk)value->int_num);
break;
case L7_PROTO_RISK_NAME:
flow->setRiskName(value->string);
break;
case FLOW_VERDICT:
flow->setFlowVerdict(value->int_num);
break;
case L7_RISK_INFO:
if (value->string && value->string[0])
flow->setRiskInfo(value->string);
break;
case FLOW_SOURCE:
{
FlowSource s;
if((value->int_num < packet_to_flow) || (value->int_num > collected_sflow))
s = packet_to_flow; /* Default value */
else
s = static_cast<FlowSource>(value->int_num);
flow->setFlowSource(s);
}
break;
case BITTORRENT_HASH:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setBittorrentHash(value->string);
break;
case NPROBE_IPV4_ADDRESS:
if (value->string) {
flow->probe_ip = ntohl(inet_addr(value->string));
if(flow->device_ip == 0 && (flow->device_ip = ntohl(inet_addr(value->string))))
return false;
}
break;
case UNIQUE_SOURCE_ID:
flow->unique_source_id = value->int_num;
break;
case SRC_FRAGMENTS:
flow->in_fragments = value->int_num;
break;
case DST_FRAGMENTS:
flow->out_fragments = value->int_num;
break;
case SRC_PROC_PID:
flow->src_process_info.pid = value->int_num;
break;
case SRC_FATHER_PROC_PID:
flow->src_process_info.father_pid = value->int_num;
break;
case SRC_PROC_NAME:
if (value->string && value->string[0]) {
flow->setParsedProcessInfo();
flow->src_process_info.process_name = strdup(value->string);
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SRC] %s (%u)",
flow->src_process_info.process_name,
ntohs(flow->src_port));
#endif
}
break;
case SRC_FATHER_PROC_NAME:
if (value->string && value->string[0]) {
flow->setParsedProcessInfo();
flow->src_process_info.father_process_name = strdup(value->string);
}
break;
case SRC_PROC_PKG_NAME:
if (value->string && value->string[0])
flow->src_process_info.pkg_name = strdup(value->string);
break;
case SRC_FATHER_PROC_PKG_NAME:
if (value->string && value->string[0])
flow->src_process_info.father_pkg_name = strdup(value->string);
break;
case SRC_PROC_CMDLINE:
if (value->string && value->string[0])
flow->src_process_info.cmd_line = strdup(value->string);
break;
case SRC_PROC_UID:
flow->src_process_info.uid = value->int_num;
break;
case SRC_FATHER_PROC_UID:
flow->src_process_info.father_uid = value->int_num;
break;
case SRC_PROC_USER_NAME:
if (value->string && value->string[0])
flow->src_process_info.uid_name = strdup(value->string);
break;
case SRC_FATHER_PROC_USER_NAME:
if (value->string && value->string[0])
flow->src_process_info.father_uid_name = strdup(value->string);
break;
case SRC_PROC_CONTAINER_ID:
if (value->string && value->string[0]) {
flow->setParsedContainerInfo();
flow->src_container_info.id = strdup(value->string);
}
break;
case DST_PROC_PID:
flow->dst_process_info.pid = value->int_num;
break;
case DST_FATHER_PROC_PID:
flow->dst_process_info.father_pid = value->int_num;
break;
case DST_PROC_NAME:
if (value->string && value->string[0]) {
flow->setParsedProcessInfo();
flow->dst_process_info.process_name = strdup(value->string);
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DST] %s (%u)",
flow->dst_process_info.process_name,
ntohs(flow->dst_port));
#endif
}
break;
case DST_FATHER_PROC_NAME:
if (value->string && value->string[0]) {
flow->setParsedProcessInfo();
flow->dst_process_info.father_process_name = strdup(value->string);
}
break;
case DST_PROC_PKG_NAME:
if (value->string && value->string[0])
flow->dst_process_info.pkg_name = strdup(value->string);
break;
case DST_FATHER_PROC_PKG_NAME:
if (value->string && value->string[0])
flow->dst_process_info.father_pkg_name = strdup(value->string);
break;
case DST_PROC_CMDLINE:
if (value->string && value->string[0])
flow->dst_process_info.cmd_line = strdup(value->string);
break;
case DST_PROC_UID:
flow->dst_process_info.uid = value->int_num;
break;
case DST_FATHER_PROC_UID:
flow->dst_process_info.father_uid = value->int_num;
break;
case DST_PROC_USER_NAME:
if (value->string && value->string[0])
flow->dst_process_info.uid_name = strdup(value->string);
break;
case DST_FATHER_PROC_USER_NAME:
if (value->string && value->string[0])
flow->dst_process_info.father_uid_name = strdup(value->string);
break;
case DST_PROC_CONTAINER_ID:
if (value->string && value->string[0]) {
flow->setParsedContainerInfo();
flow->dst_container_info.id = strdup(value->string);
}
break;
case SMTP_RCPT_TO:
if(value->string && value->string[0])
flow->setSMTPRcptTo(value->string);
break;
case SMTP_MAIL_FROM:
if(value->string && value->string[0])
flow->setSMTPMailFrom(value->string);
break;
case DHCP_CLIENT_NAME:
if(value->string && value->string[0])
flow->setDHCPClientName(value->string);
break;
case SIP_CALL_ID:
if(value->string && value->string[0])
flow->setSIPCallId(value->string);
break;
default:
return false;
}
return true;
}
/* **************************************************** */
bool ZMQParserInterface::matchPENZeroField(ParsedFlow *const flow,
u_int32_t field,
ParsedValue *value) {
IpAddress ip_aux; /* used to check empty IPs */
switch (field) {
case IN_SRC_MAC:
case OUT_SRC_MAC: {
u_int8_t mac[6];
Utils::parseMac(mac, value->string);
return (memcmp(flow->src_mac, mac, sizeof(mac)) == 0);
}
case IN_DST_MAC:
case OUT_DST_MAC: {
u_int8_t mac[6];
Utils::parseMac(mac, value->string);
return (memcmp(flow->dst_mac, mac, sizeof(mac)) == 0);
}
case SRC_TOS:
if (value->string)
return (flow->src_tos == atoi(value->string));
else
return (flow->src_tos == value->int_num);
case DST_TOS:
if (value->string)
return (flow->dst_tos == atoi(value->string));
else
return (flow->dst_tos == value->int_num);
case IPV4_SRC_ADDR:
case IPV6_SRC_ADDR: {
IpAddress ip;
if (value->string)
ip.set((char *)value->string);
else
ip.set(ntohl(value->int_num));
return (flow->src_ip.compare(&ip) == 0);
}
case IP_PROTOCOL_VERSION:
if (value->string)
return (flow->version == atoi(value->string));
else
return (flow->version == value->int_num);
case IPV4_DST_ADDR:
case IPV6_DST_ADDR: {
IpAddress ip;
if (value->string)
ip.set((char *)value->string);
else
ip.set(ntohl(value->int_num));
return (flow->dst_ip.compare(&ip) == 0);
}
case L4_SRC_PORT:
if (value->string)
return (flow->src_port == htons((u_int32_t)atoi(value->string)));
else
return (flow->src_port == htons((u_int32_t)value->int_num));
case L4_DST_PORT:
if (value->string)
return (flow->dst_port == htons((u_int32_t)atoi(value->string)));
else
return (flow->dst_port == htons((u_int32_t)value->int_num));
case SRC_VLAN:
case DST_VLAN:
case DOT1Q_SRC_VLAN:
case DOT1Q_DST_VLAN:
if (value->string)
return (flow->vlan_id == atoi(value->string));
else
return (flow->vlan_id == value->int_num);
case PROTOCOL:
if (value->string)
return (flow->l4_proto == atoi(value->string));
else
return (flow->l4_proto == value->int_num);
case DIRECTION:
if (value->string)
return (flow->direction == atoi(value->string));
else
return (flow->direction == value->int_num);
case FLOW_END_REASON:
if (value->string) {
if (flow->getEndReason())
return (!strcmp(value->string, flow->getEndReason()));
}
case EXPORTER_IPV4_ADDRESS:
return (flow->device_ip == ntohl(inet_addr(value->string)));
case EXPORTER_IPV6_ADDRESS:
if (value->string != NULL && strlen(value->string) > 0) {
struct ndpi_in6_addr ipv6;
if (inet_pton(AF_INET6, value->string, &ipv6) <= 0) return false;
return (memcmp(&flow->device_ipv6, &ipv6, sizeof(flow->device_ipv6)) == 0);
}
case INPUT_SNMP:
if (value->string)
return (flow->inIndex == (u_int32_t)atoi(value->string));
else
return (flow->inIndex == value->int_num);
case OUTPUT_SNMP:
if (value->string)
return (flow->outIndex == (u_int32_t)atoi(value->string));
else
return (flow->outIndex == value->int_num);
case OBSERVATION_POINT_ID:
if (value->string)
return (flow->observationPointId == atoi(value->string));
else
return (flow->observationPointId == value->int_num);
case INGRESS_VRFID:
if (value->string)
return (flow->vrfId == (u_int)atoi(value->string));
else
return (flow->vrfId == value->int_num);
case SRC_AS:
if (value->string)
return (flow->src_as == (u_int32_t)atoi(value->string));
else
return (flow->src_as == value->int_num);
case DST_AS:
if (value->string)
return (flow->dst_as == (u_int32_t)atoi(value->string));
else
return (flow->dst_as == value->int_num);
case BGP_NEXT_ADJACENT_ASN:
if (value->string)
return (flow->next_adjacent_as == (u_int32_t)atoi(value->string));
else
return (flow->next_adjacent_as == value->int_num);
case BGP_PREV_ADJACENT_ASN:
if (value->string)
return (flow->prev_adjacent_as == (u_int32_t)atoi(value->string));
else
return (flow->prev_adjacent_as == value->int_num);
default:
ntop->getTrace()->traceEvent(TRACE_INFO,
"Skipping no-PEN flow fieldId %u", field);
break;
}
return false;
}
/* **************************************************** */
bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow,
u_int32_t field,
ParsedValue *value) {
/* Check for backward compatibility to handle cases like field = 123
* (CLIENT_NW_LATENCY_MS) instead of field = 57595 (NTOP_BASE_ID + 123) */
if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
switch (field) {
case L7_PROTO: {
ndpi_proto l7_proto = {0};
if (value->string) {
if (!strchr(value->string, '.')) {
/* Old behaviour, only the app protocol */
l7_proto.app_protocol = atoi(value->string);
} else {
char *proto_dot;
l7_proto.master_protocol =
(u_int16_t)strtoll(value->string, &proto_dot, 10);
l7_proto.app_protocol = (u_int16_t)strtoll(proto_dot + 1, NULL, 10);
}
} else {
l7_proto.app_protocol = value->int_num;
}
return (flow->l7_proto.app_protocol == l7_proto.app_protocol);
}
case L7_PROTO_NAME:
if (value->string) {
/* This lookup should be optimized */
u_int16_t app_protocol =
ndpi_get_proto_by_name(get_ndpi_struct(), value->string);
return (flow->l7_proto.app_protocol == app_protocol);
} else
return false;
case L7_INFO:
if (value->string && flow->getL7Info())
return (strcmp(flow->getL7Info(), value->string) == 0);
else
return false;
case L7_ERROR_CODE:
return (flow->getL7ErrorCode() == value->int_num);
case DNS_QUERY:
if (value->string && flow->getDNSQuery())
return (strcmp(flow->getDNSQuery(), value->string) == 0);
else
return false;
case DNS_QUERY_TYPE:
if (value->string)
return (flow->getDNSQueryType() == atoi(value->string));
else
return (flow->getDNSQueryType() == value->int_num);
case HTTP_URL:
if (value->string && flow->getHTTPurl())
return (strcmp(flow->getHTTPurl(), value->string) == 0);
else
return false;
case HTTP_USER_AGENT:
if (value->string && flow->getHTTPuserAgent())
return (strcmp(flow->getHTTPuserAgent(), value->string) == 0);
else
return false;
case HTTP_SITE:
if (value->string && flow->getHTTPsite())
return (strcmp(flow->getHTTPsite(), value->string) == 0);
else
return false;
case TLS_SERVER_NAME:
if (value->string && flow->getTLSserverName())
return (strcmp(flow->getTLSserverName(), value->string) == 0);
else
return false;
case NPROBE_IPV4_ADDRESS:
return (flow->probe_ip == ntohl(inet_addr(value->string)));
case UNIQUE_SOURCE_ID:
return (flow->unique_source_id == value->int_num);
case SMTP_MAIL_FROM:
if (value->string && flow->getSMTPMailFrom())
return (strcmp(flow->getSMTPMailFrom(), value->string) == 0);
else
return false;
case SMTP_RCPT_TO:
if (value->string && flow->getSMTPRcptTo())
return (strcmp(flow->getSMTPRcptTo(), value->string) == 0);
else
return false;
default:
break;
}
ntop->getTrace()->traceEvent(
TRACE_WARNING, "Field %u not supported by flow filtering", field);
return false;
}
/* **************************************************** */
bool ZMQParserInterface::matchField(ParsedFlow *const flow, const char *key,
ParsedValue *value) {
u_int32_t pen, key_id;
bool res;
if (!getKeyId((char *)key, strlen(key), &pen, &key_id)) {
ntop->getTrace()->traceEvent(
TRACE_WARNING, "Field %s not supported by flow filtering", key);
return false;
}
switch (pen) {
case 0: /* No PEN */
res = matchPENZeroField(flow, key_id, value);
break;
case NTOP_PEN:
res = matchPENNtopField(flow, key_id, value);
break;
case UNKNOWN_PEN:
default:
ntop->getTrace()->traceEvent(
TRACE_WARNING, "Field %s not supported by flow filtering", key);
res = false;
break;
}
return res;
}
/* **************************************************** */
bool ZMQParserInterface::parseNProbeAgentField(
ParsedFlow *const flow, const char *key, ParsedValue *value,
json_object *const jvalue) {
bool ret = false;
json_object *obj;
if (!strncmp(key, "timestamp", 9)) {
u_int32_t seconds, nanoseconds /* nanoseconds not currently used */;
if (sscanf(value->string, "%u.%u", &seconds, &nanoseconds) == 2) {
flow->first_switched = flow->last_switched = seconds;
ret = true;
}
} else if (!strncmp(key, "IPV4_LOCAL_ADDR", 15) ||
!strncmp(key, "IPV6_LOCAL_ADDR", 15)) {
flow->src_ip.set(value->string); /* FIX: do not always assume Local == Client */
ret = true;
} else if (!strncmp(key, "IPV4_REMOTE_ADDR", 16) ||
!strncmp(key, "IPV6_REMOTE_ADDR", 16)) {
flow->dst_ip.set(value->string); /* FIX: do not always assume Remote == Server */
ret = true;
} else if (!strncmp(key, "L4_LOCAL_PORT", 13)) {
flow->src_port = htons((u_int32_t)value->int_num);
ret = true;
} else if (!strncmp(key, "L4_REMOTE_PORT", 14)) {
flow->dst_port = htons((u_int32_t)value->int_num);
ret = true;
} else if (!strncmp(key, "INTERFACE_NAME", 7) && strlen(key) == 14) {
flow->ifname = (char *)json_object_get_string(jvalue);
ret = true;
} else if (strlen(key) >= 14 &&
!strncmp(&key[strlen(key) - 14], "FATHER_PROCESS", 14)) {
if (json_object_object_get_ex(jvalue, "PID", &obj))
flow->src_process_info.father_pid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "UID", &obj))
flow->src_process_info.father_uid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
flow->src_process_info.father_uid_name =
strdup((char *)json_object_get_string(obj));
if (json_object_object_get_ex(jvalue, "GID", &obj))
flow->src_process_info.father_gid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
flow->src_process_info.actual_memory =
(u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
flow->src_process_info.peak_memory =
(u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
flow->src_process_info.father_process_name =
strdup((char *)json_object_get_string(obj));
if (!flow->process_info_set) flow->process_info_set = true;
ret = true;
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Father Process [pid: %u][uid:
// %u][gid: %u][path: %s]",
// flow->src_process_info.father_pid,
// flow->src_process_info.father_uid,
// flow->src_process_info.father_gid,
// flow->src_process_info.father_process_name);
} else if (strlen(key) >= 7 &&
!strncmp(&key[strlen(key) - 7], "PROCESS", 7)) {
if (json_object_object_get_ex(jvalue, "PID", &obj))
flow->src_process_info.pid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "UID", &obj))
flow->src_process_info.uid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "UID_NAME", &obj))
flow->src_process_info.uid_name =
strdup((char *)json_object_get_string(obj));
if (json_object_object_get_ex(jvalue, "GID", &obj))
flow->src_process_info.gid = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "VM_SIZE", &obj))
flow->src_process_info.actual_memory =
(u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "VM_PEAK", &obj))
flow->src_process_info.peak_memory =
(u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "PROCESS_PATH", &obj))
flow->src_process_info.process_name =
strdup((char *)json_object_get_string(obj));
if (!flow->process_info_set) flow->process_info_set = true;
ret = true;
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Process [pid: %u][uid:
// %u][gid: %u][size/peak vm: %u/%u][path: %s]",
// flow->src_process_info.pid,
// flow->src_process_info.uid, flow->src_process_info.gid,
// flow->src_process_info.actual_memory, flow->src_process_info.peak_memory,
// flow->src_process_info.process_name);
} else if (strlen(key) >= 9 &&
!strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
if ((ret = parseContainerInfo(jvalue, &flow->src_container_info)))
flow->container_info_set = true;
} else if (!strncmp(key, "TCP", 3) && strlen(key) == 3) {
if (json_object_object_get_ex(jvalue, "CONN_STATE", &obj))
flow->src_tcp_info.conn_state =
Utils::tcpStateStr2State(json_object_get_string(obj));
if (json_object_object_get_ex(jvalue, "SEGS_IN", &obj))
flow->src_tcp_info.in_segs = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "SEGS_OUT", &obj))
flow->src_tcp_info.out_segs = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "UNACK_SEGMENTS", &obj))
flow->src_tcp_info.unacked_segs = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "RETRAN_PKTS", &obj))
flow->src_tcp_info.retx_pkts = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "LOST_PKTS", &obj))
flow->src_tcp_info.lost_pkts = (u_int32_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "RTT", &obj))
flow->src_tcp_info.rtt = json_object_get_double(obj);
if (json_object_object_get_ex(jvalue, "RTT_VARIANCE", &obj))
flow->src_tcp_info.rtt_var = json_object_get_double(obj);
if (json_object_object_get_ex(jvalue, "BYTES_RCVD", &obj))
flow->out_bytes = flow->src_tcp_info.rcvd_bytes =
(u_int64_t)json_object_get_int64(obj);
if (json_object_object_get_ex(jvalue, "BYTES_ACKED", &obj))
flow->in_bytes = flow->src_tcp_info.sent_bytes =
(u_int64_t)json_object_get_int64(obj);
if (!flow->tcp_info_set) flow->tcp_info_set = true;
flow->absolute_packet_octet_counters = true;
ret = true;
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "TCP INFO [conn state:
// %s][rcvd_bytes: %u][retx_pkts: %u][lost_pkts: %u]"
// "[in_segs: %u][out_segs: %u][unacked_segs: %u]"
// "[rtt: %f][rtt_var: %f]",
// Utils::tcpState2StateStr(flow->src_tcp_info.conn_state),
// flow->src_tcp_info.rcvd_bytes,
// flow->src_tcp_info.retx_pkts,
// flow->src_tcp_info.lost_pkts,
// flow->src_tcp_info.in_segs,
// flow->src_tcp_info.out_segs,
// flow->src_tcp_info.unacked_segs,
// flow->src_tcp_info.rtt,
// flow->src_tcp_info.rtt_var);
} else if ((!strncmp(key, "TCP_EVENT_TYPE", 14) && strlen(key) == 14) ||
(!strncmp(key, "UDP_EVENT_TYPE", 14) && strlen(key) == 14)) {
flow->event_type = Utils::eBPFEventStr2Event(value->string);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Event Type [type: %s]",
// Utils::eBPFEvent2EventStr(flow->event_type));
}
return ret;
}
/* **************************************************** */
bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
bool invalid_flow = false;
bool rc = false;
if (flow->vlan_id && ntop->getPrefs()->do_ignore_vlans()) flow->vlan_id = 0;
/*
Some flow exporters write in host ports the ICMP type/code.
hence we need to make sure such fields are zeroed in order
to avoid odd values in the web GUI
*/
if ((flow->l4_proto == IPPROTO_ICMP) || (flow->l4_proto == IPPROTO_ICMPV6))
flow->src_port = flow->dst_port = 0;
/* Handle zero IPv4/IPv6 discrepancies */
if (!flow->hasParsedeBPF()) {
if (flow->src_ip.getVersion() != flow->dst_ip.getVersion()) {
if (flow->dst_ip.isIPv4() && flow->src_ip.isIPv6() &&
flow->src_ip.isEmpty())
flow->src_ip.setVersion(4);
else if (flow->src_ip.isIPv4() && flow->dst_ip.isIPv6() &&
flow->dst_ip.isEmpty())
flow->dst_ip.setVersion(4);
else if (flow->dst_ip.isIPv6() && flow->src_ip.isIPv4() &&
flow->src_ip.isEmpty())
flow->src_ip.setVersion(6);
else if (flow->src_ip.isIPv6() && flow->dst_ip.isIPv4() &&
flow->dst_ip.isEmpty())
flow->dst_ip.setVersion(6);
else {
invalid_flow = true;
ntop->getTrace()->traceEvent(TRACE_WARNING,
"IP version mismatch: client:%d server:%d - flow will be ignored",
flow->src_ip.getVersion(), flow->dst_ip.getVersion());
}
}
}
if (!invalid_flow) {
if (flow->hasParsedeBPF()) {
/* Direction already reliable when the event is an accept or a connect.
Heuristic is only used in the other cases. */
#if 0
/* Disabled as Flow::setParsedeBPFInfo() now supports directions */
if(flow->event_type != ebpf_event_type_tcp_accept
&& flow->event_type != ebpf_event_type_tcp_connect
&& ntohs(flow->src_port) < ntohs(flow->dst_port))
flow->swap();
#endif
} else {
/* NOTE: keep in sync with Flow::check_swap() */
if(flow->src_ip.isBroadMulticastAddress() || flow->dst_ip.isBroadMulticastAddress())
; /* Ignore non-unicast IP addresses */
else {
u_int16_t cli_port = ntohs(flow->src_port), srv_port = ntohs(flow->dst_port);
bool do_swap = (cli_port < srv_port) ? true : false;
if(do_swap) {
if(flow->l4_proto == IPPROTO_TCP) {
/*
Don't swap if the client has sent a SYN. Unfortunately as TCP flags
are in OR we cannot see if this is a initiator or a responder so
better to be conservative rather than swapping wrongly
See also https://github.com/ntop/ntopng/issues/1978
*/
if((flow->tcp.client_tcp_flags & TH_SYN) == TH_SYN)
do_swap = false;
} else if(flow->l4_proto == IPPROTO_UDP) {
#if 1
/* We disable UDP swap that might be wrong in particular for probing attempts */
do_swap = false;
#else
if((cli_port > 32768) && (srv_port > 32768))
do_swap = false; /* Don't do anything: this might be RTP or similar */
#endif
}
if(do_swap)
flow->swap();
}
}
}
if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1;
/* Process Flow */
INTERFACE_PROFILING_SECTION_ENTER("processFlow", 30);
rc = processFlow(flow);
INTERFACE_PROFILING_SECTION_EXIT(30);
}
if (!rc) recvStats.num_dropped_flows++;
return rc;
}
/* **************************************************** */
int ZMQParserInterface::parseSingleJSONFlow(json_object *o,
u_int32_t source_id) {
ParsedFlow flow;
struct json_object_iterator it = json_object_iter_begin(o);
struct json_object_iterator itEnd = json_object_iter_end(o);
int ret = 0;
/* Reset data */
flow.source_id = source_id;
flow.direction = UNKNOWN_FLOW_DIRECTION;
while (!json_object_iter_equal(&it, &itEnd)) {
const char *key = json_object_iter_peek_name(&it);
json_object *jvalue = json_object_iter_peek_value(&it);
json_object *additional_o = NULL;
enum json_type type = json_object_get_type(jvalue);
ParsedValue value = {0};
bool add_to_additional_fields = false;
switch (type) {
case json_type_int:
value.int_num = json_object_get_int64(jvalue);
value.double_num = value.int_num;
break;
case json_type_double:
value.double_num = json_object_get_double(jvalue);
break;
case json_type_boolean:
value.boolean = json_object_get_boolean(jvalue);
break;
case json_type_string:
value.string = json_object_get_string(jvalue);
if (strcmp(key, "json") == 0)
additional_o = json_tokener_parse(value.string);
break;
case json_type_object:
/* This is handled by parseNProbeAgentField or addAdditionalField */
break;
case json_type_array:
/* This is handled by parseNProbeAgentField or addAdditionalField */
break;
default:
ntop->getTrace()->traceEvent(
TRACE_WARNING, "JSON type %u not supported [key: %s]\n", type, key);
break;
}
if ((key != NULL) && (jvalue != NULL)) {
u_int32_t pen, key_id;
bool res;
getKeyId((char *)key, strlen(key), &pen, &key_id);
switch (pen) {
case 0: /* No PEN */
res = parsePENZeroField(&flow, key_id, &value);
if (res) break;
/* Dont'break when res == false for backward compatibility: attempt to
* parse Zero-PEN as Ntop-PEN */
case NTOP_PEN:
res = parsePENNtopField(&flow, key_id, &value);
break;
case UNKNOWN_PEN:
default:
res = false;
break;
}
if (!res) {
switch (key_id) {
case 0: // json additional object added by Flow::serialize()
if (additional_o != NULL) {
struct json_object_iterator additional_it =
json_object_iter_begin(additional_o);
struct json_object_iterator additional_itEnd =
json_object_iter_end(additional_o);
while (
!json_object_iter_equal(&additional_it, &additional_itEnd)) {
const char *additional_key =
json_object_iter_peek_name(&additional_it);
json_object *additional_v =
json_object_iter_peek_value(&additional_it);
const char *additional_value =
json_object_get_string(additional_v);
if ((additional_key != NULL) && (additional_value != NULL)) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
// field: %s", additional_key);
flow.addAdditionalField(
additional_key, json_object_new_string(additional_value));
}
json_object_iter_next(&additional_it);
}
}
break;
case UNKNOWN_FLOW_ELEMENT:
/* Attempt to parse it as an nProbe mini field */
if (parseNProbeAgentField(&flow, key, &value, jvalue)) {
if (!flow.hasParsedeBPF()) {
flow.setParsedeBPF();
flow.absolute_packet_octet_counters = true;
}
break;
}
default:
#ifdef NTOPNG_PRO
if (custom_app_maps ||
(custom_app_maps = new (std::nothrow) CustomAppMaps()))
custom_app_maps->checkCustomApp(key, &value, &flow);
#endif
ntop->getTrace()->traceEvent(
TRACE_DEBUG, "Not handled ZMQ field %u/%s", key_id, key);
add_to_additional_fields = true;
break;
} /* switch */
}
if (add_to_additional_fields) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s",
// key);
flow.addAdditionalField(key, json_object_get(jvalue));
}
if (additional_o) json_object_put(additional_o);
} /* if */
/* Move to the next element */
json_object_iter_next(&it);
} // while json_object_iter_equal
if (preprocessFlow(&flow)) ret = 1;
return ret;
}
/* **************************************************** */
int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer,
u_int32_t source_id) {
ndpi_serialization_type kt, et;
ParsedFlow flow;
int ret = 0, rc;
bool recordFound = false;
/* Reset data */
flow.source_id = source_id;
flow.direction = UNKNOWN_FLOW_DIRECTION;
INTERFACE_PROFILING_SECTION_ENTER("Decode TLV", 9);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Processing TLV record");
while ((et = ndpi_deserialize_get_item_type(deserializer, &kt)) !=
ndpi_serialization_unknown) {
ParsedValue value = {0};
u_int32_t pen = 0, key_id = 0;
u_int32_t v32 = 0;
int32_t i32 = 0;
float f = 0;
u_int64_t v64 = 0;
int64_t i64 = 0;
ndpi_string key, vs;
char key_str[64];
u_int8_t vbkp = 0;
bool add_to_additional_fields = false;
bool key_is_string = false, value_is_string = false;
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "TLV key type = %u value type = %u", kt, et);
if (et == ndpi_serialization_end_of_record) {
ndpi_deserialize_next(deserializer);
goto end_of_record;
}
recordFound = true;
switch (kt) {
case ndpi_serialization_uint32:
ndpi_deserialize_key_uint32(deserializer, &key_id);
break;
case ndpi_serialization_string:
ndpi_deserialize_key_string(deserializer, &key);
key_is_string = true;
break;
default:
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Unsupported TLV key type %u: please update both ntopng and "
"nprobe to the same version", kt);
ret = -1;
goto error;
}
switch (et) {
case ndpi_serialization_uint32:
ndpi_deserialize_value_uint32(deserializer, &v32);
value.double_num = value.int_num = v32;
break;
case ndpi_serialization_uint64:
ndpi_deserialize_value_uint64(deserializer, &v64);
value.double_num = value.int_num = v64;
break;
case ndpi_serialization_int32:
ndpi_deserialize_value_int32(deserializer, &i32);
value.double_num = value.int_num = i32;
break;
case ndpi_serialization_int64:
ndpi_deserialize_value_int64(deserializer, &i64);
value.double_num = value.int_num = i64;
break;
case ndpi_serialization_float:
ndpi_deserialize_value_float(deserializer, &f);
value.double_num = f;
break;
case ndpi_serialization_string:
ndpi_deserialize_value_string(deserializer, &vs);
value.string = vs.str;
value_is_string = true;
break;
default:
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n", et);
ret = -1;
goto error;
}
if (key_is_string) {
u_int8_t kbkp = key.str[key.str_len];
key.str[key.str_len] = '\0';
snprintf(key_str, sizeof(key_str), "%s", key.str);
getKeyId(key.str, key.str_len, &pen, &key_id);
key.str[key.str_len] = kbkp;
}
if (value_is_string) {
/* Adding '\0' to the end of the string, backing up the character */
vbkp = vs.str[vs.str_len];
vs.str[vs.str_len] = '\0';
}
switch (pen) {
case 0: /* No PEN */
rc = parsePENZeroField(&flow, key_id, &value);
if (rc) break;
/* Dont'break when rc == false for backward compatibility: attempt to
* parse Zero-PEN as Ntop-PEN */
case NTOP_PEN:
rc = parsePENNtopField(&flow, key_id, &value);
break;
case UNKNOWN_PEN:
default:
rc = false;
break;
}
if (!key_is_string) {
if (pen)
snprintf(key_str, sizeof(key_str), "%u.%u", pen, key_id);
else
snprintf(key_str, sizeof(key_str), "%u", key_id);
}
#if 0
if(ntop->getTrace()->get_trace_level() >= TRACE_LEVEL_DEBUG) {
switch(et) {
case ndpi_serialization_uint32:
case ndpi_serialization_uint64:
case ndpi_serialization_int32:
case ndpi_serialization_int64:
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %lld", key_str, key_id, pen, value.int_num);
break;
case ndpi_serialization_float:
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %.3f", key_str, key_id, pen, value.double_num);
break;
case ndpi_serialization_string:
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: %s", key_str, key_id, pen, value.string);
break;
default:
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Key: %s Key-ID: %u PEN: %u Value: -", key_str, key_id, pen);
break;
}
}
#endif
if (!rc) { /* Not handled */
switch (key_id) {
case 0: // json additional object added by Flow::serialize()
if (strcmp(key_str, "json") == 0 && value_is_string) {
json_object *additional_o = json_tokener_parse(vs.str);
if (additional_o) {
struct json_object_iterator additional_it =
json_object_iter_begin(additional_o);
struct json_object_iterator additional_itEnd =
json_object_iter_end(additional_o);
while (
!json_object_iter_equal(&additional_it, &additional_itEnd)) {
const char *additional_key = json_object_iter_peek_name(&additional_it);
json_object *additional_v = json_object_iter_peek_value(&additional_it);
const char *additional_value = json_object_get_string(additional_v);
if ((additional_key != NULL) && (additional_value != NULL)) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional
// field: %s", additional_key);
flow.addAdditionalField(
additional_key, json_object_new_string(additional_value));
}
json_object_iter_next(&additional_it);
}
json_object_put(additional_o);
}
}
break;
case UNKNOWN_FLOW_ELEMENT:
#if 0 // TODO
/* Attempt to parse it as an nProbe mini field */
if(parseNProbeAgentField(&flow, key_str, &value)) {
if(!flow.hasParsedeBPF()) {
flow->setParsedeBPF();
flow.absolute_packet_octet_counters = true;
}
break;
}
#endif
default:
#ifdef NTOPNG_PRO
if (custom_app_maps ||
(custom_app_maps = new (std::nothrow) CustomAppMaps()))
custom_app_maps->checkCustomApp(key_str, &value, &flow);
#endif
ntop->getTrace()->traceEvent(
TRACE_DEBUG, "Not handled ZMQ field %u.%u", pen, key_id);
add_to_additional_fields = true;
break;
} /* switch */
}
if (add_to_additional_fields) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Additional field: %s
// (Key-ID: %u PEN: %u)", key_str, key_id, pen);
#if 1
flow.addAdditionalField(deserializer);
#else
flow.addAdditionalField(
key_str, value_is_string ? json_object_new_string(value.string)
: json_object_new_int64(value.int_num));
#endif
}
/* Restoring backed up character at the end of the string in place of '\0'
*/
if (value_is_string) vs.str[vs.str_len] = vbkp;
/* Move to the next element */
ndpi_deserialize_next(deserializer);
} /* while */
end_of_record:
if (recordFound) {
INTERFACE_PROFILING_SECTION_EXIT(9); /* Closes Decode TLV */
INTERFACE_PROFILING_SECTION_ENTER("processFlow", 10);
if (preprocessFlow(&flow)) ret = 1;
INTERFACE_PROFILING_SECTION_EXIT(10);
}
error:
return ret;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseJSONFlow(const char *payload,
int payload_size, u_int32_t source_id,
u_int32_t msg_id) {
json_object *f = NULL;
enum json_tokener_error jerr = json_tokener_success;
#ifndef NTOPNG_PRO
/*
nProbe exports flows in TLV so this code will be removed in the future
Leaving here for old nProbes that will be discontinued soon
*/
return(0);
#endif
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "JSON: '%s' [len=%lu]", payload, strlen(payload));
printf("\n\n%s\n\n", payload);
#endif
if (payload) f = json_tokener_parse_verbose(payload, &jerr);
if (f != NULL) {
int n = 0, rc;
if (json_object_get_type(f) == json_type_array) {
/* Flow array */
int id, num_elements = json_object_array_length(f);
for (id = 0; id < num_elements; id++) {
rc = parseSingleJSONFlow(json_object_array_get_idx(f, id), source_id);
if (rc > 0) n++;
}
} else {
rc = parseSingleJSONFlow(f, source_id);
if (rc > 0) n++;
}
json_object_put(f);
return n;
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data "
"encrypted or invalid JSON?");
ntop->getTrace()->traceEvent(TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;
return 0;
}
return 0;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseTLVFlow(const char *payload, int payload_size,
u_int32_t source_id, u_int32_t msg_id,
void *data) {
ndpi_deserializer deserializer;
ndpi_serialization_type kt;
int n = 0, rc;
rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
payload_size);
if (rc == -1) return 0;
if (ndpi_deserialize_get_format(&deserializer) != ndpi_serialization_format_tlv) {
if (!once) {
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Invalid TLV message: the TLV generated by your probe does not match "
"the version supported "
"by ntopng, please update both the probe and ntopng to the latest "
"version available");
once = true;
}
return 0;
}
while (ndpi_deserialize_get_item_type(&deserializer, &kt) !=
ndpi_serialization_unknown) {
rc = parseSingleTLVFlow(&deserializer, source_id);
if (rc < 0)
break;
else if (rc > 0)
n++;
}
return n;
}
/* **************************************************** */
bool ZMQParserInterface::parseContainerInfo(
json_object *jo, ContainerInfo *const container_info) {
json_object *obj, *obj2;
/* Keep in sync with ZMQParserInterface::freeContainerInfo and
* ParsedeBPF::~ParsedeBPF */
if (json_object_object_get_ex(jo, "ID", &obj))
container_info->id = strdup((char *)json_object_get_string(obj));
if (json_object_object_get_ex(jo, "K8S", &obj)) {
if (json_object_object_get_ex(obj, "POD", &obj2))
container_info->data.k8s.pod =
strdup((char *)json_object_get_string(obj2));
if (json_object_object_get_ex(obj, "NS", &obj2))
container_info->data.k8s.ns =
strdup((char *)json_object_get_string(obj2));
container_info->data_type = container_info_data_type_k8s;
} else if (json_object_object_get_ex(jo, "DOCKER", &obj)) {
container_info->data_type = container_info_data_type_k8s;
} else
container_info->data_type = container_info_data_type_unknown;
if (obj) {
if (json_object_object_get_ex(obj, "NAME", &obj2))
container_info->name = strdup((char *)json_object_get_string(obj2));
}
return true;
}
/* **************************************************** */
void ZMQParserInterface::freeContainerInfo(
ContainerInfo *const container_info) {
if (container_info->id) free(container_info->id);
if (container_info->name) free(container_info->name);
if (container_info->data.k8s.pod) free(container_info->data.k8s.pod);
if (container_info->data.k8s.ns) free(container_info->data.k8s.ns);
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseJSONCounter(const char *payload,
int payload_size) {
json_object *o;
enum json_tokener_error jerr = json_tokener_success;
sFlowInterfaceStats stats;
//ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
memset(&stats, 0, sizeof(stats));
o = json_tokener_parse_verbose(payload, &jerr);
if (o != NULL) {
struct json_object_iterator it = json_object_iter_begin(o);
struct json_object_iterator itEnd = json_object_iter_end(o);
/* Reset data */
memset(&stats, 0, sizeof(stats));
while (!json_object_iter_equal(&it, &itEnd)) {
const char *key = json_object_iter_peek_name(&it);
json_object *v = json_object_iter_peek_value(&it);
const char *value = json_object_get_string(v);
if ((key != NULL) && (value != NULL)) {
if (!strcmp(key, "deviceIP"))
stats.deviceIP =
(u_int32_t)json_object_get_int64(v); // ntohl(inet_addr(value));
else if (!strcmp(key, "samplesGenerated"))
stats.samplesGenerated = (u_int32_t)json_object_get_int64(v);
else if (!strcmp(key, "ifIndex"))
stats.ifIndex = (u_int32_t)json_object_get_int64(v);
else if (!strcmp(key, "ifName"))
stats.ifName = (char *)json_object_get_string(v);
else if (!strcmp(key, "ifType"))
stats.ifType = (u_int32_t)json_object_get_int64(v);
else if (!strcmp(key, "ifSpeed"))
stats.ifSpeed = (u_int32_t)json_object_get_int64(v);
else if (!strcmp(key, "ifDirection"))
stats.ifFullDuplex = (!strcmp(value, "Full")) ? true : false;
else if (!strcmp(key, "ifAdminStatus"))
stats.ifAdminStatus = (!strcmp(value, "Up")) ? true : false;
else if (!strcmp(key, "ifOperStatus"))
stats.ifOperStatus = (!strcmp(value, "Up")) ? true : false;
else if (!strcmp(key, "ifInOctets"))
stats.ifInOctets = json_object_get_int64(v);
else if (!strcmp(key, "ifInPackets"))
stats.ifInPackets = json_object_get_int64(v);
else if (!strcmp(key, "ifInErrors"))
stats.ifInErrors = json_object_get_int64(v);
else if (!strcmp(key, "ifOutOctets"))
stats.ifOutOctets = json_object_get_int64(v);
else if (!strcmp(key, "ifOutPackets"))
stats.ifOutPackets = json_object_get_int64(v);
else if (!strcmp(key, "ifOutErrors"))
stats.ifOutErrors = json_object_get_int64(v);
else if (!strcmp(key, "ifPromiscuousMode"))
stats.ifPromiscuousMode =
(((u_int32_t)json_object_get_int64(v)) == 1) ? true : false;
else if (strlen(key) >= 9 &&
!strncmp(&key[strlen(key) - 9], "CONTAINER", 9)) {
if (parseContainerInfo(v, &stats.container_info))
stats.container_info_set = true;
}
} /* if */
/* Move to the next element */
json_object_iter_next(&it);
} // while json_object_iter_equal
/* Process Flow */
processInterfaceStats(&stats);
freeContainerInfo(&stats.container_info);
json_object_put(o);
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data "
"encrypted or invalid JSON?");
ntop->getTrace()->traceEvent(
TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;
return -1;
}
return 0;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseTLVCounter(const char *payload,
int payload_size) {
sFlowInterfaceStats stats;
ndpi_deserializer deserializer;
ndpi_serialization_type kt, et;
int rc, ret = -1;
memset(&stats, 0, sizeof(stats));
rc = ndpi_init_deserializer_buf(&deserializer, (u_int8_t *)payload,
payload_size);
if (rc == -1) {
goto error;
}
if (ndpi_deserialize_get_format(&deserializer) !=
ndpi_serialization_format_tlv) {
if (!once) {
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Invalid TLV message: the TLV generated by your probe does not match "
"the version supported "
"by ntopng, please update both the probe and ntopng to the latest "
"version available");
once = true;
}
goto error;
}
while ((et = ndpi_deserialize_get_item_type(&deserializer, &kt)) !=
ndpi_serialization_unknown) {
/* Key */
bool key_is_string = false;
ndpi_string key;
u_int32_t key_id = 0;
switch (kt) {
case ndpi_serialization_uint32:
ndpi_deserialize_key_uint32(&deserializer, &key_id);
break;
case ndpi_serialization_string:
ndpi_deserialize_key_string(&deserializer, &key);
key_is_string = true;
break;
default:
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Unsupported TLV key type %u: "
"please update both ntopng and the probe to the same version",
kt);
goto error;
}
if (key_is_string) {
u_int8_t kbkp = key.str[key.str_len];
key.str[key.str_len] = '\0';
bool found = getCounterId(key.str, key.str_len, &key_id);
if (!found) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %s\n",
key.str);
key.str[key.str_len] = kbkp;
goto error;
}
key.str[key.str_len] = kbkp;
}
/* Value */
ParsedValue value = {0};
bool value_is_string = false;
ndpi_string vs;
u_int32_t v32 = 0;
u_int64_t v64 = 0;
switch (et) {
case ndpi_serialization_uint32:
ndpi_deserialize_value_uint32(&deserializer, &v32);
value.int_num = v32;
break;
case ndpi_serialization_uint64:
ndpi_deserialize_value_uint64(&deserializer, &v64);
value.int_num = v64;
break;
case ndpi_serialization_string:
ndpi_deserialize_value_string(&deserializer, &vs);
value.string = vs.str;
value_is_string = true;
break;
default:
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported TLV type %u\n",
et);
goto error;
}
u_int8_t vbkp;
if (value_is_string) {
/* Adding '\0' to the end of the string, backing up the character */
vbkp = vs.str[vs.str_len];
vs.str[vs.str_len] = '\0';
}
switch (key_id) {
case SFLOW_DEVICE_IP:
if (value_is_string)
stats.deviceIP = ntohl(inet_addr((char *)value.string));
else
stats.deviceIP = (u_int32_t)value.int_num;
break;
case SFLOW_SAMPLES_GENERATED:
stats.samplesGenerated = (u_int32_t)value.int_num;
break;
case SFLOW_IF_INDEX:
stats.ifIndex = (u_int32_t)value.int_num;
break;
case SFLOW_IF_NAME:
stats.ifName = strdup((char *)value.string);
break;
case SFLOW_IF_TYPE:
stats.ifType = (u_int32_t)value.int_num;
break;
case SFLOW_IF_SPEED:
stats.ifSpeed = (u_int32_t)value.int_num;
break;
case SFLOW_IF_DIRECTION:
stats.ifFullDuplex = (!strcmp(value.string, "Full")) ? true : false;
break;
case SFLOW_IF_ADMIN_STATUS:
stats.ifAdminStatus = (!strcmp(value.string, "Up")) ? true : false;
break;
case SFLOW_IF_OPER_STATUS:
stats.ifOperStatus = (!strcmp(value.string, "Up")) ? true : false;
break;
case SFLOW_IF_IN_OCTETS:
stats.ifInOctets = value.int_num;
break;
case SFLOW_IF_IN_PACKETS:
stats.ifInPackets = value.int_num;
break;
case SFLOW_IF_IN_ERRORS:
stats.ifInErrors = value.int_num;
break;
case SFLOW_IF_OUT_OCTETS:
stats.ifOutOctets = value.int_num;
break;
case SFLOW_IF_OUT_PACKETS:
stats.ifOutPackets = value.int_num;
break;
case SFLOW_IF_OUT_ERRORS:
stats.ifOutErrors = value.int_num;
break;
case SFLOW_IF_PROMISCUOUS_MODE:
stats.ifPromiscuousMode = (value.int_num == 1) ? true : false;
break;
default:
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported Counter %u\n",
key_id);
break;
}
/* Restoring backed up character at the end of the string in place of '\0'
*/
if (value_is_string) vs.str[vs.str_len] = vbkp;
ndpi_deserialize_next(&deserializer);
}
/* Process Flow */
processInterfaceStats(&stats);
ret = 0;
if (stats.ifName) free(stats.ifName);
error:
return ret;
}
/* **************************************************** */
/*
* Minimum set of fields expected by ntopng
* NOTE:
* - the following fields may or may not appear depending on the traffic:
* "IPV4_SRC_ADDR", "IPV4_DST_ADDR", "IPV6_SRC_ADDR", "IPV6_DST_ADDR"
* - some fields may not appear when nprobe runs with --collector-passthrough
* "L7_PROTO"
*/
static std::string mandatory_template_fields[] = {
"FIRST_SWITCHED", "LAST_SWITCHED", "L4_SRC_PORT", "L4_DST_PORT",
"IP_PROTOCOL_VERSION", "PROTOCOL", "IN_BYTES", "IN_PKTS",
"OUT_BYTES", "OUT_PKTS"};
u_int8_t ZMQParserInterface::parseTemplate(const char *payload,
int payload_size, u_int32_t source_id,
u_int32_t msg_id, void *data) {
/* The format that is currently defined for templates is a JSON as follows:
[{"12/Apr/2023 23:53:04
[util.cPEN":0,"field":1,"len":4,"format":"formatted_uint","name":"IN_BYTES","descr":"Incoming
flow bytes
(src->dst)"},{"PEN":0,"field":2,"len":4,"format":"formatted_uint","name":"IN_PKTS","descr":"Incoming
flow packets (src->dst)"},]
*/
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
ZMQ_Template zmq_template;
json_object *obj, *w, *z;
enum json_tokener_error jerr = json_tokener_success;
memset(&zmq_template, 0, sizeof(zmq_template));
obj = json_tokener_parse_verbose(payload, &jerr);
if (obj) {
if (json_object_get_type(obj) == json_type_array) {
int i, num_elements = json_object_array_length(obj);
std::set<std::string> mandatory_fields(
mandatory_template_fields,
mandatory_template_fields + sizeof(mandatory_template_fields) /
sizeof(mandatory_template_fields[0]));
for (i = 0; i < num_elements; i++) {
memset(&zmq_template, 0, sizeof(zmq_template));
w = json_object_array_get_idx(obj, i);
if (json_object_object_get_ex(w, "PEN", &z))
zmq_template.pen = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "field", &z))
zmq_template.field = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "format", &z))
zmq_template.format = json_object_get_string(z);
if (json_object_object_get_ex(w, "name", &z))
zmq_template.name = json_object_get_string(z);
if (json_object_object_get_ex(w, "descr", &z))
zmq_template.descr = json_object_get_string(z);
if (zmq_template.name) {
addMapping(zmq_template.name, zmq_template.field, zmq_template.pen,
zmq_template.descr);
mandatory_fields.erase(zmq_template.name);
}
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Template [PEN: %u][field:
// %u][format: %s][name: %s][descr: %s]",
// zmq_template.pen, zmq_template.field,
// zmq_template.format, zmq_template.name, zmq_template.descr)
;
}
if (mandatory_fields.size() > 0) {
static bool template_warning_sent = 0;
if (!template_warning_sent) {
std::set<std::string>::iterator it;
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Some mandatory fields are missing in the ZMQ template:");
template_warning_sent = true;
for (it = mandatory_fields.begin(); it != mandatory_fields.end();
++it) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "\t%s", (*it).c_str());
}
}
}
}
json_object_put(obj);
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data "
"encrypted or invalid JSON?");
ntop->getTrace()->traceEvent(
TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;
return -1;
}
return 0;
}
/* **************************************************** */
void ZMQParserInterface::setFieldMap(
const ZMQ_FieldMap *const field_map) const {
char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
snprintf(hname, sizeof(hname), CONST_FIELD_MAP_CACHE_KEY, get_id(),
field_map->pen);
snprintf(key, sizeof(key), "%u", field_map->field);
ntop->getRedis()->hashSet(hname, key, field_map->map);
}
/* **************************************************** */
void ZMQParserInterface::setFieldValueMap(
const ZMQ_FieldValueMap *const field_value_map) const {
char hname[CONST_MAX_LEN_REDIS_KEY], key[32];
snprintf(hname, sizeof(hname), CONST_FIELD_VALUE_MAP_CACHE_KEY, get_id(),
field_value_map->pen, field_value_map->field);
snprintf(key, sizeof(key), "%u", field_value_map->value);
ntop->getRedis()->hashSet(hname, key, field_value_map->map);
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseOptionFieldMap(json_object *const jo) {
int arraylen = json_object_array_length(jo);
json_object *w, *z;
ZMQ_FieldMap field_map;
memset(&field_map, 0, sizeof(field_map));
for (int i = 0; i < arraylen; i++) {
w = json_object_array_get_idx(jo, i);
if (json_object_object_get_ex(w, "PEN", &z))
field_map.pen = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "field", &z)) {
field_map.field = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "map", &z)) {
field_map.map = json_object_to_json_string(z);
setFieldMap(&field_map);
#ifdef CUSTOM_APP_DEBUG
ntop->getTrace()->traceEvent(
TRACE_NORMAL, "Option FieldMap [PEN: %u][field: %u][map: %s]",
field_map.pen, field_map.field, field_map.map);
#endif
}
}
}
return 0;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseOptionFieldValueMap(
json_object *const w) {
json_object *z;
ZMQ_FieldValueMap field_value_map;
memset(&field_value_map, 0, sizeof(field_value_map));
if (json_object_object_get_ex(w, "PEN", &z))
field_value_map.pen = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "field", &z)) {
field_value_map.field = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "value", &z)) {
field_value_map.value = (u_int32_t)json_object_get_int(z);
if (json_object_object_get_ex(w, "map", &z)) {
field_value_map.map = json_object_to_json_string(z);
setFieldValueMap(&field_value_map);
#ifdef CUSTOM_APP_DEBUG
ntop->getTrace()->traceEvent(
TRACE_NORMAL,
"Option FieldValueMap [PEN: %u][field: %u][value: %u][map: %s]",
field_value_map.pen, field_value_map.field, field_value_map.value,
field_value_map.map);
#endif
}
}
}
return 0;
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload,
int payload_size,
u_int32_t source_id,
u_int32_t msg_id, void *data) {
enum json_tokener_error jerr = json_tokener_success;
json_object *o = json_tokener_parse_verbose(payload, &jerr);
if (o != NULL) {
json_object *z;
ListeningPorts pinfo;
u_int16_t vlan_id = 0;
if(ntop->getPrefs()->is_edr_mode()
&& ntop->getPrefs()->addVLANCloudToExporters()) {
if(json_object_object_get_ex(o, "instance-name", &z))
vlan_id = findVLANMapping(json_object_get_string(z));
}
/* Parse port information */
if (json_object_object_get_ex(o, "listening-ports", &z)) {
enum json_type o_type = json_object_get_type(z);
if (o_type == json_type_object) {
pinfo.parsePorts(z);
/* Parse list of IP addresses */
if (json_object_object_get_ex(o, "ip-addresses", &z)) {
enum json_type o_type = json_object_get_type(z);
if (o_type == json_type_array) {
for (u_int i = 0; i < (u_int)json_object_array_length(z); i++) {
Host *h = NULL;
json_object *host = json_object_array_get_idx(z, i);
const char *ip_addr = json_object_get_string(host);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Received listening
// ports for %s", ip_addr);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
/* Assign list of ports to the host */
h = getHost((char *)ip_addr,
vlan_id,
0 /* observationPointId */,
true /* inline */);
if (h) {
h->setListeningPorts(pinfo);
} else {
// ntop->getTrace()->traceEvent(TRACE_INFO, "Unable to find host %s", (char *)ip_addr);
}
}
}
}
}
}
json_object_put(o); /* Free memory */
}
return (0);
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseSNMPIntefaces(const char *payload,
int payload_size,
u_int32_t source_id,
u_int32_t msg_id, void *data) {
enum json_tokener_error jerr = json_tokener_success;
json_object *f = json_tokener_parse_verbose(payload, &jerr);
if (f != NULL) {
struct json_object_iterator it = json_object_iter_begin(f);
struct json_object_iterator itEnd = json_object_iter_end(f);
while (!json_object_iter_equal(&it, &itEnd)) {
const char *key = json_object_iter_peek_name(&it);
json_object *value = json_object_iter_peek_value(&it);
const char *rsp = json_object_to_json_string(value);
char redis_key[64];
/*
Saving 'short' interface names
Use lua/modules/snmp_mappings.lua to access them from Lua //
*/
snprintf(redis_key, sizeof(redis_key), "cachedexporters.%s.ifnames", key);
ntop->getRedis()->set(redis_key, rsp);
ntop->getTrace()->traceEvent(TRACE_INFO, "[JSON] %s = %s", redis_key,
rsp);
/* Move to the next element */
json_object_iter_next(&it);
} // while json_object_iter_equal
json_object_put(f); /* Free memory */
return (0);
} else
return (-1);
}
/* **************************************************** */
u_int8_t ZMQParserInterface::parseOption(const char *payload, int payload_size,
u_int32_t source_id, u_int32_t msg_id,
void *data) {
/* The format that is currently defined for options is a JSON as follows:
char opt[] = "
"{\"PEN\":8741, \"field\": 22, \"value\":1, \"map\":{\"name\":\"Skype\"}},"
"{\"PEN\":8741, \"field\": 22, \"value\":3, \"map\":{\"name\":\"Winni\"}}";
parseOption(opt, strlen(opt), source_id, this);
*/
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
json_object *o;
enum json_tokener_error jerr = json_tokener_success;
o = json_tokener_parse_verbose(payload, &jerr);
if (o != NULL) {
parseOptionFieldValueMap(o);
json_object_put(o);
} else {
// if o != NULL
if (!once) {
ntop->getTrace()->traceEvent(
TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, "
"data encrypted or invalid JSON?");
ntop->getTrace()->traceEvent(
TRACE_WARNING, "JSON Parse error [%s] payload size: %u payload: %s",
json_tokener_error_desc(jerr), payload_size, payload);
}
once = true;
return -1;
}
return 0;
}
/* **************************************** */
u_int32_t ZMQParserInterface::periodicStatsUpdateFrequency() const {
nProbeStats *zrs = zmq_remote_stats;
u_int32_t update_freq;
u_int32_t update_freq_min = ntop->getPrefs()->get_housekeeping_frequency();
if (zrs) update_freq =
min_val(max_val(zrs->remote_lifetime_timeout, zrs->remote_idle_timeout),
zrs->remote_collected_lifetime_timeout);
else
update_freq = update_freq_min;
return max_val(update_freq, update_freq_min);
}
/* **************************************** */
void ZMQParserInterface::setRemoteStats(nProbeStats *zrs) {
nProbeStats *last_zrs, *cumulative_zrs;
map<u_int32_t, nProbeStats *>::iterator it;
u_int32_t last_time = getTimeLastPktRcvdRemote();
struct timeval now;
gettimeofday(&now, NULL);
/* Store stats for the current exporter */
lock.wrlock(__FILE__, __LINE__);
if (source_id_last_zmq_remote_stats.find(zrs->source_id) ==
source_id_last_zmq_remote_stats.end()) {
last_zrs = new (std::nothrow) nProbeStats();
if (!last_zrs) {
lock.unlock(__FILE__, __LINE__);
return;
}
source_id_last_zmq_remote_stats[zrs->source_id] = last_zrs;
} else
last_zrs = source_id_last_zmq_remote_stats[zrs->source_id];
*last_zrs = *zrs;
lock.unlock(__FILE__, __LINE__);
if (Utils::msTimevalDiff(&now, &last_zmq_remote_stats_update) < 1000) {
/* Do not update cumulative stats more frequently than once per second.
* Note: this also avoids concurrent access (use after free) of shadow */
return;
}
/* Sum stats from all exporters */
cumulative_zrs = new (std::nothrow) nProbeStats();
if (!cumulative_zrs) return;
lock.wrlock(__FILE__, __LINE__); /* Need write lock due to (*) */
for (it = source_id_last_zmq_remote_stats.begin();
it != source_id_last_zmq_remote_stats.end();) {
nProbeStats *zrs_i = it->second;
if (last_time > MAX_HASH_ENTRY_IDLE &&
zrs_i->remote_time < last_time - MAX_HASH_ENTRY_IDLE /* sec */) {
/* Do not account inactive exporters, release them */
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "Erased %s [local_time:
// %u][last_time: %u]", zrs_i->remote_ifname, zrs_i->local_time,
// last_time);
delete (zrs_i);
source_id_last_zmq_remote_stats.erase(it++); /* (*) */
} else {
cumulative_zrs->num_exporters += zrs_i->num_exporters;
cumulative_zrs->remote_bytes += zrs_i->remote_bytes;
cumulative_zrs->remote_pkts += zrs_i->remote_pkts;
cumulative_zrs->num_flow_exports += zrs_i->num_flow_exports;
cumulative_zrs->remote_ifspeed =
max_val(cumulative_zrs->remote_ifspeed, zrs_i->remote_ifspeed);
cumulative_zrs->remote_time =
max_val(cumulative_zrs->remote_time, zrs_i->remote_time);
cumulative_zrs->local_time =
max_val(cumulative_zrs->local_time, zrs_i->local_time);
cumulative_zrs->avg_bps += zrs_i->avg_bps;
cumulative_zrs->avg_pps += zrs_i->avg_pps;
cumulative_zrs->remote_lifetime_timeout =
max_val(cumulative_zrs->remote_lifetime_timeout,
zrs_i->remote_lifetime_timeout);
cumulative_zrs->remote_collected_lifetime_timeout =
max_val(cumulative_zrs->remote_collected_lifetime_timeout,
zrs_i->remote_collected_lifetime_timeout);
cumulative_zrs->remote_idle_timeout = max_val(
cumulative_zrs->remote_idle_timeout, zrs_i->remote_idle_timeout);
cumulative_zrs->export_queue_full += zrs_i->export_queue_full;
cumulative_zrs->too_many_flows += zrs_i->too_many_flows;
cumulative_zrs->elk_flow_drops += zrs_i->elk_flow_drops;
cumulative_zrs->sflow_pkt_sample_drops += zrs_i->sflow_pkt_sample_drops;
cumulative_zrs->flow_collection_drops += zrs_i->flow_collection_drops;
cumulative_zrs->flow_collection_udp_socket_drops +=
zrs_i->flow_collection_udp_socket_drops;
cumulative_zrs->flow_collection.nf_ipfix_flows +=
zrs_i->flow_collection.nf_ipfix_flows;
cumulative_zrs->flow_collection.sflow_samples +=
zrs_i->flow_collection.sflow_samples;
++it;
}
}
lock.unlock(__FILE__, __LINE__);
ifSpeed = cumulative_zrs->remote_ifspeed;
last_pkt_rcvd = 0;
last_pkt_rcvd_remote = cumulative_zrs->remote_time;
last_remote_pps = cumulative_zrs->avg_pps;
last_remote_bps = cumulative_zrs->avg_bps;
if (cumulative_zrs->flow_collection.sflow_samples > 0)
is_sampled_traffic = true;
/* Recalculate the flow max idle according to the timeouts received */
flow_max_idle = min_val(cumulative_zrs->remote_lifetime_timeout,
cumulative_zrs->remote_collected_lifetime_timeout) +
10 /* Safe margin */;
updateFlowMaxIdle();
if ((zmq_initial_pkts == 0) /* ntopng has been restarted */
|| (cumulative_zrs->remote_bytes <
zmq_initial_bytes) /* nProbe has been restarted */
) {
/* Start over */
zmq_initial_bytes = cumulative_zrs->remote_bytes,
zmq_initial_pkts = cumulative_zrs->remote_pkts;
}
if (zmq_remote_initial_exported_flows == 0 /* ntopng has been restarted */
|| cumulative_zrs->num_flow_exports <
zmq_remote_initial_exported_flows) /* nProbe has been restarted */
zmq_remote_initial_exported_flows = cumulative_zrs->num_flow_exports;
/* Swap values */
if (zmq_remote_stats_shadow) free(zmq_remote_stats_shadow);
zmq_remote_stats_shadow = zmq_remote_stats;
zmq_remote_stats = cumulative_zrs;
memcpy(&last_zmq_remote_stats_update, &now, sizeof(now));
/*
* Don't override ethStats here, these stats are properly updated
* inside NetworkInterface::processFlow for ZMQ interfaces.
* Overriding values here may cause glitches and non-strictly-increasing
counters
* yielding negative rates.
ethStats.setNumBytes(cumulative_zrs->remote_bytes),
ethStats.setNumPackets(cumulative_zrs->remote_pkts);
*
*/
}
/* **************************************************** */
#ifdef NTOPNG_PRO
bool ZMQParserInterface::getCustomAppDetails(u_int32_t remapped_app_id,
u_int32_t *const pen,
u_int32_t *const app_field,
u_int32_t *const app_id) {
return custom_app_maps
&& custom_app_maps->getCustomAppDetails(remapped_app_id, pen, app_field, app_id);
}
#endif
/* **************************************************** */
void ZMQParserInterface::probeLuaStats(lua_State *vm) {
std::map<u_int32_t, nProbeStats *>::iterator it;
lua_newtable(vm);
lock.rdlock(__FILE__, __LINE__);
for (it = source_id_last_zmq_remote_stats.begin();
it != source_id_last_zmq_remote_stats.end(); ++it) {
nProbeStats *zrs = it->second;
lua_newtable(vm);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s (%u)", zrs->remote_ifname,
// it->first);
lua_push_str_table_entry(vm, "remote.name", zrs->remote_ifname);
lua_push_str_table_entry(vm, "remote.if_addr", zrs->remote_ifaddress);
lua_push_uint64_table_entry(vm, "remote.ifspeed", zrs->remote_ifspeed);
lua_push_str_table_entry(vm, "probe.ip", zrs->remote_probe_address);
lua_push_str_table_entry(vm, "probe.uuid",
zrs->uuid);
lua_push_uint64_table_entry(vm, "probe.uuid_num",
zrs->uuid_num);
lua_push_str_table_entry(vm, "probe.public_ip",
zrs->remote_probe_public_address);
lua_push_str_table_entry(vm, "probe.probe_version",
zrs->remote_probe_version);
lua_push_str_table_entry(vm, "probe.probe_os", zrs->remote_probe_os);
lua_push_str_table_entry(vm, "probe.probe_license",
zrs->remote_probe_license);
lua_push_str_table_entry(vm, "probe.probe_edition",
zrs->remote_probe_edition);
lua_push_str_table_entry(vm, "probe.probe_maintenance",
zrs->remote_probe_maintenance);
lua_push_uint64_table_entry(vm, "drops.export_queue_full", zrs->export_queue_full);
lua_push_uint64_table_entry(vm, "drops.too_many_flows", zrs->too_many_flows);
lua_push_uint64_table_entry(vm, "drops.elk_flow_drops", zrs->elk_flow_drops);
lua_push_uint64_table_entry(vm, "drops.sflow_pkt_sample_drops", zrs->sflow_pkt_sample_drops);
lua_push_uint64_table_entry(vm, "drops.flow_collection_drops", zrs->flow_collection_drops);
lua_push_uint64_table_entry(vm, "drops.flow_collection_udp_socket_drops", zrs->flow_collection_udp_socket_drops);
lua_push_uint64_table_entry(vm, "flow_collection.nf_ipfix_flows", zrs->flow_collection.nf_ipfix_flows);
lua_push_uint64_table_entry(vm, "flow_collection.sflow_samples", zrs->flow_collection.sflow_samples);
lua_push_uint64_table_entry(vm, "zmq.num_flow_exports", zrs->num_flow_exports);
lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
exporterLuaStats(vm, zrs);
/* ************************************* */
if (zrs) {
lua_push_uint64_table_entry(vm, "probe.remote_time",
zrs->remote_time); /* remote time when last event has been sent */
lua_push_uint64_table_entry(vm, "probe.local_time",
zrs->local_time); /* local time when last event has been received */
lua_push_uint64_table_entry(vm, "zmq.num_flow_exports",
zrs->num_flow_exports - zmq_remote_initial_exported_flows);
lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
if (zrs->export_queue_full > 0)
lua_push_uint64_table_entry(vm, "zmq.drops.export_queue_full",
zrs->export_queue_full);
if (zrs->flow_collection_drops)
lua_push_uint64_table_entry(vm, "zmq.drops.flow_collection_drops",
zrs->flow_collection_drops);
if (zrs->flow_collection_udp_socket_drops)
lua_push_uint64_table_entry(vm,
"zmq.drops.flow_collection_udp_socket_drops",
zrs->flow_collection_udp_socket_drops);
lua_push_uint64_table_entry(vm, "timeout.lifetime",
zrs->remote_lifetime_timeout);
lua_push_uint64_table_entry(vm, "timeout.collected_lifetime",
zrs->remote_collected_lifetime_timeout);
lua_push_uint64_table_entry(vm, "timeout.idle", zrs->remote_idle_timeout);
}
lua_pushstring(vm, std::to_string(it->first).c_str() /* The source_id as string (can't use integers or Lua will think it's an array ) */);
lua_insert(vm, -2);
lua_settable(vm, -3);
}
lua_rawseti(vm, -2, get_id());
/* Here the Interface ID is added because in case of View Interfaces
* this field could be the same for different interfaces
*/
lock.unlock(__FILE__, __LINE__);
}
/* **************************************************** */
void ZMQParserInterface::lua(lua_State *vm, bool fullStats) {
NetworkInterface::lua(vm, fullStats);
lua_newtable(vm);
probeLuaStats(vm);
lua_pushstring(vm, "probes");
lua_insert(vm, -2);
lua_settable(vm, -3);
}
/* **************************************************** */
void ZMQParserInterface::exporterLuaStats(lua_State *vm, nProbeStats *zrs) {
std::map<u_int32_t, ExporterStats>::iterator it;
lua_newtable(vm);
for (it = zrs->exportersStats.begin();
it != zrs->exportersStats.end(); ++it) {
lua_newtable(vm);
char buf[32], ipb[24];
snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(it->first, ipb, sizeof(ipb)));
lua_push_uint64_table_entry(vm, "time_last_used", it->second.time_last_used);
lua_push_uint64_table_entry(vm, "num_netflow_flows", it->second.num_netflow_flows);
lua_push_uint64_table_entry(vm, "num_sflow_flows", it->second.num_sflow_flows);
lua_push_uint64_table_entry(vm, "num_drops", it->second.num_drops);
lua_push_uint64_table_entry(vm, "unique_source_id", it->second.unique_source_id);
lua_pushstring(vm, buf);
lua_insert(vm, -2);
lua_settable(vm, -3);
}
lua_pushstring(vm, "exporters");
lua_insert(vm, -2);
lua_settable(vm, -3);
}
/* **************************************************** */
void ZMQParserInterface::loadVLANMappings() {
char **keys, **values, buf[64];
int rc;
Redis *redis = ntop->getRedis();
top_vlan_id = 0;
snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
rc = redis->hashGetAll(buf, &keys, &values);
if(rc > 0) {
for (int i = 0; i < rc; i++) {
if(values[i] && keys[i]) {
u_int16_t v = atoi(values[i]);
if(v > top_vlan_id) top_vlan_id = v;
name_to_vlan[keys[i]] = v;
}
if (values[i]) free(values[i]);
if (keys[i]) free(keys[i]);
}
free(keys);
free(values);
}
}
/* **************************************************** */
u_int16_t ZMQParserInterface::findVLANMapping(std::string name) {
std::unordered_map<std::string, u_int16_t>::iterator it = name_to_vlan.find(name);
if(it != name_to_vlan.end())
return(it->second);
else if(top_vlan_id < 4095) {
char value[16], buf[64];
u_int16_t id = ++top_vlan_id;
Redis *redis = ntop->getRedis();
if(id >= 4096) return(0 /* too many vlans */);
snprintf(value, sizeof(value), "%u", id);
snprintf(buf, sizeof(buf), VLAN_HASH_KEY, get_id());
redis->hashSet(buf, name.c_str(), value);
/* Add VLAN mapping */
redis->hashSet(NTOPNG_VLAN_ALIASES, value, name.c_str());
name_to_vlan[name] = id;
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Added %s = %d", name.c_str(), id);
return(id);
} else
return(0);
}
/* **************************************************** */
#endif