ntopng/scripts/lua/modules/timeseries/drivers/influxdb.lua
2019-06-30 12:56:04 +02:00

1701 lines
53 KiB
Lua

--
-- (C) 2018 - ntop.org
--
local driver = {}
local ts_common = require("ts_common")
local json = require("dkjson")
local os_utils = require("os_utils")
local alerts = require("alerts_api")
require("ntop_utils")
--
-- Sample query:
-- select * from "iface:ndpi" where ifid='0' and protocol='SSL'
--
-- See also callback_utils.uploadTSdata
--
-- Maximum time (in seconds) to wait for an InfluxDB query reply.
-- NOTE: the name contains a typo ("TIMEMOUT") but is kept as-is since it
-- is referenced throughout this file.
local INFLUX_QUERY_TIMEMOUT_SEC = 5
-- Maximum number of pending export items kept in the Redis queue; older
-- items are dropped by driver:export when this limit is exceeded.
local INFLUX_MAX_EXPORT_QUEUE_LEN = 200
-- Redis list holding the timeseries files waiting to be exported.
local INFLUX_EXPORT_QUEUE = "ntopng.influx_file_queue"
-- Minimum InfluxDB server version this driver supports.
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
-- Redis key storing the first time aggregated (rollup) data was written.
local FIRST_AGGREGATION_TIME_KEY = "ntopng.prefs.influxdb.first_aggregation_time"
-- ##############################################
-- Constructor: builds a new InfluxDB driver instance.
-- options.url and options.db are mandatory; username/password default to "".
function driver:new(options)
  local instance = {
    url = options.url,
    db = options.db,
    username = options.username or "",
    password = options.password or "",
  }

  self.__index = self
  return setmetatable(instance, self)
end
-- ##############################################
-- Writes a single timeseries point to InfluxDB via the C-side API.
-- Returns whatever interface.appendInfluxDB returns (truthy on success).
function driver:append(schema, timestamp, tags, metrics)
  local success = interface.appendInfluxDB(schema.name, timestamp, tags, metrics)
  return success
end
-- ##############################################
-- Extracts a human-readable error from an InfluxDB HTTP response.
-- Returns the error string (falling back to the HTTP response code when
-- no JSON error is present) and, for multi-statement replies, the id of
-- the failing statement as a second value.
local function getResponseError(res)
  if res.CONTENT and (res.CONTENT_TYPE == "application/json") then
    local decoded = json.decode(res.CONTENT)

    if decoded then
      if decoded.error then
        if res.RESPONSE_CODE then
          return "[".. res.RESPONSE_CODE .. "] " .. decoded.error
        end

        return decoded.error
      end

      -- a multi-statement reply carries per-statement errors
      for _, single_res in pairs(decoded.results or {}) do
        if single_res.error then
          return single_res.error, single_res.statement_id
        end
      end
    end
  end

  return res.RESPONSE_CODE or "unknown error"
end
-- ##############################################
-- Number of days data is kept in InfluxDB (user preference, default 365).
-- TODO make in common with prefs.lua
local function getDatabaseRetentionDays()
  local days = ntop.getPref("ntopng.prefs.influx_retention")
  return tonumber(days) or 365
end

-- The 1d and 1h rollup retention policies currently share the raw retention
local function get1dDatabaseRetentionDays()
  return getDatabaseRetentionDays()
end

local function get1hDatabaseRetentionDays()
  return getDatabaseRetentionDays()
end
-- ##############################################
-- Rollups (1h/1d aggregations) are enabled unless explicitly disabled via pref.
local function isRollupEnabled()
  local disabled = ntop.getPref("ntopng.prefs.disable_influxdb_rollup")
  return disabled ~= "1"
end
--##############################################
-- Determines the most appropriate retention policy
-- Determines the most appropriate retention policy ("raw", "1h" or "1d")
-- to satisfy a query on the given schema and time range.
-- @param schema the timeseries schema
-- @param tstart query start time (epoch seconds)
-- @param tend query end time (epoch seconds)
-- @param options optional; options.target_aggregation can force a policy
--        when enough data is available to satisfy it
-- @return one of "raw", "1h", "1d"
local function getSchemaRetentionPolicy(schema, tstart, tend, options)
  if schema.options.influx_internal_query then
    return "raw"
  end

  options = options or {}
  local first_aggr_time = tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY))

  -- No aggregated data exists yet (or rollups are disabled): raw only
  if((not first_aggr_time) or (not isRollupEnabled())) then
    return "raw"
  end

  local rp_1d_duration_sec = get1dDatabaseRetentionDays() * 86400
  local rp_1h_duration_sec = get1hDatabaseRetentionDays() * 86400

  -- RP selection logic: oldest available *timestamp* for each policy
  local oldest_1d_data = os.time() - rp_1d_duration_sec
  local oldest_1h_data = os.time() - rp_1h_duration_sec
  -- NOTE: this must be a timestamp like the two above; the previous code
  -- compared tstart (an epoch) against a plain duration, a check which
  -- could never be satisfied
  local oldest_raw_data = os.time() - getDatabaseRetentionDays() * 86400
  local max_raw_interval = 12 * 3600 -- after 12 hours begin to use the aggregated data
  local max_1h_interval = 15 * 86400 -- after 15 days use the 1d aggregated data

  if options.target_aggregation then
    if((options.target_aggregation == "1h") and (tstart < oldest_1h_data)) or
      ((options.target_aggregation == "raw") and (tstart < oldest_raw_data)) then
      -- cannot satisfy the target_aggregation
    else
      -- user override
      return options.target_aggregation
    end
  end

  -- Fix the limits to ensure that the first_aggr_time is not crossed
  -- Crossing it is still acceptable for the options.target_aggregation check above
  oldest_1d_data = math.max(oldest_1d_data, first_aggr_time)
  oldest_1h_data = math.max(oldest_1h_data, first_aggr_time)

  if tstart < oldest_1d_data then
    return "raw"
  elseif tstart < oldest_1h_data then
    return "1d"
  elseif tstart < oldest_raw_data then
    return "1h"
  end

  local interval = tend - tstart

  if interval >= max_1h_interval then
    return "1d"
  elseif interval >= max_raw_interval then
    return "1h"
  end

  return "raw"
end
-- ##############################################
-- This is necessary to avoid fetching a point which is not already
-- aggregated by the retention policy
-- Clamps tend so that the query does not reach into a time bucket the
-- given retention policy has not aggregated yet (only fully elapsed
-- hours/days carry rollup points).
local function fixTendForRetention(tend, rp)
  local now = os.time()

  if rp == "1h" then
    -- last fully aggregated hour
    local current_hour = now - (now % 3600)
    return math.min(tend, current_hour - 3600)
  end

  if rp == "1d" then
    -- last fully aggregated day (local midnight)
    local midnight = os.date("*t", now)
    midnight.hour = 0
    midnight.min = 0
    midnight.sec = 0
    return math.min(tend, os.time(midnight) - 86400)
  end

  -- raw data: no adjustment needed
  return tend
end
-- ##############################################
-- returns schema_name, step
-- Maps a retention policy to the InfluxDB measurement to query.
-- Returns the quoted measurement path, the step of the policy and the
-- metrics type (counters become derivatives on aggregated data).
local function retentionPolicyToSchema(schema, rp, db)
  if (rp == "raw") or (rp == "autogen") then
    rp = nil
  end

  if isEmptyString(rp) then
    -- raw: plain measurement with the schema's native step and type
    return string.format('"%s"', schema.name), schema.options.step, schema.options.metrics_type
  end

  -- aggregated data: counters become derivatives
  local metric_type = ternary(schema.options.metrics_type == ts_common.metrics.counter, ts_common.metrics.derivative, schema.options.metrics_type)
  local step = ternary(rp == "1d", 86400, 3600)

  return string.format('"%s"."%s"."%s"', db, rp, schema.name), step, metric_type
end
-- ##############################################
-- Convenience wrapper: picks the best retention policy for the interval
-- and resolves it to the corresponding measurement/step/type.
local function getQuerySchema(schema, tstart, tend, db, options)
  local rp = getSchemaRetentionPolicy(schema, tstart, tend, options)
  return retentionPolicyToSchema(schema, rp, db)
end
-- ##############################################
-- Performs one or more (";"-separated) InfluxDB queries via HTTP GET.
-- @return the decoded JSON reply, or nil on failure (HTTP error, timeout,
--         malformed/empty JSON, or a query-level error in any result)
local function influx_query_multi(base_url, query, username, password, options)
  options = options or {}
  local full_url = base_url .."&q=" .. urlencode(query)
  local tstart = os.time()
  local res = ntop.httpGet(full_url, username, password, ternary(options.no_timeout, 99999999999, INFLUX_QUERY_TIMEMOUT_SEC), true)
  local tend = os.time()
  local debug_influxdb_queries = (ntop.getPref("ntopng.prefs.influxdb.print_queries") == "1")

  if debug_influxdb_queries then
    local tdiff = tend - tstart
    -- count the ";" separators to log how many queries were batched
    local _, num_queries = string.gsub(query, ";", "")
    traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("influx_query[#%u][%ds]: %s", num_queries+1, tdiff, query))
  end

  if not res then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "Invalid response for query: " .. full_url)
    return nil
  end

  if((res.RESPONSE_CODE ~= 200) and (not res.IS_PARTIAL)) then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "Bad response code[" .. res.RESPONSE_CODE .. "]: " .. (res.CONTENT or ""))
    return nil
  end

  if((res.CONTENT == nil) or (res.IS_PARTIAL)) then
    -- a partial response means the query hit the timeout
    ts_common.setLastError(ts_common.ERR_OPERATION_TOO_SLOW, i18n("graphs.query_too_slow"))
    return nil
  end

  local jres = json.decode(res.CONTENT)

  -- NOTE: the previous check used "not #jres.results", which is always
  -- false (a length is a number, hence truthy); an empty results array
  -- must be detected explicitly or callers crash indexing results[1]
  if (not jres) or (not jres.results) or (#jres.results == 0) then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "Invalid JSON reply[" .. res.CONTENT_LEN .. " bytes]: " .. string.sub(res.CONTENT, 1, 50))
    return nil
  end

  for _, result in ipairs(jres.results) do
    if result["error"] then
      traceError(TRACE_ERROR, TRACE_CONSOLE, result["error"])
      return nil
    end
  end

  return jres
end
-- Performs a single InfluxDB query.
-- @return the first result of the reply, {} when the query succeeded but
--         matched no series, nil on error
local function influx_query(base_url, query, username, password, options)
  local jres = influx_query_multi(base_url, query, username, password, options)

  if jres and jres.results then
    local first = jres.results[1]

    -- guard against an empty results array to avoid indexing nil
    if (first == nil) or (first.series == nil) then
      -- no results found
      return {}
    end

    return first
  end

  return nil
end
-- ##############################################
-- Executes multiple queries in a single POST request to InfluxDB.
-- @param queries array of query strings (joined with ";")
-- @return true on success, or (false, error message) on failure
local function multiQueryPost(queries, url, username, password)
local query_str = table.concat(queries, ";")
local res = ntop.httpPost(url .. "/query", "q=" .. urlencode(query_str), username, password, INFLUX_QUERY_TIMEMOUT_SEC, true)
if not res then
local err = "Invalid response for query: " .. query_str
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
return false, err
end
if res.RESPONSE_CODE ~= 200 then
local err = "Bad response code[" .. res.RESPONSE_CODE .. "]: " .. getResponseError(res)
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
--tprint(query_str)
return false, err
end
-- NOTE: getResponseError returns res.RESPONSE_CODE (i.e. the number 200,
-- given the check above) when no error is present, hence the comparison
-- with 200 below detects per-statement errors
local err, statement_id = getResponseError(res)
if err ~= 200 then
local err = "Unexpected query error: " .. err
-- statement_id (0-based) identifies the failing query within the batch
if statement_id ~= nil then
err = err .. string.format(", in query #%d: %s", statement_id, queries[statement_id+1] or "nil")
end
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
return false, err
end
return true
end
-- ##############################################
-- Converts a single InfluxDB series (columns + values, column 1 being the
-- epoch timestamp) into the series format used by ntopng, filling the
-- holes InfluxDB leaves for missing points with options.fill_value.
-- @param time_step expected distance (seconds) between consecutive points
-- @return the series array, the number of points per serie, and the
--         (possibly realigned) tstart
local function influx2Series(schema, tstart, tend, tags, options, data, time_step)
local series = {}
local max_vals = {}
-- Create the columns
for i=2, #data.columns do
series[i-1] = {label=data.columns[i], data={}}
max_vals[i-1] = ts_common.getMaxPointValue(schema, series[i-1].label, tags)
end
-- The first time available in the returned data
local first_t = data.values[1][1]
-- Align tstart to the first timestamp
tstart = tstart + (first_t - tstart) % time_step
-- next_t holds the expected timestamp of the next point to process
local next_t = tstart
-- the next index to use for insertion in the result table
local series_idx = 1
--tprint(time_step .. ") " .. tstart .. " vs " .. first_t .. " - " .. next_t)
-- Convert the data
for idx, values in ipairs(data.values) do
local cur_t = data.values[idx][1]
if #values < 2 or cur_t < tstart then
-- skip empty points (which are out of query bounds)
goto continue
end
-- Fill the missing points
while((cur_t - next_t) >= time_step) do
for _, serie in pairs(series) do
serie.data[series_idx] = options.fill_value
end
--tprint("FILL [" .. series_idx .."] " .. cur_t .. " vs " .. next_t)
series_idx = series_idx + 1
next_t = next_t + time_step
end
-- copy the actual values (clamped/normalized) into each serie
for i=2, #values do
local val = ts_common.normalizeVal(values[i], max_vals[i-1], options)
series[i-1].data[series_idx] = val
end
--traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("@ %u = %.2f", cur_t, values[2]))
if(false) then -- consistency check (disabled)
local expected_t = next_t
local actual_t = values[1]
if math.abs(expected_t - actual_t) >= time_step then
traceError(TRACE_WARNING, TRACE_CONSOLE,
string.format("Bad point timestamp: expected %u, found %u [value = %.2f]", expected_t, actual_t, values[2]))
end
end
series_idx = series_idx + 1
next_t = next_t + time_step
::continue::
end
-- Fill the missing points at the end
while((tend - next_t) >= 0) do
for _, serie in pairs(series) do
serie.data[series_idx] = options.fill_value
end
--tprint("FILL [" .. series_idx .."] " .. tend .. " vs " .. next_t)
series_idx = series_idx + 1
next_t = next_t + time_step
end
local count = series_idx - 1
return series, count, tstart
end
-- Exported for unit tests only; do not use outside of tests
driver._influx2Series = influx2Series
--##############################################
-- Builds the beginning of a WHERE clause from a table of tag filters.
-- The returned string always ends so that the caller can append the time
-- constraints (a trailing "AND" is added when tags are present).
local function where_tags(tags)
  if table.empty(tags) then
    return " WHERE"
  end

  return ' WHERE ' .. table.tconcat(tags, "=", " AND ", nil, "'") .. " AND"
end
--##############################################
-- Builds the query computing the "total" serie: all matching series (and
-- metrics) summed together, possibly sampled to time_step, and derivated
-- when the schema holds counters.
-- @param label the output column alias (defaults to "total_serie")
-- @return the InfluxQL query string
local function getTotalSerieQuery(schema, query_schema, raw_step, tstart, tend, tags, time_step, data_type, label)
  label = label or "total_serie"

  --[[
  SELECT NON_NEGATIVE_DERIVATIVE(total_serie) AS total_serie FROM        // derivate the serie, if necessary
    (SELECT MEAN("total_serie") AS "total_serie" FROM                    // sample the total serie points, if necessary
      (SELECT SUM("value") AS "total_serie" FROM                         // sum all the series together
        (SELECT (bytes_sent + bytes_rcvd) AS "value" FROM "host:ndpi"    // possibly sum multiple metrics within same serie
          WHERE host='192.168.43.18' AND ifid='2'
          AND time >= 1531916170000000000 AND time <= 1532002570000000000)
        GROUP BY time(300s))
      GROUP BY time(600s))
  ]]
  local is_single_serie = schema:allTagsDefined(tags)
  local simplified_query = 'SELECT (' .. table.concat(schema._metrics, " + ") ..') AS "'.. label ..'" FROM '.. query_schema .. where_tags(tags) .. ' time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000'
  local query

  if is_single_serie then
    -- optimized version: a single serie needs no SUM
    query = simplified_query
  else
    query = 'SELECT SUM("'.. label ..'") AS "'.. label ..'" FROM ('.. simplified_query ..')'..
      ' GROUP BY time('.. raw_step ..'s)'
  end

  if time_step and (raw_step ~= time_step) then
    -- sample the points
    query = 'SELECT MEAN("'.. label ..'") AS "'.. label ..'" FROM ('.. query ..') GROUP BY time('.. time_step ..'s)'
  end

  if data_type == ts_common.metrics.counter then
    local optimized = false

    if (query == simplified_query) and (#schema._metrics == 1) then
      -- Remove nested query if possible
      local parts = split(query, " FROM ")

      if #parts == 2 then
        query = "SELECT NON_NEGATIVE_DERIVATIVE(".. schema._metrics[1] ..") AS ".. label .." FROM " .. parts[2]
        -- NOTE: without setting this flag the query would be wrapped in a
        -- second NON_NEGATIVE_DERIVATIVE below, derivating the data twice
        optimized = true
      end
    end

    if not optimized then
      query = "SELECT NON_NEGATIVE_DERIVATIVE(".. label ..") AS ".. label .." FROM (" .. query .. ")"
    end
  end

  return query
end
-- ##############################################
-- Computes the "total" serie (sum of all matching series) over the interval.
-- @return an array of values aligned to time_step; a zero-filled array
--         when the query yields no data
function driver:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
local query = getTotalSerieQuery(schema, query_schema, raw_step, tstart, tend + unaligned_offset, tags, time_step, data_type, label)
local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)
if table.empty(data) then
-- no data: build an explicitly zeroed serie, one slot per time_step
local rv = {}
local i = 1
for t=tstart + time_step, tend, time_step do
rv[i] = 0
i = i + 1
end
return rv
end
data = data.series[1]
-- NOTE(review): assumes influx2Series returns at least one serie here
-- (i.e. data has >= 2 columns) — verify behavior on malformed replies
local series, count, tstart = influx2Series(schema, tstart + time_step, tend, tags, options, data, time_step)
return series[1].data
end
-- ##############################################
-- NOTE: mean / percentile values are calculated manually because of an issue with
-- empty points in the queries https://github.com/influxdata/influxdb/issues/6967
-- Runs a pre-built aggregation query and derives total/average statistics
-- from its first value. Returns nil when no usable value was produced.
function driver:_performStatsQuery(stats_query, tstart, tend)
  local full_url = self.url .. "/query?db=" .. self.db .. "&epoch=s"
  local data = influx_query(full_url, stats_query, self.username, self.password)
  local first_value = data and data.series and data.series[1] and data.series[1].values[1]

  if not first_value then
    return nil
  end

  local total = first_value[2]

  -- average computed manually, see the note above about influxdb#6967
  return {
    total = total,
    average = (total / (tend - tstart)),
  }
end
-- ##############################################
-- Internal (monitoring) schemas query InfluxDB's own "_internal" database.
local function getDatabaseName(schema, db)
  if schema.options.influx_internal_query then
    return '_internal'
  end

  return db
end
-- Builds the data query for a schema over [tstart, tend] sampled at
-- time_step. Internal schemas provide their own query-builder callback.
function driver:_makeSeriesQuery(query_schema, metrics, tags, tstart, tend, time_step, schema)
local internal_query = schema.options.influx_internal_query
if internal_query ~= nil then
return internal_query(self, schema, tstart, tend, time_step)
end
return 'SELECT '.. table.concat(metrics, ",") ..' FROM ' .. query_schema .. where_tags(tags) ..
" time >= " .. tstart .. "000000000 AND time <= " .. tend .. "000000000" ..
" GROUP BY TIME(".. time_step .."s)"
end
-- Queries the data points for a schema over [tstart, tend].
-- @param options supports fill_value, calculate_stats, initial_point and
--        target_aggregation (see getSchemaRetentionPolicy)
-- @return a table with start/step/count, the series data, optional
--         statistics and the additional total serie
function driver:query(schema, tstart, tend, tags, options)
local metrics = {}
-- pick the retention policy (raw/1h/1d) best fitting the interval
local retention_policy = getSchemaRetentionPolicy(schema, tstart, tend, options)
local query_schema, raw_step, data_type = retentionPolicyToSchema(schema, retention_policy, self.db)
-- avoid querying points not yet aggregated by the retention policy
tend = fixTendForRetention(tend, retention_policy)
local time_step = ts_common.calculateSampledTimeStep(raw_step, tstart, tend, options)
-- NOTE: this offset is necessary to fix graph edge points when data insertion is not aligned with tstep
local unaligned_offset = raw_step - 1
for i, metric in ipairs(schema._metrics) do
-- NOTE: why do we need to divide by time_step? is MEAN+GROUP BY TIME bugged?
if data_type == ts_common.metrics.counter then
metrics[i] = "(DERIVATIVE(MEAN(\"" .. metric .. "\")) / ".. time_step ..") as " .. metric
else -- gauge / derivative
metrics[i] = schema:getAggregationFunction() .. "(\"".. metric .."\") as " .. metric
end
end
-- NOTE: GROUP BY TIME and FILL do not work well together! Additional zeroes produce non-existent derivative values
-- Will perform fill manually below
--[[
SELECT (DERIVATIVE(MEAN("bytes")) / 60) as bytes
FROM "iface:ndpi" WHERE protocol='SSL' AND ifid='2'
AND time >= 1531991910000000000 AND time <= 1532002710000000000
GROUP BY TIME(60s)
]]
local query = self:_makeSeriesQuery(query_schema, metrics, tags, tstart, tend + unaligned_offset, time_step, schema)
local url = self.url
local data = influx_query(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", query, self.username, self.password, options)
local series, count
if table.empty(data) then
-- no data at all: synthesize a serie full of fill_value
series, count = ts_common.fillSeries(schema, tstart + time_step, tend, time_step, options.fill_value)
else
-- Note: we are working with intervals because of derivatives. The first interval ends at tstart + time_step
-- which is the first value returned by InfluxDB
series, count, tstart = influx2Series(schema, tstart + time_step, tend, tags, options, data.series[1], time_step)
end
local total_serie = nil
local stats = nil
if options.calculate_stats then
local is_single_serie = (#series == 1)
if is_single_serie then
-- optimization
total_serie = table.clone(series[1].data)
else
-- try to inherit label from existing series
local label = series and series[1] and series[1].label
total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart + time_step, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
end
if total_serie then
stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, data_type)
if stats.total ~= nil then
-- override total and average
-- NOTE: using -1 to avoid overflowing into the next hour
local stats_query = "(SELECT ".. table.concat(schema._metrics, " + ") .. ' AS value FROM ' .. query_schema ..
' ' ..getWhereClause(tags, tstart, tend, -1) .. ")"
if data_type == ts_common.metrics.counter then
stats_query = "(SELECT NON_NEGATIVE_DIFFERENCE(value) as value FROM " .. stats_query .. ")"
end
stats_query = "SELECT SUM(value) as value FROM " .. stats_query
if data_type == ts_common.metrics.derivative then
stats_query = "SELECT (value * ".. time_step ..") as value FROM (" .. stats_query .. ")"
end
stats = table.merge(stats, self:_performStatsQuery(stats_query, tstart, tend))
end
end
end
if options.initial_point then
-- NOTE(review): initial_metrics is built here but never used below; the
-- initial-point query reuses "metrics". Presumably FIRST() was intended —
-- verify against upstream before changing.
local initial_metrics = {}
for idx, metric in ipairs(schema._metrics) do
initial_metrics[idx] = "FIRST(" .. metric .. ")"
end
-- query one extra point right before tstart
local query = self:_makeSeriesQuery(query_schema, metrics, tags, tstart-time_step, tstart+unaligned_offset, time_step, schema)
local data = influx_query(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", query, self.username, self.password, options)
if table.empty(data) then
-- Data fill
for _, serie in pairs(series) do
table.insert(serie.data, 1, options.fill_value)
end
else
local values = data.series[1].values[1]
for i=2, #values do
local max_val = ts_common.getMaxPointValue(schema, series[i-1].label, tags)
local val = ts_common.normalizeVal(values[i], max_val, options)
table.insert(series[i-1].data, 1, val)
end
end
-- shift tstart as we added one point
tstart = tstart - time_step
if total_serie then
local label = series and series[1].label
local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset, data_type) or {options.fill_value}
table.insert(total_serie, 1, additional_pt[1])
end
count = count + 1
end
if options.calculate_stats and total_serie then
stats = table.merge(stats, ts_common.calculateMinMax(total_serie))
end
local rv = {
start = tstart,
step = time_step,
raw_step = raw_step,
count = count,
series = series,
statistics = stats,
source_aggregation = retention_policy or "raw",
additional_series = {
total = total_serie,
},
}
return rv
end
-- ##############################################
-- Builds a localized error message for a failed InfluxDB export.
-- ret is the (possibly nil) table returned by ntop.postHTTPTextFile.
function driver:_exportErrorMsg(ret)
  local err_msg = i18n("delete_data.msg_err_unknown")
  local suffix = ""

  if ret ~= nil then
    if ret.error_msg ~= nil then
      err_msg = ret.error_msg .. "."
    else
      local content = nil

      if ret.CONTENT ~= nil then
        content = json.decode(ret.CONTENT)
      end

      if (content ~= nil) and (content.error ~= nil) then
        err_msg = content.error

        if string.find(err_msg, "max-values-per-tag limit exceeded", nil, true) ~= nil then
          -- the write only partially succeeded
          suffix = ". " .. i18n("alert_messages.influxdb_partial_write")
        end
      elseif ret.RESPONSE_CODE ~= nil then
        err_msg = err_msg .. " [" .. ret.RESPONSE_CODE .. "]"
      end
    end
  end

  return i18n("alert_messages.influxdb_write_error", {influxdb=self.url, err=err_msg}) .. suffix
end
-- ##############################################
-- Localized message used when export items are dropped from the queue.
local function droppedPointsErrorMsg(url)
  local msg = i18n("alert_messages.influxdb_dropped_points", {influxdb=url})
  return msg
end
-- ##############################################
-- Exports a timeseries file in line format to InfluxDB
-- Returns a tuple(success, file_still_existing)
function driver:_exportTsFile(fname)
  -- A missing file means its points are gone for good
  if not ntop.exists(fname) then
    traceError(TRACE_ERROR, TRACE_CONSOLE,
      string.format("Cannot find ts file %s. Some timeseries data will be lost.", fname))
    return false, false
  end

  -- Delete the file after POST
  local delete_file_after_post = false
  local ret = ntop.postHTTPTextFile(self.username, self.password, self.url .. "/write?db=" .. self.db, fname, delete_file_after_post, 30 --[[ timeout ]])

  -- InfluxDB replies 204 (or 200) on a successful write
  if (ret == nil) or ((ret.RESPONSE_CODE ~= 200) and (ret.RESPONSE_CODE ~= 204)) then
    -- local msg = self:_exportErrorMsg(ret)
    -- local influx_alert = alerts:newAlert({
    --    entity = "influx_db",
    --    type = "influxdb_export_failure",
    --    severity = "warning",
    -- })
    -- influx_alert:trigger(self.url, msg)
    return false
  end

  return true
end
-- ##############################################
-- Redis hash keys (indexed by ifid) tracking export statistics
local INFLUX_KEY_PREFIX = "ntopng.cache.influxdb."
local INFLUX_KEY_DROPPED_POINTS = INFLUX_KEY_PREFIX.."dropped_points"
local INFLUX_KEY_EXPORTED_POINTS = INFLUX_KEY_PREFIX.."exported_points"
local INFLUX_KEY_EXPORTS = INFLUX_KEY_PREFIX.."exports"
-- ##############################################
-- Increments the per-interface counter stored in the k Redis hash.
-- Tolerates a nil interface id or a nil/non-numeric increment (which can
-- happen when a queue item could not be parsed, see getExportable) instead
-- of raising a runtime error in the export loop.
local function inc_val(k, ifid, val_to_add)
  if ifid == nil then
    -- nothing to account the value to
    return
  end

  local increment = tonumber(val_to_add) or 0
  local val = tonumber(ntop.getHashCache(k, ifid)) or 0

  val = val + increment
  ntop.setHashCache(k, ifid, string.format("%i", val))
end
-- ##############################################
-- NOTE(review): declared local — these helpers were accidentally global.
-- inc_val and all their callers live in this file, below these
-- definitions; confirm no other script references the global names.
local function inc_dropped_points(ifid, num_points)
  inc_val(INFLUX_KEY_DROPPED_POINTS, ifid, num_points)
end

local function inc_exported_points(ifid, num_points)
  inc_val(INFLUX_KEY_EXPORTED_POINTS, ifid, num_points)
end

local function inc_exports(ifid)
  inc_val(INFLUX_KEY_EXPORTS, ifid, 1)
end
-- ##############################################
-- Number of points dropped (never exported) for the given interface.
function driver:get_dropped_points(ifid)
  local val = ntop.getHashCache(INFLUX_KEY_DROPPED_POINTS, ifid)
  return tonumber(val) or 0
end

-- Number of points successfully exported for the given interface.
function driver:get_exported_points(ifid)
  local val = ntop.getHashCache(INFLUX_KEY_EXPORTED_POINTS, ifid)
  return tonumber(val) or 0
end

-- Number of successful export operations for the given interface.
function driver:get_exports(ifid)
  local val = ntop.getHashCache(INFLUX_KEY_EXPORTS, ifid)
  return tonumber(val) or 0
end
-- ##############################################
-- Removes the on-disk file associated to an exportable item, if any.
local function deleteExportableFile(exportable)
  local fname = exportable and exportable["fname"]

  if fname and ntop.exists(fname) then
    os.remove(fname)
    traceError(TRACE_INFO, TRACE_CONSOLE, "Removed file "..fname)
  end
end
-- ##############################################
-- Accounts a dropped (not exported) item and disposes of its file.
local function dropExportable(exportable)
inc_dropped_points(exportable["ifid"], exportable["num_points"])
deleteExportableFile(exportable)
end
-- ##############################################
-- Triggers a (rate-limited) alert telling the user that export items have
-- been dropped from the queue.
function driver:_droppedExportablesAlert()
  local alert_periodicity = 60
  local k = "ntopng.cache.influxdb_dropped_points_alert_triggered"

  if ntop.getCache(k) == "1" then
    -- recently triggered, nothing to do
    return
  end

  local influx_alert = alerts:newAlert({
    entity = "influx_db",
    type = "influxdb_dropped_points",
    severity = "error",
    periodicity = alert_periodicity,
  })
  local msg = droppedPointsErrorMsg(self.url)

  influx_alert:trigger(self.url, msg)

  -- Just to avoid doing :trigger too often
  ntop.setCache(k, "1", alert_periodicity / 2)
end
-- ##############################################
-- Accounts a successfully exported item and disposes of its file.
local function exportableSuccess(exportable)
inc_exported_points(exportable["ifid"], exportable["num_points"])
inc_exports(exportable["ifid"])
deleteExportableFile(exportable)
end
-- ##############################################
-- Exports a single item to InfluxDB, updating the per-interface last
-- export time on success. Returns true on success, false otherwise.
function driver:_performExport(exportable)
  local time_key = "ntopng.cache.influxdb_export_time_" .. self.db .. "_" .. exportable["ifid"]
  local prev_t = tonumber(ntop.getCache(time_key)) or 0
  local start_t = ntop.gettimemsec()
  local exported = self:_exportTsFile(exportable["fname"])
  local end_t = ntop.gettimemsec()

  if exported then
    -- Successfully exported
    traceError(TRACE_INFO, TRACE_CONSOLE,
      string.format("Successfully exported %u points in %.2fs", exportable["num_points"], (end_t - start_t)))
    -- never move the last export time backwards
    ntop.setCache(time_key, tostring(math.max(prev_t, exportable["time_ref"])))
  end

  return exported
end
-- ##############################################
-- Parses a queue item ("ifid|time_ref|export_id|num_points") into an
-- exportable descriptor. On parse failure the returned table only carries
-- the raw item (no fname/ifid/num_points fields).
local function getExportable(item)
  local res = {item = item}
  local parts = split(item, "|")
  local ifid_str = parts[1]
  local ifid = tonumber(ifid_str)
  local time_ref = tonumber(parts[2])
  local export_id = tonumber(parts[3])
  local num_points = tonumber(parts[4])

  if not (ifid and time_ref and export_id and num_points) then
    traceError(TRACE_ERROR, TRACE_CONSOLE, "Missing mandatory data from the exportable item "..(item or ''))
    return res
  end

  res["fname"] = os_utils.fixPath(dirs.workingdir .. "/" .. ifid .. "/ts_export/" .. export_id .. "_" .. time_ref)
  res["ifid_str"] = ifid_str
  res["ifid"] = ifid
  res["num_points"] = num_points
  res["time_ref"] = time_ref

  return res
end
-- ##############################################
-- Drains the Redis export queue, POSTing each pending timeseries file to
-- InfluxDB. When the queue exceeds INFLUX_MAX_EXPORT_QUEUE_LEN, the oldest
-- items are dropped (and an alert is triggered).
-- NOTE(review): the "deadline" parameter is currently unused here.
function driver:export(deadline)
interface.select(getSystemInterfaceId())
local dropped_exportables = false
local num_pending = ntop.llenCache(INFLUX_EXPORT_QUEUE)
if num_pending == 0 then
return
end
traceError(TRACE_INFO, TRACE_CONSOLE, "Exporting "..num_pending.." items")
-- Remove the old guys for which it wasn't possible to export
while num_pending > INFLUX_MAX_EXPORT_QUEUE_LEN do
local being_dropped = ntop.lpopCache(INFLUX_EXPORT_QUEUE)
local exportable_to_be_dropped = getExportable(being_dropped)
dropExportable(exportable_to_be_dropped)
traceError(TRACE_INFO, TRACE_CONSOLE, "Dropped old item "..(being_dropped or ''))
num_pending = num_pending - 1
if not dropped_exportables then
dropped_exportables = true
end
end
if dropped_exportables then
self:_droppedExportablesAlert()
end
-- Post the guys using a pretty long timeout
local pending_exports = ntop.lrangeCache(INFLUX_EXPORT_QUEUE, 0, -1)
if not pending_exports then
return
end
-- Process pending exports older-to-newer
for _, cur_export in ipairs(pending_exports) do
local exportable = getExportable(cur_export)
local res = self:_performExport(exportable)
if res then
-- export SUCCEEDED: remove the item from the queue
exportableSuccess(exportable)
ntop.lremCache(INFLUX_EXPORT_QUEUE, cur_export)
else
-- export FAILED, retry next time
end
end
-- NOTE(review): the system interface is selected again here, same as at
-- the top of this function — presumably to re-pin it; confirm if needed
interface.select(getSystemInterfaceId())
end
-- ##############################################
-- Returns the timestamp of the last successful export for the given
-- interface, or the current time when no export has happened yet.
function driver:getLatestTimestamp(ifid)
  local k = "ntopng.cache.influxdb_export_time_" .. self.db .. "_" .. ifid
  local last_export = tonumber(ntop.getCache(k))

  return last_export or os.time()
end
-- ##############################################
-- At least 2 values are needed otherwise derivative will return empty
local min_values_list_series = 2

-- Builds the query used to discover the existing series for a schema.
-- NOTE: do not use getQuerySchema here, otherwise we'll miss series
-- NOTE: time based query not currently supported on show tags/series, using select
-- https://github.com/influxdata/influxdb/issues/5668
--[[
SELECT * FROM "iface:ndpi_categories"
WHERE ifid='2' AND time >= 1531981349000000000
GROUP BY category
LIMIT 2
]]
local function makeListSeriesQuery(schema, tags_filter, wildcard_tags, start_time)
  local group_by = ""

  if not table.empty(wildcard_tags) then
    group_by = " GROUP BY " .. table.concat(wildcard_tags, ",")
  end

  return 'SELECT * FROM "' .. schema.name .. '"' .. where_tags(tags_filter) ..
    " time >= " .. start_time .. "000000000" ..
    group_by ..
    " LIMIT " .. min_values_list_series
end
-- Converts the reply of a makeListSeriesQuery query into a list of tag
-- tables, one per discovered serie.
-- @return nil when no serie exists; {tags_filter} for the simple
--         "exists" check (no wildcard tags); otherwise a list of tag sets
local function processListSeriesResult(data, schema, tags_filter, wildcard_tags)
if table.empty(data) then
return nil
end
if table.empty(data.series) then
return nil
end
if table.empty(wildcard_tags) then
-- Simple "exists" check: enough points for a derivative to be computed
if #data.series[1].values >= min_values_list_series then
return {tags_filter}
else
return nil
end
end
local res = {}
for _, serie in pairs(data.series) do
-- skip series with too few points (derivative would return empty)
if #serie.values < min_values_list_series then
goto continue
end
-- only the first value row is needed to extract the tags of a serie
for _, value in pairs(serie.values) do
local tags = {}
for i=2, #value do
local tag = serie.columns[i]
-- exclude metrics
if schema.tags[tag] ~= nil then
tags[tag] = value[i]
end
end
-- GROUP BY tags are reported separately in serie.tags
for key, val in pairs(serie.tags) do
tags[key] = val
end
res[#res + 1] = tags
break
end
::continue::
end
return res
end
-- Lists the series existing for the given schema and tag filters.
-- @return a list of tag tables (one per serie), or nil when none exist
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
  if schema.options.influx_internal_query then
    -- internal metrics always exist
    return {{}} -- exists
  end

  local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, start_time)
  local data = influx_query(self.url .. "/query?db=".. self.db, query, self.username, self.password)

  return processListSeriesResult(data, schema, tags_filter, wildcard_tags)
end
-- ##############################################
-- Batched version of listSeries: resolves many list-series requests with
-- few multi-statement queries (at most max_batch_size statements each).
-- @param batch array of {schema, filter_tags, wildcard_tags, start_time}
-- @return a table mapping each batch index to its listSeries-style result
function driver:listSeriesBatched(batch)
local max_batch_size = 30
local url = self.url
local rv = {}
-- maps the statement index within the current batch to the batch index
local idx_to_batchid = {}
local internal_results = {}
for i=1,#batch,max_batch_size do
local queries = {}
-- Prepare the batch
for j=i,math.min(i+max_batch_size-1, #batch) do
local cur_query = batch[j]
if cur_query.schema.options.influx_internal_query then
-- internal metrics always exist
internal_results[j] = {{}} -- exists
else
local idx = #queries +1
idx_to_batchid[idx] = j
queries[idx] = makeListSeriesQuery(cur_query.schema, cur_query.filter_tags, cur_query.wildcard_tags, cur_query.start_time)
end
end
local query_str = table.concat(queries, ";")
local data = influx_query_multi(url .. "/query?db=".. self.db, query_str, self.username, self.password)
-- Collect the results (InfluxDB returns one result per statement)
if data and data.results then
for idx, result in pairs(data.results) do
local j = idx_to_batchid[idx]
local cur_query = batch[j]
local result = processListSeriesResult(result, cur_query.schema, cur_query.filter_tags, cur_query.wildcard_tags)
rv[j] = result
end
end
end
return table.merge(rv, internal_results)
end
-- ##############################################
-- Builds the full WHERE clause (tag filters + time bounds) for a query.
-- NOTE: deliberately global (no "local"): it is referenced by closures
-- defined earlier in this file (e.g. driver:query), which would not see a
-- local declared at this point of the chunk.
function getWhereClause(tags, tstart, tend, unaligned_offset)
return where_tags(tags) .. ' time >= '.. tstart ..'000000000 AND time <= '.. (tend + unaligned_offset) .. "000000000"
end
-- Compute the top-k entries of `schema` over [tstart, tend], grouped by the
-- single tag in `top_tags`. Returns nil on error, otherwise a table with:
--  - topk: sorted array of {tags, value, partials}
--  - statistics (optional), source_aggregation, additional_series.total
function driver:topk(schema, tags, tstart, tend, options, top_tags)
   -- The nested TOP() query below only supports a single grouping tag
   if #top_tags ~= 1 then
      traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB driver expects exactly one top tag, " .. #top_tags .. " found")
      return nil
   end

   local top_tag = top_tags[1]
   local retention_policy = getSchemaRetentionPolicy(schema, tstart, tend, options)
   local query_schema, raw_step, data_type = retentionPolicyToSchema(schema, retention_policy, self.db)
   tend = fixTendForRetention(tend, retention_policy)

   -- NOTE: this offset is necessary to fix graph edge points when data insertion is not aligned with tstep
   local unaligned_offset = raw_step - 1

   local derivate_metrics = {}
   local sum_metrics = {}
   local all_metrics = table.concat(schema._metrics, ", ")

   -- Per metric: the expression turning raw samples into deltas/rates, and
   -- the SUM() expression aggregating them over the whole interval
   for idx, metric in ipairs(schema._metrics) do
      if data_type == "counter" then
         derivate_metrics[idx] = 'NON_NEGATIVE_DIFFERENCE('.. metric .. ') as ' .. metric
      else -- derivative
         derivate_metrics[idx] = '('.. metric .. ' * '.. raw_step ..') as ' .. metric
      end
      sum_metrics[idx] = 'SUM('.. metric .. ') as ' .. metric
   end

   --[[
   Example of the query being built:

   SELECT TOP(value,protocol,10) FROM
    (SELECT SUM(value) AS value FROM
      (SELECT NON_NEGATIVE_DIFFERENCE(value) as value FROM
        (SELECT protocol, (bytes_sent + bytes_rcvd) AS "value" FROM "host:ndpi" WHERE host='192.168.1.1'
          AND ifid='1' AND time >= 1537540320000000000 AND time <= 1537540649000000000)
        GROUP BY protocol)
      GROUP BY protocol)
   ]]

   -- Aggregate into 1 metric and filter
   -- NOTE(review): the where clause uses a -1 offset here (not unaligned_offset) — confirm intended
   local base_query = '(SELECT '.. top_tag ..', (' .. table.concat(schema._metrics, " + ") ..') AS "value", '..
      all_metrics .. ' FROM '.. query_schema ..
      ' '.. getWhereClause(tags, tstart, tend, -1) ..')'

   -- Calculate difference between counter values
   if data_type == "counter" then
      base_query = '(SELECT NON_NEGATIVE_DIFFERENCE(value) as value, '.. table.concat(derivate_metrics, ", ") ..
         ' FROM ' .. base_query .. " GROUP BY ".. top_tag ..")"
   else
      -- derivative
      base_query = '(SELECT (value * '.. raw_step ..') as value, '.. table.concat(derivate_metrics, ", ") ..
         ' FROM ' .. base_query .. " GROUP BY ".. top_tag ..")"
   end

   -- Sum the traffic
   base_query = '(SELECT SUM(value) AS value, '.. table.concat(sum_metrics, ", ") ..
      ' FROM '.. base_query .. ' GROUP BY '.. top_tag ..')'

   -- Calculate TOPk
   local query = 'SELECT TOP(value,'.. top_tag ..','.. options.top ..'), '.. all_metrics ..' FROM ' .. base_query

   local url = self.url
   local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)

   if table.empty(data) then
      return data
   end

   if table.empty(data.series) then
      return {}
   end

   data = data.series[1]
   local res = {}

   -- Collect the aggregated value of each row (column 2 holds the TOP value)
   for idx, value in pairs(data.values) do
      -- top value
      res[idx] = value[2]
   end

   local sorted = {}

   -- Iterate rows sorted by descending value; `rev` is the reverse
   -- comparator provided by the ntopng utils (global)
   for idx in pairsByValues(res, rev) do
      local value = data.values[idx]

      if value[2] > 0 then
         -- Columns from 4 on hold the per-metric partial sums
         local partials = {}

         for idx=4, #value do
            partials[data.columns[idx]] = value[idx]
         end

         sorted[#sorted + 1] = {
            tags = table.merge(tags, {[top_tag] = value[3]}),
            value = value[2],
            partials = partials,
         }
      end
   end

   local time_step = ts_common.calculateSampledTimeStep(raw_step, tstart, tend, options)
   -- NOTE(review): `series` is not defined in this scope (nil global lookup),
   -- so `label` is always nil here — looks like a leftover, confirm
   local label = series and series[1].label
   local total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
   local stats = nil

   if options.calculate_stats and total_serie then
      stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, data_type)

      if stats.total ~= nil then
         -- override total and average
         -- NOTE: sum must be calculated on individual top_tag fields to avoid
         -- calculating DIFFERENCE on a decreasing serie (e.g. this can happend on a top hosts query
         -- when a previously seen host becomes idle, which causes its traffic contribution on the total to become zero on next points)
         local stats_query = "SELECT SUM(value) as value FROM " .. base_query
         stats = table.merge(stats, self:_performStatsQuery(stats_query, tstart, tend))
      end
   end

   if options.initial_point and total_serie then
      -- Prepend one extra point before tstart so graphs have a left edge
      local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset, data_type) or {options.fill_value}
      table.insert(total_serie, 1, additional_pt[1])

      -- shift tstart as we added one point
      tstart = tstart - time_step
   end

   if options.calculate_stats and total_serie then
      stats = table.merge(stats, ts_common.calculateMinMax(total_serie))
   end

   return {
      topk = sorted,
      statistics = stats,
      source_aggregation = retention_policy or "raw",
      additional_series = {
         total = total_serie,
      },
   }
end
-- ##############################################
-- Compute, for each metric of `schema`, its total over [tstart, tend].
-- Counters are summed over their NON_NEGATIVE_DIFFERENCE, gauges are
-- summed directly and derivatives are additionally scaled by the raw step.
-- Returns a (possibly empty) table metric_name -> total.
function driver:queryTotal(schema, tstart, tend, tags, options)
   local query_schema, raw_step, data_type = getQuerySchema(schema, tstart, tend, self.db, options)
   local query

   if data_type == ts_common.metrics.counter then
      local metrics = {}
      local sum_metrics = {}

      for i, metric in ipairs(schema._metrics) do
         metrics[i] = "NON_NEGATIVE_DIFFERENCE(" .. metric .. ") as " .. metric
         sum_metrics[i] = "SUM(" .. metric .. ") as " .. metric
      end

      -- SELECT SUM("bytes_sent") as "bytes_sent", SUM("bytes_rcvd") as "bytes_rcvd" FROM
      --  (SELECT DIFFERENCE("bytes_sent") AS "bytes_sent", DIFFERENCE("bytes_rcvd") AS "bytes_rcvd"
      --    FROM "host:traffic" WHERE ifid='1' AND host='192.168.1.1' AND time >= 1536321770000000000 AND time <= 1536322070000000000)
      query = 'SELECT ' .. table.concat(sum_metrics, ", ") .. ' FROM ' ..
         '(SELECT ' .. table.concat(metrics, ", ") .. ' FROM '.. query_schema .. where_tags(tags) ..
         ' time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000)'
   else
      -- gauge/derivative
      local metrics = {}

      for i, metric in ipairs(schema._metrics) do
         metrics[i] = "(SUM(" .. metric .. ")"

         if data_type == "derivative" then
            metrics[i] = metrics[i] .. " * " .. raw_step
         end

         metrics[i] = metrics[i] .. ") as " .. metric
      end

      query = 'SELECT ' .. table.concat(metrics, ", ") .. ' FROM ' .. query_schema .. where_tags(tags) ..
         ' time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000'
   end

   local url = self.url
   local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)

   if table.empty(data) then
      return data
   end

   -- Guard against replies with no data points (e.g. no matching series in
   -- the interval): indexing data.series[1] unconditionally would error out.
   -- Same check performed by driver:topk.
   if table.empty(data.series) then
      return {}
   end

   data = data.series[1]
   local res = {}

   -- Column 1 is the timestamp, the totals start at column 2
   for i=2, #data.columns do
      local metric = data.columns[i]
      local total = data.values[1][i]
      res[metric] = total
   end

   return res
end
-- ##############################################
-- Compute, for each metric of `schema`, its MEAN over [tstart, tend].
-- Returns a (possibly empty) table metric_name -> average.
function driver:queryMean(schema, tags, tstart, tend, options)
   local metrics = {}
   local query_schema = getQuerySchema(schema, tstart, tend, self.db, options)

   for i, metric in ipairs(schema._metrics) do
      metrics[i] = "MEAN(" .. metric .. ") as " .. metric
   end

   local query = 'SELECT ' .. table.concat(metrics, ", ") .. ' FROM '.. query_schema .. where_tags(tags) ..
      ' time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000'

   local url = self.url
   local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password)

   if table.empty(data) then
      return data
   end

   -- Guard against replies with no data points (e.g. no matching series in
   -- the interval): indexing data.series[1] unconditionally would error out.
   -- Same check performed by driver:topk.
   if table.empty(data.series) then
      return {}
   end

   data = data.series[1]
   local res = {}

   -- Column 1 is the timestamp, the averages start at column 2
   for i=2, #data.columns do
      local metric = data.columns[i]
      local average = data.values[1][i]
      res[metric] = average
   end

   return res
end
-- ##############################################
-- Query the InfluxDB /ping endpoint and extract the server version from the
-- X-Influxdb-Version response header.
-- Returns the version string, or (nil, localized_error) on failure.
local function getInfluxdbVersion(url, username, password)
   local res = ntop.httpGet(url .. "/ping", username, password, INFLUX_QUERY_TIMEMOUT_SEC, true)

   if not res or ((res.RESPONSE_CODE ~= 200) and (res.RESPONSE_CODE ~= 204)) then
      -- NOTE: getResponseError indexes its argument, so it must not be
      -- called with a nil res (e.g. when the HTTP request itself failed)
      local err_info = res and getResponseError(res) or 0

      if err_info == 0 then
         err_info = i18n("prefs.is_influxdb_running")
      end

      local err = i18n("prefs.could_not_contact_influxdb", {msg=err_info})
      traceError(TRACE_ERROR, TRACE_CONSOLE, err)
      return nil, err
   end

   -- The version is reported in the raw response headers
   local content = res.CONTENT or ""
   return string.match(content, "\nX%-Influxdb%-Version: ([%d|%.]+)")
end
-- Public wrapper around the local getInfluxdbVersion helper using the
-- driver's configured endpoint and credentials.
-- NOTE: this must stay a direct `return f(...)` so both return values
-- (version or nil, err) are forwarded to the caller.
function driver:getInfluxdbVersion()
   return getInfluxdbVersion(self.url, self.username, self.password)
end
-- ##############################################
-- Run a query expected to yield a single scalar value and return it,
-- or nil when the reply carries no data point.
local function single_query(base_url, query, username, password)
   local data = influx_query(base_url, query, username, password)
   local first_series = data and data.series and data.series[1]

   if first_series and first_series.values[1] then
      -- Column 1 is the timestamp, column 2 holds the value
      return first_series.values[1][2]
   end

   return nil
end
-- ##############################################
-- Returns the on-disk size (bytes) of the configured database, computed
-- from the InfluxDB internal per-shard monitoring series.
function driver:getDiskUsage()
   local q = string.format(
      'SELECT SUM(last) FROM (select LAST(diskBytes) FROM "monitor"."shard" where "database" = \'%s\' group by id)',
      self.db)

   return single_query(self.url .. "/query?db=_internal", q, self.username, self.password)
end
-- ##############################################
-- Returns the memory (bytes) currently obtained from the OS by the
-- InfluxDB process, as reported by its internal runtime series.
function driver:getMemoryUsage()
   local base_url = self.url .. "/query?db=_internal"

   return single_query(base_url, 'SELECT LAST(Sys) FROM "_internal".."runtime"', self.username, self.password)
end
-- ##############################################
-- Returns the number of distinct series stored in the configured database.
function driver:getSeriesCardinality()
   local q = string.format(
      'SELECT LAST("numSeries") FROM "_internal".."database" where "database"=\'%s\'',
      self.db)

   return single_query(self.url .. "/query?db=_internal", q, self.username, self.password)
end
-- ##############################################
-- Map a retention (in days) to a suitable shard group duration, following
-- the recommendations of the InfluxDB documentation.
-- https://docs.influxdata.com/influxdb/v1.7/query_language/database_management/#description-of-syntax-1
function driver.getShardGroupDuration(days_retention)
   if days_retention < 2 then
      return "1h"
   end

   -- NOTE: and/or ternary is safe here, "1d" is never falsy
   return (days_retention < 180) and "1d" or "7d"
end
-- Build a "CREATE/ALTER RETENTION POLICY" statement (statement selects the
-- verb) for the given policy name, database and retention in days.
local function makeRetentionPolicyQuery(statement, rpname, dbname, retention)
   local shard_duration = driver.getShardGroupDuration(retention)

   return string.format('%s RETENTION POLICY "%s" ON "%s" DURATION %ud REPLICATION 1 SHARD DURATION %s',
      statement, rpname, dbname, retention, shard_duration)
end
-- ##############################################
-- Create (or update, when they already exist) the "1h" and "1d" retention
-- policies used by the continuous queries on the given database.
local function updateCQRetentionPolicies(dbname, url, username, password)
   local res = influx_query(url .. "/query?db=".. dbname,
      string.format("SHOW RETENTION POLICIES ON %s", dbname), username, password)

   -- Use ALTER instead of CREATE for the policies which already exist
   local statements = {["1h"] = "CREATE", ["1d"] = "CREATE"}

   if res and res.series and res.series[1] then
      for _, rp in pairs(res.series[1].values) do
         local rp_name = rp[1]

         if statements[rp_name] ~= nil then
            statements[rp_name] = "ALTER"
         end
      end
   end

   local queries = {
      makeRetentionPolicyQuery(statements["1h"], "1h", dbname, get1hDatabaseRetentionDays()),
      makeRetentionPolicyQuery(statements["1d"], "1d", dbname, get1dDatabaseRetentionDays()),
   }

   return multiQueryPost(queries, url, username, password)
end
-- ##############################################
-- Parse a "major.minor.patch" version string into a table of numbers.
-- Returns nil when the string does not have exactly three components;
-- non-numeric components default to 0.
local function toVersion(version_str)
   -- string.split is an ntopng utility (ntop_utils)
   local parts = string.split(version_str, "%.")

   if #parts ~= 3 then
      return nil
   end

   local major, minor, patch = parts[1], parts[2], parts[3]

   return {
      major = tonumber(major) or 0,
      minor = tonumber(minor) or 0,
      patch = tonumber(patch) or 0,
   }
end
-- Check that `version` (e.g. "1.7.4") satisfies MIN_INFLUXDB_SUPPORTED_VERSION:
-- same major version, and minor/patch at least the required ones.
local function isCompatibleVersion(version)
   local current = toVersion(version)
   local required = toVersion(MIN_INFLUXDB_SUPPORTED_VERSION)

   -- NOTE: check the parsed `current` rather than the raw `version` string:
   -- a non-nil but malformed version (e.g. "2.0") makes toVersion return nil
   -- and indexing current.major below would raise an error
   if (current == nil) or (required == nil) then
      return false
   end

   return (current.major == required.major) and
      ((current.minor > required.minor) or
         ((current.minor == required.minor) and (current.patch >= required.patch)))
end
-- Initialize the connection with InfluxDB: verify the server version,
-- ensure the database exists (creating it when missing) and configure the
-- "autogen" retention policy.
-- Returns (true, success_message) or (false, localized_error).
function driver.init(dbname, url, days_retention, username, password, verbose)
   local timeout = INFLUX_QUERY_TIMEMOUT_SEC

   -- Check version
   if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Contacting influxdb at " .. url .. " ...") end

   local version, err = getInfluxdbVersion(url, username, password)

   if((not version) and (err ~= nil)) then
      -- Could not contact the server at all
      return false, err
   elseif((not version) or (not isCompatibleVersion(version))) then
      local err = i18n("prefs.incompatible_influxdb_version_found",
         {required=MIN_INFLUXDB_SUPPORTED_VERSION, found=version, url="https://portal.influxdata.com/downloads"})
      traceError(TRACE_ERROR, TRACE_CONSOLE, err)
      return false, err
   end

   -- Check existing database (this is used to prevent db creation errors for unprivileged users)
   if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Checking database " .. dbname .. " ...") end
   local query = "SHOW DATABASES"
   local res = ntop.httpPost(url .. "/query", "q=" .. query, username, password, timeout, true)
   local db_found = false

   if res and (res.RESPONSE_CODE == 200) and res.CONTENT then
      local reply = json.decode(res.CONTENT)

      if reply and reply.results and reply.results[1] and reply.results[1].series then
         -- NOTE(review): only the first series is scanned; presumably SHOW
         -- DATABASES always replies with a single "databases" series — confirm
         local dbs = reply.results[1].series[1] or {values={}}

         for _, row in pairs(dbs.values) do
            local user_db = row[1]

            if user_db == dbname then
               db_found = true
               break
            end
         end
      end
   end

   if not db_found then
      -- Create database
      if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Creating database " .. dbname .. " ...") end
      local query = "CREATE DATABASE \"" .. dbname .. "\""
      local res = ntop.httpPost(url .. "/query", "q=" .. query, username, password, timeout, true)

      if not res or (res.RESPONSE_CODE ~= 200) then
         local err = i18n("prefs.influxdb_create_error", {db=dbname, msg=getResponseError(res)})
         traceError(TRACE_ERROR, TRACE_CONSOLE, err)
         return false, err
      end
   end

   if not db_found or days_retention ~= nil then
      -- New database or config changed
      days_retention = days_retention or getDatabaseRetentionDays()

      -- Set retention
      if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Setting retention for " .. dbname .. " ...") end
      local query = makeRetentionPolicyQuery("ALTER", "autogen", dbname, days_retention)
      local res = ntop.httpPost(url .. "/query", "q=" .. query, username, password, timeout, true)

      if not res or (res.RESPONSE_CODE ~= 200) then
         local warning = i18n("prefs.influxdb_retention_error", {db=dbname, msg=getResponseError(res)})
         traceError(TRACE_WARNING, TRACE_CONSOLE, warning)
         -- This is just a warning, we can proceed
         --return false, err
      end

      -- NOTE: updateCQRetentionPolicies will be called automatically as driver:setup is triggered after this
   end

   return true, i18n("prefs.successfully_connected_influxdb", {db=dbname, version=version})
end
-- ##############################################
-- Delete all the points of the measurements matching `schema_prefix`
-- (all measurements when the prefix is ""), optionally restricted to the
-- given tag values. Returns true on success, false otherwise.
function driver:delete(schema_prefix, tags)
   local url = self.url
   -- Measurements are named "<prefix>:<metric>", match them with a regex
   local measurement_pattern = ternary(schema_prefix == "", '//', '/^'.. schema_prefix ..':/')
   local query = 'DELETE FROM '.. measurement_pattern

   if not table.empty(tags) then
      query = query .. ' WHERE ' .. table.tconcat(tags, "=", " AND ", nil, "'")
   end

   local full_url = url .. "/query?db=".. self.db .."&q=" .. urlencode(query)
   local res = ntop.httpGet(full_url, self.username, self.password, INFLUX_QUERY_TIMEMOUT_SEC, true)

   if not res or (res.RESPONSE_CODE ~= 200) then
      -- NOTE: getResponseError indexes its argument, so pass an empty table
      -- (-> "unknown error") when the HTTP request itself failed with res == nil
      traceError(TRACE_ERROR, TRACE_CONSOLE, getResponseError(res or {}))
      return false
   end

   return true
end
-- ##############################################
-- Intentionally a no-op: data expiration is enforced by the retention
-- policies configured on the InfluxDB database, so there is nothing to
-- delete manually. Always reports success.
function driver:deleteOldData(ifid)
   return true
end
-- ##############################################
-- Build the CREATE CONTINUOUS QUERY statement which aggregates `schema`
-- from the `source` retention policy (sampled every `step` seconds) into
-- the `dest` retention policy (whose bucket is `dest_step` seconds).
-- `resemple`, when set (e.g. "2h"), adds a RESAMPLE FOR clause so late
-- points are still aggregated.
local function getCqQuery(dbname, tags, schema, source, dest, step, dest_step, resemple)
   local cq_name = string.format("%s__%s", schema.name, dest)
   local resemple_s = ""
   local _, _, data_type = retentionPolicyToSchema(schema, source, dbname)

   if resemple then
      resemple_s = "RESAMPLE FOR " .. resemple
   end

   if data_type == ts_common.metrics.counter then
      -- Counters: turn raw samples into deltas with NON_NEGATIVE_DIFFERENCE,
      -- then average them over the destination bucket (SUM / dest_step)
      local sums = {}
      local diffs = {}

      for _, metric in ipairs(schema._metrics) do
         sums[#sums + 1] = string.format('(SUM(%s) / %u) as %s', metric, dest_step, metric)
         diffs[#diffs + 1] = string.format('NON_NEGATIVE_DIFFERENCE(%s) as %s', metric, metric)
      end

      sums = table.concat(sums, ",")
      diffs = table.concat(diffs, ",")

      --[[
      Example of the generated statement:

      CREATE CONTINUOUS QUERY "iface:packets__1h" ON ntopng
      RESAMPLE FOR 2h
      BEGIN
        SELECT (SUM(packets)/3600) as packets
        INTO "1h"."iface:packets" FROM (
          SELECT NON_NEGATIVE_DIFFERENCE(packets) as packets
          FROM "autogen"."iface:packets"
        ) GROUP BY time(1h),ifid
      END]]
      return string.format([[
      CREATE CONTINUOUS QUERY "%s" ON %s
      %s
      BEGIN
        SELECT
          %s
        INTO "%s"."%s"
        FROM (
          SELECT
            %s
          FROM "%s"."%s"
        ) GROUP BY time(%s)%s%s
      END]], cq_name, dbname, resemple_s,
      sums, dest, schema.name,
      diffs, source, schema.name,
      dest, ternary(isEmptyString(tags), "", ","), tags)
   else
      -- Gauges/derivatives: aggregate with the schema's aggregation function
      -- (e.g. MEAN), filling missing source buckets with 0
      local means = {}

      for _, metric in ipairs(schema._metrics) do
         means[#means + 1] = string.format('%s(%s) as %s', schema:getAggregationFunction(), metric, metric)
      end

      means = table.concat(means, ",")

      --[[
      Example of the generated statement:

      CREATE CONTINUOUS QUERY "asn:rtt__1h" ON ntopng
      RESAMPLE FOR 2h
      BEGIN
        SELECT MEAN(millis_rtt) as millis_rtt
        INTO "1h"."asn:rtt" FROM (
          SELECT MEAN(millis_rtt) as millis_rtt
          FROM "autogen"."asn:rtt"
          GROUP BY time(300s),ifid,asn FILL(0)
        ) GROUP BY time(1h),ifid,asn
      END]]
      return string.format([[
      CREATE CONTINUOUS QUERY "%s" ON %s
      %s
      BEGIN
        SELECT
          %s
        INTO "%s"."%s"
        FROM (
          SELECT
            %s
          FROM "%s"."%s"
          GROUP BY time(%us)%s%s
          FILL(0)
        ) GROUP BY time(%s)%s%s
      END]], cq_name, dbname, resemple_s,
      means, dest, schema.name,
      means, source, schema.name,
      step, ternary(isEmptyString(tags), "", ","), tags,
      dest, ternary(isEmptyString(tags), "", ","), tags)
   end
end
-- Prepare the database for use: ensures the database and its retention
-- policies exist and, when rollup is enabled, (re)creates the continuous
-- queries performing the 1h/1d aggregation of every loaded schema.
-- Returns true on success, false otherwise.
function driver:setup(ts_utils)
   local queries = {}
   local max_batch_size = 25 -- note: each query is about 400 characters

   -- Ensure that the database exists
   driver.init(self.db, self.url, nil, self.username, self.password)

   if not updateCQRetentionPolicies(self.db, self.url, self.username, self.password) then
      traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
      return false
   end

   if not isRollupEnabled() then
      -- Nothing more to do
      return true
   end

   -- Continuos Queries stuff
   ts_utils.loadSchemas()
   local schemas = ts_utils.getLoadedSchemas()

   -- Drop the CQs of the schemas which may have changed, so they are
   -- recreated below with the up-to-date definition
   for _, schema in pairs(ts_utils.getPossiblyChangedSchemas()) do
      queries[#queries + 1] = 'DROP CONTINUOUS QUERY "'.. schema ..'__1h" ON ' .. self.db
      queries[#queries + 1] = 'DROP CONTINUOUS QUERY "'.. schema ..'__1d" ON ' .. self.db
   end

   for _, schema in pairs(schemas) do
      local tags = table.concat(schema._tags, ",")

      -- Skip metric-less schemas and internal queries
      if((#schema._metrics == 0) or (schema.options.influx_internal_query)) then
         goto continue
      end

      local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, 3600, "2h")
      local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600, 86400)

      -- Compact the multi-line CQ statements into a single line
      queries[#queries + 1] = cq_1h:gsub("\n", ""):gsub("%s%s+", " ")
      queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")

      -- Flush the batch to keep the POST payload small
      if #queries >= max_batch_size then
         if not multiQueryPost(queries, self.url, self.username, self.password) then
            traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
            return false
         end
         queries = {}
      end

      ::continue::
   end

   -- Flush the remaining queries, if any
   if #queries > 0 then
      if not multiQueryPost(queries, self.url, self.username, self.password) then
         traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup() failed")
         return false
      end
   end

   -- Record the timestamp of the first 1h-aggregated point (defaults to now
   -- when no aggregated data exists yet); only computed once
   if tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY)) == nil then
      local res = influx_query(self.url .. "/query?db=".. self.db .."&epoch=s",
         'SELECT FIRST(bytes) FROM "1h"."iface:traffic"', self.username, self.password)
      local first_t = os.time()

      if res and res.series and res.series[1].values then
         local v = res.series[1].values[1][1]

         if v ~= nil then
            first_t = v
         end
      end

      ntop.setPref(FIRST_AGGREGATION_TIME_KEY, tostring(first_t))
   end

   return true
end
-- ##############################################
-- Returns the current number of timeseries files queued for export to
-- InfluxDB (length of the Redis export queue).
function driver.getExportQueueLength()
   return ntop.llenCache(INFLUX_EXPORT_QUEUE)
end

-- ##############################################

return driver