Implement experimental timeseries query API

This commit is contained in:
emanuele-f 2018-06-26 11:59:39 +02:00
parent 5f461dfbd3
commit 84c32d499b
11 changed files with 442 additions and 140 deletions

View file

@ -17,60 +17,6 @@ local type_to_rrdtype = {
[ts_types.gauge] = "GAUGE",
}
-- NOTE: to get the actual rentention period, multiply retention_dp * aggregation_dp * step
local supported_steps = {
["1"] = {
rra = {
{aggregation_dp = 1, retention_dp = 86400}, -- 1 second resolution: keep for 1 day
{aggregation_dp = 60, retention_dp = 43200}, -- 1 minute resolution: keep for 1 month
{aggregation_dp = 3600, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
}, hwpredict = {
row_count = 86400, -- keep 1 day prediction
period = 3600, -- assume 1 hour periodicity
}
},
["60"] = {
rra = {
{aggregation_dp = 1, retention_dp = 1440}, -- 1 minute resolution: keep for 1 day
{aggregation_dp = 60, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 1440, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 10080, -- keep 1 week prediction
period = 1440, -- assume 1 day periodicity
}
},
["60_ext"] = {
rra = {
{aggregation_dp = 1, retention_dp = 43200}, -- 1 minute resolution: keep for 1 month
{aggregation_dp = 60, retention_dp = 24000}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 1440, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 10080, -- keep 1 week prediction
period = 1440, -- assume 1 day periodicity
}
},
["300"] = {
rra = {
{aggregation_dp = 1, retention_dp = 288}, -- 5 minute resolution: keep for 1 day
{aggregation_dp = 12, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 288, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 2016, -- keep 1 week prediction
period = 288, -- assume 1 day periodicity
}
},
["300_ext"] = {
rra = {
{aggregation_dp = 1, retention_dp = 8640}, -- 5 minutes resolution: keep for 1 month
{aggregation_dp = 12, retention_dp = 2400}, -- 1 hour resolution: keep for 100 days
{aggregation_dp = 288, retention_dp = 365}, -- 1 day resolution: keep for 1 year
}, hwpredict = {
row_count = 2016, -- keep 1 week prediction
period = 288, -- assume 1 day periodicity
}
}
}
-------------------------------------------------------
function driver:new(options)
@ -138,6 +84,41 @@ local function schema_get_path(schema, tags)
return path, rrd
end
-- TODO remove after migration
function find_schema(rrdFile, rrdfname, tags, ts_utils)
-- try to guess additional tags
local v = split(rrdfname, ".rrd")
if #v == 2 then
local app = v[1]
if interface.getnDPIProtoId(app) ~= -1 then
tags.protocol = app
elseif interface.getnDPICategoryId(app) ~= -1 then
tags.category = app
end
end
for _, schema in pairs(ts_utils.getLoadedSchemas()) do
-- verify tags compatibility
for tag in pairs(schema.tags) do
if tags[tag] == nil then
goto next_schema
end
end
local base, rrd = schema_get_path(schema, tags)
local full_path = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
if full_path == rrdFile then
return schema
end
::next_schema::
end
return nil
end
-------------------------------------------------------
local function getRRAParameters(step, resolution, retention_time)
@ -162,21 +143,9 @@ local function map_metrics_to_rrd_columns(schema)
return nil
end
local function get_step_key(schema)
local step_k = tostring(schema.options.step)
if string.find(schema.name, "iface:tcp_") ~= nil then
-- This is an extended counter
step_k = step_k .. "_ext"
end
return step_k
end
local function create_rrd(schema, path)
local heartbeat = schema.options.rrd_heartbeat or (schema.options.step * 2)
local params = {path, schema.options.step}
local supported_steps = supported_steps[get_step_key(schema)]
local metrics_map = map_metrics_to_rrd_columns(schema)
if not metrics_map then
@ -188,13 +157,13 @@ local function create_rrd(schema, path)
params[#params + 1] = "DS:" .. metrics_map[idx] .. ":" .. type_to_rrdtype[info.type] .. ':' .. heartbeat .. ':U:U'
end
for _, rra in ipairs(supported_steps.rra) do
for _, rra in ipairs(schema.retention) do
params[#params + 1] = "RRA:" .. RRD_CONSOLIDATION_FUNCTION .. ":0.5:" .. rra.aggregation_dp .. ":" .. rra.retention_dp
end
if use_hwpredict then
-- NOTE: at most one RRA, otherwise rrd_update crashes.
local hwpredict = supported_steps.hwpredict
local hwpredict = schema.hwpredict
params[#params + 1] = "RRA:HWPREDICT:" .. hwpredict.row_count .. ":0.1:0.0035:" .. hwpredict.period
end
@ -220,25 +189,7 @@ end
-------------------------------------------------------
local function verify_schema_compatibility(schema)
if not supported_steps[get_step_key(schema)] then
traceError(TRACE_ERROR, TRACE_CONSOLE, "unsupported step '" .. schema.options.step .. "' in schema " .. schema.name)
return false
end
if schema.tags.ifid == nil then
traceError(TRACE_ERROR, TRACE_CONSOLE, "missing ifid tag in schema " .. schema.name)
return false
end
return true
end
function driver:append(schema, timestamp, tags, metrics)
if not verify_schema_compatibility(schema) then
return false
end
local base, rrd = schema_get_path(schema, tags)
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
@ -254,8 +205,47 @@ end
-------------------------------------------------------
function driver:query(schema, tstart, tend, tags)
tprint("TODO QUERY")
function driver:query(schema, tstart, tend, tags, options)
local base, rrd = schema_get_path(schema, tags)
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
local fstart, fstep, fdata, fend, fcount = ntop.rrd_fetch_columns(rrdfile, RRD_CONSOLIDATION_FUNCTION, tstart, tend)
local serie_idx = 1
local series = {}
for _, serie in pairs(fdata) do
local name = schema._metrics[serie_idx]
-- unify the format
for i, v in pairs(serie) do
if v ~= v then
-- NaN value
v = options.fill_value
elseif v < options.min_value then
v = options.min_value
elseif v > options.max_value then
v = options.max_value
elseif schema.metrics[name].type == ts_types.counter then -- DERIVE type
-- Multiply the derivate to get the total value for the minute
v = v * fstep
end
serie[i] = v
end
-- Remove the last value: RRD seems to give an additional point
serie[#serie] = nil
series[serie_idx] = {label=name, data=serie}
serie_idx = serie_idx + 1
end
return {
start = fstart,
step = fstep,
count = fcount,
series = series
}
end
function driver:delete(schema, tags)