Added top ASN timeseries to the ASN page

This commit is contained in:
Matteo Biscosi 2025-07-01 11:13:33 +02:00
parent 893d0787eb
commit 55ec9981fd
10 changed files with 541 additions and 409 deletions

View file

@ -23,13 +23,7 @@ local aggregation_to_consolidation = {
[ts_common.aggregation.last] = "LAST"
}
local L4_PROTO_KEYS = {
tcp = 6,
udp = 17,
icmp = 1,
eigrp = 88,
other_ip = -1
}
-- Layer-4 protocol names mapped to their IANA protocol numbers
-- (-1 marks the catch-all "other_ip" pseudo-protocol)
local L4_PROTO_KEYS = {tcp = 6, udp = 17, icmp = 1, eigrp = 88, other_ip = -1}
-- ##############################################
@ -46,9 +40,7 @@ end
-- ##############################################
function driver:new(options)
local obj = {
base_path = options.base_path
}
local obj = {base_path = options.base_path}
setmetatable(obj, self)
self.__index = self
@ -58,9 +50,7 @@ end
-- ##############################################
function driver:getLatestTimestamp(ifid)
return os.time()
end
-- Latest timestamp for which data can be queried on this interface.
-- RRD files are writable up to the present, so this is simply "now";
-- the ifid parameter is unused by the RRD driver.
function driver:getLatestTimestamp(ifid)
    return os.time()
end
-- ##############################################
@ -73,17 +63,10 @@ local HOST_PREFIX_MAP = {
snmp_if = "snmp:",
host_pool = "pool:"
}
local WILDCARD_TAGS = {
protocol = 1,
category = 1,
l4proto = 1,
dscp_class = 1
}
-- Tags that may be queried with a wildcard value (set semantics: value 1
-- means the tag supports wildcard expansion in listSeries/topk queries)
local WILDCARD_TAGS = {protocol = 1, category = 1, l4proto = 1, dscp_class = 1}
local function get_fname_for_schema(schema, tags)
if schema.options.rrd_fname ~= nil then
return schema.options.rrd_fname
end
if schema.options.rrd_fname ~= nil then return schema.options.rrd_fname end
local last_tag = schema._tags[#schema._tags]
@ -113,11 +96,11 @@ local function schema_get_path(schema, tags)
local host_or_network = nil
local parts = string.split(schema.name, ":")
if ((string.find(schema.name, "iface:") ~= 1) and -- interfaces are only identified by the first tag
(#schema._tags >= 1)) then -- some schema do not have any tag, e.g. "process:*" schemas
local prefix = HOST_PREFIX_MAP[parts[1]] or (parts[1] .. ":")
local suffix = tags[schema._tags[2] or schema._tags[1]] or tags[schema._tags[1]]
local suffix = tags[schema._tags[2] or schema._tags[1]] or
tags[schema._tags[1]]
if (suffix ~= ifid) then
host_or_network = prefix .. suffix
@ -127,7 +110,6 @@ local function schema_get_path(schema, tags)
end
end
-- Some exceptions to avoid conflicts / keep compatibility
if parts[1] == "snmp_if" then
if tags.qos_class_id then
@ -198,9 +180,7 @@ end
function driver.schema_get_full_path(schema, tags)
local base, rrd = schema_get_path(schema, tags)
if ((not base) or (not rrd)) then
return nil
end
if ((not base) or (not rrd)) then return nil end
local full_path = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
@ -224,7 +204,8 @@ local function getConsolidationFunction(schema)
return (aggregation_to_consolidation[fn])
end
traceError(TRACE_ERROR, TRACE_CONSOLE, "unknown aggregation function: %s", fn)
traceError(TRACE_ERROR, TRACE_CONSOLE, "unknown aggregation function: %s",
fn)
return ("AVERAGE")
end
@ -232,7 +213,8 @@ end
-- ##############################################
local function create_rrd(schema, path, timestamp)
local heartbeat = schema.options.rrd_heartbeat or (schema.options.insertion_step * 2)
local heartbeat = schema.options.rrd_heartbeat or
(schema.options.insertion_step * 2)
local rrd_type = type_to_rrdtype[schema.options.metrics_type]
local params = {path, schema.options.insertion_step}
local cf = getConsolidationFunction(schema)
@ -245,22 +227,26 @@ local function create_rrd(schema, path, timestamp)
end
for idx, metric in ipairs(schema._metrics) do
params[#params + 1] = "DS:" .. metric .. ":" .. rrd_type .. ':' .. heartbeat .. ':U:U'
params[#params + 1] = "DS:" .. metric .. ":" .. rrd_type .. ':' ..
heartbeat .. ':U:U'
end
for _, rra in ipairs(schema.retention) do
params[#params + 1] = "RRA:" .. cf .. ":0.5:" .. rra.aggregation_dp .. ":" .. rra.retention_dp
params[#params + 1] = "RRA:" .. cf .. ":0.5:" .. rra.aggregation_dp ..
":" .. rra.retention_dp
end
if use_hwpredict and schema.hwpredict then
-- NOTE: at most one RRA, otherwise rrd_update crashes.
local hwpredict = schema.hwpredict
params[#params + 1] = "RRA:HWPREDICT:" .. hwpredict.row_count .. ":0.1:0.0035:" .. hwpredict.period
params[#params + 1] = "RRA:HWPREDICT:" .. hwpredict.row_count ..
":0.1:0.0035:" .. hwpredict.period
end
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("ntop.rrd_create(%s) schema=%s", table.concat(params, ", "), schema.name))
string.format("ntop.rrd_create(%s) schema=%s",
table.concat(params, ", "), schema.name))
end
local err = ntop.rrd_create(table.unpack(params))
@ -284,12 +270,14 @@ local function number_to_rrd_string(what, schema)
if (what == nil) then
err_msg = "Attempting at converting a nil to number"
elseif (type(what) ~= "number") then
err_msg = "number_to_rrd_string got a non-number argument: " .. type(what)
err_msg = "number_to_rrd_string got a non-number argument: " ..
type(what)
elseif (what ~= what) then
err_msg = "Trying to convert NaN to integer"
elseif (what == math.huge) then
err_msg = "Trying to convert inf to integer"
elseif (math.maxinteger and ((what >= math.maxinteger) or (what <= math.mininteger))) then
elseif (math.maxinteger and
((what >= math.maxinteger) or (what <= math.mininteger))) then
err_msg = "Number out of integers range: " .. what
elseif what == math.floor(what) then
-- If the number has no decimal place, print it as a digit
@ -301,11 +289,12 @@ local function number_to_rrd_string(what, schema)
end
-- Log the error with the schema name
traceError(TRACE_ERROR, TRACE_CONSOLE, string.format("%s [%s]", err_msg, schema_name))
traceError(TRACE_ERROR, TRACE_CONSOLE,
string.format("%s [%s]", err_msg, schema_name))
traceError(TRACE_ERROR, TRACE_CONSOLE, debug.traceback())
tprint(what)
-- tprint(schema)
return ("0")
end
@ -317,7 +306,8 @@ local function update_rrd(schema, rrdfile, timestamp, data)
-- io.write("update_rrd(".. rrdfile ..")\n")
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("Going to update %s [%s]", schema.name, rrdfile))
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format(
"Going to update %s [%s]", schema.name, rrdfile))
end
-- Verify last update time
@ -325,8 +315,10 @@ local function update_rrd(schema, rrdfile, timestamp, data)
if ((last_update ~= nil) and (timestamp <= last_update)) then
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format(
"Skip RRD update in the past: timestamp=%u but last_update=%u", timestamp, last_update))
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format(
"Skip RRD update in the past: timestamp=%u but last_update=%u",
timestamp, last_update))
end
return false
@ -338,7 +330,8 @@ local function update_rrd(schema, rrdfile, timestamp, data)
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("ntop.rrd_update(%s, %s) schema=%s", rrdfile, table.concat(params, ", "), schema.name))
string.format("ntop.rrd_update(%s, %s) schema=%s", rrdfile,
table.concat(params, ", "), schema.name))
end
local err = ntop.rrd_update(rrdfile, table.unpack(params))
@ -359,11 +352,10 @@ function driver:append(schema, timestamp, tags, metrics)
if use_rrd_queue then
if not schema.options.is_critical_ts then
local res = interface.rrd_enqueue(schema.name, timestamp, tags, metrics)
local res = interface.rrd_enqueue(schema.name, timestamp, tags,
metrics)
if not res then
ntop.rrd_inc_num_drops()
end
if not res then ntop.rrd_inc_num_drops() end
return res
end
@ -371,9 +363,7 @@ function driver:append(schema, timestamp, tags, metrics)
if not ntop.notEmptyFile(rrdfile) then
ntop.mkdir(base)
if not create_rrd(schema, rrdfile, timestamp) then
return false
end
if not create_rrd(schema, rrdfile, timestamp) then return false end
end
return update_rrd(schema, rrdfile, timestamp, metrics)
@ -388,13 +378,9 @@ local function makeTotalSerie(series, count)
for i, val in pairs(serie.data) do
local val_is_nan = (val ~= val)
if (total[i] == nil) then
total[i] = 0
end
if (total[i] == nil) then total[i] = 0 end
if (not val_is_nan) then
total[i] = total[i] + val
end
if (not val_is_nan) then total[i] = total[i] + val end
end
end
@ -403,7 +389,8 @@ end
-- ##############################################
local function sampleSeries(schema, cur_points, step, max_points, series, consolidation)
local function sampleSeries(schema, cur_points, step, max_points, series,
consolidation)
local sampled_dp = math.floor(cur_points / max_points + 0.5)
local count = nil
local nan = 0 / 0
@ -428,9 +415,7 @@ local function sampleSeries(schema, cur_points, step, max_points, series, consol
sum = sum + dp
num = num + 1
if (dp > max_val) then
max_val = dp
end
if (dp > max_val) then max_val = dp end
if num == sampled_dp then
-- A data group is ready
@ -457,9 +442,7 @@ local function sampleSeries(schema, cur_points, step, max_points, series, consol
-- Last group
if num > 0 then
if all_nan then
sum = nan
end
if all_nan then sum = nan end
aggregated_serie[end_idx] = sum / num
end_idx = end_idx + 1
@ -468,9 +451,7 @@ local function sampleSeries(schema, cur_points, step, max_points, series, consol
count = end_idx - 1
-- remove the exceeding points
for i = end_idx, #aggregated_serie do
aggregated_serie[i] = nil
end
for i = end_idx, #aggregated_serie do aggregated_serie[i] = nil end
data_serie.data = aggregated_serie
end
@ -494,8 +475,9 @@ local function touchRRD(rrdname)
local tdiff = now - 1800 -- This avoids to set the update continuously
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("touchRRD(%s, %u), last_update was %u", rrdname, tdiff, last))
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format(
"touchRRD(%s, %u), last_update was %u", rrdname,
tdiff, last))
tprint(debug.traceback())
end
@ -515,12 +497,11 @@ end
function driver:timeseries_query(options)
local base, rrd = schema_get_path(options.schema_info, options.tags)
local rrdfile = options.rrdfile or os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
local rrdfile = options.rrdfile or
os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
local res = nil
if not ntop.notEmptyFile(rrdfile) then
return res
end
if not ntop.notEmptyFile(rrdfile) then return res end
touchRRD(rrdfile)
@ -530,7 +511,8 @@ function driver:timeseries_query(options)
-- Avoid reporting the last point when the timeseries write has not completed
-- yet. Use 2*step as a bound.
if ((options.epoch_end > last_update) and
((options.epoch_end - last_update) <= 2 * options.schema_info.options.step)) then
((options.epoch_end - last_update) <= 2 *
options.schema_info.options.step)) then
options.epoch_end = last_update
end
@ -539,12 +521,10 @@ function driver:timeseries_query(options)
-- Query rrd to get the data
-- tprint("rrdtool fetch ".. rrdfile.. " " .. getConsolidationFunction(schema) .. " -s ".. tstart .. " -e " .. tend)
local consolidation = getConsolidationFunction(options.schema_info)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, consolidation, epoch_begin,
epoch_end)
local fstart, fstep, fdata, fend, fcount, names =
ntop.rrd_fetch_columns(rrdfile, consolidation, epoch_begin, epoch_end)
if fdata == nil then
return res
end
if fdata == nil then return res end
local count = 0
local series = {}
@ -557,7 +537,8 @@ function driver:timeseries_query(options)
-- Process the series requested, adding statistics if requested (min, max, tot, ...)
-- and normalize the data if needed
local max_val = ts_common.getMaxPointValue(options.schema_info, name, options.tags)
local max_val = ts_common.getMaxPointValue(options.schema_info, name,
options.tags)
local fdata_name = names[serie_idx]
local serie = fdata[fdata_name]
@ -567,9 +548,7 @@ function driver:timeseries_query(options)
for i, v in pairs(serie) do
-- Keep track of NaN points
if v == v then
num_not_nan_pts = num_not_nan_pts + 1
end
if v == v then num_not_nan_pts = num_not_nan_pts + 1 end
end
-- Normalize the value
@ -583,15 +562,14 @@ function driver:timeseries_query(options)
count = count + 1
end
series[#series + 1] = {
data = modified_serie,
id = name
}
series[#series + 1] = {data = modified_serie, id = name}
end
if count > options.max_num_points then
sampled_fstep, count, series = sampleSeries(options.schema_info, count, fstep, options.max_num_points, series,
consolidation)
sampled_fstep, count, series = sampleSeries(options.schema_info, count,
fstep,
options.max_num_points,
series, consolidation)
end
-- Need to re-iterate all the series, otherwise the stats cannot be
@ -600,8 +578,12 @@ function driver:timeseries_query(options)
-- Add statistics if requested, by default yes
if options.calculate_stats then
-- tprint("Schema :" .. series[i].id)
local s = ts_common.calculateStatistics(series[i].data, sampled_fstep,
options.schema_info.options.keep_total, options.schema_info.options.metrics_type)
local s = ts_common.calculateStatistics(series[i].data,
sampled_fstep,
options.schema_info.options
.keep_total,
options.schema_info.options
.metrics_type)
local min_max = ts_common.calculateMinMax(data_serie.data)
local statistics = table.merge(s, min_max)
series[i].statistics = statistics
@ -629,9 +611,7 @@ function driver:query(schema, tstart, tend, tags, options)
local base, rrd = schema_get_path(schema, tags)
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
if not ntop.notEmptyFile(rrdfile) then
return nil
end
if not ntop.notEmptyFile(rrdfile) then return nil end
touchRRD(rrdfile)
@ -639,23 +619,25 @@ function driver:query(schema, tstart, tend, tags, options)
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("RRD_FETCH schema=%s %s -> (%s): last_update=%u", schema.name, table.tconcat(tags, "=", ","),
rrdfile, last_update))
string.format(
"RRD_FETCH schema=%s %s -> (%s): last_update=%u",
schema.name, table.tconcat(tags, "=", ","), rrdfile,
last_update))
end
-- Avoid reporting the last point when the timeseries write has not completed
-- yet. Use 2*step as a bound.
if ((tend > last_update) and ((tend - last_update) <= 2 * schema.options.step)) then
if ((tend > last_update) and
((tend - last_update) <= 2 * schema.options.step)) then
tend = last_update
end
-- tprint("rrdtool fetch ".. rrdfile.. " " .. getConsolidationFunction(schema) .. " -s ".. tstart .. " -e " .. tend)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart, tend)
local fstart, fstep, fdata, fend, fcount, names =
ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart, tend)
if fdata == nil then
return nil
end
if fdata == nil then return nil end
local count = 0
local series = {}
@ -676,9 +658,7 @@ function driver:query(schema, tstart, tend, tags, options)
local value = ts_common.normalizeVal(v, max_val, options)
local val_is_nan = (v ~= v)
if options.keep_nan == true and val_is_nan then
value = v
end
if options.keep_nan == true and val_is_nan then value = v end
serie[i] = value
count = count + 1
@ -688,10 +668,7 @@ function driver:query(schema, tstart, tend, tags, options)
serie[#serie] = nil
count = count - 1
series[serie_idx] = {
label = name,
data = serie
}
series[serie_idx] = {label = name, data = serie}
end
-- table.clone needed as series can be modified below (sampleSeries works on it in-place)
@ -700,7 +677,8 @@ function driver:query(schema, tstart, tend, tags, options)
local unsampled_fstep = fstep
if count > options.max_num_points then
sampled_fstep, count = sampleSeries(schema, count, fstep, options.max_num_points, series)
sampled_fstep, count = sampleSeries(schema, count, fstep,
options.max_num_points, series)
else
sampled_fstep = fstep
end
@ -718,14 +696,18 @@ function driver:query(schema, tstart, tend, tags, options)
total_series = makeTotalSerie(series, count)
-- statistics used in report page
local total_series = makeTotalSerie(series, count)
stats = ts_common.calculateStatistics(total_series, sampled_fstep, tend - tstart, schema.options.metrics_type)
stats = ts_common.calculateStatistics(total_series, sampled_fstep,
tend - tstart,
schema.options.metrics_type)
stats = stats or {}
stats.by_serie = {}
-- Also calculate per-series statistics
for k, v in pairs(series) do
local s = ts_common.calculateStatistics(v.data, sampled_fstep, tend - tstart, schema.options.metrics_type)
local s = ts_common.calculateStatistics(v.data, sampled_fstep,
tend - tstart,
schema.options.metrics_type)
local min_max = ts_common.calculateMinMax(v.data)
-- Adding per timeseries min-max stats
@ -735,8 +717,10 @@ function driver:query(schema, tstart, tend, tags, options)
if options.initial_point then
local serie_idx = 1
local _, _, initial_pt, _, _, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart - schema.options.step, tstart - schema.options.step)
local _, _, initial_pt, _, _, names =
ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart - schema.options.step,
tstart - schema.options.step)
initial_pt = initial_pt or {}
for name_key, values in pairs(initial_pt) do
@ -772,9 +756,7 @@ function driver:query(schema, tstart, tend, tags, options)
count = count,
series = series,
statistics = stats,
additional_series = {
total = total_series
}
additional_series = {total = total_series}
}
end
@ -783,15 +765,14 @@ end
-- *Limitation*
-- tags_filter is expected to contain all the tags of the schema except the last
-- one. For such tag, a list of available values will be returned.
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, not_print_error)
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time,
not_print_error)
if #wildcard_tags > 1 then
if not not_print_error then
tprint(debug.traceback())
tprint({
schema_name = schema.name,
wildcards = wildcard_tags
})
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support listSeries on multiple tags")
tprint({schema_name = schema.name, wildcards = wildcard_tags})
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver does not support listSeries on multiple tags")
end
return nil
@ -812,25 +793,25 @@ function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, not_p
if wildcard_tag ~= schema._tags[#schema._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver only support listSeries with wildcard in the last tag, got wildcard on '" .. wildcard_tag .. "'")
"RRD driver only support listSeries with wildcard in the last tag, got wildcard on '" ..
wildcard_tag .. "'")
return nil
end
local base, rrd = schema_get_path(schema, table.merge(tags_filter, {
[wildcard_tag] = ""
}))
local base, rrd = schema_get_path(schema, table.merge(tags_filter,
{[wildcard_tag] = ""}))
local files = ntop.readdir(base)
local res = {}
-- tprint(base)
-- tprint(files)
for f in pairs(files or {}) do
local v = string.split(f, "%.rrd")
local fpath = base .. "/" .. f
if ((v ~= nil) and (#v == 1)) then
local last_update = ntop.rrd_lastupdate(fpath)
if last_update ~= nil and start_time ~= nil and last_update >= start_time then
if last_update ~= nil and start_time ~= nil and last_update >=
start_time then
local value = v[1]
local toadd = false
@ -853,9 +834,7 @@ function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, not_p
-- 3.if_index string cpu_states <<<<<<<<<<<<<<<<< don't know why this happens to be here
-- 4 table
--
if tonumber(value) then
toadd = true
end
if tonumber(value) then toadd = true end
elseif wildcard_tag == "dscp_class" then
toadd = true
elseif wildcard_tag == "l4proto" then
@ -863,26 +842,31 @@ function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, not_p
toadd = true
end
elseif ((wildcard_tag ~= "protocol") or
((L4_PROTO_KEYS[value] == nil) and interface.getnDPIProtoId(value) ~= 0)) and
((wildcard_tag ~= "category") or (interface.getnDPICategoryId(value) ~= -1)) then
((L4_PROTO_KEYS[value] == nil) and
interface.getnDPIProtoId(value) ~= 0)) and
((wildcard_tag ~= "category") or
(interface.getnDPICategoryId(value) ~= -1)) then
toadd = true
end
if toadd then
res[#res + 1] = table.merge(tags_filter, {
[wildcard_tag] = value
})
res[#res + 1] = table.merge(tags_filter,
{[wildcard_tag] = value})
end
end
elseif ntop.isdir(fpath) then
fpath = fpath .. "/" .. rrd .. ".rrd"
files = ntop.getAllPaths(fpath, rrd .. ".rrd")
for f in pairs(files or {}) do
local last_update = ntop.rrd_lastupdate(f)
local last_update = ntop.rrd_lastupdate(fpath)
if last_update ~= nil and last_update >= start_time then
res[#res + 1] = table.merge(tags_filter, {
[wildcard_tag] = f
})
if last_update ~= nil and last_update >= start_time then
-- remove the path
local val = string.gsub(f, base, "")
val = string.gsub(val, rrd .. ".rrd", "")
val = string.gsub(val, "/", "")
res[#res + 1] = table.merge(tags_filter,
{[wildcard_tag] = val})
end
end
end
end
@ -894,7 +878,8 @@ end
function driver:timeseries_top(options, top_tags)
if #top_tags > 1 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver does not support topk on multiple tags")
return nil
end
@ -902,14 +887,14 @@ function driver:timeseries_top(options, top_tags)
if top_tag ~= options.schema_info._tags[#options.schema_info._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
"RRD driver only support topk with topk tag in the last tag, got topk on '" ..
(top_tag or "") .. "'")
return nil
end
local series = driver:listSeries(options.schema_info, options.tags, top_tags, options.epoch_begin)
if not series then
return nil
end
local series = driver:listSeries(options.schema_info, options.tags,
top_tags, options.epoch_begin)
if not series then return nil end
local available_items = {}
local available_tags = {}
@ -921,7 +906,8 @@ function driver:timeseries_top(options, top_tags)
-- Quering all the different schema and unify all the data
for _, serie_tags in pairs(series) do
local rrdfile = driver.schema_get_full_path(options.schema_info, serie_tags)
local rrdfile = driver.schema_get_full_path(options.schema_info,
serie_tags)
options.rrdfile = rrdfile
local options_merged = options
options_merged.tags = serie_tags
@ -990,23 +976,34 @@ function driver:timeseries_top(options, top_tags)
for top_item, value in pairsByValues(available_items, rev) do
if value > 0 then
package.path = dirs.installdir .. "/pro/scripts/lua/modules/?.lua;" .. package.path
package.path =
dirs.installdir .. "/pro/scripts/lua/modules/?.lua;" ..
package.path
local snmp_utils = require "snmp_utils"
local snmp_cached_dev = require "snmp_cached_dev"
local cached_device = snmp_cached_dev:create(options.tags.device)
local ifindex = available_tags[top_item][1].if_index or available_tags[top_item][1].port
local ifindex = available_tags[top_item][1].if_index or
available_tags[top_item][1].port
local ext_label = nil
if cached_device then
ext_label = snmp_utils.get_snmp_interface_label(cached_device["interfaces"][ifindex])
end
if isEmptyString(ext_label) then
ext_label = ifindex
ext_label = snmp_utils.get_snmp_interface_label(
cached_device["interfaces"][ifindex])
end
if isEmptyString(ext_label) then ext_label = ifindex end
-- Special case, top protocol timeseries, here the ext_label needs to be the protocol
if available_tags[top_item][1].protocol then
ext_label = top_item
end
if available_tags[top_item][1].asn then
local info = interface.getASInfo(tonumber(top_item))
if info and not isEmptyString(info.asname) then
ext_label = info.asname
else
ext_label = top_item
end
end
count = table.len(available_series[top_item].data)
top_series[#top_series + 1] = {
data = available_series[top_item].data,
@ -1018,9 +1015,7 @@ function driver:timeseries_top(options, top_tags)
}
end
if #top_series >= options.top then
break
end
if #top_series >= options.top then break end
end
return {
@ -1040,7 +1035,8 @@ end
function driver:topk(schema, tags, tstart, tend, options, top_tags)
if #top_tags > 1 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver does not support topk on multiple tags")
return nil
end
@ -1048,14 +1044,13 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
if top_tag ~= schema._tags[#schema._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
"RRD driver only support topk with topk tag in the last tag, got topk on '" ..
(top_tag or "") .. "'")
return nil
end
local series = driver:listSeries(schema, tags, top_tags, tstart, tend)
if not series then
return nil
end
if not series then return nil end
local items = {}
local tag_2_series = {}
@ -1065,27 +1060,27 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
local query_start = tstart
local cf = getConsolidationFunction(schema)
if options.initial_point then
query_start = tstart - schema.options.step
end
if options.initial_point then query_start = tstart - schema.options.step end
for _, serie_tags in pairs(series) do
local rrdfile = driver.schema_get_full_path(schema, serie_tags)
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("RRD_FETCH[topk] schema=%s %s[%s] -> (%s): last_update=%u", schema.name,
table.tconcat(tags, "=", ","), table.concat(top_tags, ","), rrdfile, ntop.rrd_lastupdate(rrdfile)))
string.format(
"RRD_FETCH[topk] schema=%s %s[%s] -> (%s): last_update=%u",
schema.name, table.tconcat(tags, "=", ","),
table.concat(top_tags, ","), rrdfile,
ntop.rrd_lastupdate(rrdfile)))
end
touchRRD(rrdfile)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, cf, query_start, tend)
local fstart, fstep, fdata, fend, fcount, names =
ntop.rrd_fetch_columns(rrdfile, cf, query_start, tend)
local sum = 0
if fdata == nil then
goto continue
end
if fdata == nil then goto continue end
step = fstep
@ -1145,9 +1140,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
}
end
if #topk >= options.top then
break
end
if #topk >= options.top then break end
end
local stats = nil
@ -1160,12 +1153,13 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
table.remove(total_series, 1)
end
local fstep, count = sampleSeries(schema, #augumented_total, step, options.max_num_points, {{
data = augumented_total
}})
local fstep, count = sampleSeries(schema, #augumented_total, step,
options.max_num_points,
{{data = augumented_total}})
if options.calculate_stats then
stats = ts_common.calculateStatistics(total_series, step, tend - tstart, schema.options.metrics_type)
stats = ts_common.calculateStatistics(total_series, step, tend - tstart,
schema.options.metrics_type)
stats = table.merge(stats, ts_common.calculateMinMax(augumented_total))
end
@ -1176,9 +1170,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
return {
topk = topk,
additional_series = {
total = augumented_total
},
additional_series = {total = augumented_total},
statistics = stats
}
@ -1188,14 +1180,13 @@ end
function driver:queryTotal(schema, tstart, tend, tags, options)
local rrdfile = driver.schema_get_full_path(schema, tags)
if not rrdfile or not ntop.notEmptyFile(rrdfile) then
return nil
end
if not rrdfile or not ntop.notEmptyFile(rrdfile) then return nil end
touchRRD(rrdfile)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart, tend)
local fstart, fstep, fdata, fend, fcount, names =
ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
tstart, tend)
local totals = {}
local serie_idx = 0
@ -1215,9 +1206,7 @@ function driver:queryTotal(schema, tstart, tend, tags, options)
local v = ts_common.normalizeVal(v, max_val, options)
-- v is not null
if v == v then
sum = sum + v * fstep
end
if v == v then sum = sum + v * fstep end
end
totals[name] = sum
@ -1229,15 +1218,16 @@ end
-- ##############################################
function driver:queryLastValues(options)
local rrdfile = driver.schema_get_full_path(options.schema_info, options.tags)
if not rrdfile or not ntop.notEmptyFile(rrdfile) then
return nil
end
local rrdfile = driver.schema_get_full_path(options.schema_info,
options.tags)
if not rrdfile or not ntop.notEmptyFile(rrdfile) then return nil end
touchRRD(rrdfile)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(
options.schema_info), options.epoch_begin, options.epoch_end)
local fstart, fstep, fdata, fend, fcount, names =
ntop.rrd_fetch_columns(rrdfile,
getConsolidationFunction(options.schema_info),
options.epoch_begin, options.epoch_end)
local last_values = {}
local serie_idx = 0
@ -1247,7 +1237,8 @@ function driver:queryLastValues(options)
local fdata_name = names[serie_idx]
local serie = fdata[fdata_name]
local max_val = ts_common.getMaxPointValue(options.schema_info, name, options.tags)
local max_val = ts_common.getMaxPointValue(options.schema_info, name,
options.tags)
-- Remove the last value: RRD seems to give an additional point
serie[#serie] = nil
@ -1255,12 +1246,11 @@ function driver:queryLastValues(options)
local values = {}
-- Start from the last series item - 1 because RRD seems to give an additional point
for i = (#serie - 1), 1, -1 do
if (#values == options.num_points) then
break
end
if (#values == options.num_points) then break end
-- nan check
if serie[i] == serie[i] then
values[#values + 1] = ts_common.normalizeVal(serie[i], max_val, options)
values[#values + 1] = ts_common.normalizeVal(serie[i], max_val,
options)
end
end
last_values[name] = values
@ -1274,7 +1264,8 @@ local function deleteAllData(ifid)
local paths = getRRDPaths()
for _, path in pairs(paths) do
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" .. path .. "/")
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" ..
path .. "/")
local path_to_del = os_utils.fixPath(ifpath)
if ntop.exists(path_to_del) and not ntop.rmdir(path_to_del) then
@ -1314,7 +1305,8 @@ function driver:delete(schema_prefix, tags)
local s_prefix = schema_prefix .. ""
if (num_valorized_tags < 1) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "At least 1 tags must be specified for the delete operation")
traceError(TRACE_ERROR, TRACE_CONSOLE,
"At least 1 tags must be specified for the delete operation")
return false
end
@ -1347,16 +1339,15 @@ function driver:delete(schema_prefix, tags)
local path = schema_get_path(s, check_tags)
-- Choose the shortest string to pick the path that includes the others
if path and ((found == nil) or (string.len(path) < string.len(found))) then
if path and
((found == nil) or (string.len(path) < string.len(found))) then
found = path
end
end
end
end
if (not found) then
return false
end
if (not found) then return false end
local path_to_del = os_utils.fixPath(found)
@ -1365,8 +1356,9 @@ function driver:delete(schema_prefix, tags)
end
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("DELETE schema=%s, %s => %s", schema_prefix,
table.tconcat(tags, "=", ","), path_to_del))
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("DELETE schema=%s, %s => %s", schema_prefix,
table.tconcat(tags, "=", ","), path_to_del))
end
return true
@ -1381,11 +1373,13 @@ function driver:deleteOldData(ifid)
local retention_days = data_retention_utils.getTSAndStatsDataRetentionDays()
for _, path in pairs(paths) do
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" .. path .. "/")
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" ..
path .. "/")
local deadline = retention_days * 86400
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("ntop.deleteOldRRDs(%s, %u)", ifpath, deadline))
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format(
"ntop.deleteOldRRDs(%s, %u)", ifpath, deadline))
end
ntop.deleteOldRRDs(ifpath, deadline)
@ -1396,9 +1390,7 @@ end
-- ##############################################
function driver:setup(ts_utils)
return true
end
-- One-time initialization hook invoked by the timeseries framework.
-- The RRD driver needs no setup, so this always succeeds.
-- @param ts_utils timeseries utilities module (unused here)
-- @return true to signal the driver is ready
function driver:setup(ts_utils)
    return true
end
-- ##############################################
@ -1421,7 +1413,8 @@ local function line_protocol_to_tags_and_metrics(protocol_line)
-- local test_line2 = "iface:traffic_rxtx,ifid=0 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
-- local test_line = "iface:traffic_rxtx,ifid=0,ndpi_category=My Category,ndpi_proto=Apple iTunes,host=1.2.3.4 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
--
local measurement_and_tag_set, field_set, timestamp = protocol_line:match("(.+)%s(.+)%s(.+)\n")
local measurement_and_tag_set, field_set, timestamp =
protocol_line:match("(.+)%s(.+)%s(.+)\n")
local measurement
local tags = {}
@ -1486,9 +1479,7 @@ function driver:export()
for cur_dequeue = 1, rrd_queue_max_dequeues_per_interface do
local ts_point = interface.rrd_dequeue(tonumber(cur_ifid))
if not ts_point then
break
end
if not ts_point then break end
local parsed_ts_point = line_protocol_to_tags_and_metrics(ts_point)