Reduce redis load due to queue rating algorithmn

This commit is contained in:
emanuele-f 2019-11-11 10:49:08 +01:00
parent 644e659ca9
commit d9fd30b9e6

View file

@ -59,6 +59,15 @@ end
-- ##############################################
-- Rating status cache to avoid too many redis lookups
-- Format: k -> rating_status
local rating_status_cache = {}
-- Rating len cache to avoid too many redis lookups
-- Format: k -> {when->cache_when, length->queue_length}
-- when is used to recheck the cache periodically
local rating_len_cache = {}
-- @brief Implements a simple queue rating algorithmn to limit rpush operations.
-- @param queue the queue name
-- @param ref_limit the maximum queue size
@ -66,7 +75,8 @@ end
local function enqueueAlertEvent(queue, event, ref_limit, inc_drops)
local queue_rating_key = string.format("%s.rating", queue)
local queue_len_key = string.format("%s.len", queue)
local cur_status = ntop.getCache(queue_rating_key)
local cur_status = rating_status_cache[queue_rating_key] or ntop.getCache(queue_rating_key)
local now = os.time()
local rv
-- Queue rating limits
@ -75,11 +85,16 @@ local function enqueueAlertEvent(queue, event, ref_limit, inc_drops)
local trim_limit = ref_limit
-- NOTE: using a cached value to avoid calling llenCache every time
local num_pending = tonumber(ntop.getCache(queue_len_key))
local len_info = rating_len_cache[queue_len_key]
local num_pending
if(num_pending == nil) then
-- Recheck the length every second
if((len_info ~= nil) and ((now - len_info.when) < 1)) then
num_pending = len_info.length
else
-- Resync from redis
num_pending = ntop.llenCache(queue)
ntop.setCache(queue_len_key, string.format("%d", num_pending), 2 --[[ recheck every 2 seconds ]])
rating_len_cache[queue_len_key] = {when = now, length = num_pending}
end
-- The rpush is only performed in "normal" status
@ -116,7 +131,7 @@ local function enqueueAlertEvent(queue, event, ref_limit, inc_drops)
end
-- Recalculate num_pending on next round
ntop.delCache(queue_len_key)
rating_len_cache[queue_len_key] = nil
end
if(cur_status == "full") then
@ -131,6 +146,10 @@ local function enqueueAlertEvent(queue, event, ref_limit, inc_drops)
rv = true
end
-- Update rating cache
rating_status_cache[queue_rating_key] = cur_status
-- NOTE: do not update the queue_len_cache here, it will be rechecked every 2 seconds
return(rv)
end