Implemented ASN Sankey and table for historical flows (#9464)

This commit is contained in:
Manuel Ceroni 2025-07-31 16:22:44 +02:00 committed by GitHub
parent 9a3cc8933d
commit 78459d4335
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 241 additions and 308 deletions

View file

@ -19,6 +19,10 @@
--
local dirs = ntop.getDirs()
package.path = dirs.installdir .. "/scripts/lua/modules/?.lua;" .. package.path
if ntop.isPro() then
package.path = dirs.installdir .. "/scripts/lua/pro/modules/?.lua;" .. package.path
end
require "lua_utils"
require "flow_utils"
@ -29,21 +33,28 @@ local rest_utils = require "rest_utils"
local format_utils = require "format_utils"
local info = ntop.getInfo()
local callback_utils = require "callback_utils"
--local flow_as_utils = require "flow_as_historical_utils"
local epoch_begin = tonumber(_GET["epoch_begin"])
local epoch_end = tonumber(_GET["epoch_end"])
local flow_as_utils = require "flow_as_utils"
if (epoch_end ~= nil and epoch_end < os.time()) and ntop.isClickHouseEnabled() then
flow_as_utils = require "flow_as_historical_utils"
end
local rc = rest_utils.consts.success.ok
local ifid = _GET["ifid"]
local criteria_as = _GET["criteria_as"]
local asn = tonumber(_GET["asn"])
-- Read the ASn preferences from Policy -> Network Configuration -> ASN
local customer_asn, sub_customer_asn, remote_asn = as_utils.getAllConfigurations()
local rsp = {}
local edges = {}
local nodes = {}
local traffic_criteria = {INGRESS = 0, EGRESS = 1, TOTAL = 2, ING_EGR = 3, AS_TRAFFIC = 4}
local node_type = {ASN = 0}
local criteria
local other_asns = "Other"
@ -64,6 +75,7 @@ local max_nodes = 15
-- If show_all is enabled, the max_nodes setting is ignored and all nodes are
-- shown regardless of their number.
local show_all = false
-- ################################################
if isEmptyString(ifid) then
@ -146,6 +158,97 @@ end
-- ####################
local function create_node_url(node, type)
if type == node_type.ASN and node ~= "other" then
return ntop.getHttpPrefix() .. "/lua/as_overview.lua?asn=" .. node .. "&criteria_as=" .. criteria_as
else
return "#"
end
end
local function create_node_label(node, type)
if type == node_type.ASN and node ~= "other" then
return format_utils.formatASN(tonumber(node), false, true)
else
return node
end
end
-- ####################
-- REWORK
local tot_bytes_exporter = {}
local tot_bytes_exp_if = {}
-- REWORK
local sankey_table = {}
local function init_sankey_table(key, src_id, src, type_src, dst_id, dst, type_dst)
if not sankey_table[key] then
sankey_table[key] = {
src_id = src_id,
src = create_node_label(src, type_src),
url_src = create_node_url(src, type_src),
dst_id = dst_id,
dst = create_node_label(dst, type_dst),
url_dst = create_node_url(dst, type_dst),
weight = 0,
}
end
end
-- ####################
local function inc_sankey_node(key, bytes)
sankey_table[key].weight = sankey_table[key].weight + bytes
end
-- ####################
local function get_sankey_key(node_src, node_dst)
return node_src .. "@" .. node_dst
end
-- ####################
-- create_sankey creates the nodes and links for the Sankey.
-- Node SRC_ID -> Node DST_ID
local function create_sankey(sankey_table)
--tprint(sankey_table)
for id, link in pairs(sankey_table) do
if link.src_id ~= as_root_key then
add_unique_node(link.src_id, link.src, link.url_src) end
if link.dst_id ~= as_root_key then
add_unique_node(link.dst_id, link.dst, link.url_dst) end
add_link(link.src_id,link.dst_id,bytesToSize(link.weight),link.weight)
end
end
-- ####################
-- key: key used to reorder the table
-- index: index of the reduced table
local function top_max_nodes(l, key, index)
local sorted_list = {}
for i, v in pairs(l) do
table.insert(sorted_list, v)
end
table.sort(sorted_list, function(a, b)
return tonumber(a[key]) > tonumber(b[key])
end)
local result = {}
for i = 1, math.min(max_nodes, table.len(sorted_list)) do
local entry = sorted_list[i]
if entry[index] ~= nil then
result[entry[index]] = entry
end
end
return result
end
-- ####################
-- INGRESS_EGRESS
local function search_probe(ip)
for interface_id, probe_list in pairs(ifstats.probes or {}) do
for probe_ip, probe_info in pairsByKeys(probe_list or {}) do
@ -158,46 +261,6 @@ local function search_probe(ip)
end
end
local tot_bytes_exporter = {}
local tot_bytes_transit = {}
local tot_bytes_as = {}
local tot_bytes_exp_if = {}
local tot_bytes_as_transit = {}
local function get_interface_key(device_ip, interface_index)
return device_ip .. "@" .. interface_index
end
local function get_as_key(transit, src_dst_as)
return transit .. "@" .. src_dst_as
end
-- ####################
local function top_max_nodes(l, criteria)
local list = {}
for id, data in pairs(l) do
table.insert(list, { id = id, rcvd = data.rcvd, sent = data.sent })
end
if criteria == traffic_criteria.INGRESS then
table.sort(list, function(a, b) return a.sent > b.sent end)
else
table.sort(list, function(a, b) return a.rcvd > b.rcvd end)
end
local reduced = {}
for i = 1, math.min(max_nodes-1, table.len(list)) do
local entry = list[i]
reduced[entry.id] = {
sent = entry.sent,
rcvd = entry.rcvd
}
end
return reduced
end
-- ####################
-- Builds the nodes and links of the Sankey between the interfaces and the exporters
local function build_interface_exporter(criteria, tot_bytes_exp_if,
exporter_nodes)
@ -258,83 +321,6 @@ end
-- ####################
-- Builds the nodes and links of the Sankey between the source/destination
-- autonomous systems and the transit autonomous systems
local function build_as_transit(criteria, tot_bytes_as_transit, transit_nodes)
for n_id, data in pairs(tot_bytes_as_transit) do
if (criteria == traffic_criteria.INGRESS and data.sent > 0) or
(criteria == traffic_criteria.EGRESS and data.rcvd > 0) or
(criteria == traffic_criteria.TOTAL and (data.rcvd + data.sent) > 0) then
local transit
local src_dst_as
if tot_bytes_as[data.src_dst_as] == nil then
src_dst_as = other_asns
else
src_dst_as = format_utils.formatASN(data.src_dst_as, false, true)
end
if tot_bytes_transit[data.transit] == nil then
transit = other_asns
else
transit = format_utils.formatASN(data.transit, false, true)
end
local transit_node_id
if data.src_dst_as ~= data.transit then
transit_node_id = find_node_id(transit)
end
local src_dst_as_id
if criteria == traffic_criteria.EGRESS then
src_dst_as_id = find_node_id("egress" .. "_" .. src_dst_as)
if data.src_dst_as ~= data.transit then
transit_node_id = "egress" .. "_" .. transit_node_id
end
src_dst_as_id = "egress" .. "_" .. src_dst_as_id
else
src_dst_as_id = find_node_id("ingress" .. "_" .. src_dst_as)
if data.src_dst_as ~= data.transit then
transit_node_id = "ingress" .. "_" .. transit_node_id
end
src_dst_as_id = "ingress" .. "_" .. src_dst_as_id
end
if transit_nodes[data.transit] == nil and data.src_dst_as ~= data.transit then
transit_nodes[data.transit] = transit_node_id
if transit == other_asns then other_node = transit_node_id end
end
local url = ntop.getHttpPrefix() .. "/lua/as_overview.lua?asn=" .. data.src_dst_as .. "&criteria_as=" .. criteria_as
if src_dst_as == other_asns then url = "#" end
add_unique_node(src_dst_as_id, src_dst_as, url)
if data.transit ~= data.src_dst_as then
url = ntop.getHttpPrefix() .. "/lua/as_overview.lua?asn=" .. data.transit .. "&criteria_as=" .. criteria_as
if transit == other_asns then url = "#" end
add_unique_node(transit_node_id, transit, url)
end
if criteria == traffic_criteria.INGRESS then
if data.transit ~= data.src_dst_as then
add_link(src_dst_as_id, transit_node_id, bytesToSize(data.sent), data.sent)
else
add_link(src_dst_as_id, as_root_key, bytesToSize(data.sent), data.sent)
end
elseif criteria == traffic_criteria.EGRESS then
if data.transit ~= data.src_dst_as then
add_link(transit_node_id, src_dst_as_id, bytesToSize(data.rcvd), data.rcvd)
else
add_link(as_root_key, src_dst_as_id, bytesToSize(data.rcvd), data.rcvd)
end
elseif criteria == traffic_criteria.TOTAL then
if data.src_dst_as then
add_link(src_dst_as_id, transit_node_id,
bytesToSize(data.rcvd + data.sent), data.rcvd + data.sent)
else
add_link(src_dst_as_id, as_root_key,
bytesToSize(data.rcvd + data.sent), data.rcvd + data.sent)
end
end
end
end
end
-- ####################
-- Builds the nodes and links of the Sankey that lead to the root,
-- the AS at the center of the Sankey. (exporter->AS)
local function build_to_as(criteria, nodes, tot_bytes)
@ -353,100 +339,109 @@ local function build_to_as(criteria, nodes, tot_bytes)
end
-- ####################
-- AS_VIEW
-- Builds the nodes and links of the Sankey that lead to the root,
-- the AS at the center of the Sankey. (transit->AS)
local function build_transit_as(criteria, nodes, tot_bytes)
for id, node_id in pairs(nodes) do
local sent = tot_bytes[id].sent
local rcvd = tot_bytes[id].rcvd
if criteria == traffic_criteria.INGRESS and sent > 0 then
if tot_bytes_transit[id] ~= nil then
add_link(node_id,as_root_key,bytesToSize(sent),sent)
else
add_link(other_node,as_root_key,bytesToSize(sent),sent)
end
elseif criteria == traffic_criteria.EGRESS and rcvd > 0 then
if tot_bytes_transit[id] ~= nil then
add_link(as_root_key, node_id, bytesToSize(rcvd), rcvd)
else
add_link(as_root_key, other_node, bytesToSize(rcvd), rcvd)
end
elseif criteria == traffic_criteria.TOTAL then
add_link(node_id, as_root_key, bytesToSize(rcvd + sent),rcvd + sent)
end
end
local function checkTransit(src_id, src, dst_id, dst, src_dst_as, src_dst_peer, bytes, criteria, top_transit)
if(src_dst_peer ~= src_dst_as) then
local transit = tonumber(src_dst_peer)
if not top_transit[src_dst_peer] then transit = "other" end
local transit_id
if criteria == traffic_criteria.INGRESS then
transit_id = "ingress_transit_".. find_node_id(transit)
else
transit_id = "egress_transit_".. find_node_id(transit)
end
local key_transit = get_sankey_key(src_id, transit_id)
init_sankey_table(key_transit, src_id, src, node_type.ASN, transit_id, transit, node_type.ASN)
inc_sankey_node(key_transit, bytes)
local key_dst = get_sankey_key(transit_id, dst_id)
init_sankey_table(key_dst, transit_id, transit, node_type.ASN, dst_id, dst, node_type.ASN)
inc_sankey_node(key_dst, bytes)
else
local key = get_sankey_key(src_id, dst_id)
init_sankey_table(key, src_id, src, node_type.ASN, dst_id, dst, node_type.ASN)
inc_sankey_node(key, bytes)
end
end
-- ####################
local function build_as_traffic(criteria)
local transit_nodes = {}
local tot_bytes_transit_aux = tot_bytes_transit
local tot_bytes_as_aux = tot_bytes_as
if(table.len(tot_bytes_transit) > max_nodes) then
-- order and reduce the transit list
tot_bytes_transit = top_max_nodes(tot_bytes_transit, criteria)
end
if(table.len(tot_bytes_as) > max_nodes) then
-- order and reduce the as list
tot_bytes_as = top_max_nodes(tot_bytes_as, criteria)
end
build_as_transit(criteria, tot_bytes_as_transit, transit_nodes)
build_transit_as(criteria, transit_nodes, tot_bytes_transit_aux)
reset_nodes()
tot_bytes_transit = tot_bytes_transit_aux
tot_bytes_as = tot_bytes_as_aux
-- build_as_view_ing_egr creates the ingress or egress part of the Sankey
-- Note: src/dst refers to the source/destination node of the Sankey, not the flows.
local function build_as_view_ing_egr(transit_traffic, criteria, top, top_transit)
for id, data in pairs (transit_traffic) do
if(criteria == traffic_criteria.INGRESS) and (data.bytes_rcvd ~= nil and tonumber(data.bytes_rcvd) > 0) then
-- Check if dst_as is in the top_sent; otherwise, insert it into the other node.
local src = data.dst_as
if not top[src] then
src = "other"
end
local src_id = "ingress_" .. find_node_id(src)
local dst = data.src_as
local dst_id = as_root_key
checkTransit(src_id, src, dst_id, dst, data.dst_as, data.dst_peer_as, tonumber(data.bytes_rcvd), criteria, top_transit)
elseif (criteria == traffic_criteria.EGRESS) and (data.bytes_sent ~= nil and tonumber(data.bytes_sent) > 0) then
local dst = data.src_as
if not top[dst] then
dst = "other"
end
local dst_id = "egress_" .. find_node_id(dst)
local src = data.dst_as
local src_id = as_root_key
checkTransit(src_id, src, dst_id, dst, data.src_as, data.src_peer_as, tonumber(data.bytes_sent), criteria, top_transit)
end
end
end
local byte_tables = flow_as_utils.getAll(asn, ifid)
tot_bytes_exporter = byte_tables.exporter
tot_bytes_transit = byte_tables.transit
tot_bytes_as = byte_tables.as
tot_bytes_exp_if = byte_tables.exp_if
tot_bytes_as_transit = byte_tables.as_transit
-- ###################################################
if (sankey_debug) then
tprint(tot_bytes_exp_if)
tprint(tot_bytes_exporter)
end
local exporter_nodes = {}
-- If the criteria is ING_EGR, the Sankey will consist of two parts:
-- ingress: ingress interface -> exporter -> AS;
-- egress: AS -> exporter -> egress interface.
-- It is necessary to create the links interface<->exporter and then exporter<->AS root.
if (criteria == traffic_criteria.ING_EGR) then
-- Ingress
build_interface_exporter(traffic_criteria.INGRESS, tot_bytes_exp_if,
exporter_nodes)
build_to_as(traffic_criteria.INGRESS, exporter_nodes, tot_bytes_exporter)
tot_bytes_exporter = flow_as_utils.getExporter(asn, ifid)
tot_bytes_exp_if = flow_as_utils.getExporterIf(as, ifid)
local exporter_nodes = {}
-- Ingress
build_interface_exporter(traffic_criteria.INGRESS, tot_bytes_exp_if,
exporter_nodes)
build_to_as(traffic_criteria.INGRESS, exporter_nodes, tot_bytes_exporter)
reset_nodes()
exporter_nodes = {}
reset_nodes()
exporter_nodes = {}
-- Egress
build_interface_exporter(traffic_criteria.EGRESS, tot_bytes_exp_if,
exporter_nodes)
build_to_as(traffic_criteria.EGRESS, exporter_nodes, tot_bytes_exporter)
-- Egress
build_interface_exporter(traffic_criteria.EGRESS, tot_bytes_exp_if,
exporter_nodes)
build_to_as(traffic_criteria.EGRESS, exporter_nodes, tot_bytes_exporter)
-- If the criteria is AS_TRAFFIC, the Sankey diagram will consist of two parts:
-- ingress: source AS -> transit AS -> AS;
-- egress: AS -> transit AS -> destination AS.
-- It is therefore necessary to create the links source/destination AS <-> transit AS and then transit AS <-> root AS.
elseif (criteria == traffic_criteria.AS_TRAFFIC) then
build_as_traffic(traffic_criteria.INGRESS)
build_as_traffic(traffic_criteria.EGRESS)
else
-- Build Interface <-> Exporter links
build_interface_exporter(criteria, tot_bytes_exp_if, exporter_nodes)
-- Build Exporter <-> AS links
build_exporter_as(criteria, exporter_nodes)
local transit_traffic_ingress = flow_as_utils.getAsTransit(asn, ifid, traffic_criteria.INGRESS, epoch_begin, epoch_end)
local transit_traffic_egress = flow_as_utils.getAsTransit(asn, ifid, traffic_criteria.EGRESS, epoch_begin, epoch_end)
-- top_sent and top_rcvd are necessary for the other node
local top_sent = top_max_nodes(transit_traffic_ingress, "bytes_rcvd", "dst_as")
local top_rcvd = top_max_nodes(transit_traffic_egress, "bytes_sent", "src_as")
-- transit_list_ingress and transit_list_egress are necessary for the transit other node.
local transit_list_ingress = flow_as_utils.getTransitList(asn, ifid, traffic_criteria.INGRESS, epoch_begin, epoch_end)
local transit_list_egress = flow_as_utils.getTransitList(asn, ifid, traffic_criteria.EGRESS, epoch_begin, epoch_end)
local top_sent_transit = top_max_nodes(transit_list_ingress, "bytes_rcvd", "dst_peer_as")
local top_rcvd_transit = top_max_nodes(transit_list_egress, "bytes_sent", "src_peer_as")
build_as_view_ing_egr(transit_traffic_ingress, traffic_criteria.INGRESS, top_sent, top_sent_transit)
create_sankey(sankey_table)
reset_nodes()
sankey_table = {}
build_as_view_ing_egr(transit_traffic_egress, traffic_criteria.EGRESS, top_rcvd, top_rcvd_transit)
create_sankey(sankey_table)
end
rsp["nodes"] = nodes
rsp["links"] = links