Enqueueing flow alerts to be stored/notified from Lua, removed DB select to notiy alert (using the alert object directly)

This commit is contained in:
Alfredo Cardigliano 2019-11-05 15:53:10 +01:00
parent 17a22ef986
commit acdab024da
8 changed files with 134 additions and 158 deletions

View file

@ -13,9 +13,6 @@ local do_trace = false
local alerts_api = {}
-- NOTE: sqlite can handle about 10-50 alerts/sec
local MAX_NUM_ENQUEUED_ALERT_PER_INTERFACE = 256
-- Just helpers
local str_2_periodicity = {
["min"] = 60,
@ -69,8 +66,8 @@ local function enqueueStoreAlert(ifid, alert)
local alert_json = json.encode(alert)
local queue = getAlertEventQueue(ifid)
if(ntop.llenCache(queue) > MAX_NUM_ENQUEUED_ALERT_PER_INTERFACE) then
trim = math.ceil(MAX_NUM_ENQUEUED_ALERT_PER_INTERFACE/2)
if(ntop.llenCache(queue) > alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE) then
trim = math.ceil(alert_consts.MAX_NUM_QUEUED_ALERTS_PER_INTERFACE/2)
traceError(TRACE_INFO, TRACE_CONSOLE, string.format("Alerts event queue too long: dropping %u alerts", trim))
interface.incNumDroppedAlerts(trim)
@ -82,21 +79,36 @@ end
-- ##############################################
local function pushAlertNotification(ifid, action, alert)
alert.ifid = ifid
alert.action = action
ntop.rpushCache("ntopng.alerts.notifications_queue", json.encode(alert))
-- Enqueue a store flow alert action on the DB to be performed asynchronously
local function enqueueStoreFlowAlert(ifid, alert_json)
local trim = nil
local queue = getFlowAlertEventQueue(ifid)
if(ntop.llenCache(queue) > alert_consts.MAX_NUM_QUEUED_ALERTS_PER_MODULE) then
trim = math.ceil(alert_consts.MAX_NUM_QUEUED_ALERTS_PER_MODULE/2)
traceError(TRACE_INFO, TRACE_CONSOLE, string.format("Flow alerts event queue too long: dropping %u alerts", trim))
interface.incNumDroppedAlerts(trim)
end
ntop.rpushCache(queue, alert_json, trim)
return(true)
end
-- ##############################################
local function pushFlowAlertNotification(ifid, rowid)
local alert = {}
local function pushAlertJSONNotification(alert_json)
ntop.rpushCache("ntopng.alerts.notifications_queue", alert_json)
end
alert.table_name = "flows_alerts"
alert.rowid = rowid
-- ##############################################
pushAlertNotification(ifid, "store", alert)
local function pushAlertNotification(ifid, action, alert)
alert.ifid = ifid
alert.action = action
pushAlertJSONNotification(json.encode(alert))
end
-- ##############################################
@ -156,26 +168,20 @@ function alerts_api.checkPendingStoreAlerts(deadline)
local alert = json.decode(alert_json)
if(alert) then
local ret = interface.storeFlowAlert(
alert.tstamp, alert.alert_type,
alert.alert_severity, alert.status,
json.encode(alert.alert_json),
alert.vlan_id, alert.protocol,
alert.master_protocol, alert.app_protocol,
alert.cli.ip, alert.srv.ip,
alert.cli.country, alert.srv.country,
alert.cli.os, alert.srv.os,
alert.cli.asn, alert.srv.asn,
alert.cli.is_localhost, alert.srv.is_localhost,
alert.cli.is_blacklisted, alert.srv.is_blacklisted,
alert.cli2srv.bytes, alert.cli2srv.packets,
alert.srv2cli.bytes, alert.srv2cli.packets)
if ret and ret.rowid and ret.rowid > 0 then
-- pushFlowAlertNotification should be probably moved earlier to
-- flow.lua:triggerFlowAlert for consistency with the host alerts
pushFlowAlertNotification(ifid, ret.rowid)
end
interface.storeFlowAlert(
alert.alert_tstamp, alert.alert_type,
alert.alert_severity, alert.flow_status,
alert.alert_json,
alert.vlan_id, alert.proto,
alert.l7_master_proto, alert.l7_proto,
alert.cli_addr, alert.srv_addr,
alert.cli_country, alert.srv_country,
alert.cli_os, alert.srv_os,
alert.cli_asn, alert.srv_asn,
alert.cli_localhost, alert.srv_localhost,
alert.cli_blacklisted, alert.srv_blacklisted,
alert.cli2srv_bytes, alert.cli2srv_packets,
alert.srv2cli_bytes, alert.srv2cli_packets)
end
if(os.time() > deadline) then
@ -190,6 +196,21 @@ end
-- ##############################################
--! @brief Stores a flow alert into the alerts database
function alerts_api.storeFlow(alert)
local ifid = interface.getId()
alert.ifid = ifid
alert.action = "store"
local alert_json = json.encode(alert)
enqueueStoreFlowAlert(ifid, alert_json)
pushAlertJSONNotification(alert_json)
end
-- ##############################################
--! @brief Stores a single alert (or event) into the alerts database
--! @param entity_info data returned by one of the entity_info building functions
--! @param type_info data returned by one of the type_info building functions