Stop writing points if InfluxDB export is failing.

The dropped points are accounted and shown in the gui. Dropped points alert are generated as usual.

Fixes #2998
This commit is contained in:
emanuele-f 2019-10-24 16:04:50 +02:00
parent afbb34e262
commit 0d04eb2d5c
6 changed files with 182 additions and 128 deletions

View file

@ -17,20 +17,61 @@ require("ntop_utils")
-- See also callback_utils.uploadTSdata
--
-- Export problems occur if the export queue length is >= INFLUX_MAX_EXPORT_QUEUE_LEN_HIGH.
-- Then, the problems are considered fixed once the queue length goes <= INFLUX_MAX_EXPORT_QUEUE_LEN_LOW.
-- NOTE: these values are multiplied by the number of interfaces
local INFLUX_MAX_EXPORT_QUEUE_LEN_LOW = 20
local INFLUX_MAX_EXPORT_QUEUE_LEN_HIGH = 30
local INFLUX_MAX_EXPORT_QUEUE_TRIM_LEN = 50 -- This edge should never be crossed. If it does, queue is manually trimmed
local INFLUX_QUERY_TIMEMOUT_SEC = 5
local INFLUX_MAX_EXPORT_QUEUE_LEN = 200
local INFLUX_EXPORT_QUEUE = "ntopng.influx_file_queue"
local MIN_INFLUXDB_SUPPORTED_VERSION = "1.5.1"
local FIRST_AGGREGATION_TIME_KEY = "ntopng.prefs.influxdb.first_aggregation_time"
-- ##############################################
local INFLUX_KEY_PREFIX = "ntopng.cache.influxdb."
-- Keep total counters for dropped points
local INFLUX_KEY_DROPPED_POINTS = INFLUX_KEY_PREFIX.."num_dropped_points"
local INFLUX_KEY_EXPORTED_POINTS = INFLUX_KEY_PREFIX.."num_exported_points"
local INFLUX_KEY_EXPORTS = INFLUX_KEY_PREFIX.."num_exports"
local INFLUX_KEY_FAILED_EXPORTS = INFLUX_KEY_PREFIX.."num_failed_exports"
-- Use this flag as TTL-based redis keys to check wether the health
-- of influxdb is OK.
local INFLUX_FLAGS_TIMEOUT = 60 -- keep the issue for 60 seconds
local INFLUX_FLAG_DROPPING_POINTS = INFLUX_KEY_PREFIX.."flag_dropping_points"
local INFLUX_FLAG_FAILING_EXPORTS = INFLUX_KEY_PREFIX.."flag_failing_exports"
local INFLUX_FLAG_IS_CURRENTLY_DROPPING = INFLUX_KEY_PREFIX.."flag_currently_dropping"
local INFLUX_QUEUE_FULL_FLAG = INFLUX_KEY_PREFIX.."export_queue_full"
-- ##############################################
local function isExportQueueFull()
return(ntop.getCache(INFLUX_QUEUE_FULL_FLAG) == "1")
end
local function setExportQueueFull(is_full)
if(is_full) then
ntop.setCache(INFLUX_QUEUE_FULL_FLAG, "1")
else
ntop.delCache(INFLUX_QUEUE_FULL_FLAG)
end
end
-- ##############################################
function driver:new(options)
local obj = {
url = options.url,
db = options.db,
username = options.username or "",
password = options.password or "",
has_full_export_queue = isExportQueueFull(),
cur_dropped_points = 0,
}
setmetatable(obj, self)
@ -41,7 +82,96 @@ end
-- ##############################################
local function inc_dropped_points(num_points)
ntop.incrCache(INFLUX_KEY_DROPPED_POINTS, num_points)
-- Status flag
ntop.setCache(INFLUX_FLAG_DROPPING_POINTS, "true", INFLUX_FLAGS_TIMEOUT)
-- Internal flag
ntop.setCache(INFLUX_FLAG_IS_CURRENTLY_DROPPING, "true")
end
-- ##############################################
local function inc_exported_points(num_points)
ntop.incrCache(INFLUX_KEY_EXPORTED_POINTS, num_points)
end
-- ##############################################
local function inc_exports()
ntop.incrCache(INFLUX_KEY_EXPORTS)
end
-- ##############################################
local function inc_failed_exports()
ntop.setCache(INFLUX_FLAG_FAILING_EXPORTS, "true", INFLUX_FLAGS_TIMEOUT)
ntop.incrCache(INFLUX_FLAG_FAILING_EXPORTS, 1)
end
-- ##############################################
local function del_all_vals()
ntop.delCache(INFLUX_KEY_DROPPED_POINTS)
ntop.delCache(INFLUX_KEY_EXPORTED_POINTS)
ntop.delCache(INFLUX_KEY_EXPORTS)
ntop.delCache(INFLUX_KEY_FAILED_EXPORTS)
ntop.delCache(INFLUX_FLAG_DROPPING_POINTS)
ntop.delCache(INFLUX_FLAG_FAILING_EXPORTS)
ntop.delCache(INFLUX_FLAG_IS_CURRENTLY_DROPPING)
end
-- ##############################################
function driver:get_dropped_points()
return tonumber(ntop.getCache(INFLUX_KEY_DROPPED_POINTS)) or 0
end
-- ##############################################
function driver:get_exported_points()
return tonumber(ntop.getCache(INFLUX_KEY_EXPORTED_POINTS)) or 0
end
-- ##############################################
function driver:get_exports()
return tonumber(ntop.getCache(INFLUX_KEY_EXPORTS)) or 0
end
-- ##############################################
local function is_dropping_points()
return ntop.getCache(INFLUX_FLAG_DROPPING_POINTS) == "true"
end
-- ##############################################
local function is_failing_exports()
return ntop.getCache(INFLUX_FLAG_FAILING_EXPORTS) == "true"
end
-- ##############################################
-- This function is called when the object is destroyed (garbage collected)
function driver:__gc()
if(self.cur_dropped_points > 0) then
inc_dropped_points(self.cur_dropped_points)
end
end
-- ##############################################
function driver:append(schema, timestamp, tags, metrics)
if(self.has_full_export_queue) then
-- Temporary buffer dropped points into a local counter.
-- They will be exported to redis once the driver is dismissed in driver:__gc()
self.cur_dropped_points = self.cur_dropped_points + 1
return(false)
end
return interface.appendInfluxDB(schema.name, timestamp, tags, metrics)
end
@ -748,99 +878,6 @@ end
-- ##############################################
local INFLUX_KEY_PREFIX = "ntopng.cache.influxdb."
-- Keys to identify redis hash caches to keep stats counters
-- such as dropped and exported points
local INFLUX_KEY_DROPPED_POINTS = INFLUX_KEY_PREFIX.."dropped_points"
local INFLUX_KEY_EXPORTED_POINTS = INFLUX_KEY_PREFIX.."exported_points"
local INFLUX_KEY_EXPORTS = INFLUX_KEY_PREFIX.."exports"
local INFLUX_KEY_FAILED_EXPORTS = INFLUX_KEY_PREFIX.."failed_exports"
-- Use this flag as TTL-based redis keys to check wether the health
-- of influxdb is OK.
local INFLUX_FLAGS_TIMEOUT = 60 -- keep the issue for 60 seconds
local INFLUX_FLAG_DROPPING_POINTS = INFLUX_KEY_PREFIX.."flag_dropping_points"
local INFLUX_FLAG_FAILING_EXPORTS = INFLUX_KEY_PREFIX.."flag_failing_exports"
-- ##############################################
local function inc_val(k, ifid, val_to_add)
local val = tonumber(ntop.getHashCache(k, ifid)) or 0
val = val + val_to_add
ntop.setHashCache(k, ifid, string.format("%i", val))
end
-- ##############################################
local function inc_dropped_points(ifid, num_points)
ntop.setCache(INFLUX_FLAG_DROPPING_POINTS, "true", INFLUX_FLAGS_TIMEOUT)
inc_val(INFLUX_KEY_DROPPED_POINTS, ifid, num_points)
end
-- ##############################################
local function inc_exported_points(ifid, num_points)
inc_val(INFLUX_KEY_EXPORTED_POINTS, ifid, num_points)
end
-- ##############################################
local function inc_exports(ifid)
inc_val(INFLUX_KEY_EXPORTS, ifid, 1)
end
-- ##############################################
local function inc_failed_exports(ifid)
ntop.setCache(INFLUX_FLAG_FAILING_EXPORTS, "true", INFLUX_FLAGS_TIMEOUT)
inc_val(INFLUX_KEY_FAILED_EXPORTS, ifid, 1)
end
-- ##############################################
local function del_all_vals()
ntop.delCache(INFLUX_KEY_DROPPED_POINTS)
ntop.delCache(INFLUX_KEY_EXPORTED_POINTS)
ntop.delCache(INFLUX_KEY_EXPORTS)
ntop.delCache(INFLUX_KEY_FAILED_EXPORTS)
ntop.delCache(INFLUX_FLAG_DROPPING_POINTS)
ntop.delCache(INFLUX_KEY_FAILED_EXPORTS)
end
-- ##############################################
function driver:get_dropped_points(ifid)
return tonumber(ntop.getHashCache(INFLUX_KEY_DROPPED_POINTS, ifid)) or 0
end
-- ##############################################
function driver:get_exported_points(ifid)
return tonumber(ntop.getHashCache(INFLUX_KEY_EXPORTED_POINTS, ifid)) or 0
end
-- ##############################################
function driver:get_exports(ifid)
return tonumber(ntop.getHashCache(INFLUX_KEY_EXPORTS, ifid)) or 0
end
-- ##############################################
local function is_dropping_points()
return ntop.getCache(INFLUX_FLAG_DROPPING_POINTS) == "true"
end
-- ##############################################
local function is_failing_exports()
return ntop.getCache(INFLUX_FLAG_FAILING_EXPORTS) == "true"
end
-- ##############################################
-- Returns an indication of the current InfluxDB health.
-- Health is "green" when everything is working as expected,
-- "yellow" when there are recoverable issues, or "red" when
@ -873,13 +910,13 @@ end
-- When we giveup for a certain exportable, that is, when we are not
-- going to try and export it again, we call this function
local function dropExportable(exportable)
inc_dropped_points(exportable["ifid"], exportable["num_points"])
inc_dropped_points(exportable["num_points"])
deleteExportableFile(exportable)
end
-- ##############################################
function driver:_droppedExportablesAlert()
function driver:_droppedPointsAlert()
local alert_periodicity = 60
local k = "ntopng.cache.influxdb_dropped_points_alert_triggered"
@ -901,8 +938,8 @@ end
-- Call this function when an exportable has been sent to InfluxDB
-- with success
local function exportableSuccess(exportable)
inc_exported_points(exportable["ifid"], exportable["num_points"])
inc_exports(exportable["ifid"])
inc_exported_points(exportable["num_points"])
inc_exports()
deleteExportableFile(exportable)
end
@ -911,7 +948,7 @@ end
-- Call this function when the export has failed but it is going
-- to be tried again
local function exportableFailure(exportable)
inc_failed_exports(exportable["ifid"])
inc_failed_exports()
end
-- ##############################################
@ -965,17 +1002,28 @@ end
function driver:export(deadline)
interface.select(getSystemInterfaceId())
local dropped_exportables = false
local num_ifaces = table.len(interface.getIfNames())
local high_value = (INFLUX_MAX_EXPORT_QUEUE_LEN_HIGH * num_ifaces)
local low_value = (INFLUX_MAX_EXPORT_QUEUE_LEN_LOW * num_ifaces)
local max_value = (INFLUX_MAX_EXPORT_QUEUE_TRIM_LEN * num_ifaces)
local num_pending = ntop.llenCache(INFLUX_EXPORT_QUEUE)
traceError(TRACE_INFO, TRACE_CONSOLE, "Exporting "..num_pending.." items")
if(num_pending >= high_value) then
-- Export is blocked. Inhibit subsequent writes.
setExportQueueFull(true)
elseif(num_pending <= low_value) then
-- The problem was solved
setExportQueueFull(false)
end
if num_pending == 0 then
return
end
traceError(TRACE_INFO, TRACE_CONSOLE, "Exporting "..num_pending.." items")
-- Remove the old guys for which it wasn't possible to export
while num_pending > INFLUX_MAX_EXPORT_QUEUE_LEN do
while(num_pending > max_value) do
-- NOTE: this should not happen, but if it does we manually trim the queue
local being_dropped = ntop.lpopCache(INFLUX_EXPORT_QUEUE)
local exportable_to_be_dropped = getExportable(being_dropped)
@ -984,14 +1032,11 @@ function driver:export(deadline)
traceError(TRACE_INFO, TRACE_CONSOLE, "Dropped old item "..(being_dropped or ''))
num_pending = num_pending - 1
if not dropped_exportables then
dropped_exportables = true
end
end
if dropped_exportables then
self:_droppedExportablesAlert()
if(ntop.getCache(INFLUX_FLAG_IS_CURRENTLY_DROPPING) == "true") then
self:_droppedPointsAlert()
ntop.delCache(INFLUX_FLAG_IS_CURRENTLY_DROPPING)
end
-- Post the guys using a pretty long timeout