Added support to influxdb v.2 with compatible v.1 buckets (#3764)

This commit is contained in:
Matteo Biscosi 2024-06-19 17:16:54 +02:00
parent 344a2b4736
commit d65b78ecbf
3 changed files with 138 additions and 105 deletions

View file

@ -31,6 +31,8 @@ local INFLUX_MAX_EXPORT_QUEUE_TRIM_LEN = 30 -- This edge should never be crossed
local INFLUX_EXPORT_QUEUE = "ntopng.influx_file_queue"
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
local MIN_INFLUXDB_MAJOR_SUPPORTED_VERSION = 1
local MAX_INFLUXDB_MAJOR_SUPPORTED_VERSION = 2
local FIRST_AGGREGATION_TIME_KEY = "ntopng.prefs.influxdb.first_aggregation_time"
-- hourly continuous queries are disabled as they create a lot of pressure
@ -42,6 +44,8 @@ local HOURLY_CQ_DISABLED_KEY = "ntopng.prefs.influxdb.1h_cq_disabled"
local INFLUX_KEY_PREFIX = "ntopng.cache.influxdb."
local INFLUXDB_KEY_SKIP_RETENTION_AND_CREATION = INFLUX_KEY_PREFIX .. "skip_retention_and_creation"
-- Keep total counters for dropped points
local INFLUX_KEY_EXPORTED_POINTS = INFLUX_KEY_PREFIX .. "num_exported_points"
local INFLUX_KEY_EXPORTS = INFLUX_KEY_PREFIX .. "num_exports"
@ -1887,9 +1891,15 @@ local function isCompatibleVersion(version)
return false
end
return (current.major == required.major) and
((current.minor > required.minor) or
((current.minor == required.minor) and (current.patch >= required.patch)))
if current.major == MAX_INFLUXDB_MAJOR_SUPPORTED_VERSION then
ntop.setCache(INFLUXDB_KEY_SKIP_RETENTION_AND_CREATION, true)
return true
else
ntop.setCache(INFLUXDB_KEY_SKIP_RETENTION_AND_CREATION, false)
return (current.major == required.major) and
((current.minor > required.minor) or
((current.minor == required.minor) and (current.patch >= required.patch)))
end
end
function driver.init(dbname, url, days_retention, username, password, verbose)
@ -2115,121 +2125,130 @@ local function getCqQuery(dbname, tags, schema, source, dest, step, dest_step, r
end
function driver:setup(ts_utils)
local queries = {}
local max_batch_size = 25 -- note: each query is about 400 characters
local retention_changed = ntop.getCache("ntopng.influxdb.retention_changed") or '0'
local retention = nil
-- Clear saved values (e.g., number of exported points) as
-- we want to start clean and keep values since-ntopng-startup
del_all_vals()
if retention_changed == '1' then
retention = getDatabaseRetentionDays()
ntop.delCache("ntopng.influxdb.retention_changed")
end
-- Ensure that the database exists
driver.init(self.db, self.url, retention, self.username, self.password)
if not updateCQRetentionPolicies(self.db, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
local skip = ntop.getCache(INFLUXDB_KEY_SKIP_RETENTION_AND_CREATION)
if isEmptyString(skip) then
skip = false
else
skip = toboolean(skip) or false
end
if not isRollupEnabled() then
-- Nothing more to do
return true
end
if not skip then
local queries = {}
local max_batch_size = 25 -- note: each query is about 400 characters
local retention_changed = ntop.getCache("ntopng.influxdb.retention_changed") or '0'
local retention = nil
-- Clear saved values (e.g., number of exported points) as
-- we want to start clean and keep values since-ntopng-startup
del_all_vals()
-- Continuos Queries stuff
ts_utils.loadSchemas()
local schemas = ts_utils.getLoadedSchemas()
if retention_changed == '1' then
retention = getDatabaseRetentionDays()
ntop.delCache("ntopng.influxdb.retention_changed")
end
-- Ensure that the database exists
driver.init(self.db, self.url, retention, self.username, self.password)
-- NOTE: continuos queries cannot be altered, so they must be manually
-- dropped and created in case of changes
for _, schema in pairs(ts_utils.getPossiblyChangedSchemas()) do
if (HOURLY_CQ_ENABLED) then
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema .. '__1h" ON ' .. self.db
if not updateCQRetentionPolicies(self.db, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
end
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema .. '__1d" ON ' .. self.db
end
if not isRollupEnabled() then
-- Nothing more to do
return true
end
-- Needed to handle migration
local previous_1h_enabled = not (ntop.getPref(HOURLY_CQ_DISABLED_KEY) == "1")
local migration_necessary = (previous_1h_enabled ~= HOURLY_CQ_ENABLED)
-- Continuos Queries stuff
ts_utils.loadSchemas()
local schemas = ts_utils.getLoadedSchemas()
for _, schema in pairs(schemas) do
local tags = table.concat(schema._tags, ",")
-- NOTE: continuos queries cannot be altered, so they must be manually
-- dropped and created in case of changes
for _, schema in pairs(ts_utils.getPossiblyChangedSchemas()) do
if (HOURLY_CQ_ENABLED) then
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema .. '__1h" ON ' .. self.db
end
if ((#schema._metrics == 0) or (schema.options.influx_internal_query)) then
goto continue
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema .. '__1d" ON ' .. self.db
end
-- Needed to handle migration
local previous_1h_enabled = not (ntop.getPref(HOURLY_CQ_DISABLED_KEY) == "1")
local migration_necessary = (previous_1h_enabled ~= HOURLY_CQ_ENABLED)
for _, schema in pairs(schemas) do
local tags = table.concat(schema._tags, ",")
if ((#schema._metrics == 0) or (schema.options.influx_internal_query)) then
goto continue
end
if (migration_necessary) then
-- NOTE: dropping all the continuous queries all together is not possible
-- as InfluxDB does not provide an API for this and calling "SHOW CONTINUOUS QUERIES"
-- yelds too much result data
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema.name .. '__1h" ON ' .. self.db
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema.name .. '__1d" ON ' .. self.db
end
if (HOURLY_CQ_ENABLED) then
local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, 3600, "2h")
local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600, 86400)
queries[#queries + 1] = cq_1h:gsub("\n", ""):gsub("%s%s+", " ")
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
else
local cq_1d = getCqQuery(self.db, tags, schema, "autogen", "1d", schema.options.step, 86400)
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
end
if #queries >= max_batch_size then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
end
queries = {}
end
::continue::
end
if #queries > 0 then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup() failed")
return false
end
end
if (migration_necessary) then
-- NOTE: dropping all the continuous queries all together is not possible
-- as InfluxDB does not provide an API for this and calling "SHOW CONTINUOUS QUERIES"
-- yelds too much result data
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema.name .. '__1h" ON ' .. self.db
queries[#queries + 1] = 'DROP CONTINUOUS QUERY "' .. schema.name .. '__1d" ON ' .. self.db
ntop.setPref(HOURLY_CQ_DISABLED_KEY, ternary(HOURLY_CQ_ENABLED, "0", "1"))
-- Need to recalculate it
ntop.delCache(FIRST_AGGREGATION_TIME_KEY)
traceError(TRACE_NORMAL, TRACE_CONSOLE, "InfluxDB CQ migration completed")
end
if (HOURLY_CQ_ENABLED) then
local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, 3600, "2h")
local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600, 86400)
if tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY)) == nil then
local first_rp = ternary(HOURLY_CQ_ENABLED, "1h", "1d")
local res = influx_query(self.url .. "/query?db=" .. self.db .. "&epoch=s",
'SELECT FIRST(bytes) FROM "' .. first_rp .. '"."iface:traffic"', self.username, self.password)
local first_t = os.time()
queries[#queries + 1] = cq_1h:gsub("\n", ""):gsub("%s%s+", " ")
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
else
local cq_1d = getCqQuery(self.db, tags, schema, "autogen", "1d", schema.options.step, 86400)
if res and res.series and res.series[1].values then
local v = res.series[1].values[1][1]
queries[#queries + 1] = cq_1d:gsub("\n", ""):gsub("%s%s+", " ")
end
if #queries >= max_batch_size then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup failed")
return false
if v ~= nil then
first_t = v
end
end
queries = {}
ntop.setPref(FIRST_AGGREGATION_TIME_KEY, tostring(first_t))
end
::continue::
return true
end
if #queries > 0 then
if not multiQueryPost(queries, self.url, self.username, self.password) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "InfluxDB setup() failed")
return false
end
end
if (migration_necessary) then
ntop.setPref(HOURLY_CQ_DISABLED_KEY, ternary(HOURLY_CQ_ENABLED, "0", "1"))
-- Need to recalculate it
ntop.delCache(FIRST_AGGREGATION_TIME_KEY)
traceError(TRACE_NORMAL, TRACE_CONSOLE, "InfluxDB CQ migration completed")
end
if tonumber(ntop.getPref(FIRST_AGGREGATION_TIME_KEY)) == nil then
local first_rp = ternary(HOURLY_CQ_ENABLED, "1h", "1d")
local res = influx_query(self.url .. "/query?db=" .. self.db .. "&epoch=s",
'SELECT FIRST(bytes) FROM "' .. first_rp .. '"."iface:traffic"', self.username, self.password)
local first_t = os.time()
if res and res.series and res.series[1].values then
local v = res.series[1].values[1][1]
if v ~= nil then
first_t = v
end
end
ntop.setPref(FIRST_AGGREGATION_TIME_KEY, tostring(first_t))
end
return true
end
-- ##############################################
@ -2240,4 +2259,10 @@ end
-- ##############################################
function driver.getSkipRetentionAndCreationKey()
return INFLUXDB_KEY_SKIP_RETENTION_AND_CREATION
end
-- ##############################################
return driver