From bc14efd102aa5201a416df0764ea6d00c81fc6db Mon Sep 17 00:00:00 2001 From: emanuele-f Date: Wed, 26 Oct 2016 14:32:02 +0200 Subject: [PATCH] Interpret ZMQ retransmissions/OOO and update counters --- include/Flow.h | 10 ++++---- include/ntop_typedefs.h | 5 ++++ src/Flow.cpp | 52 +++++++++++++++++++++++++++++++--------- src/NetworkInterface.cpp | 4 ++++ src/ParserInterface.cpp | 39 ++++++++++++++++++++---------- 5 files changed, 82 insertions(+), 28 deletions(-) diff --git a/include/Flow.h b/include/Flow.h index 4ec92c80bd..6b5b30f40c 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -97,11 +97,11 @@ class Flow : public GenericHashEntry { char *last_content_type; u_int16_t last_return_code; } http; - + struct { char *last_query; } dns; - + struct { char *certificate; FlowSSLStage cli_stage, srv_stage; @@ -186,7 +186,7 @@ class Flow : public GenericHashEntry { void updatePacketStats(InterarrivalStats *stats, const struct timeval *when); void dumpPacketStats(lua_State* vm, bool cli2srv_direction); inline u_int32_t getCurrentInterArrivalTime(time_t now, bool cli2srv_direction) { - return(1000 /* msec */ + return(1000 /* msec */ * (now - (cli2srv_direction ? cli2srvStats.pktTime.lastTime.tv_sec : srv2cliStats.pktTime.lastTime.tv_sec))); } FlowStatus getFlowStatus(); @@ -231,6 +231,8 @@ class Flow : public GenericHashEntry { inline void setServerName(char *v) { if(host_server_name) free(host_server_name); host_server_name = strdup(v); } void updateTcpFlags(const struct bpf_timeval *when, u_int8_t flags, bool src2dst_direction); + void incTcpBadStats(bool src2dst_direction, + u_int32_t ooo_pkts, u_int32_t retr_pkts, u_int32_t lost_pkts); void updateTcpSeqNum(const struct bpf_timeval *when, u_int32_t seq_num, u_int32_t ack_seq_num, @@ -365,7 +367,7 @@ class Flow : public GenericHashEntry { bool isIdleFlow(); inline FlowState getFlowState() { return(state); } inline bool isEstablished() { return state == flow_state_established; } - + void setActivityFilter(ActivityFilterID fid, const activity_filter_config * config); inline bool getActivityFilterId(ActivityFilterID *out) { if(activityDetection && activityDetection->filterSet) { diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index a3126942cb..05149af8dd 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -130,6 +130,11 @@ typedef struct zmq_flow { u_int16_t vlan_id, pkt_sampling_rate; u_int8_t l4_proto, tcp_flags; u_int32_t in_pkts, in_bytes, out_pkts, out_bytes; + struct { + u_int32_t ooo_in_pkts, ooo_out_pkts; + u_int32_t retr_in_pkts, retr_out_pkts; + u_int32_t lost_in_pkts, lost_out_pkts; + } tcp; u_int32_t first_switched, last_switched; json_object *additional_fields; u_int8_t src_mac[6], dst_mac[6], direction, source_id; diff --git a/src/Flow.cpp b/src/Flow.cpp index df70dc078c..51f72039d2 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -56,7 +56,7 @@ Flow::Flow(NetworkInterface *_iface, doNotExpireBefore = iface->getTimeLastPktRcvd() + 30 /* sec */; memset(&cli2srvStats, 0, sizeof(cli2srvStats)), memset(&srv2cliStats, 0, sizeof(srv2cliStats)); - + if(ntop->getPrefs()->is_flow_activity_enabled()){ if((activityDetection = (FlowActivityDetection*)calloc(1, sizeof(FlowActivityDetection))) == NULL) ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to allocate memory for flow activity detection"); @@ -240,7 +240,7 @@ Flow::~Flow() { iface->getAlertsManager()->storeFlowAlert(this, alert_suspicious_activity, alert_level_warning, alert_msg); - + // ntop->getTrace()->traceEvent(TRACE_WARNING, "%s", print(alert_msg, sizeof(alert_msg))); break; @@ -538,7 +538,7 @@ void Flow::setDetectedProtocol(ndpi_protocol proto_id, bool forceDetection) { if (detection_completed) iface->luaEvalFlow(this, callback_flow_proto_callback); - + #ifdef NTOPNG_PRO // Update the profile even if the detection is not yet completed. // Indeed, even if the L7 detection is not yet completed @@ -1884,7 +1884,7 @@ void Flow::dumpPacketStats(lua_State* vm, bool cli2srv_direction) { bool Flow::isSSLProto() { u_int16_t lower = ndpi_get_lower_proto(ndpiDetectedProtocol); - + return ( (lower == NDPI_PROTOCOL_SSL) || (lower == NDPI_PROTOCOL_MAIL_IMAPS) || @@ -2021,7 +2021,7 @@ void Flow::updateTcpFlags(const struct bpf_timeval *when, if((synAckTime.tv_sec == 0) && (synTime.tv_sec > 0)) { memcpy(&synAckTime, when, sizeof(struct timeval)); timeval_diff(&synTime, (struct timeval*)when, &serverNwLatency, 1); - + /* Sanity check */ if(serverNwLatency.tv_sec > 5) memset(&serverNwLatency, 0, sizeof(serverNwLatency)); } @@ -2029,12 +2029,12 @@ void Flow::updateTcpFlags(const struct bpf_timeval *when, if((ackTime.tv_sec == 0) && (synAckTime.tv_sec > 0)) { memcpy(&ackTime, when, sizeof(struct timeval)); timeval_diff(&synAckTime, (struct timeval*)when, &clientNwLatency, 1); - + /* Sanity check */ if(clientNwLatency.tv_sec > 5) memset(&clientNwLatency, 0, sizeof(clientNwLatency)); - + rttSec = ((float)(serverNwLatency.tv_sec+clientNwLatency.tv_sec)) - +((float)(serverNwLatency.tv_usec+clientNwLatency.tv_usec))/(float)1000000; + +((float)(serverNwLatency.tv_usec+clientNwLatency.tv_usec))/(float)1000000; } twh_over = true, iface->getTcpFlowStats()->incEstablished(); @@ -2079,6 +2079,36 @@ u_int32_t Flow::getNextTcpSeq ( u_int8_t tcpFlags, /* *************************************** */ +void Flow::incTcpBadStats(bool src2dst_direction, + u_int32_t ooo_pkts, + u_int32_t retr_pkts, + u_int32_t lost_pkts) { + TCPPacketStats * stats; + Host * host; + + if (src2dst_direction) { + stats = &tcp_stats_s2d; + host = cli_host; + } else { + stats = &tcp_stats_d2s; + host = srv_host; + } + + stats->pktRetr += retr_pkts; + stats->pktOOO += ooo_pkts; + stats->pktLost += lost_pkts; + + host->incRetransmittedPkts(retr_pkts); + host->incOOOPkts(ooo_pkts); + host->incLostPkts(lost_pkts); + + iface->incRetransmittedPkts(retr_pkts); + iface->incOOOPkts(ooo_pkts); + iface->incLostPkts(lost_pkts); +} + +/* *************************************** */ + void Flow::updateTcpSeqNum(const struct bpf_timeval *when, u_int32_t seq_num, u_int32_t ack_seq_num, u_int16_t window, u_int8_t flags, @@ -2453,7 +2483,7 @@ bool Flow::isLowGoodput() { void Flow::dissectSSL(u_int8_t *payload, u_int16_t payload_len, const struct bpf_timeval *when, bool cli2srv) { uint16_t skiphello; bool hs_now_end = false; - + if(good_ssl_hs && twh_over && payload_len >= SSL_MIN_PACKET_SIZE) { if( (cli2srv && (getSSLEncryptionStatus() & SSL_ENCRYPTION_CLIENT)) || (!cli2srv && (getSSLEncryptionStatus() & SSL_ENCRYPTION_SERVER)) ) { @@ -2472,7 +2502,7 @@ void Flow::dissectSSL(u_int8_t *payload, u_int16_t payload_len, const struct bpf } } else { protos.ssl.is_data = false; - + if(payload[0] == SSL_HANDSHAKE_PACKET) { if(payload[5] == SSL_CLIENT_HELLO) { if(protos.ssl.cli_stage == SSL_STAGE_UNKNOWN) { @@ -2593,7 +2623,7 @@ bool Flow::invokeActivityFilter(const struct timeval *when, bool cli2srv, u_int1 if(activityDetection == NULL) return false /* detection disabled */; if(activityDetection->filterSet) - return (activity_filter_funcs[activityDetection->filterId])(&activityDetection->config, + return (activity_filter_funcs[activityDetection->filterId])(&activityDetection->config, &activityDetection->status, this, when, cli2srv, payload_len); return false; diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 6e1efc8565..621c57eafe 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -746,6 +746,10 @@ void NetworkInterface::processFlow(ZMQ_Flow *zflow) { when.tv_sec = (long)now, when.tv_usec = 0; flow->updateTcpFlags((const struct bpf_timeval*)&when, zflow->tcp_flags, src2dst_direction); + flow->incTcpBadStats(true, + zflow->tcp.ooo_in_pkts, zflow->tcp.retr_in_pkts, zflow->tcp.lost_in_pkts); + flow->incTcpBadStats(false, + zflow->tcp.ooo_out_pkts, zflow->tcp.retr_out_pkts, zflow->tcp.lost_out_pkts); } flow->addFlowStats(src2dst_direction, diff --git a/src/ParserInterface.cpp b/src/ParserInterface.cpp index c44295d33f..7ef468f10a 100755 --- a/src/ParserInterface.cpp +++ b/src/ParserInterface.cpp @@ -24,7 +24,7 @@ /* **************************************************** */ /* IMPORTANT: keep it in sync with flow_fields_description part of flow_utils.lua */ -ParserInterface::ParserInterface(const char *endpoint) : NetworkInterface(endpoint) { +ParserInterface::ParserInterface(const char *endpoint) : NetworkInterface(endpoint) { map = NULL, once = false; addMapping("IN_BYTES", 1); @@ -413,9 +413,9 @@ int ParserInterface::getKeyId(char *sym) { struct FlowFieldMap *s; if(isdigit(sym[0])) return(atoi(sym)); - + HASH_FIND_STR(map, sym, s); /* s: output pointer */ - + return(s ? s->value : -1); } @@ -425,12 +425,12 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s json_object *o; enum json_tokener_error jerr = json_tokener_success; NetworkInterface * iface = (NetworkInterface*)data; - + // payload[payload_size] = '\0'; - + //ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload); o = json_tokener_parse_verbose(payload, &jerr); - + if(o != NULL) { struct json_object_iterator it = json_object_iter_begin(o); struct json_object_iterator itEnd = json_object_iter_end(o); @@ -445,10 +445,10 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s const char *value = json_object_get_string(v); if((key != NULL) && (value != NULL)) { - /* + /* Example { "if.name": "en0", "if.speed": 1000, "if.ip": "fe80::c62c:3ff:fe06:49fe%en0", "probe.ip": "192.168.1.5", "time" : 1456595814, "bytes": 18505, "packets": 85 } - */ + */ if(!strcmp(key, "if.name")) snprintf(remote_ifname, sizeof(remote_ifname), "%s", value); else if(!strcmp(key, "if.ip")) snprintf(remote_ifaddress, sizeof(remote_ifaddress), "%s", value); else if(!strcmp(key, "if.speed")) remote_ifspeed = atol(value); @@ -459,19 +459,19 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s else if(!strcmp(key, "time")) remote_time = atol(value); /* Format 1461424017.299 . */ else if(!strcmp(key, "avg.bps")) avg_bps = atol(value); else if(!strcmp(key, "avg.pps")) avg_pps = atol(value); - + /* Move to the next element */ json_object_iter_next(&it); } } // while json_object_iter_equal - + /* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps); */ /* Process Flow */ - iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed, + iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed, remote_probe_address, remote_probe_public_address, remote_bytes, remote_pkts, remote_time, avg_pps, avg_bps); - + /* Dispose memory */ json_object_put(o); } else { @@ -625,6 +625,19 @@ u_int8_t ParserInterface::parseFlow(char *payload, int payload_size, u_int8_t so case OUT_BYTES: flow.out_bytes = atol(value); break; + case OOORDER_IN_PKTS: + flow.tcp.ooo_in_pkts = atol(value); + break; + case OOORDER_OUT_PKTS: + flow.tcp.ooo_out_pkts = atol(value); + break; + case RETRANSMITTED_IN_PKTS: + flow.tcp.retr_in_pkts = atol(value); + break; + case RETRANSMITTED_OUT_PKTS: + flow.tcp.retr_out_pkts = atol(value); + break; + /* TODO add lost in/out to nProbe and here */ case FIRST_SWITCHED: flow.first_switched = atol(value); break; @@ -812,7 +825,7 @@ u_int8_t ParserInterface::parseCounter(char *payload, int payload_size, u_int8_t else if(!strcmp(key, "ifOutErrors")) stats.ifOutErrors = atoll(value); else if(!strcmp(key, "ifPromiscuousMode")) stats.ifPromiscuousMode = (!strcmp(value, "1")) ? true : false; } /* if */ - + /* Move to the next element */ json_object_iter_next(&it); } // while json_object_iter_equal