Implement initial InfluxDB rollup support

Currently disabled, enable with
  redis-cli set ntopng.prefs.beta_rollup 1
This commit is contained in:
emanuele-f 2019-01-31 12:01:00 +01:00
parent 314feb4a7b
commit 229c3b13c9
8 changed files with 160 additions and 5 deletions

View file

@ -19,6 +19,8 @@ require("ntop_utils")
local INFLUX_QUERY_TIMEMOUT_SEC = 5
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
local RP_1H_DURATION = "30d"
local RP_1D_DURATION = "365d"
-- ##############################################
@ -949,4 +951,103 @@ end
-- ##############################################
function driver:_multiQuery(queries)
local query_str = table.concat(queries, ";")
local res = ntop.httpPost(self.url .. "/query", "q=" .. query_str, self.username, self.password, INFLUX_QUERY_TIMEMOUT_SEC, true)
if not res then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Invalid response for query: " .. query_str)
return false
end
if res.RESPONSE_CODE ~= 200 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Bad response code[" .. res.RESPONSE_CODE .. "]: " .. getResponseError(res))
--tprint(query_str)
return false
end
return true
end
-- ##############################################
local function getCqQuery(dbname, metrics, tags, schema, source, dest, resemple)
local cq_name = string.format("%s__%s", schema.name, dest)
local resemple_s = ""
if resemple then
resemple_s = "RESAMPLE FOR " .. resemple
end
return string.format([[
CREATE CONTINUOUS QUERY "%s" ON %s
%s
BEGIN
SELECT
%s
INTO "%s"."%s"
FROM (
SELECT
%s
FROM "%s"."%s"
GROUP BY time(%us),%s
FILL(0)
) GROUP BY time(1h),%s
END]], cq_name, dbname, resemple_s,
metrics, dest, schema.name,
metrics, source, schema.name,
schema.options.step, tags, tags)
end
function driver:setup(ts_utils)
local queries = {}
local max_batch_size = 25 -- note: each query is about 400 characters
queries[#queries + 1] = string.format('CREATE RETENTION POLICY "1h" ON %s DURATION %s REPLICATION 1', self.db, RP_1H_DURATION)
queries[#queries + 1] = string.format('CREATE RETENTION POLICY "1d" ON %s DURATION %s REPLICATION 1', self.db, RP_1D_DURATION)
ts_utils.loadSchemas()
local schemas = ts_utils.getLoadedSchemas()
for _, schema in pairs(schemas) do
local tags = table.concat(schema._tags, ",")
local metrics = {}
for _, metric in ipairs(schema._metrics) do
metrics[#metrics + 1] = string.format('MEAN(%s) as %s', metric, metric)
end
if #metrics == 0 then
goto continue
end
metrics = table.concat(metrics, ",")
local cq_1h = getCqQuery(self.db, metrics, tags, schema, "autogen", "1h", "2h")
local cq_1d = getCqQuery(self.db, metrics, tags, schema, "1h", "1d")
queries[#queries + 1] = cq_1h:gsub("\n", ""):gsub("%s%s+", " ")
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
if #queries >= max_batch_size then
if not self:_multiQuery(queries) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
end
queries = {}
end
::continue::
end
if #queries >= 0 then
if not self:_multiQuery(queries) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup() failed")
return false
end
end
return true
end
-- ##############################################
return driver