Implement ts_utils.listSeries

This commit is contained in:
emanuele-f 2018-06-26 18:30:12 +02:00
parent 84c32d499b
commit 1f7cbfd13e
3 changed files with 161 additions and 29 deletions

View file

@ -44,28 +44,7 @@ end
-------------------------------------------------------
function driver:query(schema, tstart, tend, tags, options)
local metrics = {}
-- For now we assume that the step of the data is the raw step, ignoring the data aggregations
local time_step = schema.options.step
for i, metric in ipairs(schema._metrics) do
local data_type = schema.metrics[metric].type
if data_type == ts_types.counter then
metrics[i] = "DERIVATIVE(\"" .. metric .. "\") as " .. metric
else
metrics[i] = metric
end
end
local url = ntop.getPref("ntopng.prefs.ts_post_data_url")
local query = 'SELECT '.. table.concat(metrics, ",") ..' FROM "' .. schema.name .. '" WHERE ' ..
table.tconcat(tags, "=", " AND ", nil, "'") .. " AND time >= " .. tstart .. "000000000 AND time <= " .. tend .. "000000000"
local full_url = url .. "/query?db=ntopng&epoch=s&q=" .. urlencode(query)
local function influx_query(full_url)
local res = ntop.httpGet(full_url, "", "", INFLUX_QUERY_TIMEMOUT_SEC, true)
if not res then
@ -90,7 +69,38 @@ function driver:query(schema, tstart, tend, tags, options)
return nil
end
data = jres.results[1].series[1]
return jres.results[1].series[1]
end
-------------------------------------------------------
function driver:query(schema, tstart, tend, tags, options)
local metrics = {}
-- For now we assume that the step of the data is the raw step, ignoring the data aggregations
local time_step = schema.options.step
for i, metric in ipairs(schema._metrics) do
local data_type = schema.metrics[metric].type
if data_type == ts_types.counter then
metrics[i] = "DERIVATIVE(\"" .. metric .. "\") as " .. metric
else
metrics[i] = metric
end
end
local url = ntop.getPref("ntopng.prefs.ts_post_data_url")
local query = 'SELECT '.. table.concat(metrics, ",") ..' FROM "' .. schema.name .. '" WHERE ' ..
table.tconcat(tags, "=", " AND ", nil, "'") .. " AND time >= " .. tstart .. "000000000 AND time <= " .. tend .. "000000000"
local full_url = url .. "/query?db=ntopng&epoch=s&q=" .. urlencode(query)
local data = influx_query(full_url)
if not data then
return nil
end
local series = {}
@ -162,4 +172,39 @@ end
-------------------------------------------------------
function driver:listSeries(schema, tags_filter)
local url = ntop.getPref("ntopng.prefs.ts_post_data_url")
local query = 'SHOW SERIES FROM "' .. schema.name .. '" WHERE ' ..
table.tconcat(tags_filter, "=", " AND ", nil, "'")
local full_url = url .. "/query?db=ntopng&q=" .. urlencode(query)
local data = influx_query(full_url)
if not data then
return nil
end
local res = {}
for _, value in pairs(data.values) do
local v = split(value[1], ",")
local rec = {}
for i=2, #v do
local key_val = split(v[i], "=")
if #key_val == 2 then
rec[key_val[1]] = key_val[2]
end
end
res[#res + 1] = rec
end
return res
end
-------------------------------------------------------
return driver

View file

@ -84,6 +84,13 @@ local function schema_get_path(schema, tags)
return path, rrd
end
local function schema_get_full_path(schema, tags)
local base, rrd = schema_get_path(schema, tags)
local full_path = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
return full_path
end
-- TODO remove after migration
function find_schema(rrdFile, rrdfname, tags, ts_utils)
-- try to guess additional tags
@ -106,8 +113,7 @@ function find_schema(rrdFile, rrdfname, tags, ts_utils)
end
end
local base, rrd = schema_get_path(schema, tags)
local full_path = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
local full_path = schema_get_full_path(schema, tags)
if full_path == rrdFile then
return schema
@ -225,9 +231,6 @@ function driver:query(schema, tstart, tend, tags, options)
v = options.min_value
elseif v > options.max_value then
v = options.max_value
elseif schema.metrics[name].type == ts_types.counter then -- DERIVE type
-- Multiply the derivate to get the total value for the minute
v = v * fstep
end
serie[i] = v
@ -248,8 +251,62 @@ function driver:query(schema, tstart, tend, tags, options)
}
end
-------------------------------------------------------
-- *Limitation*
-- tags_filter is expected to contain all the tags of the schema except the last
-- one. For such tag, a list of available values will be returned.
function driver:listSeries(schema, tags_filter)
local wildcard_tag = nil
for tag in pairs(schema.tags) do
if not tags_filter[tag] then
if wildcard_tag then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support listSeries on multiple tags")
return nil
else
wildcard_tag = tag
end
end
end
if not wildcard_tag then
-- simple existance check
local full_path = schema_get_full_path(schema, tags_filter)
if ntop.exists(full_path) then
return {tags_filter}
else
return {}
end
end
if wildcard_tag ~= schema._tags[#schema._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver only support listSeries with wildcard in the last tag, got wildcard on '" .. wildcard_tag .. "'")
return nil
end
local base, rrd = schema_get_path(schema, table.merge(tags_filter, {[wildcard_tag] = ""}))
local files = ntop.readdir(base)
local res = {}
for f in pairs(files or {}) do
local v = split(f, ".rrd")
if #v == 2 then
res[#res + 1] = table.merge(tags_filter, {[wildcard_tag] = v[1]})
end
end
return res
end
-------------------------------------------------------
function driver:delete(schema, tags)
tprint("TODO DELETE")
end
-------------------------------------------------------
return driver