Implements batched lua flows iterator

This commit is contained in:
Simone Mainardi 2019-06-25 12:52:38 +02:00
parent 699f8a71cc
commit b4f7788dea
4 changed files with 93 additions and 15 deletions

View file

@ -61,7 +61,9 @@ local function getBatchedIterator(batched_function, field, function_params)
iterator_finished = true
end
if iterator_finished then return nil end
if iterator_finished then
return nil
end
-- we need to load new slots from C
if(debug_enabled) then
@ -80,8 +82,9 @@ local function getBatchedIterator(batched_function, field, function_params)
loaded_elems = slot[field]
if(debug_enabled) then
io.write("getBatchedIterator["..field.."](numElems=".. table.len(loaded_elems) ..", nextSlot=".. nextSlot ..")\n")
io.write("getBatchedIterator["..field.."] nextSlot=".. nextSlot ..")\n")
end
end
for key, value in pairs(loaded_elems) do
@ -91,6 +94,11 @@ local function getBatchedIterator(batched_function, field, function_params)
end
end
-- A batched iterator over the active flows
function callback_utils.getFlowsIterator(...)
return getBatchedIterator(interface.getBatchedFlowsInfo, "flows", { ... })
end
-- A batched iterator over the local hosts with timeseries
function callback_utils.getLocalHostsTsIterator(...)
return getBatchedIterator(interface.getBatchedLocalHostsTs, "hosts", { ... })
@ -118,6 +126,32 @@ end
-- ########################################################
-- Iterates each active flow on the ifname interface.
-- Each flow is passed to the callback with some more information.
function callback_utils.foreachFlow(ifname, deadline, callback, ...)
interface.select(ifname)
local iterator = callback_utils.getFlowsIterator({...})
for flow_key, flow in iterator do
if(ntop.isShutdown()) then return true end
if ((deadline ~= nil) and (os.time() >= deadline)) then
-- Out of time
return false
end
if callback(flow_key, flow) == false then
return false
end
end
return true
end
-- ########################################################
-- Iterates each active host on the ifname interface for RRD creation.
-- Each host is passed to the callback with some more information.
function callback_utils.foreachLocalRRDHost(ifname, deadline, with_ts, callback)