Fixes for aggregated flows dump on view interfaces

This commit is contained in:
Simone Mainardi 2019-10-30 19:14:50 +01:00
parent 64b3892909
commit 11d7f2aa12
3 changed files with 49 additions and 53 deletions

View file

@ -2705,6 +2705,15 @@ u_int32_t NetworkInterface::periodicStatsUpdateFrequency() const {
/* **************************************************** */
void NetworkInterface::periodicUpdateInitTime(struct timeval *tv) const {
if(!read_from_pcap_dump() || reproducePcapOriginalSpeed())
gettimeofday(tv, NULL);
else
tv->tv_sec = last_pkt_rcvd, tv->tv_usec = 0;
}
/* **************************************************** */
u_int32_t NetworkInterface::getFlowMaxIdle() {
return ntop->getPrefs()->get_pkt_ifaces_flow_max_idle();
}
@ -2719,11 +2728,8 @@ void NetworkInterface::periodicStatsUpdate() {
u_int32_t begin_slot;
periodic_stats_update_user_data_t periodic_stats_update_user_data;
struct timeval tv;
if(!read_from_pcap_dump() || reproducePcapOriginalSpeed())
gettimeofday(&tv, NULL);
else
tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0;
periodicUpdateInitTime(&tv);
periodic_stats_update_user_data.tv = &tv;
if(!checkPeriodicStatsUpdateTime(&tv))
@ -2778,11 +2784,8 @@ void NetworkInterface::periodicStatsUpdate() {
topItemsCommit(&tv);
#ifdef NTOPNG_PRO
if(!isView() && !isViewed()) {
if(is_aggregated_flows_dump_ready()) {
dumpAggregatedFlows(&tv);
}
}
if(is_aggregated_flows_dump_ready())
dumpAggregatedFlows(&tv);
#endif
checkReloadHostsBroadcastDomain();
@ -2879,11 +2882,7 @@ void NetworkInterface::periodicHTStateUpdate(time_t deadline, lua_State* vm) {
macs_hash
};
if(!read_from_pcap_dump() || reproducePcapOriginalSpeed())
gettimeofday(&tv, NULL);
else
tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0;
periodicUpdateInitTime(&tv);
periodic_ht_state_update_user_data.acle = NULL,
periodic_ht_state_update_user_data.iface = this,
periodic_ht_state_update_user_data.deadline = deadline,
@ -6857,60 +6856,58 @@ bool NetworkInterface::initFlowDump(u_int8_t num_dump_interfaces) {
if(isFlowDumpDisabled())
return(false);
if(!isViewed()) { /* Flow dump is handled by the view interface in case of 'viewed' interfaces */
#if defined(NTOPNG_PRO) && defined(HAVE_NINDEX)
if(ntop->getPrefs()->do_dump_flows_on_nindex()) {
if(num_dump_interfaces >= NINDEX_MAX_NUM_INTERFACES) {
ntop->getTrace()->traceEvent(TRACE_ERROR,
"nIndex cannot be enabled for %s.", get_name());
ntop->getTrace()->traceEvent(TRACE_ERROR,
"The maximum number of interfaces that can be used with nIndex is %d.",
NINDEX_MAX_NUM_INTERFACES);
ntop->getTrace()->traceEvent(TRACE_ERROR,
"Interface will continue to work without nIndex support.");
} else {
db = new NIndexFlowDB(this);
goto enable_aggregation;
}
if(ntop->getPrefs()->do_dump_flows_on_nindex()) {
if(num_dump_interfaces >= NINDEX_MAX_NUM_INTERFACES) {
ntop->getTrace()->traceEvent(TRACE_ERROR,
"nIndex cannot be enabled for %s.", get_name());
ntop->getTrace()->traceEvent(TRACE_ERROR,
"The maximum number of interfaces that can be used with nIndex is %d.",
NINDEX_MAX_NUM_INTERFACES);
ntop->getTrace()->traceEvent(TRACE_ERROR,
"Interface will continue to work without nIndex support.");
} else {
db = new NIndexFlowDB(this);
goto enable_aggregation;
}
}
#endif
if(db == NULL) {
if(ntop->getPrefs()->do_dump_flows_on_mysql()
|| ntop->getPrefs()->do_read_flows_from_nprobe_mysql()) {
if(db == NULL) {
if(ntop->getPrefs()->do_dump_flows_on_mysql()
|| ntop->getPrefs()->do_read_flows_from_nprobe_mysql()) {
#ifdef NTOPNG_PRO
if(ntop->getPrefs()->is_enterprise_edition()
&& !ntop->getPrefs()->do_read_flows_from_nprobe_mysql()) {
if(ntop->getPrefs()->is_enterprise_edition()
&& !ntop->getPrefs()->do_read_flows_from_nprobe_mysql()) {
#ifdef HAVE_MYSQL
db = new BatchedMySQLDB(this);
db = new BatchedMySQLDB(this);
#endif
#if defined(NTOPNG_PRO) && defined(HAVE_NINDEX)
enable_aggregation:
enable_aggregation:
#endif
aggregated_flows_hash = new AggregatedFlowHash(this,
max_val(4096, ntop->getPrefs()->get_max_num_flows()/4) /* num buckets */,
ntop->getPrefs()->get_max_num_flows());
aggregated_flows_hash = new AggregatedFlowHash(this,
max_val(4096, ntop->getPrefs()->get_max_num_flows()/4) /* num buckets */,
ntop->getPrefs()->get_max_num_flows());
ntop->getPrefs()->enable_flow_aggregation();
} else
aggregated_flows_hash = NULL;
ntop->getPrefs()->enable_flow_aggregation();
} else
aggregated_flows_hash = NULL;
#endif
#ifdef HAVE_MYSQL
if(db == NULL)
db = new (std::nothrow) MySQLDB(this);
if(db == NULL)
db = new (std::nothrow) MySQLDB(this);
#endif
if(!db) throw "Not enough memory";
}
#ifndef HAVE_NEDGE
else if(ntop->getPrefs()->do_dump_flows_on_es())
db = new ElasticSearch(this);
else if(ntop->getPrefs()->do_dump_flows_on_ls())
db = new Logstash(this);
#endif
if(!db) throw "Not enough memory";
}
#ifndef HAVE_NEDGE
else if(ntop->getPrefs()->do_dump_flows_on_es())
db = new ElasticSearch(this);
else if(ntop->getPrefs()->do_dump_flows_on_ls())
db = new Logstash(this);
#endif
}
return(db != NULL);