Add support for influxdb _internal queries at the visualization phase

Also implement the influxdb storage chart
emanuele-f 2019-06-07 16:19:33 +02:00
parent a15f5fe485
commit f51b0f1cda
7 changed files with 68 additions and 23 deletions
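For orientation, here is a minimal sketch of how a timeseries schema could opt into the hook this commit introduces; every name below except the influx_internal_query option is hypothetical and not taken from the commit:

-- Hypothetical schema backing the new InfluxDB storage chart; the only field
-- the driver actually reads is options.influx_internal_query.
local schema = ts_utils.newSchema("influxdb:storage_size", {step = 60, metrics_type = ts_utils.metrics.gauge})
schema:addMetric("disk_bytes")
schema.options.influx_internal_query = function(driver, schema, tstart, tend, time_step)
  -- Must return a complete InfluxQL statement; see the sketch after
  -- driver:_makeSeriesQuery below for a possible body.
end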

@@ -89,10 +89,14 @@ local function isRollupEnabled()
return(ntop.getPref("ntopng.prefs.disable_influxdb_rollup") ~= "1")
end
-- ##############################################
--##############################################
-- Determines the most appropriate retention policy
local function getSchemaRetentionPolicy(schema, tstart, tend, options)
if schema.options.influx_internal_query then
return "raw"
end
options = options or {}
local first_aggr_time = tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY))
@@ -439,7 +443,17 @@ end
-- ##############################################
local function makeSeriesQuery(query_schema, metrics, tags, tstart, tend, time_step)
local function getDatabaseName(schema, db)
return ternary(schema.options.influx_internal_query, '_internal', db)
end
function driver:_makeSeriesQuery(query_schema, metrics, tags, tstart, tend, time_step, schema)
local internal_query = schema.options.influx_internal_query
if internal_query ~= nil then
return internal_query(self, schema, tstart, tend, time_step)
end
return 'SELECT '.. table.concat(metrics, ",") ..' FROM ' .. query_schema .. where_tags(tags) ..
" time >= " .. tstart .. "000000000 AND time <= " .. tend .. "000000000" ..
" GROUP BY TIME(".. time_step .."s)"
@@ -471,10 +485,10 @@ function driver:query(schema, tstart, tend, tags, options)
AND time >= 1531991910000000000 AND time <= 1532002710000000000
GROUP BY TIME(60s)
]]
local query = makeSeriesQuery(query_schema, metrics, tags, tstart, tend + unaligned_offset, time_step)
local query = self:_makeSeriesQuery(query_schema, metrics, tags, tstart, tend + unaligned_offset, time_step, schema)
local url = self.url
local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)
local data = influx_query(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", query, self.username, self.password, options)
local series, count
if table.empty(data) then
@@ -529,8 +543,8 @@ function driver:query(schema, tstart, tend, tags, options)
initial_metrics[idx] = "FIRST(" .. metric .. ")"
end
local query = makeSeriesQuery(query_schema, metrics, tags, tstart-time_step, tstart+unaligned_offset, time_step)
local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)
local query = self:_makeSeriesQuery(query_schema, metrics, tags, tstart-time_step, tstart+unaligned_offset, time_step, schema)
local data = influx_query(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", query, self.username, self.password, options)
if table.empty(data) then
-- Data fill
@@ -748,6 +762,11 @@ local function processListSeriesResult(data, schema, tags_filter, wildcard_tags)
end
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
if schema.options.influx_internal_query then
-- internal metrics always exist
return {{}} -- exists
end
local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, start_time)
local url = self.url
local data = influx_query(url .. "/query?db=".. self.db, query, self.username, self.password)
@@ -761,6 +780,8 @@ function driver:listSeriesBatched(batch)
local max_batch_size = 30
local url = self.url
local rv = {}
local idx_to_batchid = {}
local internal_results = {}
for i=1,#batch,max_batch_size do
local queries = {}
@@ -768,7 +789,15 @@ function driver:listSeriesBatched(batch)
-- Prepare the batch
for j=i,math.min(i+max_batch_size-1, #batch) do
local cur_query = batch[j]
queries[#queries +1] = makeListSeriesQuery(cur_query.schema, cur_query.filter_tags, cur_query.wildcard_tags, cur_query.start_time)
if cur_query.schema.options.influx_internal_query then
-- internal metrics always exist
internal_results[j] = {{}} -- exists
else
local idx = #queries +1
idx_to_batchid[idx] = j
queries[idx] = makeListSeriesQuery(cur_query.schema, cur_query.filter_tags, cur_query.wildcard_tags, cur_query.start_time)
end
end
local query_str = table.concat(queries, ";")
@@ -776,7 +805,8 @@ function driver:listSeriesBatched(batch)
-- Collect the results
if data and data.results then
for j, result in pairs(data.results) do
for idx, result in pairs(data.results) do
local j = idx_to_batchid[idx]
local cur_query = batch[j]
local result = processListSeriesResult(result, cur_query.schema, cur_query.filter_tags, cur_query.wildcard_tags)
rv[j] = result
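Because internal schemas are answered locally and never added to the InfluxDB batch, the position idx inside data.results no longer matches the original batch index; idx_to_batchid restores that mapping. A small standalone illustration of the same skip-and-remap pattern (not driver code):

-- Queries are appended only for non-internal entries, so each result index
-- must be mapped back to the batch slot it came from.
local batch = { "internal", "query_a", "internal", "query_b" }
local queries, idx_to_batchid = {}, {}
for j, item in ipairs(batch) do
  if item ~= "internal" then
    local idx = #queries + 1
    idx_to_batchid[idx] = j
    queries[idx] = item
  end
end
-- queries = { "query_a", "query_b" }; results[1] -> batch[2], results[2] -> batch[4]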
@@ -784,7 +814,7 @@ end
end
end
return rv
return table.merge(rv, internal_results)
end
-- ##############################################