mirror of
https://github.com/ntop/ntopng.git
synced 2026-04-30 07:59:35 +00:00
1511 lines
48 KiB
Lua
1511 lines
48 KiB
Lua
--
|
|
-- (C) 2020-24 - ntop.org
|
|
--
|
|
local driver = {}
|
|
|
|
local os_utils = require("os_utils")
|
|
local ts_common = require("ts_common")
|
|
|
|
require("rrd_paths")
|
|
|
|
local use_hwpredict = false
|
|
local use_rrd_queue = true
|
|
|
|
local type_to_rrdtype = {
|
|
[ts_common.metrics.counter] = "DERIVE",
|
|
[ts_common.metrics.gauge] = "GAUGE"
|
|
}
|
|
|
|
local aggregation_to_consolidation = {
|
|
[ts_common.aggregation.mean] = "AVERAGE",
|
|
[ts_common.aggregation.max] = "MAX",
|
|
[ts_common.aggregation.min] = "MIN",
|
|
[ts_common.aggregation.last] = "LAST"
|
|
}
|
|
|
|
local L4_PROTO_KEYS = {
|
|
tcp = 6,
|
|
udp = 17,
|
|
icmp = 1,
|
|
eigrp = 88,
|
|
other_ip = -1
|
|
}
|
|
|
|
-- ##############################################
|
|
|
|
local debug_enabled = nil
|
|
local function isDebugEnabled()
|
|
if debug_enabled == nil then
|
|
-- cache it
|
|
debug_enabled = (ntop.getPref("ntopng.prefs.rrd_debug_enabled") == "1")
|
|
end
|
|
|
|
return (debug_enabled)
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:new(options)
|
|
local obj = {
|
|
base_path = options.base_path
|
|
}
|
|
|
|
setmetatable(obj, self)
|
|
self.__index = self
|
|
|
|
return obj
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:getLatestTimestamp(ifid)
|
|
return os.time()
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
-- Maps second tag name to getRRDName
|
|
local HOST_PREFIX_MAP = {
|
|
host = "",
|
|
mac = "",
|
|
subnet = "net:",
|
|
flowdev_port = "flow_device:",
|
|
snmp_if = "snmp:",
|
|
host_pool = "pool:"
|
|
}
|
|
local WILDCARD_TAGS = {
|
|
protocol = 1,
|
|
category = 1,
|
|
l4proto = 1,
|
|
dscp_class = 1
|
|
}
|
|
|
|
local function get_fname_for_schema(schema, tags)
|
|
if schema.options.rrd_fname ~= nil then
|
|
return schema.options.rrd_fname
|
|
end
|
|
|
|
local last_tag = schema._tags[#schema._tags]
|
|
|
|
if WILDCARD_TAGS[last_tag] then
|
|
-- return the last defined tag
|
|
return tags[last_tag]
|
|
end
|
|
|
|
-- e.g. host:contacts -> contacts
|
|
local suffix = string.split(schema.name, ":")[2]
|
|
return suffix
|
|
end
|
|
|
|
local function schema_get_path(schema, tags)
|
|
local parts = {schema.name}
|
|
local suffix = ""
|
|
local rrd
|
|
|
|
if isEmptyString(schema.name) then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "Empty schema requested.")
|
|
tprint(debug.traceback())
|
|
return
|
|
end
|
|
|
|
-- ifid is mandatory here
|
|
local ifid = tags.ifid or -1
|
|
local host_or_network = nil
|
|
local parts = string.split(schema.name, ":")
|
|
|
|
if ((string.find(schema.name, "iface:") ~= 1) and -- interfaces are only identified by the first tag
|
|
(#schema._tags >= 1)) then -- some schema do not have any tag, e.g. "process:*" schemas
|
|
local prefix = HOST_PREFIX_MAP[parts[1]] or (parts[1] .. ":")
|
|
local suffix = tags[schema._tags[2] or schema._tags[1]] or tags[schema._tags[1]]
|
|
|
|
if (suffix ~= ifid) then
|
|
host_or_network = prefix .. suffix
|
|
else
|
|
-- Avoid repeating the ifid suffix in the path
|
|
host_or_network = prefix .. ""
|
|
end
|
|
end
|
|
|
|
-- Some exceptions to avoid conflicts / keep compatibility
|
|
if parts[1] == "snmp_if" then
|
|
if tags.qos_class_id then
|
|
suffix = tags.if_index .. "/" .. tags.qos_class_id .. "/"
|
|
else
|
|
suffix = tags.if_index .. "/"
|
|
end
|
|
elseif (parts[1] == "flowdev_port") then
|
|
if (tags.port) then
|
|
if (type(tags.port) == "table") then
|
|
suffix = tags.port.ifindex .. "/"
|
|
else
|
|
suffix = tags.port .. "/"
|
|
end
|
|
elseif (tags.ifid) then
|
|
suffix = tags.ifid .. "/"
|
|
else
|
|
suffix = tags.port.ifindex .. "/"
|
|
end
|
|
elseif #schema._tags >= 3 then
|
|
local intermediate_tags = {}
|
|
|
|
-- tag1:ifid
|
|
-- tag2:already handled as host_or_network
|
|
-- last tag must be handled separately
|
|
for i = 3, #schema._tags - 1 do
|
|
intermediate_tags[#intermediate_tags + 1] = tags[schema._tags[i]]
|
|
end
|
|
|
|
local last_tag = schema._tags[#schema._tags]
|
|
|
|
if (not WILDCARD_TAGS[last_tag]) then
|
|
intermediate_tags[#intermediate_tags + 1] = tags[last_tag]
|
|
end
|
|
|
|
if intermediate_tags[1] ~= nil then
|
|
-- All the intermediate tags should be mapped in the path
|
|
suffix = table.concat(intermediate_tags, "/") .. "/"
|
|
end
|
|
|
|
if parts[2] == "ndpi_categories" then
|
|
suffix = suffix .. "ndpi_categories/"
|
|
elseif parts[2] == "ndpi_flows" then
|
|
suffix = suffix .. "ndpi_flows/"
|
|
elseif parts[2] == "l4protos" then
|
|
suffix = suffix .. "l4protos/"
|
|
elseif parts[2] == "dscp" then
|
|
suffix = suffix .. "dscp/"
|
|
end
|
|
|
|
elseif parts[2] == "ndpi_categories" then
|
|
suffix = "ndpi_categories/"
|
|
elseif parts[2] == "ndpi_flows" then
|
|
suffix = "ndpi_flows/"
|
|
elseif parts[2] == "l4protos" then
|
|
suffix = "l4protos/"
|
|
elseif parts[2] == "dscp" then
|
|
suffix = "dscp/"
|
|
end
|
|
|
|
local path = getRRDName(ifid, host_or_network) .. suffix
|
|
local rrd = get_fname_for_schema(schema, tags)
|
|
|
|
return path, rrd
|
|
end
|
|
|
|
function driver.schema_get_full_path(schema, tags)
|
|
local base, rrd = schema_get_path(schema, tags)
|
|
|
|
if ((not base) or (not rrd)) then
|
|
return nil
|
|
end
|
|
|
|
local full_path = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
|
|
|
|
return full_path
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function getRRAParameters(step, resolution, retention_time)
|
|
local aggregation_dp = math.ceil(resolution / step)
|
|
local retention_dp = math.ceil(retention_time / resolution)
|
|
return aggregation_dp, retention_dp
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function getConsolidationFunction(schema)
|
|
local fn = schema:getAggregationFunction()
|
|
|
|
if (aggregation_to_consolidation[fn] ~= nil) then
|
|
return (aggregation_to_consolidation[fn])
|
|
end
|
|
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "unknown aggregation function: %s", fn)
|
|
|
|
return ("AVERAGE")
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function create_rrd(schema, path, timestamp)
|
|
local heartbeat = schema.options.rrd_heartbeat or (schema.options.insertion_step * 2)
|
|
local rrd_type = type_to_rrdtype[schema.options.metrics_type]
|
|
local params = {path, schema.options.insertion_step}
|
|
local cf = getConsolidationFunction(schema)
|
|
|
|
if (timestamp ~= nil) then
|
|
-- RRD start time (--start/-b)
|
|
-- It must be tuned so that the first point of the chart in the subsequent
|
|
-- rrd_update will not be discarded
|
|
params[#params + 1] = timestamp - schema.options.insertion_step
|
|
end
|
|
|
|
for idx, metric in ipairs(schema._metrics) do
|
|
params[#params + 1] = "DS:" .. metric .. ":" .. rrd_type .. ':' .. heartbeat .. ':U:U'
|
|
end
|
|
|
|
for _, rra in ipairs(schema.retention) do
|
|
params[#params + 1] = "RRA:" .. cf .. ":0.5:" .. rra.aggregation_dp .. ":" .. rra.retention_dp
|
|
end
|
|
|
|
if use_hwpredict and schema.hwpredict then
|
|
-- NOTE: at most one RRA, otherwise rrd_update crashes.
|
|
local hwpredict = schema.hwpredict
|
|
params[#params + 1] = "RRA:HWPREDICT:" .. hwpredict.row_count .. ":0.1:0.0035:" .. hwpredict.period
|
|
end
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE,
|
|
string.format("ntop.rrd_create(%s) schema=%s", table.concat(params, ", "), schema.name))
|
|
end
|
|
|
|
local err = ntop.rrd_create(table.unpack(params))
|
|
if (err ~= nil) then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
|
|
return false
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- ###############################################
|
|
|
|
-- Converts a number (either decimal or integer) to a string
|
|
-- in a format which is friendly for the subsequent call to rrd_update
|
|
local function number_to_rrd_string(what, schema)
|
|
local schema_name = schema and schema.name or 'unknown'
|
|
local err_msg = ''
|
|
what = tonumber(what)
|
|
|
|
if (what == nil) then
|
|
err_msg = "Attempting at converting a nil to number"
|
|
elseif (type(what) ~= "number") then
|
|
err_msg = "number_to_rrd_string got a non-number argument: " .. type(what)
|
|
elseif (what ~= what) then
|
|
err_msg = "Trying to convert NaN to integer"
|
|
elseif (what == math.huge) then
|
|
err_msg = "Trying to convert inf to integer"
|
|
elseif (math.maxinteger and ((what >= math.maxinteger) or (what <= math.mininteger))) then
|
|
err_msg = "Number out of integers range: " .. what
|
|
elseif what == math.floor(what) then
|
|
-- If the number has no decimal place, print it as a digit
|
|
return (string.format("%d", what))
|
|
else
|
|
-- If the number has decimal places, print it as a float
|
|
-- (don't touch the precision, let's the rrd do this job if necessary)
|
|
return (string.format("%f", what))
|
|
end
|
|
|
|
-- Log the error with the schema name
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, string.format("%s [%s]", err_msg, schema_name))
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, debug.traceback())
|
|
tprint(what)
|
|
tprint(schema)
|
|
|
|
return ("0")
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function update_rrd(schema, rrdfile, timestamp, data)
|
|
local params = {number_to_rrd_string(timestamp, schema)}
|
|
|
|
-- io.write("update_rrd(".. rrdfile ..")\n")
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("Going to update %s [%s]", schema.name, rrdfile))
|
|
end
|
|
|
|
-- Verify last update time
|
|
local last_update = ntop.rrd_lastupdate(rrdfile)
|
|
|
|
if ((last_update ~= nil) and (timestamp <= last_update)) then
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format(
|
|
"Skip RRD update in the past: timestamp=%u but last_update=%u", timestamp, last_update))
|
|
end
|
|
|
|
return false
|
|
end
|
|
|
|
for _, metric in ipairs(schema._metrics) do
|
|
params[#params + 1] = number_to_rrd_string(data[metric], schema)
|
|
end
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE,
|
|
string.format("ntop.rrd_update(%s, %s) schema=%s", rrdfile, table.concat(params, ", "), schema.name))
|
|
end
|
|
|
|
local err = ntop.rrd_update(rrdfile, table.unpack(params))
|
|
|
|
if (err ~= nil) then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
|
|
return false
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:append(schema, timestamp, tags, metrics)
|
|
local base, rrd = schema_get_path(schema, tags)
|
|
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
|
|
|
|
if use_rrd_queue then
|
|
if not schema.options.is_critical_ts then
|
|
local res = interface.rrd_enqueue(schema.name, timestamp, tags, metrics)
|
|
|
|
if not res then
|
|
ntop.rrd_inc_num_drops()
|
|
end
|
|
|
|
return res
|
|
end
|
|
end
|
|
|
|
if not ntop.notEmptyFile(rrdfile) then
|
|
ntop.mkdir(base)
|
|
if not create_rrd(schema, rrdfile, timestamp) then
|
|
return false
|
|
end
|
|
end
|
|
|
|
return update_rrd(schema, rrdfile, timestamp, metrics)
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function makeTotalSerie(series, count)
|
|
local total = {}
|
|
|
|
for _, serie in pairs(series) do
|
|
for i, val in pairs(serie.data) do
|
|
local val_is_nan = (val ~= val)
|
|
|
|
if (total[i] == nil) then
|
|
total[i] = 0
|
|
end
|
|
|
|
if (not val_is_nan) then
|
|
total[i] = total[i] + val
|
|
end
|
|
end
|
|
end
|
|
|
|
return total
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function sampleSeries(schema, cur_points, step, max_points, series, consolidation)
|
|
local sampled_dp = math.floor(cur_points / max_points + 0.5)
|
|
local count = nil
|
|
local nan = 0 / 0
|
|
|
|
for _, data_serie in pairs(series) do
|
|
local serie = data_serie.data
|
|
local num = 0
|
|
local sum = 0
|
|
local all_nan = true
|
|
local end_idx = 1
|
|
local max_val = -1
|
|
|
|
for idx, dp in ipairs(serie) do
|
|
if (dp ~= dp) then
|
|
-- Convert NaN to 0 to calculate the sums
|
|
dp = 0
|
|
else
|
|
all_nan = false
|
|
end
|
|
|
|
sum = sum + dp
|
|
num = num + 1
|
|
|
|
if (dp > max_val) then
|
|
max_val = dp
|
|
end
|
|
|
|
if num == sampled_dp then
|
|
-- A data group is ready
|
|
if all_nan then
|
|
-- If all the points into the datagroup are NaN, calculate them
|
|
-- as NaN
|
|
sum = nan
|
|
end
|
|
|
|
if (consolidation == "MAX") then
|
|
serie[end_idx] = max_val
|
|
else
|
|
serie[end_idx] = sum / num
|
|
end
|
|
|
|
end_idx = end_idx + 1
|
|
|
|
num = 0
|
|
sum = 0
|
|
all_nan = true
|
|
max_val = 0
|
|
end
|
|
end
|
|
|
|
-- Last group
|
|
if num > 0 then
|
|
if all_nan then
|
|
sum = nan
|
|
end
|
|
|
|
serie[end_idx] = sum / num
|
|
end_idx = end_idx + 1
|
|
end
|
|
|
|
count = end_idx - 1
|
|
|
|
-- remove the exceeding points
|
|
for i = end_idx, #serie do
|
|
serie[i] = nil
|
|
end
|
|
|
|
data_serie.data = serie
|
|
end
|
|
|
|
-- new step, new count, new data
|
|
return step * sampled_dp, count, series
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
-- Make sure we do not fetch data from RRDs that have been update too much long ago
|
|
-- as this creates issues with the consolidation functions when we want to compare
|
|
-- results coming from different RRDs.
|
|
-- This is also needed to make sure that multiple data series on graphs have the
|
|
-- same number of points, otherwise d3js will generate errors.
|
|
local function touchRRD(rrdname)
|
|
local now = os.time()
|
|
local last, ds_count = ntop.rrd_lastupdate(rrdname)
|
|
|
|
if ((last ~= nil) and ((now - last) > 3600)) then
|
|
local tdiff = now - 1800 -- This avoids to set the update continuously
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE,
|
|
string.format("touchRRD(%s, %u), last_update was %u", rrdname, tdiff, last))
|
|
tprint(debug.traceback())
|
|
end
|
|
|
|
if (ds_count == 1) then
|
|
ntop.rrd_update(rrdname, tdiff .. "", "0")
|
|
elseif (ds_count == 2) then
|
|
ntop.rrd_update(rrdname, tdiff .. "", "0", "0")
|
|
elseif (ds_count == 3) then
|
|
ntop.rrd_update(rrdname, tdiff .. "", "0", "0", "0")
|
|
elseif (ds_count == 4) then
|
|
ntop.rrd_update(rrdname, tdiff .. "", "0", "0", "0", "0")
|
|
end
|
|
end
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:timeseries_query(options)
|
|
local base, rrd = schema_get_path(options.schema_info, options.tags)
|
|
local rrdfile = options.rrdfile or os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
|
|
local res = nil
|
|
|
|
if not ntop.notEmptyFile(rrdfile) then
|
|
return res
|
|
end
|
|
|
|
touchRRD(rrdfile)
|
|
|
|
-- Get last update
|
|
local last_update = ntop.rrd_lastupdate(rrdfile)
|
|
|
|
-- Avoid reporting the last point when the timeseries write has not completed
|
|
-- yet. Use 2*step as a bound.
|
|
if ((options.epoch_end > last_update) and
|
|
((options.epoch_end - last_update) <= 2 * options.schema_info.options.step)) then
|
|
options.epoch_end = last_update
|
|
end
|
|
|
|
local epoch_end = options.epoch_end - options.schema_info.options.step
|
|
local epoch_begin = options.epoch_begin - options.schema_info.options.step
|
|
-- Query rrd to get the data
|
|
-- tprint("rrdtool fetch ".. rrdfile.. " " .. getConsolidationFunction(schema) .. " -s ".. tstart .. " -e " .. tend)
|
|
local consolidation = getConsolidationFunction(options.schema_info)
|
|
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, consolidation, epoch_begin,
|
|
epoch_end)
|
|
|
|
if fdata == nil then
|
|
return res
|
|
end
|
|
|
|
local count = 0
|
|
local series = {}
|
|
local sampled_fstep = fstep
|
|
local serie_idx = 0
|
|
|
|
for name, _ in pairs(fdata) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = options.schema_info._metrics[serie_idx]
|
|
|
|
-- Process the series requested, adding statistics if requested (min, max, tot, ...)
|
|
-- and normalize the data if needed
|
|
local max_val = ts_common.getMaxPointValue(options.schema_info, name, options.tags)
|
|
|
|
local fdata_name = names[serie_idx]
|
|
local serie = fdata[fdata_name]
|
|
local modified_serie = {}
|
|
local num_not_nan_pts = 0
|
|
count = 0
|
|
|
|
for i, v in pairs(serie) do
|
|
-- Keep track of NaN points
|
|
if v == v then
|
|
num_not_nan_pts = num_not_nan_pts + 1
|
|
end
|
|
end
|
|
|
|
-- Normalize the value
|
|
for i, v in pairs(serie) do
|
|
-- Not enough point to represent the data, empty the serie
|
|
if num_not_nan_pts >= options.min_num_points then
|
|
modified_serie[i] = ts_common.normalizeVal(v, max_val, options)
|
|
else
|
|
modified_serie[i] = options.fill_value
|
|
end
|
|
count = count + 1
|
|
end
|
|
|
|
series[#series + 1] = {
|
|
data = modified_serie,
|
|
id = name
|
|
}
|
|
end
|
|
|
|
if count > options.max_num_points then
|
|
sampled_fstep, count, series = sampleSeries(options.schema_info, count, fstep, options.max_num_points, series,
|
|
consolidation)
|
|
end
|
|
|
|
-- Need to re-iterate all the series, otherwise the stats cannot be
|
|
-- calculated on the sampled serie
|
|
for i, data_serie in pairs(series) do
|
|
-- Add statistics if requested, by default yes
|
|
if options.calculate_stats then
|
|
-- tprint("Schema :" .. series[i].id)
|
|
local s = ts_common.calculateStatistics(series[i].data, sampled_fstep,
|
|
options.schema_info.options.keep_total, options.schema_info.options.metrics_type)
|
|
local min_max = ts_common.calculateMinMax(data_serie.data)
|
|
local statistics = table.merge(s, min_max)
|
|
series[i].statistics = statistics
|
|
end
|
|
end
|
|
|
|
res = {
|
|
metadata = {
|
|
epoch_begin = fstart,
|
|
epoch_end = fend,
|
|
epoch_step = sampled_fstep,
|
|
num_point = count or 0,
|
|
schema = options.schema,
|
|
query = options.tags
|
|
},
|
|
series = series
|
|
}
|
|
|
|
return res
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:query(schema, tstart, tend, tags, options)
|
|
local base, rrd = schema_get_path(schema, tags)
|
|
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
|
|
|
|
if not ntop.notEmptyFile(rrdfile) then
|
|
return nil
|
|
end
|
|
|
|
touchRRD(rrdfile)
|
|
|
|
local last_update = ntop.rrd_lastupdate(rrdfile)
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE,
|
|
string.format("RRD_FETCH schema=%s %s -> (%s): last_update=%u", schema.name, table.tconcat(tags, "=", ","),
|
|
rrdfile, last_update))
|
|
end
|
|
|
|
-- Avoid reporting the last point when the timeseries write has not completed
|
|
-- yet. Use 2*step as a bound.
|
|
if ((tend > last_update) and ((tend - last_update) <= 2 * schema.options.step)) then
|
|
tend = last_update
|
|
end
|
|
|
|
-- tprint("rrdtool fetch ".. rrdfile.. " " .. getConsolidationFunction(schema) .. " -s ".. tstart .. " -e " .. tend)
|
|
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
|
|
tstart, tend)
|
|
|
|
if fdata == nil then
|
|
return nil
|
|
end
|
|
|
|
local count = 0
|
|
local series = {}
|
|
|
|
local serie_idx = 0
|
|
|
|
for name, _ in pairs(fdata) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = schema._metrics[serie_idx]
|
|
local fdata_name = names[serie_idx]
|
|
serie = fdata[fdata_name]
|
|
|
|
local max_val = ts_common.getMaxPointValue(schema, name, tags)
|
|
count = 0
|
|
|
|
-- unify the format
|
|
for i, v in pairs(serie) do
|
|
local value = ts_common.normalizeVal(v, max_val, options)
|
|
local val_is_nan = (v ~= v)
|
|
|
|
if options.keep_nan == true and val_is_nan then
|
|
value = v
|
|
end
|
|
|
|
serie[i] = value
|
|
count = count + 1
|
|
end
|
|
|
|
-- Remove the last value: RRD seems to give an additional point
|
|
serie[#serie] = nil
|
|
count = count - 1
|
|
|
|
series[serie_idx] = {
|
|
label = name,
|
|
data = serie
|
|
}
|
|
end
|
|
|
|
-- table.clone needed as series can be modified below (sampleSeries works on it in-place)
|
|
local unsampled_series = table.clone(series)
|
|
local unsampled_count = count
|
|
local unsampled_fstep = fstep
|
|
|
|
if count > options.max_num_points then
|
|
sampled_fstep, count = sampleSeries(schema, count, fstep, options.max_num_points, series)
|
|
else
|
|
sampled_fstep = fstep
|
|
end
|
|
|
|
-- local returned_tend = fstart + fstep * (count-1)
|
|
-- tprint(returned_tend .. " " .. fstart)
|
|
|
|
local total_series = nil
|
|
local stats = nil
|
|
|
|
-- tprint("Step: "..fstep.." / unsampled_fstep: "..unsampled_fstep)
|
|
-- tprint(unsampled_series)
|
|
|
|
if options.calculate_stats then
|
|
total_series = makeTotalSerie(series, count)
|
|
-- statistics used in report page
|
|
local total_series = makeTotalSerie(series, count)
|
|
stats = ts_common.calculateStatistics(total_series, sampled_fstep, tend - tstart, schema.options.metrics_type)
|
|
|
|
stats = stats or {}
|
|
stats.by_serie = {}
|
|
|
|
-- Also calculate per-series statistics
|
|
for k, v in pairs(series) do
|
|
local s = ts_common.calculateStatistics(v.data, sampled_fstep, tend - tstart, schema.options.metrics_type)
|
|
local min_max = ts_common.calculateMinMax(v.data)
|
|
|
|
-- Adding per timeseries min-max stats
|
|
stats.by_serie[k] = table.merge(s, min_max)
|
|
end
|
|
end
|
|
|
|
if options.initial_point then
|
|
local serie_idx = 1
|
|
local _, _, initial_pt, _, _, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
|
|
tstart - schema.options.step, tstart - schema.options.step)
|
|
initial_pt = initial_pt or {}
|
|
|
|
for name_key, values in pairs(initial_pt) do
|
|
local name = schema._metrics[serie_idx]
|
|
local max_val = ts_common.getMaxPointValue(schema, name, tags)
|
|
local ptval = ts_common.normalizeVal(values[1], max_val, options)
|
|
|
|
table.insert(series[serie_idx].data, 1, ptval)
|
|
serie_idx = serie_idx + 1
|
|
end
|
|
|
|
count = count + 1
|
|
|
|
if total_series then
|
|
-- recalculate with additional point
|
|
total_series = makeTotalSerie(series, count)
|
|
end
|
|
end
|
|
|
|
-- tprint(rrdfile)
|
|
-- tprint(schema)
|
|
-- tprint(schema.options.metrics_type)
|
|
|
|
-- tprint("fstep: "..sampled_fstep.." / ".."count: "..count .. " / tot: ".. (sampled_fstep*count))
|
|
|
|
if options.calculate_stats then
|
|
stats = table.merge(stats, ts_common.calculateMinMax(total_series))
|
|
end
|
|
|
|
return {
|
|
start = fstart,
|
|
step = sampled_fstep,
|
|
count = count,
|
|
series = series,
|
|
statistics = stats,
|
|
additional_series = {
|
|
total = total_series
|
|
}
|
|
}
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
-- *Limitation*
|
|
-- tags_filter is expected to contain all the tags of the schema except the last
|
|
-- one. For such tag, a list of available values will be returned.
|
|
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, not_print_error)
|
|
if #wildcard_tags > 1 then
|
|
if not not_print_error then
|
|
tprint(debug.traceback())
|
|
tprint({
|
|
schema_name = schema.name,
|
|
wildcards = wildcard_tags
|
|
})
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support listSeries on multiple tags")
|
|
end
|
|
|
|
return nil
|
|
end
|
|
|
|
local wildcard_tag = wildcard_tags[1]
|
|
|
|
if not wildcard_tag then
|
|
local full_path = driver.schema_get_full_path(schema, tags_filter)
|
|
local last_update = ntop.rrd_lastupdate(full_path)
|
|
|
|
if last_update ~= nil and last_update >= start_time then
|
|
return {tags_filter}
|
|
else
|
|
return nil
|
|
end
|
|
end
|
|
|
|
if wildcard_tag ~= schema._tags[#schema._tags] then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE,
|
|
"RRD driver only support listSeries with wildcard in the last tag, got wildcard on '" .. wildcard_tag .. "'")
|
|
return nil
|
|
end
|
|
|
|
local base, rrd = schema_get_path(schema, table.merge(tags_filter, {
|
|
[wildcard_tag] = ""
|
|
}))
|
|
local files = ntop.readdir(base)
|
|
local res = {}
|
|
|
|
-- tprint(base)
|
|
-- tprint(files)
|
|
|
|
for f in pairs(files or {}) do
|
|
local v = string.split(f, "%.rrd")
|
|
local fpath = base .. "/" .. f
|
|
if ((v ~= nil) and (#v == 1)) then
|
|
local last_update = ntop.rrd_lastupdate(fpath)
|
|
if last_update ~= nil and last_update >= start_time then
|
|
local value = v[1]
|
|
local toadd = false
|
|
|
|
if wildcard_tag == "if_index" then
|
|
-- NOTE: needed to add this crazy exception. Don't now what it is
|
|
-- but it's needed, otherwise this function is tricked into thinking
|
|
-- other timeseries (such as snmp_dev:cpu_states) are if_index
|
|
--
|
|
-- This is what we get:
|
|
-- 1.device string 192.168.2.1
|
|
-- 1.ifid string -1
|
|
-- 1.if_index string 5
|
|
-- 2 table
|
|
-- 2.device string 192.168.2.1
|
|
-- 2.ifid string -1
|
|
-- 2.if_index string 2
|
|
-- 3 table
|
|
-- 3.device string 192.168.2.1
|
|
-- 3.ifid string -1
|
|
-- 3.if_index string cpu_states <<<<<<<<<<<<<<<<< don't know why this happens to be here
|
|
-- 4 table
|
|
--
|
|
if tonumber(value) then
|
|
toadd = true
|
|
end
|
|
elseif wildcard_tag == "dscp_class" then
|
|
toadd = true
|
|
elseif wildcard_tag == "l4proto" then
|
|
if L4_PROTO_KEYS[value] ~= nil then
|
|
toadd = true
|
|
end
|
|
elseif ((wildcard_tag ~= "protocol") or
|
|
((L4_PROTO_KEYS[value] == nil) and interface.getnDPIProtoId(value) ~= 0)) and
|
|
((wildcard_tag ~= "category") or (interface.getnDPICategoryId(value) ~= -1)) then
|
|
toadd = true
|
|
end
|
|
|
|
if toadd then
|
|
res[#res + 1] = table.merge(tags_filter, {
|
|
[wildcard_tag] = value
|
|
})
|
|
end
|
|
end
|
|
elseif ntop.isdir(fpath) then
|
|
fpath = fpath .. "/" .. rrd .. ".rrd"
|
|
|
|
local last_update = ntop.rrd_lastupdate(fpath)
|
|
|
|
if last_update ~= nil and last_update >= start_time then
|
|
res[#res + 1] = table.merge(tags_filter, {
|
|
[wildcard_tag] = f
|
|
})
|
|
end
|
|
end
|
|
end
|
|
|
|
return res
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:timeseries_top(options, top_tags)
|
|
if #top_tags > 1 then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
|
|
return nil
|
|
end
|
|
|
|
local top_tag = top_tags[1]
|
|
|
|
if top_tag ~= options.schema_info._tags[#options.schema_info._tags] then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE,
|
|
"RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
|
|
return nil
|
|
end
|
|
|
|
local series = driver:listSeries(options.schema_info, options.tags, top_tags, options.epoch_begin)
|
|
if not series then
|
|
return nil
|
|
end
|
|
|
|
local available_items = {}
|
|
local available_tags = {}
|
|
local available_series = {}
|
|
local total_valid = true
|
|
local step = 0
|
|
local query_start = options.epoch_begin
|
|
local cf = getConsolidationFunction(options.schema_info)
|
|
|
|
-- Quering all the different schema and unify all the data
|
|
for _, serie_tags in pairs(series) do
|
|
local rrdfile = driver.schema_get_full_path(options.schema_info, serie_tags)
|
|
options.rrdfile = rrdfile
|
|
local options_merged = options
|
|
options_merged.tags = serie_tags
|
|
local stats = driver:timeseries_query(options_merged)
|
|
local partials = {}
|
|
local serie_idx = 0
|
|
|
|
if stats then
|
|
local sum = 0
|
|
step = stats.metadata.epoch_step
|
|
local aggregated_serie = {}
|
|
local statistics = {}
|
|
-- For each serie, get the sum and other stats
|
|
for _, serie in pairs(stats.series or {}) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = options.schema_info._metrics[serie_idx]
|
|
|
|
if table.len(statistics) == 0 then
|
|
statistics = serie.statistics
|
|
else
|
|
for stat_name, value in pairs(serie.statistics) do
|
|
statistics[stat_name] = statistics[stat_name] + value
|
|
end
|
|
end
|
|
partials[name] = 0
|
|
|
|
for i, serie_point in pairs(serie.data) do
|
|
local val_is_nan = (serie_point ~= serie_point)
|
|
|
|
if not val_is_nan then
|
|
sum = sum + tonumber(serie_point)
|
|
end
|
|
|
|
if not aggregated_serie[i] then
|
|
aggregated_serie[i] = 0
|
|
end
|
|
|
|
aggregated_serie[i] = aggregated_serie[i] + serie_point
|
|
partials[name] = partials[name] + serie_point * step
|
|
end
|
|
end
|
|
|
|
available_items[serie_tags[top_tag]] = sum * step
|
|
available_tags[serie_tags[top_tag]] = {serie_tags, partials}
|
|
available_series[serie_tags[top_tag]] = {
|
|
data = aggregated_serie,
|
|
statistics = statistics,
|
|
tags = options_merged.tags
|
|
}
|
|
end
|
|
end
|
|
|
|
local top_series = {}
|
|
local count = 0
|
|
local id = "bytes"
|
|
|
|
if ends(options.schema, "packets") then
|
|
id = "packets"
|
|
elseif ends(options.schema, "hits") then
|
|
id = "hits"
|
|
end
|
|
|
|
for top_item, value in pairsByValues(available_items, rev) do
|
|
if value > 0 then
|
|
package.path = dirs.installdir .. "/pro/scripts/lua/modules/?.lua;" .. package.path
|
|
local snmp_utils = require "snmp_utils"
|
|
local snmp_cached_dev = require "snmp_cached_dev"
|
|
local cached_device = snmp_cached_dev:create(options.tags.device)
|
|
local ifindex = available_tags[top_item][1].if_index or available_tags[top_item][1].port
|
|
local ext_label = nil
|
|
if cached_device then
|
|
ext_label = snmp_utils.get_snmp_interface_label(cached_device["interfaces"][ifindex])
|
|
end
|
|
if isEmptyString(ext_label) then
|
|
ext_label = ifindex
|
|
end
|
|
|
|
count = table.len(available_series[top_item].data)
|
|
top_series[#top_series + 1] = {
|
|
data = available_series[top_item].data,
|
|
id = id,
|
|
statistics = available_series[top_item].statistics,
|
|
tags = available_series[top_item].tags,
|
|
name = top_item,
|
|
ext_label = ext_label
|
|
}
|
|
end
|
|
|
|
if #top_series >= options.top then
|
|
break
|
|
end
|
|
end
|
|
|
|
local stats = nil
|
|
|
|
return {
|
|
metadata = {
|
|
epoch_begin = options.epoch_begin,
|
|
epoch_end = options.epoch_end,
|
|
epoch_step = step,
|
|
num_point = count,
|
|
schema = options.schema,
|
|
query = options.tags
|
|
},
|
|
series = top_series
|
|
}
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:topk(schema, tags, tstart, tend, options, top_tags)
|
|
if #top_tags > 1 then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
|
|
return nil
|
|
end
|
|
|
|
local top_tag = top_tags[1]
|
|
|
|
if top_tag ~= schema._tags[#schema._tags] then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE,
|
|
"RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
|
|
return nil
|
|
end
|
|
|
|
local series = driver:listSeries(schema, tags, top_tags, tstart, tend)
|
|
if not series then
|
|
return nil
|
|
end
|
|
|
|
local items = {}
|
|
local tag_2_series = {}
|
|
local total_series = {}
|
|
local total_valid = true
|
|
local step = 0
|
|
local query_start = tstart
|
|
local cf = getConsolidationFunction(schema)
|
|
|
|
if options.initial_point then
|
|
query_start = tstart - schema.options.step
|
|
end
|
|
|
|
for _, serie_tags in pairs(series) do
|
|
local rrdfile = driver.schema_get_full_path(schema, serie_tags)
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE,
|
|
string.format("RRD_FETCH[topk] schema=%s %s[%s] -> (%s): last_update=%u", schema.name,
|
|
table.tconcat(tags, "=", ","), table.concat(top_tags, ","), rrdfile, ntop.rrd_lastupdate(rrdfile)))
|
|
end
|
|
|
|
touchRRD(rrdfile)
|
|
|
|
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, cf, query_start, tend)
|
|
local sum = 0
|
|
|
|
if fdata == nil then
|
|
goto continue
|
|
end
|
|
|
|
step = fstep
|
|
|
|
local partials = {}
|
|
|
|
local serie_idx = 0
|
|
for _, _ in pairs(fdata) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = schema._metrics[serie_idx]
|
|
local fdata_name = names[serie_idx]
|
|
serie = fdata[fdata_name]
|
|
|
|
local max_val = ts_common.getMaxPointValue(schema, name, serie_tags)
|
|
partials[name] = 0
|
|
|
|
-- Remove the last value: RRD seems to give an additional point
|
|
serie[#serie] = nil
|
|
|
|
if (#total_series ~= 0) and #total_series ~= #serie then
|
|
-- NOTE: even if touchRRD is used, series can still have a different number
|
|
-- of points when the tend parameter does not correspond to the current time
|
|
-- e.g. when comparing with the past or manually zooming.
|
|
-- In this case, total serie il discarded as it's incorrect
|
|
total_valid = false
|
|
end
|
|
|
|
for i = #total_series + 1, #serie do
|
|
-- init
|
|
total_series[i] = 0
|
|
end
|
|
|
|
for i, v in pairs(serie) do
|
|
local v = ts_common.normalizeVal(v, max_val, options)
|
|
|
|
if type(v) == "number" then
|
|
sum = sum + v
|
|
partials[name] = partials[name] + v * step
|
|
total_series[i] = total_series[i] + v
|
|
end
|
|
end
|
|
end
|
|
|
|
items[serie_tags[top_tag]] = sum * step
|
|
tag_2_series[serie_tags[top_tag]] = {serie_tags, partials}
|
|
|
|
::continue::
|
|
end
|
|
|
|
local topk = {}
|
|
|
|
for top_item, value in pairsByValues(items, rev) do
|
|
if value > 0 then
|
|
topk[#topk + 1] = {
|
|
tags = tag_2_series[top_item][1],
|
|
value = value,
|
|
partials = tag_2_series[top_item][2]
|
|
}
|
|
end
|
|
|
|
if #topk >= options.top then
|
|
break
|
|
end
|
|
end
|
|
|
|
local stats = nil
|
|
|
|
-- table.clone needed as augumented_total can be modified below (sampleSeries works on it in-place)
|
|
local augumented_total = table.clone(total_series)
|
|
|
|
if options.initial_point and total_series then
|
|
-- remove initial point to avoid stats calculation on it
|
|
table.remove(total_series, 1)
|
|
end
|
|
|
|
local fstep, count = sampleSeries(schema, #augumented_total, step, options.max_num_points, {{
|
|
data = augumented_total
|
|
}})
|
|
|
|
if options.calculate_stats then
|
|
stats = ts_common.calculateStatistics(total_series, step, tend - tstart, schema.options.metrics_type)
|
|
stats = table.merge(stats, ts_common.calculateMinMax(augumented_total))
|
|
end
|
|
|
|
if not total_valid then
|
|
total_series = nil
|
|
augumented_total = nil
|
|
end
|
|
|
|
return {
|
|
topk = topk,
|
|
additional_series = {
|
|
total = augumented_total
|
|
},
|
|
statistics = stats
|
|
}
|
|
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:queryTotal(schema, tstart, tend, tags, options)
|
|
local rrdfile = driver.schema_get_full_path(schema, tags)
|
|
if not rrdfile or not ntop.notEmptyFile(rrdfile) then
|
|
return nil
|
|
end
|
|
|
|
touchRRD(rrdfile)
|
|
|
|
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(schema),
|
|
tstart, tend)
|
|
local totals = {}
|
|
|
|
local serie_idx = 0
|
|
for _, _ in pairs(fdata or {}) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = schema._metrics[serie_idx]
|
|
local fdata_name = names[serie_idx]
|
|
local serie = fdata[fdata_name]
|
|
|
|
local max_val = ts_common.getMaxPointValue(schema, name, tags)
|
|
local sum = 0
|
|
|
|
-- Remove the last value: RRD seems to give an additional point
|
|
serie[#serie] = nil
|
|
|
|
for i, v in pairs(serie) do
|
|
local v = ts_common.normalizeVal(v, max_val, options)
|
|
|
|
-- v is not null
|
|
if v == v then
|
|
sum = sum + v * fstep
|
|
end
|
|
end
|
|
|
|
totals[name] = sum
|
|
end
|
|
|
|
return totals
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:queryLastValues(options)
|
|
local rrdfile = driver.schema_get_full_path(options.schema_info, options.tags)
|
|
if not rrdfile or not ntop.notEmptyFile(rrdfile) then
|
|
return nil
|
|
end
|
|
|
|
touchRRD(rrdfile)
|
|
|
|
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, getConsolidationFunction(
|
|
options.schema_info), options.epoch_begin, options.epoch_end)
|
|
local last_values = {}
|
|
|
|
local serie_idx = 0
|
|
for _, _ in pairs(fdata or {}) do
|
|
serie_idx = serie_idx + 1 -- the first id is 1
|
|
local name = options.schema_info._metrics[serie_idx]
|
|
local fdata_name = names[serie_idx]
|
|
local serie = fdata[fdata_name]
|
|
|
|
local max_val = ts_common.getMaxPointValue(options.schema_info, name, options.tags)
|
|
|
|
-- Remove the last value: RRD seems to give an additional point
|
|
serie[#serie] = nil
|
|
|
|
local values = {}
|
|
-- Start from the last series item - 1 because RRD seems to give an additional point
|
|
for i = (#serie - 1), 1, -1 do
|
|
if (#values == options.num_points) then
|
|
break
|
|
end
|
|
-- nan check
|
|
if serie[i] == serie[i] then
|
|
values[#values + 1] = ts_common.normalizeVal(serie[i], max_val, options)
|
|
end
|
|
end
|
|
last_values[name] = values
|
|
end
|
|
return last_values
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
local function deleteAllData(ifid)
|
|
local paths = getRRDPaths()
|
|
|
|
for _, path in pairs(paths) do
|
|
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" .. path .. "/")
|
|
local path_to_del = os_utils.fixPath(ifpath)
|
|
|
|
if ntop.exists(path_to_del) and not ntop.rmdir(path_to_del) then
|
|
return false
|
|
end
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:delete(schema_prefix, tags)
|
|
if schema_prefix == "" then
|
|
-- Delete all data
|
|
return deleteAllData(tags.ifid)
|
|
end
|
|
|
|
-- In RRD we must determine the root path of a given entity (e.g. of an
|
|
-- host if schema_prefix is "host", of a network if schema_prefix is "subnet"
|
|
-- and so on). In order to do so, we list all the schemas starting with the
|
|
-- given prefix, then determine the shortest path to be deleted.
|
|
--
|
|
-- E.g. for ts_utils.delete("mac", {ifid=1, mac="11:22:33:44:55:66"})
|
|
-- we find the following paths:
|
|
-- - /var/lib/ntopng/-1/rrd/macs/11_22_33/44/55/66 (schema "mac:traffic")
|
|
-- - /var/lib/ntopng/-1/rrd/macs/11_22_33/44/55/66/ndpi_categories (schema "mac:ndpi_categories")
|
|
-- We delete the shortest ("/var/lib/ntopng/-1/rrd/macs/11_22_33/44/55/66") as it includes the other.
|
|
--
|
|
-- NOTE: this logic assumes that schemas are well defined, which
|
|
-- means that:
|
|
-- - The first tag is the "ifid" tag
|
|
-- - Tags are defined in order of cardinality, e.g. the "host" tag is
|
|
-- defined before the "protocol" tag
|
|
local ts_utils = require "ts_utils" -- required to get the schemas
|
|
local num_valorized_tags = table.len(tags)
|
|
local s_prefix = schema_prefix .. ""
|
|
|
|
if (num_valorized_tags < 1) then
|
|
traceError(TRACE_ERROR, TRACE_CONSOLE, "At least 1 tags must be specified for the delete operation")
|
|
return false
|
|
end
|
|
|
|
local found = nil
|
|
|
|
-- Iterate the entity schemas and find the shortest RRD directory path
|
|
for k, s in pairs(ts_utils.getLoadedSchemas()) do
|
|
if starts(k, s_prefix) then
|
|
local unreleted = false
|
|
|
|
-- Ensure that all the tags are valorized in order to avoid
|
|
-- deleting unrelated data
|
|
for k in pairs(tags) do
|
|
if (s.tags[k] == nil) then
|
|
-- Missing tag, this schema is possibly unrelated
|
|
unreleted = true
|
|
break
|
|
end
|
|
end
|
|
|
|
if (not unreleted) then
|
|
local check_tags = {}
|
|
|
|
-- Fill the missing tags with empty strings to account them as
|
|
-- possible shortest paths
|
|
for k in pairs(s.tags) do
|
|
check_tags[k] = tags[k] or ""
|
|
end
|
|
|
|
local path = schema_get_path(s, check_tags)
|
|
|
|
-- Choose the shortest string to pick the path that includes the others
|
|
if path and ((found == nil) or (string.len(path) < string.len(found))) then
|
|
found = path
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
if (not found) then
|
|
return false
|
|
end
|
|
|
|
local path_to_del = os_utils.fixPath(found)
|
|
|
|
if ntop.exists(path_to_del) and not ntop.rmdir(path_to_del) then
|
|
return false
|
|
end
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("DELETE schema=%s, %s => %s", schema_prefix,
|
|
table.tconcat(tags, "=", ","), path_to_del))
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:deleteOldData(ifid)
|
|
local data_retention_utils = require "data_retention_utils"
|
|
local paths = getRRDPaths()
|
|
local dirs = ntop.getDirs()
|
|
local retention_days = data_retention_utils.getTSAndStatsDataRetentionDays()
|
|
|
|
for _, path in pairs(paths) do
|
|
local ifpath = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/" .. path .. "/")
|
|
local deadline = retention_days * 86400
|
|
|
|
if isDebugEnabled() then
|
|
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("ntop.deleteOldRRDs(%s, %u)", ifpath, deadline))
|
|
end
|
|
|
|
ntop.deleteOldRRDs(ifpath, deadline)
|
|
end
|
|
|
|
return true
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:setup(ts_utils)
|
|
return true
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
-- Parses a line in line protocol format (https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
|
|
-- into tags and metrics
|
|
local function line_protocol_to_tags_and_metrics(protocol_line)
|
|
-- An example of line protocol is the following
|
|
--
|
|
-- weather,location=us-midwest temperature=82 1465839830100400200
|
|
-- | -------------------- -------------- |
|
|
-- | | | |
|
|
-- | | | |
|
|
-- +-----------+--------+-+---------+-+---------+
|
|
-- |measurement|,tag_set| |field_set| |timestamp|
|
|
-- +-----------+--------+-+---------+-+---------+
|
|
--
|
|
-- NOTE: no need to worry about possible spaces in the tag names. As we are using a regexp to parse
|
|
-- the line, and regexps are greedy, spaces in the tags won't be of any issue. Examples that are correctly parsed are:
|
|
--
|
|
-- local test_line2 = "iface:traffic_rxtx,ifid=0 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
|
|
-- local test_line = "iface:traffic_rxtx,ifid=0,ndpi_category=My Category,ndpi_proto=Apple iTunes,host=1.2.3.4 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
|
|
--
|
|
local measurement_and_tag_set, field_set, timestamp = protocol_line:match("(.+)%s(.+)%s(.+)\n")
|
|
|
|
local measurement
|
|
local tags = {}
|
|
local metrics = {}
|
|
|
|
-- Parse measurement and tags
|
|
local items = measurement_and_tag_set:split(",")
|
|
if not items then
|
|
-- no tag set, just the measurement
|
|
measurement = measurement_and_tag_set
|
|
else
|
|
-- measurement is at position 1, tags at positions 2+
|
|
measurement = items[1]
|
|
for i = 2, #items do
|
|
local tag_items = items[i]:split("=")
|
|
if tag_items and #tag_items == 2 then
|
|
tags[tag_items[1]] = tonumber(tag_items[2]) or tag_items[2]
|
|
end
|
|
end
|
|
end
|
|
|
|
-- Parse metrics
|
|
local items = field_set:split(",")
|
|
if not items then
|
|
-- Just one metric
|
|
items = {field_set}
|
|
end
|
|
|
|
for i = 1, #items do
|
|
local field_items = items[i]:split("=")
|
|
|
|
if field_items and #field_items == 2 then
|
|
metrics[field_items[1]] = tonumber(field_items[2]) or field_items[2]
|
|
end
|
|
end
|
|
|
|
local res = {
|
|
schema_name = measurement,
|
|
tags = tags,
|
|
metrics = metrics,
|
|
timestamp = tonumber(timestamp)
|
|
}
|
|
|
|
return res
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
function driver:export()
|
|
if (not (use_rrd_queue)) then
|
|
return -- Nothing to do
|
|
end
|
|
|
|
local ts_utils = require "ts_utils" -- required to get the schema from the schema name
|
|
|
|
local available_interfaces = interface.getIfNames() or {}
|
|
-- Add the system interface to the available interfaces
|
|
available_interfaces[getSystemInterfaceId()] = getSystemInterfaceName()
|
|
local rrd_queue_max_dequeues_per_interface = 8192
|
|
|
|
for cur_ifid, iface in pairs(available_interfaces) do
|
|
for cur_dequeue = 1, rrd_queue_max_dequeues_per_interface do
|
|
local ts_point = interface.rrd_dequeue(tonumber(cur_ifid))
|
|
|
|
if not ts_point then
|
|
break
|
|
end
|
|
|
|
local parsed_ts_point = line_protocol_to_tags_and_metrics(ts_point)
|
|
|
|
-- No need to do coherence checks on the schema. This queue is 'private' and should
|
|
-- only be written with valid data already checked.
|
|
local schema = ts_utils.getSchema(parsed_ts_point["schema_name"])
|
|
local timestamp = parsed_ts_point["timestamp"]
|
|
local tags = parsed_ts_point["tags"]
|
|
local metrics = parsed_ts_point["metrics"]
|
|
local base, rrd = schema_get_path(schema, tags)
|
|
|
|
if rrd then
|
|
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
|
|
|
|
if not ntop.notEmptyFile(rrdfile) then
|
|
ntop.mkdir(base)
|
|
if not create_rrd(schema, rrdfile, timestamp) then
|
|
return false
|
|
end
|
|
end
|
|
|
|
update_rrd(schema, rrdfile, timestamp, metrics)
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
-- ##############################################
|
|
|
|
return driver
|