Migrate lua alerts to unified alerts queue

This commit is contained in:
emanuele-f 2019-07-30 12:22:00 +02:00
parent d38cd23615
commit 93a3cb9b7e
3 changed files with 61 additions and 168 deletions

View file

@ -19,9 +19,7 @@ local alerts_api = require "alerts_api"
local alert_endpoints = require "alert_endpoints_utils"
local store_alerts_queue = "ntopng.alert_store_queue"
local alert_process_queue = "ntopng.alert_process_queue"
local inactive_hosts_hash_key = "ntopng.prefs.alerts.ifid_%d.inactive_hosts_alerts"
local snmp_alert_queue = "ntopng.snmp_alert_queue"
local shaper_utils = nil
@ -2205,34 +2203,6 @@ local function getSavedDeviceName(mac)
return ntop.getCache(key)
end
-- Global function
function check_process_alerts()
while(true) do
local message = ntop.lpopCache(alert_process_queue)
local elems
if((message == nil) or (message == "")) then
break
end
if(verbose) then print(message.."\n") end
local decoded = json.decode(message)
if(decoded == nil) then
if(verbose) then io.write("JSON Decoding error: "..message.."\n") end
else
interface.select(getSystemInterfaceId())
alerts_api.store(
alerts_api.processEntity(decoded.entity_value),
alerts_api.processNotificationType(decoded.event, decoded.severity, decoded.msg_details),
decoded.when
)
end
end
end
local function check_macs_alerts(ifid, granularity)
if granularity ~= "min" then
return
@ -2668,6 +2638,8 @@ end
-- ##############################################
-- Processes queued alerts and returns the information necessary to store them.
-- Alerts are enqueued by AlertsQueue (C) and enqueueStoreAlert (Lua)
local function processStoreAlertFromQueue(alert)
local entity_info = nil
local type_info = nil
@ -2701,6 +2673,21 @@ local function processStoreAlertFromQueue(alert)
elseif(alert.alert_type == alertType("nfq_flushed")) then
entity_info = alerts_api.interfaceAlertEntity(alert.ifid)
type_info = alerts_api.nfqFlushedType(getInterfaceName(alert.ifid), alert.pct, alert.tot, alert.dropped)
elseif(alert.alert_type == alertType("process_notification")) then
entity_info = alerts_api.processEntity(alert.entity_value)
type_info = alerts_api.processNotificationType(alert.event, alert.severity, alert.msg_details)
elseif(alert.alert_type == alertType("port_status_change")) then
entity_info = alerts_api.snmpInterfaceEntity(alert.device, alert.interface)
type_info = alerts_api.snmpInterfaceStatusChangeType(alert.device, alert.interface, alert.interface_name, alert.status)
elseif(alert.alert_type == alertType("port_duplexstatus_change")) then
entity_info = alerts_api.snmpInterfaceEntity(alert.device, alert.interface)
type_info = alerts_api.snmpInterfaceDuplexStatusChangeType(alert.device, alert.interface, alert.interface_name, alert.status)
elseif(alert.alert_type == alertType("port_errors")) then
entity_info = alerts_api.snmpInterfaceEntity(alert.device, alert.interface)
type_info = alerts_api.snmpInterfaceErrorsType(alert.device, alert.interface, alert.interface_name)
elseif(alert.alert_type == alertType("port_load_threshold_exceeded")) then
entity_info = alerts_api.snmpInterfaceEntity(alert.device, alert.interface)
type_info = alerts_api.snmpPortLoadThresholdExceededType(alert.device, alert.interface, alert.interface_name, alert.interface_load, alert.in_direction)
else
traceError(TRACE_ERROR, TRACE_CONSOLE, "Unknown alert type " .. (alert.alert_type or ""))
end
@ -2765,6 +2752,23 @@ end
-- ##############################################
-- This function is the lua equivalent of the AlertsQueue::pushAlertJson
function enqueueStoreAlert(ifid, alert_type, alert_info, when)
when = when or os.time()
-- Alert-specific fields
local obj = table.clone(alert_info)
-- Mandatory fields
obj.ifid = ifid
obj.alert_type = alert_type.alert_id
obj.alert_tstamp = when
ntop.rpushCache(store_alerts_queue, json.encode(obj))
end
-- ##############################################
local function notify_ntopng_status(started)
local info = ntop.getInfo()
local severity = alertSeverity("info")
@ -2805,105 +2809,47 @@ local function notify_ntopng_status(started)
telemetry_utils.notify(obj)
end
ntop.rpushCache(alert_process_queue, json.encode({
enqueueStoreAlert(getSystemInterfaceId(), alert_consts.alert_types.process_notification, {
msg_details = msg_details,
entity_value = entity_value, event = event,
severity = severity, when = os.time(),
}))
end
-- Global function
function check_snmp_alerts()
while(true) do
local message = ntop.lpopCache(snmp_alert_queue)
local elems
if((message == nil) or (message == "")) then
break
end
if(verbose) then print(message.."\n") end
local decoded = json.decode(message)
if(decoded == nil) then
if(verbose) then io.write("JSON Decoding error: "..message.."\n") end
else
interface.select(getSystemInterfaceId())
local entity_info = alerts_api.snmpInterfaceEntity(decoded.device, decoded.interface)
local type_info = nil
interface.select(getSystemInterfaceId())
if decoded.event == "port_status_change" then
type_info = alerts_api.snmpInterfaceStatusChangeType(decoded.device, decoded.interface, decoded.interface_name, decoded.status)
elseif decoded.event == "port_duplexstatus_change" then
type_info = alerts_api.snmpInterfaceDuplexStatusChangeType(decoded.device, decoded.interface, decoded.interface_name, decoded.status)
elseif decoded.event == "port_errors" then
type_info = alerts_api.snmpInterfaceErrorsType(decoded.device, decoded.interface, decoded.interface_name)
elseif decoded.event == "port_load_threshold_exceeded" then
type_info = alerts_api.snmpPortLoadThresholdExceededType(decoded.device, decoded.interface, decoded.interface_name, decoded.interface_load, decoded.in_direction)
else
traceError(TRACE_ERROR, TRACE_CONSOLE, "Unknown SNMP event " .. (decoded.event or ""))
end
if(type_info ~= nil) then
alerts_api.store(entity_info, type_info)
end
end
end
severity = severity,
})
end
function notify_snmp_device_interface_status_change(snmp_host, snmp_interface)
local msg = {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
status = snmp_interface["status"],
event = "port_status_change",
when = os.time(),
}
ntop.rpushCache(snmp_alert_queue, json.encode(msg))
enqueueStoreAlert(getSystemInterfaceId(), alert_consts.alert_types.port_status_change, {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
status = snmp_interface["status"],
})
end
function notify_snmp_device_interface_duplexstatus_change(snmp_host, snmp_interface)
local msg = {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
status = snmp_interface["duplexstatus"],
event = "port_duplexstatus_change",
when = os.time(),
}
ntop.rpushCache(snmp_alert_queue, json.encode(msg))
enqueueStoreAlert(getSystemInterfaceId(), alert_consts.alert_types.port_duplexstatus_change, {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
status = snmp_interface["duplexstatus"],
})
end
function notify_snmp_device_interface_errors(snmp_host, snmp_interface)
local msg = {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
event = "port_errors",
when = os.time(),
}
ntop.rpushCache(snmp_alert_queue, json.encode(msg))
enqueueStoreAlert(getSystemInterfaceId(), alert_consts.alert_types.port_errors, {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
})
end
function notify_snmp_device_interface_load_threshold_exceeded(snmp_host, snmp_interface, interface_load, in_direction)
local msg = {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
interface_load = interface_load,
event = "port_load_threshold_exceeded",
in_direction = in_direction,
when = os.time(),
}
ntop.rpushCache(snmp_alert_queue, json.encode(msg))
enqueueStoreAlert(getSystemInterfaceId(), alert_consts.alert_types.port_load_threshold_exceeded, {
device = snmp_host,
interface = snmp_interface["index"],
interface_name = snmp_interface["name"],
interface_load = interface_load,
in_direction = in_direction,
})
end
function notify_ntopng_start()