Added support for SIP CallID in flow key

This commit is contained in:
Luca Deri 2024-05-04 11:27:38 +02:00
parent 4429334066
commit 3d117a9e16
12 changed files with 170 additions and 116 deletions

View file

@ -33,7 +33,7 @@ ZMQParserInterface::ZMQParserInterface(const char *endpoint,
const char *custom_interface_type)
: ParserInterface(endpoint, custom_interface_type) {
if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__);
zmq_initial_bytes = 0, zmq_initial_pkts = 0;
zmq_remote_stats = zmq_remote_stats_shadow = NULL;
memset(&last_zmq_remote_stats_update, 0,
@ -53,7 +53,7 @@ ZMQParserInterface::ZMQParserInterface(const char *endpoint,
/*
Populate defaults for @NTOPNG@ nProbe templates. No need to populate
all the fields as nProbe will sent them periodically.
This minimum set is required for backward compatibility.
*/
addMapping("IN_SRC_MAC", IN_SRC_MAC);
@ -143,7 +143,7 @@ ZMQParserInterface::ZMQParserInterface(const char *endpoint,
addMapping("FLOW_SOURCE", FLOW_SOURCE, NTOP_PEN);
addMapping("SMTP_MAIL_FROM", SMTP_MAIL_FROM, NTOP_PEN);
addMapping("SMTP_RCPT_TO", SMTP_RCPT_TO, NTOP_PEN);
/* eBPF / Process */
addMapping("SRC_PROC_PID", SRC_PROC_PID, NTOP_PEN);
addMapping("SRC_PROC_NAME", SRC_PROC_NAME, NTOP_PEN);
@ -603,7 +603,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
flow->dst_ip.set(ntohl(value->int_num));
} else {
ip_aux.set((char *)value->string);
if (!ip_aux.isEmpty() &&
!ntop->getPrefs()->do_override_dst_with_post_nat_dst())
ntop->getTrace()->traceEvent(TRACE_WARNING,
@ -717,11 +717,11 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
if (ip) {
flow->device_ip = ip;
if(ntop->getPrefs()->is_edr_mode()) {
char buf[32], ipb[24];
std::unordered_map<u_int32_t, bool>::iterator it = cloud_flow_exporters.find(ip);
if(it == cloud_flow_exporters.end()) {
cloud_flow_exporters[ip] = true;
snprintf(buf, sizeof(buf), "%s", Utils::intoaV4(ip, ipb, sizeof(ipb)));
@ -740,7 +740,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
inet_pton(AF_INET6, value->string, &flow->device_ipv6);
break;
case FLOW_END_REASON:
if (value->string)
if (value->string)
flow->setEndReason(value->string);
break;
case TOTAL_FLOWS_EXP:
@ -777,9 +777,9 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow *const flow,
if (ntop->getPrefs()->do_override_dst_with_post_nat_dst()) {
if (value->string) {
IpAddress tmp;
tmp.set(value->string);
if (!tmp.isEmpty()) {
flow->dst_ip.set((char *)value->string);
}
@ -839,7 +839,7 @@ bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
if (field < NTOP_BASE_ID) field += NTOP_BASE_ID;
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[field %d][%s]", field, value->string ? value->string : ""); */
switch (field) {
case L7_PROTO:
if (value->string) {
@ -1015,8 +1015,8 @@ bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
case L7_PROTO_RISK:
flow->setRisk((ndpi_risk)value->int_num);
break;
case L7_PROTO_RISK_NAME:
case L7_PROTO_RISK_NAME:
flow->setRiskName(value->string);
break;
@ -1033,15 +1033,15 @@ bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
{
FlowSource s;
if((value->int_num < packet_to_flow) || (value->int_num > collected_sflow))
if((value->int_num < packet_to_flow) || (value->int_num > collected_sflow))
s = packet_to_flow; /* Default value */
else
s = static_cast<FlowSource>(value->int_num);
flow->setFlowSource(s);
}
break;
case BITTORRENT_HASH:
if (value->string && value->string[0] && value->string[0] != '\n')
flow->setBittorrentHash(value->string);
@ -1194,24 +1194,29 @@ bool ZMQParserInterface::parsePENNtopField(ParsedFlow *const flow,
case DST_PROC_CONTAINER_ID:
if (value->string && value->string[0])
flow->setParsedContainerInfo();
flow->dst_container_info.id = strdup(value->string);
flow->dst_container_info.id = strdup(value->string);
break;
case SMTP_RCPT_TO:
if(value->string && value->string[0])
flow->setSMTPRcptTo(value->string);
flow->setSMTPRcptTo(value->string);
break;
case SMTP_MAIL_FROM:
if(value->string && value->string[0])
flow->setSMTPMailFrom(value->string);
flow->setSMTPMailFrom(value->string);
break;
case DHCP_CLIENT_NAME:
if(value->string && value->string[0])
flow->setDHCPClientName(value->string);
flow->setDHCPClientName(value->string);
break;
case SIP_CALL_ID:
if(value->string && value->string[0])
flow->setSIPCallId(value->string);
break;
default:
return false;
}
@ -1311,7 +1316,7 @@ bool ZMQParserInterface::matchPENZeroField(ParsedFlow *const flow,
return (flow->direction == atoi(value->string));
else
return (flow->direction == value->int_num);
case FLOW_END_REASON:
if (value->string) {
if (flow->getEndReason())
@ -1476,7 +1481,7 @@ bool ZMQParserInterface::matchPENNtopField(ParsedFlow *const flow,
return (strcmp(flow->getSMTPMailFrom(), value->string) == 0);
else
return false;
case SMTP_RCPT_TO:
if (value->string && flow->getSMTPRcptTo())
return (strcmp(flow->getSMTPRcptTo(), value->string) == 0);
@ -1707,7 +1712,7 @@ bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
flow->dst_ip.setVersion(6);
else {
invalid_flow = true;
ntop->getTrace()->traceEvent(TRACE_WARNING,
"IP version mismatch: client:%d server:%d - flow will be ignored",
flow->src_ip.getVersion(), flow->dst_ip.getVersion());
@ -1741,7 +1746,7 @@ bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
Don't swap if the client has sent a SYN. Unfortunately as TCP flags
are in OR we cannot see if this is a initiator or a responder so
better to be conservative rather than swapping wrongly
See also https://github.com/ntop/ntopng/issues/1978
*/
@ -1756,13 +1761,13 @@ bool ZMQParserInterface::preprocessFlow(ParsedFlow *flow) {
do_swap = false; /* Don't do anything: this might be RTP or similar */
#endif
}
if(do_swap)
flow->swap();
}
}
}
if (flow->pkt_sampling_rate == 0) flow->pkt_sampling_rate = 1;
/* Process Flow */
@ -2803,13 +2808,13 @@ u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload,
json_object *z;
ListeningPorts pinfo;
u_int16_t vlan_id = 0;
if(ntop->getPrefs()->is_edr_mode()
&& ntop->getPrefs()->addVLANCloudToExporters()) {
if(json_object_object_get_ex(o, "instance-name", &z))
vlan_id = findVLANMapping(json_object_get_string(z));
}
/* Parse port information */
if (json_object_object_get_ex(o, "listening-ports", &z)) {
enum json_type o_type = json_object_get_type(z);
@ -2819,7 +2824,7 @@ u_int8_t ZMQParserInterface::parseListeningPorts(const char *payload,
/* Parse list of IP addresses */
if (json_object_object_get_ex(o, "ip-addresses", &z)) {
enum json_type o_type = json_object_get_type(z);
if (o_type == json_type_array) {
for (u_int i = 0; i < (u_int)json_object_array_length(z); i++) {
Host *h = NULL;
@ -3104,8 +3109,8 @@ bool ZMQParserInterface::getCustomAppDetails(u_int32_t remapped_app_id,
u_int32_t *const pen,
u_int32_t *const app_field,
u_int32_t *const app_id) {
return custom_app_maps && custom_app_maps->getCustomAppDetails(
remapped_app_id, pen, app_field, app_id);
return custom_app_maps
&& custom_app_maps->getCustomAppDetails(remapped_app_id, pen, app_field, app_id);
}
#endif
@ -3162,18 +3167,15 @@ void ZMQParserInterface::lua(lua_State *vm, bool fullStats) {
/* ************************************* */
if (zrs) {
lua_push_uint64_table_entry(
vm, "probe.remote_time",
zrs->remote_time); /* remote time when last event has been sent */
lua_push_uint64_table_entry(
vm, "probe.local_time",
zrs->local_time); /* local time when last event has been received */
lua_push_uint64_table_entry(
vm, "zmq.num_flow_exports",
zrs->num_flow_exports - zmq_remote_initial_exported_flows);
lua_push_uint64_table_entry(vm, "probe.remote_time",
zrs->remote_time); /* remote time when last event has been sent */
lua_push_uint64_table_entry(vm, "probe.local_time",
zrs->local_time); /* local time when last event has been received */
lua_push_uint64_table_entry(vm, "zmq.num_flow_exports",
zrs->num_flow_exports - zmq_remote_initial_exported_flows);
lua_push_uint64_table_entry(vm, "zmq.num_exporters", zrs->num_exporters);
if (zrs->export_queue_full > 0)
lua_push_uint64_table_entry(vm, "zmq.drops.export_queue_full",
zrs->export_queue_full);