From 68246efd01ff2df67ea7e4387186abcbd4b3b613 Mon Sep 17 00:00:00 2001 From: Simone Mainardi Date: Sun, 27 Oct 2019 17:40:07 +0100 Subject: [PATCH] Decouples periodic hash table updates using a thread pool --- include/Flow.h | 2 +- include/GenericHash.h | 2 +- include/Host.h | 2 +- include/NetworkInterface.h | 1 + include/ntop_defines.h | 26 +-- include/ntop_typedefs.h | 8 +- .../callbacks/interface/flow/blacklisted.lua | 8 + .../callbacks/interface/ht_state_update.lua | 19 ++ src/Flow.cpp | 28 +-- src/GenericHash.cpp | 7 +- src/Host.cpp | 12 +- src/LuaEngine.cpp | 21 +- src/NetworkInterface.cpp | 190 +++++++++--------- src/PeriodicActivities.cpp | 19 +- src/ThreadedActivity.cpp | 20 +- src/ViewInterface.cpp | 4 +- 16 files changed, 212 insertions(+), 157 deletions(-) create mode 100644 scripts/callbacks/interface/ht_state_update.lua diff --git a/include/Flow.h b/include/Flow.h index 5860933aa5..65581e946d 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -449,7 +449,7 @@ class Flow : public GenericHashEntry { u_int get_hash_entry_id() const; char* print(char *buf, u_int buf_len) const; - void update_hosts_stats(update_stats_user_data_t *update_flows_stats_user_data); + void update_hosts_stats(periodic_stats_update_user_data_t *periodic_stats_update_user_data); void periodic_dump_check(bool dump_alert, const struct timeval *tv); u_int32_t key(); diff --git a/include/GenericHash.h b/include/GenericHash.h index 370b446804..1dc4ed4538 100644 --- a/include/GenericHash.h +++ b/include/GenericHash.h @@ -117,7 +117,7 @@ class GenericHash { * @param walker A pointer to the comparison function. * @param user_data Value to be compared with the values of hash. */ - void walkIdle(bool (*walker)(GenericHashEntry *h, void *user_data, bool *entryMatched), void *user_data); + void walkIdle(void (*walker)(GenericHashEntry *h, void *user_data), void *user_data); /** * @brief Purge idle hash entries. diff --git a/include/Host.h b/include/Host.h index ebb607ec9a..2ce747ed77 100644 --- a/include/Host.h +++ b/include/Host.h @@ -131,7 +131,7 @@ class Host : public GenericHashEntry, public AlertableEntity { }; virtual HostStats* allocateStats() { return(new HostStats(this)); }; - void updateStats(update_stats_user_data_t *update_hosts_stats_user_data); + void updateStats(periodic_stats_update_user_data_t *periodic_stats_update_user_data); void incLowGoodputFlows(time_t t, bool asClient); void decLowGoodputFlows(time_t t, bool asClient); inline void incNumAnomalousFlows(bool asClient) { stats->incNumAnomalousFlows(asClient); }; diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index 236558dcf0..bbe9e26314 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -455,6 +455,7 @@ class NetworkInterface : public AlertableEntity { void getActiveFlowsStats(nDPIStats *stats, FlowStats *status_stats, AddressTree *allowed_hosts, Host *h, Paginator *p); virtual u_int32_t periodicStatsUpdateFrequency(); void periodicStatsUpdate(); + void periodicHTStateUpdate(time_t deadline); virtual u_int32_t getFlowMaxIdle(); virtual void lua(lua_State* vm); void lua_hash_tables_stats(lua_State* vm); diff --git a/include/ntop_defines.h b/include/ntop_defines.h index 9b9653204c..41c812025b 100644 --- a/include/ntop_defines.h +++ b/include/ntop_defines.h @@ -843,18 +843,19 @@ #define CONST_MAX_NUM_THREADED_ACTIVITIES 64 #define STARTUP_SCRIPT_PATH "startup.lua" #define BOOT_SCRIPT_PATH "boot.lua" /* Executed as root before networking is setup */ -#define SHUTDOWN_SCRIPT_PATH "shutdown.lua" -#define HOUSEKEEPING_SCRIPT_PATH "housekeeping.lua" -#define DISCOVER_SCRIPT_PATH "discover.lua" -#define TIMESERIES_SCRIPT_PATH "timeseries.lua" -#define UPGRADE_SCRIPT_PATH "upgrade.lua" -#define PINGER_SCRIPT_PATH "pinger.lua" -#define SECOND_SCRIPT_PATH "second.lua" -#define MINUTE_SCRIPT_PATH "minute.lua" -#define THIRTY_SECONDS_SCRIPT_PATH "30sec.lua" -#define FIVE_MINUTES_SCRIPT_PATH "5min.lua" -#define HOURLY_SCRIPT_PATH "hourly.lua" -#define DAILY_SCRIPT_PATH "daily.lua" +#define SHUTDOWN_SCRIPT_PATH "shutdown.lua" +#define HOUSEKEEPING_SCRIPT_PATH "housekeeping.lua" +#define DISCOVER_SCRIPT_PATH "discover.lua" +#define TIMESERIES_SCRIPT_PATH "timeseries.lua" +#define UPGRADE_SCRIPT_PATH "upgrade.lua" +#define PINGER_SCRIPT_PATH "pinger.lua" +#define SECOND_SCRIPT_PATH "second.lua" +#define MINUTE_SCRIPT_PATH "minute.lua" +#define HT_STATE_UPDATE_SCRIPT_PATH "ht_state_update.lua" +#define THIRTY_SECONDS_SCRIPT_PATH "30sec.lua" +#define FIVE_MINUTES_SCRIPT_PATH "5min.lua" +#define HOURLY_SCRIPT_PATH "hourly.lua" +#define DAILY_SCRIPT_PATH "daily.lua" #define SYSLOG_SCRIPT_PATH "callbacks/syslog.lua" #define SYSLOG_SCRIPTS_PATH "callbacks/syslog" @@ -1017,7 +1018,6 @@ extern struct ntopngLuaContext* getUserdata(struct lua_State *vm); #define MAX_THREAD_POOL_SIZE 5 #endif -#define MIN_TIME_SPAWN_THREAD_POOL 10 /* sec */ #define DONT_NOT_EXPIRE_BEFORE_SEC 15 /* sec */ #define MAX_NDPI_IDLE_TIME_BEFORE_GUESS 5 /* sec */ #define MAX_NUM_PCAP_CAPTURES 4 diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index 668ae2c4bd..95e685fc4f 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -58,8 +58,12 @@ class AlertCheckLuaEngine; typedef struct { AlertCheckLuaEngine *acle; struct timeval *tv; - bool hash_entry_state_update_only; -} update_stats_user_data_t; + time_t deadline; +} periodic_ht_state_update_user_data_t; + +typedef struct { + struct timeval *tv; +} periodic_stats_update_user_data_t; /* Keep in sync with alert_consts.alerts_granularities and Utils */ typedef enum { diff --git a/scripts/callbacks/interface/flow/blacklisted.lua b/scripts/callbacks/interface/flow/blacklisted.lua index ceb89d8e0a..2f7ff3230f 100644 --- a/scripts/callbacks/interface/flow/blacklisted.lua +++ b/scripts/callbacks/interface/flow/blacklisted.lua @@ -23,9 +23,17 @@ local script = { -- ################################################################# function script.hooks.protocolDetected(params) +-- tprint("begin...") + + local a = 0 + -- for i=1,800000000 do + -- a = a + 1 + -- end + if flow.isBlacklisted() then flow.triggerStatus(flow_consts.status_types.status_blacklisted.status_id, flow.getBlacklistedInfo()) end +-- tprint("...done") end -- ################################################################# diff --git a/scripts/callbacks/interface/ht_state_update.lua b/scripts/callbacks/interface/ht_state_update.lua new file mode 100644 index 0000000000..c0977feec3 --- /dev/null +++ b/scripts/callbacks/interface/ht_state_update.lua @@ -0,0 +1,19 @@ +-- +-- (C) 2013 - ntop.org +-- + +local dirs = ntop.getDirs() +package.path = dirs.installdir .. "/scripts/lua/modules/?.lua;" .. package.path + +require "lua_utils" +-- Keep it in sync with HT_STATE_UPDATE_SCRIPT_PATH periodicity in PeriodicActivities.cpp +-- that is, with the frequency of execution of this script. +local HT_STATE_UPDATE_FREQ = 5 + +-- ######################################################## + +local deadline = os.time() + HT_STATE_UPDATE_FREQ +interface.periodicHTStateUpdate(deadline) + +-- ######################################################## + diff --git a/src/Flow.cpp b/src/Flow.cpp index e018788693..455572bf6e 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -1039,8 +1039,8 @@ void Flow::incFlowDroppedCounters() { /* *************************************** */ -void Flow::update_hosts_stats(update_stats_user_data_t *update_flows_stats_user_data) { - struct timeval *tv = update_flows_stats_user_data->tv; +void Flow::update_hosts_stats(periodic_stats_update_user_data_t *periodic_stats_update_user_data) { + struct timeval *tv = periodic_stats_update_user_data->tv; u_int64_t sent_packets, sent_bytes, sent_goodput_bytes, rcvd_packets, rcvd_bytes, rcvd_goodput_bytes; u_int64_t diff_sent_packets, diff_sent_bytes, diff_sent_goodput_bytes, diff_rcvd_packets, diff_rcvd_bytes, diff_rcvd_goodput_bytes; @@ -1105,12 +1105,6 @@ void Flow::update_hosts_stats(update_stats_user_data_t *update_flows_stats_user_ update_flow_port_stats = false; } - /* For pcap-dump interface, the lua method idle is executed when there are no - more packets left in the pcap file. There's no risk to call this twice - as flows never for idle for pcap dump interfaces. */ - if(iface->read_from_pcap_dump() && iface->read_from_pcap_dump_done()) - performLuaCall(flow_lua_call_idle, tv, &update_flows_stats_user_data->acle); - stats_protocol = getStatsProtocol(); sent_packets = stats.cli2srv_packets, sent_bytes = stats.cli2srv_bytes, sent_goodput_bytes = stats.cli2srv_goodput_bytes; @@ -1955,8 +1949,16 @@ bool Flow::is_hash_entry_state_idle_transition_ready() const { /* *************************************** */ void Flow::periodic_hash_entry_state_update(void *user_data, bool quick) { - update_stats_user_data_t *update_flows_stats_user_data = (update_stats_user_data_t*)user_data; - struct timeval *tv = update_flows_stats_user_data->tv; + periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; + struct timeval *tv = periodic_ht_state_update_user_data->tv; + +#if 0 + char buf[256]; + + if(quick) { + ntop->getTrace()->traceEvent(TRACE_NORMAL, "Deadline approaching, quick update [%s][%s]", getInterface()->get_name(), print(buf, sizeof(buf))); + } +#endif switch(get_state()) { case hash_entry_state_allocated: @@ -1965,12 +1967,12 @@ void Flow::periodic_hash_entry_state_update(void *user_data, bool quick) { break; case hash_entry_state_flow_protocoldetected: - if(!quick) performLuaCall(flow_lua_call_protocol_detected, tv, &update_flows_stats_user_data->acle); + if(!quick) performLuaCall(flow_lua_call_protocol_detected, tv, &periodic_ht_state_update_user_data->acle); set_hash_entry_state_active(); break; case hash_entry_state_active: - if(!quick) performLuaCall(flow_lua_call_periodic_update, tv, &update_flows_stats_user_data->acle); + if(!quick) performLuaCall(flow_lua_call_periodic_update, tv, &periodic_ht_state_update_user_data->acle); /* Don't change state: purgeIdle() will do */ break; @@ -1985,7 +1987,7 @@ void Flow::periodic_hash_entry_state_update(void *user_data, bool quick) { } postFlowSetIdle(tv->tv_sec); - if(!quick) performLuaCall(flow_lua_call_idle, tv, &update_flows_stats_user_data->acle); + if(!quick) performLuaCall(flow_lua_call_idle, tv, &periodic_ht_state_update_user_data->acle); break; } diff --git a/src/GenericHash.cpp b/src/GenericHash.cpp index 7bbfbaf867..6266dd9aa7 100644 --- a/src/GenericHash.cpp +++ b/src/GenericHash.cpp @@ -116,8 +116,7 @@ bool GenericHash::add(GenericHashEntry *h, bool do_lock) { /* ************************************ */ -void GenericHash::walkIdle(bool (*walker)(GenericHashEntry *h, void *user_data, bool *entryMatched), void *user_data) { - bool matched = false; +void GenericHash::walkIdle(void (*walker)(GenericHashEntry *h, void *user_data), void *user_data) { vector *cur_idle = NULL; if(idle_entries) { @@ -128,7 +127,7 @@ void GenericHash::walkIdle(bool (*walker)(GenericHashEntry *h, void *user_data, if(cur_idle) { if(!cur_idle->empty()) { for(vector::const_iterator it = cur_idle->begin(); it != cur_idle->end(); ++it) { - walker(*it, user_data, &matched); + walker(*it, user_data); delete *it; entry_state_transition_counters.num_purged++; } @@ -151,7 +150,7 @@ void GenericHash::walkIdle(bool (*walker)(GenericHashEntry *h, void *user_data, ntop->getTrace()->traceEvent(TRACE_ERROR, "Unexpected idle state found [%u]", head->get_state()); if(!head->idle()) - walker(head, user_data, &matched); + walker(head, user_data); head = next; } /* while */ diff --git a/src/Host.cpp b/src/Host.cpp index 0aff2bdd42..ce973f7871 100644 --- a/src/Host.cpp +++ b/src/Host.cpp @@ -816,7 +816,7 @@ bool Host::is_hash_entry_state_idle_transition_ready() const { void Host::periodic_hash_entry_state_update(void *user_data, bool quick) { char buf[64]; - update_stats_user_data_t *update_hosts_stats_user_data = (update_stats_user_data_t*)user_data; + periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; if(get_state() == hash_entry_state_idle) { if(getUses() > 0 && !ntop->getGlobals()->isShutdownRequested()) @@ -824,10 +824,10 @@ void Host::periodic_hash_entry_state_update(void *user_data, bool quick) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Internal error: num_uses=%u [%s]", getUses(), get_ip()->print(buf, sizeof(buf))); if(getNumTriggeredAlerts() - && (update_hosts_stats_user_data->acle - || (update_hosts_stats_user_data->acle = new (std::nothrow) AlertCheckLuaEngine(alert_entity_host, minute_script /* doesn't matter */, iface))) + && (periodic_ht_state_update_user_data->acle + || (periodic_ht_state_update_user_data->acle = new (std::nothrow) AlertCheckLuaEngine(alert_entity_host, minute_script /* doesn't matter */, iface))) ) { - AlertCheckLuaEngine *acle = update_hosts_stats_user_data->acle; + AlertCheckLuaEngine *acle = periodic_ht_state_update_user_data->acle; lua_State *L = acle->getState(); acle->setHost(this); @@ -1321,8 +1321,8 @@ bool Host::statsResetRequested() { /* *************************************** */ -void Host::updateStats(update_stats_user_data_t *update_hosts_stats_user_data) { - struct timeval *tv = update_hosts_stats_user_data->tv; +void Host::updateStats(periodic_stats_update_user_data_t *periodic_stats_update_user_data) { + struct timeval *tv = periodic_stats_update_user_data->tv; Mac *cur_mac = getMac(); checkDataReset(); diff --git a/src/LuaEngine.cpp b/src/LuaEngine.cpp index 36c86f7bbc..e8a4aca09c 100644 --- a/src/LuaEngine.cpp +++ b/src/LuaEngine.cpp @@ -5850,6 +5850,22 @@ static int ntop_get_interface_hash_tables_stats(lua_State* vm) { /* ****************************************** */ +static int ntop_periodic_ht_state_update(lua_State* vm) { + NetworkInterface *ntop_interface = getCurrentInterface(vm); + time_t deadline; + + if(ntop_lua_check(vm, __FUNCTION__, 1, LUA_TNUMBER) != CONST_LUA_OK || !ntop_interface) + return(CONST_LUA_ERROR); + + deadline = (time_t)lua_tonumber(vm, 1); + ntop_interface->periodicHTStateUpdate(deadline); + lua_pushnil(vm); + + return(CONST_LUA_OK); +} + +/* ****************************************** */ + static int ntop_get_interface_stats(lua_State* vm) { NetworkInterface *ntop_interface = getCurrentInterface(vm); bool get_direction_stats = false; @@ -10322,7 +10338,6 @@ static const luaL_Reg ntop_interface_reg[] = { { "hasEBPF", ntop_interface_has_ebpf }, { "hasHighResTs", ntop_interface_has_high_res_ts }, { "getStats", ntop_get_interface_stats }, - { "getHashTablesStats", ntop_get_interface_hash_tables_stats }, { "getStatsUpdateFreq", ntop_get_interface_stats_update_freq }, { "getInterfaceTimeseries", ntop_get_interface_timeseries }, { "resetCounters", ntop_interface_reset_counters }, @@ -10331,6 +10346,10 @@ static const luaL_Reg ntop_interface_reg[] = { { "resetMacStats", ntop_interface_reset_mac_stats }, { "deleteMacData", ntop_interface_delete_mac_data }, + /* Functions related to the management of the internal hash tables */ + { "getHashTablesStats", ntop_get_interface_hash_tables_stats }, + { "periodicHTStateUpdate", ntop_periodic_ht_state_update }, + #ifndef HAVE_NEDGE { "processFlow", ntop_process_flow }, #endif diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 7b5a989179..c29f91a27c 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -2659,16 +2659,12 @@ static bool perform_quick_update(const struct timeval *tv, GenericHashEntry *ghe static bool flow_update_hosts_stats(GenericHashEntry *node, void *user_data, bool *matched) { Flow *flow = (Flow*)node; - update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data; - struct timeval *tv = update_stats_user_data->tv; + periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data; + struct timeval *tv = periodic_stats_update_user_data->tv; bool quick_update = perform_quick_update(tv, node); flow->periodic_dump_check(!quick_update, tv); - - flow->periodic_hash_entry_state_update(user_data, quick_update); - - if(!update_stats_user_data->hash_entry_state_update_only) - flow->update_hosts_stats(update_stats_user_data); + flow->update_hosts_stats(periodic_stats_update_user_data); *matched = true; @@ -2680,15 +2676,10 @@ static bool flow_update_hosts_stats(GenericHashEntry *node, /* NOTE: host is not a GenericTrafficElement */ static bool update_hosts_stats(GenericHashEntry *node, void *user_data, bool *matched) { Host *host = (Host*)node; - update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data; - struct timeval *tv = update_stats_user_data->tv; - bool quick_update = perform_quick_update(tv, node); + periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data; - host->periodic_hash_entry_state_update(user_data, quick_update); host->checkReloadPrefs(); - - if(!update_stats_user_data->hash_entry_state_update_only) - host->updateStats(update_stats_user_data); + host->updateStats(periodic_stats_update_user_data); *matched = true; @@ -2700,13 +2691,10 @@ static bool update_hosts_stats(GenericHashEntry *node, void *user_data, bool *ma /* NOTE: mac is not a GenericTrafficElement */ static bool update_macs_stats(GenericHashEntry *node, void *user_data, bool *matched) { Mac *mac = (Mac*)node; - update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data; - struct timeval *tv = update_stats_user_data->tv; + periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data; + struct timeval *tv = periodic_stats_update_user_data->tv; - mac->periodic_hash_entry_state_update(user_data, false); - - if(!update_stats_user_data->hash_entry_state_update_only) - mac->updateStats(tv); + mac->updateStats(tv); *matched = true; @@ -2717,19 +2705,15 @@ static bool update_macs_stats(GenericHashEntry *node, void *user_data, bool *mat static bool update_generic_element_stats(GenericHashEntry *node, void *user_data, bool *matched) { GenericTrafficElement *elem; - update_stats_user_data_t *update_stats_user_data = (update_stats_user_data_t*)user_data; - struct timeval *tv = update_stats_user_data->tv; + periodic_stats_update_user_data_t *periodic_stats_update_user_data = (periodic_stats_update_user_data_t*)user_data; + struct timeval *tv = periodic_stats_update_user_data->tv; - node->periodic_hash_entry_state_update(user_data, false); + if((elem = dynamic_cast(node))) { - if(!update_stats_user_data->hash_entry_state_update_only) { - if((elem = dynamic_cast(node))) { - - elem->updateStats(tv); - *matched = true; - } else - ntop->getTrace()->traceEvent(TRACE_ERROR, "update_generic_element_stats on non GenericTrafficElement"); - } + elem->updateStats(tv); + *matched = true; + } else + ntop->getTrace()->traceEvent(TRACE_ERROR, "update_generic_element_stats on non GenericTrafficElement"); return(false); /* false = keep on walking */ } @@ -2762,86 +2746,48 @@ u_int32_t NetworkInterface::getFlowMaxIdle() { /* **************************************************** */ -// #define PERIODIC_STATS_UPDATE_DEBUG_TIMING - void NetworkInterface::periodicStatsUpdate() { + u_int32_t begin_slot = 0; + periodic_stats_update_user_data_t periodic_stats_update_user_data; struct timeval tv; -#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING - struct timeval tdebug, tdebug_init; -#endif - update_stats_user_data_t update_stats_user_data; - - bool periodic_stats_update; - if(!read_from_pcap_dump() || reproducePcapOriginalSpeed()) gettimeofday(&tv, NULL); else tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0; -#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING - gettimeofday(&tdebug, NULL); - memcpy(&tdebug_init, &tdebug, sizeof(tdebug_init)); -#endif - - /* Check if it is time to also do a periodic stats update or if - only a quick walk on the hash tables to increase purge speedup - is necessary */ - periodic_stats_update = checkPeriodicStatsUpdateTime(&tv); - - update_stats_user_data.acle = NULL, - update_stats_user_data.hash_entry_state_update_only = !periodic_stats_update; - update_stats_user_data.tv = &tv; + periodic_stats_update_user_data.tv = &tv; + + if(!checkPeriodicStatsUpdateTime(&tv)) + return; /* Not yet the time to perform an update */ /* View Interfaces don't have flows, they just walk flows of their 'viewed' peers */ - if((!isView()) && flows_hash) { - flows_hash->walkIdle(flow_update_hosts_stats, &update_stats_user_data); + if((!isView()) && flows_hash) + walker(&begin_slot, true, walker_flows, flow_update_hosts_stats, &periodic_stats_update_user_data); - if(update_stats_user_data.acle) { - /* Lazy instantiation */ - delete update_stats_user_data.acle; - update_stats_user_data.acle = NULL; - } + if(hosts_hash) { + begin_slot = 0; + walker(&begin_slot, true, walker_hosts, update_hosts_stats, &periodic_stats_update_user_data); } -#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING - ntop->getTrace()->traceEvent(TRACE_NORMAL, "flows_hash->walk took %d seconds", time(NULL) - tdebug.tv_sec); - gettimeofday(&tdebug, NULL); -#endif - - if(hosts_hash) { - hosts_hash->walkIdle(update_hosts_stats, &update_stats_user_data); - - if(update_stats_user_data.acle) { - /* Lazy instantiation */ - delete update_stats_user_data.acle; - update_stats_user_data.acle = NULL; - } + if(ases_hash) { + begin_slot = 0; + walker(&begin_slot, true, walker_ases, update_generic_element_stats, &periodic_stats_update_user_data); } -#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING - ntop->getTrace()->traceEvent(TRACE_NORMAL, "hosts_hash->walk took %d seconds", time(NULL) - tdebug.tv_sec); - gettimeofday(&tdebug, NULL); -#endif + if(countries_hash) { + begin_slot = 0; + walker(&begin_slot, true, walker_countries, update_generic_element_stats, &periodic_stats_update_user_data); + } - if(ases_hash) - ases_hash->walkIdle(update_generic_element_stats, &update_stats_user_data); + if(vlans_hash && hasSeenVlanTaggedPackets()) { + begin_slot = 0; + walker(&begin_slot, true, walker_vlans, update_generic_element_stats, &periodic_stats_update_user_data); + } - if(countries_hash) - countries_hash->walkIdle(update_generic_element_stats, &update_stats_user_data); - - if(vlans_hash && hasSeenVlanTaggedPackets()) - vlans_hash->walkIdle(update_generic_element_stats, &update_stats_user_data); - - if(macs_hash) - macs_hash->walkIdle(update_macs_stats, &update_stats_user_data); - -#ifdef PERIODIC_STATS_UPDATE_DEBUG_TIMING - ntop->getTrace()->traceEvent(TRACE_NORMAL, "asn/macs/vlan->walk took %d seconds", time(NULL) - tdebug.tv_sec); - gettimeofday(&tdebug, NULL); -#endif - - if(!periodic_stats_update) - return; /* Not yet the time to perform an update */ + if(macs_hash) { + begin_slot = 0; + walker(&begin_slot, true, walker_macs, update_macs_stats, &periodic_stats_update_user_data); + } #ifdef NTOPNG_PRO if(getHostPools()) getHostPools()->checkPoolsStatsReset(); @@ -2914,6 +2860,60 @@ void NetworkInterface::periodicStatsUpdate() { /* **************************************************** */ +static bool quick_periodic_ht_state_update(time_t deadline, GenericHashEntry *ghe) { + /* TODO: optimize time(NULL) */ + return time(NULL) + 2 >= deadline; +} + +/* **************************************************** */ + +static void generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data) { + periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; + bool quick_update = quick_periodic_ht_state_update(periodic_ht_state_update_user_data->deadline, node); + + node->periodic_hash_entry_state_update(user_data, quick_update); +} + +/* **************************************************** */ + +void NetworkInterface::periodicHTStateUpdate(time_t deadline) { +#if 0 + ntop->getTrace()->traceEvent(TRACE_NORMAL, "updating hash tables [%s]", get_name()); +#endif + struct timeval tv; + periodic_ht_state_update_user_data_t periodic_ht_state_update_user_data; + GenericHash *ghs[] = { + !isView() ? flows_hash : NULL, /* View Interfaces don't have flows, they just walk flows of their 'viewed' peers */ + hosts_hash, + ases_hash, + countries_hash, + hasSeenVlanTaggedPackets() ? vlans_hash : NULL, + macs_hash + }; + + if(!read_from_pcap_dump() || reproducePcapOriginalSpeed()) + gettimeofday(&tv, NULL); + else + tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0; + + periodic_ht_state_update_user_data.acle = NULL, + periodic_ht_state_update_user_data.deadline = deadline, + periodic_ht_state_update_user_data.tv = &tv; + + for(u_int i = 0; i < sizeof(ghs) / sizeof(ghs[0]); i++) { + if(ghs[i]) { + ghs[i]->walkIdle(generic_periodic_hash_entry_state_update, &periodic_ht_state_update_user_data); + + if(periodic_ht_state_update_user_data.acle) { + delete periodic_ht_state_update_user_data.acle; + periodic_ht_state_update_user_data.acle = NULL; + } + } + } +} + +/* **************************************************** */ + struct update_host_pool_l7policy { bool update_pool_id; bool update_l7policy; diff --git a/src/PeriodicActivities.cpp b/src/PeriodicActivities.cpp index 4efaf114dc..14ac4405a4 100644 --- a/src/PeriodicActivities.cpp +++ b/src/PeriodicActivities.cpp @@ -97,16 +97,17 @@ void PeriodicActivities::startPeriodicActivitiesLoop() { num_threads = MAX_THREAD_POOL_SIZE; static activity_descr ad[] = { - { SECOND_SCRIPT_PATH, 1, false, 1 }, - { MINUTE_SCRIPT_PATH, 60, false, num_threads }, - { FIVE_MINUTES_SCRIPT_PATH, 300, false, num_threads }, - { HOURLY_SCRIPT_PATH, 3600, false, num_threads }, - { DAILY_SCRIPT_PATH, 86400, true, 1 }, - { HOUSEKEEPING_SCRIPT_PATH, 3, false, 1 }, - { DISCOVER_SCRIPT_PATH, 5, false, 1 }, - { TIMESERIES_SCRIPT_PATH, 5, false, 1 }, + { SECOND_SCRIPT_PATH, 1, false, 1 }, + { HT_STATE_UPDATE_SCRIPT_PATH, 5, false, num_threads }, + { MINUTE_SCRIPT_PATH, 60, false, num_threads }, + { FIVE_MINUTES_SCRIPT_PATH, 300, false, num_threads }, + { HOURLY_SCRIPT_PATH, 3600, false, num_threads }, + { DAILY_SCRIPT_PATH, 86400, true, num_threads }, + { HOUSEKEEPING_SCRIPT_PATH, 3, false, 1 }, + { DISCOVER_SCRIPT_PATH, 5, false, 1 }, + { TIMESERIES_SCRIPT_PATH, 5, false, 1 }, #ifdef HAVE_NEDGE - { PINGER_SCRIPT_PATH, 5, false, 1 }, + { PINGER_SCRIPT_PATH, 5, false, 1 }, #endif { NULL, 0, false} }; diff --git a/src/ThreadedActivity.cpp b/src/ThreadedActivity.cpp index 755f203c45..a99855ff48 100644 --- a/src/ThreadedActivity.cpp +++ b/src/ThreadedActivity.cpp @@ -43,8 +43,8 @@ ThreadedActivity::ThreadedActivity(const char* _path, path = strdup(_path); /* ntop->get_callbacks_dir() */; interfaceTasksRunning = (bool *) calloc(MAX_NUM_DEFINED_INTERFACES, sizeof(bool)); - if(periodicity > MIN_TIME_SPAWN_THREAD_POOL) { - pool = new ThreadPool(thread_pool_size); + if(thread_pool_size > 1) { + pool = new (std::nothrow) ThreadPool(thread_pool_size); if(pool == NULL) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Out of resources"); @@ -113,7 +113,7 @@ bool ThreadedActivity::isInterfaceTaskRunning(NetworkInterface *iface) { void ThreadedActivity::activityBody() { if(periodicity == 0) /* The script is not periodic */ aperiodicActivityBody(); - else if(periodicity <= MIN_TIME_SPAWN_THREAD_POOL) /* Accurate time computation with micro-second-accurate sleep */ + else if(!pool) /* Accurate time computation with micro-second-accurate sleep */ uSecDiffPeriodicActivityBody(); else periodicActivityBody(); @@ -162,7 +162,7 @@ void ThreadedActivity::runScript() { /* Run a script - both periodic and one-shot scripts are called here */ void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface) { LuaEngine *l; - u_long max_duration_ms = periodicity * 1e3; + u_long max_duration_ms =periodicity * 1e3; u_long msec_diff; struct timeval begin, end; @@ -194,9 +194,11 @@ void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface) { msec_diff = (end.tv_sec - begin.tv_sec) * 1000 + (end.tv_usec - begin.tv_usec) / 1000; - ntop->getTrace()->traceEvent(TRACE_INFO, - "[PeriodicActivity] %s: completed in %u/%u ms [%s]", path, msec_diff, max_duration_ms, - (((max_duration_ms > 0) && (msec_diff > max_duration_ms)) ? "SLOW" : "OK")); +#if 0 + ntop->getTrace()->traceEvent(TRACE_NORMAL, + "[PeriodicActivity][%s][%s]: completed in %u/%u ms [%s]", iface->get_name(), path, msec_diff, max_duration_ms, + (((max_duration_ms > 0) && (msec_diff > max_duration_ms)) ? "SLOW" : "OK")); +#endif if((max_duration_ms > 0) && (msec_diff > 2*max_duration_ms) && @@ -326,9 +328,11 @@ void ThreadedActivity::schedulePeriodicActivity(ThreadPool *pool) { && !isInterfaceTaskRunning(iface)) { pool->queueJob(this, script_path, iface); setInterfaceTaskRunning(iface, true); + #ifdef THREAD_DEBUG - ntop->getTrace()->traceEvent(TRACE_NORMAL, "Queued interface job %s", script_path); + ntop->getTrace()->traceEvent(TRACE_NORMAL, "Queued interface job %s [%s]", script_path, iface->get_name()); #endif + } } } diff --git a/src/ViewInterface.cpp b/src/ViewInterface.cpp index 0ef053d250..256fd1dbd7 100644 --- a/src/ViewInterface.cpp +++ b/src/ViewInterface.cpp @@ -241,7 +241,7 @@ typedef struct { /* **************************************************** */ -static bool viewed_flows_walker(GenericHashEntry *flow, void *user_data, bool *matched) { +static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { viewed_flows_walker_user_data_t *viewed_flows_walker_user_data = (viewed_flows_walker_user_data_t*)user_data; ViewInterface *iface = viewed_flows_walker_user_data->iface; const struct timeval *tv = &viewed_flows_walker_user_data->tv; @@ -320,8 +320,6 @@ static bool viewed_flows_walker(GenericHashEntry *flow, void *user_data, bool *m partials.tcp_stats_d2s.pktLost, partials.tcp_stats_d2s.pktKeepAlive); } } - - return false; /* Move on to the next flow, keep walking */ } /* **************************************************** */