Interpret ZMQ retransmissions/OOO and update counters

This commit is contained in:
emanuele-f 2016-10-26 14:32:02 +02:00
parent a4bd8f1d09
commit bc14efd102
5 changed files with 82 additions and 28 deletions

View file

@ -97,11 +97,11 @@ class Flow : public GenericHashEntry {
char *last_content_type;
u_int16_t last_return_code;
} http;
struct {
char *last_query;
} dns;
struct {
char *certificate;
FlowSSLStage cli_stage, srv_stage;
@ -186,7 +186,7 @@ class Flow : public GenericHashEntry {
void updatePacketStats(InterarrivalStats *stats, const struct timeval *when);
void dumpPacketStats(lua_State* vm, bool cli2srv_direction);
inline u_int32_t getCurrentInterArrivalTime(time_t now, bool cli2srv_direction) {
return(1000 /* msec */
return(1000 /* msec */
* (now - (cli2srv_direction ? cli2srvStats.pktTime.lastTime.tv_sec : srv2cliStats.pktTime.lastTime.tv_sec)));
}
FlowStatus getFlowStatus();
@ -231,6 +231,8 @@ class Flow : public GenericHashEntry {
inline void setServerName(char *v) { if(host_server_name) free(host_server_name); host_server_name = strdup(v); }
void updateTcpFlags(const struct bpf_timeval *when,
u_int8_t flags, bool src2dst_direction);
void incTcpBadStats(bool src2dst_direction,
u_int32_t ooo_pkts, u_int32_t retr_pkts, u_int32_t lost_pkts);
void updateTcpSeqNum(const struct bpf_timeval *when,
u_int32_t seq_num, u_int32_t ack_seq_num,
@ -365,7 +367,7 @@ class Flow : public GenericHashEntry {
bool isIdleFlow();
inline FlowState getFlowState() { return(state); }
inline bool isEstablished() { return state == flow_state_established; }
void setActivityFilter(ActivityFilterID fid, const activity_filter_config * config);
inline bool getActivityFilterId(ActivityFilterID *out) {
if(activityDetection && activityDetection->filterSet) {

View file

@ -130,6 +130,11 @@ typedef struct zmq_flow {
u_int16_t vlan_id, pkt_sampling_rate;
u_int8_t l4_proto, tcp_flags;
u_int32_t in_pkts, in_bytes, out_pkts, out_bytes;
struct {
u_int32_t ooo_in_pkts, ooo_out_pkts;
u_int32_t retr_in_pkts, retr_out_pkts;
u_int32_t lost_in_pkts, lost_out_pkts;
} tcp;
u_int32_t first_switched, last_switched;
json_object *additional_fields;
u_int8_t src_mac[6], dst_mac[6], direction, source_id;

View file

@ -56,7 +56,7 @@ Flow::Flow(NetworkInterface *_iface,
doNotExpireBefore = iface->getTimeLastPktRcvd() + 30 /* sec */;
memset(&cli2srvStats, 0, sizeof(cli2srvStats)), memset(&srv2cliStats, 0, sizeof(srv2cliStats));
if(ntop->getPrefs()->is_flow_activity_enabled()){
if((activityDetection = (FlowActivityDetection*)calloc(1, sizeof(FlowActivityDetection))) == NULL)
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to allocate memory for flow activity detection");
@ -240,7 +240,7 @@ Flow::~Flow() {
iface->getAlertsManager()->storeFlowAlert(this, alert_suspicious_activity,
alert_level_warning, alert_msg);
// ntop->getTrace()->traceEvent(TRACE_WARNING, "%s", print(alert_msg, sizeof(alert_msg)));
break;
@ -538,7 +538,7 @@ void Flow::setDetectedProtocol(ndpi_protocol proto_id, bool forceDetection) {
if (detection_completed)
iface->luaEvalFlow(this, callback_flow_proto_callback);
#ifdef NTOPNG_PRO
// Update the profile even if the detection is not yet completed.
// Indeed, even if the L7 detection is not yet completed
@ -1884,7 +1884,7 @@ void Flow::dumpPacketStats(lua_State* vm, bool cli2srv_direction) {
bool Flow::isSSLProto() {
u_int16_t lower = ndpi_get_lower_proto(ndpiDetectedProtocol);
return (
(lower == NDPI_PROTOCOL_SSL) ||
(lower == NDPI_PROTOCOL_MAIL_IMAPS) ||
@ -2021,7 +2021,7 @@ void Flow::updateTcpFlags(const struct bpf_timeval *when,
if((synAckTime.tv_sec == 0) && (synTime.tv_sec > 0)) {
memcpy(&synAckTime, when, sizeof(struct timeval));
timeval_diff(&synTime, (struct timeval*)when, &serverNwLatency, 1);
/* Sanity check */
if(serverNwLatency.tv_sec > 5) memset(&serverNwLatency, 0, sizeof(serverNwLatency));
}
@ -2029,12 +2029,12 @@ void Flow::updateTcpFlags(const struct bpf_timeval *when,
if((ackTime.tv_sec == 0) && (synAckTime.tv_sec > 0)) {
memcpy(&ackTime, when, sizeof(struct timeval));
timeval_diff(&synAckTime, (struct timeval*)when, &clientNwLatency, 1);
/* Sanity check */
if(clientNwLatency.tv_sec > 5) memset(&clientNwLatency, 0, sizeof(clientNwLatency));
rttSec = ((float)(serverNwLatency.tv_sec+clientNwLatency.tv_sec))
+((float)(serverNwLatency.tv_usec+clientNwLatency.tv_usec))/(float)1000000;
+((float)(serverNwLatency.tv_usec+clientNwLatency.tv_usec))/(float)1000000;
}
twh_over = true, iface->getTcpFlowStats()->incEstablished();
@ -2079,6 +2079,36 @@ u_int32_t Flow::getNextTcpSeq ( u_int8_t tcpFlags,
/* *************************************** */
void Flow::incTcpBadStats(bool src2dst_direction,
u_int32_t ooo_pkts,
u_int32_t retr_pkts,
u_int32_t lost_pkts) {
TCPPacketStats * stats;
Host * host;
if (src2dst_direction) {
stats = &tcp_stats_s2d;
host = cli_host;
} else {
stats = &tcp_stats_d2s;
host = srv_host;
}
stats->pktRetr += retr_pkts;
stats->pktOOO += ooo_pkts;
stats->pktLost += lost_pkts;
host->incRetransmittedPkts(retr_pkts);
host->incOOOPkts(ooo_pkts);
host->incLostPkts(lost_pkts);
iface->incRetransmittedPkts(retr_pkts);
iface->incOOOPkts(ooo_pkts);
iface->incLostPkts(lost_pkts);
}
/* *************************************** */
void Flow::updateTcpSeqNum(const struct bpf_timeval *when,
u_int32_t seq_num, u_int32_t ack_seq_num,
u_int16_t window, u_int8_t flags,
@ -2453,7 +2483,7 @@ bool Flow::isLowGoodput() {
void Flow::dissectSSL(u_int8_t *payload, u_int16_t payload_len, const struct bpf_timeval *when, bool cli2srv) {
uint16_t skiphello;
bool hs_now_end = false;
if(good_ssl_hs && twh_over && payload_len >= SSL_MIN_PACKET_SIZE) {
if( (cli2srv && (getSSLEncryptionStatus() & SSL_ENCRYPTION_CLIENT)) ||
(!cli2srv && (getSSLEncryptionStatus() & SSL_ENCRYPTION_SERVER)) ) {
@ -2472,7 +2502,7 @@ void Flow::dissectSSL(u_int8_t *payload, u_int16_t payload_len, const struct bpf
}
} else {
protos.ssl.is_data = false;
if(payload[0] == SSL_HANDSHAKE_PACKET) {
if(payload[5] == SSL_CLIENT_HELLO) {
if(protos.ssl.cli_stage == SSL_STAGE_UNKNOWN) {
@ -2593,7 +2623,7 @@ bool Flow::invokeActivityFilter(const struct timeval *when, bool cli2srv, u_int1
if(activityDetection == NULL) return false /* detection disabled */;
if(activityDetection->filterSet)
return (activity_filter_funcs[activityDetection->filterId])(&activityDetection->config,
return (activity_filter_funcs[activityDetection->filterId])(&activityDetection->config,
&activityDetection->status, this, when, cli2srv, payload_len);
return false;

View file

@ -746,6 +746,10 @@ void NetworkInterface::processFlow(ZMQ_Flow *zflow) {
when.tv_sec = (long)now, when.tv_usec = 0;
flow->updateTcpFlags((const struct bpf_timeval*)&when,
zflow->tcp_flags, src2dst_direction);
flow->incTcpBadStats(true,
zflow->tcp.ooo_in_pkts, zflow->tcp.retr_in_pkts, zflow->tcp.lost_in_pkts);
flow->incTcpBadStats(false,
zflow->tcp.ooo_out_pkts, zflow->tcp.retr_out_pkts, zflow->tcp.lost_out_pkts);
}
flow->addFlowStats(src2dst_direction,

View file

@ -24,7 +24,7 @@
/* **************************************************** */
/* IMPORTANT: keep it in sync with flow_fields_description part of flow_utils.lua */
ParserInterface::ParserInterface(const char *endpoint) : NetworkInterface(endpoint) {
ParserInterface::ParserInterface(const char *endpoint) : NetworkInterface(endpoint) {
map = NULL, once = false;
addMapping("IN_BYTES", 1);
@ -413,9 +413,9 @@ int ParserInterface::getKeyId(char *sym) {
struct FlowFieldMap *s;
if(isdigit(sym[0])) return(atoi(sym));
HASH_FIND_STR(map, sym, s); /* s: output pointer */
return(s ? s->value : -1);
}
@ -425,12 +425,12 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s
json_object *o;
enum json_tokener_error jerr = json_tokener_success;
NetworkInterface * iface = (NetworkInterface*)data;
// payload[payload_size] = '\0';
//ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", payload);
o = json_tokener_parse_verbose(payload, &jerr);
if(o != NULL) {
struct json_object_iterator it = json_object_iter_begin(o);
struct json_object_iterator itEnd = json_object_iter_end(o);
@ -445,10 +445,10 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s
const char *value = json_object_get_string(v);
if((key != NULL) && (value != NULL)) {
/*
/*
Example
{ "if.name": "en0", "if.speed": 1000, "if.ip": "fe80::c62c:3ff:fe06:49fe%en0", "probe.ip": "192.168.1.5", "time" : 1456595814, "bytes": 18505, "packets": 85 }
*/
*/
if(!strcmp(key, "if.name")) snprintf(remote_ifname, sizeof(remote_ifname), "%s", value);
else if(!strcmp(key, "if.ip")) snprintf(remote_ifaddress, sizeof(remote_ifaddress), "%s", value);
else if(!strcmp(key, "if.speed")) remote_ifspeed = atol(value);
@ -459,19 +459,19 @@ u_int8_t ParserInterface::parseEvent(char *payload, int payload_size, u_int8_t s
else if(!strcmp(key, "time")) remote_time = atol(value); /* Format 1461424017.299 <sec>.<msec> */
else if(!strcmp(key, "avg.bps")) avg_bps = atol(value);
else if(!strcmp(key, "avg.pps")) avg_pps = atol(value);
/* Move to the next element */
json_object_iter_next(&it);
}
} // while json_object_iter_equal
/* ntop->getTrace()->traceEvent(TRACE_WARNING, "%u/%u", avg_bps, avg_pps); */
/* Process Flow */
iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed,
iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed,
remote_probe_address, remote_probe_public_address,
remote_bytes, remote_pkts, remote_time, avg_pps, avg_bps);
/* Dispose memory */
json_object_put(o);
} else {
@ -625,6 +625,19 @@ u_int8_t ParserInterface::parseFlow(char *payload, int payload_size, u_int8_t so
case OUT_BYTES:
flow.out_bytes = atol(value);
break;
case OOORDER_IN_PKTS:
flow.tcp.ooo_in_pkts = atol(value);
break;
case OOORDER_OUT_PKTS:
flow.tcp.ooo_out_pkts = atol(value);
break;
case RETRANSMITTED_IN_PKTS:
flow.tcp.retr_in_pkts = atol(value);
break;
case RETRANSMITTED_OUT_PKTS:
flow.tcp.retr_out_pkts = atol(value);
break;
/* TODO add lost in/out to nProbe and here */
case FIRST_SWITCHED:
flow.first_switched = atol(value);
break;
@ -812,7 +825,7 @@ u_int8_t ParserInterface::parseCounter(char *payload, int payload_size, u_int8_t
else if(!strcmp(key, "ifOutErrors")) stats.ifOutErrors = atoll(value);
else if(!strcmp(key, "ifPromiscuousMode")) stats.ifPromiscuousMode = (!strcmp(value, "1")) ? true : false;
} /* if */
/* Move to the next element */
json_object_iter_next(&it);
} // while json_object_iter_equal