Implements thread for notifications handling

This commit is contained in:
Simone Mainardi 2020-09-09 12:16:51 +02:00
parent 1263e4ac0f
commit 5e7f2cfff4
7 changed files with 114 additions and 25 deletions

View file

@ -58,6 +58,40 @@ end
-- ##############################################
-- @brief Key to store a flag for signaling recipient changes
-- @return A string key
function recipients:_get_recipients_changed_key()
local key = string.format("ntopng.cache.recipients.recipients_changed")
return key
end
-- ##############################################
-- @brief Mark a recipients change (must be called when adding/deleteding/editing a recipient)
-- @return nil
function recipients:_set_recipients_change()
ntop.setCache(self:_get_recipients_changed_key(), "1")
end
-- ##############################################
-- @brief Checks whether recipients have changed. If recipients have changed, the change is acknowledged
-- @return True if recipients have changed, false otherwise.
function recipients:_check_recipients_change()
local res = ntop.getCache(self:_get_recipients_changed_key()) ~= ""
if res then
-- A change in recipients has occurred
-- Remove the key to aknowledge the change
ntop.delCache(self:_get_recipients_changed_key())
end
return res
end
-- ##############################################
function recipients:_lock()
local max_lock_duration = 5 -- seconds
local max_lock_attempts = 5 -- give up after at most this number of attempts
@ -206,6 +240,9 @@ function recipients:_set_endpoint_recipient_params(recipient_id, endpoint_conf_n
recipient_name = endpoint_recipient_name,
recipient_params = safe_params}))
-- Notify a change in the recipients
self:_set_recipients_change()
return recipient_id
end
@ -315,6 +352,9 @@ function recipients:delete_recipient(recipient_id)
-- Remove the recipient_id from the set of all currently existing recipient ids
ntop.delMembersCache(self:_get_recipient_ids_key(), string.format("%d", recipient_id))
-- Notify a change in the recipients
self:_set_recipients_change()
-- Finally, remove the recipient from C
ntop.recipient_delete(recipient_id)
ret = true
@ -500,7 +540,7 @@ end
-- @param periodic_frequency The frequency, in seconds, of this call
-- @param force_export A boolean telling to forcefully export dispatched notifications
-- @return nil
local function process_notifications_by_priority(ready_recipients, high_priority, now, periodic_frequency, force_export)
local function process_notifications_by_priority(ready_recipients, high_priority, now, deadline, periodic_frequency, force_export)
-- Total budget availabe, which is a multiple of the periodic_frequency
-- Budget in this case is the maximum number of notifications which can
-- be processed during this call.
@ -511,7 +551,8 @@ local function process_notifications_by_priority(ready_recipients, high_priority
local budget_per_iter = 10
-- Cycle until there are ready_recipients and total_budget left
while #ready_recipients > 0 and total_budget >= 0 and not ntop.isDeadlineApproaching() do
local cur_time = os.time()
while #ready_recipients > 0 and total_budget >= 0 and cur_time <= deadline do
for i = #ready_recipients, 1, -1 do
local ready_recipient = ready_recipients[i]
local recipient = ready_recipient.recipient
@ -540,11 +581,12 @@ local function process_notifications_by_priority(ready_recipients, high_priority
-- Update the total budget
total_budget = total_budget - budget_per_iter
cur_time = os.time()
end
if do_trace then
if #ready_recipients > 0 then
tprint("Deadline approaching: "..tostring(ntop.isDeadlineApproaching()))
tprint("Deadline approaching: "..tostring(deadline < cur_time))
tprint("Budget left: "..total_budget)
tprint("The following recipients were unable to dequeue all their notifications")
for _, ready_recipient in pairs(ready_recipients) do
@ -581,14 +623,35 @@ end
-- @param periodic_frequency The frequency, in seconds, of this call
-- @param force_export A boolean telling to forcefully export dispatched notifications
-- @return nil
function recipients:process_notifications(now, periodic_frequency, force_export)
local recipients = self:get_all_recipients()
function recipients:process_notifications(now, deadline, periodic_frequency, force_export)
if not areAlertsEnabled() then
return
end
if not self.recipients then
-- Cache recipients to avoid re-reading them constantly
self.recipients = self:get_all_recipients()
else
-- Make sure the recipients cache is still valid (i.e., no recipient changes have occurred in the meanwhile)
local locked = self:_lock()
if locked then
if self:_check_recipients_change() then
-- If there has been a change, a reload is performed
self.recipients = self:get_all_recipients()
end
self:_unlock()
else
-- Unable to acquire the lock, exit (will retry later)
return
end
end
local modules_by_name = notification_configs.get_types()
local ready_recipients = {}
-- Check, among all available recipients, those that are ready to export, depending on
-- their EXPORT_FREQUENCY
for _, recipient in pairs(recipients) do
for _, recipient in pairs(self.recipients) do
local module_name = recipient.endpoint_key
if modules_by_name[module_name] then
@ -601,8 +664,8 @@ function recipients:process_notifications(now, periodic_frequency, force_export)
-- Use table.clone to pass recipients as the table is modified to only leave, after the call,
-- only those recipients who didn't complete their job.
process_notifications_by_priority(table.clone(ready_recipients), true --[[ high priority --]], now, periodic_frequency, force_export)
process_notifications_by_priority(table.clone(ready_recipients), false --[[ low priority --]], now, periodic_frequency, force_export)
process_notifications_by_priority(table.clone(ready_recipients), true --[[ high priority --]], now, deadline, periodic_frequency, force_export)
process_notifications_by_priority(table.clone(ready_recipients), false --[[ low priority --]], now, deadline, periodic_frequency, force_export)
-- Refresh recipients periodically
ntop.recipients_refresh()
end