Fixes possibly skipped endpoint notifications export

This commit is contained in:
Simone Mainardi 2020-09-01 19:54:54 +02:00
parent 9fd8c20324
commit b6f53df891
2 changed files with 72 additions and 16 deletions

View file

@ -16,12 +16,13 @@ local alert_consts = require("alert_consts")
local ENDPOINT_RECIPIENT_TO_ENDPOINT_CONFIG = "ntopng.prefs.notification_endpoint.endpoint_recipient_to_endpoint_conf"
local ENDPOINT_RECIPIENTS_KEY = "ntopng.prefs.notification_endpoint.endpoint_config_%s.recipients"
local RECIPIENT_QUEUE_KEY = "ntopng.alerts.notification_recipient_queue.%s"
local RECIPIENT_NEXT_EXPORT_TIME_KEY = "ntopng.cache.notification_recipient_export_time.%s"
-- #################################################################
local notification_recipients = {}
-- ##############################################
-- #################################################################
local function get_endpoint_recipient_queue(endpoint_recipient_name)
local k = string.format(RECIPIENT_QUEUE_KEY, endpoint_recipient_name)
@ -30,6 +31,26 @@ end
-- #################################################################
local function check_endpoint_export(endpoint_recipient_name, export_frequency)
local k = string.format(RECIPIENT_NEXT_EXPORT_TIME_KEY, endpoint_recipient_name)
local cached_val = tonumber(ntop.getCache(k))
if cached_val then
-- Cached key exists. TTL not eached, not yet time to export
-- tprint({endpoint_recipient_name, "cached"})
return false
else
-- Cached key doesn't exists: TTL has expired
-- Set the cache with TTL equal to the export_frequency and do the export!
ntop.setCache(k, "1", export_frequency)
-- tprint({endpoint_recipient_name, "time to export!!"})
return true
end
end
-- #################################################################
-- @brief Check if an endpoint configuration with name `endpoint_recipient_name` exists
-- @param endpoint_recipient_name A string with the endpoint recipient name
-- @return true if the configuration exists, false otherwise
@ -361,29 +382,62 @@ end
function notification_recipients.processNotifications(now, periodic_frequency, force_export)
local recipients = notification_recipients.get_recipients()
local modules_by_name = notification_configs.get_types()
local ready_recipients = {}
-- Check, among all available recipients, those that are ready to export, depending on
-- their EXPORT_FREQUENCY
for _, recipient in pairs(recipients) do
local module_name = recipient.endpoint_conf.endpoint_key
if modules_by_name[module_name] then
local m = modules_by_name[module_name]
if force_export or not m.EXPORT_FREQUENCY or ((now % m.EXPORT_FREQUENCY) < periodic_frequency) then
if m.dequeueRecipientAlerts then
local rv = m.dequeueRecipientAlerts(recipient, 1 --[[ budget ]])
if not rv.success then
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. module_name .. " module: " .. msg)
end
else
-- traceError(TRACE_ERROR, TRACE_CONSOLE, "No dequeueRecipientAlerts callback defined for "..recipient.recipient_name)
end
end
else
-- traceError(TRACE_ERROR, TRACE_CONSOLE, "Module "..module_name.." not available")
if force_export or check_endpoint_export(recipient.recipient_name, m.EXPORT_FREQUENCY) then
ready_recipients[#ready_recipients + 1] = {recipient = recipient, mod = m}
end
end
end
-- Cycle ready recipients and perform the actual export
for _, ready_recipient in pairs(ready_recipients) do
local recipient = ready_recipient.recipient
local m = ready_recipient.mod
if m.dequeueRecipientAlerts then
local rv = m.dequeueRecipientAlerts(recipient, 1 --[[ budget ]])
if not rv.success then
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg)
end
end
if ntop.isDeadlineApproaching() then
-- No time left
break
end
end
-- for _, recipient in pairs(recipients) do
-- local module_name = recipient.endpoint_conf.endpoint_key
-- if modules_by_name[module_name] then
-- local m = modules_by_name[module_name]
-- if force_export or not m.EXPORT_FREQUENCY or ((now % m.EXPORT_FREQUENCY) < periodic_frequency) then
-- if m.dequeueRecipientAlerts then
-- local rv = m.dequeueRecipientAlerts(recipient, 1 --[[ budget ]])
-- if not rv.success then
-- local msg = rv.error_message or "Unknown Error"
-- traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. module_name .. " module: " .. msg)
-- end
-- else
-- -- traceError(TRACE_ERROR, TRACE_CONSOLE, "No dequeueRecipientAlerts callback defined for "..recipient.recipient_name)
-- end
-- end
-- else
-- -- traceError(TRACE_ERROR, TRACE_CONSOLE, "Module "..module_name.." not available")
-- end
-- end
end
-- #################################################################