Limit alerts insertions if the queues are full

This commit is contained in:
emanuele-f 2019-11-08 11:26:08 +01:00
parent 3a0e4b13f7
commit af1dc8a05e
2 changed files with 70 additions and 27 deletions

View file

@ -59,47 +59,89 @@ end
-- ##############################################
-- Enqueue a store alert action on the DB to be performed asynchronously
local function enqueueStoreAlert(ifid, alert)
local trim = nil
-- @brief Implements a simple queue rating algorithm to limit rpush operations.
-- The queue status ("normal" or "full") is stored in the "<queue>.rating" cache
-- key. Pushes are inhibited once the pending events reach ceil(ref_limit / 2)
-- and are allowed again once they fall to ceil(ref_limit / 3) or below.
-- @param queue the queue name
-- @param event the event to push; it is passed as-is to ntop.rpushCache
-- @param ref_limit the maximum queue size
-- @param inc_drops true if alert drops should be incremented
-- @return true if the event was pushed, false if it was dropped
-- NOTE(review): the lines below that use `alert`, `ifid` and `trim` (none of
-- which is declared by this function) look like leftovers of the previous
-- enqueueStoreAlert body shown together with the new code - TODO confirm
local function enqueueAlertEvent(queue, event, ref_limit, inc_drops)
local num_pending = ntop.llenCache(queue)
local queue_rating_key = string.format("%s.rating", queue)
local cur_status = ntop.getCache(queue_rating_key)
local rv
local alert_json = json.encode(alert)
local queue = getAlertEventQueue(ifid)
-- Queue rating limits: low_value/high_value drive the status transitions,
-- trim_limit is the size above which the queue is trimmed
local low_value = math.ceil(ref_limit / 3)
local high_value = math.ceil(ref_limit / 2)
local trim_limit = ref_limit
if(ntop.llenCache(queue) > alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE) then
trim = math.ceil(alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE/2)
traceError(TRACE_INFO, TRACE_CONSOLE, string.format("Alerts event queue too long: dropping %u alerts", trim))
interface.incNumDroppedAlerts(trim)
-- The rpush is only performed in "normal" status
-- (an empty/missing cached status is treated as "normal")
if isEmptyString(cur_status) then
cur_status = "normal"
end
ntop.rpushCache(queue, alert_json, trim)
return(true)
-- Status transitions:
-- num_pending >= high_value -> "full"
-- num_pending <= low_value -> "normal"
-- Between the two thresholds the previous status is kept; the cache key is
-- only written when the status actually changes
if((num_pending >= high_value) and (cur_status ~= "full")) then
-- Limit reached, inhibit subsequent push
cur_status = "full"
ntop.setCache(queue_rating_key, cur_status)
elseif((num_pending <= low_value) and (cur_status ~= "normal")) then
-- The problem was solved, can now push
cur_status = "normal"
ntop.setCache(queue_rating_key, cur_status)
end
if(num_pending > trim_limit) then
-- In normal conditions, this should not happen as the "high_value"
-- should have already limited the insertions
local new_size = math.ceil(trim_limit / 2)
local num_drop = num_pending - new_size
traceError(TRACE_INFO, TRACE_CONSOLE, string.format("Event queue [%s] too long: dropping %u alerts", queue, num_drop))
-- Drop the oldest alerts (from the front of the queue)
ntop.ltrimCache(queue, num_drop, num_pending)
if(inc_drops) then
interface.incNumDroppedAlerts(num_drop)
end
end
if(cur_status == "full") then
-- Pushes are inhibited: the event is discarded and, if requested,
-- counted as a single drop
if(inc_drops) then
interface.incNumDroppedAlerts(1)
end
rv = false
else
-- Success
ntop.rpushCache(queue, event)
rv = true
end
return(rv)
end
-- ##############################################
-- Queue an alert so that its store action on the DB is performed asynchronously.
-- The alert is JSON-encoded and handed to enqueueAlertEvent together with the
-- queue returned by getAlertEventQueue(ifid); drops are counted.
-- @param ifid the interface id used to select the alert event queue
-- @param alert the alert to be JSON-encoded and queued
-- @return the (single) value returned by enqueueAlertEvent
local function enqueueStoreAlert(ifid, alert)
   local target_queue = getAlertEventQueue(ifid)
   local encoded_alert = json.encode(alert)
   local max_pending = alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE

   -- Assigning to a local keeps exactly one return value
   local pushed = enqueueAlertEvent(target_queue, encoded_alert, max_pending, true)

   return pushed
end
-- Enqueue a store flow alert action on the DB to be performed asynchronously
-- @param ifid the interface id, passed to getFlowAlertEventQueue to select the queue
-- @param alert_json the flow alert to queue; no encoding is performed here, so it
-- is presumably already a JSON string - TODO confirm
-- NOTE(review): return(true) is followed by a second return; the lines from
-- `local trim` to `return(true)` look like the previous implementation shown
-- together with its replacement - TODO confirm
-- NOTE(review): the final return limits the queue with
-- MAX_NUM_QUEUED_ALERTS_PER_INTERFACE, while the preceding lines use
-- MAX_NUM_QUEUED_ALERTS_PER_MODULE - TODO confirm which limit is intended
local function enqueueStoreFlowAlert(ifid, alert_json)
local trim = nil
local queue = getFlowAlertEventQueue(ifid)
if(ntop.llenCache(queue) > alert_consts.MAX_NUM_QUEUED_ALERTS_PER_MODULE) then
trim = math.ceil(alert_consts.MAX_NUM_QUEUED_ALERTS_PER_MODULE/2)
traceError(TRACE_INFO, TRACE_CONSOLE, string.format("Flow alerts event queue too long: dropping %u alerts", trim))
interface.incNumDroppedAlerts(trim)
end
ntop.rpushCache(queue, alert_json, trim)
return(true)
return(enqueueAlertEvent(getFlowAlertEventQueue(ifid), alert_json,
alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE, true))
end
-- ##############################################
-- Queue an alert notification on "ntopng.alerts.notifications_queue".
-- @param alert_json the notification payload, passed to the queue as-is
-- @return the value returned by enqueueAlertEvent
-- NOTE(review): alert_json is pushed directly with rpushCache and then handed
-- to enqueueAlertEvent for the same queue; the direct push looks like the
-- previous implementation shown together with its replacement - TODO confirm
local function pushAlertJSONNotification(alert_json)
ntop.rpushCache("ntopng.alerts.notifications_queue", alert_json)
return(enqueueAlertEvent("ntopng.alerts.notifications_queue", alert_json,
alert_consts.MAX_NUM_QUEUED_ALERTS_NOTIFICATIONS, false --[[just a notification, don't count drops]]))
end
-- ##############################################