This commit is contained in:
Alfredo Cardigliano 2022-12-20 06:30:07 -05:00
parent 4618ca357a
commit c08014b934

View file

@ -77,8 +77,8 @@ function recipients.initialize()
for endpoint_key, endpoint in pairs(endpoints.get_types()) do
if endpoint.builtin then
-- Delete (if existing) the old, string-keyed endpoint configuration
endpoints.delete_config("builtin_config_"..endpoint_key)
-- Delete (if existing) the old, string-keyed endpoint configuration
endpoints.delete_config("builtin_config_"..endpoint_key)
-- Add the configuration
local res = endpoints.add_config(
@ -87,22 +87,22 @@ function recipients.initialize()
{} --[[ no default params --]]
)
-- Endpoint successfully created (or existing)
if res and res.endpoint_id then
-- And the recipient
-- Endpoint successfully created (or existing)
if res and res.endpoint_id then
-- And the recipient
local recipient_res = recipients.add_recipient(
res.endpoint_id --[[ the id of the endpoint --]],
"builtin_recipient_"..endpoint_key --[[ the name of the endpoint recipient --]],
all_categories,
all_entities,
default_builtin_minimum_severity,
local recipient_res = recipients.add_recipient(
res.endpoint_id --[[ the id of the endpoint --]],
"builtin_recipient_"..endpoint_key --[[ the name of the endpoint recipient --]],
all_categories,
all_entities,
default_builtin_minimum_severity,
all_host_pools, -- host pools
all_am_hosts, -- active monitoring hosts
{} --[[ no recipient params --]]
)
{} --[[ no recipient params --]]
)
end
end
end
end
@ -153,7 +153,7 @@ local function _lock()
local value_set = ntop.setnxCache(lock_key, "1", max_lock_duration)
if value_set then
return true -- lock acquired
return true -- lock acquired
end
ntop.msleep(1000)
@ -233,8 +233,8 @@ local function _assign_recipient_id()
-- belonging to deleted recipients
for i = 0, recipients.MAX_NUM_RECIPIENTS - 1 do
if not ids_by_key[i] then
next_recipient_id = i
break
next_recipient_id = i
break
end
end
@ -271,7 +271,7 @@ local function check_endpoint_recipient_params(endpoint_key, recipient_params)
if recipient_params and recipient_params[param_name] and not safe_params[param_name] then
safe_params[param_name] = recipient_params[param_name]
elseif not optional then
return false, {status = "failed", error = {type = "missing_mandatory_param", missing_param = param_name}}
return false, {status = "failed", error = {type = "missing_mandatory_param", missing_param = param_name}}
end
end
end
@ -294,13 +294,13 @@ local function _set_endpoint_recipient_params(endpoint_id, recipient_id, endpoin
local k = _get_recipient_details_key(recipient_id)
ntop.setCache(k, json.encode({endpoint_id = endpoint_id,
recipient_name = endpoint_recipient_name,
check_categories = check_categories,
check_entities = check_entities,
minimum_severity = minimum_severity,
host_pools = host_pools_ids,
am_hosts = am_hosts_ids,
recipient_params = safe_params}))
recipient_name = endpoint_recipient_name,
check_categories = check_categories,
check_entities = check_entities,
minimum_severity = minimum_severity,
host_pools = host_pools_ids,
am_hosts = am_hosts_ids,
recipient_params = safe_params}))
return recipient_id
end
@ -329,48 +329,48 @@ function recipients.add_recipient(endpoint_id, endpoint_recipient_name, check_ca
if ec["status"] == "OK" and endpoint_recipient_name then
-- Is the endpoint already existing?
local same_recipient = recipients.get_recipient_by_name(endpoint_recipient_name)
if same_recipient then
res = {
-- Is the endpoint already existing?
local same_recipient = recipients.get_recipient_by_name(endpoint_recipient_name)
if same_recipient then
res = {
status = "failed",
error = {
error = {
type = "endpoint_recipient_already_existing",
endpoint_recipient_name = endpoint_recipient_name
endpoint_recipient_name = endpoint_recipient_name
}
}
else
local endpoint_key = ec["endpoint_key"]
local ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params)
}
else
local endpoint_key = ec["endpoint_key"]
local ok, status = check_endpoint_recipient_params(endpoint_key, recipient_params)
if ok then
local safe_params = status["safe_params"]
if ok then
local safe_params = status["safe_params"]
-- Assign the recipient id
local recipient_id = _assign_recipient_id()
-- Persist the configuration
_set_endpoint_recipient_params(endpoint_id, recipient_id, endpoint_recipient_name, check_categories, check_entities, minimum_severity, host_pools_ids, am_hosts_ids, safe_params)
-- Assign the recipient id
local recipient_id = _assign_recipient_id()
-- Persist the configuration
_set_endpoint_recipient_params(endpoint_id, recipient_id, endpoint_recipient_name, check_categories, check_entities, minimum_severity, host_pools_ids, am_hosts_ids, safe_params)
-- Finally, register the recipient in C so we can start enqueuing/dequeuing notifications
ntop.recipient_register(recipient_id, minimum_severity,
-- Finally, register the recipient in C so we can start enqueuing/dequeuing notifications
ntop.recipient_register(recipient_id, minimum_severity,
table.concat(check_categories, ','),
table.concat(host_pools_ids, ','),
table.concat(check_entities, ',')
)
-- Set a flag to indicate that a recipient has been created
if not ec.endpoint_conf.builtin and isEmptyString(ntop.getPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY)) then
ntop.setPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY, "1")
end
-- Set a flag to indicate that a recipient has been created
if not ec.endpoint_conf.builtin and isEmptyString(ntop.getPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY)) then
ntop.setPref(recipients.FIRST_RECIPIENT_CREATED_CACHE_KEY, "1")
end
res = {
res = {
status = "OK",
recipient_id = recipient_id
}
else
res = status
end
end
end
end
else
res = {
status = "failed",
@ -403,44 +403,44 @@ function recipients.edit_recipient(recipient_id, endpoint_recipient_name, check_
local rc = recipients.get_recipient(recipient_id)
if not rc then
res = {status = "failed", error = {type = "endpoint_recipient_not_existing", endpoint_recipient_name = endpoint_recipient_name}}
res = {status = "failed", error = {type = "endpoint_recipient_not_existing", endpoint_recipient_name = endpoint_recipient_name}}
else
local ec = endpoints.get_endpoint_config(rc["endpoint_id"])
local ec = endpoints.get_endpoint_config(rc["endpoint_id"])
if ec["status"] ~= "OK" then
res = ec
else
-- Are the submitted params those expected by the endpoint?
local ok, status = check_endpoint_recipient_params(ec["endpoint_key"], recipient_params)
if ec["status"] ~= "OK" then
res = ec
else
-- Are the submitted params those expected by the endpoint?
local ok, status = check_endpoint_recipient_params(ec["endpoint_key"], recipient_params)
if not ok then
res = status
else
local safe_params = status["safe_params"]
if not ok then
res = status
else
local safe_params = status["safe_params"]
-- Persist the configuration
_set_endpoint_recipient_params(
rc["endpoint_id"],
recipient_id,
endpoint_recipient_name,
check_categories,
check_entities,
minimum_severity,
host_pools_ids,
-- Persist the configuration
_set_endpoint_recipient_params(
rc["endpoint_id"],
recipient_id,
endpoint_recipient_name,
check_categories,
check_entities,
minimum_severity,
host_pools_ids,
am_hosts_ids,
safe_params)
safe_params)
-- Finally, register the recipient in C to make sure also the C knows about this edit
-- and periodic scripts can be reloaded
ntop.recipient_register(tonumber(recipient_id), minimum_severity,
-- Finally, register the recipient in C to make sure also the C knows about this edit
-- and periodic scripts can be reloaded
ntop.recipient_register(tonumber(recipient_id), minimum_severity,
table.concat(check_categories, ','),
table.concat(host_pools_ids, ','),
table.concat(check_entities, ',')
)
res = {status = "OK"}
end
end
res = {status = "OK"}
end
end
end
_unlock()
@ -462,15 +462,15 @@ function recipients.delete_recipient(recipient_id)
local cur_recipient_details = recipients.get_recipient(recipient_id)
if cur_recipient_details then
-- Remove the key with all the recipient details (e.g., with members)
ntop.delCache(_get_recipient_details_key(recipient_id))
-- Remove the key with all the recipient details (e.g., with members)
ntop.delCache(_get_recipient_details_key(recipient_id))
-- Remove the recipient_id from the set of all currently existing recipient ids
ntop.delMembersCache(_get_recipient_ids_key(), string.format("%d", recipient_id))
-- Remove the recipient_id from the set of all currently existing recipient ids
ntop.delMembersCache(_get_recipient_ids_key(), string.format("%d", recipient_id))
-- Finally, remove the recipient from C
ntop.recipient_delete(recipient_id)
ret = true
-- Finally, remove the recipient from C
ntop.recipient_delete(recipient_id)
ret = true
end
_unlock()
@ -491,7 +491,7 @@ function recipients.delete_recipients_by_conf(endpoint_id)
for _, recipient in pairs(all_recipients) do
-- Use tostring for backwards compatibility
if tostring(recipient.endpoint_id) == tostring(endpoint_id) then
recipients.delete_recipient(recipient.recipient_id)
recipients.delete_recipient(recipient.recipient_id)
end
end
end
@ -510,7 +510,7 @@ function recipients.get_recipients_by_conf(endpoint_id, include_stats)
-- Use tostring for backward compatibility, to handle
-- both integer and string endpoint_id
if tostring(recipient.endpoint_id) == tostring(endpoint_id) then
res[#res + 1] = recipient
res[#res + 1] = recipient
end
end
@ -582,81 +582,81 @@ function recipients.get_recipient(recipient_id, include_stats)
recipient_details = json.decode(recipient_details_str)
if recipient_details then
-- Add the integer recipient id
recipient_details["recipient_id"] = tonumber(recipient_id)
-- Add the integer recipient id
recipient_details["recipient_id"] = tonumber(recipient_id)
-- Add also the endpoint configuration name
-- Use the endpoint id to get the endpoint configuration (use endpoint_conf_name for the old endpoints)
local ec = endpoints.get_endpoint_config(recipient_details["endpoint_id"] or recipient_details["endpoint_conf_name"])
recipient_details["endpoint_conf_name"] = ec["endpoint_conf_name"]
recipient_details["endpoint_id"] = ec["endpoint_id"]
-- Add also the endpoint configuration name
-- Use the endpoint id to get the endpoint configuration (use endpoint_conf_name for the old endpoints)
local ec = endpoints.get_endpoint_config(recipient_details["endpoint_id"] or recipient_details["endpoint_conf_name"])
recipient_details["endpoint_conf_name"] = ec["endpoint_conf_name"]
recipient_details["endpoint_id"] = ec["endpoint_id"]
-- Add check categories. nil or empty check categories read from the JSON imply ANY AVAILABLE category
if not recipient_details["check_categories"] or #recipient_details["check_categories"] == 0 then
if not recipient_details["check_categories"] then
recipient_details["check_categories"] = {}
end
-- Add check categories. nil or empty check categories read from the JSON imply ANY AVAILABLE category
if not recipient_details["check_categories"] or #recipient_details["check_categories"] == 0 then
if not recipient_details["check_categories"] then
recipient_details["check_categories"] = {}
end
local checks = require "checks"
for _, category in pairs(checks.check_categories) do
recipient_details["check_categories"][#recipient_details["check_categories"] + 1] = category.id
end
end
local checks = require "checks"
for _, category in pairs(checks.check_categories) do
recipient_details["check_categories"][#recipient_details["check_categories"] + 1] = category.id
end
end
-- Add check entities. nil or empty check entities read from the JSON imply ANY AVAILABLE entity
if not recipient_details["check_entities"] or #recipient_details["check_entities"] == 0 then
if not recipient_details["check_entities"] then
recipient_details["check_entities"] = {}
end
-- Add check entities. nil or empty check entities read from the JSON imply ANY AVAILABLE entity
if not recipient_details["check_entities"] or #recipient_details["check_entities"] == 0 then
if not recipient_details["check_entities"] then
recipient_details["check_entities"] = {}
end
for _, entity_info in pairs(alert_entities) do
recipient_details["check_entities"][#recipient_details["check_entities"] + 1] = entity_info.entity_id
end
end
recipient_details["check_entities"][#recipient_details["check_entities"] + 1] = entity_info.entity_id
end
end
-- Add host pools
if not recipient_details["host_pools"] then
local pools = host_pools:get_all_pools()
-- Add host pools
if not recipient_details["host_pools"] then
local pools = host_pools:get_all_pools()
if(recipient_details["host_pools"] == nil) then
recipient_details["host_pools"] = {}
end
for _, pool in pairs(pools) do
recipient_details["host_pools"][#recipient_details["host_pools"] + 1] = pool.pool_id
end
end
if(recipient_details["host_pools"] == nil) then
recipient_details["host_pools"] = {}
end
for _, pool in pairs(pools) do
recipient_details["host_pools"][#recipient_details["host_pools"] + 1] = pool.pool_id
end
end
-- Add active monitoring hosts
if not recipient_details["am_hosts"] then
-- Add active monitoring hosts
if not recipient_details["am_hosts"] then
-- No hosts by default
recipient_details["am_hosts"] = {}
end
recipient_details["am_hosts"] = {}
end
-- Add minimum alert severity. nil or empty minimum severity assumes a minimum severity of notice
if not tonumber(recipient_details["minimum_severity"]) then
recipient_details["minimum_severity"] = default_builtin_minimum_severity
end
-- Add minimum alert severity. nil or empty minimum severity assumes a minimum severity of notice
if not tonumber(recipient_details["minimum_severity"]) then
recipient_details["minimum_severity"] = default_builtin_minimum_severity
end
if ec then
recipient_details["endpoint_conf"] = ec["endpoint_conf"]
recipient_details["endpoint_key"] = ec["endpoint_key"]
if ec then
recipient_details["endpoint_conf"] = ec["endpoint_conf"]
recipient_details["endpoint_key"] = ec["endpoint_key"]
local modules_by_name = endpoints.get_types()
local cur_module = modules_by_name[recipient_details["endpoint_key"]]
if cur_module and cur_module.format_recipient_params then
-- Add a formatted output of recipient params
recipient_details["recipient_params_fmt"] = cur_module.format_recipient_params(recipient_details["recipient_params"])
else
-- A default
recipient_details["recipient_params_fmt"] = ""
end
end
local modules_by_name = endpoints.get_types()
local cur_module = modules_by_name[recipient_details["endpoint_key"]]
if cur_module and cur_module.format_recipient_params then
-- Add a formatted output of recipient params
recipient_details["recipient_params_fmt"] = cur_module.format_recipient_params(recipient_details["recipient_params"])
else
-- A default
recipient_details["recipient_params_fmt"] = ""
end
end
if include_stats then
-- Read stats from C
recipient_details["stats"] = ntop.recipient_stats(recipient_details["recipient_id"])
end
-- Read stats from C
recipient_details["stats"] = ntop.recipient_stats(recipient_details["recipient_id"])
end
end
end
@ -674,7 +674,7 @@ function recipients.get_all_recipients(exclude_builtin, include_stats)
local recipient_details = recipients.get_recipient(recipient_id, include_stats)
if recipient_details and (not exclude_builtin or not recipient_details.endpoint_conf.builtin) then
res[#res + 1] = recipient_details
res[#res + 1] = recipient_details
end
end
@ -690,7 +690,7 @@ function recipients.get_recipient_by_name(name)
local recipient_details = recipients.get_recipient(recipient_id)
if recipient_details and recipient_details["recipient_name"] and recipient_details["recipient_name"] == name then
return recipient_details
return recipient_details
end
end
@ -727,11 +727,11 @@ local function get_notification_category(notification, current_script)
else
--- Determined from the entity
if entity_id == alert_entities.system.entity_id then
-- System alert entity becomes system
-- System alert entity becomes system
cur_category_id = checks.check_categories.system.id
else
-- All other entities fall into other category
cur_category_id = checks.check_categories.other.id
-- All other entities fall into other category
cur_category_id = checks.check_categories.other.id
end
end
@ -872,35 +872,35 @@ local function process_notifications(ready_recipients, now, deadline, periodic_f
local cur_time = os.time()
while #ready_recipients > 0 and total_budget >= 0 and cur_time <= deadline and (force_export or not ntop.isDeadlineApproaching()) do
for i = #ready_recipients, 1, -1 do
local ready_recipient = ready_recipients[i]
local recipient = ready_recipient.recipient
local m = ready_recipient.mod
local ready_recipient = ready_recipients[i]
local recipient = ready_recipient.recipient
local m = ready_recipient.mod
debug_print("Dequeuing alerts for ready recipient: ".. recipient.recipient_name.. " recipient_id: "..recipient.recipient_id)
debug_print("Dequeuing alerts for ready recipient: ".. recipient.recipient_name.. " recipient_id: "..recipient.recipient_id)
if last_error_notification == 0 then
last_error_notification = tonumber(ntop.getCache(string.format(ERROR_KEY, recipient.recipient_name))) or 0
end
if last_error_notification == 0 then
last_error_notification = tonumber(ntop.getCache(string.format(ERROR_KEY, recipient.recipient_name))) or 0
end
if m.dequeueRecipientAlerts and (now > MIN_ERROR_DELAY + last_error_notification) then
local rv = m.dequeueRecipientAlerts(recipient, budget_per_iter)
if m.dequeueRecipientAlerts and (now > MIN_ERROR_DELAY + last_error_notification) then
local rv = m.dequeueRecipientAlerts(recipient, budget_per_iter)
-- If the recipient has failed (not rv.success) or
-- if it has no more work to do (not rv.more_available)
-- it can be removed from the array of ready recipients.
if not rv.success or not rv.more_available then
table.remove(ready_recipients, i)
-- If the recipient has failed (not rv.success) or
-- if it has no more work to do (not rv.more_available)
-- it can be removed from the array of ready recipients.
if not rv.success or not rv.more_available then
table.remove(ready_recipients, i)
debug_print("Ready recipient done: ".. recipient.recipient_name)
debug_print("Ready recipient done: ".. recipient.recipient_name)
if not rv.success then
last_error_notification = now
ntop.setCache(string.format(ERROR_KEY, recipient.recipient_name), now)
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg)
end
end
end
if not rv.success then
last_error_notification = now
ntop.setCache(string.format(ERROR_KEY, recipient.recipient_name), now)
local msg = rv.error_message or "Unknown Error"
traceError(TRACE_ERROR, TRACE_CONSOLE, "Error while sending notifications via " .. recipient.recipient_name .. " " .. msg)
end
end
end
end
-- Update the total budget
@ -910,12 +910,12 @@ local function process_notifications(ready_recipients, now, deadline, periodic_f
if do_trace then
if #ready_recipients > 0 then
debug_print("Deadline approaching: "..tostring(deadline < cur_time))
debug_print("Budget left: "..total_budget)
debug_print("The following recipients were unable to dequeue all their notifications")
for _, ready_recipient in pairs(ready_recipients) do
debug_print(" "..ready_recipient.recipient.recipient_name)
end
debug_print("Deadline approaching: "..tostring(deadline < cur_time))
debug_print("Budget left: "..total_budget)
debug_print("The following recipients were unable to dequeue all their notifications")
for _, ready_recipient in pairs(ready_recipients) do
debug_print(" "..ready_recipient.recipient.recipient_name)
end
end
end
end
@ -969,12 +969,12 @@ function recipients.process_notifications(now, deadline, periodic_frequency, for
if modules_by_name[module_name] then
local m = modules_by_name[module_name]
if force_export or check_endpoint_export(recipient.recipient_id, m.EXPORT_FREQUENCY, now) then
-- This recipient is ready for export...
local ready_recipient = {recipient = recipient, recipient_id = recipient.recipient_id, mod = m}
if force_export or check_endpoint_export(recipient.recipient_id, m.EXPORT_FREQUENCY, now) then
-- This recipient is ready for export...
local ready_recipient = {recipient = recipient, recipient_id = recipient.recipient_id, mod = m}
ready_recipients[#ready_recipients + 1] = ready_recipient
end
ready_recipients[#ready_recipients + 1] = ready_recipient
end
end
end