Restructure code to use the upcoming alerts cache

This commit is contained in:
emanuele-f 2017-04-22 15:20:43 +02:00
parent 704cf4844e
commit a4a7f0d12f

View file

@ -71,6 +71,18 @@ function throughput(old, new, interval)
return((bytes(old, new, interval) * 8)/ (interval*1000000))
end
--- Growth of the "ingress" counter between two samples.
-- @param old previous sample table; its "ingress" field is read
-- @param new current sample table; its "ingress" field is read
-- @param interval not used by this helper
-- @return new["ingress"] minus old["ingress"]
function ingress(old, new, interval)
  local field = "ingress"
  local delta = new[field] - old[field]
  return delta
end
--- Growth of the "egress" counter between two samples.
-- @param old previous sample table; its "egress" field is read
-- @param new current sample table; its "egress" field is read
-- @param interval not used by this helper
-- @return new["egress"] minus old["egress"]
function egress(old, new, interval)
  local field = "egress"
  local delta = new[field] - old[field]
  return delta
end
--- Growth of the "inner" counter between two samples.
-- @param old previous sample table; its "inner" field is read
-- @param new current sample table; its "inner" field is read
-- @param interval not used by this helper
-- @return new["inner"] minus old["inner"]
function inner(old, new, interval)
  local field = "inner"
  local delta = new[field] - old[field]
  return delta
end
-- ##############################################################################
if ntop.isEnterprise() then
@ -246,54 +258,37 @@ function refresh_alert_configuration(alert_source, ifname, timespan, alerts_stri
interface.refreshNumAlerts()
end
function check_host_alert(ifname, hostname, mode, key, old_json, new_json)
if(verbose) then
print("check_host_alert("..ifname..", "..hostname..", "..mode..", "..key..")<br>\n")
-- #################################
print("<p>--------------------------------------------<p>\n")
print("NEW<br>"..new_json.."<br>\n")
print("<p>--------------------------------------------<p>\n")
print("OLD<br>"..old_json.."<br>\n")
print("<p>--------------------------------------------<p>\n")
end
-- TODO use this information to generate the alerts
local function not_used()
local ifname_clean = "iface_"..tostring(getInterfaceId(ifname))
local alert_level = 2 -- alert_level_error
local alert_status = 1 -- alert_on
local alert_type = 2 -- alert_threshold_exceeded
local alert_level = 2 -- alert_level_error
local alert_type = 2 -- alert_threshold_exceeded
local alert_status -- to be set later
local alert_id = threshold.key
old = j.decode(old_json, 1, nil)
new = j.decode(new_json, 1, nil)
if(rc) then
local alert_msg = "Threshold <b>"..threshold.metric.."</b> crossed by interface <A HREF='"..ntop.getHttpPrefix().."/lua/if_stats.lua?ifid="..tostring(getInterfaceId(ifname))..
"'>"..ifname.."</A> [".. val .." ".. op .. " " .. threshold.edge.."]"
-- str = "bytes;>;123,packets;>;12"
hkey = get_alerts_hash_name(mode, ifname)
str = ntop.getHashCache(hkey, hostname)
duration = granularity2sec(mode)
-- if(verbose) then ("--"..hkey.."="..str.."--<br>") end
if((str ~= nil) and (str ~= "")) then
tokens = split(str, ",")
for _,s in pairs(tokens) do
-- if(verbose) then ("<b>"..s.."</b><br>\n") end
t = string.split(s, ";")
if(t[2] == "gt") then
op = ">"
if verbose then io.write("queuing alert\n") end
interface.engageInterfaceAlert(alert_id, alert_type, alert_level, alert_msg)
if ntop.isPro() then
-- possibly send the alert to nagios as well
ntop.sendNagiosAlert(ifname_clean, granularity, threshold.metric, alert_msg)
end
if(verbose) then print("<font color=red>".. alert_msg .."</font><br>\n") end
else
if(t[2] == "lt") then
op = "<"
else
op = "=="
if(verbose) then print("<p><font color=green><b>Threshold "..threshold.metric.."@"..ifname.." not crossed</b> [value="..val.."]["..op.." "..threshold.edge.."]</font><p>\n") end
interface.releaseInterfaceAlert(alert_id, alert_type, alert_level, "released!")
if ntop.isPro() then
ntop.withdrawNagiosAlert(ifname_clean, granularity, threshold.metric, "service OK")
end
end
-- This is where magic happens: loadstring() evaluates the string
local what = "val = "..t[1].."(old, new, duration); if(val ".. op .. " " .. t[3] .. ") then return(true) else return(false) end"
local f = loadstring(what)
local rc = f()
local alert_id = mode.."_"..t[1] -- the alert identifies is the concat. of time granularity and condition, e.g., min_bytes
if(rc) then
if(rc) then
alert_status = 1 -- alert on
local alert_msg = "Threshold <b>"..t[1].."</b> crossed by host <A HREF='"..ntop.getHttpPrefix().."/lua/host_details.lua?host="..key.."'>"..key:gsub("@0","").."</A> [".. val .." ".. op .. " " .. t[3].."]"
@ -314,62 +309,8 @@ function check_host_alert(ifname, hostname, mode, key, old_json, new_json)
mode, t[1], "service OK")
end
end
end
end
end
function check_network_alert(ifname, network_name, mode, key, old_table, new_table)
if(verbose) then
io.write("check_network_alert("..ifname..", "..network_name..", "..mode..", "..key..")\n")
io.write("new:\n")
tprint(new_table)
io.write("old:\n")
tprint(old_table)
end
local alert_level = 2 -- alert_level_error
local alert_status = 1 -- alert_on
local alert_type = 2 -- alert_threshold_exceeded
deltas = {}
local delta_names = {'ingress', 'egress', 'inner'}
for i = 1, 3 do
local delta_name = delta_names[i]
deltas[delta_name] = 0
if old_table[delta_name] and new_table[delta_name] then
deltas[delta_name] = new_table[delta_name] - old_table[delta_name]
end
end
-- str = "bytes;>;123,packets;>;12"
hkey = get_alerts_hash_name(mode, ifname)
local str = ntop.getHashCache(hkey, network_name)
-- if(verbose) then ("--"..hkey.."="..str.."--<br>") end
if((str ~= nil) and (str ~= "")) then
local tokens = split(str, ",")
for _,s in pairs(tokens) do
-- if(verbose) then ("<b>"..s.."</b><br>\n") end
local t = string.split(s, ";")
if(t[2] == "gt") then
op = ">"
else
if(t[2] == "lt") then
op = "<"
else
op = "=="
end
end
-- This is where magic happens: loadstring() evaluates the string
local what = "val = deltas['"..t[1].."']; if(val ".. op .. " " .. t[3] .. ") then return(true) else return(false) end"
local f = loadstring(what)
local rc = f()
local alert_id = mode.."_"..t[1] -- the alert identifies is the concat. of time granularity and condition, e.g., min_bytes
if(rc) then
if(rc) then
local alert_msg = "Threshold <b>"..t[1].."</b> crossed by network <A HREF='"..ntop.getHttpPrefix().."/lua/network_details.lua?network="..key.."&page=historical'>"..network_name.."</A> [".. val .." ".. op .. " " .. t[3].."]"
if verbose then io.write("queuing alert\n") end
@ -386,75 +327,26 @@ function check_network_alert(ifname, network_name, mode, key, old_table, new_tab
ntop.withdrawNagiosAlert(network_name, mode, t[1], "service OK")
end
end
end
end
end
-- #################################
function check_interface_alert(ifname, mode, old_table, new_table)
local ifname_clean = "iface_"..tostring(getInterfaceId(ifname))
if(verbose) then
print("check_interface_alert("..ifname..", "..mode..")<br>\n")
end
local alert_level = 2 -- alert_level_error
local alert_status = 1 -- alert_on
local alert_type = 2 -- alert_threshold_exceeded
-- Note: see getConfiguredAlertsThresholds for threshold object format
local function entity_threshold_crossed(ifname, granularity, old_table, new_table, threshold)
-- Needed because Lua. loadstring() won't work otherwise.
old = old_table
new = new_table
duration = granularity2sec(granularity)
-- str = "bytes;>;123,packets;>;12"
hkey = get_alerts_hash_name(mode, ifname)
duration = granularity2sec(mode)
str = ntop.getHashCache(hkey, ifname_clean)
local op = op2jsop(threshold.operator)
-- if(verbose) then ("--"..hkey.."="..str.."--<br>") end
if((str ~= nil) and (str ~= "")) then
tokens = split(str, ",")
-- This is where magic happens: loadstring() evaluates the string
local what = "val = "..threshold.metric.."(old, new, duration); if(val ".. op .. " " .. threshold.edge .. ") then return(true) else return(false) end"
for _,s in pairs(tokens) do
-- if(verbose) then ("<b>"..s.."</b><br>\n") end
t = string.split(s, ";")
local f = loadstring(what)
local rc = f()
if(t[2] == "gt") then
op = ">"
else
if(t[2] == "lt") then
op = "<"
else
op = "=="
end
end
-- This is where magic happens: loadstring() evaluates the string
local what = "val = "..t[1].."(old, new, duration); if(val ".. op .. " " .. t[3] .. ") then return(true) else return(false) end"
local f = loadstring(what)
local rc = f()
local alert_id = mode.."_"..t[1] -- the alert identifies is the concat. of time granularity and condition, e.g., min_bytes
if(rc) then
local alert_msg = "Threshold <b>"..t[1].."</b> crossed by interface <A HREF='"..ntop.getHttpPrefix().."/lua/if_stats.lua?ifid="..tostring(getInterfaceId(ifname))..
"'>"..ifname.."</A> [".. val .." ".. op .. " " .. t[3].."]"
if verbose then io.write("queuing alert\n") end
interface.engageInterfaceAlert(alert_id, alert_type, alert_level, alert_msg)
if ntop.isPro() then
-- possibly send the alert to nagios as well
ntop.sendNagiosAlert(ifname_clean, mode, t[1], alert_msg)
end
if(verbose) then print("<font color=red>".. alert_msg .."</font><br>\n") end
else
if(verbose) then print("<p><font color=green><b>Threshold "..t[1].."@"..ifname.." not crossed</b> [value="..val.."]["..op.." "..t[3].."]</font><p>\n") end
interface.releaseInterfaceAlert(alert_id, alert_type, alert_level, "released!")
if ntop.isPro() then
ntop.withdrawNagiosAlert(ifname_clean, mode, t[1], "service OK")
end
end
end
end
return rc
end
-- #################################
@ -469,138 +361,16 @@ function granularity2sec(g)
return(0)
end
-- #################################
function check_interface_threshold(ifname, mode)
interface.select(ifname)
local ifstats = interface.getStats()
ifname_id = ifstats.id
if are_alerts_suppressed("iface_"..ifname_id, ifname) then return end
if(verbose) then print("check_interface_threshold(ifaceId="..ifname_id..", timePeriod="..mode..")<br>\n") end
basedir = fixPath(dirs.workingdir .. "/" .. ifname_id .. "/json/" .. mode)
if(not(ntop.exists(basedir))) then
ntop.mkdir(basedir)
end
if (ifstats ~= nil) then
fname = fixPath(basedir.."/iface_"..ifname_id.."_lastdump")
if(verbose) then print(fname.."<p>\n") end
if (ntop.exists(fname)) then
-- Read old version
old_dump = persistence.load(fname)
if old_dump ~= nil and old_dump.stats ~= nil then
check_interface_alert(ifname, mode, old_dump, ifstats)
end
end
-- Write new version
persistence.store(fname, ifstats)
--- Translate a stored threshold operator name into comparison-operator text.
-- "gt" maps to ">", "lt" maps to "<"; every other value (including nil)
-- falls back to "==".
-- @param op operator name as stored in the threshold configuration
-- @return the operator as a string
function op2jsop(op)
  if op == "gt" then
    return ">"
  elseif op == "lt" then
    -- The comparison must be explicit: `op "lt"` is parsed by Lua as the
    -- function call op("lt") and fails at runtime on a string value.
    return "<"
  else
    return "=="
  end
end
--- Compare the current statistics of every alarmed subnet against the
-- snapshot stored by the previous run, then store the current statistics as
-- the new snapshot.
-- Only subnets that appear as keys in the alerts hash for this granularity and
-- whose alerts are not suppressed are processed.
-- @param ifname interface name; it is selected before any query
-- @param mode   time granularity, used both for the alerts hash name and for
--               the on-disk snapshot directory
function check_networks_threshold(ifname, mode)
  interface.select(ifname)

  local subnet_stats = interface.getNetworksStats()
  local alarmed_subnets = ntop.getHashKeysCache(get_alerts_hash_name(mode, ifname))
  local ifname_id = interface.getStats().id

  local basedir = fixPath(dirs.workingdir .. "/" .. ifname_id .. "/json/" .. mode)
  if not ntop.exists(basedir) then
    ntop.mkdir(basedir)
  end

  -- Loop-invariant: without a table of alarmed subnets nothing can match
  if type(alarmed_subnets) ~= "table" then
    return
  end

  for subnet, sstats in pairs(subnet_stats or {}) do
    if alarmed_subnets[subnet] ~= nil and not are_alerts_suppressed(subnet, ifname) then
      local statsdir = fixPath(basedir .. "/" .. getPathFromKey(subnet))
      if not ntop.exists(statsdir) then
        ntop.mkdir(statsdir)
      end

      local statspath = fixPath(statsdir .. "/alarmed_subnet_stats_lastdump")

      if ntop.exists(statspath) then
        -- Read old version; kept local so it does not leak into the global table
        local old_dump = persistence.load(statspath)
        if old_dump ~= nil then
          -- (ifname, network_name, mode, key, old_table, new_table)
          check_network_alert(ifname, subnet, mode, sstats['network_id'], old_dump, sstats)
        end
      end

      -- Write new version
      persistence.store(statspath, sstats)
    end
  end
end
-- #################################
--- Compare the current JSON of a host against the JSON saved by the previous
-- run, then overwrite the saved JSON with the current one.
-- Nothing is done when alerts are suppressed for the host or when the
-- interface has no information about it.
-- @param ifname  interface name; it is selected before any query
-- @param host_ip host key; a trailing "@0" is dropped when building the
--                snapshot file name
-- @param mode    time granularity, used for the snapshot directory and passed
--                on to check_host_alert
function check_host_threshold(ifname, host_ip, mode)
  interface.select(ifname)
  local ifname_id = interface.getStats().id

  if are_alerts_suppressed(host_ip, ifname) then return end

  local host_ip_fsname = host_ip
  if string.ends(host_ip, "@0") then
    host_ip_fsname = string.split(host_ip, "@")[1]
  end

  if(verbose) then print("check_host_threshold("..ifname_id..", "..host_ip..", "..mode..")<br>\n") end

  local basedir = fixPath(dirs.workingdir .. "/" .. ifname_id .. "/json/" .. mode)
  if not ntop.exists(basedir) then
    ntop.mkdir(basedir)
  end

  -- All working values are locals: the previous version assigned them as
  -- globals (including one named `json`), leaking state between calls.
  local host_info = interface.getHostInfo(host_ip)
  if host_info == nil then
    return
  end

  local fname = fixPath(basedir.."/".. host_ip_fsname ..".json")
  if(verbose) then print(fname.."<p>\n") end

  -- Read old version
  local f = io.open(fname, "r")
  if f ~= nil then
    local old_json = f:read("*all")
    f:close()
    check_host_alert(ifname, host_ip, mode, host_ip, old_json, host_info["json"])
  end

  -- Write new version
  f = io.open(fname, "w")
  if f ~= nil then
    f:write(host_info["json"])
    f:close()
  end
end
-- #################################
--- Run the interface, networks and per-host threshold checks for one
-- interface at the given granularity.
-- The hosts checked are the keys of the alerts hash for this granularity.
-- @param granularity time granularity forwarded to every check
-- @param ifname      interface name
function scanAlerts(granularity, ifname)
  if verbose then
    print("[minute.lua] Scanning ".. granularity .." alerts for interface " .. ifname.."<p>\n")
  end

  check_interface_threshold(ifname, granularity)
  check_networks_threshold(ifname, granularity)

  -- host alerts checks
  local alarmed_hosts = ntop.getHashKeysCache(get_alerts_hash_name(granularity, ifname))
  if alarmed_hosts == nil then
    return
  end

  for host_key in pairs(alarmed_hosts) do
    if verbose then
      print("[minute.lua] Checking host " .. host_key.." alerts<p>\n")
    end
    check_host_threshold(ifname, host_key, granularity)
  end
end
-- #################################
function performAlertsQuery(statement, what, opts)
local wargs = {"WHERE", "1=1"}
@ -1848,3 +1618,248 @@ function alertAnomalousHosts()
callback_utils.foreachInterface(ifnames, enable_second_debug, cb)
end
-- #################################
-- script local data - only valid if granularity does not change during script execution
-- Set to true by check_entity_alerts after it has invalidated the engaged
-- alerts cache, so the invalidation is performed at most once.
local dirty = false
-- Engaged alerts, filled on first use by initEngagedAlertsCache; nil until then.
local engaged_cache = nil
-- Configured thresholds indexed by entity value; loaded together with engaged_cache.
local configured_thresholds = nil
--- Build the cache key under which engaged alerts of a granularity are kept.
-- @param granularity granularity name (or "*" when used as a key pattern)
-- @return the fixed key prefix followed by the granularity
local function getEngagedAlertsCacheKey(granularity)
  local prefix = "ntopng.cache.engaged_alerts_cache_"
  return prefix .. granularity
end
--- Load every threshold configured for an interface at a given granularity.
-- Each hash value is a comma-separated list of "metric;operator;edge" entries;
-- entries that do not split into exactly three parts are ignored.
-- @param ifname      interface name, used to build the hash name
-- @param granularity granularity name, used for the hash name and alert keys
-- @return table indexed by entity value, then by metric; each leaf is
--         {metric=..., operator=..., edge=..., key=granularity.."_"..metric}
local function getConfiguredAlertsThresholds(ifname, granularity)
  local hash_name = get_alerts_hash_name(granularity, ifname)
  local configured = {}
  local stored = ntop.getHashAllCache(hash_name) or {}

  for entity_val, thresholds_str in pairs(stored) do
    local entity_thresholds = {}
    configured[entity_val] = entity_thresholds

    for _, entry in pairs(split(thresholds_str, ",")) do
      local fields = string.split(entry, ";")

      if #fields == 3 then
        local metric = fields[1]
        entity_thresholds[metric] = {
          metric = metric,
          operator = fields[2],
          edge = fields[3],
          -- the alert key is the concatenation of the granularity and the metric
          key = granularity .. "_" .. metric,
        }
      end
    end
  end

  return configured
end
--- Engage an alert for an entity. Currently only logs when verbose is set.
-- @param entity_type  kind of entity (not used yet)
-- @param entity_value entity identifier, printed in the log line
-- @param atype        alert type
-- @param akey         alert key
-- @param message      alert message
local function engageAlert(entity_type, entity_value, atype, akey, message)
  -- TODO engage based on the entity_type
  if not verbose then
    return
  end

  io.write("Engage Alert: "..entity_value.." "..atype.." "..akey..": "..message.."\n")
end
--- Release an alert for an entity. Currently only logs when verbose is set.
-- The parameter list mirrors engageAlert and the call made by
-- check_entity_alerts, which passes the alert key before the message.
-- @param entity_type  kind of entity (not used yet)
-- @param entity_value entity identifier, printed in the log line
-- @param atype        alert type
-- @param akey         alert key
-- @param message      release message
local function releaseAlert(entity_type, entity_value, atype, akey, message)
  -- TODO release based on the entity_type
  if(verbose) then
    -- Use the function's own arguments: there is no `alert` table in scope
    io.write("Release Alert: "..entity_value.." "..atype.." "..akey..": "..message.."\n")
  end
end
--- Return the engaged alerts for a granularity.
-- Placeholder: always returns a new empty table.
-- @param granularity granularity name (currently ignored)
-- @return an empty table
local function getEngagedAlertsCache(granularity)
  -- TODO: this function should return the engaged alerts for the given granularity, from redis or from SQLite
  local engaged = {}
  return engaged
end
--- Load the engaged alerts and the configured thresholds on first use.
-- Later calls are no-ops while engaged_cache is set, whatever arguments
-- they receive.
-- NOTE(review): the thresholds are loaded for the first ifname only; confirm
-- the script never scans more than one interface per execution.
-- @param ifname      interface name used to load the thresholds
-- @param granularity granularity used for both loads
local function initEngagedAlertsCache(ifname, granularity)
  if engaged_cache ~= nil then
    return
  end

  engaged_cache = getEngagedAlertsCache(granularity)
  configured_thresholds = getConfiguredAlertsThresholds(ifname, granularity)
end
--- Delete every engaged-alerts cache entry, for all granularities.
-- The entries are found by querying the key pattern built with "*".
function invalidateEngagedAlertsCache()
  local pattern = getEngagedAlertsCacheKey("*")
  local matching = ntop.getKeysCache(pattern) or {}

  for cache_key in pairs(matching) do
    ntop.delCache(cache_key)
  end

  if verbose then
    io.write("Engaged Alerts Cache invalidated\n")
  end
end
-- #################################
--- Work out which alerts are currently active for an entity, then engage the
-- ones that are not yet engaged and release the engaged ones that are no
-- longer active.
-- Active alerts come from two sources: "syn_flood" host anomalies (only for
-- hosts at "min" granularity) and configured thresholds reported as crossed
-- by entity_threshold_crossed.
-- The engaged-alerts cache is invalidated at most once per script run, the
-- first time any alert is engaged or released.
-- @param ifname          interface name
-- @param entity_type     entity kind, first index into engaged_cache
-- @param entity_value    entity identifier, index into configured_thresholds
--                        and second index into engaged_cache
-- @param granularity     time granularity
-- @param old_entity_info previous snapshot of the entity
-- @param entity_info     current snapshot of the entity
local function check_entity_alerts(ifname, entity_type, entity_value, granularity, old_entity_info, entity_info)
  if are_alerts_suppressed(entity_value, ifname) then return end

  local current_alerts = {}
  local invalidate = false

  if (granularity == "min") and (entity_type == "host") then
    -- Populate current_alerts with host anomalies
    for anomal_name, anomaly in pairs(entity_info.anomalies or {}) do
      if starts(anomal_name, "syn_flood") then
        current_alerts["tcp_syn_flood"] = current_alerts["tcp_syn_flood"] or {}
        current_alerts["tcp_syn_flood"][anomaly] = true
      end
    end
  end

  -- Populate current_alerts with threshold crosses
  for _, threshold in pairs(configured_thresholds[entity_value] or {}) do
    local exceeded = entity_threshold_crossed(ifname, granularity, old_entity_info, entity_info, threshold)

    if exceeded then
      current_alerts["threshold_cross"] = current_alerts["threshold_cross"] or {}
      current_alerts["threshold_cross"][threshold.key] = true
    end
  end

  -- Engaged alerts of this entity, laid out as engaged[atype][akey]
  local engaged = (engaged_cache[entity_type] or {})[entity_value] or {}

  -- Engage: active now but not engaged yet
  for atype, akeys in pairs(current_alerts) do
    local engaged_keys = engaged[atype] or {}

    for akey in pairs(akeys) do
      if engaged_keys[akey] == nil then
        engageAlert(entity_type, entity_value, atype, akey, "")
        invalidate = true
      end
    end
  end

  -- Release: engaged but no longer active. Every key of every engaged type
  -- is visited; an alert that is still active must stay engaged.
  for atype, akeys in pairs(engaged) do
    local active_keys = current_alerts[atype] or {}

    for akey in pairs(akeys) do
      if active_keys[akey] == nil then
        releaseAlert(entity_type, entity_value, atype, akey, "")
        invalidate = true
      end
    end
  end

  if invalidate and (dirty == false) then
    invalidateEngagedAlertsCache()
    dirty = true
  end
end
-- #################################
--- Read or write the saved snapshot of an entity, kept under
-- <workingdir>/<ifname_id>/json/<granularity>/<fname>.
-- With to_write set, the containing directory is created when missing and
-- the value is stored; nothing is returned.
-- With to_write nil, the snapshot is loaded and returned, or nil when the
-- file is missing, cannot be opened, or is empty.
-- @param granularity     time granularity, part of the path
-- @param ifname_id       interface id, part of the path
-- @param fname           file name relative to the granularity directory
-- @param use_persistance true to go through persistence.store/load, false to
--                        use a plain file (written as-is, decoded with
--                        j.decode on read)
-- @param to_write        value to store, or nil to read
-- @return the loaded snapshot on read (or nil), nil on write
local function entity_threshold_status_rw(granularity, ifname_id, fname, use_persistance, to_write --[[nil if it's a read]])
  local basedir = fixPath(dirs.workingdir .. "/" .. ifname_id .. "/json/" .. granularity)
  local fpath = fixPath(basedir.."/"..fname)

  if to_write ~= nil then
    -- Everything up to the last "/" is the directory that must exist
    local parent = string.match(fpath, ".*/")

    if not(ntop.exists(parent)) then
      ntop.mkdir(parent)
    end

    -- Write new version
    if use_persistance then
      persistence.store(fpath, to_write)
    else
      local f = io.open(fpath, "w")

      if f ~= nil then
        f:write(to_write)
        f:close()
      end
    end

    return nil
  end

  if not ntop.exists(fpath) then
    return nil
  end

  -- Read old version
  if use_persistance then
    local old_dump = persistence.load(fpath)
    return old_dump
  end

  local f = io.open(fpath, "r")
  if f == nil then
    return nil
  end

  local content = f:read("*all")
  f:close()

  -- An empty file holds no snapshot: report "nothing saved" rather than
  -- handing the empty string back as if it were decoded data.
  if isEmptyString(content) then
    return nil
  end

  local decoded = j.decode(content, 1, nil)
  return decoded
end
--- Read or write the saved snapshot of an interface.
-- The snapshot file is "iface_<ifid>_lastdump" and goes through persistence.
-- @param granularity time granularity
-- @param ifid        interface id
-- @param to_write    value to store, or nil to read
-- @return whatever entity_threshold_status_rw returns
local function interface_threshold_status_rw(granularity, ifid, to_write)
  local fname = "iface_"..ifid.."_lastdump"
  local use_persistance = true
  return entity_threshold_status_rw(granularity, ifid, fname, use_persistance, to_write)
end
--- Read or write the saved snapshot of a network.
-- The snapshot is "alarmed_subnet_stats_lastdump" inside the directory
-- given by getPathFromKey(network), and goes through persistence.
-- @param granularity time granularity
-- @param ifid        interface id
-- @param network     network key
-- @param to_write    value to store, or nil to read
-- @return whatever entity_threshold_status_rw returns
local function network_threshold_status_rw(granularity, ifid, network, to_write)
  local fname = getPathFromKey(network) .. "/alarmed_subnet_stats_lastdump"
  local use_persistance = true
  return entity_threshold_status_rw(granularity, ifid, fname, use_persistance, to_write)
end
--- Read or write the saved snapshot of a host.
-- The snapshot is a plain "<hostinfo.ip>.json" file (no persistence).
-- @param granularity time granularity
-- @param ifid        interface id
-- @param hostinfo    host information table; only its `ip` field is read
-- @param to_write    value to store, or nil to read
-- @return whatever entity_threshold_status_rw returns
local function host_threshold_status_rw(granularity, ifid, hostinfo, to_write)
  local fname = hostinfo.ip .. ".json"
  local use_persistance = false
  return entity_threshold_status_rw(granularity, ifid, fname, use_persistance, to_write)
end
-- #################################
--- Check the interface alerts, provided a threshold is configured for the
-- entity "iface_<ifid>".
-- The current statistics are compared against the saved snapshot when one
-- exists, and are then saved as the new snapshot.
-- @param ifname      interface name
-- @param ifid        interface id
-- @param granularity time granularity
local function check_interface_alerts(ifname, ifid, granularity)
  local current_stats = interface.getStats()
  local entity_value = "iface_"..ifid

  -- Check if there is any threshold configured for the interface
  if not configured_thresholds[entity_value] then
    return
  end

  local previous_stats = interface_threshold_status_rw(granularity, ifid) -- read old json
  if previous_stats ~= nil then
    check_entity_alerts(ifname, "iface", entity_value, granularity, previous_stats, current_stats)
  end

  interface_threshold_status_rw(granularity, ifid, current_stats) -- write new json
end
--- Check the alerts of every network that has a configured threshold.
-- For each such network the current statistics are compared against the
-- saved snapshot when one exists, and are then saved as the new snapshot.
-- @param ifname      interface name
-- @param ifid        interface id
-- @param granularity time granularity
local function check_networks_alerts(ifname, ifid, granularity)
  local networks = interface.getNetworksStats()

  for subnet, current_stats in pairs(networks) do
    -- Check if there is any threshold configured for the network
    local has_thresholds = configured_thresholds[subnet]

    if has_thresholds then
      local previous_stats = network_threshold_status_rw(granularity, ifid, subnet) -- read old json

      if previous_stats ~= nil then
        check_entity_alerts(ifname, "network", subnet, granularity, previous_stats, current_stats)
      end

      network_threshold_status_rw(granularity, ifid, subnet, current_stats) -- write new json
    end
  end
end
--- Check the alerts of every local host of the selected interface.
-- For each host the current JSON is decoded and compared against the saved
-- snapshot when one exists, then the current JSON is saved as the new
-- snapshot. Hosts for which interface.getHostInfo returns nil are skipped.
-- @param ifname      interface name
-- @param ifid        interface id
-- @param granularity time granularity
local function check_hosts_alerts(ifname, ifid, granularity)
  local hosts = interface.getLocalHostsInfo(false)

  if hosts == nil then
    return
  end

  for host, hoststats in pairs(hosts["hosts"] or {}) do
    local hostinfo = interface.getHostInfo(host)

    -- The host may no longer be available by the time it is queried;
    -- without this guard the field accesses below would raise an error.
    if hostinfo ~= nil then
      local entity_value = hostinfo2hostkey({host=host, vlan=hostinfo.vlan}, nil, true --[[force vlan]])
      local old_entity_info = host_threshold_status_rw(granularity, ifid, hostinfo) -- read old json
      local new_entity_info_json = hostinfo["json"]

      if old_entity_info ~= nil then
        check_entity_alerts(ifname, "host", entity_value, granularity, old_entity_info, j.decode(new_entity_info_json, 1, nil))
      end

      host_threshold_status_rw(granularity, ifid, hostinfo, new_entity_info_json) -- write new json
    end
  end
end
-- #################################
--- Entry point: scan the interface, network and host alerts of one interface
-- at the given granularity, after making sure the engaged-alerts cache and
-- the configured thresholds are loaded.
-- NOTE(review): no interface.select(ifname) is made here; presumably the
-- caller has already selected the interface - confirm.
-- @param granularity time granularity forwarded to every check
-- @param ifname      interface name
function scanAlerts(granularity, ifname)
  if verbose then
    print("[minute.lua] Scanning ".. granularity .." alerts for interface " .. ifname.."<p>\n")
  end

  local interface_id = getInterfaceId(ifname)

  initEngagedAlertsCache(ifname, granularity)

  check_interface_alerts(ifname, interface_id, granularity)
  check_networks_alerts(ifname, interface_id, granularity)
  check_hosts_alerts(ifname, interface_id, granularity)
end