Updated flow aggregation with new aggregation function

This commit is contained in:
Matteo Biscosi 2025-09-02 11:12:24 +02:00
parent 04afeefb5f
commit 307d10e81f
5 changed files with 90 additions and 136 deletions

View file

@ -11,6 +11,7 @@ end
local flow_data = {}
local callback_utils = require "callback_utils"
local flow_data_preset = require "flow_data_preset"
local flow_data_historical = require "flow_data_historical"
local trace_stats = false
local separator = " | "
@ -142,29 +143,40 @@ end
-- ###################################################
function flow_data.getStats(queries)
function flow_data.getStats(queries, isHistorical)
local results = {}
if isHistorical ~= nil then
if not isHistorical then
-- In case of sankey, the aggregation function is called by flow_sankey.lua
interface.aggregateASNFlows()
end
end
for _, query_info in pairs(queries or {}) do
local isHistorical = false
if (query_info.filters and query_info.filters.last_seen) then
isHistorical = true
end
local columns = flow_data_preset.retrieveColumns(
query_info.select_query, isHistorical)
local filters = flow_data_preset.convertFilters(query_info.where_query,
query_info.filters,
isHistorical)
local select_columns = flow_data_preset.retrieveColumns(
query_info.select_query)
local sort_columns =
flow_data_preset.retrieveColumns(query_info.sort_by) or {}
local where_filters = flow_data_preset.convertFilters(
query_info.where_query, query_info.filters,
isHistorical)
local different_columns = flow_data_preset.retrieveColumns(
query_info.different_from, isHistorical)
query_info.different_from)
-- Function used to, given a flow, merge all the same data together
local function formatData(_, flow)
if not ntop.isEnterpriseM() then return {} end
-- Create an empty table, composed only of key values
-- e.g. ip: 1.1.1.1
-- asn: 2222
-- bytes_sent: 0
-- bytes_rcvd: 0
local empty = formatEmptyStats(columns, flow,
local empty = formatEmptyStats(select_columns, flow,
query_info.rename_key_field,
different_columns,
query_info.skip_flow)
@ -179,30 +191,19 @@ function flow_data.getStats(queries)
end
-- Now update the data (e.g. bytes_sent and bytes_rcvd)
results[key] = updateStats(columns, query_info.invert_direction,
flow, results[key])
results[key] = updateStats(select_columns,
query_info.invert_direction, flow,
results[key])
::skip_flow::
end
if isHistorical then -- Historical
if not ntop.isEnterpriseM() then return {} end
flow_data_historical = require "flow_data_historical"
local first_seen = query_info.filters.first_seen
local last_seen = query_info.filters.last_seen
local sort_columns = flow_data_preset.retrieveColumns(
query_info.sort_by, isHistorical) or {}
local query_result = flow_data_historical.retrieveFlowData(columns,
filters,
sort_columns,
query_info.invert_direction,
first_seen,
last_seen)
for _, flow in pairs(query_result or {}) do
formatData(_, flow)
end
else -- Live
callback_utils.foreachFlow(filters.ifid, os.time() + 30, -- deadline
formatData, filters)
end
local first_seen = query_info.filters.first_seen
local last_seen = query_info.filters.last_seen
local query_result = flow_data_historical.retrieveFlowData(
select_columns, where_filters, sort_columns,
query_info.invert_direction, first_seen,
last_seen, isHistorical)
for _, flow in pairs(query_result or {}) do formatData(_, flow) end
end
-- Live and historical data now share the same table format, so format the data and run the checks
@ -225,11 +226,14 @@ function flow_data.formatStats(stats_to_format)
local formatted_element = {}
for key, value in pairs(values or {}) do
-- Format the data
local formatted_data, url_link = flow_data_preset.getFormattedDataAndLink(key,
value,
values)
local formatted_data, url_link =
flow_data_preset.getFormattedDataAndLink(key, value, values)
if (formatted_data ~= value) or (type(formatted_data) == "string") then
formatted_element[key] = {id = value, name = formatted_data, url = url_link}
formatted_element[key] = {
id = value,
name = formatted_data,
url = url_link
}
else
formatted_element[key] = value
end

View file

@ -12,30 +12,27 @@ local flow_data_preset = {}
-- if the column has to be used as key (except bytes/packets everything should be a key)
-- and the formatter
local columns = {
asn = {filters = {live = "asnFilter", historical = {"src_asn", "dst_asn"}}},
asn = {filters = {"src_asn", "dst_asn"}},
src_asn = {
live = "src_as",
historical = "SRC_ASN",
column_id = "SRC_ASN",
is_key = true,
filters = {live = "asnSrcFilter", historical = "SRC_ASN"},
filters = "SRC_ASN",
formatter = {
funct = format_utils.formatASN,
link = "/lua/as_overview.lua?asn=%s"
}
},
dst_asn = {
live = "dst_as",
historical = "DST_ASN",
column_id = "DST_ASN",
is_key = true,
filters = {live = "asnDstFilter", historical = "DST_ASN"},
filters = "DST_ASN",
formatter = {
funct = format_utils.formatASN,
link = "/lua/as_overview.lua?asn=%s"
}
},
src_peer_asn = {
live = "src_peer_as",
historical = "SRC_PEER_ASN",
column_id = "SRC_PEER_ASN",
is_key = true,
formatter = {
funct = format_utils.formatASN,
@ -43,8 +40,7 @@ local columns = {
}
},
dst_peer_asn = {
live = "dst_peer_as",
historical = "DST_PEER_ASN",
column_id = "DST_PEER_ASN",
is_key = true,
formatter = {
funct = format_utils.formatASN,
@ -52,10 +48,9 @@ local columns = {
}
},
in_device = {
live = "device_ip",
historical = "PROBE_IP",
column_id = "PROBE_IP",
is_key = true,
filters = {live = "deviceIpFilter", historical = "PROBE_IP"},
filters = "PROBE_IP",
db_formatting_fun = "IPv4NumToString",
formatter = {
funct = getProbeName,
@ -64,11 +59,10 @@ local columns = {
}
},
out_device = {
live = "device_ip",
historical = "PROBE_IP",
column_id = "PROBE_IP",
is_key = true,
db_formatting_fun = "IPv4NumToString",
filters = {live = "deviceIpFilter", historical = "PROBE_IP"},
filters = "PROBE_IP",
formatter = {
funct = getProbeName,
link = "/lua/pro/enterprise/exporters.lua?%s",
@ -76,10 +70,9 @@ local columns = {
}
},
device = {
live = "device_ip",
historical = "PROBE_IP",
column_id = "PROBE_IP",
is_key = true,
filters = {live = "deviceIpFilter", historical = "PROBE_IP"},
filters = "PROBE_IP",
db_formatting_fun = "IPv4NumToString",
formatter = {
funct = getProbeName,
@ -88,8 +81,7 @@ local columns = {
}
},
in_iface_index = {
live = "in_index",
historical = "INPUT_SNMP",
column_id = "INPUT_SNMP",
is_key = true,
formatter = {
funct = format_portidx_name,
@ -99,8 +91,7 @@ local columns = {
}
},
out_iface_index = {
live = "out_index",
historical = "OUTPUT_SNMP",
column_id = "OUTPUT_SNMP",
is_key = true,
formatter = {
funct = format_portidx_name,
@ -117,20 +108,9 @@ local columns = {
generateLinkParams = generateExporterInterfaceLink
}
},
bytes_sent = {
live = "bytes_sent",
historical = "SUM(SRC2DST_BYTES)",
invert_with = "bytes_rcvd"
},
bytes_rcvd = {
live = "bytes_rcvd",
historical = "SUM(DST2SRC_BYTES)",
invert_with = "bytes_sent"
},
total_bytes = {
live = "bytes",
historical = "SUM(TOTAL_BYTES)",
},
bytes_sent = {column_id = "SUM(SRC2DST_BYTES)", invert_with = "bytes_rcvd"},
bytes_rcvd = {column_id = "SUM(DST2SRC_BYTES)", invert_with = "bytes_sent"},
total_bytes = {column_id = "SUM(TOTAL_BYTES)"},
as = {
formatter = {
funct = format_utils.formatASN,
@ -179,9 +159,9 @@ local columns = {
link = "/lua/as_overview.lua?asn=%s"
}
},
ifid = {live = "ifid", historical = "INTERFACE_ID"},
first_seen = {historical = "FIRST_SEEN"},
last_seen = {historical = "LAST_SEEN"}
ifid = {column_id = "INTERFACE_ID"},
first_seen = {column_id = "FIRST_SEEN"},
last_seen = {column_id = "LAST_SEEN"}
}
-- ###########################################
@ -189,18 +169,14 @@ local columns = {
-- @brief Given a list of ids in an array format, returns the
-- name of the data in case of live flows or historical
-- @param columns_id Array, containing a list of columns ids
-- @param is_historical Boolean, true if historical ids are needed, false for live ones
-- @return a list of matching ids for the requested type (live or historical)
function flow_data_preset.retrieveColumns(columns_id, is_historical)
function flow_data_preset.retrieveColumns(columns_id)
local id_list = {}
local data_type = "live"
if is_historical then data_type = "historical" end
for position, id in pairs(columns_id or {}) do
if columns[id] then
local column_info = columns[id]
column_info["key"] = column_info[data_type]
column_info["key"] = column_info["column_id"]
column_info["id"] = id
id_list[position] = column_info
end
@ -230,40 +206,31 @@ end
-- @brief Given a list of filters, returns a list of particular conditions for each filter if available
-- @param where Array, containing a list of ids for the where
-- @param available_filters List, containing a list filters, key is the key of the filter, value is the value
-- @param is_historical Boolean, true if historical ids are needed, false for live ones
-- @return a list of filters, key - value
function flow_data_preset.convertFilters(where, available_filters, is_historical)
function flow_data_preset.convertFilters(where, available_filters)
local where_query = {}
local data_type = "live"
if (not available_filters) or (table.len(available_filters) == 0) then
return where_query
end
if is_historical then data_type = "historical" end
for _, key in pairs(where or {}) do
if (columns[key] and columns[key]["filters"] and
columns[key]["filters"][data_type]) then
local filter = columns[key]["filters"][data_type]
if data_type == "live" then
where_query[filter] = available_filters[key]
else
-- Multiple filters only available in historical, see asn
if type(filter) == "table" then
for _, or_filter in pairs(filter or {}) do
local new_filter = columns[or_filter]
new_filter.filter_value = available_filters[key]
new_filter.id = or_filter
new_filter.key = new_filter["filters"][data_type]
if not where_query[key] then
where_query[key] = {}
end
where_query[key][#where_query[key] + 1] = new_filter
if (columns[key] and columns[key]["filters"]) then
local filter = columns[key]["filters"]
-- Multiple filters requested, see asn
if type(filter) == "table" then
for _, or_filter in pairs(filter or {}) do
local new_filter = columns[or_filter]
new_filter.filter_value = available_filters[key]
new_filter.id = or_filter
new_filter.key = new_filter["filters"]
if not where_query[key] then
where_query[key] = {}
end
else
where_query[filter] = columns[key]
where_query[filter].filter_value = available_filters[key]
where_query[key][#where_query[key] + 1] = new_filter
end
else
where_query[filter] = columns[key]
where_query[filter].filter_value = available_filters[key]
end
end
end
@ -271,15 +238,9 @@ function flow_data_preset.convertFilters(where, available_filters, is_historical
-- The interface id filter is mandatory: add it in case it's missing
if not where_query["ifid"] and not where_query["INTERFACE_ID"] then
local ifid = available_filters["ifid"] or interface.getId() -- Use current ifid
if not is_historical then
where_query["ifid"] = ifid
else
where_query["INTERFACE_ID"] = ifid
end
where_query["INTERFACE_ID"] = ifid
end
if data_type == "live" then where_query["detailsLevel"] = "normal" end
return where_query
end
@ -318,7 +279,8 @@ function flow_data_preset.getFormattedDataAndLink(key, value)
end
formatted_value = formatter(dependent_values[1], dependent_values[2])
if link_params_formatter then
local link_params = link_params_formatter(dependent_values[1], dependent_values[2])
local link_params = link_params_formatter(dependent_values[1],
dependent_values[2])
link = string.format(columns[key]["formatter"]["link"], link_params)
end
else
@ -332,7 +294,7 @@ function flow_data_preset.getFormattedDataAndLink(key, value)
if not link and columns[key]["formatter"]["link"] then
link = string.format(columns[key]["formatter"]["link"], value)
end
return formatted_value, link
end

View file

@ -279,7 +279,7 @@ end
-- @param max_nodes_per_level number, representing the maximum number of nodes per level
-- in case the number is surpassed, the "Other" node is added
-- @return a list composed by nodes and links
function flow_sankey.generateSankey(queries, max_nodes_per_level)
function flow_sankey.generateSankey(queries, max_nodes_per_level, isHistorical)
-- In case of multiple queries, run each query one by one,
-- then merge the data.
-- In case the rename_key_field and each queries search for different data,
@ -288,6 +288,10 @@ function flow_sankey.generateSankey(queries, max_nodes_per_level)
local nodes = {}
local links = {}
if not isHistorical then
interface.aggregateASNFlows()
end
for _, query in pairs(queries) do
local table_stats = flow_data.getStats({query})
local single_query_nodes = {}

View file

@ -121,7 +121,7 @@ end
local nodes = {}
local links = {}
local MAX_NODES_PER_LEVEL = 10
nodes, links = flow_sankey.generateSankey(queries, MAX_NODES_PER_LEVEL)
nodes, links = flow_sankey.generateSankey(queries, MAX_NODES_PER_LEVEL, not isEmptyString(epoch_begin))
res["nodes"] = nodes
res["links"] = links