Fixes for dumping alerts into ClickHouse

This commit is contained in:
Luca Deri 2021-11-04 19:05:23 +01:00
parent 6bfde9c3a3
commit 13407f6761
22 changed files with 276 additions and 167 deletions

View file

@ -65,6 +65,8 @@ function alert_store:init(args)
-- }
-- }
self._where = {}
-- tprint(debug.traceback())
end
-- ##############################################
@ -766,14 +768,22 @@ function alert_store:has_alerts()
return true
end
-- Now check for historical alerts written in the database. Slightly slower.
-- Fastest way to query SQLite for existance of records. Response will be either a string '1' if records exist,
-- or '0' if records don't exist
local q = string.format(" SELECT EXISTS (SELECT 1 FROM `%s`) has_historical_alerts ", self._table_name)
local res = interface.alert_store_query(q)
local has_historical_alerts = res and res[1] and res[1]["has_historical_alerts"] == "1" or false
-- Now check for historical alerts written in the database. Slightly slower.
-- Fastest way to query SQLite for existance of records. Response will be either a string '1' if records exist,
-- or '0' if records don't exist
local q, res, has_historical_alerts
if(ntop.isClickHouseEnabled()) then
q = string.format(" SELECT COUNT(*) as num_alerts FROM `%s` ", self._table_name)
res = interface.alert_store_query(q)
has_historical_alerts = res and res[1] and (tonumber(res[1].num_alerts) > 0) or false
else
q = string.format(" SELECT EXISTS (SELECT 1 FROM `%s`) has_historical_alerts ", self._table_name)
res = interface.alert_store_query(q)
has_historical_alerts = res and res[1] and res[1]["has_historical_alerts"] == "1" or false
end
return has_historical_alerts
end
@ -926,7 +936,7 @@ function alert_store:count_by_severity_and_time_historical()
local cur_slot = tonumber(p.slot)
local cur_count = tonumber(p.count)
if cur_slot >= min_slot and cur_slot <= max_slot then
all_severities[severity_id].all_slots[cur_slot] = cur_count
all_severities[severity_id].all_slots[cur_slot] = cur_count
end
end

View file

@ -54,11 +54,12 @@ function am_alert_store:insert(alert)
end
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, interface_id, resolved_ip, resolved_name, "..
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, interface_id, resolved_ip, resolved_name, "..
"measurement, measure_threshold, measure_value, json) "..
"VALUES (%u, %u, %u, %u, %u, %d, '%s', '%s', '%s', %u, %f, '%s'); ",
"VALUES (%u, %u, %u, %u, %u, %u, %d, '%s', '%s', '%s', %u, %f, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -35,16 +35,26 @@ end
-- ##############################################
function flow_alert_store:insert(alert)
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, ip_version, cli_ip, srv_ip, cli_port, srv_port, vlan_id, "..
local hex_prefix
local insert_stmt
if(ntop.isClickHouseEnabled()) then
hex_prefix = ""
else
hex_prefix = "X"
end
insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, interface_id, tstamp, tstamp_end, severity, ip_version, cli_ip, srv_ip, cli_port, srv_port, vlan_id, "..
"is_cli_attacker, is_cli_victim, is_srv_attacker, is_srv_victim, proto, l7_proto, l7_master_proto, l7_cat, "..
"cli_name, srv_name, cli_country, srv_country, cli_blacklisted, srv_blacklisted, "..
"cli2srv_bytes, srv2cli_bytes, cli2srv_pkts, srv2cli_pkts, first_seen, community_id, score, "..
"flow_risk_bitmap, alerts_map, json) "..
"VALUES (%u, %u, %u, %u, %u, '%s', '%s', %u, %u, %u, %u, %u, %u, %u, %u, %u, %u, %u, '%s', '%s', '%s', "..
"'%s', %u, %u, %u, %u, %u, %u, %u, '%s', %u, %u, X'%s', '%s'); ",
"VALUES (%u, %u, %u, %u, %u, %u, '%s', '%s', %u, %u, %u, %u, %u, %u, %u, %u, %u, %u, %u, '%s', '%s', '%s', "..
"'%s', %u, %u, %u, %u, %u, %u, %u, '%s', %u, %u, %s'%s', '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp,
ntop.mapScoreToSeverity(alert.score),
@ -76,6 +86,7 @@ function flow_alert_store:insert(alert)
alert.community_id,
alert.score,
alert.flow_risk_bitmap or 0,
hex_prefix,
alert.alerts_map,
self:_escape(alert.json)
)
@ -362,9 +373,9 @@ function flow_alert_store:format_record(value, no_html)
message = string.format("%s %s", message, flow_risk_utils.get_documentation_link(alert_risk))
end
if alert_score > 0 then
message = addExtraFlowInfo(message, alert_json, value)
end
if alert_score > 0 then
message = addExtraFlowInfo(message, alert_json, value)
end
if not other_alerts_by_score[alert_score] then
other_alerts_by_score[alert_score] = {}

View file

@ -58,10 +58,11 @@ function host_alert_store:insert(alert)
end
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, ip_version, ip, vlan_id, name, is_attacker, is_victim, is_client, is_server, tstamp, tstamp_end, severity, score, granularity, json) "..
"VALUES (%u, %u, '%s', %u, '%s', %u, %u, %u, %u, %u, %u, %u, %u, %u, '%s'); ",
"(alert_id, interface_id, ip_version, ip, vlan_id, name, is_attacker, is_victim, is_client, is_server, tstamp, tstamp_end, severity, score, granularity, json) "..
"VALUES (%u, %u, %u, '%s', %u, '%s', %u, %u, %u, %u, %u, %u, %u, %u, %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
ip_version,
ip,
vlan_id or 0,

View file

@ -37,10 +37,11 @@ function interface_alert_store:insert(alert)
local subtype = alert.subtype or ''
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, ifid, subtype, name, alias, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %d, '%s', '%s', '%s', %u, '%s'); ",
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, ifid, subtype, name, alias, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, %d, '%s', '%s', '%s', %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -34,11 +34,12 @@ end
function mac_alert_store:insert(alert)
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, address, device_type, name, "..
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, address, device_type, name, "..
"is_attacker, is_victim, json) "..
"VALUES (%u, %u, %u, %u, %u, '%s', %u, '%s', %u, %u, '%s'); ",
"VALUES (%u, %u, %u, %u, %u, %u, '%s', %u, '%s', %u, %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -37,10 +37,11 @@ function network_alert_store:insert(alert)
local alias = getLocalNetworkAlias(name)
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, local_network_id, name, alias, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, '%s', '%s', %u, '%s'); ",
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, local_network_id, name, alias, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, %u, '%s', '%s', %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -63,10 +63,11 @@ function snmp_device_alert_store:insert(alert)
end
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, ip, name, port, port_name, json) "..
"VALUES (%u, %u, %u, %u, %u, '%s', '%s', %u, '%s', '%s'); ",
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, ip, name, port, port_name, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, '%s', '%s', %u, '%s', '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -33,10 +33,11 @@ end
function system_alert_store:insert(alert)
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, name, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, '%s', %u, '%s'); ",
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, name, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, '%s', %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),

View file

@ -33,10 +33,11 @@ end
function user_alert_store:insert(alert)
local insert_stmt = string.format("INSERT INTO %s "..
"(alert_id, tstamp, tstamp_end, severity, score, user, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, '%s', %u, '%s'); ",
"(alert_id, interface_id, tstamp, tstamp_end, severity, score, user, granularity, json) "..
"VALUES (%u, %u, %u, %u, %u, %u, '%s', %u, '%s'); ",
self._table_name,
alert.alert_id,
interface.getId(),
alert.tstamp,
alert.tstamp_end,
ntop.mapScoreToSeverity(alert.score),