diff --git a/include/Flow.h b/include/Flow.h index 31a62c787b..190143793b 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -245,7 +245,7 @@ class Flow : public GenericHashEntry { const struct timeval *tv, u_int64_t diff_sent_packets, u_int64_t diff_sent_bytes, u_int64_t diff_rcvd_packets, u_int64_t diff_rcvd_bytes) const; - void periodic_dump_check(const struct timeval *tv, bool no_time_left); + void dumpCheck(const struct timeval *tv, bool no_time_left, bool last_dump_before_free); void updateCliJA3(); void updateSrvJA3(); void updateHASSH(bool as_client); @@ -542,7 +542,7 @@ class Flow : public GenericHashEntry { const ICMPinfo * const icmp_info, bool *src2srv_direction) const; void sumStats(nDPIStats *ndpi_stats, FlowStats *stats); - bool dumpFlow(const struct timeval *tv, NetworkInterface *dumper, bool no_time_left); + bool dumpFlow(const struct timeval *tv, NetworkInterface *dumper, bool no_time_left, bool last_dump_before_free); bool match(AddressTree *ptree); void dissectHTTP(bool src2dst_direction, char *payload, u_int16_t payload_len); void dissectDNS(bool src2dst_direction, char *payload, u_int16_t payload_len); @@ -740,6 +740,10 @@ class Flow : public GenericHashEntry { custom_flow_info = strdup(what); } } + + inline bool timeToPeriodicDump(u_int sec) { + return((sec - get_first_seen() < CONST_DB_DUMP_FREQUENCY) || (sec - get_partial_last_seen() < CONST_DB_DUMP_FREQUENCY)); + } }; #endif /* _FLOW_H_ */ diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index c5977f197b..fd127f441c 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -831,7 +831,7 @@ class NetworkInterface : public AlertableEntity { void updateFlowPeriodicity(Flow *f); #endif - + inline void incNumQueueDroppedFlows(u_int32_t num) { if(db) db->incNumQueueDroppedFlows(num); }; }; #endif /* _NETWORK_INTERFACE_H_ */ diff --git a/src/Flow.cpp b/src/Flow.cpp index 7fcc5e3a56..ba8b0ab3d0 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -1154,67 +1154,57 @@ char* Flow::print(char *buf, u_int buf_len) const { /* *************************************** */ -bool Flow::dumpFlow(const struct timeval *tv, NetworkInterface *dumper_iface, bool no_time_left) { +bool Flow::dumpFlow(const struct timeval *tv, NetworkInterface *dumper_iface, + bool no_time_left, bool last_dump_before_free) { bool rc = false; - - if(ntop->getPrefs()->is_runtime_flows_dump_enabled() /* Check if dump has been disabled at runtime from a UI preference */ - && (ntop->getPrefs()->do_dump_flows() /* This check if flows dump has been statically enabled from the CLI */ -#ifndef HAVE_NEDGE - || ntop->get_export_interface() -#endif - )) { - if(!ntop->getPrefs()->is_tiny_flows_export_enabled() && isTiny()) { + + if(!ntop->getPrefs()->is_tiny_flows_export_enabled() && isTiny()) { #ifdef TINY_FLOWS_DEBUG - ntop->getTrace()->traceEvent(TRACE_NORMAL, - "Skipping tiny flow dump " - "[flow key: %u]" - "[packets current/max: %i/%i] " - "[bytes current/max: %i/%i].", - key(), - get_packets(), - ntop->getPrefs()->get_max_num_packets_per_tiny_flow(), - get_bytes(), - ntop->getPrefs()->get_max_num_bytes_per_tiny_flow()); + ntop->getTrace()->traceEvent(TRACE_NORMAL, + "Skipping tiny flow dump " + "[flow key: %u]" + "[packets current/max: %i/%i] " + "[bytes current/max: %i/%i].", + key(), + get_packets(), + ntop->getPrefs()->get_max_num_packets_per_tiny_flow(), + get_bytes(), + ntop->getPrefs()->get_max_num_bytes_per_tiny_flow()); #endif - return(rc); - } - - if(!idle()) { - if((dumper_iface->getIfType() == interface_type_PCAP_DUMP && !dumper_iface->read_from_pcap_dump_done()) - || tv->tv_sec - get_first_seen() < CONST_DB_DUMP_FREQUENCY - || tv->tv_sec - get_partial_last_seen() < CONST_DB_DUMP_FREQUENCY) { - return(rc); - } - } else { - /* Flow idle, i.e., ready to be purged, are always dumped */ - } - - if(!update_partial_traffic_stats_db_dump()) - return(rc); /* Partial stats update has failed */ - - /* Check for bytes, and not for packets, as with nprobeagent - there are not packet counters, just bytes. */ - if(!get_partial_bytes()) - return(rc); /* Nothing to dump */ - - dumper_iface->dumpFlow(last_seen, this, no_time_left); - -#ifndef HAVE_NEDGE - if(ntop->get_export_interface()) { - char *json = serialize(false); - - if(json) { - ntop->get_export_interface()->export_data(json); - free(json); - } - } -#endif - - rc = true; + return(rc); } - return(rc); + if(!last_dump_before_free) { + if((dumper_iface->getIfType() == interface_type_PCAP_DUMP + && (!dumper_iface->read_from_pcap_dump_done())) + || timeToPeriodicDump(tv->tv_sec)) { + return(rc); /* Don't call too often periodic flow dump */ + } + } + + if(!update_partial_traffic_stats_db_dump()) + return(rc); /* Partial stats update has failed */ + + /* Check for bytes, and not for packets, as with nprobeagent + there are not packet counters, just bytes. */ + if(!get_partial_bytes()) + return(rc); /* Nothing to dump */ + + dumper_iface->dumpFlow(last_seen, this, no_time_left); + +#ifndef HAVE_NEDGE + if(ntop->get_export_interface()) { + char *json = serialize(false); + + if(json) { + ntop->get_export_interface()->export_data(json); + free(json); + } + } +#endif + + return(true); } /* *************************************** */ @@ -1704,12 +1694,33 @@ void Flow::periodic_stats_update(const struct timeval *tv) { /* *************************************** */ -void Flow::periodic_dump_check(const struct timeval *tv, bool no_time_left) { - if(no_time_left) return; - - /* Viewed interfaces don't dump flows, their flows are dumped by the overlying ViewInterface. - ViewInterface dump their flows in another thread, not this one. */ - dumpFlow(tv, iface->isViewed() ? iface->viewedBy() : iface, no_time_left); +void Flow::dumpCheck(const struct timeval *tv, bool no_time_left, bool last_dump_before_free) { + if(ntop->getPrefs()->is_runtime_flows_dump_enabled() /* Check if dump has been disabled at runtime from a UI preference */ + && (ntop->getPrefs()->do_dump_flows() /* This check if flows dump has been statically enabled from the CLI */ +#ifndef HAVE_NEDGE + || ntop->get_export_interface() +#endif + )) { + /* + Viewed interfaces don't dump flows, their flows are dumped by the overlying ViewInterface. + ViewInterface dump their flows in another thread, not this one. + */ + NetworkInterface *d_if = iface->isViewed() ? iface->viewedBy() : iface; + + if(no_time_left) { + if(last_dump_before_free) { + /* + There is no time to dump the flow, however this is not yet + lost unless it is in the idle state (active flows will be + dumped in the next iteration + */ + + d_if->incDBNumDroppedFlows(1); + } + return; + } else + dumpFlow(tv, d_if, no_time_left /* false */, last_dump_before_free); + } } /* *************************************** */ @@ -2261,13 +2272,13 @@ void Flow::periodic_hash_entry_state_update(void *user_data) { case hash_entry_state_active: if(next_lua_call_periodic_update == 0) next_lua_call_periodic_update = tv->tv_sec + FLOW_LUA_CALL_PERIODIC_UPDATE_SECS; - periodic_dump_check(tv, htstats->no_time_left); /* TODO: move it in a lua script; NOTE: this call can take a long time! */ + dumpCheck(tv, htstats->no_time_left, false); /* Don't change state: purgeIdle() will do */ break; case hash_entry_state_idle: postFlowSetIdle(tv); - periodic_dump_check(tv, htstats->no_time_left); /* TODO: move it in a lua script; NOTE: this call can take a long time! */ + dumpCheck(tv, htstats->no_time_left, true); break; } diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 8974ee28a6..72bfac3478 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -581,26 +581,6 @@ NetworkInterface::~NetworkInterface() { /* **************************************************** */ int NetworkInterface::dumpFlow(time_t when, Flow *f, bool no_time_left) { - int rc = -1; -#ifndef HAVE_NEDGE - char *json = NULL; - bool dump_json = true; - bool use_labels = ntop->getPrefs()->do_dump_flows_on_es() || - ntop->getPrefs()->do_dump_flows_on_ls(); - - if(!db) - return(-1); - -#if defined(NTOPNG_PRO) && defined(HAVE_NINDEX) - if(ntop->getPrefs()->do_dump_flows_on_nindex() && - !ntop->getPrefs()->do_dump_json_flows_on_disk()) { - /* JSON is not generated in case of nindex dump for - * performance reason (it actually contains duplicated - * information which are useless) */ - dump_json = false; - } -#endif - if(no_time_left) { /* There is no time to dump the flow, however this is not yet * lost unless it is in the idle state (active flows will be @@ -609,22 +589,41 @@ int NetworkInterface::dumpFlow(time_t when, Flow *f, bool no_time_left) { db->incNumDroppedFlows(1); return(-1); - } + } else { + int rc = -1; +#ifndef HAVE_NEDGE + char *json = NULL; + bool dump_json = true; + bool use_labels = ntop->getPrefs()->do_dump_flows_on_es() || ntop->getPrefs()->do_dump_flows_on_ls(); - if(dump_json) { - json = f->serialize(use_labels); - - if(json == NULL) + if(!db) return(-1); - } - rc = db->dumpFlow(when, f, json); - - if(json != NULL) - free(json); +#if defined(NTOPNG_PRO) && defined(HAVE_NINDEX) + if(ntop->getPrefs()->do_dump_flows_on_nindex() && + !ntop->getPrefs()->do_dump_json_flows_on_disk()) { + /* JSON is not generated in case of nindex dump for + * performance reason (it actually contains duplicated + * information which are useless) */ + dump_json = false; + } #endif - return(rc); + if(dump_json) { + json = f->serialize(use_labels); + + if(json == NULL) + return(-1); + } + + rc = db->dumpFlow(when, f, json); /* Finally dump this flow */ + + if(json != NULL) + free(json); +#endif + + return(rc); + } } /* **************************************************** */ @@ -7647,3 +7646,5 @@ void NetworkInterface::updateFlowPeriodicity(Flow *f) { if(pHash && f) pHash->updateElement(f, f->get_first_seen()); } #endif + +/* *************************************** */