diff --git a/include/Flow.h b/include/Flow.h index fd204db46e..8b4bb5c495 100644 --- a/include/Flow.h +++ b/include/Flow.h @@ -74,11 +74,6 @@ class Flow : public GenericHashEntry { u_int hash_entry_id; /* Uniquely identify this Flow inside the flows_hash hash table */ - /* When the interface isViewed(), the corresponding view needs to acknowledge the purge - before the flow can actually be deleted from memory. This guarantees the view has - seen the flow until it has become idle. */ - bool purge_acknowledged_mark; - bool detection_completed, protocol_processed, fully_processed, cli2srv_direction, twh_over, twh_ok, dissect_next_http_packet, passVerdict, l7_protocol_guessed, flow_dropped_counts_increased, @@ -444,9 +439,6 @@ class Flow : public GenericHashEntry { bool is_hash_entry_state_idle_transition_ready() const; void periodic_hash_entry_state_update(void *user_data, bool quick); void periodic_stats_update(void *user_data, bool quick); - void set_to_purge(time_t t); - bool is_acknowledged_to_purge() const; - void set_acknowledge_to_purge(); void set_hash_entry_id(u_int assigned_hash_entry_id); u_int get_hash_entry_id() const; diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index 074efb134b..61f3f0587b 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -457,7 +457,9 @@ class NetworkInterface : public AlertableEntity { void getActiveFlowsStats(nDPIStats *stats, FlowStats *status_stats, AddressTree *allowed_hosts, Host *h, Paginator *p); virtual u_int32_t periodicStatsUpdateFrequency() const; void periodicStatsUpdate(); - void periodicHTStateUpdate(time_t deadline, lua_State* vm); + virtual void periodicHTStateUpdate(time_t deadline, lua_State* vm); + static bool quick_periodic_ht_state_update(time_t deadline, GenericHashEntry *ghe); + static void generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data); virtual u_int32_t getFlowMaxIdle(); virtual void lua(lua_State* vm); void lua_hash_tables_stats(lua_State* vm); diff --git a/include/ThreadedActivity.h b/include/ThreadedActivity.h index 81bcdf2457..c35cb3aaa8 100644 --- a/include/ThreadedActivity.h +++ b/include/ThreadedActivity.h @@ -33,6 +33,7 @@ class ThreadedActivity { char *path; u_int32_t periodicity; bool align_to_localtime; + bool exclude_viewed_interfaces; bool thread_started; bool systemTaskRunning; bool *interfaceTasksRunning; @@ -52,6 +53,7 @@ class ThreadedActivity { ThreadedActivity(const char* _path, u_int32_t _periodicity_seconds = 0, bool _align_to_localtime = false, + bool _exclude_viewed_interfaces = false, u_int8_t thread_pool_size = 1); ~ThreadedActivity(); diff --git a/include/ViewInterface.h b/include/ViewInterface.h index 6ecc67c0c9..331f7f69ce 100644 --- a/include/ViewInterface.h +++ b/include/ViewInterface.h @@ -29,7 +29,6 @@ class ViewInterface : public NetworkInterface { bool is_packet_interface; u_int8_t num_viewed_interfaces; NetworkInterface *viewed_interfaces[MAX_NUM_VIEW_INTERFACES]; - void viewedFlowsWalker(); virtual void sumStats(TcpFlowStats *_tcpFlowStats, EthStats *_ethStats, LocalTrafficStats *_localStats, nDPIStats *_ndpiStats, @@ -37,7 +36,14 @@ class ViewInterface : public NetworkInterface { public: ViewInterface(const char *_endpoint); - + virtual void periodicHTStateUpdate(time_t deadline, lua_State* vm); + bool walker(u_int32_t *begin_slot, + bool walk_all, + WalkerType wtype, + bool (*walker)(GenericHashEntry *h, void *user_data, bool *matched), + void *user_data); + void viewed_flows_walker(Flow *f, void *user_data); + static void generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data); virtual InterfaceType getIfType() const { return interface_type_VIEW; }; inline const char* get_type() { return CONST_INTERFACE_TYPE_VIEW; }; virtual bool is_ndpi_enabled() const { return false; }; diff --git a/include/ntop_typedefs.h b/include/ntop_typedefs.h index 95e685fc4f..7fbafeb394 100644 --- a/include/ntop_typedefs.h +++ b/include/ntop_typedefs.h @@ -56,6 +56,7 @@ typedef enum { /* Struct used to pass parameters when walking hosts and flows periodically to update their stats */ class AlertCheckLuaEngine; typedef struct { + NetworkInterface *iface; AlertCheckLuaEngine *acle; struct timeval *tv; time_t deadline; diff --git a/src/Flow.cpp b/src/Flow.cpp index 28108b434a..4685e74c42 100644 --- a/src/Flow.cpp +++ b/src/Flow.cpp @@ -54,7 +54,7 @@ Flow::Flow(NetworkInterface *_iface, predominant_status = status_normal; alert_rowid = -1; - purge_acknowledged_mark = detection_completed = update_flow_port_stats = false; + detection_completed = update_flow_port_stats = false; fully_processed = false; ndpiDetectedProtocol = ndpiUnknownProtocol; doNotExpireBefore = iface->getTimeLastPktRcvd() + DONT_NOT_EXPIRE_BEFORE_SEC; @@ -1859,38 +1859,6 @@ u_int32_t Flow::key(Host *_cli, u_int16_t _cli_port, /* *************************************** */ -void Flow::set_to_purge(time_t t) { - /* The actual set_to_purge is done only when - the flow has been acknowledged (in the case of views). - Othewise this call is just ignored. */ - if(is_acknowledged_to_purge()) - ; -}; - -/* *************************************** */ - -bool Flow::is_acknowledged_to_purge() const { - /* This ensures that, in case of view interfaces, the flow - has been acknowledged before being purged. In case the interface - has no corresponding view, i.e, when !iface->isViewed(), this - function is just a short circuit as there is no need to wait for - an acknowledge. In case there is a view interface, we must wait - until the view sets the acknowledge. */ - return !iface->isViewed() || purge_acknowledged_mark; -}; - -/* *************************************** */ - -void Flow::set_acknowledge_to_purge() { - /* If there is a view interface on top of this interface - then such view can acknowledge a flow when it is ready - to purge. */ - if(iface->isViewed()) - purge_acknowledged_mark = true; -}; - -/* *************************************** */ - void Flow::set_hash_entry_id(u_int assigned_hash_entry_id) { hash_entry_id = assigned_hash_entry_id; }; @@ -1982,15 +1950,6 @@ void Flow::periodic_hash_entry_state_update(void *user_data, bool quick) { break; case hash_entry_state_idle: - if(iface->isViewed()) { - /* Must acknowledge so the overlying 'view' interface can actually set - the flow state as ready to be purged once it has processed the flow for the last - time */ - if(is_acknowledged_to_purge()) - return; /* Already acknowledged, nothing else to do */ - set_acknowledge_to_purge(); - } - postFlowSetIdle(tv, quick); if(!quick) performLuaCall(flow_lua_call_idle, tv, &periodic_ht_state_update_user_data->acle); break; diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index 66786f985e..c53722483c 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -2838,18 +2838,22 @@ void NetworkInterface::periodicStatsUpdate() { /* **************************************************** */ -static bool quick_periodic_ht_state_update(time_t deadline, GenericHashEntry *ghe) { +bool NetworkInterface::quick_periodic_ht_state_update(time_t deadline, GenericHashEntry *ghe) { /* TODO: optimize time(NULL) */ return time(NULL) + 2 >= deadline; } /* **************************************************** */ -static void generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data) { +void NetworkInterface::generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data) { periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; + NetworkInterface *iface = periodic_ht_state_update_user_data->iface; bool quick_update = quick_periodic_ht_state_update(periodic_ht_state_update_user_data->deadline, node); node->periodic_hash_entry_state_update(user_data, quick_update); + + if(iface->isViewed()) + iface->viewedBy()->generic_periodic_hash_entry_state_update(node, user_data); } /* **************************************************** */ @@ -2875,6 +2879,7 @@ void NetworkInterface::periodicHTStateUpdate(time_t deadline, lua_State* vm) { tv.tv_sec = last_pkt_rcvd, tv.tv_usec = 0; periodic_ht_state_update_user_data.acle = NULL, + periodic_ht_state_update_user_data.iface = this, periodic_ht_state_update_user_data.deadline = deadline, periodic_ht_state_update_user_data.tv = &tv; diff --git a/src/PeriodicActivities.cpp b/src/PeriodicActivities.cpp index 5a8a1a9cc0..50c0bcaff4 100644 --- a/src/PeriodicActivities.cpp +++ b/src/PeriodicActivities.cpp @@ -25,6 +25,7 @@ typedef struct _activity_descr { const char *path; u_int32_t periodicity; bool align_to_localtime; + bool exclude_viewed_interfaces; u_int8_t thread_pool_size; } activity_descr; @@ -105,18 +106,18 @@ void PeriodicActivities::startPeriodicActivitiesLoop() { num_threads = MAX_THREAD_POOL_SIZE; static activity_descr ad[] = { - { SECOND_SCRIPT_PATH, 1, false, 1 }, - { HT_STATE_UPDATE_SCRIPT_PATH, 5, false, num_threads }, - { STATS_UPDATE_SCRIPT_PATH, 5, false, num_threads }, - { MINUTE_SCRIPT_PATH, 60, false, num_threads }, - { FIVE_MINUTES_SCRIPT_PATH, 300, false, num_threads }, - { HOURLY_SCRIPT_PATH, 3600, false, num_threads }, - { DAILY_SCRIPT_PATH, 86400, true, num_threads }, - { HOUSEKEEPING_SCRIPT_PATH, 3, false, 1 }, - { DISCOVER_SCRIPT_PATH, 5, false, 1 }, - { TIMESERIES_SCRIPT_PATH, 5, false, 1 }, + { SECOND_SCRIPT_PATH, 1, false, false, 1 }, + { HT_STATE_UPDATE_SCRIPT_PATH, 5, false, true, num_threads }, + { STATS_UPDATE_SCRIPT_PATH, 5, false, false, num_threads }, + { MINUTE_SCRIPT_PATH, 60, false, false, num_threads }, + { FIVE_MINUTES_SCRIPT_PATH, 300, false, false, num_threads }, + { HOURLY_SCRIPT_PATH, 3600, false, false, num_threads }, + { DAILY_SCRIPT_PATH, 86400, true, false, num_threads }, + { HOUSEKEEPING_SCRIPT_PATH, 3, false, false, 1 }, + { DISCOVER_SCRIPT_PATH, 5, false, false, 1 }, + { TIMESERIES_SCRIPT_PATH, 5, false, false, 1 }, #ifdef HAVE_NEDGE - { PINGER_SCRIPT_PATH, 5, false, 1 }, + { PINGER_SCRIPT_PATH, 5, false, false, 1 }, #endif { NULL, 0, false} }; @@ -129,6 +130,7 @@ void PeriodicActivities::startPeriodicActivitiesLoop() { ThreadedActivity *ta = new ThreadedActivity(d->path, d->periodicity, d->align_to_localtime, + d->exclude_viewed_interfaces, d->thread_pool_size); if(ta) { activities[num_activities++] = ta; diff --git a/src/ThreadedActivity.cpp b/src/ThreadedActivity.cpp index c5620804d6..91560f7091 100644 --- a/src/ThreadedActivity.cpp +++ b/src/ThreadedActivity.cpp @@ -35,10 +35,12 @@ static void* startActivity(void* ptr) { ThreadedActivity::ThreadedActivity(const char* _path, u_int32_t _periodicity_seconds, bool _align_to_localtime, + bool _exclude_viewed_interfaces, u_int8_t thread_pool_size) { terminating = false; periodicity = _periodicity_seconds; align_to_localtime = _align_to_localtime; + exclude_viewed_interfaces = _exclude_viewed_interfaces; thread_started = false, systemTaskRunning = false; path = strdup(_path); /* ntop->get_callbacks_dir() */; interfaceTasksRunning = (bool *) calloc(MAX_NUM_DEFINED_INTERFACES, sizeof(bool)); @@ -206,6 +208,11 @@ void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface) { if(!iface) iface = ntop->getSystemInterface(); if(strcmp(path, SHUTDOWN_SCRIPT_PATH) && isTerminating()) return; + if(iface->isViewed() && exclude_viewed_interfaces) { + ntop->getTrace()->traceEvent(TRACE_ERROR, "Skipping viewed [%s]", iface->get_name()); + return; + } + #ifdef THREADED_DEBUG ntop->getTrace()->traceEvent(TRACE_WARNING, "[%p] Running %s", this, path); #endif diff --git a/src/ViewInterface.cpp b/src/ViewInterface.cpp index 256fd1dbd7..89c7b66db3 100644 --- a/src/ViewInterface.cpp +++ b/src/ViewInterface.cpp @@ -78,6 +78,34 @@ ViewInterface::ViewInterface(const char *_endpoint) : NetworkInterface(_endpoint /* **************************************************** */ +bool ViewInterface::walker(u_int32_t *begin_slot, + bool walk_all, + WalkerType wtype, + bool (*walker)(GenericHashEntry *h, void *user_data, bool *matched), + void *user_data) { + bool ret = false; + u_int32_t flows_begin_slot = 0; /* Always from the beginning, all flows */ + + if(id == SYSTEM_INTERFACE_ID) + return(false); + + switch(wtype) { + case walker_flows: + for(u_int8_t s = 0; s < num_viewed_interfaces; s++) { + flows_begin_slot = 0; /* Always visit all the flows starting from slot 0 */ + ret |= viewed_interfaces[s]->walker(&flows_begin_slot, true /* walk_all == true */, wtype, walker, user_data); + } + break; + default: + ret = NetworkInterface::walker(begin_slot, walk_all, wtype, walker, user_data); + break; + } + + return(ret); +} + +/* **************************************************** */ + u_int64_t ViewInterface::getNumPackets() { u_int64_t tot = 0; @@ -234,29 +262,18 @@ Flow* ViewInterface::findFlowByTuple(u_int16_t vlan_id, /* **************************************************** */ -typedef struct { - struct timeval tv; - ViewInterface *iface; -} viewed_flows_walker_user_data_t; +void ViewInterface::periodicHTStateUpdate(time_t deadline, lua_State* vm) { + for(u_int8_t s = 0; s < num_viewed_interfaces; s++) + viewed_interfaces[s]->periodicHTStateUpdate(deadline, vm); +} /* **************************************************** */ -static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { - viewed_flows_walker_user_data_t *viewed_flows_walker_user_data = (viewed_flows_walker_user_data_t*)user_data; - ViewInterface *iface = viewed_flows_walker_user_data->iface; - const struct timeval *tv = &viewed_flows_walker_user_data->tv; - Flow *f = (Flow*)flow; - bool acked_to_purge; +void ViewInterface::viewed_flows_walker(Flow *f, void *user_data) { + periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; + const struct timeval *tv = periodic_ht_state_update_user_data->tv; - acked_to_purge = f->is_acknowledged_to_purge(); - - if(acked_to_purge) { - /* We can set the 'ready to be purged' state on behalf of the underlying viewed interface. - It is safe as this view is in sync with the viewed interfaces by mean of acked_to_purge */ - /* TODO: FIXX */ // f->set_hash_entry_state_ready_to_be_purged(); - } - - f->dumpFlow(tv, iface); + f->dumpFlow(tv, this); FlowTrafficStats partials; bool first_partial; /* Whether this is the first time the view is visiting this flow */ @@ -269,9 +286,9 @@ static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { if(cli_ip && srv_ip) { Host *cli_host = NULL, *srv_host = NULL; - iface->findFlowHosts(f->get_vlan_id(), - NULL /* no src mac yet */, (IpAddress*)cli_ip, &cli_host, - NULL /* no dst mac yet */, (IpAddress*)srv_ip, &srv_host); + findFlowHosts(f->get_vlan_id(), + NULL /* no src mac yet */, (IpAddress*)cli_ip, &cli_host, + NULL /* no dst mac yet */, (IpAddress*)srv_ip, &srv_host); if(cli_host) { cli_host->incStats(tv->tv_sec, f->get_protocol(), @@ -284,7 +301,7 @@ static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { if(first_partial) cli_host->incNumFlows(f->get_last_seen(), true, srv_host), cli_host->incUses(); - if(acked_to_purge) + if(f->idle()) cli_host->decNumFlows(f->get_last_seen(), true, srv_host), cli_host->decUses(); } @@ -299,17 +316,17 @@ static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { if(first_partial) srv_host->incUses(), srv_host->incNumFlows(f->get_last_seen(), false, cli_host); - if(acked_to_purge) + if(f->idle()) srv_host->decUses(), srv_host->decNumFlows(f->get_last_seen(), false, cli_host); } - iface->incStats(true /* ingressPacket */, - tv->tv_sec, cli_ip && cli_ip->isIPv4() ? ETHERTYPE_IP : ETHERTYPE_IPV6, - f->getStatsProtocol(), f->get_protocol_category(), - f->get_protocol(), - partials.srv2cli_bytes + partials.cli2srv_bytes, - partials.srv2cli_packets + partials.cli2srv_packets, - 24 /* 8 Preamble + 4 CRC + 12 IFG */ + 14 /* Ethernet header */); + incStats(true /* ingressPacket */, + tv->tv_sec, cli_ip && cli_ip->isIPv4() ? ETHERTYPE_IP : ETHERTYPE_IPV6, + f->getStatsProtocol(), f->get_protocol_category(), + f->get_protocol(), + partials.srv2cli_bytes + partials.cli2srv_bytes, + partials.srv2cli_packets + partials.cli2srv_packets, + 24 /* 8 Preamble + 4 CRC + 12 IFG */ + 14 /* Ethernet header */); Flow::incTcpBadStats(true /* src2dst */, NULL, cli_host, srv_host, partials.tcp_stats_s2d.pktOOO, partials.tcp_stats_s2d.pktRetr, @@ -324,37 +341,24 @@ static void viewed_flows_walker(GenericHashEntry *flow, void *user_data) { /* **************************************************** */ -void ViewInterface::viewedFlowsWalker() { - FlowHash *cur_flows_hash; - viewed_flows_walker_user_data_t viewed_flows_walker_user_data; - - viewed_flows_walker_user_data.tv.tv_sec = time(NULL), - viewed_flows_walker_user_data.tv.tv_usec = 0, - viewed_flows_walker_user_data.iface = this; - - for(u_int8_t s = 0; s < num_viewed_interfaces; s++) { - cur_flows_hash = viewed_interfaces[s]->get_flows_hash(); - - if(cur_flows_hash) - cur_flows_hash->walkIdle(viewed_flows_walker, &viewed_flows_walker_user_data); +void ViewInterface::flowPollLoop() { + while(!ntop->getGlobals()->isShutdownRequested()) { + while(idle()) sleep(1); + usleep(1000000); } - -#ifdef NTOPNG_PRO - dumpAggregatedFlows(&viewed_flows_walker_user_data.tv); -#endif } /* **************************************************** */ -void ViewInterface::flowPollLoop() { - while(!ntop->getGlobals()->isShutdownRequested()) { - while(idle()) sleep(1); +void ViewInterface::generic_periodic_hash_entry_state_update(GenericHashEntry *node, void *user_data) { + periodic_ht_state_update_user_data_t *periodic_ht_state_update_user_data = (periodic_ht_state_update_user_data_t*)user_data; + ViewInterface *this_view = periodic_ht_state_update_user_data->iface->viewedBy(); - viewedFlowsWalker(); - - purgeIdle(time(NULL)); - usleep(500000); + if(Flow *flow = dynamic_cast(node)) { + this_view->viewed_flows_walker(flow, user_data); } + + this_view->purgeIdle(time(NULL)); } /* **************************************************** */