mirror of
https://github.com/ntop/ntopng.git
synced 2026-04-29 15:39:33 +00:00
Update traffic report data handling code
This commit is contained in:
parent
86eceb6fe6
commit
6588cd8c9e
2 changed files with 309 additions and 176 deletions
|
|
@ -1,11 +1,11 @@
|
|||
|
||||
-- TODO localize
|
||||
local RRD_RESULT_ERROR_EMPTY = "Empty RRD"
|
||||
local RRD_RESULT_ERROR_MALFORMED = "Malformed RRD"
|
||||
|
||||
SECONDS_IN_A_HOUR = 3600
|
||||
SECONDS_IN_A_DAY = SECONDS_IN_A_HOUR*24
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
-- Note: these date functions are expensive, use with care!
|
||||
|
||||
function timestamp_to_date(ts)
|
||||
return os.date("*t", ts)
|
||||
end
|
||||
|
|
@ -39,210 +39,254 @@ function date_tostring(dt)
|
|||
return os.date("%x %X", dt)
|
||||
end
|
||||
|
||||
--
|
||||
-- Fetches data from RRD derivate counters
|
||||
--
|
||||
-- Parameters
|
||||
-- rrdname = path of the RRD
|
||||
-- epoch_start = start timestamp for query
|
||||
-- epoch_end = end timestamp for query
|
||||
--
|
||||
-- Returns
|
||||
-- On error:
|
||||
-- result.error = error string
|
||||
-- On success:
|
||||
-- result.data = a table, where each key is a data series containing a list of fetched values
|
||||
-- result.count = number of values in each data series
|
||||
-- result.start = timestamp for first value
|
||||
-- result.step = timestamp step between values
|
||||
-- result.names = rrd column names
|
||||
--
|
||||
function rrd_fetch_derivate(rrdname, epoch_start, epoch_end)
|
||||
local result = {}
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
if(not ntop.notEmptyFile(rrdname)) then
|
||||
result.error = RRD_RESULT_ERROR_EMPTY
|
||||
-- Considers NaN and negative values as 0
|
||||
function rrd_get_positive_value(x)
|
||||
if(x ~= x) or (x <= 0) then
|
||||
return 0
|
||||
else
|
||||
local fstart, fstep, fnames, fdata = ntop.rrd_fetch(rrdname, 'AVERAGE', epoch_start, epoch_end)
|
||||
if(fstart == nil) then
|
||||
result.error = RRD_RESULT_ERROR_MALFORMED
|
||||
else
|
||||
result.start = fstart + fstep
|
||||
result.step = fstep
|
||||
result.data = {}
|
||||
return x
|
||||
end
|
||||
end
|
||||
|
||||
-- for the data series, use the same labels as the RRD counters
|
||||
for j=1,#fnames do
|
||||
result.data[fnames[j]] = {}
|
||||
end
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
local get_rrd_value = function(x) if(x ~= x) or (x < 0) then return 0 else return x * fstep end end
|
||||
|
||||
for i=1,#fdata do
|
||||
for j=1,#fnames do
|
||||
table.insert(result.data[fnames[j]], get_rrd_value(fdata[i][j]))
|
||||
-- Select only one or more data series (columns)
|
||||
--
|
||||
-- Parameters:
|
||||
-- fdata: RRD series data
|
||||
-- selected: either a
|
||||
-- - list of columns to select
|
||||
-- - map of column->alias to select
|
||||
function rrd_select_columns(fdata, selected)
|
||||
if next(selected) == nil then
|
||||
-- selected is a list
|
||||
for sname, svalue in pairs(fdata) do
|
||||
-- check if value is in selected list
|
||||
local found = false
|
||||
|
||||
for i=1,#selected do
|
||||
if selected[i] == sname then
|
||||
found = true
|
||||
break
|
||||
end
|
||||
end
|
||||
|
||||
result.count = #fdata
|
||||
result.names = fnames
|
||||
if not found then
|
||||
fdata[sname] = nil
|
||||
end
|
||||
end
|
||||
else
|
||||
-- selected is a map
|
||||
local to_insert = {}
|
||||
for sname, svalue in pairs(fdata) do
|
||||
if selected[sname] ~= nil then
|
||||
if selected[sname] ~= sname then
|
||||
-- use alias
|
||||
to_insert[selected[sname]] = svalue
|
||||
fdata[sname] = nil
|
||||
end
|
||||
else
|
||||
-- not found
|
||||
fdata[sname] = nil
|
||||
end
|
||||
end
|
||||
|
||||
for key,val in pairs(to_insert) do
|
||||
fdata[key] = val
|
||||
end
|
||||
end
|
||||
|
||||
return result
|
||||
end
|
||||
|
||||
--------------------------------------------------------------------------------
|
||||
|
||||
--
|
||||
-- Transforms RRD data to meet the specified resolution.
|
||||
-- Integrates derivate RRD data to meet the specified resolution.
|
||||
--
|
||||
-- Parameters
|
||||
-- epoch_start - wanted epoch start
|
||||
-- epoch_end - wanted end epoch
|
||||
-- resolution - wanted resolution
|
||||
-- traffic - input traffic with the format below, plus the "time" column
|
||||
-- columns - a list of RRD column names
|
||||
--
|
||||
-- traffic format (compatible with rrd_fetch_derivate return value):
|
||||
-- .start start RRD date
|
||||
-- .step RRD hardware step
|
||||
-- .count number of RRD rows
|
||||
-- .data table containing [.count] rows of tables with [columns] format
|
||||
-- epoch_start - the desired epoch start
|
||||
-- epoch_end - the desired end epoch
|
||||
-- resolution - the desired resolution
|
||||
-- start - the raw start date
|
||||
-- rawdata - the raw derived series data
|
||||
-- npoints - number of points in each data serie
|
||||
-- rawstep - rawdata internal step
|
||||
-- date_mode - if true, the resolution is interpreted as if you want dates to be
|
||||
-- separated by this resolution step. This enables daylight checks to
|
||||
-- to adjust actual time intervals, which are computational intensive because of
|
||||
-- Lua dates functions usage.
|
||||
--
|
||||
-- Returns
|
||||
-- On success:
|
||||
-- result table, containing tables with [columns] format, with the specified resolution
|
||||
-- On success: a list of times,
|
||||
-- the data parameter will be modified in place
|
||||
-- On error: a string containing some error message
|
||||
--
|
||||
-- On RRD error:
|
||||
-- result
|
||||
-- .error - with some error message
|
||||
--
|
||||
function rrd_fix_resolution(epoch_start, epoch_end, resolution, traffic, columns)
|
||||
-- BEGIN parameters check
|
||||
if (columns == nil or #columns < 1) then
|
||||
error("No data columns")
|
||||
end
|
||||
epoch_start = tonumber(epoch_start)
|
||||
if (epoch_start == nil or traffic == nil) then
|
||||
error("Parameter error")
|
||||
end
|
||||
local rrd_start = tonumber(traffic.start)
|
||||
local rrd_step = tonumber(traffic.step)
|
||||
local rrd_count = tonumber(traffic.count)
|
||||
local rrd_data = traffic.data
|
||||
local target_resol = tonumber(resolution)
|
||||
if (rrd_start == nil or rrd_step == nil or rrd_count == nil or rrd_data == nil or target_resol == nil) then
|
||||
error("Parameter error")
|
||||
end
|
||||
-- END parameters check
|
||||
function rrd_interval_integrate(epoch_start, epoch_end, resolution, start, rawdata, npoints, rawstep, date_mode)
|
||||
resolution = math.floor(resolution)
|
||||
local orig_resol = resolution
|
||||
|
||||
-- check resolution consinstency
|
||||
if traffic.step > resolution then
|
||||
return {error=i18n('error_rrd_low_resolution', {prefs=ntop.getHttpPrefix().."/lua/admin/prefs.lua?subpage_active=on_disk_rrds"})}
|
||||
if rawstep > resolution then
|
||||
-- TODO i18n should be available
|
||||
--~ return {error=i18n('error_rrd_low_resolution', {prefs=ntop.getHttpPrefix().."/lua/admin/prefs.lua?subpage_active=on_disk_rrds"})}
|
||||
return {error='error_rrd_low_resolution'}
|
||||
end
|
||||
|
||||
-- functions to handle the n-dimensions counters
|
||||
if epoch_start < start then
|
||||
-- TODO i18n
|
||||
return {error='error_rrd_starts_before'}
|
||||
end
|
||||
|
||||
local function set_resolution(prevtime, newtime)
|
||||
if date_mode then
|
||||
-- handle daylight changes
|
||||
resolution = orig_resol
|
||||
local from_dst = timestamp_to_date(prevtime).isdst
|
||||
local to_dst = timestamp_to_date(newtime).isdst
|
||||
if from_dst and not to_dst then
|
||||
resolution = resolution + 3600
|
||||
elseif not from_dst and to_dst then
|
||||
-- 1 to avoid infinite loop
|
||||
resolution = math.max(resolution - 3600, 1)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
-- functions to handle the n-dimensional counters
|
||||
local function create_counters()
|
||||
local counters = {}
|
||||
for i=1,#columns do
|
||||
counters[columns[i]] = 0
|
||||
for sname,sdata in pairs(rawdata) do
|
||||
counters[sname] = 0
|
||||
end
|
||||
return counters
|
||||
end
|
||||
-- iterates the n-dimensions counters and calls the 1-dimension callback, passing current counter value and column
|
||||
|
||||
-- iterates the n-dimensional counters and calls the 1-dimension callback, passing current counter value and column
|
||||
-- callback should return the new counter value
|
||||
local function for_each_counter_do_update(counters, callback)
|
||||
for i=1,#columns do
|
||||
local col = columns[i]
|
||||
counters[col] = callback(counters[col], col)
|
||||
for sname,svalue in pairs(rawdata) do
|
||||
counters[sname] = callback(counters[sname], sname)
|
||||
end
|
||||
end
|
||||
|
||||
-- initialize
|
||||
-- use same table to avoid allocation overhead; Please take care not to cross src_idx with dst_idx
|
||||
local src_idx = 1 -- this will be used to read the original data
|
||||
local dst_idx = 1 -- this will be used to write the integrated data, in order to avoid further allocations
|
||||
local integr_ctrs = create_counters()
|
||||
local idx = 1
|
||||
local time_col = "time"
|
||||
local oldtime = epoch_start
|
||||
local time_sum = epoch_start
|
||||
local result = {[time_col]={}, hwstep=rrd_step}
|
||||
for i=1,#columns do
|
||||
result[columns[i]] = {}
|
||||
|
||||
-- Pre-declare callbacks to improve performance while looping
|
||||
local prefix_slice
|
||||
|
||||
local function c_dump_slice (value, col)
|
||||
-- Save the previous value to avoid overwriting it when (src_idx == dst_idx)
|
||||
local prev_val = rawdata[col][src_idx]
|
||||
-- Calculate the traffic belonging to previous step
|
||||
local prefix_traffic = prefix_slice * rawdata[col][src_idx]
|
||||
|
||||
-- Save into previous step
|
||||
rawdata[col][dst_idx] = value + prefix_traffic
|
||||
|
||||
-- Update the counter with suffix traffic
|
||||
return prev_val - prefix_traffic
|
||||
end
|
||||
|
||||
local function c_accumulate_partial (value, col)
|
||||
return value + rawdata[col][src_idx]
|
||||
end
|
||||
|
||||
local function c_fill_remaining (value, col)
|
||||
rawdata[col][dst_idx] = value
|
||||
|
||||
-- will zero integr_ctrs for next intervals
|
||||
return 0
|
||||
end
|
||||
|
||||
-- special case
|
||||
if rrd_start < epoch_start then
|
||||
local toskip = math.max(math.ceil((epoch_start - rrd_start) / rrd_step), rrd_count)
|
||||
old_time = rrd_start + rrd_step * (toskip-1)
|
||||
local aligment = (1 - (epoch_start - old_time) / rrd_step)
|
||||
local function c_fill_nil (value, col)
|
||||
rawdata[col][dst_idx] = nil
|
||||
end
|
||||
|
||||
-- normalize derived data
|
||||
for sname,sdata in pairs(rawdata) do
|
||||
for t=1, #sdata do
|
||||
sdata[t] = rrd_get_positive_value(sdata[t]) * rawstep
|
||||
end
|
||||
end
|
||||
|
||||
--~ io.write("\n\n")
|
||||
--~ io.write("\nsrc_idx="..src_idx.." dst_idx="..dst_idx.." fstep/res="..rawstep.."/"..resolution.." fstart/dstart="..start.."/"..epoch_start.."\n")
|
||||
|
||||
-- when the was data starts before desired start
|
||||
if start < epoch_start then
|
||||
local toskip = math.min(math.ceil((epoch_start - start) / rawstep), npoints)
|
||||
local prevtime = start + rawstep * (toskip-1)
|
||||
local aligment = (1 - (epoch_start - prevtime) / rawstep)
|
||||
|
||||
-- skip starting rows
|
||||
idx = toskip + 1
|
||||
src_idx = toskip + 1
|
||||
for_each_counter_do_update(integr_ctrs, function (value, col)
|
||||
return value + traffic.data[col][toskip] * aligment
|
||||
return value + rawdata[col][toskip] * aligment
|
||||
end)
|
||||
end
|
||||
|
||||
-- precondition: rrd_start >= time_sum
|
||||
local curtime = rrd_start + (idx-1) * rrd_step
|
||||
local orig_resol = target_resol
|
||||
local curtime = start + (src_idx-1) * rawstep -- goes up with raw steps
|
||||
local times = {}
|
||||
|
||||
local function debug_me()
|
||||
io.write("\nsrc_idx="..src_idx.." dst_idx="..dst_idx.." fstep/res="..rawstep.."/"..resolution.." curtime/dstart="..curtime.."/"..epoch_start.."\n")
|
||||
tprint(rawdata)
|
||||
end
|
||||
|
||||
--~ debug_me()
|
||||
--~ assert(curtime >= epoch_start)
|
||||
--~ assert(src_idx <= dst_idx)
|
||||
|
||||
while idx <= rrd_count and time_sum <= epoch_end do
|
||||
-- handle daylight changes
|
||||
target_resol = orig_resol
|
||||
local from_dst = timestamp_to_date(time_sum).isdst
|
||||
local to_dst = timestamp_to_date(curtime).isdst
|
||||
if from_dst and not to_dst then
|
||||
target_resol = target_resol + 3600
|
||||
elseif not from_dst and to_dst then
|
||||
-- 1 to avoid infinite loop
|
||||
target_resol = math.max(target_resol - 3600, 1)
|
||||
end
|
||||
-- main integration
|
||||
while src_idx <= npoints and time_sum <= epoch_end do
|
||||
set_resolution(time_sum, curtime)
|
||||
|
||||
local tdiff = curtime - time_sum
|
||||
|
||||
if tdiff >= target_resol then
|
||||
local prefix_t = time_sum + target_resol - oldtime
|
||||
if curtime + rawstep >= time_sum + resolution then
|
||||
local prefix_t = time_sum + resolution - curtime
|
||||
--~ tprint(prefix_t)
|
||||
|
||||
-- Calculate the traffic belonging to previous step
|
||||
local prefix_slice = prefix_t / rrd_step
|
||||
local prefix_ctrs = create_counters()
|
||||
-- Set prefix_ctrs with the prefix_slice of traffic
|
||||
for_each_counter_do_update(prefix_ctrs, function (value, col)
|
||||
return traffic.data[col][idx] * prefix_slice
|
||||
end)
|
||||
-- Calculate the time fraction belonging to previous step
|
||||
prefix_slice = prefix_t / rawstep
|
||||
|
||||
-- Sum prefix slice of traffic and save result
|
||||
table.insert(result[time_col], time_sum)
|
||||
for_each_counter_do_update(integr_ctrs, function (value, col)
|
||||
table.insert(result[col], value + prefix_ctrs[col])
|
||||
-- set counters to the remaining slice
|
||||
return traffic.data[col][idx] - prefix_ctrs[col]
|
||||
end)
|
||||
times[dst_idx] = time_sum
|
||||
for_each_counter_do_update(integr_ctrs, c_dump_slice)
|
||||
dst_idx = dst_idx + 1
|
||||
|
||||
time_sum = time_sum + target_resol
|
||||
tdiff = tdiff - prefix_t
|
||||
time_sum = time_sum + resolution
|
||||
else
|
||||
-- Accumulate partial slices of traffic
|
||||
for_each_counter_do_update(integr_ctrs, function (value, col)
|
||||
return value + traffic.data[col][idx]
|
||||
end)
|
||||
for_each_counter_do_update(integr_ctrs, c_accumulate_partial)
|
||||
end
|
||||
|
||||
oldtime = curtime
|
||||
curtime = curtime + rrd_step
|
||||
idx = idx + 1
|
||||
curtime = curtime + rawstep
|
||||
src_idx = src_idx + 1
|
||||
end
|
||||
|
||||
-- case RRD end is before epoch_end
|
||||
while time_sum <= epoch_end do
|
||||
table.insert(result[time_col], time_sum)
|
||||
-- Save integr_ctrs result
|
||||
for_each_counter_do_update(integr_ctrs, function (value, col)
|
||||
table.insert(result[col], value)
|
||||
-- will zero integr_ctrs for next intervals
|
||||
return 0
|
||||
end)
|
||||
time_sum = time_sum + target_resol
|
||||
set_resolution(curtime, time_sum)
|
||||
times[dst_idx] = time_sum
|
||||
--~ assert(dst_idx < src_idx)
|
||||
for_each_counter_do_update(integr_ctrs, c_fill_remaining)
|
||||
dst_idx = dst_idx + 1
|
||||
curtime = time_sum
|
||||
time_sum = time_sum + resolution
|
||||
end
|
||||
|
||||
return result
|
||||
--~ debug_me()
|
||||
|
||||
-- nullify remaining data to free table entries
|
||||
while dst_idx <= npoints do
|
||||
for_each_counter_do_update(integr_ctrs, c_fill_nil)
|
||||
dst_idx = dst_idx + 1
|
||||
end
|
||||
|
||||
return times
|
||||
end
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue