Adds and enforces periodic activities max duration

Implements #3477
This commit is contained in:
Simone Mainardi 2020-03-02 19:09:54 +01:00
parent 1074059c2d
commit 1e82eb073b
10 changed files with 91 additions and 61 deletions

View file

@ -34,6 +34,7 @@ static void* startActivity(void* ptr) {
ThreadedActivity::ThreadedActivity(const char* _path,
u_int32_t _periodicity_seconds,
u_int32_t _max_duration_seconds,
bool _align_to_localtime,
bool _exclude_viewed_interfaces,
bool _exclude_pcap_dump_interfaces,
@ -41,6 +42,7 @@ ThreadedActivity::ThreadedActivity(const char* _path,
ThreadPool *_pool) {
terminating = false;
periodicity = _periodicity_seconds;
max_duration_secs = _max_duration_seconds;
align_to_localtime = _align_to_localtime;
exclude_viewed_interfaces = _exclude_viewed_interfaces;
exclude_pcap_dump_interfaces = _exclude_pcap_dump_interfaces;
@ -357,7 +359,7 @@ void ThreadedActivity::runSystemScript() {
if(stat(script_path, &buf) == 0) {
set_state_running(ntop->getSystemInterface());
runScript(script_path, ntop->getSystemInterface(), 0 /* No deadline */);
runScript(script_path, ntop->getSystemInterface(), time(NULL) + max_duration_secs /* this is the deadline */);
set_state_sleeping(ntop->getSystemInterface());
} else
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to find script %s", path);
@ -368,7 +370,6 @@ void ThreadedActivity::runSystemScript() {
/* Run a script - both periodic and one-shot scripts are called here */
void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface, time_t deadline) {
LuaEngine *l = NULL;
u_long max_duration_ms = periodicity * 1e3;
u_long msec_diff;
struct timeval begin, end;
ThreadedActivityStats *thstats = getThreadedActivityStats(iface, true);
@ -398,6 +399,7 @@ void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface, tim
l->setThreadedActivityData(this, thstats, deadline);
if(thstats) {
thstats->setDeadline(deadline);
thstats->setCurrentProgress(0);
/* Reset the internal state for the current execution */
@ -414,25 +416,8 @@ void ThreadedActivity::runScript(char *script_path, NetworkInterface *iface, tim
msec_diff = (end.tv_sec - begin.tv_sec) * 1000 + (end.tv_usec - begin.tv_usec) / 1000;
updateThreadedActivityStatsEnd(iface, msec_diff);
#if 0
ntop->getTrace()->traceEvent(TRACE_NORMAL,
"[PeriodicActivity][%s][%s]: completed in %u/%u ms [%s]", iface->get_name(), path, msec_diff, max_duration_ms,
(((max_duration_ms > 0) && (msec_diff > max_duration_ms)) ? "SLOW" : "OK"));
#endif
if(getPeriodicity() == 1) {
/* Second is aperiodic, we need to trigger this here instead of ThreadPool::queueJob */
if(thstats) {
thstats->setDeadline(time(NULL) + getPeriodicity());
if((max_duration_ms > 0) && (msec_diff > 2*max_duration_ms)) {
thstats->setSlowPeriodicActivity(true);
}
}
} else if(deadline) {
if(isDeadlineApproaching(deadline))
thstats->setSlowPeriodicActivity(true);
}
if(thstats && isDeadlineApproaching(deadline))
thstats->setSlowPeriodicActivity(true);
if(l && !reuse_vm)
delete l;
@ -523,24 +508,25 @@ void ThreadedActivity::uSecDiffPeriodicActivityBody() {
/* ******************************************* */
void ThreadedActivity::periodicActivityBody() {
u_int32_t next_run = (u_int32_t)time(NULL);
u_int32_t scheduled_time;
u_int now;
u_int32_t next_deadline, next_schedule = (u_int32_t)time(NULL);
next_run = Utils::roundTime(next_run, periodicity, align_to_localtime ? ntop->get_time_offset() : 0);
next_schedule = Utils::roundTime(next_schedule, periodicity, align_to_localtime ? ntop->get_time_offset() : 0);
if(align_to_localtime)
next_run -= periodicity;
next_schedule -= periodicity;
while(!isTerminating()) {
u_int now = (u_int)time(NULL);
now = (u_int)time(NULL);
if(now >= next_run) {
scheduled_time = next_run;
next_run = Utils::roundTime(now, periodicity,
align_to_localtime ? ntop->get_time_offset() : 0);
if(now >= next_schedule) {
next_deadline = next_schedule + max_duration_secs;
next_schedule = Utils::roundTime(now,
periodicity,
align_to_localtime ? ntop->get_time_offset() : 0);
if(!skipExecution(path))
schedulePeriodicActivity(pool, scheduled_time, next_run /* next_run is now also the deadline of the current script */);
schedulePeriodicActivity(pool, now, next_deadline);
}
sleep(1);
@ -565,6 +551,16 @@ void ThreadedActivity::schedulePeriodicActivity(ThreadPool *pool, time_t schedul
struct stat buf;
#endif
#ifdef THREAD_DEBUG
char deadline_buf[32], scheduled_time_buf[32];
struct tm deadline_tm, scheduled_time_tm;
strftime(deadline_buf, sizeof(deadline_buf), "%H:%M:%S", localtime_r(&deadline, &deadline_tm));
strftime(scheduled_time_buf, sizeof(scheduled_time_buf), "%H:%M:%S", localtime_r(&scheduled_time, &scheduled_time_tm));
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Scheduling [%s][schedule: %s][deadline: %s]",
path, scheduled_time_buf, deadline_buf);
#endif
/* Schedule system script */
snprintf(script_path, sizeof(script_path), "%s/system/%s",
ntop->get_callbacks_dir(), path);
@ -609,6 +605,7 @@ void ThreadedActivity::lua(NetworkInterface *iface, lua_State *vm) {
lua_push_str_table_entry(vm, "state", get_state_label(get_state(iface)));
lua_push_uint64_table_entry(vm, "periodicity", getPeriodicity());
lua_push_uint64_table_entry(vm, "max_duration_secs", max_duration_secs);
lua_push_uint64_table_entry(vm, "deadline_secs", deadline_approaching_secs);
lua_pushstring(vm, path ? path : "");