Implements MySQL Top Talkers Retrieval and Visualization

Overal Top Talkers are retrieved with just one single
MySQL query and include both ipv4 and ipv4. Top talkers
can be visualized on a per-interface and per-host basis,
and can be sliced and diced.
This commit is contained in:
Simone Mainardi 2016-02-08 21:35:27 +01:00
parent bfb47398b3
commit 08622e8f18
4 changed files with 110 additions and 12 deletions

View file

@ -247,8 +247,79 @@ function getTopL7Protocols(interface_id, version, host, protocol, port, info, be
end
end
function getOverallTopTalkersSELECT_FROM_WHERE_clause(src_or_dst, v4_or_v6, begin_epoch, end_epoch, ifid)
local sql = ""
if v4_or_v6 == 6 then
sql = " SELECT NULL addrv4, "..src_or_dst.." addrv6, "
sql = sql.."BYTES as bytes, PACKETS as packets "
sql = sql.."FROM flowsv6 "
elseif v4_or_v6 == 4 then -- ipv4
sql = " SELECT "..src_or_dst.." addrv4, NULL addrv6, "
sql = sql.."BYTES as bytes, PACKETS as packets "
sql = sql.."FROM flowsv4 "
else
sql = ""
end
sql = sql.." WHERE FIRST_SWITCHED <= "..end_epoch.." and FIRST_SWITCHED >= "..begin_epoch
sql = sql.." AND (NTOPNG_INSTANCE_NAME='"..ntop.getPrefs()["instance_name"].."'OR NTOPNG_INSTANCE_NAME IS NULL) "
sql = sql.." AND (INTERFACE='"..getInterfaceName(ifid).."' OR INTERFACE IS NULL) "
return sql..'\n'
end
function getHostTopTalkers(interface_id, host, info, begin_epoch, end_epoch)
function getOverallTopTalkers(interface_id, info, begin_epoch, end_epoch, sort_column, sort_order, offset, limit)
-- retrieves top talkers in the given time range
if(info == "") then info = nil end
-- AGGREGATE AND CRUNCH DATA
sql = "select CASE WHEN addrv4 IS NOT NULL THEN INET_NTOA(addrv4) ELSE addrv6 END addr, "
sql = sql.."SUM(bytes) tot_bytes, SUM(packets) tot_packets, count(*) tot_flows from "
sql = sql.."("
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 4, begin_epoch, end_epoch, interface_id)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 4, begin_epoch, end_epoch, interface_id)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_SRC_ADDR', 6, begin_epoch, end_epoch, interface_id)
sql = sql.." UNION ALL "
sql = sql..getOverallTopTalkersSELECT_FROM_WHERE_clause('IP_DST_ADDR', 6, begin_epoch, end_epoch, interface_id)
sql = sql..") talkers"
sql = sql.." group by addr "
-- ORDER
local order_by_column = "tot_bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "tot_packets"
end
if sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "tot_flows"
end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local sclice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return nil
elseif res == nil then
return {}
else
return(res)
end
end
function getHostTopTalkers(interface_id, host, info, begin_epoch, end_epoch, sort_column, sort_order, offset, limit)
if host == nil or host == "" then return nil end
local version = 4
@ -280,14 +351,34 @@ function getHostTopTalkers(interface_id, host, info, begin_epoch, end_epoch)
-- we don't care about the order so we group by least and greatest
sql = sql.." group by least(IP_SRC_ADDR, IP_DST_ADDR), greatest(IP_SRC_ADDR, IP_DST_ADDR) "
sql = sql.." order by bytes desc limit 100"
-- ORDER
local order_by_column = "bytes" -- defaults to tot_bytes
if sort_column == "column_packets" or sort_column == "packets" or sort_column == "tot_packets" then
order_by_column = "packets"
end
if sort_column == "column_flows" or sort_column == "flows" or sort_column == "tot_flows" then
order_by_column = "flows"
end
local order_by_order = "desc"
if sort_order == "asc" then order_by_order = "asc" end
sql = sql.." order by "..order_by_column.." "..order_by_order.." "
-- SLICE
local slice_offset = 0
local sclice_limit = 100
if tonumber(offset) >= 0 then slice_offset = offset end
if tonumber(limit) > 0 then slice_limit = limit end
sql = sql.."limit "..slice_offset..","..slice_limit.." "
if(db_debug == true) then io.write(sql.."\n") end
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return nil
return {}
elseif res == nil then
return {}
else
return(res)
end
@ -358,7 +449,9 @@ function getTopApplications(interface_id, peer1, peer2, info, begin_epoch, end_e
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return nil
return {}
elseif res == nil then
return {}
else
return(res)
end
@ -410,7 +503,9 @@ function getPeersTrafficHistogram(interface_id, peer1, peer2, info, begin_epoch,
res = interface.execSQLQuery(sql)
if(type(res) == "string") then
if(db_debug == true) then io.write(res.."\n") end
return nil
return {}
elseif res == nil then
return {}
else
return(res)
end