diff --git a/include/PartializableFlowTrafficStats.h b/include/PartializableFlowTrafficStats.h index 1d119f04b8..d496eaba08 100644 --- a/include/PartializableFlowTrafficStats.h +++ b/include/PartializableFlowTrafficStats.h @@ -33,6 +33,7 @@ class PartializableFlowTrafficStats { u_int64_t cli2srv_goodput_bytes, srv2cli_goodput_bytes; FlowTCPPacketStats cli2srv_tcp_stats, srv2cli_tcp_stats; u_int16_t cli_host_score[MAX_NUM_SCORE_CATEGORIES], srv_host_score[MAX_NUM_SCORE_CATEGORIES]; + bool is_flow_alerted; /* NOTE: only used by view interfaces. Potentially removed in the future after views rework */ union { FlowHTTPStats http; FlowDNSStats dns; @@ -49,6 +50,7 @@ class PartializableFlowTrafficStats { void incTcpStats(bool cli2srv_direction, u_int retr, u_int ooo, u_int lost, u_int keepalive); void incScore(u_int16_t score, ScoreCategory score_category, bool as_client); + void setFlowAlerted(); inline void incHTTPReqPOST() { protos.http.num_post++; }; inline void incHTTPReqPUT() { protos.http.num_put++; }; @@ -96,6 +98,7 @@ class PartializableFlowTrafficStats { inline const u_int16_t get_cli_score(ScoreCategory score_category) const { return cli_host_score[score_category]; }; inline const u_int16_t get_srv_score(ScoreCategory score_category) const { return srv_host_score[score_category]; }; + inline const bool get_is_flow_alerted() const { return is_flow_alerted; }; }; #endif /* _PARTIALIZABLE_FLOW_TRAFFIC_STATS_H_ */ diff --git a/src/Flow.cpp b/src/Flow.cpp index 47ec3db5ac..ebcb6821b0 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -264,7 +264,7 @@ Flow::~Flow() { /* Get client and server hosts. Use unsafe* methods to get the client and server also for 'viewed' interfaces. - For 'viewed' interfaces, host pointers are shared across multiple 'viewed' interfaces and thus they are termed as unsafe. + For 'Viewed' interfaces, host pointers are shared across multiple 'viewed' interfaces and thus they are termed as unsafe. IMPORTANT: only call here methods that are safe (e.g., locked or atomic-ed). @@ -272,6 +272,9 @@ Flow::~Flow() { */ Host *cli_u = getViewSharedClient(), *srv_u = getViewSharedServer(); + if(getInterface()->isViewed()) /* Score decrements done here for 'viewed' interfaces to avoid races. */ + decAllFlowScores(); + if(cli_u) { cli_u->decUses(); /* Decrease the number of uses */ cli_u->decNumFlows(get_last_seen(), true); @@ -1752,11 +1755,6 @@ void Flow::periodic_stats_update(const struct timeval *tv) { } #endif - /* - Check (and possibly enqueue) the flow for processing by a view interface - */ - getInterface()->viewEnqueue(tv->tv_sec, this); - memcpy(&last_update_time, tv, sizeof(struct timeval)); GenericHashEntry::periodic_stats_update(tv); } @@ -2890,8 +2888,16 @@ void Flow::decAllFlowScores() { if(isFlowAlerted()) { iface->decNumAlertedFlows(this, Utils::mapScoreToSeverity(getPredominantAlertScore())); - if(cli_u) cli_u->decNumAlertedFlows(true /* As client */); - if(srv_u) srv_u->decNumAlertedFlows(false /* As server */); + + if(!getInterface()->isViewed() /* Always for non-viewed interfaces (increments are always performed and in the same thread) */ + /* + For viewed interfaces, do the decrement only if previously incremented. + A previous increment can fail when the view flows queue is full and enqueues fail. + */ + || (getViewInterfaceFlowStats() && getViewInterfaceFlowStats()->getPartializableStats()->get_is_flow_alerted())) { + if(cli_u) cli_u->decNumAlertedFlows(true /* As client */); + if(srv_u) srv_u->decNumAlertedFlows(false /* As server */); + } #ifdef ALERTED_FLOWS_DEBUG iface_alert_dec = true; @@ -2945,14 +2951,23 @@ void Flow::housekeep(time_t t) { /* Score decrements MUST be performed here as this is the same thread of callbacks execution where - scores are increased + scores are increased. + NOTE: for view interfaces, decrement are performed in ~Flow to avoid races. */ - decAllFlowScores(); + if(!getInterface()->isViewed()) decAllFlowScores(); break; default: break; } + + /* + Check (and possibly enqueue) the flow for processing by a view interface. + Make sure to enqueue the flow to view interfaces AFTER all housekeeping tasks have been performed. + This guarantees any change set by these operations (e.g., changes in the flow status, flow alerts, etc.) + are done before the flow is propagated to the view. + */ + getInterface()->viewEnqueue(t, this); } /* *************************************** */ @@ -5346,6 +5361,9 @@ void Flow::setNormalToAlertedCounters() { srv_h->incNumAlertedFlows(false /* As server */), srv_h->incTotalAlerts(); + /* Set this into the partializable flow traffic stats as well (necessary for view interfaces) */ + stats.setFlowAlerted(); + #ifdef ALERTED_FLOWS_DEBUG iface_alert_inc = true; #endif diff --git a/src/PartializableFlowTrafficStats.cpp b/src/PartializableFlowTrafficStats.cpp index 81614cd9aa..61b6acdaaf 100644 --- a/src/PartializableFlowTrafficStats.cpp +++ b/src/PartializableFlowTrafficStats.cpp @@ -35,6 +35,8 @@ PartializableFlowTrafficStats::PartializableFlowTrafficStats() { memset(&cli_host_score, 0, sizeof(cli_host_score)); memset(&srv_host_score, 0, sizeof(srv_host_score)); + is_flow_alerted = false; + memset(&protos, 0, sizeof(protos)); } @@ -55,6 +57,8 @@ PartializableFlowTrafficStats::PartializableFlowTrafficStats(const Partializable memcpy(&cli_host_score, &fts.cli_host_score, sizeof(cli_host_score)); memcpy(&srv_host_score, &fts.srv_host_score, sizeof(srv_host_score)); + is_flow_alerted = fts.is_flow_alerted; + memcpy(&protos, &fts.protos, sizeof(protos)); } /* *************************************** */ @@ -93,6 +97,12 @@ PartializableFlowTrafficStats PartializableFlowTrafficStats::operator-(const Par cur.cli_host_score[i] -= fts.cli_host_score[i], cur.srv_host_score[i] -= fts.srv_host_score[i]; + /* + Even though is_flow_alerted is a boolean, we can still use operator -= to keep it consistent with other fields. + Compilers know how to handle the boolean as 0, 1. + */ + cur.is_flow_alerted -= fts.is_flow_alerted; + switch(ndpi_get_lower_proto(ndpiDetectedProtocol)) { case NDPI_PROTOCOL_HTTP: cur.protos.http.num_get -= fts.protos.http.num_get; @@ -153,7 +163,6 @@ void PartializableFlowTrafficStats::incTcpStats(bool cli2srv_direction, u_int re cur_stats->pktLost += lost; } - /* *************************************** */ void PartializableFlowTrafficStats::incScore(u_int16_t score, ScoreCategory score_category, bool as_client) { @@ -164,6 +173,12 @@ void PartializableFlowTrafficStats::incScore(u_int16_t score, ScoreCategory scor /* *************************************** */ +void PartializableFlowTrafficStats::setFlowAlerted() { + is_flow_alerted = true; +} + +/* *************************************** */ + void PartializableFlowTrafficStats::incStats(bool cli2srv_direction, u_int num_pkts, u_int pkt_len, u_int payload_len) { if(cli2srv_direction) cli2srv_packets += num_pkts, cli2srv_bytes += pkt_len, cli2srv_goodput_bytes += payload_len; diff --git a/src/ViewInterface.cpp b/src/ViewInterface.cpp index 66e7571e82..a3be1bd3de 100644 --- a/src/ViewInterface.cpp +++ b/src/ViewInterface.cpp @@ -510,6 +510,11 @@ void ViewInterface::viewed_flows_walker(Flow *f, const struct timeval *tv) { srv_host->incScoreValue(srv_score_val, score_category, false /* as server */); } + if(partials.get_is_flow_alerted()) { + if(cli_host) cli_host->incNumAlertedFlows(true /* As client */), cli_host->incTotalAlerts(); + if(srv_host) srv_host->incNumAlertedFlows(false /* As server */), srv_host->incTotalAlerts(); + } + incStats(true /* ingressPacket */, tv->tv_sec, cli_ip && cli_ip->isIPv4() ? ETHERTYPE_IP : ETHERTYPE_IPV6, f->getStatsProtocol(), f->get_protocol_category(),