Unify alerts and alerts notifications format

This commit is contained in:
emanuele-f 2019-07-30 18:12:21 +02:00
parent 115378f8fc
commit ee9e5ec9de
16 changed files with 262 additions and 423 deletions

View file

@ -35,12 +35,11 @@ end
-- ##############################################
local function makeAlertId(alert_type, subtype, periodicity, alert_entity)
return(string.format("%s_%s_%s_%s", alert_type, subtype or "", periodicity or "", alert_entity))
end
function alerts_api:getId()
return(makeAlertId(self.type_id, self.subtype, self.periodicity, self.entity_type_id))
-- Returns a string which identifies an alert
function alerts_api.getAlertId(alert)
return(string.format("%s_%s_%s_%s_%s", alert.alert_type,
alert.alert_subtype or "", alert.alert_granularity or "",
alert.alert_entity, alert.alert_entity_val))
end
-- ##############################################
@ -111,60 +110,17 @@ end
-- ##############################################
-- TODO unify alerts and metadata/notications format
function alerts_api.parseNotification(metadata)
local alert_id = makeAlertId(alertType(metadata.type), metadata.alert_subtype, metadata.alert_periodicity, alertEntity(metadata.entity_type))
if known_alerts[alert_id] then
return(known_alerts[alert_id])
end
-- new alert
return(alerts_api:newAlert({
entity = metadata.entity_type,
type = metadata.type,
severity = metadata.severity,
periodicity = metadata.periodicity,
subtype = metadata.subtype,
}))
end
-- ##############################################
-- TODO unify alerts and metadata/notications format
function alerts_api.alertNotificationToRecord(notif)
return {
alert_entity = alertEntity(notif.entity_type),
alert_type = alertType(notif.type),
alert_severity = alertSeverity(notif.severity),
periodicity = notif.periodicity,
alert_subtype = notif.subtype,
alert_entity_val = notif.entity_value,
alert_tstamp = notif.tstamp,
alert_tstamp_end = notif.tstamp_end or notif.tstamp,
alert_granularity = notif.granularity,
alert_json = notif.message,
}
end
-- ##############################################
local function get_alert_triggered_key(type_info)
return(string.format("%d@%s", type_info.alert_type.alert_id, type_info.alert_subtype or ""))
end
-- ##############################################
local function enqueueAlertEvent(alert_event)
-- Enqueue a store alert action on the DB to be performed asynchronously
local function enqueueStoreAlert(ifid, alert)
local trim = nil
local ifid = interface.getId()
if(alert_event.ifid ~= ifid) then
traceError(TRACE_ERROR, TRACE_CONSOLE, string.format("Wrong interface selected: expected %s, got %s", alert_event.ifid, ifid))
return(false)
end
local event_json = json.encode(alert_event)
local alert_json = json.encode(alert)
local queue = getAlertEventQueue(ifid)
if(ntop.llenCache(queue) > MAX_NUM_ENQUEUED_ALERT_PER_INTERFACE) then
@ -174,41 +130,38 @@ local function enqueueAlertEvent(alert_event)
interface.incNumDroppedAlerts(trim)
end
ntop.rpushCache(queue, event_json, trim)
ntop.rpushCache(queue, alert_json, trim)
return(true)
end
-- ##############################################
-- Performs the trigger/release asynchronously.
-- Performs the alert store asynchronously.
-- This is necessary both to avoid paying the database io cost inside
-- the other scripts and as a necessity to avoid a deadlock on the
-- host hash in the host.lua script
function alerts_api.processPendingAlertEvents(deadline)
function alerts_api.checkPendingStoreAlerts(deadline)
local ifnames = interface.getIfNames()
ifnames[getSystemInterfaceId()] = getSystemInterfaceName()
for ifid, _ in pairs(ifnames) do
interface.select(ifid)
local queue = getAlertEventQueue(ifid)
while(true) do
local event_json = ntop.lpopCache(queue)
local alert_json = ntop.lpopCache(queue)
if(not event_json) then
if(not alert_json) then
break
end
local event = json.decode(event_json)
local alert = json.decode(alert_json)
if(event.action == "release") then
interface.storeAlert(
event.tstamp, event.tstamp_end, event.granularity,
event.type, event.subtype or "", event.severity,
event.entity_type, event.entity_value,
event.message) -- event.message: nil for "release"
end
alert_endpoints.dispatchNotification(event, event_json)
interface.storeAlert(
alert.alert_tstamp, alert.alert_tstamp_end, alert.alert_granularity,
alert.alert_type, alert.alert_subtype, alert.alert_severity,
alert.alert_entity, alert.alert_entity_val,
alert.alert_json)
if(os.time() > deadline) then
return(false)
@ -221,6 +174,14 @@ end
-- ##############################################
local function pushAlertNotification(ifid, action, alert)
alert.ifid = ifid
alert.action = action
ntop.rpushCache("ntopng.alerts.notifications_queue", json.encode(alert))
end
-- ##############################################
--! @brief Stores a single alert (or event) into the alerts database
--! @param entity_info data returned by one of the entity_info building functions
--! @param type_info data returned by one of the type_info building functions
@ -241,34 +202,30 @@ function alerts_api.store(entity_info, type_info, when)
return(false)
end
local rv = interface.storeAlert(when, when, granularity_sec,
type_info.alert_type.alert_id, subtype, type_info.alert_severity.severity_id,
entity_info.alert_entity.entity_id, entity_info.alert_entity_val, alert_json)
-- Here the alert is considered stored. The actual store will be performed
-- asynchronously
-- NOTE: keep in sync with SQLite alert format in AlertsManager.cpp
local alert_to_store = {
alert_type = type_info.alert_type.alert_id,
alert_subtype = subtype,
alert_granularity = granularity_sec,
alert_entity = entity_info.alert_entity.entity_id,
alert_entity_val = entity_info.alert_entity_val,
alert_severity = type_info.alert_severity.severity_id,
alert_tstamp = when,
alert_tstamp_end = when,
alert_json = alert_json,
}
if(entity_info.alert_entity.entity_id == alertEntity("host")) then
-- NOTE: for engaged alerts this operation is performed during trigger in C
interface.incTotalHostAlerts(entity_info.alert_entity_val, type_info.alert_type.alert_id)
end
if(rv) then
local action = "store"
local message = {
ifid = ifid,
granularity = granularity_sec,
entity_type = entity_info.alert_entity.entity_id,
entity_value = entity_info.alert_entity_val,
type = type_info.alert_type.alert_id,
severity = type_info.alert_severity.severity_id,
message = alert_json,
subtype = subtype,
tstamp = when,
action = action,
}
alert_endpoints.dispatchNotification(message, json.encode(message))
end
return(rv)
enqueueStoreAlert(ifid, alert_to_store)
pushAlertNotification(ifid, "store", alert_to_store)
return(true)
end
-- ##############################################
@ -313,7 +270,7 @@ function alerts_api.trigger(entity_info, type_info, when)
return(false)
end
if(not triggered) then
if(triggered == nil) then
if(do_trace) then print("[Don't Trigger alert (already triggered?) @ "..granularity_sec.."] "..
entity_info.alert_entity_val .."@"..type_info.alert_type.i18n_title..":".. subtype .. "\n") end
return(false)
@ -328,20 +285,8 @@ function alerts_api.trigger(entity_info, type_info, when)
entity_info.alert_entity_val .."@"..type_info.alert_type.i18n_title..":".. subtype .. "\n") end
end
local alert_event = {
ifid = ifid,
granularity = granularity_sec,
entity_type = entity_info.alert_entity.entity_id,
entity_value = entity_info.alert_entity_val,
type = type_info.alert_type.alert_id,
severity = type_info.alert_severity.severity_id,
message = alert_json,
subtype = subtype,
tstamp = when,
action = "engage",
}
return(enqueueAlertEvent(alert_event))
pushAlertNotification(ifid, "engage", triggered)
return(true)
end
-- ##############################################
@ -358,6 +303,7 @@ function alerts_api.release(entity_info, type_info, when)
local granularity_id = type_info.alert_granularity and type_info.alert_granularity.granularity_id or nil
local subtype = type_info.alert_subtype or ""
local alert_key_name = get_alert_triggered_key(type_info)
local ifid = interface.getId()
local released = nil
if((host.releaseTriggeredAlert) and (entity_info.alert_entity.entity_id == alertEntity("host"))) then
@ -385,21 +331,9 @@ function alerts_api.release(entity_info, type_info, when)
return(false)
end
local alert_event = {
ifid = interface.getId(),
granularity = granularity_sec,
entity_type = entity_info.alert_entity.entity_id,
entity_value = entity_info.alert_entity_val,
type = type_info.alert_type.alert_id,
severity = type_info.alert_severity.severity_id,
subtype = subtype,
tstamp = released.alert_tstamp,
tstamp_end = released.alert_tstamp_end,
message = released.alert_json,
action = "release",
}
return(enqueueAlertEvent(alert_event))
enqueueStoreAlert(ifid, released)
pushAlertNotification(ifid, "release", released)
return(true)
end
-- ##############################################