Enable influxdb data rollup to speedup queries

Also bind influxdb retention of CQ to the default retention
This commit is contained in:
emanuele-f 2019-03-27 12:42:54 +01:00
parent dee4685141
commit 4203d8a1c6
3 changed files with 101 additions and 49 deletions

View file

@ -19,10 +19,6 @@ require("ntop_utils")
local INFLUX_QUERY_TIMEMOUT_SEC = 5
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
local RP_1H_DURATION = "30d" -- NOTE keep in sync with RP_1H_DURATION_SECS below
local RP_1H_DURATION_SECS = 2592000
local RP_1D_DURATION = "365d" -- NOTE keep in sync with RP_1D_DURATION_SECS below
local RP_1D_DURATION_SECS = 31536000
local FIRST_AGGREGATION_TIME_KEY = "ntopng.prefs.influxdb.first_aggregation_time"
-- ##############################################
@ -75,10 +71,24 @@ end
-- ##############################################
local function getDatabaseRetention()
local function getDatabaseRetentionDays()
return tonumber(ntop.getPref("ntopng.prefs.influx_retention")) or 365 -- TODO make in common with prefs.lua
end
local function get1dDatabaseRetentionDays()
return getDatabaseRetentionDays()
end
local function get1hDatabaseRetentionDays()
return getDatabaseRetentionDays()
end
-- ##############################################
local function isRollupEnabled()
return(ntop.getPref("ntopng.prefs.disable_influxdb_rollup") ~= "1")
end
-- ##############################################
-- Determines the most appropriate retention policy
@ -86,15 +96,19 @@ local function getSchemaRetentionPolicy(schema, tstart, tend, options)
options = options or {}
local first_aggr_time = tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY))
if not first_aggr_time then
if((not first_aggr_time) or (not isRollupEnabled())) then
return "raw"
end
local rp_1d_duration_sec = get1dDatabaseRetentionDays() * 86400
local rp_1h_duration_sec = get1hDatabaseRetentionDays() * 86400
-- RP selection logic
local oldest_1d_data = os.time() - RP_1D_DURATION_SECS
local oldest_1h_data = os.time() - RP_1H_DURATION_SECS
local oldest_raw_data = getDatabaseRetention()
local oldest_1d_data = os.time() - rp_1d_duration_sec
local oldest_1h_data = os.time() - rp_1h_duration_sec
local oldest_raw_data = getDatabaseRetentionDays() * 86400
local max_raw_interval = 12 * 3600 -- after 12 hours begin to use the aggregated data
local max_1h_interval = 15 * 86400 -- after 15 days use the 1d aggregated data
if options.target_aggregation then
if((options.target_aggregation == "1h") and (tstart < oldest_1h_data)) or
@ -121,7 +135,9 @@ local function getSchemaRetentionPolicy(schema, tstart, tend, options)
local interval = tend - tstart
if interval >= max_raw_interval then
if interval >= max_1h_interval then
return "1d"
elseif interval >= max_raw_interval then
return "1h"
end
@ -217,6 +233,35 @@ end
-- ##############################################
local function multiQueryPost(queries, url, username, password)
local query_str = table.concat(queries, ";")
local res = ntop.httpPost(url .. "/query", "q=" .. urlencode(query_str), username, password, INFLUX_QUERY_TIMEMOUT_SEC, true)
if not res then
local err = "Invalid response for query: " .. query_str
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
return false, err
end
if res.RESPONSE_CODE ~= 200 then
local err = "Bad response code[" .. res.RESPONSE_CODE .. "]: " .. getResponseError(res)
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
--tprint(query_str)
return false, err
end
local err = getResponseError(res)
if err ~= 200 then
local err = "Unexpected query error: " .. err
traceError(TRACE_ERROR, TRACE_CONSOLE, err)
return false, err
end
return true
end
-- ##############################################
local function influx2Series(schema, tstart, tend, tags, options, data, time_step)
local series = {}
local max_vals = {}
@ -1009,6 +1054,34 @@ end
-- ##############################################
local function updateCQRetentionPolicies(dbname, url, username, password)
local query = string.format("SHOW RETENTION POLICIES ON %s", dbname)
local res = influx_query(url .. "/query?db=".. dbname, query, username, password)
local rp_1h_statement = "CREATE"
local rp_1d_statement = "CREATE"
if res and res.series and res.series[1] then
for _, rp in pairs(res.series[1].values) do
local rp_name = rp[1]
if rp_name == "1h" then
rp_1h_statement = "ALTER"
elseif rp_name == "1d" then
rp_1d_statement = "ALTER"
end
end
end
local queries = {
string.format('%s RETENTION POLICY "1h" ON %s DURATION %ud REPLICATION 1', rp_1h_statement, dbname, get1hDatabaseRetentionDays()),
string.format('%s RETENTION POLICY "1d" ON %s DURATION %ud REPLICATION 1', rp_1d_statement, dbname, get1dDatabaseRetentionDays())
}
return multiQueryPost(queries, url, username, password)
end
-- ##############################################
local function toVersion(version_str)
local parts = string.split(version_str, "%.")
@ -1093,7 +1166,7 @@ function driver.init(dbname, url, days_retention, username, password, verbose)
if not db_found or days_retention ~= nil then
-- New database or config changed
days_retention = days_retention or getDatabaseRetention()
days_retention = days_retention or getDatabaseRetentionDays()
-- Set retention
if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Setting retention for " .. dbname .. " ...") end
@ -1107,6 +1180,8 @@ function driver.init(dbname, url, days_retention, username, password, verbose)
-- This is just a warning, we can proceed
--return false, err
end
-- NOTE: updateCQRetentionPolicies will be called automatically as driver:setup is triggered after this
end
return true, i18n("prefs.successfully_connected_influxdb", {db=dbname, version=version})
@ -1143,32 +1218,6 @@ end
-- ##############################################
function driver:_multiQueryPost(queries)
local query_str = table.concat(queries, ";")
local res = ntop.httpPost(self.url .. "/query", "q=" .. urlencode(query_str), self.username, self.password, INFLUX_QUERY_TIMEMOUT_SEC, true)
if not res then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Invalid response for query: " .. query_str)
return false
end
if res.RESPONSE_CODE ~= 200 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Bad response code[" .. res.RESPONSE_CODE .. "]: " .. getResponseError(res))
--tprint(query_str)
return false
end
local err = getResponseError(res)
if err ~= 200 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Unexpected query error: " .. err)
return false
end
return true
end
-- ##############################################
local function getCqQuery(dbname, tags, schema, source, dest, step, dest_step, resemple)
local cq_name = string.format("%s__%s", schema.name, dest)
local resemple_s = ""
@ -1265,9 +1314,17 @@ function driver:setup(ts_utils)
-- Ensure that the database exists
driver.init(self.db, self.url, nil, self.username, self.password)
queries[#queries + 1] = string.format('CREATE RETENTION POLICY "1h" ON %s DURATION %s REPLICATION 1', self.db, RP_1H_DURATION)
queries[#queries + 1] = string.format('CREATE RETENTION POLICY "1d" ON %s DURATION %s REPLICATION 1', self.db, RP_1D_DURATION)
if not updateCQRetentionPolicies(self.db, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
end
if not isRollupEnabled() then
-- Nothing more to do
return true
end
-- Continuos Queries stuff
ts_utils.loadSchemas()
local schemas = ts_utils.getLoadedSchemas()
@ -1281,15 +1338,11 @@ function driver:setup(ts_utils)
local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, 3600, "2h")
local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600, 86400)
-- TODO temporary fix to alter existing queries, remove after beta end
queries[#queries + 1] = string.format('DROP CONTINUOUS QUERY "%s__1d" ON %s', schema.name, self.db)
queries[#queries + 1] = string.format('DROP CONTINUOUS QUERY "%s__1h" ON %s', schema.name, self.db)
queries[#queries + 1] = cq_1h:gsub("\n", ""):gsub("%s%s+", " ")
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
if #queries >= max_batch_size then
if not self:_multiQueryPost(queries) then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
end
@ -1300,7 +1353,7 @@ function driver:setup(ts_utils)
end
if #queries > 0 then
if not self:_multiQueryPost(queries) then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup() failed")
return false
end