Initial implementation of OO recipients and integer ids

This commit is contained in:
Simone Mainardi 2020-09-07 13:03:34 +02:00
parent 5db42ecec4
commit 3cefe89d4b
7 changed files with 489 additions and 26 deletions

View file

@ -3,6 +3,7 @@
--
local json = require "dkjson"
local notification_configs = require("notification_configs")
-- ##############################################
@ -18,7 +19,7 @@ function recipients:create(args)
end
end
local this = args or {key = "base", enabled = true}
local this = args or {key = "recipients"}
setmetatable(this, self)
self.__index = self
@ -41,36 +42,390 @@ end
-- ##############################################
-- @brief Dispatches a store `notification` to the recipient
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients:dispatch_store_notification(notification)
return self.enabled
function recipients:_get_recipients_lock_key()
local key = string.format("ntopng.cache.recipients.recipients_lock")
return key
end
-- ##############################################
-- @brief Dispatches a trigger `notification` to the recipient
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients:dispatch_trigger_notification(notification)
return self.enabled
function recipients:_lock()
local max_lock_duration = 5 -- seconds
local max_lock_attempts = 5 -- give up after at most this number of attempts
local lock_key = self:_get_recipients_lock_key()
for i = 1, max_lock_attempts do
local value_set = ntop.setnxCache(lock_key, "1", max_lock_duration)
if value_set then
return true -- lock acquired
end
ntop.msleep(1000)
end
return false -- lock not acquired
end
-- ##############################################
-- @brief Dispatches a release `notification` to the recipient
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients:dispatch_release_notification(notification)
return self.enabled
function recipients:_unlock()
ntop.delCache(self:_get_recipients_lock_key())
end
-- ##############################################
-- @brief Process notifications previously dispatched with one of the dispatch_{store,trigger,release}_notification
function recipients:process_notifications()
return self.enabled
function recipients:_get_recipients_prefix_key()
local key = string.format("ntopng.prefs.recipients")
return key
end
-- ##############################################
function recipients:_get_next_recipient_id_key()
local key = string.format("%s.next_recipient_id", self:_get_recipients_prefix_key())
return key
end
-- ##############################################
function recipients:_get_recipient_ids_key()
local key = string.format("%s.recipient_ids", self:_get_recipients_prefix_key())
return key
end
-- ##############################################
function recipients:_get_recipient_details_key(recipient_id)
if not recipient_id then
-- A recipient id is always needed
return nil
end
local key = string.format("%s.recipient_id_%d.details", self:_get_recipients_prefix_key(), recipient_id)
return key
end
-- ##############################################
-- @brief Returns an array with all the currently assigned recipient ids
function recipients:_get_assigned_recipient_ids()
local res = { }
local cur_recipient_ids = ntop.getMembersCache(self:_get_recipient_ids_key())
for _, cur_recipient_id in pairs(cur_recipient_ids) do
cur_recipient_id = tonumber(cur_recipient_id)
res[#res + 1] = cur_recipient_id
end
return res
end
-- ##############################################
function recipients:_assign_recipient_id()
local next_recipient_id_key = self:_get_next_recipient_id_key()
-- Atomically assign a new recipient id
local next_recipient_id = ntop.incrCache(next_recipient_id_key)
-- Add the atomically assigned recipient id to the set of current recipient ids (set wants a string)
ntop.setMembersCache(self:_get_recipient_ids_key(), string.format("%d", next_recipient_id))
return next_recipient_id
end
-- ##############################################
-- @brief Sanity checks for the endpoint configuration parameters
-- @param endpoint_key A string with the notification endpoint key
-- @param recipient_params A table with endpoint recipient params that will be possibly sanitized
-- @return false with a description of the error, or true, with a table containing sanitized configuration params.
local function check_endpoint_recipient_params(endpoint_key, recipient_params)
if not recipient_params or not type(recipient_params) == "table" then
return false, {status = "failed", error = {type = "invalid_recipient_params"}}
end
-- Create a safe_params table with only expected params
local safe_params = {}
-- So iterate across all expected params of the current endpoint
for _, param in ipairs(notification_configs.get_types()[endpoint_key].recipient_params) do
-- param is a lua table so we access its elements
local param_name = param["param_name"]
local optional = param["optional"]
if recipient_params and recipient_params[param_name] and not safe_params[param_name] then
safe_params[param_name] = recipient_params[param_name]
elseif not optional then
return false, {status = "failed", error = {type = "missing_mandatory_param", missing_param = param_name}}
end
end
return true, {status = "OK", safe_params = safe_params}
end
-- ##############################################
-- @brief Set a configuration along with its params. Configuration name and params must be already sanitized
-- @param endpoint_conf_name A string with the notification endpoint configuration name
-- @param endpoint_recipient_name A string with the recipient name
-- @param safe_params A table with endpoint recipient params already sanitized
-- @return nil
function recipients:_set_endpoint_recipient_params(recipient_id, endpoint_conf_name, endpoint_recipient_name, safe_params)
-- Write the endpoint recipient config into another hash
local k = self:_get_recipient_details_key(recipient_id)
ntop.setCache(k, json.encode({endpoint_conf_name = endpoint_conf_name,
recipient_name = endpoint_recipient_name,
recipient_params = safe_params}))
return recipient_id
end
-- ##############################################
function recipients:add_recipient(endpoint_conf_name, endpoint_recipient_name, recipient_params)
local locked = self:_lock()
local res = { status = "failed" }
if locked then
local ec = notification_configs.get_endpoint_config(endpoint_conf_name)
if ec["status"] == "OK" and endpoint_recipient_name then
-- Is the endpoint already existing?
local same_recipient = self:get_recipient_by_name(endpoint_recipient_name)
if same_recipient then
res = {status = "failed",
error = {type = "endpoint_recipient_already_existing",
endpoint_recipient_name = endpoint_recipient_name}
}
else
local endpoint_key = ec["endpoint_key"]
local ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params)
if ok then
local safe_params = status["safe_params"]
-- Assign the recipient id
local recipient_id = self:_assign_recipient_id()
-- Persist the configuration
self:_set_endpoint_recipient_params(recipient_id, endpoint_conf_name, endpoint_recipient_name, safe_params)
res = {status = "OK", recipient_id = recipient_id}
end
end
end
self:_unlock()
end
return res
end
-- ##############################################
-- @brief Edit the recipient parameters of an existing endpoint configuration
-- @param recipient_id The integer recipient identificator
-- @param endpoint_recipient_name A string with the recipient name
-- @param recipient_params A table with endpoint recipient params that will be possibly sanitized
-- @return A table with a key status which is either "OK" or "failed". When "failed", the table contains another key "error" with an indication of the issue
function recipients:edit_recipient(recipient_id, endpoint_recipient_name, recipient_params)
local locked = self:_lock()
local res = { status = "failed" }
if locked then
local rc = self:get_recipient(recipient_id)
if not rc then
res = {status = "failed", error = {type = "endpoint_recipient_not_existing", endpoint_recipient_name = endpoint_recipient_name}}
else
local ec = notification_configs.get_endpoint_config(rc["endpoint_conf_name"])
if ec["status"] ~= "OK" then
res = ec
else
-- Are the submitted params those expected by the endpoint?
local ok, status = check_endpoint_recipient_params(ec["endpoint_key"], recipient_params)
if not ok then
res = status
else
local safe_params = status["safe_params"]
-- Persist the configuration
self:_set_endpoint_recipient_params(recipient_id, rc["endpoint_conf_name"], endpoint_recipient_name, safe_params)
res = {status = "OK"}
end
end
end
self:_unlock()
end
return res
end
-- ##############################################
function recipients:delete_recipient(recipient_id)
local ret = false
local locked = self:_lock()
if locked then
-- Make sure the recipient exists
local cur_recipient_details = self:get_recipient(recipient_id)
if cur_recipient_details then
-- Remove the key with all the recipient details (e.g., with members, and configset_id)
ntop.delCache(self:_get_recipient_details_key(recipient_id))
-- Remove the recipient_id from the set of all currently existing recipient ids
ntop.delMembersCache(self:_get_recipient_ids_key(), string.format("%d", recipient_id))
ret = true
end
self:_unlock()
end
return ret
end
-- #################################################################
function recipients:test_recipient(endpoint_conf_name, recipient_params)
-- Get endpoint config
local ec = notification_configs.get_endpoint_config(endpoint_conf_name)
if ec["status"] ~= "OK" then
return ec
end
-- Check recipient parameters
local endpoint_key = ec["endpoint_key"]
ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params)
if not ok then
return status
end
local safe_params = status["safe_params"]
-- Create dummy recipient
local recipient = {
endpoint_conf = ec,
recipient_params = safe_params,
}
-- Get endpoint module
local modules_by_name = notification_configs.get_types()
local module_name = recipient.endpoint_conf.endpoint_key
local m = modules_by_name[module_name]
if not m then
return {status = "failed", error = {type = "endpoint_module_not_existing", endpoint_recipient_name = recipient.endpoint_conf.endpoint_key}}
end
-- Run test
if not m.runTest then
return {status = "failed", error = {type = "endpoint_test_not_available", endpoint_recipient_name = recipient.endpoint_conf.endpoint_key}}
end
local success, message = m.runTest(recipient)
if success then
return {status = "OK"}
else
return {status = "failed", error = {type = "endpoint_test_failure", message = message }}
end
end
-- ##############################################
function recipients:get_recipient(recipient_id)
local recipient_details
local recipient_details_key = self:_get_recipient_details_key(recipient_id)
-- Attempt at retrieving the recipient details key and at decoding it from JSON
if recipient_details_key then
local recipient_details_str = ntop.getCache(recipient_details_key)
recipient_details = json.decode(recipient_details_str)
if recipient_details then
-- Add the integer recipient id
recipient_details["recipient_id"] = tonumber(recipient_id)
-- Add also the endpoint configuration
local ec = notification_configs.get_endpoint_config(recipient_details["endpoint_conf_name"])
recipient_details["endpoint_conf"] = ec["endpoint_conf"]
end
end
-- Upon success, recipient details are returned, otherwise nil
return recipient_details
end
-- ##############################################
function recipients:get_all_recipients()
local res = {}
local cur_recipient_ids = self:_get_assigned_recipient_ids()
for _, recipient_id in pairs(cur_recipient_ids) do
local recipient_details = self:get_recipient(recipient_id)
if recipient_details then
res[#res + 1] = recipient_details
end
end
return res
end
-- ##############################################
function recipients:get_recipient_by_name(name)
local cur_recipient_ids = self:_get_assigned_recipient_ids()
for _, recipient_id in pairs(cur_recipient_ids) do
local recipient_details = self:get_recipient(recipient_id)
if recipient_details and recipient_details["recipient_name"] and recipient_details["recipient_name"] == name then
return recipient_details
end
end
return nil
end
-- ##############################################
function recipients:cleanup()
-- Delete recipient details
local cur_recipient_ids = self:_get_assigned_recipient_ids()
for _, recipient_id in pairs(cur_recipient_ids) do
self:delete_recipient(recipient_id)
end
local locked = self:_lock()
if locked then
-- Delete recipient ids
ntop.delCache(self:_get_recipient_ids_key())
ntop.delCache(self:_get_next_recipient_id_key())
self:_unlock()
end
end
-- ##############################################

View file

@ -1,121 +0,0 @@
--
-- (C) 2017-20 - ntop.org
--
local dirs = ntop.getDirs()
package.path = dirs.installdir .. "/scripts/lua/modules/recipients/?.lua;" .. package.path
local json = require "dkjson"
local os_utils = require "os_utils"
local lua_path_utils = require "lua_path_utils"
local recipients = require "recipients"
-- ##############################################
local recipients_lua_utils = {}
-- ##############################################
-- @brief Returns an array of recipient Lua class instances, for all available recipients
-- e.g., {sqlite_recipients:create(), ...}
--
local function all_recipient_instances_factory()
local recipients_dir = os_utils.fixPath(dirs.installdir .. "/scripts/lua/modules/recipients/")
lua_path_utils.package_path_prepend(recipients_dir)
local res = {}
for recipient_file in pairs(ntop.readdir(recipients_dir)) do
if recipient_file:match("_recipients%.lua$") then
local recipient_module_name = recipient_file:gsub(".lua", "")
local recipient_require = os_utils.fixPath(string.format("recipients.%s", recipient_module_name))
local recipient = require(recipient_require)
if recipient.create then
-- If it has a method create, then we can instantiate it and add it to the result
local instance = recipient:create()
res[#res + 1] = instance
end
end
end
return res
end
-- ##############################################
local all_instances_cache
-- @brief Caches all available recipient instances to avoid reloading them every time
local function get_all_instances_cache()
if not all_instances_cache then
all_instances_cache = all_recipient_instances_factory()
end
return all_instances_cache
end
-- ##############################################
-- @brief Dispatches a trigger `notification` to every available recipient (trigger notifications are generated in `alerts_api.trigger`)
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients_lua_utils.dispatch_trigger_notification(notification)
local all_instances = get_all_instances_cache()
local res = true
for _, instance in pairs(all_instances) do
res = res and instance:dispatch_trigger_notification(notification)
end
return res
end
-- ##############################################
-- @brief Dispatches a release `notification` to every available recipient (trigger notifications are generated in `alerts_api.release`)
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients_lua_utils.dispatch_release_notification(notification)
local all_instances = get_all_instances_cache()
local res = true
for _, instance in pairs(all_instances) do
res = res and instance:dispatch_release_notification(notification)
end
return res
end
-- ##############################################
-- @brief Dispatches a store `notification` to every available recipient (trigger notifications are generated in `alerts_api.store`)
-- @param notification A JSON string with all the alert information
-- @return true If the dispatching has been successfull, false otherwise
function recipients_lua_utils.dispatch_store_notification(notification)
local all_instances = get_all_instances_cache()
local res = true
for _, instance in pairs(all_instances) do
res = res and instance:dispatch_store_notification(notification)
end
return res
end
-- ##############################################
-- @brief Processs notifications previously dispatched for every available recipient
function recipients_lua_utils.process_notifications(notification)
local all_instances = get_all_instances_cache()
local res = true
for _, instance in pairs(all_instances) do
res = res and instance:process_notifications(notification)
end
return res
end
-- ##############################################
return recipients_lua_utils

View file

@ -1,89 +0,0 @@
--
-- (C) 2017-20 - ntop.org
--
local dirs = ntop.getDirs()
package.path = dirs.installdir .. "/scripts/lua/modules/recipients/?.lua;" .. package.path
local recipients = require "recipients"
local json = require "dkjson"
local sqlite_recipients = {}
-- ##############################################
function sqlite_recipients:create()
-- Instance of the base class
local _sqlite_recipients = recipients:create()
self.enabled = true -- Toggle this to skip dispatch and processing of notifications
-- Subclass using the base class instance
self.key = "sqlite"
-- self is passed as argument so it will be set as base class metatable
-- and this will actually make it possible to override functions
local _sqlite_recipients_instance = _sqlite_recipients:create(self)
-- Return the instance
return _sqlite_recipients_instance
end
-- ##############################################
function sqlite_recipients:dispatch_store_notification(notification)
if self.enabled then
return ntop.pushSqliteAlert(notification)
end
return false
end
-- ##############################################
function sqlite_recipients:dispatch_release_notification(notification)
if self.enabled then
return ntop.pushSqliteAlert(notification)
end
return false
end
-- ##############################################
function sqlite_recipients:process_notifications()
if not self.enabled or not areAlertsEnabled() then
return false
end
-- SQLite Alerts
while(true) do
local alert_json = ntop.popSqliteAlert()
if(not alert_json) then
break
end
local alert = json.decode(alert_json)
if(alert) then
interface.select(string.format("%d", alert.ifid))
if(alert.is_flow_alert) then
interface.storeFlowAlert(alert)
else
interface.storeAlert(
alert.alert_tstamp, alert.alert_tstamp_end, alert.alert_granularity,
alert.alert_type, alert.alert_subtype, alert.alert_severity,
alert.alert_entity, alert.alert_entity_val,
alert.alert_json)
end
end
if ntop.isDeadlineApproaching() then
return(false)
end
end
return(true)
end
-- ##############################################
return sqlite_recipients