From 92bf42fa0df428a4dc5aa336bbc1e921faadcef2 Mon Sep 17 00:00:00 2001 From: Simone Mainardi Date: Tue, 23 Jul 2019 15:09:17 +0200 Subject: [PATCH] Fixes flow partials for database dumps --- include/Flow.h | 36 ++++++++-------- src/Flow.cpp | 97 ++++++++++++++++++++++++------------------- src/ViewInterface.cpp | 2 +- 3 files changed, 73 insertions(+), 62 deletions(-) diff --git a/include/Flow.h b/include/Flow.h index 31459a91e7..a3979ecf7a 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -165,10 +165,9 @@ class Flow : public GenericHashEntry { /* Counter values at last host update */ struct { - u_int32_t cli2srv_packets, srv2cli_packets; - u_int64_t cli2srv_bytes, srv2cli_bytes; - u_int64_t cli2srv_goodput_bytes, srv2cli_goodput_bytes; - u_int32_t last_dump; + FlowTrafficStats *partial; + FlowTrafficStats delta; + time_t first_seen, last_seen; } last_db_dump; /* Lazily initialized and used by a possible view interface */ @@ -229,7 +228,8 @@ class Flow : public GenericHashEntry { void dumpFlowAlert(); void updateJA3(); const char* cipher_weakness2str(ndpi_cipher_weakness w); - + bool get_partial_traffic_stats(FlowTrafficStats **dst, FlowTrafficStats *delta, bool *first_partial) const; + public: Flow(NetworkInterface *_iface, u_int16_t _vlanId, u_int8_t _protocol, @@ -341,23 +341,21 @@ class Flow : public GenericHashEntry { inline u_int64_t get_packets() const { return(stats.cli2srv_packets+stats.srv2cli_packets); }; inline u_int64_t get_packets_cli2srv() const { return(stats.cli2srv_packets); }; inline u_int64_t get_packets_srv2cli() const { return(stats.srv2cli_packets); }; - inline u_int64_t get_partial_bytes() const { return(get_bytes() - (last_db_dump.cli2srv_bytes+last_db_dump.srv2cli_bytes)); }; - inline u_int64_t get_partial_bytes_cli2srv() const { return(stats.cli2srv_bytes - last_db_dump.cli2srv_bytes); }; - inline u_int64_t get_partial_bytes_srv2cli() const { return(stats.srv2cli_bytes - last_db_dump.srv2cli_bytes); }; - inline u_int64_t get_partial_packets_cli2srv() const { return(stats.cli2srv_packets - last_db_dump.cli2srv_packets); }; - inline u_int64_t get_partial_packets_srv2cli() const { return(stats.srv2cli_packets - last_db_dump.srv2cli_packets); }; - inline u_int64_t get_partial_goodput_bytes() const { return(get_goodput_bytes() - (last_db_dump.cli2srv_goodput_bytes+last_db_dump.srv2cli_goodput_bytes)); }; - inline u_int64_t get_partial_packets() const { return(get_packets() - (last_db_dump.cli2srv_packets+last_db_dump.srv2cli_packets)); }; - bool get_partial_traffic_stats(FlowTrafficStats *fts, bool *first_partial); + inline u_int64_t get_partial_bytes() const { return get_partial_bytes_cli2srv() + get_partial_bytes_srv2cli(); }; + inline u_int64_t get_partial_packets() const { return get_partial_packets_cli2srv() + get_partial_packets_srv2cli(); }; + inline u_int64_t get_partial_goodput_bytes() const { return last_db_dump.delta.cli2srv_goodput_bytes + last_db_dump.delta.srv2cli_goodput_bytes; }; + inline u_int64_t get_partial_bytes_cli2srv() const { return last_db_dump.delta.cli2srv_bytes; }; + inline u_int64_t get_partial_bytes_srv2cli() const { return last_db_dump.delta.srv2cli_bytes; }; + inline u_int64_t get_partial_packets_cli2srv() const { return last_db_dump.delta.cli2srv_packets; }; + inline u_int64_t get_partial_packets_srv2cli() const { return last_db_dump.delta.srv2cli_packets; }; + bool get_partial_traffic_stats_view(FlowTrafficStats *delta, bool *first_partial); + bool update_partial_traffic_stats_db_dump(); inline float get_bytes_thpt() const { return(bytes_thpt); }; inline float get_goodput_bytes_thpt() const { return(goodput_bytes_thpt); }; - - inline time_t get_partial_first_seen() const { return(last_db_dump.last_dump == 0 ? get_first_seen() : last_db_dump.last_dump); }; - inline time_t get_partial_last_seen() const { return(get_last_seen()); }; - inline u_int32_t get_duration() const { return((u_int32_t)(get_last_seen()-get_first_seen())); }; + inline time_t get_partial_first_seen() const { return(last_db_dump.first_seen); }; + inline time_t get_partial_last_seen() const { return(last_db_dump.last_seen); }; + inline u_int32_t get_duration() const { return((u_int32_t)(get_last_seen() - get_first_seen())); }; inline char* get_protocol_name() const { return(Utils::l4proto2name(protocol)); }; - - inline Host* get_cli_host() const { return(cli_host); }; inline Host* get_srv_host() const { return(srv_host); }; inline const IpAddress* get_cli_ip_addr() const { return(cli_ip_addr); }; diff --git a/src/Flow.cpp b/src/Flow.cpp index 9939431539..a21c7c64f1 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -76,13 +76,9 @@ Flow::Flow(NetworkInterface *_iface, cli2srv_last_packets = 0, prev_cli2srv_last_packets = 0, srv2cli_last_packets = 0, prev_srv2cli_last_packets = 0; top_bytes_thpt = 0, top_goodput_bytes_thpt = 0, applLatencyMsec = 0; - last_db_dump.cli2srv_packets = 0, last_db_dump.srv2cli_packets = 0, - last_db_dump.cli2srv_bytes = 0, last_db_dump.srv2cli_bytes = 0, - last_db_dump.cli2srv_goodput_bytes = 0, last_db_dump.srv2cli_goodput_bytes = 0, - last_db_dump.last_dump = 0; - suricata_alert = NULL; + memset(&last_db_dump, 0, sizeof(last_db_dump)); memset(&protos, 0, sizeof(protos)); memset(&flow_device, 0, sizeof(flow_device)); @@ -223,9 +219,10 @@ Flow::~Flow() { else if(srv_ip_addr) /* Dynamically allocated only when srv_host was NULL */ delete srv_ip_addr; - if(last_partial) free(last_partial); - if(json_info) free(json_info); - if(host_server_name) free(host_server_name); + if(last_partial) free(last_partial); + if(last_db_dump.partial) free(last_db_dump.partial); + if(json_info) free(json_info); + if(host_server_name) free(host_server_name); if(cli_ebpf) delete cli_ebpf; if(srv_ebpf) delete srv_ebpf; @@ -922,12 +919,6 @@ bool Flow::dumpFlow(const struct timeval *tv, bool dump_alert) { dumpFlowAlert(); } - /* Check for bytes, and not for packets, as with nprobeagent - there are not packet counters, just bytes. */ - if(((stats.cli2srv_bytes - last_db_dump.cli2srv_bytes) == 0) - && ((stats.srv2cli_bytes - last_db_dump.srv2cli_bytes) == 0)) - return(rc); - if(ntop->getPrefs()->do_dump_flows() #ifndef HAVE_NEDGE || ntop->get_export_interface() @@ -959,17 +950,23 @@ bool Flow::dumpFlow(const struct timeval *tv, bool dump_alert) { } if(!idle()) { - time_t now = time(NULL); - if(iface->getIfType() == interface_type_PCAP_DUMP - || (now - get_first_seen()) < CONST_DB_DUMP_FREQUENCY - || (now - last_db_dump.last_dump) < CONST_DB_DUMP_FREQUENCY) { + || tv->tv_sec - get_first_seen() < CONST_DB_DUMP_FREQUENCY + || tv->tv_sec - get_partial_last_seen() < CONST_DB_DUMP_FREQUENCY) { return(rc); } } else { /* flows idle, i.e., ready to be purged, are always dumped */ } + if(!update_partial_traffic_stats_db_dump()) + return rc; + + /* Check for bytes, and not for packets, as with nprobeagent + there are not packet counters, just bytes. */ + if(!get_partial_bytes()) + return rc; /* Nothing to dump */ + #ifdef NTOPNG_PRO if(ntop->getPro()->has_valid_license() && ntop->getPrefs()->is_enterprise_edition()) getInterface()->aggregatePartialFlow(tv, this); @@ -1475,15 +1472,8 @@ void Flow::update_hosts_stats(struct timeval *tv, bool dump_alert) { if(updated) memcpy(&last_update_time, tv, sizeof(struct timeval)); - if(dumpFlow(tv, dump_alert)) { - last_db_dump.cli2srv_packets = stats.cli2srv_packets, - last_db_dump.srv2cli_packets = stats.srv2cli_packets, - last_db_dump.cli2srv_bytes = stats.cli2srv_bytes, - last_db_dump.cli2srv_goodput_bytes = stats.cli2srv_goodput_bytes, - last_db_dump.srv2cli_bytes = stats.srv2cli_bytes, - last_db_dump.srv2cli_goodput_bytes = stats.srv2cli_goodput_bytes, - last_db_dump.last_dump = last_seen; - } + if(dumpFlow(tv, dump_alert)) + ; } /* *************************************** */ @@ -2222,10 +2212,6 @@ json_object* Flow::flow2json() { char buf[64], jsonbuf[64], *c; time_t t; - if(((stats.cli2srv_packets - last_db_dump.cli2srv_packets) == 0) - && ((stats.srv2cli_packets - last_db_dump.srv2cli_packets) == 0)) - return(NULL); - if((my_object = json_object_new_object()) == NULL) return(NULL); if(ntop->getPrefs()->do_dump_flows_on_es() @@ -2478,14 +2464,14 @@ void Flow::housekeep(time_t t) { /* *************************************** */ -bool Flow::get_partial_traffic_stats(FlowTrafficStats *fts, bool *first_partial) { +bool Flow::get_partial_traffic_stats(FlowTrafficStats **dst, FlowTrafficStats *fts, bool *first_partial) const { FlowTrafficStats tmp; - if(!fts || !first_partial) + if(!fts || !dst) return false; - if(!last_partial) { - if(!(last_partial = (FlowTrafficStats*)calloc(1, sizeof(FlowTrafficStats)))) + if(!*dst) { + if(!(*dst = (FlowTrafficStats*)calloc(1, sizeof(FlowTrafficStats)))) return false; *first_partial = true; } else @@ -2493,14 +2479,41 @@ bool Flow::get_partial_traffic_stats(FlowTrafficStats *fts, bool *first_partial) memcpy(&tmp, &stats, sizeof(stats)); - fts->cli2srv_packets = tmp.cli2srv_packets - last_partial->cli2srv_packets; - fts->srv2cli_packets = tmp.srv2cli_packets - last_partial->srv2cli_packets; - fts->cli2srv_bytes = tmp.cli2srv_bytes - last_partial->cli2srv_bytes; - fts->srv2cli_bytes = tmp.srv2cli_bytes - last_partial->srv2cli_bytes; - fts->cli2srv_goodput_bytes = tmp.cli2srv_goodput_bytes - last_partial->cli2srv_goodput_bytes; - fts->srv2cli_goodput_bytes = tmp.srv2cli_goodput_bytes - last_partial->srv2cli_goodput_bytes; + fts->cli2srv_packets = tmp.cli2srv_packets - (*dst)->cli2srv_packets; + fts->srv2cli_packets = tmp.srv2cli_packets - (*dst)->srv2cli_packets; + fts->cli2srv_bytes = tmp.cli2srv_bytes - (*dst)->cli2srv_bytes; + fts->srv2cli_bytes = tmp.srv2cli_bytes - (*dst)->srv2cli_bytes; + fts->cli2srv_goodput_bytes = tmp.cli2srv_goodput_bytes - (*dst)->cli2srv_goodput_bytes; + fts->srv2cli_goodput_bytes = tmp.srv2cli_goodput_bytes - (*dst)->srv2cli_goodput_bytes; - memcpy(last_partial, &tmp, sizeof(tmp)); + memcpy(*dst, &tmp, sizeof(tmp)); + + return true; +} + +/* *************************************** */ + +bool Flow::get_partial_traffic_stats_view(FlowTrafficStats *fts, bool *first_partial) { + return get_partial_traffic_stats(&last_partial, fts, first_partial); +} + +/* *************************************** */ + +bool Flow::update_partial_traffic_stats_db_dump() { + FlowTrafficStats delta; + bool first_partial; + + if(!get_partial_traffic_stats(&last_db_dump.partial, &delta, &first_partial)) + return false; + + memcpy(&last_db_dump.delta, &delta, sizeof(delta)); + + if(first_partial) + last_db_dump.first_seen = get_first_seen(); + else + last_db_dump.first_seen = last_db_dump.last_seen; + + last_db_dump.last_seen = get_last_seen(); return true; } diff --git a/src/ViewInterface.cpp b/src/ViewInterface.cpp index 15839786c3..0f6083f9a2 100644 --- a/src/ViewInterface.cpp +++ b/src/ViewInterface.cpp @@ -236,7 +236,7 @@ static bool viewed_flows_walker(GenericHashEntry *flow, void *user_data, bool *m bool first_partial; /* Whether this is the first time the view is visiting this flow */ const IpAddress *cli_ip = f->get_cli_ip_addr(), *srv_ip = f->get_srv_ip_addr(); - if(f->get_partial_traffic_stats(&partials, &first_partial)) { + if(f->get_partial_traffic_stats_view(&partials, &first_partial)) { if(!cli_ip || !srv_ip) ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to get flow hosts. Out of memory? Expect issues.");