Decouples periodic hash table updates using a thread pool

This commit is contained in:
Simone Mainardi 2019-10-27 17:40:07 +01:00
parent 6b60324544
commit 68246efd01
16 changed files with 212 additions and 157 deletions

View file

@ -2659,16 +2659,12 @@ static bool perform_quick_update(const struct timeval *tv, GenericHashEntry *ghe
static bool flow_update_hosts_stats(GenericHashEntry *node,
void *user_data, bool *matched) {
Flow *flow = (Flow*)node;
update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data;
struct timeval *tv = update_stats_user_data->tv;
periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data;
struct timeval *tv = periodic_stats_update_user_data->tv;
bool quick_update = perform_quick_update(tv, node);
flow->periodic_dump_check(!quick_update, tv);
flow->periodic_hash_entry_state_update(user_data, quick_update);
if(!update_stats_user_data->hash_entry_state_update_only)
flow->update_hosts_stats(update_stats_user_data);
flow->update_hosts_stats(periodic_stats_update_user_data);
*matched = true;
@ -2680,15 +2676,10 @@ static bool flow_update_hosts_stats(GenericHashEntry *node,
/* NOTE: host is not a GenericTrafficElement */
static bool update_hosts_stats(GenericHashEntry *node, void *user_data, bool *matched) {
Host *host = (Host*)node;
update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data;
struct timeval *tv = update_stats_user_data->tv;
bool quick_update = perform_quick_update(tv, node);
periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data;
host->periodic_hash_entry_state_update(user_data, quick_update);
host->checkReloadPrefs();
if(!update_stats_user_data->hash_entry_state_update_only)
host->updateStats(update_stats_user_data);
host->updateStats(periodic_stats_update_user_data);
*matched = true;
@ -2700,13 +2691,10 @@ static bool update_hosts_stats(GenericHashEntry *node, void *user_data, bool *ma
/* NOTE: mac is not a GenericTrafficElement */
static bool update_macs_stats(GenericHashEntry *node, void *user_data, bool *matched) {
Mac *mac = (Mac*)node;
update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data;
struct timeval *tv = update_stats_user_data->tv;
periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data;
struct timeval *tv = periodic_stats_update_user_data->tv;
mac->periodic_hash_entry_state_update(user_data, false);
if(!update_stats_user_data->hash_entry_state_update_only)
mac->updateStats(tv);
mac->updateStats(tv);
*matched = true;
@ -2717,19 +2705,15 @@ static bool update_macs_stats(GenericHashEntry *node, void *user_data, bool *mat
static bool update_generic_element_stats(GenericHashEntry *node, void *user_data, bool *matched) {
GenericTrafficElement *elem;
update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data;
struct timeval *tv = update_stats_user_data->tv;
periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data;
struct timeval *tv = periodic_stats_update_user_data->tv;
node->periodic_hash_entry_state_update(user_data, false);
if((elem = dynamic_cast<GenericTrafficElement*>(node))) {
if(!update_stats_user_data->hash_entry_state_update_only) {
if((elem = dynamic_cast<GenericTrafficElement*>(node))) {
elem->updateStats(tv);
*matched = true;
} else
ntop->getTrace()->traceEvent(TRACE_ERROR, "update_generic_element_stats on non GenericTrafficElement");
}
elem->updateStats(tv);
*matched = true;
} else
ntop->getTrace()->traceEvent(TRACE_ERROR, "update_generic_element_stats on non GenericTrafficElement");
return(false); /* false = keep on walking */
}
@ -2762,86 +2746,48 @@ u_int32_t NetworkInterface::getFlowMaxIdle() {
/* **************************************************** */
// #define PERIODIC_STATS_UPDATE_DEBUG_TIMING
void NetworkInterface::periodicStatsUpdate() {
u_int32_t begin_slot = 0;
periodic_stats_update_user_data_t periodic_stats_update_user_data;
struct timeval tv;
#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING
struct timeval tdebug, tdebug_init;
#endif
update_stats_user_data_t update_stats_user_data;
bool periodic_stats_update;
if(!read_from_pcap_dump() || reproducePcapOriginalSpeed())
gettimeofday(&tv, NULL);
else
tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0;
#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING
gettimeofday(&tdebug, NULL);
memcpy(&tdebug_init, &tdebug, sizeof(tdebug_init));
#endif
/* Check if it is time to also do a periodic stats update or if
only a quick walk on the hash tables to increase purge speedup
is necessary */
periodic_stats_update = checkPeriodicStatsUpdateTime(&tv);
update_stats_user_data.acle = NULL,
update_stats_user_data.hash_entry_state_update_only = !periodic_stats_update;
update_stats_user_data.tv = &tv;
periodic_stats_update_user_data.tv = &tv;
if(!checkPeriodicStatsUpdateTime(&tv))
return; /* Not yet the time to perform an update */
/* View Interfaces don't have flows, they just walk flows of their 'viewed' peers */
if((!isView()) && flows_hash) {
flows_hash->walkIdle(flow_update_hosts_stats, &update_stats_user_data);
if((!isView()) && flows_hash)
walker(&begin_slot, true, walker_flows, flow_update_hosts_stats, &periodic_stats_update_user_data);
if(update_stats_user_data.acle) {
/* Lazy instantiation */
delete update_stats_user_data.acle;
update_stats_user_data.acle = NULL;
}
if(hosts_hash) {
begin_slot = 0;
walker(&begin_slot, true, walker_hosts, update_hosts_stats, &periodic_stats_update_user_data);
}
#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING
ntop->getTrace()->traceEvent(TRACE_NORMAL, "flows_hash->walk took %d seconds", time(NULL) - tdebug.tv_sec);
gettimeofday(&tdebug, NULL);
#endif
if(hosts_hash) {
hosts_hash->walkIdle(update_hosts_stats, &update_stats_user_data);
if(update_stats_user_data.acle) {
/* Lazy instantiation */
delete update_stats_user_data.acle;
update_stats_user_data.acle = NULL;
}
if(ases_hash) {
begin_slot = 0;
walker(&begin_slot, true, walker_ases, update_generic_element_stats, &periodic_stats_update_user_data);
}
#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING
ntop->getTrace()->traceEvent(TRACE_NORMAL, "hosts_hash->walk took %d seconds", time(NULL) - tdebug.tv_sec);
gettimeofday(&tdebug, NULL);
#endif
if(countries_hash) {
begin_slot = 0;
walker(&begin_slot, true, walker_countries, update_generic_element_stats, &periodic_stats_update_user_data);
}
if(ases_hash)
ases_hash->walkIdle(update_generic_element_stats, &update_stats_user_data);
if(vlans_hash && hasSeenVlanTaggedPackets()) {
begin_slot = 0;
walker(&begin_slot, true, walker_vlans, update_generic_element_stats, &periodic_stats_update_user_data);
}
if(countries_hash)
countries_hash->walkIdle(update_generic_element_stats, &update_stats_user_data);
if(vlans_hash && hasSeenVlanTaggedPackets())
vlans_hash->walkIdle(update_generic_element_stats, &update_stats_user_data);
if(macs_hash)
macs_hash->walkIdle(update_macs_stats, &update_stats_user_data);
#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING
ntop->getTrace()->traceEvent(TRACE_NORMAL, "asn/macs/vlan->walk took %d seconds", time(NULL) - tdebug.tv_sec);
gettimeofday(&tdebug, NULL);
#endif
if(!periodic_stats_update)
return; /* Not yet the time to perform an update */
if(macs_hash) {
begin_slot = 0;
walker(&begin_slot, true, walker_macs, update_macs_stats, &periodic_stats_update_user_data);
}
#ifdef NTOPNG_PRO
if(getHostPools()) getHostPools()->checkPoolsStatsReset();
@ -2914,6 +2860,60 @@ void NetworkInterface::periodicStatsUpdate() {
/* **************************************************** */
static bool quick_periodic_ht_state_update(time_t deadline, GenericHashEntry *ghe) {
/* TODO: optimize time(NULL) */
return time(NULL) + 2 >= deadline;
}
/* **************************************************** */
static void generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data) {
periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data;
bool quick_update = quick_periodic_ht_state_update(periodic_ht_state_update_user_data->deadline, node);
node->periodic_hash_entry_state_update(user_data, quick_update);
}
/* **************************************************** */
void NetworkInterface::periodicHTStateUpdate(time_t deadline) {
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL, "updating hash tables [%s]", get_name());
#endif
struct timeval tv;
periodic_ht_state_update_user_data_t periodic_ht_state_update_user_data;
GenericHash *ghs[] = {
!isView() ? flows_hash : NULL, /* View Interfaces don't have flows, they just walk flows of their 'viewed' peers */
hosts_hash,
ases_hash,
countries_hash,
hasSeenVlanTaggedPackets() ? vlans_hash : NULL,
macs_hash
};
if(!read_from_pcap_dump() || reproducePcapOriginalSpeed())
gettimeofday(&tv, NULL);
else
tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0;
periodic_ht_state_update_user_data.acle = NULL,
periodic_ht_state_update_user_data.deadline = deadline,
periodic_ht_state_update_user_data.tv = &tv;
for(u_int i = 0; i < sizeof(ghs) / sizeof(ghs[0]); i++) {
if(ghs[i]) {
ghs[i]->walkIdle(generic_periodic_hash_entry_state_update, &periodic_ht_state_update_user_data);
if(periodic_ht_state_update_user_data.acle) {
delete periodic_ht_state_update_user_data.acle;
periodic_ht_state_update_user_data.acle = NULL;
}
}
}
}
/* **************************************************** */
struct update_host_pool_l7policy {
bool update_pool_id;
bool update_l7policy;