Make InfluxDB query timeout configurable

This commit is contained in:
emanuele-f 2019-11-29 12:10:59 +01:00
parent 6ae67a84b6
commit d86d58a80d
6 changed files with 37 additions and 42 deletions

View file

@ -25,10 +25,6 @@ local INFLUX_MAX_EXPORT_QUEUE_LEN_LOW = 10
-- Upper export-queue length threshold (paired with INFLUX_MAX_EXPORT_QUEUE_LEN_LOW).
-- NOTE(review): the code consuming these thresholds is not visible here.
local INFLUX_MAX_EXPORT_QUEUE_LEN_HIGH = 20
local INFLUX_MAX_EXPORT_QUEUE_TRIM_LEN = 30 -- This limit should never be exceeded. If it is, the queue is manually trimmed
-- When InfluxDB is not responding, inhibit some queries for this many seconds
local INFLUX_QUERY_INHIBIT_SEC = 60
-- Timeout, in seconds, passed to the ntop.httpGet/ntop.httpPost calls that talk to InfluxDB.
-- NOTE(review): the identifier is misspelled ("TIMEMOUT"); it is left unchanged
-- because other code refers to it by this exact name.
local INFLUX_QUERY_TIMEMOUT_SEC = 5
-- NOTE(review): presumably the cache key of the export file queue - confirm against its users.
local INFLUX_EXPORT_QUEUE = "ntopng.influx_file_queue"
-- NOTE(review): presumably the oldest InfluxDB release accepted by the version check - confirm.
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
-- Preference key; by its name it records the time of the first aggregation (usage not visible here).
local FIRST_AGGREGATION_TIME_KEY = "ntopng.prefs.influxdb.first_aggregation_time"
@ -76,7 +72,6 @@ function driver:new(options)
password = options.password or "",
has_full_export_queue = isExportQueueFull(),
cur_dropped_points = 0,
inhibit_queries = (ntop.getCache("ntopng.cache.influxdb.inhibit_queries") == "1"),
}
setmetatable(obj, self)
@ -87,6 +82,12 @@ end
-- ##############################################
-- Return the timeout (in seconds) to apply to InfluxDB HTTP requests.
-- The value is read from the "ntopng.prefs.influx_query_timeout" preference;
-- when the preference cannot be converted to a number, 10 is returned.
local function getInfluxDBQueryTimeout()
  local configured = tonumber(ntop.getPref("ntopng.prefs.influx_query_timeout"))

  if configured == nil then
    -- Preference missing or not numeric: use the built-in default
    return 10
  end

  return configured
end
-- ##############################################
local function inc_dropped_points(num_points)
ntop.incrCache(INFLUX_KEY_DROPPED_POINTS, num_points)
@ -338,16 +339,24 @@ local function influx_query_multi(base_url, query, username, password, options)
options = options or {}
local full_url = base_url .."&q=" .. urlencode(query)
local tstart = os.time()
local res = ntop.httpGet(full_url, username, password, ternary(options.no_timeout, 99999999999, INFLUX_QUERY_TIMEMOUT_SEC), true)
local tend = os.time()
local timeout = ternary(options.no_timeout, 99999999999, getInfluxDBQueryTimeout())
local res = ntop.httpGet(full_url, username, password, timeout, true)
local tdiff = os.time() - tstart
local debug_influxdb_queries = (ntop.getPref("ntopng.prefs.influxdb.print_queries") == "1")
if debug_influxdb_queries then
local tdiff = tend - tstart
local _, num_queries = string.gsub(query, ";", "")
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("influx_query[#%u][%ds]: %s", num_queries+1, tdiff, query))
end
if tdiff >= timeout then
-- Show the error
ntop.setCache("ntopng.cache.influxdb.last_error", i18n("graphs.influxdb_not_responding", {
url = ntop.getHttpPrefix() .. "/lua/admin/prefs.lua?tab=on_disk_ts#influx_query_timeout",
flask_icon = '<i class="fa fa-flask"></i>',
}))
end
-- Log the query
local msg = os.date("%d/%b/%Y %H:%M:%S ") .. query
ntop.lpushCache("ntopng.trace.influxdb_queries", msg, 100 --[[ max queue elements ]])
@ -403,29 +412,9 @@ end
-- ##############################################
-- Run an InfluxDB query while keeping track of how long it takes.
-- If queries are currently inhibited on this driver instance, nothing is
-- executed and nil is returned. Otherwise the query is forwarded to
-- influx_query; when it takes at least INFLUX_QUERY_TIMEMOUT_SEC seconds,
-- an error message is stored in the cache and the inhibit flag is raised,
-- both in the cache and on this instance.
-- @param base_url the query URL, forwarded to influx_query
-- @param query the query string, forwarded to influx_query
-- @param username forwarded to influx_query
-- @param password forwarded to influx_query
-- @param options optional table, forwarded to influx_query
-- @return whatever influx_query returns, or nil when queries are inhibited
function driver:influx_query_timed(base_url, query, username, password, options)
  if self.inhibit_queries then
    return nil
  end

  local started_at = os.time()
  local result = influx_query(base_url, query, username, password, options)
  local elapsed = os.time() - started_at

  if elapsed < INFLUX_QUERY_TIMEMOUT_SEC then
    -- Fast enough: nothing to record
    return result
  end

  -- The query took too long: publish the error and inhibit further queries
  ntop.setCache(
    "ntopng.cache.influxdb.last_error",
    i18n("graphs.influxdb_query_inhibited", {inhibit_seconds = INFLUX_QUERY_INHIBIT_SEC})
  )
  ntop.setCache("ntopng.cache.influxdb.inhibit_queries", "1", INFLUX_QUERY_INHIBIT_SEC)
  self.inhibit_queries = true

  return result
end
-- ##############################################
local function multiQueryPost(queries, url, username, password)
local query_str = table.concat(queries, ";")
local res = ntop.httpPost(url .. "/query", "q=" .. urlencode(query_str), username, password, INFLUX_QUERY_TIMEMOUT_SEC, true)
local res = ntop.httpPost(url .. "/query", "q=" .. urlencode(query_str), username, password, getInfluxDBQueryTimeout(), true)
if not res then
local err = "Invalid response for query: " .. query_str
@ -1186,7 +1175,7 @@ end
-- List the series for a schema.
-- Builds the query with makeListSeriesQuery, runs it against this driver's
-- database (self.url / self.db, authenticated with self.username and
-- self.password) and returns the output of processListSeriesResult.
-- @param schema forwarded to makeListSeriesQuery and processListSeriesResult
-- @param tags_filter forwarded to makeListSeriesQuery and processListSeriesResult
-- @param wildcard_tags forwarded to makeListSeriesQuery and processListSeriesResult
-- @param start_time forwarded to makeListSeriesQuery
-- @return the value returned by processListSeriesResult
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, start_time)
local url = self.url
-- NOTE(review): `data` is declared twice in a row; the second declaration
-- shadows the first, so only the influx_query result reaches
-- processListSeriesResult while both calls are executed. This looks like the
-- old and the new line of a change shown together - confirm which one should stay.
local data = self:influx_query_timed(url .. "/query?db=".. self.db, query, self.username, self.password)
local data = influx_query(url .. "/query?db=".. self.db, query, self.username, self.password)
return processListSeriesResult(data, schema, tags_filter, wildcard_tags)
end
@ -1204,7 +1193,7 @@ function driver:exists(schema, tags_filter, wildcard_tags)
local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, 0)
local url = self.url
local data = self:influx_query_timed(url .. "/query?db=".. self.db, query, self.username, self.password)
local data = influx_query(url .. "/query?db=".. self.db, query, self.username, self.password)
return(not table.empty(processListSeriesResult(data, schema, tags_filter, wildcard_tags)))
end
@ -1491,7 +1480,7 @@ end
-- ##############################################
local function getInfluxdbVersion(url, username, password)
local res = ntop.httpGet(url .. "/ping", username, password, INFLUX_QUERY_TIMEMOUT_SEC, true)
local res = ntop.httpGet(url .. "/ping", username, password, getInfluxDBQueryTimeout(), true)
if not res or ((res.RESPONSE_CODE ~= 200) and (res.RESPONSE_CODE ~= 204)) then
local err_info = getResponseError(res)
if err_info == 0 then
@ -1632,7 +1621,7 @@ local function isCompatibleVersion(version)
end
function driver.init(dbname, url, days_retention, username, password, verbose)
local timeout = INFLUX_QUERY_TIMEMOUT_SEC
local timeout = getInfluxDBQueryTimeout()
-- Check version
if verbose then traceError(TRACE_NORMAL, TRACE_CONSOLE, "Contacting influxdb at " .. url .. " ...") end
@ -1726,7 +1715,7 @@ function driver:delete(schema_prefix, tags)
end
local full_url = url .. "/query?db=".. self.db .."&q=" .. urlencode(query)
local res = ntop.httpGet(full_url, self.username, self.password, INFLUX_QUERY_TIMEMOUT_SEC, true)
local res = ntop.httpGet(full_url, self.username, self.password, getInfluxDBQueryTimeout(), true)
if not res or (res.RESPONSE_CODE ~= 200) then
traceError(TRACE_ERROR, TRACE_CONSOLE, getResponseError(res))