Fixes alerts starving in per-recipient queues

This commit is contained in:
Simone Mainardi 2020-09-02 14:52:40 +02:00
parent 6fb8393117
commit 28caa3ac33
6 changed files with 64 additions and 54 deletions

View file

@ -22,7 +22,7 @@ end
-- NOTE: sqlite can handle about 10-50 alerts/sec
alert_consts.MAX_NUM_QUEUED_ALERTS_PER_MODULE = 1024 -- should match ALERTS_MANAGER_MAX_ENTITY_ALERTS
alert_consts.MAX_NUM_QUEUED_ALERTS_PER_RECIPIENT = 128
alert_consts.MAX_NUM_QUEUED_ALERTS_PER_RECIPIENT = 4096
-- Alerts (see ntop_typedefs.h)
-- each table entry is an array as:

View file

@ -9,7 +9,7 @@ local plugins_utils = require("plugins_utils")
local json = require "dkjson"
local notification_configs = require("notification_configs")
local alert_consts = require("alert_consts")
local do_trace = false
-- #################################################################
@ -397,47 +397,57 @@ function notification_recipients.processNotifications(now, periodic_frequency, f
end
end
-- Cycle ready recipients and perform the actual export
for _, ready_recipient in pairs(ready_recipients) do
local recipient = ready_recipient.recipient
local m = ready_recipient.mod
-- Total budget available, which is a multiple of the periodic_frequency
-- Budget in this case is the maximum number of notifications which can
-- be processed during this call.
local total_budget = 1000 * periodic_frequency
-- To prevent a single recipient from monopolizing all the resources, the total
-- budget is consumed in chunks: recipients are iterated multiple times and,
-- on each iteration, every recipient is given a budget of at most budget_per_iter.
local budget_per_iter = 100
if m.dequeueRecipientAlerts then
local rv = m.dequeueRecipientAlerts(recipient, 1 --[[ budget ]])
if not rv.success then
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg)
-- Cycle as long as there are ready_recipients, there is total_budget left, and the deadline is not approaching
while #ready_recipients > 0 and total_budget >= 0 and not ntop.isDeadlineApproaching() do
for i = #ready_recipients, 1, -1 do
local ready_recipient = ready_recipients[i]
local recipient = ready_recipient.recipient
local m = ready_recipient.mod
if do_trace then tprint("Dequeuing alerts for ready recipient: ".. recipient.recipient_name) end
if m.dequeueRecipientAlerts then
local rv = m.dequeueRecipientAlerts(recipient, budget_per_iter)
-- If the recipient has failed (not rv.success) or
-- if it has no more work to do (not rv.more_available)
-- it can be removed from the array of ready recipients.
if not rv.success or not rv.more_available then
table.remove(ready_recipients, i)
if do_trace then tprint("Ready recipient done: ".. recipient.recipient_name) end
if not rv.success then
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg)
end
end
end
end
if ntop.isDeadlineApproaching() then
-- No time left
break
end
-- Update the total budget
total_budget = total_budget - budget_per_iter
end
-- for _, recipient in pairs(recipients) do
-- local module_name = recipient.endpoint_conf.endpoint_key
-- if modules_by_name[module_name] then
-- local m = modules_by_name[module_name]
-- if force_export or not m.EXPORT_FREQUENCY or ((now % m.EXPORT_FREQUENCY) < periodic_frequency) then
-- if m.dequeueRecipientAlerts then
-- local rv = m.dequeueRecipientAlerts(recipient, 1 --[[ budget ]])
-- if not rv.success then
-- local msg = rv.error_message or "Unknown Error"
-- traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. module_name .. " module: " .. msg)
-- end
-- else
-- -- traceError(TRACE_ERROR, TRACE_CONSOLE, "No dequeueRecipientAlerts callback defined for "..recipient.recipient_name)
-- end
-- end
-- else
-- -- traceError(TRACE_ERROR, TRACE_CONSOLE, "Module "..module_name.." not available")
-- end
-- end
if do_trace then
if #ready_recipients > 0 then
tprint("Deadline approaching: "..tostring(ntop.isDeadlineApproaching()))
tprint("Budget left: "..total_budget)
tprint("The following recipients were unable to dequeue all their notifications")
for _, ready_recipient in pairs(ready_recipients) do
tprint(" "..ready_recipient.recipient.recipient_name)
end
end
end
end
-- #################################################################