ntopng/src/Flow.cpp

3769 lines
131 KiB
C++

/*
*
* (C) 2013-19 - ntop.org
*
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*
*/
#include "ntop_includes.h"
/* static so default is zero-initialization, let's just define it */
const ndpi_protocol Flow::ndpiUnknownProtocol = { NDPI_PROTOCOL_UNKNOWN,
NDPI_PROTOCOL_UNKNOWN,
NDPI_PROTOCOL_CATEGORY_UNSPECIFIED };
//#define DEBUG_DISCOVERY
//#define DEBUG_UA
/* *************************************** */
Flow::Flow(NetworkInterface *_iface,
u_int16_t _vlanId, u_int8_t _protocol,
Mac *_cli_mac, IpAddress *_cli_ip, u_int16_t _cli_port,
Mac *_srv_mac, IpAddress *_srv_ip, u_int16_t _srv_port,
const ICMPinfo * const _icmp_info,
time_t _first_seen, time_t _last_seen) : GenericHashEntry(_iface) {
vlanId = _vlanId, protocol = _protocol, cli_port = _cli_port, srv_port = _srv_port;
cli2srv_packets = 0, cli2srv_bytes = 0, cli2srv_goodput_bytes = 0,
srv2cli_packets = 0, srv2cli_bytes = 0, srv2cli_goodput_bytes = 0,
cli2srv_last_packets = 0, cli2srv_last_bytes = 0, srv2cli_last_packets = 0, srv2cli_last_bytes = 0,
cli_host = srv_host = NULL, good_low_flow_detected = false,
srv2cli_last_goodput_bytes = cli2srv_last_goodput_bytes = 0, good_ssl_hs = true,
flow_alerted = flow_dropped_counts_increased = false, vrfId = 0;
l7_protocol_guessed = detection_completed = false;
ndpiDetectedProtocol = ndpiUnknownProtocol;
doNotExpireBefore = iface->getTimeLastPktRcvd() + DONT_NOT_EXPIRE_BEFORE_SEC;
#ifdef HAVE_NEDGE
last_conntrack_update = 0;
marker = MARKER_NO_ACTION;
#endif
icmp_info = _icmp_info ? new (std::nothrow) ICMPinfo(*_icmp_info) : NULL;
memset(&cli2srvStats, 0, sizeof(cli2srvStats)), memset(&srv2cliStats, 0, sizeof(srv2cliStats));
ndpiFlow = NULL, cli_id = srv_id = NULL, client_proc = server_proc = NULL;
json_info = strdup("{}"), cli2srv_direction = true, twh_over = twh_ok = false,
dissect_next_http_packet = false,
check_tor = false, host_server_name = NULL, diff_num_http_requests = 0,
bt_hash = NULL, community_id_flow_hash = NULL;
src2dst_tcp_flags = 0, dst2src_tcp_flags = 0, last_update_time.tv_sec = 0, last_update_time.tv_usec = 0,
bytes_thpt = 0, goodput_bytes_thpt = 0, top_bytes_thpt = 0, top_pkts_thpt = 0;
bytes_thpt_cli2srv = 0, goodput_bytes_thpt_cli2srv = 0;
bytes_thpt_srv2cli = 0, goodput_bytes_thpt_srv2cli = 0;
pkts_thpt = 0, pkts_thpt_cli2srv = 0, pkts_thpt_srv2cli = 0;
cli2srv_last_bytes = 0, prev_cli2srv_last_bytes = 0, srv2cli_last_bytes = 0, prev_srv2cli_last_bytes = 0;
cli2srv_last_packets = 0, prev_cli2srv_last_packets = 0, srv2cli_last_packets = 0, prev_srv2cli_last_packets = 0;
top_bytes_thpt = 0, top_goodput_bytes_thpt = 0, applLatencyMsec = 0;
last_db_dump.cli2srv_packets = 0, last_db_dump.srv2cli_packets = 0,
last_db_dump.cli2srv_bytes = 0, last_db_dump.srv2cli_bytes = 0,
last_db_dump.cli2srv_goodput_bytes = 0, last_db_dump.srv2cli_goodput_bytes = 0,
last_db_dump.last_dump = 0;
memset(&protos, 0, sizeof(protos));
memset(&flow_device, 0, sizeof(flow_device));
PROFILING_SUB_SECTION_ENTER(iface, "Flow::Flow: iface->findFlowHosts", 7);
iface->findFlowHosts(_vlanId, _cli_mac, _cli_ip, &cli_host, _srv_mac, _srv_ip, &srv_host);
PROFILING_SUB_SECTION_EXIT(iface, 7);
if(cli_host) { cli_host->incUses(); cli_host->incNumFlows(last_seen, true, srv_host); }
if(srv_host) { srv_host->incUses(); srv_host->incNumFlows(last_seen, false, cli_host); }
if(icmp_info){
if(icmp_info->isPortUnreachable()){ //port unreachable icmpv6/icmpv4
/*
This is an ICMP flow which contains unreachable information. As is the cli_host
that complains, it means that the srv_host has made a connection that triggered
the issue and thus it must be accounted in reverse
*/
if(cli_host) cli_host->incNumUnreachableFlows(true /* as server */);
if(srv_host) srv_host->incNumUnreachableFlows(false /* as client */);
}
else if(icmp_info->isNetUnreachable(protocol)){
if(cli_host) cli_host->incNumNetUnreachableFlows(true /* as server */);
if(srv_host) srv_host->incNumNetUnreachableFlows(false /* as client */);
}
else if(icmp_info->isHostUnreachable(protocol)){
if(cli_host) cli_host->incNumHostUnreachableFlows(true /* as server */);
if(srv_host) srv_host->incNumHostUnreachableFlows(false /* as client */);
}
}
memset(&custom_app, 0, sizeof(custom_app));
#ifdef NTOPNG_PRO
HostPools *hp = iface->getHostPools();
routing_table_id = DEFAULT_ROUTING_TABLE_ID;
if(hp) {
if(cli_host) routing_table_id = hp->getRoutingPolicy(cli_host->get_host_pool());
if(srv_host) routing_table_id = max_val(routing_table_id, hp->getRoutingPolicy(srv_host->get_host_pool()));
}
counted_in_aggregated_flow = status_counted_in_aggregated_flow = false;
#endif
passVerdict = true, quota_exceeded = false;
if(_first_seen > _last_seen) _first_seen = _last_seen;
first_seen = _first_seen, last_seen = _last_seen;
bytes_thpt_trend = trend_unknown, pkts_thpt_trend = trend_unknown;
//bytes_rate = new TimeSeries<float>(4096);
protocol_processed = false;
synTime.tv_sec = synTime.tv_usec = 0,
ackTime.tv_sec = ackTime.tv_usec = 0,
synAckTime.tv_sec = synAckTime.tv_usec = 0,
rttSec = 0, cli2srv_window = srv2cli_window = 0,
c2sFirstGoodputTime.tv_sec = c2sFirstGoodputTime.tv_usec = 0;
memset(&tcp_stats_s2d, 0, sizeof(tcp_stats_s2d)), memset(&tcp_stats_d2s, 0, sizeof(tcp_stats_d2s));
memset(&clientNwLatency, 0, sizeof(clientNwLatency)), memset(&serverNwLatency, 0, sizeof(serverNwLatency));
if(!iface->isPacketInterface())
last_update_time.tv_sec = (long)first_seen;
#ifdef NTOPNG_PRO
#ifndef HAVE_NEDGE
trafficProfile = NULL;
#else
cli2srv_in = cli2srv_out = srv2cli_in = srv2cli_out = DEFAULT_SHAPER_ID;
memset(&flowShaperIds, 0, sizeof(flowShaperIds));
cli_quota_source = srv_quota_source = policy_source_default;
#endif
#endif
switch(protocol) {
case IPPROTO_TCP:
case IPPROTO_UDP:
if(iface->is_ndpi_enabled() && (!iface->isSampledTraffic()))
allocDPIMemory();
break;
case IPPROTO_ICMP:
ndpiDetectedProtocol.app_protocol = NDPI_PROTOCOL_IP_ICMP,
ndpiDetectedProtocol.master_protocol = NDPI_PROTOCOL_UNKNOWN;
setDetectedProtocol(ndpiDetectedProtocol, true);
break;
case IPPROTO_ICMPV6:
ndpiDetectedProtocol.app_protocol = NDPI_PROTOCOL_IP_ICMPV6,
ndpiDetectedProtocol.master_protocol = NDPI_PROTOCOL_UNKNOWN;
setDetectedProtocol(ndpiDetectedProtocol, true);
break;
default:
ndpiDetectedProtocol = ndpi_guess_undetected_protocol(iface->get_ndpi_struct(),
NULL, protocol, 0, 0, 0, 0);
setDetectedProtocol(ndpiDetectedProtocol, true);
break;
}
protos.ssl.dissect_certificate = true,
protos.ssl.subject_alt_name_match = false;
}
/* *************************************** */
void Flow::allocDPIMemory() {
if((ndpiFlow = (ndpi_flow_struct*)calloc(1, iface->get_flow_size())) == NULL)
throw "Not enough memory";
if((cli_id = calloc(1, iface->get_size_id())) == NULL)
throw "Not enough memory";
if((srv_id = calloc(1, iface->get_size_id())) == NULL)
throw "Not enough memory";
}
/* *************************************** */
void Flow::freeDPIMemory() {
if(ndpiFlow) { ndpi_free_flow(ndpiFlow); ndpiFlow = NULL; }
if(cli_id) { free(cli_id); cli_id = NULL; }
if(srv_id) { free(srv_id); srv_id = NULL; }
}
/* *************************************** */
Flow::~Flow() {
if(cli_host) cli_host->decNumFlows(last_seen, true, srv_host), cli_host->decUses();
if(srv_host) srv_host->decNumFlows(last_seen, false, cli_host), srv_host->decUses();
if(json_info) free(json_info);
if(host_server_name) free(host_server_name);
if(client_proc) {
if(client_proc->process_name) free(client_proc->process_name);
if(client_proc->father_process_name) free(client_proc->father_process_name);
free(client_proc);
}
if(server_proc) free(server_proc);
if(isHTTP()) {
if(protos.http.last_method) free(protos.http.last_method);
if(protos.http.last_url) free(protos.http.last_url);
if(protos.http.last_content_type) free(protos.http.last_content_type);
} else if(isDNS()) {
if(protos.dns.last_query) free(protos.dns.last_query);
} else if(isSSH()) {
if(protos.ssh.client_signature) free(protos.ssh.client_signature);
if(protos.ssh.server_signature) free(protos.ssh.server_signature);
} else if(isSSL()) {
if(protos.ssl.certificate) free(protos.ssl.certificate);
if(protos.ssl.server_certificate) free(protos.ssl.server_certificate);
}
if(bt_hash) free(bt_hash);
if(community_id_flow_hash) free(community_id_flow_hash);
freeDPIMemory();
if(icmp_info) delete(icmp_info);
}
/* *************************************** */
bool Flow::triggerAlerts() const {
/* If a flow involves at least a local endpoint,
then that endpoint may have disabled alerts.
When there's a local endpoint with alerts disabled,
we do not generate flow alerts having it as an endpoint as one
wants to explicitly silence them */
bool cli_trigger_alerts, srv_trigger_alerts;
/* client is either remote, or has alerts enabled... */
cli_trigger_alerts = cli_host && (!cli_host->isLocalHost() || cli_host->triggerAlerts());
/* server is either remote, or has alerts enabled.. */
srv_trigger_alerts = srv_host && (!srv_host->isLocalHost() || srv_host->triggerAlerts());
return cli_trigger_alerts && srv_trigger_alerts;
}
/* *************************************** */
void Flow::dumpFlowAlert() {
time_t when = time(0);
if(!triggerAlerts())
return;
FlowStatus status = getFlowStatus();
if(!isFlowAlerted()) {
bool do_dump = true;
switch(status) {
case status_normal:
do_dump = false;
break;
case status_not_purged:
do_dump = true;
break;
case status_web_mining_detected:
do_dump = ntop->getPrefs()->are_mining_alerts_enabled();
break;
case status_blacklisted:
do_dump = ntop->getPrefs()->are_malware_alerts_enabled();
break;
case status_slow_tcp_connection: /* 1 */
case status_slow_application_header: /* 2 */
case status_slow_data_exchange: /* 3 */
case status_low_goodput: /* 4 */
case status_tcp_connection_issues: /* 6 - i.e. too many retransmission ooo... or similar */
/* Don't log them for the time being otherwise we'll have too many flows */
do_dump = false;
break;
case status_suspicious_tcp_syn_probing: /* 5 */
case status_suspicious_tcp_probing: /* 7 */
case status_tcp_connection_refused: /* 9 */
do_dump = ntop->getPrefs()->are_probing_alerts_enabled();
break;
case status_flow_when_interface_alerted /* 8 */:
#if 0
do_dump = ntop->getPrefs()->do_dump_flow_alerts_when_iface_alerted();
#endif
break;
case status_ssl_certificate_mismatch: /* 10 */
do_dump = ntop->getPrefs()->are_ssl_alerts_enabled();
break;
case status_dns_invalid_query:
do_dump = ntop->getPrefs()->are_dns_alerts_enabled();
break;
case status_remote_to_remote:
do_dump = ntop->getPrefs()->are_remote_to_remote_alerts_enabled();
break;
case status_blocked:
#ifdef HAVE_NEDGE
do_dump = ntop->getPrefs()->are_dropped_flows_alerts_enabled();
#else
do_dump = false;
#endif
break;
case status_device_protocol_not_allowed:
do_dump = ntop->getPrefs()->are_device_protocols_alerts_enabled();
break;
case status_elephant_local_to_remote:
case status_elephant_remote_to_local:
do_dump = ntop->getPrefs()->are_elephant_flows_alerts_enabled();
break;
case status_longlived:
do_dump = ntop->getPrefs()->are_longlived_flows_alerts_enabled();
break;
}
#ifdef HAVE_NEDGE
/* NOTE: this must be explicitly re-checked as a more specific alert
e.g. status_device_protocol_not_allowed may have set do_dump=false but
we still want to generate the alert */
if(!do_dump && !isPassVerdict())
/* A side effect of this is that the generated alert will still have
the original status rather then the status_blocked */
do_dump = ntop->getPrefs()->are_dropped_flows_alerts_enabled();
#endif
/* Check per-host thresholds */
if((cli_host && cli_host->incFlowAlertHits(when))
||(srv_host && srv_host->incFlowAlertHits(when)))
do_dump = false;
if(do_dump) {
iface->getAlertsManager()->storeFlowAlert(this);
}
}
}
/* *************************************** */
void Flow::processDetectedProtocol() {
u_int16_t l7proto;
if(protocol_processed || (ndpiFlow == NULL))
return;
l7proto = ndpi_get_lower_proto(ndpiDetectedProtocol);
if((l7proto != NDPI_PROTOCOL_DNS)
&& (l7proto != NDPI_PROTOCOL_DHCP) /* host_server_name in DHCP is for the client name, not the server */
&& (ndpiFlow->host_server_name[0] != '\0')
&& (host_server_name == NULL)) {
Utils::sanitizeHostName((char*)ndpiFlow->host_server_name);
if(ndpi_is_proto(ndpiDetectedProtocol, NDPI_PROTOCOL_HTTP)) {
char *double_column = strrchr((char*)ndpiFlow->host_server_name, ':');
if(double_column) double_column[0] = '\0';
}
/*
Host server name equals the Host: HTTP header field.
*/
host_server_name = strdup((char*)ndpiFlow->host_server_name);
}
switch(l7proto) {
case NDPI_PROTOCOL_BITTORRENT:
if(bt_hash == NULL) {
setBittorrentHash((char*)ndpiFlow->protos.bittorrent.hash);
protocol_processed = true;
}
break;
case NDPI_PROTOCOL_MDNS:
/*
The statement below can create issues sometimes as devices publish
themselves with varisous names depending on the context (**)
*/
if((ndpiFlow->protos.mdns.answer[0] != '\0') && cli_host) {
ntop->getTrace()->traceEvent(TRACE_INFO, "[MDNS] %s", ndpiFlow->protos.mdns.answer);
cli_host->inlineSetMDNSInfo(ndpiFlow->protos.mdns.answer);
}
break;
case NDPI_PROTOCOL_DNS:
if(ndpiFlow->host_server_name[0] != '\0') {
if(protos.dns.last_query) {
free(protos.dns.last_query);
protos.dns.invalid_query = false;
}
protos.dns.last_query = strdup((const char*)ndpiFlow->host_server_name);
for(int i = 0; protos.dns.last_query[i] != '\0'; i++) {
if(!isprint(protos.dns.last_query[i])) {
protos.dns.last_query[i] = '?';
protos.dns.invalid_query = true;
}
}
if(!protos.dns.invalid_query)
protos.dns.invalid_query = (strlen(protos.dns.last_query) > MAX_VALID_DNS_QUERY_LEN) ? true : false;
}
if(ntop->getPrefs()->decode_dns_responses()) {
if(ndpiFlow->host_server_name[0] != '\0') {
char delimiter = '@', *name = NULL;
char *at = (char*)strchr((const char*)ndpiFlow->host_server_name, delimiter);
/* Consider only positive DNS replies */
if(at != NULL)
name = &at[1], at[0] = '\0';
else if((!strstr((const char*)ndpiFlow->host_server_name, ".in-addr.arpa"))
&& (!strstr((const char*)ndpiFlow->host_server_name, ".ip6.arpa")))
name = (char*)ndpiFlow->host_server_name;
if(name) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DNS] %s", (char*)ndpiFlow->host_server_name);
if(ndpiFlow->protos.dns.reply_code == 0) {
if(ndpiFlow->protos.dns.num_answers > 0) {
protocol_processed = true;
if(at != NULL) {
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "[DNS] %s <-> %s", name, (char*)ndpiFlow->host_server_name);
ntop->getRedis()->setResolvedAddress(name, (char*)ndpiFlow->host_server_name);
}
}
}
}
}
}
break;
case NDPI_PROTOCOL_SSH:
if(protos.ssh.client_signature) free(protos.ssh.client_signature);
if(protos.ssh.server_signature) free(protos.ssh.server_signature);
/*
NOTE
Signatures are swapped as the server observes the client signature
and vice-versa
*/
protos.ssh.server_signature = strdup(ndpiFlow->protos.ssh.client_signature);
protos.ssh.client_signature = strdup(ndpiFlow->protos.ssh.server_signature);
break;
case NDPI_PROTOCOL_TOR:
case NDPI_PROTOCOL_SSL:
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "-> [client: %s][server: %s]",
ndpiFlow->protos.stun_ssl.ssl.client_certificate,
ndpiFlow->protos.stun_ssl.ssl.server_certificate);
#endif
if((protos.ssl.certificate == NULL)
&& (ndpiFlow->protos.stun_ssl.ssl.client_certificate[0] != '\0')) {
protos.ssl.certificate = strdup(ndpiFlow->protos.stun_ssl.ssl.client_certificate);
if(protos.ssl.certificate && (strncmp(protos.ssl.certificate, "www.", 4) == 0)) {
if(ndpi_is_proto(ndpiDetectedProtocol, NDPI_PROTOCOL_TOR))
check_tor = true;
}
}
if((protos.ssl.server_certificate == NULL)
&& (ndpiFlow->protos.stun_ssl.ssl.server_certificate[0] != '\0')) {
protos.ssl.server_certificate = strdup(ndpiFlow->protos.stun_ssl.ssl.server_certificate);
}
if(check_tor) {
char rsp[256];
if(ntop->getRedis()->getAddress(protos.ssl.certificate, rsp, sizeof(rsp), false) == 0) {
if(rsp[0] == '\0') /* Cached failed resolution */
ndpiDetectedProtocol.app_protocol = NDPI_PROTOCOL_TOR;
check_tor = false; /* This is a valid host */
} else {
ntop->getRedis()->pushHostToResolve(protos.ssl.certificate, false, true /* Fake to resolve it ASAP */);
}
}
if(protos.ssl.certificate
&& cli_host
&& cli_host->isLocalHost())
cli_host->incrVisitedWebSite(protos.ssl.certificate);
protocol_processed = true;
break;
/* No break here !*/
case NDPI_PROTOCOL_HTTP:
case NDPI_PROTOCOL_HTTP_PROXY:
if(ndpiFlow->host_server_name[0] != '\0') {
char *doublecol, delimiter = ':';
protocol_processed = true;
/* If <host>:<port> we need to remove ':' */
if((doublecol = (char*)strchr((const char*)ndpiFlow->host_server_name, delimiter)) != NULL)
doublecol[0] = '\0';
if(srv_host && (ndpiFlow->protos.http.detected_os[0] != '\0') && cli_host)
cli_host->inlineSetOS((char*)ndpiFlow->protos.http.detected_os);
if(cli_host && cli_host->isLocalHost())
cli_host->incrVisitedWebSite(host_server_name);
}
break;
} /* switch */
#ifdef NTOPNG_PRO
if((ndpiDetectedProtocol.app_protocol == NDPI_PROTOCOL_UNKNOWN) && (!l7_protocol_guessed))
ntop->getFlowChecker()->flowCheck(this);
#endif
if(protocol_processed
/* For DNS we delay the memory free so that we can let nDPI analyze all the packets of the flow */
&& (l7proto != NDPI_PROTOCOL_DNS))
freeDPIMemory();
}
/* *************************************** */
void Flow::guessProtocol() {
if(detection_completed)
return; /* Nothing to do */
else
detection_completed = true; /* We give up */
/* This code should no longer be necessary as the nDPI API changed */
if((protocol == IPPROTO_TCP) || (protocol == IPPROTO_UDP)) {
if(cli_host && srv_host) {
/* We can guess the protocol */
IpAddress *cli_ip = cli_host->get_ip(), *srv_ip = srv_host->get_ip();
ndpi_protocol guessed_proto = ndpi_guess_undetected_protocol(iface->get_ndpi_struct(), NULL, protocol,
ntohl(cli_ip ? cli_ip->get_ipv4() : 0),
ntohs(cli_port),
ntohl(srv_ip ? srv_ip->get_ipv4() : 0),
ntohs(srv_port));
ndpiDetectedProtocol.master_protocol = guessed_proto.master_protocol;
ndpiDetectedProtocol.app_protocol = guessed_proto.app_protocol;
/* NOTE: only overwrite the category if it was not set.
* This prevents overwriting already determined category (e.g. by IP or Host)
*/
if(ndpiDetectedProtocol.category == NDPI_PROTOCOL_CATEGORY_UNSPECIFIED)
ndpiDetectedProtocol.category = guessed_proto.category;
}
l7_protocol_guessed = true;
}
}
/* *************************************** */
void Flow::setDetectedProtocol(ndpi_protocol proto_id, bool forceDetection) {
ndpiDetectedProtocol.category = proto_id.category;
if(proto_id.app_protocol != NDPI_PROTOCOL_UNKNOWN) {
ndpiDetectedProtocol = proto_id;
/* Let the client SSL certificate win over the server SSL certificate
this addresses detection for youtube, e.g., when the client
requests s.youtube.com but the server responds with google.com */
if((proto_id.master_protocol == NDPI_PROTOCOL_SSL)
&& (get_packets() < NDPI_MIN_NUM_PACKETS)
&& (ndpiFlow)
&& (ndpiFlow->protos.stun_ssl.ssl.client_certificate[0] == '\0')
&& (ndpiFlow->protos.stun_ssl.ssl.server_certificate[0] == '\0')) {
get_ndpi_flow()->detected_protocol_stack[0] = NDPI_PROTOCOL_UNKNOWN;
return;
}
processDetectedProtocol();
detection_completed = true;
} else if(forceDetection
|| (get_packets() >= NDPI_MIN_NUM_PACKETS)
|| (!iface->is_ndpi_enabled())
|| iface->isSampledTraffic()
)
guessProtocol();
if(detection_completed) {
#ifdef HAVE_NEDGE
updateFlowShapers(true);
#endif
}
#ifdef NTOPNG_PRO
// Update the profile even if the detection is not yet completed.
// Indeed, even if the L7 detection is not yet completed
// the flow already carries information on all the other fields,
// e.g., IP src and DST, vlan, L4 proto, etc
#ifndef HAVE_NEDGE
updateProfile();
#endif
#endif
}
/* *************************************** */
void Flow::setJSONInfo(const char *json) {
if(json == NULL) return;
if(json_info != NULL) free(json_info);
json_info = strdup(json);
}
/* *************************************** */
/*
* A faster replacement for inet_ntoa().
*/
char* Flow::intoaV4(unsigned int addr, char* buf, u_short bufLen) {
char *cp, *retStr;
int n;
cp = &buf[bufLen];
*--cp = '\0';
n = 4;
do {
u_int byte = addr & 0xff;
*--cp = byte % 10 + '0';
byte /= 10;
if(byte > 0) {
*--cp = byte % 10 + '0';
byte /= 10;
if(byte > 0)
*--cp = byte + '0';
}
*--cp = '.';
addr >>= 8;
} while (--n > 0);
/* Convert the string to srccase */
retStr = (char*)(cp+1);
return(retStr);
}
/* *************************************** */
u_int64_t Flow::get_current_bytes_cli2srv() {
int64_t diff = cli2srv_bytes - cli2srv_last_bytes;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* *************************************** */
u_int64_t Flow::get_current_bytes_srv2cli() {
int64_t diff = srv2cli_bytes - srv2cli_last_bytes;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* *************************************** */
u_int64_t Flow::get_current_goodput_bytes_cli2srv() {
int64_t diff = cli2srv_goodput_bytes - cli2srv_last_goodput_bytes;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* *************************************** */
u_int64_t Flow::get_current_goodput_bytes_srv2cli() {
int64_t diff = srv2cli_goodput_bytes - srv2cli_last_goodput_bytes;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* *************************************** */
u_int64_t Flow::get_current_packets_cli2srv() {
int64_t diff = cli2srv_packets - cli2srv_last_packets;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* *************************************** */
u_int64_t Flow::get_current_packets_srv2cli() {
int64_t diff = srv2cli_packets - srv2cli_last_packets;
/*
We need to do this as due to concurrency issues,
we might have a negative value
*/
return((diff > 0) ? diff : 0);
};
/* ****************************************************** */
char* Flow::printTCPflags(u_int8_t flags, char * const buf, u_int buf_len) const {
snprintf(buf, buf_len, "%s%s%s%s%s%s%s%s%s",
(flags & TH_SYN) ? " SYN" : "",
(flags & TH_ACK) ? " ACK" : "",
(flags & TH_FIN) ? " FIN" : "",
(flags & TH_RST) ? " RST" : "",
(flags & TH_PUSH) ? " PUSH" : "",
isTCPEstablished() ? " est" : "",
isTCPConnecting() ? " conn" : "",
isTCPClosed() ? " closed" : "",
isTCPReset() ? " reset" : "");
if(buf[0] == ' ')
return(&buf[1]);
else
return(buf);
}
/* *************************************** */
char* Flow::print(char *buf, u_int buf_len) {
char buf1[32], buf2[32], buf3[32], buf4[32], pbuf[32], tcp_buf[64];
buf[0] = '\0';
if((cli_host == NULL) || (srv_host == NULL)) return(buf);
#if defined(NTOPNG_PRO) && defined(SHAPER_DEBUG)
char shapers[64];
TrafficShaper *cli2srv_in = flowShaperIds.cli2srv.ingress;
TrafficShaper *cli2srv_out = flowShaperIds.cli2srv.egress;
TrafficShaper *srv2cli_in = flowShaperIds.srv2cli.ingress;
TrafficShaper *srv2cli_out = flowShaperIds.srv2cli.egress;
if(iface->is_bridge_interface()) {
snprintf(shapers, sizeof(shapers),
"[pass_verdict: %s] "
"[shapers: cli2srv=%u/%u, srv2cli=%u/%u] "
"[cli2srv_ingress shaping_enabled: %i max_rate: %lu] "
"[cli2srv_egress shaping_enabled: %i max_rate: %lu] "
"[srv2cli_ingress shaping_enabled: %i max_rate: %lu] "
"[srv2cli_egress shaping_enabled: %i max_rate: %lu] ",
passVerdict ? "PASS" : "DROP",
flowShaperIds.cli2srv.ingress ? flowShaperIds.cli2srv.ingress->get_shaper_id() : DEFAULT_SHAPER_ID,
flowShaperIds.cli2srv.egress ? flowShaperIds.cli2srv.egress->get_shaper_id() : DEFAULT_SHAPER_ID,
flowShaperIds.srv2cli.ingress ? flowShaperIds.srv2cli.ingress->get_shaper_id() : DEFAULT_SHAPER_ID,
flowShaperIds.srv2cli.egress ? flowShaperIds.srv2cli.egress->get_shaper_id() : DEFAULT_SHAPER_ID,
cli2srv_in->shaping_enabled(), cli2srv_in->get_max_rate_kbit_sec(),
cli2srv_out->shaping_enabled(), cli2srv_out->get_max_rate_kbit_sec(),
srv2cli_in->shaping_enabled(), srv2cli_in->get_max_rate_kbit_sec(),
srv2cli_out->shaping_enabled(), srv2cli_out->get_max_rate_kbit_sec()
);
} else
shapers[0] = '\0';
#endif
tcp_buf[0] = '\0';
if(protocol == IPPROTO_TCP) {
int len = 0;
if((tcp_stats_s2d.pktOOO+tcp_stats_d2s.pktOOO) > 0)
len += snprintf(&tcp_buf[len], sizeof(tcp_buf)-len, "[OOO=%u/%u]",
tcp_stats_s2d.pktOOO, tcp_stats_d2s.pktOOO);
if((tcp_stats_s2d.pktLost+tcp_stats_d2s.pktLost) > 0)
len += snprintf(&tcp_buf[len], sizeof(tcp_buf)-len, "[Lost=%u/%u]",
tcp_stats_s2d.pktLost, tcp_stats_d2s.pktLost);
if((tcp_stats_s2d.pktRetr+tcp_stats_d2s.pktRetr) > 0)
len += snprintf(&tcp_buf[len], sizeof(tcp_buf)-len, "[Retr=%u/%u]",
tcp_stats_s2d.pktRetr, tcp_stats_d2s.pktRetr);
if((tcp_stats_s2d.pktKeepAlive+tcp_stats_d2s.pktKeepAlive) > 0)
len += snprintf(&tcp_buf[len], sizeof(tcp_buf)-len, "[KeepAlive=%u/%u]",
tcp_stats_s2d.pktKeepAlive, tcp_stats_d2s.pktKeepAlive);
}
snprintf(buf, buf_len,
"%s %s:%u &gt; %s:%u [first: %u][last: %u][proto: %u.%u/%s][cat: %u/%s][device: %u in: %u out:%u][%u/%u pkts][%llu/%llu bytes][src2dst: %s][dst2stc: %s]"
"%s%s%s"
#if defined(NTOPNG_PRO) && defined(SHAPER_DEBUG)
"%s"
#endif
,
get_protocol_name(),
cli_host->get_ip()->print(buf1, sizeof(buf1)), ntohs(cli_port),
srv_host->get_ip()->print(buf2, sizeof(buf2)), ntohs(srv_port),
(u_int32_t)first_seen, (u_int32_t)last_seen,
ndpiDetectedProtocol.master_protocol, ndpiDetectedProtocol.app_protocol,
get_detected_protocol_name(pbuf, sizeof(pbuf)),
get_protocol_category(),
get_protocol_category_name(),
flow_device.device_ip, flow_device.in_index, flow_device.out_index,
cli2srv_packets, srv2cli_packets,
(long long unsigned) cli2srv_bytes, (long long unsigned) srv2cli_bytes,
printTCPflags(src2dst_tcp_flags, buf3, sizeof(buf3)),
printTCPflags(dst2src_tcp_flags, buf4, sizeof(buf4)),
(isSSL() && protos.ssl.certificate) ? "[" : "",
(isSSL() && protos.ssl.certificate) ? protos.ssl.certificate : "",
(isSSL() && protos.ssl.certificate) ? "]" : ""
#if defined(NTOPNG_PRO) && defined(SHAPER_DEBUG)
, shapers
#endif
);
if(getFlowStatus() == status_dns_invalid_query && protos.dns.last_query)
snprintf(&buf[strlen(buf)], buf_len - strlen(buf), "[query: %s]", protos.dns.last_query);
return(buf);
}
/* *************************************** */
bool Flow::dumpFlow(bool dump_alert) {
bool rc = false;
if(dump_alert) {
/* NOTE: this can be very time consuming */
dumpFlowAlert();
}
if(((cli2srv_packets - last_db_dump.cli2srv_packets) == 0)
&& ((srv2cli_packets - last_db_dump.srv2cli_packets) == 0))
return(rc);
if(ntop->getPrefs()->do_dump_flows()
#ifndef HAVE_NEDGE
|| ntop->get_export_interface()
#endif
) {
#ifdef NTOPNG_PRO
#ifndef HAVE_NEDGE
if(!detection_completed || cli2srv_packets + srv2cli_packets <= NDPI_MIN_NUM_PACKETS)
/* force profile detection even if the L7 Protocol has not been detected */
updateProfile();
#endif
#endif
if(!ntop->getPrefs()->is_tiny_flows_export_enabled() && isTiny()) {
#ifdef TINY_FLOWS_DEBUG
ntop->getTrace()->traceEvent(TRACE_NORMAL,
"Skipping tiny flow dump "
"[flow key: %u]"
"[packets current/max: %i/%i] "
"[bytes current/max: %i/%i].",
key(),
get_packets(),
ntop->getPrefs()->get_max_num_packets_per_tiny_flow(),
get_bytes(),
ntop->getPrefs()->get_max_num_bytes_per_tiny_flow());
#endif
return(rc);
}
if(!idle()) {
time_t now = time(NULL);
if(iface->getIfType() == interface_type_PCAP_DUMP
|| (now - get_first_seen()) < CONST_DB_DUMP_FREQUENCY
|| (now - last_db_dump.last_dump) < CONST_DB_DUMP_FREQUENCY) {
return(rc);
}
} else {
/* flows idle, i.e., ready to be purged, are always dumped */
}
#ifdef NTOPNG_PRO
if(ntop->getPro()->has_valid_license() && ntop->getPrefs()->is_enterprise_edition())
getInterface()->aggregatePartialFlow(this);
#endif
getInterface()->dumpFlow(last_seen, this);
#ifndef HAVE_NEDGE
if(ntop->get_export_interface()) {
char *json = serialize(false);
if(json) {
ntop->get_export_interface()->export_data(json);
free(json);
}
}
#endif
rc = true;
}
return(rc);
}
/* *************************************** */
void Flow::setDropVerdict() {
#if defined(HAVE_NEDGE)
if((iface->getIfType() == interface_type_NETFILTER) && (passVerdict == true))
((NetfilterInterface *) iface)->setPolicyChanged();
#endif
passVerdict = false;
}
/* *************************************** */
void Flow::incFlowDroppedCounters() {
if(!flow_dropped_counts_increased) {
if(cli_host) {
cli_host->incNumDroppedFlows();
if(cli_host->getMac()) cli_host->getMac()->incNumDroppedFlows();
}
#ifdef NTOPNG_PRO
HostPools *h = iface ? iface->getHostPools() : NULL;
u_int16_t cli_pool = NO_HOST_POOL_ID;
if(h) {
cli_pool = cli_host ? cli_host->get_host_pool() : NO_HOST_POOL_ID;
if(cli_pool != NO_HOST_POOL_ID)
h->incPoolNumDroppedFlows(cli_pool);
}
#endif
/* Increasing stats on the server is pointless.
If a flow is dropped, the server doesn't even see it,
it is just the client that gets a drop. */
flow_dropped_counts_increased = true;
}
}
/* *************************************** */
void Flow::update_hosts_stats(struct timeval *tv, bool dump_alert) {
u_int64_t sent_packets, sent_bytes, sent_goodput_bytes, rcvd_packets, rcvd_bytes, rcvd_goodput_bytes;
u_int64_t diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes,
diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes;
bool updated = false;
bool cli_and_srv_in_same_subnet = false;
bool cli_and_srv_in_same_country = false;
int16_t cli_network_id, srv_network_id;
int16_t stats_protocol; /* The protocol (among ndpi master_ and app_) that is chosen to increase stats */
Vlan *vl;
NetworkStats *cli_network_stats;
if(isReadyToPurge()) {
/* Marked as ready to be purged, will be purged by NetworkInterface::purgeIdleFlows */
set_to_purge(tv->tv_sec);
}
if(check_tor && (ndpiDetectedProtocol.app_protocol == NDPI_PROTOCOL_SSL)) {
char rsp[256];
if(ntop->getRedis()->getAddress(protos.ssl.certificate, rsp, sizeof(rsp), false) == 0) {
if(rsp[0] == '\0') /* Cached failed resolution */
ndpiDetectedProtocol.app_protocol = NDPI_PROTOCOL_TOR;
check_tor = false; /* This is a valid host */
} else {
if((tv->tv_sec - last_seen) > 30) {
/* We give up */
check_tor = false; /* This is a valid host */
}
}
}
if(ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_UNKNOWN
&& !ndpi_is_subprotocol_informative(NULL, ndpiDetectedProtocol.master_protocol))
stats_protocol = ndpiDetectedProtocol.app_protocol;
else
stats_protocol = ndpiDetectedProtocol.master_protocol;
sent_packets = cli2srv_packets, sent_bytes = cli2srv_bytes, sent_goodput_bytes = cli2srv_goodput_bytes;
diff_sent_packets = sent_packets - cli2srv_last_packets,
diff_sent_bytes = sent_bytes - cli2srv_last_bytes, diff_sent_goodput_bytes = sent_goodput_bytes - cli2srv_last_goodput_bytes;
prev_cli2srv_last_bytes = cli2srv_last_bytes, prev_cli2srv_last_goodput_bytes = cli2srv_last_goodput_bytes,
prev_cli2srv_last_packets = cli2srv_last_packets;
rcvd_packets = srv2cli_packets, rcvd_bytes = srv2cli_bytes, rcvd_goodput_bytes = srv2cli_goodput_bytes;
diff_rcvd_packets = rcvd_packets - srv2cli_last_packets,
diff_rcvd_bytes = rcvd_bytes - srv2cli_last_bytes, diff_rcvd_goodput_bytes = rcvd_goodput_bytes - srv2cli_last_goodput_bytes;
prev_srv2cli_last_bytes = srv2cli_last_bytes, prev_srv2cli_last_goodput_bytes = srv2cli_last_goodput_bytes,
prev_srv2cli_last_packets = srv2cli_last_packets;
cli2srv_last_packets = sent_packets, cli2srv_last_bytes = sent_bytes,
cli2srv_last_goodput_bytes = sent_goodput_bytes;
srv2cli_last_packets = rcvd_packets, srv2cli_last_bytes = rcvd_bytes,
srv2cli_last_goodput_bytes = rcvd_goodput_bytes;
if(cli_host && srv_host) {
cli_network_id = cli_host->get_local_network_id();
srv_network_id = srv_host->get_local_network_id();
if(cli_network_id >= 0 && (cli_network_id == srv_network_id))
cli_and_srv_in_same_subnet = true;
if(diff_sent_packets || diff_rcvd_packets) {
/* Update L2 Device stats */
if(srv_host->get_mac()) {
#ifdef HAVE_NEDGE
srv_host->getMac()->incSentStats(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes);
srv_host->getMac()->incRcvdStats(tv->tv_sec, diff_sent_packets, diff_sent_bytes);
#endif
if(ntop->getPrefs()->areMacNdpiStatsEnabled()) {
srv_host->getMac()->incnDPIStats(tv->tv_sec, stats_protocol,
diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes,
diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes);
}
}
if(cli_host->getMac()) {
#ifdef HAVE_NEDGE
cli_host->getMac()->incSentStats(tv->tv_sec, diff_sent_packets, diff_sent_bytes);
cli_host->getMac()->incRcvdStats(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes);
#endif
if(ntop->getPrefs()->areMacNdpiStatsEnabled()) {
cli_host->getMac()->incnDPIStats(tv->tv_sec, stats_protocol,
diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes,
diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes);
}
}
#ifdef NTOPNG_PRO
if(ntop->getPro()->has_valid_license()) {
#ifndef HAVE_NEDGE
if(trafficProfile)
trafficProfile->incBytes(diff_sent_bytes+diff_rcvd_bytes);
#endif
update_pools_stats(tv, diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
}
#endif
if(iface && iface->hasSeenVlanTaggedPackets() && (vl = iface->getVlan(vlanId, false))) {
/* Note: source and destination hosts have, by definition, the same VLAN so the increase is done only one time. */
/* Note: vl will never be null as we're in a flow with that vlan. Hence, it is guaranteed that at least
two hosts exists for that vlan and that any purge attempt will be prevented. */
#ifdef VLAN_DEBUG
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Increasing Vlan %u stats", vlanId);
#endif
vl->incStats(tv->tv_sec, stats_protocol,
diff_sent_packets, diff_sent_bytes,
diff_rcvd_packets, diff_rcvd_bytes);
}
// Update network stats
cli_network_stats = cli_host->getNetworkStats(cli_network_id);
cli_host->incStats(tv->tv_sec, protocol, stats_protocol, custom_app,
diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes,
diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes);
// update per-subnet byte counters
if(cli_network_stats) { // only if the network is known and local
if(!cli_and_srv_in_same_subnet) {
cli_network_stats->incEgress(tv->tv_sec, diff_sent_packets, diff_sent_bytes,
srv_host->get_ip()->isBroadcastAddress());
cli_network_stats->incIngress(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes,
cli_host->get_ip()->isBroadcastAddress());
} else // client and server ARE in the same subnet
// need to update the inner counter (just one time, will intentionally skip this for srv_host)
cli_network_stats->incInner(tv->tv_sec, diff_sent_packets + diff_rcvd_packets,
diff_sent_bytes + diff_rcvd_bytes,
srv_host->get_ip()->isBroadcastAddress()
|| cli_host->get_ip()->isBroadcastAddress());
}
NetworkStats *srv_network_stats;
srv_network_stats = srv_host->getNetworkStats(srv_network_id);
srv_host->incStats(tv->tv_sec, protocol, stats_protocol, custom_app,
diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes,
diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes);
if(srv_network_stats) {
// local and known server network
if(!cli_and_srv_in_same_subnet) {
srv_network_stats->incIngress(tv->tv_sec, diff_sent_packets, diff_sent_bytes,
srv_host->get_ip()->isBroadcastAddress());
srv_network_stats->incEgress(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes,
cli_host->get_ip()->isBroadcastAddress());
}
}
if(cli_host->get_asn() != srv_host->get_asn()) {
AutonomousSystem *cli_as = cli_host->get_as(), *srv_as = srv_host->get_as();
if(cli_as)
cli_as->incStats(tv->tv_sec, stats_protocol, diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
if(srv_as)
srv_as->incStats(tv->tv_sec, stats_protocol, diff_rcvd_packets, diff_rcvd_bytes, diff_sent_packets, diff_sent_bytes);
}
// Update Country stats
Country *cli_country_stats = cli_host->getCountryStats();
Country *srv_country_stats = srv_host->getCountryStats();
if(cli_country_stats && srv_country_stats && cli_country_stats->equal(srv_country_stats))
cli_and_srv_in_same_country = true;
if(cli_country_stats) {
if(!cli_and_srv_in_same_country) {
cli_country_stats->incEgress(tv->tv_sec, diff_sent_packets, diff_sent_bytes,
srv_host->get_ip()->isBroadcastAddress());
cli_country_stats->incIngress(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes,
cli_host->get_ip()->isBroadcastAddress());
} else // client and server ARE in the same country
// need to update the inner counter (just one time, will intentionally skip this for srv_host)
cli_country_stats->incInner(tv->tv_sec, diff_sent_packets + diff_rcvd_packets,
diff_sent_bytes + diff_rcvd_bytes,
srv_host->get_ip()->isBroadcastAddress()
|| cli_host->get_ip()->isBroadcastAddress());
}
if(srv_country_stats) {
if(!cli_and_srv_in_same_country) {
srv_country_stats->incIngress(tv->tv_sec, diff_sent_packets, diff_sent_bytes,
srv_host->get_ip()->isBroadcastAddress());
srv_country_stats->incEgress(tv->tv_sec, diff_rcvd_packets, diff_rcvd_bytes,
cli_host->get_ip()->isBroadcastAddress());
}
}
if(host_server_name
&& isThreeWayHandshakeOK()
&& (ndpi_is_proto(ndpiDetectedProtocol, NDPI_PROTOCOL_HTTP)
|| ndpi_is_proto(ndpiDetectedProtocol, NDPI_PROTOCOL_HTTP_PROXY))) {
if(srv_host->getHTTPstats())
srv_host->getHTTPstats()->updateHTTPHostRequest(tv->tv_sec, host_server_name,
diff_num_http_requests,
diff_sent_bytes, diff_rcvd_bytes);
diff_num_http_requests = 0; /*
As this is a difference it is reset
whenever we update the counters
*/
}
}
/* Check and possibly enqueue host remote-to-remote alerts */
if(!cli_host->isLocalHost() && !srv_host->isLocalHost()
&& cli_host->get_ip()->isNonEmptyUnicastAddress()
&& srv_host->get_ip()->isNonEmptyUnicastAddress()
&& ntop->getPrefs()->are_remote_to_remote_alerts_enabled()
&& !cli_host->setRemoteToRemoteAlerts()) {
json_object *jo = cli_host->getJSONObject(details_normal);
if(jo) {
ntop->getRedis()->rpush(CONST_ALERT_HOST_REMOTE_TO_REMOTE, json_object_to_json_string(jo), CONST_REMOTE_TO_REMOTE_MAX_QUEUE);
json_object_put(jo);
}
}
}
if(last_update_time.tv_sec > 0) {
float tdiff_msec = ((float)(tv->tv_sec-last_update_time.tv_sec)*1000)+((tv->tv_usec-last_update_time.tv_usec)/(float)1000);
//float t_sec = (float)(tv->tv_sec)+(float)(tv->tv_usec)/1000;
#if 0
/* Actually, the refresh interval is controlled with ntop->getPrefs()->get_housekeeping_frequency()
so there is no need to set an a-priori minimum check interval */
if((iface->getIfType() == interface_type_ZMQ)
&& (tdiff_msec < 5000)) {
/* With ZMQ (if collecting sFlow) we might compute inaccurate
throughput when haveing one flow with a single sample so
we spread the traffic across at least 5 secs
*/
;
} else
#endif
if(tdiff_msec >= 1000 /* Do not update when less than 1 second (1000 msec) */) {
// bps
u_int64_t diff_bytes_cli2srv = cli2srv_last_bytes - prev_cli2srv_last_bytes;
u_int64_t diff_bytes_srv2cli = srv2cli_last_bytes - prev_srv2cli_last_bytes;
u_int64_t diff_bytes = diff_bytes_cli2srv + diff_bytes_srv2cli;
u_int64_t diff_goodput_bytes_cli2srv = cli2srv_last_goodput_bytes - prev_cli2srv_last_goodput_bytes;
u_int64_t diff_goodput_bytes_srv2cli = srv2cli_last_goodput_bytes - prev_srv2cli_last_goodput_bytes;
float bytes_msec_cli2srv = ((float)(diff_bytes_cli2srv*1000))/tdiff_msec;
float bytes_msec_srv2cli = ((float)(diff_bytes_srv2cli*1000))/tdiff_msec;
float bytes_msec = bytes_msec_cli2srv + bytes_msec_srv2cli;
float goodput_bytes_msec_cli2srv = ((float)(diff_goodput_bytes_cli2srv*1000))/tdiff_msec;
float goodput_bytes_msec_srv2cli = ((float)(diff_goodput_bytes_srv2cli*1000))/tdiff_msec;
float goodput_bytes_msec = goodput_bytes_msec_cli2srv + goodput_bytes_msec_srv2cli;
if(isDetectionCompleted() && cli_host && srv_host) {
iface->topProtocolsAdd(cli_host->get_host_pool(), stats_protocol, diff_bytes);
if(cli_host->get_host_pool() != srv_host->get_host_pool())
iface->topProtocolsAdd(srv_host->get_host_pool(), stats_protocol, diff_bytes);
if(cli_host->get_mac() && srv_host->getMac()) {
iface->topMacsAdd(cli_host->getMac(), stats_protocol, diff_bytes);
iface->topMacsAdd(srv_host->getMac(), stats_protocol, diff_bytes);
}
}
/* Just to be safe */
if(bytes_msec < 0) bytes_msec = 0;
if(bytes_msec_cli2srv < 0) bytes_msec_cli2srv = 0;
if(bytes_msec_srv2cli < 0) bytes_msec_srv2cli = 0;
if(goodput_bytes_msec < 0) goodput_bytes_msec = 0;
if(goodput_bytes_msec_cli2srv < 0) goodput_bytes_msec_cli2srv = 0;
if(goodput_bytes_msec_srv2cli < 0) goodput_bytes_msec_srv2cli = 0;
if((bytes_msec > 0) || iface->isPacketInterface()) {
// refresh trend stats for the overall throughput
if(bytes_thpt < bytes_msec) bytes_thpt_trend = trend_up;
else if(bytes_thpt > bytes_msec) bytes_thpt_trend = trend_down;
else bytes_thpt_trend = trend_stable;
// refresh goodput stats for the overall throughput
if(goodput_bytes_thpt < goodput_bytes_msec) goodput_bytes_thpt_trend = trend_up;
else if(goodput_bytes_thpt > goodput_bytes_msec) goodput_bytes_thpt_trend = trend_down;
else goodput_bytes_thpt_trend = trend_stable;
if(false)
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msec: %.1f][bytes: %lu][bits_thpt: %.4f Mbps]",
bytes_msec, diff_bytes, (bytes_thpt*8)/((float)(1024*1024)));
// update the old values with the newly calculated ones
bytes_thpt_cli2srv = bytes_msec_cli2srv;
bytes_thpt_srv2cli = bytes_msec_srv2cli;
goodput_bytes_thpt_cli2srv = goodput_bytes_msec_cli2srv;
goodput_bytes_thpt_srv2cli = goodput_bytes_msec_srv2cli;
bytes_thpt = bytes_msec, goodput_bytes_thpt = goodput_bytes_msec;
if(top_bytes_thpt < bytes_thpt) top_bytes_thpt = bytes_thpt;
if(top_goodput_bytes_thpt < goodput_bytes_thpt) top_goodput_bytes_thpt = goodput_bytes_thpt;
if(!idle() /* set_to_purge() deals with low goodput flows when they become idle */
&& iface->getIfType() != interface_type_ZMQ
&& protocol == IPPROTO_TCP
&& get_goodput_bytes() > 0
&& ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_SSH) {
if(isLowGoodput()) {
if(!good_low_flow_detected) {
if(cli_host) cli_host->incLowGoodputFlows(tv->tv_sec, true);
if(srv_host) srv_host->incLowGoodputFlows(tv->tv_sec, false);
good_low_flow_detected = true;
}
} else {
if(good_low_flow_detected) {
/* back to normal */
if(cli_host) cli_host->decLowGoodputFlows(tv->tv_sec, true);
if(srv_host) srv_host->decLowGoodputFlows(tv->tv_sec, false);
good_low_flow_detected = false;
}
}
}
#ifdef NTOPNG_PRO
throughputTrend.update(bytes_thpt), goodputTrend.update(goodput_bytes_thpt);
thptRatioTrend.update(((double)(goodput_bytes_msec*100))/(double)bytes_msec);
#ifdef DEBUG_TREND
if((cli2srv_goodput_bytes+srv2cli_goodput_bytes) > 0) {
char buf[256];
ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s [Goodput long/mid/short %.3f/%.3f/%.3f][ratio: %s][goodput/thpt: %.3f]",
print(buf, sizeof(buf)),
goodputTrend.getLongTerm(), goodputTrend.getMidTerm(), goodputTrend.getShortTerm(),
goodputTrend.getTrendMsg(),
((float)(100*(cli2srv_goodput_bytes+srv2cli_goodput_bytes)))/(float)(cli2srv_bytes+srv2cli_bytes));
}
#endif
#endif
// pps
u_int64_t diff_pkts_cli2srv = cli2srv_last_packets - prev_cli2srv_last_packets;
u_int64_t diff_pkts_srv2cli = srv2cli_last_packets - prev_srv2cli_last_packets;
u_int64_t diff_pkts = diff_pkts_cli2srv + diff_pkts_srv2cli;
float pkts_msec_cli2srv = ((float)(diff_pkts_cli2srv*1000))/tdiff_msec;
float pkts_msec_srv2cli = ((float)(diff_pkts_srv2cli*1000))/tdiff_msec;
float pkts_msec = pkts_msec_cli2srv + pkts_msec_srv2cli;
/* Just to be safe */
if(pkts_msec < 0) pkts_msec = 0;
if(pkts_msec_cli2srv < 0) pkts_msec_cli2srv = 0;
if(pkts_msec_srv2cli < 0) pkts_msec_srv2cli = 0;
if(pkts_thpt < pkts_msec) pkts_thpt_trend = trend_up;
else if(pkts_thpt > pkts_msec) pkts_thpt_trend = trend_down;
else pkts_thpt_trend = trend_stable;
pkts_thpt_cli2srv = pkts_msec_cli2srv;
pkts_thpt_srv2cli = pkts_msec_srv2cli;
pkts_thpt = pkts_msec;
if(top_pkts_thpt < pkts_thpt) top_pkts_thpt = pkts_thpt;
if(false)
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[msec: %.1f][tdiff: %f][pkts: %lu][pkts_thpt: %.2f pps]",
pkts_msec, tdiff_msec, diff_pkts, pkts_thpt);
updated = true;
}
}
} else
updated = true;
if(updated)
memcpy(&last_update_time, tv, sizeof(struct timeval));
if(dumpFlow(dump_alert)) {
last_db_dump.cli2srv_packets = cli2srv_packets,
last_db_dump.srv2cli_packets = srv2cli_packets,
last_db_dump.cli2srv_bytes = cli2srv_bytes,
last_db_dump.cli2srv_goodput_bytes = cli2srv_goodput_bytes,
last_db_dump.srv2cli_bytes = srv2cli_bytes,
last_db_dump.srv2cli_goodput_bytes = srv2cli_goodput_bytes,
last_db_dump.last_dump = last_seen;
}
}
/* *************************************** */
#ifdef NTOPNG_PRO
void Flow::update_pools_stats(const struct timeval *tv,
u_int64_t diff_sent_packets, u_int64_t diff_sent_bytes,
u_int64_t diff_rcvd_packets, u_int64_t diff_rcvd_bytes) {
if(!diff_sent_packets && !diff_rcvd_packets)
return; /* Nothing to update */
HostPools *hp;
u_int16_t cli_host_pool_id, srv_host_pool_id;
ndpi_protocol_category_t category_id = get_protocol_category();
hp = iface->getHostPools();
if(hp) {
/* Client host */
if(cli_host
#ifdef HAVE_NEDGE
&& cli_host->getMac() && (cli_host->getMac()->locate() == located_on_lan_interface)
#endif
) {
cli_host_pool_id = cli_host->get_host_pool();
/* Overall host pool stats */
if(ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_UNKNOWN
&& !ndpi_is_subprotocol_informative(NULL, ndpiDetectedProtocol.master_protocol))
hp->incPoolStats(tv->tv_sec, cli_host_pool_id, ndpiDetectedProtocol.app_protocol, category_id,
diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
else
hp->incPoolStats(tv->tv_sec, cli_host_pool_id, ndpiDetectedProtocol.master_protocol, category_id,
diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
/* Per host quota-enforcement stats */
if(hp->enforceQuotasPerPoolMember(cli_host_pool_id)) {
cli_host->incQuotaEnforcementStats(tv->tv_sec, ndpiDetectedProtocol.master_protocol,
diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
cli_host->incQuotaEnforcementStats(tv->tv_sec, ndpiDetectedProtocol.app_protocol,
diff_sent_packets, diff_sent_bytes, diff_rcvd_packets, diff_rcvd_bytes);
cli_host->incQuotaEnforcementCategoryStats(tv->tv_sec, category_id, diff_sent_bytes, diff_rcvd_bytes);
}
}
/* Server host */
if(srv_host
#ifdef HAVE_NEDGE
&& srv_host->getMac() && (srv_host->getMac()->locate() == located_on_lan_interface)
#endif
) {
srv_host_pool_id = srv_host->get_host_pool();
/* Update server pool stats only if the pool is not equal to the client pool */
if(!cli_host || srv_host_pool_id != cli_host_pool_id) {
if(ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_UNKNOWN
&& !ndpi_is_subprotocol_informative(NULL, ndpiDetectedProtocol.master_protocol))
hp->incPoolStats(tv->tv_sec, srv_host_pool_id, ndpiDetectedProtocol.app_protocol, category_id,
diff_rcvd_packets, diff_rcvd_bytes, diff_sent_packets, diff_sent_bytes);
else
hp->incPoolStats(tv->tv_sec, srv_host_pool_id, ndpiDetectedProtocol.master_protocol, category_id,
diff_rcvd_packets, diff_rcvd_bytes, diff_sent_packets, diff_sent_bytes);
}
/* When quotas have to be enforced per pool member, stats must be increased even if cli and srv are on the same pool */
if(hp->enforceQuotasPerPoolMember(srv_host_pool_id)) {
srv_host->incQuotaEnforcementStats(tv->tv_sec, ndpiDetectedProtocol.master_protocol,
diff_rcvd_packets, diff_rcvd_bytes, diff_sent_packets, diff_sent_bytes);
srv_host->incQuotaEnforcementStats(tv->tv_sec, ndpiDetectedProtocol.app_protocol,
diff_rcvd_packets, diff_rcvd_bytes, diff_sent_packets, diff_sent_bytes);
srv_host->incQuotaEnforcementCategoryStats(tv->tv_sec, category_id, diff_rcvd_bytes, diff_sent_bytes);
}
}
}
}
#endif
/* *************************************** */
bool Flow::equal(IpAddress *_cli_ip, IpAddress *_srv_ip, u_int16_t _cli_port,
u_int16_t _srv_port, u_int16_t _vlanId, u_int8_t _protocol,
const ICMPinfo * const _icmp_info,
bool *src2srv_direction) {
if((_vlanId != vlanId) && (vlanId != 0))
return(false);
if(_protocol != protocol) return(false);
if(icmp_info && !icmp_info->equal(_icmp_info)) return(false);
if(cli_host && cli_host->equal(_cli_ip)
&& srv_host && srv_host->equal(_srv_ip)
&& (_cli_port == cli_port) && (_srv_port == srv_port)) {
*src2srv_direction = true;
return(true);
} else if(srv_host && srv_host->equal(_cli_ip)
&& cli_host && cli_host->equal(_srv_ip)
&& (_srv_port == cli_port) && (_cli_port == srv_port)) {
*src2srv_direction = false;
return(true);
} else
return(false);
}
/* *************************************** */
bool Flow::clientLessThanServer() const {
if(cli_host && srv_host)
return cli_host->get_ip()->compare(srv_host->get_ip()) < 0;
else if(cli_host && !cli_host->get_ip()->isEmpty())
return false; /* Assumes server is zero and cli_host > 0 */
else if(srv_host && !srv_host->get_ip()->isEmpty())
return true; /* Assumes server is >0 and cli_host is zero */
else
return false;
}
/* *************************************** */
void Flow::processJson(bool is_src,
json_object *my_object,
ProcessInfo *proc) {
#if 0
u_int num_id;
const char *str_id;
char jsonbuf[64];
num_id = is_src ? SRC_PROC_PID : DST_PROC_PID;
str_id = is_src ? "SRC_PROC_PID" : "DST_PROC_PID";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(proc->pid));
num_id = is_src ? SRC_FATHER_PROC_PID : DST_FATHER_PROC_PID;
str_id = is_src ? "SRC_FATHER_PROC_PID" : "DST_FATHER_PROC_PID";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(proc->father_pid));
num_id = is_src ? SRC_PROC_NAME : DST_PROC_NAME;
str_id = is_src ? "SRC_PROC_NAME" : "DST_PROC_NAME";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_string(proc->name));
num_id = is_src ? SRC_FATHER_PROC_NAME : DST_FATHER_PROC_NAME;
str_id = is_src ? "SRC_FATHER_PROC_NAME" : "DST_FATHER_PROC_NAME";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_string(proc->father_name));
num_id = is_src ? SRC_PROC_USER_NAME : DST_PROC_USER_NAME;
str_id = is_src ? "SRC_PROC_USER_NAME" : "DST_PROC_USER_NAME";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_string(proc->user_name));
num_id = is_src ? SRC_PROC_ACTUAL_MEMORY : DST_PROC_ACTUAL_MEMORY;
str_id = is_src ? "SRC_PROC_ACTUAL_MEMORY" : "DST_PROC_ACTUAL_MEMORY";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_int(proc->actual_memory));
num_id = is_src ? SRC_PROC_PEAK_MEMORY : DST_PROC_PEAK_MEMORY;
str_id = is_src ? "SRC_PROC_PEAK_MEMORY" : "DST_PROC_PEAK_MEMORY";
json_object_object_add(my_object,
Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_int(proc->peak_memory));
num_id = is_src ? SRC_PROC_AVERAGE_CPU_LOAD : DST_PROC_AVERAGE_CPU_LOAD;
str_id = is_src ? "SRC_PROC_AVERAGE_CPU_LOAD" : "DST_PROC_AVERAGE_CPU_LOAD";
json_object_object_add(my_object, Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_double(proc->average_cpu_load));
num_id = is_src ? SRC_PROC_NUM_PAGE_FAULTS : DST_PROC_NUM_PAGE_FAULTS;
str_id = is_src ? "SRC_PROC_NUM_PAGE_FAULTS" : "DST_PROC_NUM_PAGE_FAULTS";
json_object_object_add(my_object,
Utils::jsonLabel(num_id, str_id, jsonbuf, sizeof(jsonbuf)),
json_object_new_int(proc->num_vm_page_faults));
#endif
}
/* *************************************** */
void Flow::processLua(lua_State* vm, ProcessInfo *proc, bool client) {
#ifndef WIN32
Host *src = get_cli_host(), *dst = get_srv_host();
struct passwd *pwd;
if((src == NULL) || (dst == NULL) || (proc->pid == 0)) return;
lua_newtable(vm);
lua_push_uint64_table_entry(vm, "pid", proc->pid);
lua_push_uint64_table_entry(vm, "father_pid", proc->father_pid);
lua_push_str_table_entry(vm, "name", proc->process_name);
lua_push_str_table_entry(vm, "father_name", proc->father_process_name);
lua_push_uint64_table_entry(vm, "uid", proc->uid);
lua_push_uint64_table_entry(vm, "gid", proc->gid);
lua_push_uint64_table_entry(vm, "father_uid", proc->father_uid);
lua_push_uint64_table_entry(vm, "father_gid", proc->father_gid);
/* TODO: improve code efficiency */
pwd = getpwuid(proc->uid);
lua_push_str_table_entry(vm, "user_name", pwd ? pwd->pw_name : "");
pwd = getpwuid(proc->father_uid);
lua_push_str_table_entry(vm, "father_user_name", pwd ? pwd->pw_name : "");
#if 0
lua_push_uint64_table_entry(vm, "actual_memory", proc->actual_memory);
lua_push_uint64_table_entry(vm, "peak_memory", proc->peak_memory);
lua_push_float_table_entry(vm, "average_cpu_load", proc->average_cpu_load);
lua_push_float_table_entry(vm, "percentage_iowait_time", proc->percentage_iowait_time);
lua_push_uint64_table_entry(vm, "num_vm_page_faults", proc->num_vm_page_faults);
#endif
lua_pushstring(vm, client ? "client_process" : "server_process");
lua_insert(vm, -2);
lua_settable(vm, -3);
#endif
}
/* *************************************** */
void Flow::lua(lua_State* vm, AddressTree * ptree,
DetailsLevel details_level, bool skipNewTable) {
char buf[64];
Host *src = get_cli_host(), *dst = get_srv_host();
bool src_match = true, dst_match = true;
bool mask_cli_host = true, mask_dst_host = true, mask_flow;
if((src == NULL) || (dst == NULL)) return;
if(ptree) {
src_match = src->match(ptree), dst_match = dst->match(ptree);
if((!src_match) && (!dst_match)) return;
}
if(!skipNewTable)
lua_newtable(vm);
if(src) {
mask_cli_host = Utils::maskHost(src->isLocalHost());
lua_push_str_table_entry(vm, "cli.ip",
src->get_ip()->printMask(buf, sizeof(buf),
src->isLocalHost()));
if(src->get_vlan_id())
lua_push_uint64_table_entry(vm, "cli.vlan", src->get_vlan_id());
lua_push_uint64_table_entry(vm, "cli.key", mask_cli_host ? 0 : src->key());
} else {
lua_push_nil_table_entry(vm, "cli.ip");
lua_push_nil_table_entry(vm, "cli.key");
}
lua_push_uint64_table_entry(vm, "cli.port", get_cli_port());
if(dst) {
mask_dst_host = Utils::maskHost(dst->isLocalHost());
lua_push_str_table_entry(vm, "srv.ip",
dst->get_ip()->printMask(buf, sizeof(buf),
dst->isLocalHost()));
if(dst->get_vlan_id())
lua_push_uint64_table_entry(vm, "srv.vlan", dst->get_vlan_id());
lua_push_uint64_table_entry(vm, "srv.key", mask_dst_host ? 0 : dst->key());
} else {
lua_push_nil_table_entry(vm, "srv.ip");
lua_push_nil_table_entry(vm, "srv.key");
}
lua_push_uint64_table_entry(vm, "srv.port", get_srv_port());
mask_flow = isMaskedFlow(); // mask_cli_host || mask_dst_host;
lua_push_uint64_table_entry(vm, "bytes", cli2srv_bytes+srv2cli_bytes);
lua_push_uint64_table_entry(vm, "goodput_bytes", cli2srv_goodput_bytes+srv2cli_goodput_bytes);
if(details_level >= details_high) {
if(src && !mask_cli_host) {
lua_push_str_table_entry(vm, "cli.host", src->get_visual_name(buf, sizeof(buf)));
lua_push_uint64_table_entry(vm, "cli.source_id", 0 /* was never set by src->getSourceId()*/ );
lua_push_str_table_entry(vm, "cli.mac", Utils::formatMac(src->get_mac(), buf, sizeof(buf)));
lua_push_bool_table_entry(vm, "cli.systemhost", src->isSystemHost());
lua_push_bool_table_entry(vm, "cli.blacklisted", src->isBlacklisted());
lua_push_bool_table_entry(vm, "cli.allowed_host", src_match);
lua_push_int32_table_entry(vm, "cli.network_id", src->get_local_network_id());
lua_push_uint64_table_entry(vm, "cli.pool_id", src->get_host_pool());
} else {
lua_push_nil_table_entry(vm, "cli.host");
}
if(dst && !mask_dst_host) {
lua_push_str_table_entry(vm, "srv.host", dst->get_visual_name(buf, sizeof(buf)));
lua_push_uint64_table_entry(vm, "srv.source_id", 0 /* was never set by src->getSourceId() */);
lua_push_str_table_entry(vm, "srv.mac", Utils::formatMac(dst->get_mac(), buf, sizeof(buf)));
lua_push_bool_table_entry(vm, "srv.systemhost", dst->isSystemHost());
lua_push_bool_table_entry(vm, "srv.blacklisted", dst->isBlacklisted());
lua_push_bool_table_entry(vm, "srv.allowed_host", dst_match);
lua_push_int32_table_entry(vm, "srv.network_id", dst->get_local_network_id());
lua_push_uint64_table_entry(vm, "srv.pool_id", dst->get_host_pool());
} else {
lua_push_nil_table_entry(vm, "srv.host");
}
if(vrfId) lua_push_uint64_table_entry(vm, "vrfId", vrfId);
lua_push_uint64_table_entry(vm, "vlan", get_vlan_id());
lua_push_str_table_entry(vm, "proto.l4", get_protocol_name());
if(((cli2srv_packets+srv2cli_packets) > NDPI_MIN_NUM_PACKETS)
|| (ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)
|| iface->is_ndpi_enabled()
|| iface->isSampledTraffic()
|| (iface->getIfType() == interface_type_ZMQ)
|| (iface->getIfType() == interface_type_ZC_FLOW)) {
lua_push_str_table_entry(vm, "proto.ndpi", get_detected_protocol_name(buf, sizeof(buf)));
} else
lua_push_str_table_entry(vm, "proto.ndpi", (char*)CONST_TOO_EARLY);
lua_push_uint64_table_entry(vm, "proto.ndpi_id", ndpiDetectedProtocol.app_protocol);
lua_push_str_table_entry(vm, "proto.ndpi_breed", get_protocol_breed_name());
lua_push_uint64_table_entry(vm, "proto.ndpi_cat_id", get_protocol_category());
lua_push_str_table_entry(vm, "proto.ndpi_cat", get_protocol_category_name());
#ifdef NTOPNG_PRO
#ifndef HAVE_NEDGE
if((!mask_flow) && trafficProfile && ntop->getPro()->has_valid_license())
lua_push_str_table_entry(vm, "profile", trafficProfile->getName());
#endif
#endif
lua_push_uint64_table_entry(vm, "bytes.last",
get_current_bytes_cli2srv() + get_current_bytes_srv2cli());
lua_push_uint64_table_entry(vm, "goodput_bytes",
cli2srv_goodput_bytes+srv2cli_goodput_bytes);
lua_push_uint64_table_entry(vm, "goodput_bytes.last",
get_current_goodput_bytes_cli2srv() + get_current_goodput_bytes_srv2cli());
lua_push_uint64_table_entry(vm, "packets", cli2srv_packets+srv2cli_packets);
lua_push_uint64_table_entry(vm, "packets.last",
get_current_packets_cli2srv() + get_current_packets_srv2cli());
lua_push_uint64_table_entry(vm, "seen.first", get_first_seen());
lua_push_uint64_table_entry(vm, "seen.last", get_last_seen());
lua_push_uint64_table_entry(vm, "duration", get_duration());
lua_push_uint64_table_entry(vm, "cli2srv.bytes", cli2srv_bytes);
lua_push_uint64_table_entry(vm, "srv2cli.bytes", srv2cli_bytes);
lua_push_uint64_table_entry(vm, "cli2srv.goodput_bytes", cli2srv_goodput_bytes);
lua_push_uint64_table_entry(vm, "srv2cli.goodput_bytes", srv2cli_goodput_bytes);
lua_push_uint64_table_entry(vm, "cli2srv.packets", cli2srv_packets);
lua_push_uint64_table_entry(vm, "srv2cli.packets", srv2cli_packets);
if(isICMP()) {
lua_newtable(vm);
lua_push_uint64_table_entry(vm, "type", protos.icmp.icmp_type);
lua_push_uint64_table_entry(vm, "code", protos.icmp.icmp_code);
if(icmp_info)
icmp_info->lua(vm, ptree, iface, get_vlan_id());
lua_pushstring(vm, "icmp");
lua_insert(vm, -2);
lua_settable(vm, -3);
}
lua_push_bool_table_entry(vm, "flow_goodput.low", isLowGoodput());
#ifdef HAVE_NEDGE
if(iface->is_bridge_interface())
lua_push_bool_table_entry(vm, "verdict.pass", isPassVerdict() ? (json_bool)1 : (json_bool)0);
#endif
if(protocol == IPPROTO_TCP) {
lua_push_bool_table_entry(vm, "tcp.seq_problems",
(tcp_stats_s2d.pktRetr
| tcp_stats_s2d.pktOOO
| tcp_stats_s2d.pktLost
| tcp_stats_s2d.pktKeepAlive
| tcp_stats_d2s.pktRetr
| tcp_stats_d2s.pktOOO
| tcp_stats_d2s.pktLost
| tcp_stats_d2s.pktKeepAlive) ? true : false);
lua_push_float_table_entry(vm, "tcp.nw_latency.client", toMs(&clientNwLatency));
lua_push_float_table_entry(vm, "tcp.nw_latency.server", toMs(&serverNwLatency));
lua_push_float_table_entry(vm, "tcp.appl_latency", applLatencyMsec);
lua_push_float_table_entry(vm, "tcp.max_thpt.cli2srv", getCli2SrvMaxThpt());
lua_push_float_table_entry(vm, "tcp.max_thpt.srv2cli", getSrv2CliMaxThpt());
lua_push_uint64_table_entry(vm, "cli2srv.retransmissions", tcp_stats_s2d.pktRetr);
lua_push_uint64_table_entry(vm, "cli2srv.out_of_order", tcp_stats_s2d.pktOOO);
lua_push_uint64_table_entry(vm, "cli2srv.lost", tcp_stats_s2d.pktLost);
lua_push_uint64_table_entry(vm, "cli2srv.keep_alive", tcp_stats_s2d.pktKeepAlive);
lua_push_uint64_table_entry(vm, "srv2cli.retransmissions", tcp_stats_d2s.pktRetr);
lua_push_uint64_table_entry(vm, "srv2cli.out_of_order", tcp_stats_d2s.pktOOO);
lua_push_uint64_table_entry(vm, "srv2cli.lost", tcp_stats_d2s.pktLost);
lua_push_uint64_table_entry(vm, "srv2cli.keep_alive", tcp_stats_d2s.pktKeepAlive);
lua_push_uint64_table_entry(vm, "cli2srv.tcp_flags", src2dst_tcp_flags);
lua_push_uint64_table_entry(vm, "srv2cli.tcp_flags", dst2src_tcp_flags);
lua_push_bool_table_entry(vm, "tcp_established", isTCPEstablished());
lua_push_bool_table_entry(vm, "tcp_connecting", isTCPConnecting());
lua_push_bool_table_entry(vm, "tcp_closed", isTCPClosed());
lua_push_bool_table_entry(vm, "tcp_reset", isTCPReset());
}
if(!mask_flow) {
if(host_server_name) lua_push_str_table_entry(vm, "host_server_name", host_server_name);
if(bt_hash) lua_push_str_table_entry(vm, "bittorrent_hash", bt_hash);
lua_push_str_table_entry(vm, "info", getFlowInfo() ? getFlowInfo() : (char*)"");
}
if(isHTTP() && protos.http.last_url) {
lua_push_str_table_entry(vm, "protos.http.last_method", protos.http.last_method);
lua_push_uint64_table_entry(vm, "protos.http.last_return_code", protos.http.last_return_code);
}
#ifdef HAVE_NEDGE
lua_push_uint64_table_entry(vm, "marker", marker);
if(cli_host && srv_host) {
/* Shapers */
lua_push_uint64_table_entry(vm,
"shaper.cli2srv_ingress",
flowShaperIds.cli2srv.ingress ? flowShaperIds.cli2srv.ingress->get_shaper_id() : DEFAULT_SHAPER_ID);
lua_push_uint64_table_entry(vm,
"shaper.cli2srv_egress",
flowShaperIds.cli2srv.egress ? flowShaperIds.cli2srv.egress->get_shaper_id() : DEFAULT_SHAPER_ID);
lua_push_uint64_table_entry(vm,
"shaper.srv2cli_ingress",
flowShaperIds.srv2cli.ingress ? flowShaperIds.srv2cli.ingress->get_shaper_id() : DEFAULT_SHAPER_ID);
lua_push_uint64_table_entry(vm,
"shaper.srv2cli_egress",
flowShaperIds.srv2cli.egress ? flowShaperIds.srv2cli.egress->get_shaper_id() : DEFAULT_SHAPER_ID);
/* Quota */
lua_push_str_table_entry(vm, "cli.quota_source", Utils::policySource2Str(cli_quota_source));
lua_push_str_table_entry(vm, "srv.quota_source", Utils::policySource2Str(srv_quota_source));
}
#endif
if(!mask_flow) {
if(isHTTP() && protos.http.last_url)
lua_push_str_table_entry(vm, "protos.http.last_url", protos.http.last_url);
if(isHTTP() && host_server_name && (!mask_flow))
lua_push_str_table_entry(vm, "protos.http.server_name", host_server_name);
if(isDNS() && protos.dns.last_query)
lua_push_str_table_entry(vm, "protos.dns.last_query", protos.dns.last_query);
if(isSSH()) {
if(protos.ssh.client_signature) lua_push_str_table_entry(vm, "protos.ssh.client_signature", protos.ssh.client_signature);
if(protos.ssh.server_signature) lua_push_str_table_entry(vm, "protos.ssh.server_signature", protos.ssh.server_signature);
}
if(isSSL()) {
if(protos.ssl.certificate)
lua_push_str_table_entry(vm, "protos.ssl.certificate", protos.ssl.certificate);
if(protos.ssl.server_certificate)
lua_push_str_table_entry(vm, "protos.ssl.server_certificate", protos.ssl.server_certificate);
}
}
lua_push_str_table_entry(vm, "moreinfo.json", get_json_info());
if(client_proc) processLua(vm, client_proc, true);
if(server_proc) processLua(vm, server_proc, false);
// overall throughput stats
lua_push_float_table_entry(vm, "top_throughput_bps", top_bytes_thpt);
lua_push_float_table_entry(vm, "throughput_bps", bytes_thpt);
lua_push_uint64_table_entry(vm, "throughput_trend_bps", bytes_thpt_trend);
lua_push_float_table_entry(vm, "top_throughput_pps", top_pkts_thpt);
lua_push_float_table_entry(vm, "throughput_pps", pkts_thpt);
lua_push_uint64_table_entry(vm, "throughput_trend_pps", pkts_thpt_trend);
// throughput stats cli2srv and srv2cli breakdown
lua_push_float_table_entry(vm, "throughput_cli2srv_bps", bytes_thpt_cli2srv);
lua_push_float_table_entry(vm, "throughput_srv2cli_bps", bytes_thpt_srv2cli);
lua_push_float_table_entry(vm, "throughput_cli2srv_pps", pkts_thpt_cli2srv);
lua_push_float_table_entry(vm, "throughput_srv2cli_pps", pkts_thpt_srv2cli);
lua_push_uint64_table_entry(vm, "cli2srv.packets", cli2srv_packets);
lua_push_uint64_table_entry(vm, "srv2cli.packets", srv2cli_packets);
lua_push_uint64_table_entry(vm, "cli2srv.last", get_current_bytes_cli2srv());
lua_push_uint64_table_entry(vm, "srv2cli.last", get_current_bytes_srv2cli());
/* ********************* */
dumpPacketStats(vm, true);
dumpPacketStats(vm, false);
if((!mask_flow) && (details_level >= details_higher)) {
float latitude, longitude;
char buf[32];
lua_push_uint64_table_entry(vm, "cli2srv.goodput_bytes.last", get_current_goodput_bytes_cli2srv());
lua_push_uint64_table_entry(vm, "srv2cli.goodput_bytes.last", get_current_goodput_bytes_srv2cli());
get_cli_host()->get_geocoordinates(&latitude, &longitude);
lua_push_float_table_entry(vm, "cli.latitude", latitude);
lua_push_float_table_entry(vm, "cli.longitude", longitude);
get_srv_host()->get_geocoordinates(&latitude, &longitude);
lua_push_float_table_entry(vm, "srv.latitude", latitude);
lua_push_float_table_entry(vm, "srv.longitude", longitude);
if(details_level >= details_max) {
lua_push_bool_table_entry(vm, "cli.private", get_cli_host()->get_ip()->isPrivateAddress()); // cli. */
lua_push_str_table_entry(vm, "cli.country", get_cli_host()->get_country(buf, sizeof(buf)));
lua_push_str_table_entry(vm, "cli.city", get_cli_host()->get_city(buf, sizeof(buf)));
lua_push_bool_table_entry(vm, "srv.private", get_srv_host()->get_ip()->isPrivateAddress());
lua_push_str_table_entry(vm, "srv.country", get_srv_host()->get_country(buf, sizeof(buf)));
lua_push_str_table_entry(vm, "srv.city", get_srv_host()->get_city(buf, sizeof(buf)));
}
}
json_object *status_info_json = flow2statusinfojson();
if(status_info_json) {
lua_push_str_table_entry(vm, "status_info", (char*)json_object_to_json_string(status_info_json));
json_object_put(status_info_json);
}
}
lua_push_bool_table_entry(vm, "flow.idle", idle());
lua_push_uint64_table_entry(vm, "flow.status", getFlowStatus());
// this is used to dynamicall update entries in the GUI
lua_push_uint64_table_entry(vm, "ntopng.key", key()); // Key
}
/* *************************************** */
u_int32_t Flow::key() {
u_int32_t k = cli_port + srv_port /* +vlanId */ + protocol;
if(cli_host) k += cli_host->key();
if(srv_host) k += srv_host->key();
if(icmp_info) k += icmp_info->key();
return(k);
}
/* *************************************** */
u_int32_t Flow::key(Host *_cli, u_int16_t _cli_port,
Host *_srv, u_int16_t _srv_port,
u_int16_t _vlan_id,
u_int16_t _protocol) {
u_int32_t k = _cli_port + _srv_port + _vlan_id + _protocol;
if(_cli) k += _cli -> key();
if(_srv) k += _srv -> key();
return(k);
}
/* *************************************** */
bool Flow::isReadyToPurge() {
if (ntop->getPrefs()->flushFlowsOnShutdown()
&& (ntop->getGlobals()->isShutdownRequested() || ntop->getGlobals()->isShutdown()))
return(true);
if(idle()) return(true);
#ifdef HAVE_NEDGE
if(iface->getIfType() == interface_type_NETFILTER)
return(isNetfilterIdleFlow());
#endif
if(!iface->is_purge_idle_interface())
return(false);
if(protocol == IPPROTO_TCP) {
u_int8_t tcp_flags = src2dst_tcp_flags | dst2src_tcp_flags;
/* The flow is considered idle after a MAX_TCP_FLOW_IDLE
when RST/FIN are set or when the TWH is not completed.
This prevents finalized/reset flows, or flows with an imcomplete
TWH from staying in memory for too long. */
if((tcp_flags & TH_FIN
|| tcp_flags & TH_RST
|| !isThreeWayHandshakeOK())
/* Flows won't expire if less than DONT_NOT_EXPIRE_BEFORE_SEC old */
&& iface->getTimeLastPktRcvd() > doNotExpireBefore
&& isIdle(MAX_TCP_FLOW_IDLE)) {
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "[TCP] Early flow expire"); */
return(true);
}
}
return(isIdle(ntop->getPrefs()->get_flow_max_idle()));
}
/* *************************************** */
bool Flow::isFlowPeer(char *numIP, u_int16_t vlanId) {
char s_buf[32], *ret;
if((!cli_host) || (!srv_host)) return(false);
ret = cli_host->get_ip()->print(s_buf, sizeof(s_buf));
if((strcmp(ret, numIP) == 0) &&
(cli_host->get_vlan_id() == vlanId))return(true);
ret = srv_host->get_ip()->print(s_buf, sizeof(s_buf));
if((strcmp(ret, numIP) == 0) &&
(cli_host->get_vlan_id() == vlanId))return(true);
return(false);
}
/* *************************************** */
void Flow::sumStats(nDPIStats *stats) {
stats->incStats(0, ndpiDetectedProtocol.app_protocol,
cli2srv_packets, cli2srv_bytes,
srv2cli_packets, srv2cli_bytes);
}
/* *************************************** */
char* Flow::serialize(bool es_json) {
json_object *my_object;
char *rsp;
if((cli_host == NULL) || (srv_host == NULL))
return(NULL);
if(es_json) {
ntop->getPrefs()->set_json_symbolic_labels_format(true);
if((my_object = flow2json()) != NULL) {
/* JSON string */
rsp = strdup(json_object_to_json_string(my_object));
/* Free memory */
json_object_put(my_object);
} else
rsp = NULL;
} else {
/* JSON string */
ntop->getPrefs()->set_json_symbolic_labels_format(false);
my_object = flow2json();
rsp = strdup(json_object_to_json_string(my_object));
ntop->getTrace()->traceEvent(TRACE_DEBUG, "Emitting Flow: %s", rsp);
/* Free memory */
json_object_put(my_object);
}
return(rsp);
}
/* *************************************** */
/* Returns a stripped-down JSON specifically used for providing more alert information */
json_object* Flow::flow2statusinfojson() {
char buf[128];
json_object *obj = json_object_new_object();
if(!obj) return NULL;
DeviceProtoStatus proto_status = device_proto_allowed;
json_object_object_add(obj, "cli.devtype", json_object_new_int((cli_host && cli_host->getMac()) ? cli_host->getMac()->getDeviceType() : device_unknown));
json_object_object_add(obj, "srv.devtype", json_object_new_int((srv_host && srv_host->getMac()) ? srv_host->getMac()->getDeviceType() : device_unknown));
json_object_object_add(obj, "ntopng.key", json_object_new_int64(key()));
if(cli_host && ((proto_status = cli_host->getDeviceAllowedProtocolStatus(ndpiDetectedProtocol, true /* client */)) != device_proto_allowed)) {
json_object_object_add(obj, "devproto_forbidden_peer", json_object_new_string("cli"));
json_object_object_add(obj, "devproto_forbidden_id", json_object_new_int(
(proto_status == device_proto_forbidden_app) ? ndpiDetectedProtocol.app_protocol : ndpiDetectedProtocol.master_protocol));
} else if(srv_host && ((proto_status = srv_host->getDeviceAllowedProtocolStatus(ndpiDetectedProtocol, false /* server */)) != device_proto_allowed)) {
json_object_object_add(obj, "devproto_forbidden_peer", json_object_new_string("srv"));
json_object_object_add(obj, "devproto_forbidden_id", json_object_new_int(
(proto_status == device_proto_forbidden_app) ? ndpiDetectedProtocol.app_protocol : ndpiDetectedProtocol.master_protocol));
}
FlowStatus fs = getFlowStatus();
if(fs == status_elephant_local_to_remote)
json_object_object_add(obj, "elephant.l2r_threshold",
json_object_new_int64(ntop->getPrefs()->get_elephant_flow_local_to_remote_bytes()));
else if(fs == status_elephant_remote_to_local)
json_object_object_add(obj, "elephant.r2l_threshold",
json_object_new_int64(ntop->getPrefs()->get_elephant_flow_remote_to_local_bytes()));
if(isICMP()) {
json_object_object_add(obj, "icmp.icmp_type", json_object_new_int(protos.icmp.icmp_type)),
json_object_object_add(obj, "icmp.icmp_code", json_object_new_int(protos.icmp.icmp_code));
if(icmp_info) {
unreachable_t *unreach = icmp_info->getUnreach();
if(unreach)
json_object_object_add(obj, "icmp.unreach.src_ip", json_object_new_string(unreach->src_ip.print(buf, sizeof(buf)))),
json_object_object_add(obj, "icmp.unreach.dst_ip", json_object_new_string(unreach->dst_ip.print(buf, sizeof(buf)))),
json_object_object_add(obj, "icmp.unreach.src_port", json_object_new_int(ntohs(unreach->src_port))),
json_object_object_add(obj, "icmp.unreach.dst_port", json_object_new_int(ntohs(unreach->dst_port))),
json_object_object_add(obj, "icmp.unreach.protocol", json_object_new_int(unreach->protocol));
}
}
return obj;
}
/* *************************************** */
json_object* Flow::flow2json() {
json_object *my_object;
char buf[64], jsonbuf[64], *c;
time_t t;
if(((cli2srv_packets - last_db_dump.cli2srv_packets) == 0)
&& ((srv2cli_packets - last_db_dump.srv2cli_packets) == 0))
return(NULL);
if((my_object = json_object_new_object()) == NULL) return(NULL);
if(ntop->getPrefs()->do_dump_flows_on_es()
|| ntop->getPrefs()->do_dump_flows_on_ls()
) {
struct tm* tm_info;
t = last_seen;
tm_info = gmtime(&t);
/*
strftime in the VS2013 library and earlier are not C99-conformant,
as they do not accept that format-specifier: MSDN VS2013 strftime page
https://msdn.microsoft.com/en-us/library/fe06s4ak.aspx
*/
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S.0Z", tm_info);
if(ntop->getPrefs()->do_dump_flows_on_ls()){
/* Add current timestamp differently for Logstash, in case of delay
* Note: Logstash generates it's own @timestamp field on input
*/
json_object_object_add(my_object,"ntop_timestamp",json_object_new_string(buf));
}
if(ntop->getPrefs()->do_dump_flows_on_es()){
json_object_object_add(my_object, "@timestamp", json_object_new_string(buf));
json_object_object_add(my_object, "type", json_object_new_string(ntop->getPrefs()->get_es_type()));
}
/* json_object_object_add(my_object, "@version", json_object_new_int(1)); */
// MAC addresses are set only when dumping to ES to optimize space consumption
json_object_object_add(my_object, Utils::jsonLabel(IN_SRC_MAC, "IN_SRC_MAC", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(Utils::formatMac(cli_host->get_mac(), buf, sizeof(buf))));
json_object_object_add(my_object, Utils::jsonLabel(OUT_DST_MAC, "OUT_DST_MAC", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(Utils::formatMac(srv_host->get_mac(), buf, sizeof(buf))));
}
if(cli_host->get_ip()) {
if(cli_host->get_ip()->isIPv4()) {
json_object_object_add(my_object, Utils::jsonLabel(IPV4_SRC_ADDR, "IPV4_SRC_ADDR", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(cli_host->get_string_key(buf, sizeof(buf))));
} else if(cli_host->get_ip()->isIPv6()) {
json_object_object_add(my_object, Utils::jsonLabel(IPV6_SRC_ADDR, "IPV6_SRC_ADDR", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(cli_host->get_string_key(buf, sizeof(buf))));
}
}
if(srv_host->get_ip()) {
if(srv_host->get_ip()->isIPv4()) {
json_object_object_add(my_object, Utils::jsonLabel(IPV4_DST_ADDR, "IPV4_DST_ADDR", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(srv_host->get_string_key(buf, sizeof(buf))));
} else if(srv_host->get_ip()->isIPv6()) {
json_object_object_add(my_object, Utils::jsonLabel(IPV6_DST_ADDR, "IPV6_DST_ADDR", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(srv_host->get_string_key(buf, sizeof(buf))));
}
}
json_object_object_add(my_object, Utils::jsonLabel(L4_SRC_PORT, "L4_SRC_PORT", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(get_cli_port()));
json_object_object_add(my_object, Utils::jsonLabel(L4_DST_PORT, "L4_DST_PORT", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(get_srv_port()));
json_object_object_add(my_object, Utils::jsonLabel(PROTOCOL, "PROTOCOL", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(protocol));
if(((cli2srv_packets+srv2cli_packets) > NDPI_MIN_NUM_PACKETS)
|| (ndpiDetectedProtocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)) {
json_object_object_add(my_object, Utils::jsonLabel(L7_PROTO, "L7_PROTO", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(ndpiDetectedProtocol.app_protocol));
json_object_object_add(my_object, Utils::jsonLabel(L7_PROTO_NAME, "L7_PROTO_NAME", jsonbuf, sizeof(jsonbuf)),
json_object_new_string(get_detected_protocol_name(buf, sizeof(buf))));
}
if(protocol == IPPROTO_TCP)
json_object_object_add(my_object, Utils::jsonLabel(TCP_FLAGS, "TCP_FLAGS", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(src2dst_tcp_flags | dst2src_tcp_flags));
json_object_object_add(my_object, Utils::jsonLabel(IN_PKTS, "IN_PKTS", jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(get_partial_packets_cli2srv()));
json_object_object_add(my_object, Utils::jsonLabel(IN_BYTES, "IN_BYTES", jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(get_partial_bytes_cli2srv()));
json_object_object_add(my_object, Utils::jsonLabel(OUT_PKTS, "OUT_PKTS", jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(get_partial_packets_srv2cli()));
json_object_object_add(my_object, Utils::jsonLabel(OUT_BYTES, "OUT_BYTES", jsonbuf, sizeof(jsonbuf)),
json_object_new_int64(get_partial_bytes_srv2cli()));
json_object_object_add(my_object, Utils::jsonLabel(FIRST_SWITCHED, "FIRST_SWITCHED", jsonbuf, sizeof(jsonbuf)),
json_object_new_int((u_int32_t)get_partial_first_seen()));
json_object_object_add(my_object, Utils::jsonLabel(LAST_SWITCHED, "LAST_SWITCHED", jsonbuf, sizeof(jsonbuf)),
json_object_new_int((u_int32_t)get_partial_last_seen()));
if(json_info && strcmp(json_info, "{}")) {
json_object *o;
enum json_tokener_error jerr = json_tokener_success;
if((o = json_tokener_parse_verbose(json_info, &jerr)) != NULL) {
json_object_object_add(my_object, "json", o);
} else {
ntop->getTrace()->traceEvent(TRACE_INFO,
"JSON Parse error [%s]: %s "
" adding as a plain string",
json_tokener_error_desc(jerr),
json_info);
/* Experimental to attempt to fix https://github.com/ntop/ntopng/issues/522 */
json_object_object_add(my_object, "json", json_object_new_string(json_info));
}
}
if(vlanId > 0) json_object_object_add(my_object,
Utils::jsonLabel(SRC_VLAN, "SRC_VLAN", jsonbuf, sizeof(jsonbuf)),
json_object_new_int(vlanId));
if(protocol == IPPROTO_TCP) {
json_object_object_add(my_object, Utils::jsonLabel(CLIENT_NW_LATENCY_MS, "CLIENT_NW_LATENCY_MS", jsonbuf, sizeof(jsonbuf)),
json_object_new_double(toMs(&clientNwLatency)));
json_object_object_add(my_object, Utils::jsonLabel(SERVER_NW_LATENCY_MS, "SERVER_NW_LATENCY_MS", jsonbuf, sizeof(jsonbuf)),
json_object_new_double(toMs(&serverNwLatency)));
}
if(client_proc != NULL) processJson(true, my_object, client_proc);
if(server_proc != NULL) processJson(false, my_object, server_proc);
c = cli_host->get_country(buf, sizeof(buf));
if(c) {
json_object *location = json_object_new_array();
json_object_object_add(my_object, "SRC_IP_COUNTRY", json_object_new_string(c));
if(location) {
float latitude, longitude;
cli_host->get_geocoordinates(&latitude, &longitude);
json_object_array_add(location, json_object_new_double(longitude));
json_object_array_add(location, json_object_new_double(latitude));
json_object_object_add(my_object, "SRC_IP_LOCATION", location);
}
}
c = srv_host->get_country(buf, sizeof(buf));
if(c) {
json_object *location = json_object_new_array();
json_object_object_add(my_object, "DST_IP_COUNTRY", json_object_new_string(c));
if(location) {
float latitude, longitude;
srv_host->get_geocoordinates(&latitude, &longitude);
json_object_array_add(location, json_object_new_double(longitude));
json_object_array_add(location, json_object_new_double(latitude));
json_object_object_add(my_object, "DST_IP_LOCATION", location);
}
}
#ifdef NTOPNG_PRO
#ifndef HAVE_NEDGE
// Traffic profile information, if any
if(trafficProfile && trafficProfile->getName())
json_object_object_add(my_object, "PROFILE", json_object_new_string(trafficProfile->getName()));
#endif
#endif
if(ntop->getPrefs() && ntop->getPrefs()->get_instance_name())
json_object_object_add(my_object, "NTOPNG_INSTANCE_NAME",
json_object_new_string(ntop->getPrefs()->get_instance_name()));
if(iface && iface->get_name())
json_object_object_add(my_object, "INTERFACE", json_object_new_string(iface->get_name()));
if(isDNS() && protos.dns.last_query)
json_object_object_add(my_object, "DNS_QUERY", json_object_new_string(protos.dns.last_query));
if(isHTTP() && protos.http.last_url && protos.http.last_method) {
if(host_server_name && (host_server_name[0] != '\0'))
json_object_object_add(my_object, "HTTP_HOST", json_object_new_string(host_server_name));
json_object_object_add(my_object, "HTTP_URL", json_object_new_string(protos.http.last_url));
json_object_object_add(my_object, "HTTP_METHOD", json_object_new_string(protos.http.last_method));
json_object_object_add(my_object, "HTTP_RET_CODE", json_object_new_int((u_int32_t)protos.http.last_return_code));
}
if(bt_hash)
json_object_object_add(my_object, "BITTORRENT_HASH", json_object_new_string(bt_hash));
if(isSSL() && protos.ssl.certificate)
json_object_object_add(my_object, "SSL_SERVER_NAME", json_object_new_string(protos.ssl.certificate));
#ifdef HAVE_NEDGE
if(iface && iface->is_bridge_interface())
json_object_object_add(my_object, "verdict.pass",
json_object_new_boolean(isPassVerdict() ? (json_bool)1 : (json_bool)0));
#endif
return(my_object);
}
/* *************************************** */
#ifdef HAVE_NEDGE
bool Flow::isNetfilterIdleFlow() {
if(idle()) return(true);
/*
Note that on netfilter interfaces we never observe the
FIN/RST flags as they have been offloaded to kernel
Hence on netfilter interfaces flows are purged only for
inactivity based on lastSeen updates
*/
if(last_conntrack_update > 0) {
/*
- At latest every MIN_CONNTRACK_UPDATE the scan is performed
- the conntrack scan time that we assume is less than MIN_CONNTRACK_UPDATE
- in the worst case this method is called when iface->getTimeLastPktRcvd()
is almost MIN_CONNTRACK_UPDATE past the last scan
Thuis in total we assume that every 3*MIN_CONNTRACK_UPDATE
seconds an active flow should have been updated
by conntrack
*/
if(iface->getTimeLastPktRcvd() > (last_conntrack_update + (3 * MIN_CONNTRACK_UPDATE)))
return(true);
return(false);
} else {
/* if an conntrack update hasn't been seen for this flow
we use the standard idleness check */
return(isIdle(ntop->getPrefs()->get_flow_max_idle()));
}
}
#endif
/* *************************************** */
void Flow::housekeep(time_t t) {
#ifdef HAVE_NEDGE
if(iface->getIfType() == interface_type_NETFILTER) {
if(isNetfilterIdleFlow()) {
set_to_purge(iface->getTimeLastPktRcvd());
}
}
#endif
if(!isDetectionCompleted() && t - get_last_seen() > 5 /* sec */
&& iface->get_ndpi_struct()
&& get_ndpi_flow())
setDetectedProtocol(ndpi_detection_giveup(iface->get_ndpi_struct(), get_ndpi_flow(), 1), true);
}
/* *************************************** */
void Flow::updatePacketStats(InterarrivalStats *stats, const struct timeval *when) {
if(stats->lastTime.tv_sec != 0) {
float deltaMS = (float)(Utils::timeval2usec((struct timeval*)when) - Utils::timeval2usec(&stats->lastTime))/(float)1000;
if(deltaMS > 0) {
if(stats->max_ms == 0)
stats->min_ms = stats->max_ms = deltaMS;
else {
if(deltaMS > stats->max_ms) stats->max_ms = deltaMS;
if(deltaMS < stats->min_ms) stats->min_ms = deltaMS;
}
stats->total_delta_ms += deltaMS;
}
}
memcpy(&stats->lastTime, when, sizeof(struct timeval));
}
/* *************************************** */
void Flow::dumpPacketStats(lua_State* vm, bool cli2srv_direction) {
lua_newtable(vm);
lua_push_float_table_entry(vm, "min", cli2srv_direction ? getCli2SrvMinInterArrivalTime() : getSrv2CliMinInterArrivalTime());
lua_push_float_table_entry(vm, "max", cli2srv_direction ? getCli2SrvMaxInterArrivalTime() : getSrv2CliMaxInterArrivalTime());
lua_push_float_table_entry(vm, "avg", cli2srv_direction ? getCli2SrvAvgInterArrivalTime() : getSrv2CliAvgInterArrivalTime());
lua_pushstring(vm, cli2srv_direction ? "interarrival.cli2srv" : "interarrival.srv2cli");
lua_insert(vm, -2);
lua_settable(vm, -3);
}
/* *************************************** */
bool Flow::isSSLProto() {
u_int16_t lower = ndpi_get_lower_proto(ndpiDetectedProtocol);
return(
(lower == NDPI_PROTOCOL_SSL) ||
(lower == NDPI_PROTOCOL_MAIL_IMAPS) ||
(lower == NDPI_PROTOCOL_MAIL_SMTPS) ||
(lower == NDPI_PROTOCOL_MAIL_POPS)
);
}
/* *************************************** */
void Flow::incStats(bool cli2srv_direction, u_int pkt_len,
u_int8_t *payload, u_int payload_len, u_int8_t l4_proto,
const struct timeval *when) {
payload_len *= iface->getScalingFactor();
updateSeen();
updatePacketStats(cli2srv_direction ? &cli2srvStats.pktTime : &srv2cliStats.pktTime, when);
if((cli_host == NULL) || (srv_host == NULL)) return;
if(cli2srv_direction) {
cli2srv_packets++, cli2srv_bytes += pkt_len, cli2srv_goodput_bytes += payload_len;
cli_host->incSentStats(pkt_len), srv_host->incRecvStats(pkt_len);
} else {
srv2cli_packets++, srv2cli_bytes += pkt_len, srv2cli_goodput_bytes += payload_len;
cli_host->incRecvStats(pkt_len), srv_host->incSentStats(pkt_len);
}
if((applLatencyMsec == 0) && (payload_len > 0)) {
if(cli2srv_direction) {
memcpy(&c2sFirstGoodputTime, when, sizeof(struct timeval));
} else {
if(c2sFirstGoodputTime.tv_sec != 0)
applLatencyMsec = ((float)(Utils::timeval2usec((struct timeval*)when)
- Utils::timeval2usec(&c2sFirstGoodputTime)))/1000;
}
}
}
/* *************************************** */
void Flow::updateInterfaceLocalStats(bool src2dst_direction, u_int num_pkts, u_int pkt_len) {
Host *from = src2dst_direction ? cli_host : srv_host;
Host *to = src2dst_direction ? srv_host : cli_host;
iface->incLocalStats(num_pkts, pkt_len,
from ? from->isLocalHost() : false,
to ? to->isLocalHost() : false);
}
/* *************************************** */
void Flow::addFlowStats(bool cli2srv_direction,
u_int in_pkts, u_int in_bytes, u_int in_goodput_bytes,
u_int out_pkts, u_int out_bytes, u_int out_goodput_bytes,
time_t last_seen) {
/* Don't update seen if no traffic has been observed */
if((in_pkts == 0) && (out_pkts == 0)) return;
updateSeen(last_seen);
if(cli2srv_direction)
cli2srv_packets += in_pkts, cli2srv_bytes += in_bytes, cli2srv_goodput_bytes += in_goodput_bytes,
srv2cli_packets += out_pkts, srv2cli_bytes += out_bytes, srv2cli_goodput_bytes += out_goodput_bytes;
else
cli2srv_packets += out_pkts, cli2srv_bytes += out_bytes, cli2srv_goodput_bytes += out_goodput_bytes,
srv2cli_packets += in_pkts, srv2cli_bytes += in_bytes, srv2cli_goodput_bytes += in_goodput_bytes;
if(bytes_thpt == 0 && last_seen >= first_seen + 1) {
/* Do a fist estimation while waiting for the periodic activities */
bytes_thpt = (cli2srv_bytes + srv2cli_bytes) / (float)(last_seen - first_seen),
pkts_thpt = (cli2srv_packets + srv2cli_packets) / (float)(last_seen - first_seen);
}
}
/* *************************************** */
void Flow::setTcpFlags(u_int8_t flags, bool src2dst_direction) {
if(iface->isPacketInterface())
return; /* Use updateTcpFlags for packet interfaces */
iface->incFlagsStats(flags);
if(cli_host) cli_host->incFlagStats(src2dst_direction, flags);
if(srv_host) srv_host->incFlagStats(!src2dst_direction, flags);
if((flags & TH_SYN) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_SYN) != TH_SYN))
iface->getTcpFlowStats()->incSyn();
if((flags & TH_RST) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_RST) != TH_RST))
iface->getTcpFlowStats()->incReset();
if((flags & TH_FIN) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_FIN) != TH_FIN))
iface->getTcpFlowStats()->incFin();
/* The update below must be after the above check */
if(src2dst_direction)
src2dst_tcp_flags |= flags;
else
dst2src_tcp_flags |= flags;
if(!twh_over) {
if((src2dst_tcp_flags & (TH_SYN|TH_ACK)) == (TH_SYN|TH_ACK)
&& ((dst2src_tcp_flags & (TH_SYN|TH_ACK)) == (TH_SYN|TH_ACK)))
twh_ok = twh_over = true,
iface->getTcpFlowStats()->incEstablished();
}
/* Can't set these guys for non-packet interfaces */
memset(&synTime, 0, sizeof(synTime)),
memset(&synAckTime, 0, sizeof(synAckTime)),
memset(&ackTime, 0, sizeof(ackTime));
}
/* *************************************** */
void Flow::updateTcpFlags(const struct bpf_timeval *when,
u_int8_t flags, bool src2dst_direction) {
if(!iface->isPacketInterface())
return; /* Use setTcpFlags for non-packet interfaces */
iface->incFlagsStats(flags);
if(cli_host) cli_host->incFlagStats(src2dst_direction, flags);
if(srv_host) srv_host->incFlagStats(!src2dst_direction, flags);
if(flags == TH_SYN) {
if(cli_host) cli_host->updateSynAlertsCounter(when->tv_sec, flags, this, true);
if(srv_host) srv_host->updateSynAlertsCounter(when->tv_sec, flags, this, false);
}
if((flags & TH_SYN) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_SYN) != TH_SYN))
iface->getTcpFlowStats()->incSyn();
else if((flags & TH_RST) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_RST) != TH_RST))
iface->getTcpFlowStats()->incReset();
else if((flags & TH_FIN) && (((src2dst_tcp_flags | dst2src_tcp_flags) & TH_FIN) != TH_FIN))
iface->getTcpFlowStats()->incFin();
/* The update below must be after the above check */
if(src2dst_direction)
src2dst_tcp_flags |= flags;
else
dst2src_tcp_flags |= flags;
if(!twh_over) {
if(flags == TH_SYN) {
cli2srv_direction = src2dst_direction;
if(synTime.tv_sec == 0) memcpy(&synTime, when, sizeof(struct timeval));
} else if(flags == (TH_SYN|TH_ACK)) {
cli2srv_direction = !src2dst_direction;
if((synAckTime.tv_sec == 0) && (synTime.tv_sec > 0)) {
memcpy(&synAckTime, when, sizeof(struct timeval));
timeval_diff(&synTime, (struct timeval*)when, &serverNwLatency, 1);
/* Sanity check */
if(serverNwLatency.tv_sec > 5)
memset(&serverNwLatency, 0, sizeof(serverNwLatency));
else if(srv_host)
srv_host->updateRoundTripTime(Utils::timeval2ms(&serverNwLatency));
}
} else if(flags == TH_ACK) {
if((ackTime.tv_sec == 0) && (synAckTime.tv_sec > 0)) {
memcpy(&ackTime, when, sizeof(struct timeval));
timeval_diff(&synAckTime, (struct timeval*)when, &clientNwLatency, 1);
/* Sanity check */
if(clientNwLatency.tv_sec > 5)
memset(&clientNwLatency, 0, sizeof(clientNwLatency));
else if(cli_host)
cli_host->updateRoundTripTime(Utils::timeval2ms(&clientNwLatency));
setRtt();
twh_ok = true;
}
twh_over = true, iface->getTcpFlowStats()->incEstablished();
} else
twh_over = true, iface->getTcpFlowStats()->incEstablished();
}
}
/* *************************************** */
void Flow::timeval_diff(struct timeval *begin, const struct timeval *end,
struct timeval *result, u_short divide_by_two) {
if(end->tv_sec >= begin->tv_sec) {
result->tv_sec = end->tv_sec-begin->tv_sec;
if((end->tv_usec - begin->tv_usec) < 0) {
result->tv_usec = 1000000 + end->tv_usec - begin->tv_usec;
if(result->tv_usec > 1000000) begin->tv_usec = 1000000;
result->tv_sec--;
} else
result->tv_usec = end->tv_usec-begin->tv_usec;
if(divide_by_two) {
result->tv_usec /= 2;
if(result->tv_sec % 2)
result->tv_usec += 500000;
result->tv_sec /= 2;
}
} else
result->tv_sec = 0, result->tv_usec = 0;
}
/* *************************************** */
char* Flow::getFlowInfo() {
if(!isMaskedFlow()) {
if(isDNS() && protos.dns.last_query)
return protos.dns.last_query;
else if(isHTTP() && protos.http.last_url)
return protos.http.last_url;
else if(isSSL() && protos.ssl.certificate)
return protos.ssl.certificate;
else if(bt_hash)
return bt_hash;
else if(host_server_name)
return host_server_name;
else if(isSSH()) {
if(protos.ssh.server_signature)
return protos.ssh.server_signature;
else if(protos.ssh.client_signature)
return protos.ssh.client_signature;
}
}
return (char*)"";
}
/* *************************************** */
double Flow::toMs(const struct timeval *t) {
return(((double)t->tv_sec)*1000+((double)t->tv_usec)/1000);
}
/* *************************************** */
u_int32_t Flow::getNextTcpSeq ( u_int8_t tcpFlags,
u_int32_t tcpSeqNum,
u_int32_t payloadLen) {
return(tcpSeqNum + ((tcpFlags & TH_SYN) ? 1 : 0) + payloadLen);
}
/* *************************************** */
void Flow::incTcpBadStats(bool src2dst_direction,
u_int32_t ooo_pkts,
u_int32_t retr_pkts,
u_int32_t lost_pkts) {
#ifdef HAVE_NEDGE
return;
#endif
TCPPacketStats * stats;
/*Host * host;*/
if(src2dst_direction) {
stats = &tcp_stats_s2d;
/*host = cli_host;*/
} else {
stats = &tcp_stats_d2s;
/*host = srv_host;*/
}
stats->pktRetr += retr_pkts;
stats->pktOOO += ooo_pkts;
stats->pktLost += lost_pkts;
/*host->incRetransmittedPkts(retr_pkts);
host->incOOOPkts(ooo_pkts);
host->incLostPkts(lost_pkts);
iface->incRetransmittedPkts(retr_pkts);
iface->incOOOPkts(ooo_pkts);
iface->incLostPkts(lost_pkts);*/
}
/* *************************************** */
void Flow::updateTcpSeqNum(const struct bpf_timeval *when,
u_int32_t seq_num, u_int32_t ack_seq_num,
u_int16_t window, u_int8_t flags,
u_int16_t payload_Len, bool src2dst_direction) {
u_int32_t next_seq_num;
bool update_last_seqnum = true;
bool debug = false;
#ifdef HAVE_NEDGE
return;
#endif
next_seq_num = getNextTcpSeq(flags, seq_num, payload_Len);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[act: %u][next: %u][next - act (in flight): %d][ack: %u]",
seq_num, next_seq_num,
next_seq_num - seq_num,
ack_seq_num);
if(src2dst_direction) {
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[src2dst][last: %u][next: %u]", tcp_stats_s2d.last, tcp_stats_s2d.next);
if(window > 0) srv2cli_window = window; /* Note the window is reverted */
if(tcp_stats_s2d.next > 0) {
if((tcp_stats_s2d.next != seq_num) /* If equal, seq_num is the expected seq_num as determined with prev. segment */
&& (tcp_stats_s2d.next != (seq_num - 1))) {
if((seq_num == tcp_stats_s2d.next - 1)
&& (payload_Len == 0 || payload_Len == 1)
&& ((flags & (TH_SYN|TH_FIN|TH_RST)) == 0)) {
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[src2dst] Packet KeepAlive");
tcp_stats_s2d.pktKeepAlive++, cli_host->incKeepAlivePkts(1);
} else if(tcp_stats_s2d.last == seq_num) {
tcp_stats_s2d.pktRetr++, cli_host->incRetransmittedPkts(1), iface->incRetransmittedPkts(1);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[src2dst] Packet retransmission");
} else if((tcp_stats_s2d.last > seq_num)
&& (seq_num < tcp_stats_s2d.next)) {
tcp_stats_s2d.pktLost++, cli_host->incLostPkts(1), iface->incLostPkts(1);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[src2dst] Packet lost [last: %u][act: %u]", tcp_stats_s2d.last, seq_num);
} else {
tcp_stats_s2d.pktOOO++, cli_host->incOOOPkts(1), iface->incOOOPkts(1);
update_last_seqnum = ((seq_num - 1) > tcp_stats_s2d.last) ? true : false;
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[src2dst] Packet OOO [last: %u][act: %u]", tcp_stats_s2d.last, seq_num);
}
}
}
tcp_stats_s2d.next = next_seq_num;
if(update_last_seqnum) tcp_stats_s2d.last = seq_num;
} else {
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src][last: %u][next: %u]", tcp_stats_d2s.last, tcp_stats_d2s.next);
if(window > 0) cli2srv_window = window; /* Note the window is reverted */
if(tcp_stats_d2s.next > 0) {
if((tcp_stats_d2s.next != seq_num)
&& (tcp_stats_d2s.next != (seq_num-1))) {
if((seq_num == tcp_stats_d2s.next - 1)
&& (payload_Len == 0 || payload_Len == 1)
&& ((flags & (TH_SYN|TH_FIN|TH_RST)) == 0)) {
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src] Packet KeepAlive");
tcp_stats_d2s.pktKeepAlive++, srv_host->incKeepAlivePkts(1);
} else if(tcp_stats_d2s.last == seq_num) {
tcp_stats_d2s.pktRetr++, srv_host->incRetransmittedPkts(1), iface->incRetransmittedPkts(1);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src] Packet retransmission");
// bytes
} else if((tcp_stats_d2s.last > seq_num)
&& (seq_num < tcp_stats_d2s.next)) {
tcp_stats_d2s.pktLost++, srv_host->incLostPkts(1), iface->incLostPkts(1);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src] Packet lost [last: %u][act: %u]", tcp_stats_d2s.last, seq_num);
} else {
tcp_stats_d2s.pktOOO++, srv_host->incOOOPkts(1), iface->incOOOPkts(1);
update_last_seqnum = ((seq_num - 1) > tcp_stats_d2s.last) ? true : false;
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src] [last: %u][next: %u]", tcp_stats_d2s.last, tcp_stats_d2s.next);
if(debug) ntop->getTrace()->traceEvent(TRACE_WARNING, "[dst2src] Packet OOO [last: %u][act: %u]", tcp_stats_d2s.last, seq_num);
}
}
}
tcp_stats_d2s.next = next_seq_num;
if(update_last_seqnum) tcp_stats_d2s.last = seq_num;
}
}
/* *************************************** */
u_int32_t Flow::getPid(bool client) {
ProcessInfo *proc = client ? client_proc : server_proc;
return((proc == NULL) ? 0 : proc->pid);
};
/* *************************************** */
u_int32_t Flow::getFatherPid(bool client) {
ProcessInfo *proc = client ? client_proc : server_proc;
return((proc == NULL) ? 0 : proc->father_pid);
};
/* *************************************** */
u_int32_t Flow::get_uid(bool client) const {
#ifdef WIN32
return false;
#else
ProcessInfo *proc = client ? client_proc : server_proc;
if(proc)
return proc->uid;
return NO_UID;
#endif
}
/* *************************************** */
u_int32_t Flow::get_pid(bool client) const {
#ifdef WIN32
return false;
#else
ProcessInfo *proc = client ? client_proc : server_proc;
if(proc)
return proc->pid;
return NO_PID;
#endif
}
/* *************************************** */
char* Flow::get_proc_name(bool client) {
ProcessInfo *proc = client ? client_proc : server_proc;
return((proc == NULL) ? NULL : proc->process_name);
};
/* *************************************** */
bool Flow::match(AddressTree *ptree) {
if((cli_host && cli_host->match(ptree))
|| (srv_host && srv_host->match(ptree)))
return(true);
else
return(false);
};
/* *************************************** */
void Flow::setBittorrentHash(char *hash) {
int i, j, n = 0;
char bittorrent_hash[41];
for(i=0, j = 0; i<20; i++) {
u_char c = hash[i] & 0xFF;
sprintf(&bittorrent_hash[j], "%02x", c);
j += 2, n += c;
}
if(n > 0) bt_hash = strdup(bittorrent_hash);
}
/* *************************************** */
void Flow::dissectBittorrent(char *payload, u_int16_t payload_len) {
/* This dissector is called only for uTP/UDP protocol */
if(payload_len > 47) {
char *bt_proto = ndpi_strnstr((const char *)&payload[20],
"BitTorrent protocol", payload_len-20);
if(bt_proto)
setBittorrentHash(&bt_proto[27]);
}
}
/* *************************************** */
void Flow::dissectHTTP(bool src2dst_direction, char *payload, u_int16_t payload_len) {
HTTPstats *h;
ssize_t host_server_name_len = host_server_name && host_server_name[0] != '\0' ? strlen(host_server_name) : 0;
if(!isThreeWayHandshakeOK())
; /* Useless to compute http stats as client and server could be swapped */
else if(src2dst_direction) {
char *space;
// payload[10]=0; ntop->getTrace()->traceEvent(TRACE_WARNING, "[len: %u][%s]", payload_len, payload);
h = cli_host->getHTTPstats(); if(h) h->incRequestAsSender(payload); /* Sent */
h = srv_host->getHTTPstats(); if(h) h->incRequestAsReceiver(payload); /* Rcvd */
dissect_next_http_packet = true;
/* use memchr to prevent possibly non-NULL terminated HTTP requests */
if(payload && ((space = (char*)memchr(payload, ' ', payload_len-1)) != NULL)) {
u_int l = space - payload;
if((!strncmp(payload, "GET", 3))
|| (!strncmp(payload, "POST", 4))
|| (!strncmp(payload, "HEAD", 4))
|| (!strncmp(payload, "PUT", 3))
) {
char *ua;
diff_num_http_requests++; /* One new request found */
if(protos.http.last_method) free(protos.http.last_method);
if((protos.http.last_method = (char*)malloc(l + 1)) != NULL) {
strncpy(protos.http.last_method, payload, l);
protos.http.last_method[l] = '\0';
}
payload_len -= (l + 1);
payload = &space[1];
if((space = (char*)memchr(payload, ' ', payload_len)) != NULL) {
l = min_val(space - payload, 512); /* Avoid jumbo URLs */
/* Stop at the first non-printable char of the HTTP URL */
for(u_int i = 0; i < l; i++) {
if(!isprint(payload[i])) {
l = i;
break;
}
}
if(protos.http.last_url) free(protos.http.last_url);
if((protos.http.last_url = (char*)malloc(host_server_name_len + l + 1)) != NULL) {
protos.http.last_url[0] = '\0';
if(host_server_name_len > 0) {
strncat(protos.http.last_url, host_server_name, host_server_name_len);
}
strncat(protos.http.last_url, payload, l);
}
}
if((ua = strstr(payload, "User-Agent:")) != NULL) {
char buf[128];
u_int i;
ua = &ua[11];
while(ua[0] == ' ') ua++;
for(i=0; (i < payload_len) && (i < (sizeof(buf)-1) && (ua[i] != '\r')); i++)
buf[i] = ua[i];
buf[i] = '\0';
#ifdef DEBUG_UA
ntop->getTrace()->traceEvent(TRACE_WARNING, "[UA] %s", buf);
#endif
if(cli_host
&& cli_host->getMac()
// && (cli_host->getMac()->getOperatingSystem() == os_unknown)
) {
/*
https://en.wikipedia.org/wiki/User_agent
Most Web browsers use a User-Agent string value as follows:
Mozilla/[version] ([system and browser information]) [platform] ([platform details]) [extensions]
*/
if((ua = strchr(buf, '(')) != NULL) {
char *end = strchr(buf, ')');
if(end) {
OperatingSystem os = os_unknown;
end[0] = '\0';
ua++;
if(strstr(ua, "iPad") || strstr(ua, "iPod") || strstr(ua, "iPhone"))
os = os_ios;
else if(strstr(ua, "Android"))
os = os_android;
else if(strstr(ua, "Airport"))
os = os_apple_airport;
else if(strstr(ua, "Macintosh") || strstr(ua, "OS X"))
os = os_macos;
else if(strstr(ua, "Windows"))
os = os_windows;
else if(strcasestr(ua, "Linux") || strstr(ua, "Debian") || strstr(ua, "Ubuntu"))
os = os_linux;
if(os != os_unknown) {
#ifdef DEBUG_UA
char mbuf[32];
ntop->getTrace()->traceEvent(TRACE_WARNING, "[UA] [%s][OS=%u][%s]", cli_host->getMac()->get_string_key(mbuf, sizeof(mbuf)), os, ua);
#endif
if(!(cli_host->get_ip()->isBroadcastAddress()
|| cli_host->get_ip()->isMulticastAddress()))
cli_host->getMac()->setOperatingSystem(os);
}
}
}
}
}
}
}
} else {
if(dissect_next_http_packet) {
char *space;
// payload[10]=0; ntop->getTrace()->traceEvent(TRACE_WARNING, "[len: %u][%s]", payload_len, payload);
h = cli_host->getHTTPstats(); if(h) h->incResponseAsReceiver(payload); /* Rcvd */
h = srv_host->getHTTPstats(); if(h) h->incResponseAsSender(payload); /* Sent */
dissect_next_http_packet = false;
if((space = (char*)memchr(payload, ' ', payload_len)) != NULL) {
u_int l = space - payload;
payload_len -= (l + 1);
payload = &space[1];
if((space = (char*)memchr(payload, ' ', payload_len)) != NULL) {
char tmp[32];
l = min_val(space - payload, (int)(sizeof(tmp) - 1));
strncpy(tmp, payload, l);
tmp[l] = 0;
protos.http.last_return_code = atoi(tmp);
}
}
// Detect content type in response header
char buf[sizeof(HTTP_CONTENT_TYPE_HEADER) + HTTP_MAX_CONTENT_TYPE_LENGTH];
const char * s = payload;
size_t len = payload_len;
for (int i=0; i<HTTP_MAX_HEADER_LINES && len > 2; i++) {
const char * newline = (const char *) memchr(s, '\n', len);
if((!newline) || (newline - s < 2) || (*(newline - 1) != '\r')) break;
size_t linesize = newline - s + 1;
const char * terminator = (const char *) memchr(s, ';', linesize);
size_t effsize = terminator ? (terminator - s) : (linesize - 2);
if(effsize < sizeof(buf)) {
strncpy(buf, s, effsize);
buf[effsize] = '\0';
if(strstr(buf, HTTP_CONTENT_TYPE_HEADER) == buf) {
const char * ct = buf + sizeof(HTTP_CONTENT_TYPE_HEADER) - 1;
if(protos.http.last_content_type) free(protos.http.last_content_type);
protos.http.last_content_type = strdup(ct);
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "LAST CONTENT TYPE: '%s'", protos.http.last_content_type);
break;
}
}
len -= linesize;
s = newline + 1;
}
}
}
}
/* *************************************** */
void Flow::dissectMDNS(u_int8_t *payload, u_int16_t payload_len) {
u_int16_t answers, i = 0;
PACK_ON
struct mdns_rsp_entry {
u_int16_t rsp_type, rsp_class;
u_int32_t ttl;
u_int16_t data_len;
} PACK_OFF;
if(((payload[2] & 0x80) != 0x80) || (payload_len < 12))
return; /* This is a not MDNS response */
answers = ntohs(*((u_int16_t*)&payload[6]))
+ ntohs(*((u_int16_t*)&payload[8]))
+ ntohs(*((u_int16_t*)&payload[10]));
payload = &payload[12], payload_len -= 12;
while((answers > 0) && (i < payload_len)) {
char _name[256], *name;
struct mdns_rsp_entry rsp;
u_int j;
u_int16_t rsp_type, data_len;
DeviceType dtype = device_unknown;
memset(_name, 0, sizeof(_name));
for(j=0; (i < payload_len) && (j < (sizeof(_name)-1)); i++) {
if(payload[i] == 0x0) {
i++;
break;
} else if(payload[i] < 32) {
if(j > 0) _name[j++] = '.';
} else if(payload[i] == 0x22) {
_name[j++] = 'a';
_name[j++] = 'r';
_name[j++] = 'p';
_name[j++] = 'a';
i++;
break;
} else if(payload[i] == 0xC0) {
u_int8_t offset;
u_int16_t i_save = i;
u_int8_t num_loops = 0;
const u_int8_t max_nested_loops = 8;
nested_dns_definition:
offset = payload[i+1] - 12;
i = offset;
if((offset > i)|| (i > payload_len) || (num_loops > max_nested_loops)) {
#ifdef DEBUG_DISCOVERY
ntop->getTrace()->traceEvent(TRACE_WARNING, "Invalid MDNS packet");
#endif
return; /* Invalid packet */
} else {
/* Pointer back */
while((i < payload_len)
&& (payload[i] != 0)
&& (j < (sizeof(_name)-1))) {
if(payload[i] == 0)
break;
else if(payload[i] == 0xC0) {
num_loops++;
goto nested_dns_definition;
} else if(payload[i] < 32) {
if(j > 0) _name[j++] = '.';
i++;
} else
_name[j++] = payload[i++];
}
if(i_save > 0) {
i = i_save;
i_save = 0;
}
i += 2;
/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "===>>> [%d] %s", i, &payload[i-12]); */
break;
}
} else
_name[j++] = payload[i];
}
memcpy(&rsp, &payload[i], sizeof(rsp));
data_len = ntohs(rsp.data_len), rsp_type = ntohs(rsp.rsp_type);
/* Skip lenght for strings >= 32 with head length */
name = &_name[((data_len <= 32) || (_name[0] >= '0'))? 0 : 1];
#ifdef DEBUG_DISCOVERY
ntop->getTrace()->traceEvent(TRACE_NORMAL, "===>>> [%u][%s][len=%u]", ntohs(rsp.rsp_type) & 0xFFFF, name, data_len);
#endif
if(strstr(name, "._device-info._"))
;
else if(strstr(name, "._airplay._") || strstr(name, "._spotify-connect._") )
dtype = device_multimedia;
else if(strstr(name, "_ssh._"))
dtype = device_workstation;
else if(strstr(name, "._daap._")
|| strstr(name, "_afpovertcp._")
|| strstr(name, "_adisk._")
|| strstr(name, "_smb._")
)
dtype = device_nas;
else if(strstr(name, "_hap._"))
dtype = device_iot;
else if(strstr(name, "_pdl-datastream._"))
dtype = device_printer;
if((dtype != device_unknown) && cli_host && cli_host->getMac()) {
Mac *m = cli_host->getMac();
if(m->getDeviceType() == device_unknown)
m->setDeviceType(dtype);
}
switch(rsp_type) {
case 0x1C: /* AAAA */
case 0x01: /* AA */
case 0x10: /* TXT */
{
int len = strlen(name);
char *c;
if((len > 6) && (strcmp(&name[len-6], ".local") == 0))
name[len-6] = 0;
c = strstr(name, "._");
if(c && (c != name) /* Does not begin with... */)
c[0] = '\0';
}
if(cli_host)
cli_host->inlineSetMDNSName(name); /* See (**) */
if((rsp_type == 0x10 /* TXT */) && (data_len > 0)) {
char *txt = (char*)&payload[i+sizeof(rsp)], txt_buf[256];
u_int16_t off = 0;
while(off < data_len) {
u_int8_t txt_len = (u_int8_t)txt[off];
if(txt_len < data_len) {
txt_len = min_val(data_len-off, txt_len);
off++;
if(txt_len > 0) {
char *model = NULL;
strncpy(txt_buf, &txt[off], txt_len);
txt_buf[txt_len] = '\0';
off += txt_len;
#ifdef DEBUG_DISCOVERY
ntop->getTrace()->traceEvent(TRACE_NORMAL, "===>>> [TXT][%s]", txt_buf);
#endif
if(strncmp(txt_buf, "am=", 3 /* Apple Model */) == 0) model = &txt_buf[3];
else if(strncmp(txt_buf, "model=", 6) == 0) model = &txt_buf[6];
else if(strncmp(txt_buf, "md=", 3) == 0) model = &txt_buf[3];
if(model && cli_host) {
Mac *mac = cli_host->getMac();
if(mac) {
mac->inlineSetModel(model);
}
}
if(strncmp(txt_buf, "nm=", 3) == 0) {
if(cli_host)
cli_host->inlineSetMDNSTXTName(&txt_buf[3]);
}
if(strncmp(txt_buf, "ssid=", 3) == 0) {
if(cli_host && cli_host->getMac())
cli_host->getMac()->inlineSetSSID(&txt_buf[5]);
}
}
} else
break;
}
}
#ifdef DEBUG_DISCOVERY
ntop->getTrace()->traceEvent(TRACE_NORMAL, "%u) %u [%s]", answers, rsp_type, name);
#endif
//return; /* It's enough to decode the first name */
}
i += sizeof(rsp) + data_len, answers--;
}
}
/* *************************************** */
void Flow::dissectSSDP(bool src2dst_direction, char *payload, u_int16_t payload_len) {
char url[512];
u_int i = 0;
if(payload_len < 6 /* NOTIFY */) return;
if(strncmp(payload, "NOTIFY", 6) == 0) {
payload += 6, payload_len -= 6;
for(; 0 < payload_len - 9 /* strlen("Location:") */; payload++, payload_len--) {
if(strncasecmp(payload, "Location:", 9)) {
continue;
} else {
payload += 9, payload_len -= 9;
for(; (payload_len > 0)
&& (payload[0] != '\n')
&& (payload[0] != '\r'); payload++, payload_len--) {
if(*payload == ' ') continue;
if(i == sizeof(url) - 1) break;
url[i++] = *payload;
}
url[i] = '\0';
// ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SSDP URL:] %s", url);
if(cli_host) cli_host->inlineSetSSDPLocation(url);
break;
}
}
}
}
/* *************************************** */
#ifdef HAVE_NEDGE
bool Flow::isPassVerdict() {
if(!passVerdict)
return(false);
if(cli_host && srv_host)
return((!quota_exceeded)
&& (!(cli_host->dropAllTraffic() || srv_host->dropAllTraffic()))
&& (!isBlacklistedFlow()));
else
return(true);
}
/* *************************************** */
bool Flow::checkPassVerdict(const struct tm *now) {
if(!passVerdict)
return(false);
if(!isDetectionCompleted())
return(true); /* Always pass until detection is completed */
recheckQuota(now);
return isPassVerdict();
}
#endif
/* *************************************** */
#ifdef HAVE_NEDGE
bool Flow::updateDirectionShapers(bool src2dst_direction, TrafficShaper **ingress_shaper, TrafficShaper **egress_shaper) {
bool verdict = true;
if(cli_host && srv_host) {
if(src2dst_direction) {
*ingress_shaper = srv_host->get_ingress_shaper(ndpiDetectedProtocol),
*egress_shaper = cli_host->get_egress_shaper(ndpiDetectedProtocol);
if(*ingress_shaper) srv2cli_in = (*ingress_shaper)->get_shaper_id();
if(*egress_shaper) cli2srv_out = (*egress_shaper)->get_shaper_id();
} else {
*ingress_shaper = cli_host->get_ingress_shaper(ndpiDetectedProtocol),
*egress_shaper = srv_host->get_egress_shaper(ndpiDetectedProtocol);
if(*ingress_shaper) cli2srv_in = (*ingress_shaper)->get_shaper_id();
if(*egress_shaper) srv2cli_out = (*egress_shaper)->get_shaper_id();
}
if((*ingress_shaper && (*ingress_shaper)->shaping_enabled() && (*ingress_shaper)->get_max_rate_kbit_sec() == 0)
|| (*egress_shaper && (*egress_shaper)->shaping_enabled() && (*egress_shaper)->get_max_rate_kbit_sec() == 0))
verdict = false;
} else
*ingress_shaper = *egress_shaper = NULL;
return verdict;
}
/* *************************************** */
void Flow::updateFlowShapers(bool first_update) {
bool cli2srv_verdict, srv2cli_verdict;
bool old_verdict = passVerdict;
bool new_verdict;
u_int16_t old_cli2srv_in = cli2srv_in,
old_cli2srv_out = cli2srv_out,
old_srv2cli_in = srv2cli_in,
old_srv2cli_out = srv2cli_out;
/* Re-compute the verdict */
cli2srv_verdict = updateDirectionShapers(true, &flowShaperIds.cli2srv.ingress, &flowShaperIds.cli2srv.egress);
srv2cli_verdict = updateDirectionShapers(false, &flowShaperIds.srv2cli.ingress, &flowShaperIds.srv2cli.egress);
new_verdict = (cli2srv_verdict && srv2cli_verdict);
if(ntop->getPrefs()->are_device_protocol_policies_enabled() && cli_host && srv_host && new_verdict) {
/* NOTE: this must be handled differently to only consider actual peers direction */
if((cli_host->getDeviceAllowedProtocolStatus(ndpiDetectedProtocol, true /* client */) != device_proto_allowed) ||
(srv_host->getDeviceAllowedProtocolStatus(ndpiDetectedProtocol, false /* server */) != device_proto_allowed))
new_verdict = false;
}
/* Set the new verdict */
passVerdict = new_verdict;
if((!first_update) && (iface->getIfType() == interface_type_NETFILTER) &&
(((old_verdict != passVerdict)) ||
(old_cli2srv_in != cli2srv_in) ||
(old_cli2srv_out != cli2srv_out) ||
(old_srv2cli_in != srv2cli_in) ||
(old_srv2cli_out != srv2cli_out)))
((NetfilterInterface *) iface)->setPolicyChanged();
#ifdef SHAPER_DEBUG
{
char buf[1024];
ntop->getTrace()->traceEvent(TRACE_NORMAL, "[SHAPERS] %s", print(buf, sizeof(buf)));
}
#endif
}
/* *************************************** */
void Flow::recheckQuota(const struct tm *now) {
bool above_quota = false;
if(cli_host && srv_host) {
L7PolicySource_t cli_src, srv_src;
if((above_quota = cli_host->checkQuota(ndpiDetectedProtocol, &cli_src, now)))
srv_src = policy_source_default;
else if((above_quota = srv_host->checkQuota(ndpiDetectedProtocol, &srv_src, now)))
;
/* Use temporary values to guard against partial changes */
cli_quota_source = cli_src, srv_quota_source = srv_src;
}
quota_exceeded = above_quota;
}
#endif
/* *************************************** */
bool Flow::isLowGoodput() {
if(iface->getIfType() == interface_type_ZMQ
|| iface->getIfType() == interface_type_NETFILTER
|| protocol == IPPROTO_UDP)
return(false);
else
return((((get_goodput_bytes()*100)/(get_bytes()+1 /* avoid zero divisions */)) < FLOW_GOODPUT_THRESHOLD) ? true : false);
}
/* ***************************************************** */
FlowStatus Flow::getFlowStatus() {
#ifndef HAVE_NEDGE
u_int32_t threshold;
#endif
u_int16_t l7proto = ndpi_get_lower_proto(ndpiDetectedProtocol);
/* NOTE: evaluation order is important here! */
if(iface->isPacketInterface() && iface->is_purge_idle_interface() &&
!is_ready_to_be_purged() && isIdle(5 * ntop->getPrefs()->get_flow_max_idle())) {
/* Should've already been marked as idle and purged */
return status_not_purged;
}
if(isBlacklistedFlow())
return status_blacklisted;
if(!isDeviceAllowedProtocol())
return status_device_protocol_not_allowed;
//if(get_protocol_category() == CUSTOM_CATEGORY_MINING)
if(ndpiDetectedProtocol.category == CUSTOM_CATEGORY_MINING)
return status_web_mining_detected;
#ifndef HAVE_NEDGE
/* All flows */
threshold = cli2srv_packets / CONST_TCP_CHECK_ISSUES_RATIO;
if((tcp_stats_s2d.pktRetr + tcp_stats_s2d.pktOOO + tcp_stats_s2d.pktLost) > threshold)
return status_tcp_connection_issues;
threshold = srv2cli_packets / CONST_TCP_CHECK_ISSUES_RATIO;
if((tcp_stats_d2s.pktRetr + tcp_stats_d2s.pktOOO + tcp_stats_d2s.pktLost) > threshold)
return status_tcp_connection_issues;
#endif
if(iface->getIfType() == interface_type_ZMQ) {
/* ZMQ flows */
} else {
/* Packet flows */
bool isIdle = idle();
#ifndef HAVE_NEDGE
bool lowGoodput = isLowGoodput();
#endif
if(protocol == IPPROTO_TCP) {
if((srv2cli_packets == 0) && ((time(NULL)-last_seen) > CONST_ALERT_PROBING_TIME))
return status_suspicious_tcp_probing;
if(!twh_over) {
if(isIdle)
return status_suspicious_tcp_syn_probing;
else
return status_normal;
} else {
/* 3WH is over */
switch(l7proto) {
case NDPI_PROTOCOL_SSL:
if(protos.ssl.certificate
&& protos.ssl.server_certificate
&& !protos.ssl.subject_alt_name_match) {
if(protos.ssl.server_certificate[0] == '*') {
if(!strstr(protos.ssl.certificate, &protos.ssl.server_certificate[1]))
return status_ssl_certificate_mismatch;
} else if(strcmp(protos.ssl.certificate, protos.ssl.server_certificate))
return status_ssl_certificate_mismatch;
}
break;
}
#ifndef HAVE_NEDGE
if(isIdle && lowGoodput) return status_slow_data_exchange;
if(!isIdle && lowGoodput) {
if(isTCPReset())
return status_tcp_connection_refused;
else
return status_low_goodput;
}
#endif
}
}
/* If here is either UDP or TCP */
switch(l7proto) {
case NDPI_PROTOCOL_DNS:
if(protos.dns.invalid_query)
return status_dns_invalid_query;
}
}
if(cli_host && srv_host
&& cli_host->get_ip()->isNonEmptyUnicastAddress()
&& srv_host->get_ip()->isNonEmptyUnicastAddress()) {
if(! cli_host->isLocalHost() &&
! srv_host->isLocalHost())
return status_remote_to_remote;
if(get_duration() > ntop->getPrefs()->get_longlived_flow_duration())
return status_longlived;
}
if(cli_host && srv_host
/* Assumes elephant flows are normal when the category is data transfer */
&& get_protocol_category() != NDPI_PROTOCOL_CATEGORY_DATA_TRANSFER) {
u_int64_t local_to_remote_bytes = 0, remote_to_local_bytes = 0;
if(cli_host->isLocalHost() && ! srv_host->isLocalHost()) {
local_to_remote_bytes = get_bytes_cli2srv();
remote_to_local_bytes = get_bytes_srv2cli();
} else if(srv_host->isLocalHost() && ! cli_host->isLocalHost()) {
local_to_remote_bytes = get_bytes_srv2cli();
remote_to_local_bytes = get_bytes_cli2srv();
}
if(local_to_remote_bytes > ntop->getPrefs()->get_elephant_flow_local_to_remote_bytes())
return status_elephant_local_to_remote;
if(remote_to_local_bytes > ntop->getPrefs()->get_elephant_flow_remote_to_local_bytes())
return status_elephant_remote_to_local;
}
#ifdef HAVE_NEDGE
/* Leave this at the end. A more specific status should be returned above if avaialble. */
if(!isPassVerdict())
return status_blocked;
#endif
#if 0
if(iface->getAlertLevel() > 0)
return(status_flow_when_interface_alerted);
#endif
return status_normal;
}
/* ***************************************************** */
bool Flow::isTiny() {
//if((cli2srv_packets < 3) && (srv2cli_packets == 0))
if((get_packets() <= ntop->getPrefs()->get_max_num_packets_per_tiny_flow())
|| (get_bytes() <= ntop->getPrefs()->get_max_num_bytes_per_tiny_flow()))
return(true);
else
return(false);
}
/* ***************************************************** */
#ifdef HAVE_NEDGE
void Flow::setPacketsBytes(time_t now, u_int32_t s2d_pkts, u_int32_t d2s_pkts,
u_int64_t s2d_bytes, u_int64_t d2s_bytes) {
u_int16_t eth_proto = ETHERTYPE_IP;
u_int overhead = 0;
bool nf_existing_flow;
/* netfilter (depending on configured timeouts) could expire a flow before than
ntopng. This heuristics attempt to detect such events.
Basically, if netfilter is sending counters for a new flow and ntopng
already have an existing flow matching the same 5-tuple, we sum counters
rather than overwriting them.
A complete solution would require the registration of a netfilter callback
and the detection of event NFCT_T_DESTROY.
*/
nf_existing_flow = !(cli2srv_packets > s2d_pkts || cli2srv_bytes > s2d_bytes
|| srv2cli_packets > d2s_pkts || srv2cli_bytes > d2s_bytes);
updateSeen();
/*
We need to set last_conntrack_update even with 0 packtes/bytes
as this function has been called only within netfilter through
the conntrack handler, and thus the flow is still alive.
*/
last_conntrack_update = now;
iface->_incStats(isIngress2EgressDirection(), now, eth_proto, ndpiDetectedProtocol.app_protocol,
nf_existing_flow ? s2d_bytes - cli2srv_bytes : s2d_bytes,
nf_existing_flow ? s2d_pkts - cli2srv_packets : s2d_pkts,
overhead);
iface->_incStats(!isIngress2EgressDirection(), now, eth_proto, ndpiDetectedProtocol.app_protocol,
nf_existing_flow ? d2s_bytes - srv2cli_bytes : d2s_bytes,
nf_existing_flow ? d2s_pkts - srv2cli_packets : d2s_pkts,
overhead);
if(nf_existing_flow) {
cli2srv_packets = s2d_pkts, cli2srv_bytes = s2d_bytes,
srv2cli_packets = d2s_pkts, srv2cli_bytes = d2s_bytes;
} else {
cli2srv_packets += s2d_pkts, cli2srv_bytes += s2d_bytes,
srv2cli_packets += d2s_pkts, srv2cli_bytes += d2s_bytes;
}
}
#endif
/* ***************************************************** */
#ifdef HAVE_EBPF
void Flow::setProcessInfo(eBPFevent *event, bool client_process) {
ProcessInfo **process_info = client_process ? &client_proc : &server_proc;
if(!*process_info)
*process_info = (ProcessInfo*)calloc(1, sizeof(ProcessInfo));
// else
// memset(*process_info, 0, sizeof(ProcessInfo));
if(*process_info) {
struct taskInfo *proc, *father;
ProcessInfo *c = *process_info /* , *s */;
proc = (event->ip_version == 4) ? &event->proc : &event->proc;
father = (event->ip_version == 4) ? &event->father : &event->father;
c->pid = proc->tid ? proc->tid : proc->pid,
c->father_pid = father->tid ? father->tid : father->pid,
c->uid = proc->uid, c->gid = proc->gid,
c->father_uid = father->uid, c->father_gid = father->gid;
if(c->process_name) free(c->process_name);
c->process_name = strdup(proc->full_task_path ? proc->full_task_path : proc->task);
if(c->father_process_name) free(c->father_process_name);
c->father_process_name = strdup(father->full_task_path ? father->full_task_path : father->task);
/* TODO: handle latency_usec */
}
}
#endif
/* ***************************************************** */
/* Called when a flow is set_to_purge */
void Flow::postFlowSetPurge(time_t t) {
/* not called from the datapath for flows, so it is only
safe to touch low goodput uses */
if(good_low_flow_detected) {
if(cli_host) cli_host->decLowGoodputFlows(t, true);
if(srv_host) srv_host->decLowGoodputFlows(t, false);
}
FlowStatus status = getFlowStatus();
if(status != status_normal) {
#if 0
char buf[256];
printf("%s status=%d\n", print(buf, sizeof(buf)), status);
#endif
if(cli_host) cli_host->incNumAnomalousFlows(true);
if(srv_host) srv_host->incNumAnomalousFlows(false);
}
}
/* ***************************************************** */
void Flow::fillZmqFlowCategory() {
struct ndpi_detection_module_struct *ndpi_struct = iface->get_ndpi_struct();
char *srv_name = getFlowServerInfo();
if(!cli_host || !srv_host)
return;
if(cli_host->get_ip()->isIPv4()) {
if(ndpi_fill_ip_protocol_category(ndpi_struct, get_cli_ipv4(), get_srv_ipv4(), &ndpiDetectedProtocol))
return;
}
if(srv_name && srv_name[0]) {
unsigned long id;
if(ndpi_match_custom_category(ndpi_struct, srv_name, &id) == 0) {
ndpiDetectedProtocol.category = (ndpi_protocol_category_t)id;
return;
}
}
}
/* ***************************************************** */
void Flow::dissectSSL(char *payload, u_int16_t payload_len) {
if(protos.ssl.dissect_certificate) {
u_int16_t _payload_len = payload_len + protos.ssl.certificate_leftover;
u_char *_payload = (u_char*)malloc(_payload_len);
bool find_initial_pattern = true;
if(!_payload)
return;
else {
int i = 0;
if(protos.ssl.certificate_leftover > 0) {
memcpy(_payload, protos.ssl.certificate_buf_leftover, (i = protos.ssl.certificate_leftover));
free(protos.ssl.certificate_buf_leftover);
protos.ssl.certificate_buf_leftover = NULL, protos.ssl.certificate_leftover = 0;
find_initial_pattern = false;
}
memcpy(&_payload[i], payload, payload_len);
}
if(_payload_len > 4) {
for(int i = (find_initial_pattern ? 9 : 0); i < _payload_len - 4 && protos.ssl.dissect_certificate; i++) {
/* Look for the Subject Alternative Name Extension with OID 55 1D 11 */
if((find_initial_pattern && (_payload[i] == 0x55) && (_payload[i+1] == 0x1d) && (_payload[i+2] == 0x11))
|| (!find_initial_pattern)) {
if(find_initial_pattern) {
i += 3 /* skip the initial patten 55 1D 11 */;
i++; /* skip the first type, 0x04 == BIT STRING, and jump to it's length */
i += _payload[i] & 0x80 ? _payload[i] & 0x7F : 0; /* skip BIT STRING length */
i += 2; /* skip the second type, 0x30 == SEQUENCE, and jump to it's length */
i += _payload[i] & 0x80 ? _payload[i] & 0x7F : 0; /* skip SEQUENCE length */
i++;
}
while(i < _payload_len) {
if(_payload[i] == 0x82) {
u_int8_t len;
if(i < _payload_len - 1 && (len = _payload[i + 1]) && i + len + 2 < _payload_len) {
i += 2;
if(!isalpha(_payload[i]) && _payload[i] != '*') {
protos.ssl.dissect_certificate = false;
break;
} else {
char buf[len + 1];
strncpy(buf, (const char*)&_payload[i], len);
buf[len] = '\0';
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s [Len %u][sizeof(buf): %u][ssl cert: %s]", buf, len, sizeof(buf), getSSLCertificate());
#endif
if(protos.ssl.certificate
&& ((buf[0] != '*' && !strncmp(protos.ssl.certificate, buf, sizeof(buf)))
|| (buf[0] == '*' && strstr(protos.ssl.certificate, &buf[1])))) {
protos.ssl.subject_alt_name_match = true;
protos.ssl.dissect_certificate = false;
break;
}
}
i += len;
} else {
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Leftover %u bytes [%u len]", _payload_len - i, len);
#endif
protos.ssl.certificate_leftover = _payload_len - i;
if((protos.ssl.certificate_buf_leftover = (char*)malloc(protos.ssl.certificate_leftover)) != NULL)
memcpy(protos.ssl.certificate_buf_leftover, &_payload[i], protos.ssl.certificate_leftover);
else
protos.ssl.certificate_leftover = 0;
break;
}
} else {
protos.ssl.dissect_certificate = false;
break;
}
}
}
} /* for */
}
free(_payload);
}
}