Implements Top Talkers and Top Applications In Time Range

Top Talkers and Top applications can be retrieved via MySQL
both interface-wide and per-host. A couple of tabs, namely
"Top Talkers" and "Top Applications" become available in the
historical page of interfaces and hosts. Upon focus of each
tab, an ajax request is triggered and data is fetched from MySQL.

TODO: interface wide top-talkers are retrieved from sqlite and
need to be moved to mysql.
This commit is contained in:
Simone Mainardi 2016-02-06 18:11:31 +01:00
parent 91df9df585
commit 25ee0d5c11
6 changed files with 500 additions and 191 deletions

View file

@ -2,6 +2,7 @@
-- (C) 2013-15 - ntop.org
--
require "lua_utils"
require "historical_utils"
top_rrds = {
["bytes.rrd"] = "Traffic",
@ -62,7 +63,7 @@ function getProtoVolume(ifName, start_time, end_time)
end
end
end
return(ret)
end
@ -92,7 +93,7 @@ function navigatedir(url, label, base, path, go_deep, print_html, ifid, host, st
if(do_debug) then print(v.."<br>\n") end
end
end
else
else
rrd = singlerrd2json(ifid, host, v, start_time, end_time, true, false)
if((rrd.totalval ~= nil) and (rrd.totalval > 0)) then
@ -108,15 +109,15 @@ function navigatedir(url, label, base, path, go_deep, print_html, ifid, host, st
shown = true
end
end
what = string.sub(path.."/"..v, string.len(base)+2)
label = string.sub(v, 1, string.len(v)-4)
label = l4Label(string.gsub(label, "_", " "))
ret[label] = what
if(do_debug) then print(what.."<br>\n") end
if(print_html) then
if(not(printed)) then print('<li class="divider"></li>\n') printed = true end
print("<li> <A HREF="..url..what..">"..label.."</A> </li>\n")
@ -139,11 +140,11 @@ end
function breakdownBar(sent, sentLabel, rcvd, rcvdLabel, thresholdLow, thresholdHigh)
if((sent+rcvd) > 0) then
sent2rcvd = round((sent * 100) / (sent+rcvd), 0)
--print(sent.."/"..rcvd.."/"..sent2rcvd)
--print(sent.."/"..rcvd.."/"..sent2rcvd)
if((thresholdLow == nil) or (thresholdLow < 0)) then thresholdLow = 0 end
if((thresholdHigh == nil) or (thresholdHigh > 100)) then thresholdHigh = 100 end
if(sent2rcvd < thresholdLow) then sentLabel = '<i class="fa fa-warning fa-lg"></i> '..sentLabel
if(sent2rcvd < thresholdLow) then sentLabel = '<i class="fa fa-warning fa-lg"></i> '..sentLabel
elseif(sent2rcvd > thresholdHigh) then rcvdLabel = '<i class="fa fa-warning fa-lg""></i> '..rcvdLabel end
print('<div class="progress"><div class="progress-bar progress-bar-warning" aria-valuenow="'.. sent2rcvd..'" aria-valuemin="0" aria-valuemax="100" style="width: ' .. sent2rcvd.. '%;">'..sentLabel)
@ -212,8 +213,8 @@ function getZoomAtPos(cur_zoom, pos_offset)
for k,v in pairs(zoom_vals) do
if(zoom_vals[k][1] == cur_zoom) then
if (pos+pos_offset >= 1 and pos+pos_offset < 13) then
new_zoom_level = zoom_vals[pos+pos_offset][1]
break
new_zoom_level = zoom_vals[pos+pos_offset][1]
break
end
end
pos = pos + 1
@ -262,16 +263,16 @@ function drawPeity(ifid, host, rrdFile, zoomLevel, selectedEpoch)
for k,v in ipairs(zoom_vals) do
if(zoom_vals[k][1] == zoomLevel) then
if(k > 1) then
if(k > 1) then
nextZoomLevel = zoom_vals[k-1][1]
end
if(epoch) then
end
if(epoch) then
start_time = epoch - zoom_vals[k][3]/2
end_time = epoch + zoom_vals[k][3]/2
else
else
start_time = zoom_vals[k][2]
end_time = "now"
end
end
end
end
@ -342,7 +343,7 @@ end
-- ########################################################
function drawRRD(ifid, host, rrdFile, zoomLevel, baseurl, show_timeseries,
selectedEpoch, selected_epoch_sanitized, topArray)
selectedEpoch, selected_epoch_sanitized, topArray)
local debug_rrd = false
if(zoomLevel == nil) then zoomLevel = "1h" end
@ -373,16 +374,16 @@ function drawRRD(ifid, host, rrdFile, zoomLevel, baseurl, show_timeseries,
for k,v in ipairs(zoom_vals) do
if(zoom_vals[k][1] == zoomLevel) then
if(k > 1) then
if(k > 1) then
nextZoomLevel = zoom_vals[k-1][1]
end
if(epoch) then
end
if(epoch) then
start_time = epoch - zoom_vals[k][3]/2
end_time = epoch + zoom_vals[k][3]/2
else
else
start_time = zoom_vals[k][2]
end_time = "now"
end
end
end
end
@ -450,7 +451,7 @@ end
d = fixPath(p)
go_deep = false
navigatedir(baseurl .. '&graph_zoom=' .. zoomLevel .. '&epoch=' .. (selectedEpoch or '')..'&rrd_file=',
navigatedir(baseurl .. '&graph_zoom=' .. zoomLevel .. '&epoch=' .. (selectedEpoch or '')..'&rrd_file=',
"*", d, d, go_deep, true, ifid, host, start_time, end_time)
print [[
@ -648,36 +649,36 @@ print[[
infoHTML += "<ul>";
$.ajax({
type: 'GET',
url: ']]
print(ntop.getHttpPrefix().."/lua/top_generic.lua?m=top_talkers&epoch='+point.value.x+'&addvlan=true")
print [[',
data: { epoch: point.value.x },
async: false,
success: function(content) {
var info = jQuery.parseJSON(content);
$.each(info, function(i, n) {
if (n.length > 0)
infoHTML += "<li>"+capitaliseFirstLetter(i)+" [Avg Traffic/sec]<ol>";
var items = 0;
var other_traffic = 0;
$.each(n, function(j, m) {
if(items < 3) {
infoHTML += "<li><a href='host_details.lua?host="+m.address+"'>"+abbreviateString(m.label ? m.label : m.address,24);
infoHTML += "</a>";
if (m.vlan != "0") infoHTML += " ("+m.vlanm+")";
infoHTML += " ("+fbits((m.value*8)/60)+")</li>";
items++;
} else
other_traffic += m.value;
});
if (other_traffic > 0)
infoHTML += "<li>Other ("+fbits((other_traffic*8)/60)+")</li>";
if (n.length > 0)
infoHTML += "</ol></li>";
});
infoHTML += "</ul></li></li>";
}
type: 'GET',
url: ']]
print(ntop.getHttpPrefix().."/lua/top_generic.lua?m=top_talkers&epoch='+point.value.x+'&addvlan=true")
print [[',
data: { epoch: point.value.x },
async: false,
success: function(content) {
var info = jQuery.parseJSON(content);
$.each(info, function(i, n) {
if (n.length > 0)
infoHTML += "<li>"+capitaliseFirstLetter(i)+" [Avg Traffic/sec]<ol>";
var items = 0;
var other_traffic = 0;
$.each(n, function(j, m) {
if(items < 3) {
infoHTML += "<li><a href='host_details.lua?host="+m.address+"'>"+abbreviateString(m.label ? m.label : m.address,24);
infoHTML += "</a>";
if (m.vlan != "0") infoHTML += " ("+m.vlanm+")";
infoHTML += " ("+fbits((m.value*8)/60)+")</li>";
items++;
} else
other_traffic += m.value;
});
if (other_traffic > 0)
infoHTML += "<li>Other ("+fbits((other_traffic*8)/60)+")</li>";
if (n.length > 0)
infoHTML += "</ol></li>";
});
infoHTML += "</ul></li></li>";
}
});
infoHTML += "</ul>";]]
end -- topArray
@ -967,13 +968,18 @@ end
selected = false
if(not((limitv4 == nil) or (limitv4 == "") or (limitv4 == "0"))) then
print('<li class="active"> <a href="#ipv4" role="tab" data-toggle="tab"> IPv4 </a> </li>\n')
print('<li class="active"> <a href="#ipv4" role="tab" data-toggle="tab"> IPv4 Conversations </a> </li>\n')
selected = true
end
if(not((limitv6 == nil) or (limitv6 == "") or (limitv6 == "0"))) then
if(selected == false) then print('<li class="active">\n') else print('<li>\n') end
print('<a href="#ipv6" role="tab" data-toggle="tab"> IPv6 </a> </li>\n')
print('<a href="#ipv6" role="tab" data-toggle="tab"> IPv6 Conversations </a> </li>\n')
end
if ntop.isPro() and ntop.getPrefs().is_dump_flows_to_mysql_enabled then
print('<li><a href="#historical-top-talkers" role="tab" data-toggle="tab"> Top Talkers </a> </li>\n')
print('<li><a href="#historical-top-apps" role="tab" data-toggle="tab"> Top Applications </a> </li>\n')
end
print [[
@ -983,6 +989,15 @@ print [[
<div class="tab-content">
]]
if ntop.isPro() and ntop.getPrefs().is_dump_flows_to_mysql_enabled then
print('<div class="tab-pane fade" id="historical-top-talkers">')
historicalTopTalkersTable(ifid, epoch_begin, epoch_end, host)
print('</div>')
print('<div class="tab-pane fade" id="historical-top-apps">')
historicalTopApplicationsTable(ifid, epoch_begin, epoch_end, host)
print('</div>')
end
if(not((limitv4 == nil) or (limitv4 == "") or (limitv4 == "0"))) then
print [[
<div class="tab-pane fade active in" id="ipv4"> <div id="table-flows4"></div> </div>
@ -1109,7 +1124,7 @@ print [[
var table4 = $("#table-flows4").datatable(graph_options4);
var table4 = $("#table-flows4").datatable(graph_options4);
]]
end
@ -1243,10 +1258,10 @@ function singlerrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json
if(string.contains(rrdFile, "num_") or string.contains(rrdFile, "packets") or string.contains(rrdFile, "drops"))
then
-- do not scale number, packets, and drops
scaling_factor = 1
-- do not scale number, packets, and drops
scaling_factor = 1
end
if(not ntop.notEmptyFile(rrdname)) then return '{}' end
local fstart, fstep, fnames, fdata = ntop.rrd_fetch(rrdname, 'AVERAGE', start_time, end_time)
@ -1262,24 +1277,24 @@ function singlerrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json
local sample_rate = round(num_points_found / max_num_points)
if(sample_rate < 1) then sample_rate = 1 end
-- prepare rrd labels
for i, n in ipairs(fnames) do
-- handle duplicates
if (names_cache[n] == nil) then
local extra_info = ''
local extra_info = ''
names_cache[n] = true
if append_ifname_to_labels then
extra_info = getInterfaceName(ifid)
end
if append_ifname_to_labels then
extra_info = getInterfaceName(ifid)
end
if host ~= nil and not string.starts(host, 'profile:') and not string.starts(rrdFile, 'categories/') then
extra_info = extra_info.." ".. firstToUpper(n)
end
if extra_info ~= "" then
names[#names+1] = prefixLabel.." ("..trimSpace(extra_info)..")"
else
names[#names+1] = prefixLabel
end
extra_info = extra_info.." ".. firstToUpper(n)
end
if extra_info ~= "" then
names[#names+1] = prefixLabel.." ("..trimSpace(extra_info)..")"
else
names[#names+1] = prefixLabel
end
end
end
@ -1303,47 +1318,47 @@ function singlerrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json
else
--io.write(w.."\n")
w = tonumber(w)
if(w < 0) then
if(w < 0) then
w = 0
end
end
-- update the total value counter, which is the non-scaled integral over time
totalval[instant] = totalval[instant] + w * fstep
-- also update the average val (do not multiply by fstep, this is not the integral)
avgval[instant] = avgval[instant] + w
-- and the scaled current value (remember that these are derivatives)
w = w * scaling_factor
-- the scaled current value w goes into its own element elemId
-- update the total value counter, which is the non-scaled integral over time
totalval[instant] = totalval[instant] + w * fstep
-- also update the average val (do not multiply by fstep, this is not the integral)
avgval[instant] = avgval[instant] + w
-- and the scaled current value (remember that these are derivatives)
w = w * scaling_factor
-- the scaled current value w goes into its own element elemId
if (s[elemId] == nil) then s[elemId] = 0 end
s[elemId] = s[elemId] + w
--if(s[elemId] > 0) then io.write("[".. elemId .. "]=" .. s[elemId] .."\n") end
elemId = elemId + 1
end
-- stops every sample_rate samples, or when there are no more points
if(sampling == sample_rate or num_points_found == i) then
local sample_sum = 0
for elemId=1,#s do
-- calculate the average in the sampling period
local sample_sum = 0
for elemId=1,#s do
-- calculate the average in the sampling period
s[elemId] = s[elemId] / sampling
sample_sum = sample_sum + s[elemId]
sample_sum = sample_sum + s[elemId]
end
-- update last instant
if lastval_time == nil or instant > lastval_time then
lastval = sample_sum
lastval_time = instant
end
-- possibly update maximum value (grab the most recent in case of a tie)
if maxval_time == nil or (sample_sum >= maxval and instant > maxval_time) then
maxval = sample_sum
maxval_time = instant
end
-- possibly update the minimum value (grab the most recent in case of a tie)
if minval_time == nil or (sample_sum <= minval and instant > minval_time) then
minval = sample_sum
minval_time = instant
end
-- update last instant
if lastval_time == nil or instant > lastval_time then
lastval = sample_sum
lastval_time = instant
end
-- possibly update maximum value (grab the most recent in case of a tie)
if maxval_time == nil or (sample_sum >= maxval and instant > maxval_time) then
maxval = sample_sum
maxval_time = instant
end
-- possibly update the minimum value (grab the most recent in case of a tie)
if minval_time == nil or (sample_sum <= minval and instant > minval_time) then
minval = sample_sum
minval_time = instant
end
series[#series+1] = s
sampling = 1
s = {}
@ -1510,9 +1525,9 @@ function rrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json, expa
local ret = {}
local num = 0
local debug_metric = false
interface.select(getInterfaceName(ifid))
local ifstats = interface.getStats()
local ifstats = interface.getStats()
local rrd_if_ids = {} -- read rrds for interfaces listed here
rrd_if_ids[1] = ifid -- the default submitted interface
-- interface.select(getInterfaceName(ifid))
@ -1523,14 +1538,14 @@ function rrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json, expa
io.write('ifstats.isView: '..tostring(ifstats.isView)..'\n')
end
if expand_interface_views and ifstats.isView then
-- expand rrds for views and read each physical interface separately
for iface,_ in pairs(ifstats.interfaces) do
if(debug_metric) then io.write('iface: '..iface..' id: '..getInterfaceId(iface)..'\n') end
rrd_if_ids[#rrd_if_ids+1] = getInterfaceId(iface)
end
-- expand rrds for views and read each physical interface separately
for iface,_ in pairs(ifstats.interfaces) do
if(debug_metric) then io.write('iface: '..iface..' id: '..getInterfaceId(iface)..'\n') end
rrd_if_ids[#rrd_if_ids+1] = getInterfaceId(iface)
end
end
if(debug_metric) then io.write("RRD File: "..rrdFile.."\n") end
if(rrdFile == "all") then
@ -1539,45 +1554,45 @@ function rrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json, expa
local dirs = ntop.getDirs()
local p = dirs.workingdir .. "/" .. ifid .. "/rrd/"
if(debug_metric) then io.write("Navigating: "..p.."\n") end
if(host ~= nil) then
p = p .. getPathFromKey(host)
go_deep = true
p = p .. getPathFromKey(host)
go_deep = true
else
go_deep = false
go_deep = false
end
d = fixPath(p)
rrds = navigatedir("", "*", d, d, go_deep, false, ifid, host, start_time, end_time)
local traffic_array = {}
for key, value in pairs(rrds) do
rsp = singlerrd2json(ifid, host, value, start_time, end_time, rickshaw_json, expand_interface_views)
if(rsp.totalval ~= nil) then total = rsp.totalval else total = 0 end
if(total > 0) then
traffic_array[total] = rsp
if(debug_metric) then io.write("Analyzing: "..value.." [total "..total.."]\n") end
end
rsp = singlerrd2json(ifid, host, value, start_time, end_time, rickshaw_json, expand_interface_views)
if(rsp.totalval ~= nil) then total = rsp.totalval else total = 0 end
if(total > 0) then
traffic_array[total] = rsp
if(debug_metric) then io.write("Analyzing: "..value.." [total "..total.."]\n") end
end
end
for key, value in pairsByKeys(traffic_array, rev) do
ret[#ret+1] = value
if(ret[#ret].json ~= nil) then
if(debug_metric) then io.write(key.."\n") end
num = num + 1
if(num >= 10) then break end
end
ret[#ret+1] = value
if(ret[#ret].json ~= nil) then
if(debug_metric) then io.write(key.."\n") end
num = num + 1
if(num >= 10) then break end
end
end
else
num = 0
for _,iface in pairs(rrd_if_ids) do
if(debug_metric) then io.write('iface: '..iface..'\n') end
for i,rrd in pairs(split(rrdFile, ",")) do
if(debug_metric) then io.write("["..i.."] "..rrd..' iface: '..iface.."\n") end
ret[#ret + 1] = singlerrd2json(iface, host, rrd, start_time, end_time, rickshaw_json, expand_interface_views)
if(ret[#ret].json ~= nil) then num = num + 1 end
end
if(debug_metric) then io.write('iface: '..iface..'\n') end
for i,rrd in pairs(split(rrdFile, ",")) do
if(debug_metric) then io.write("["..i.."] "..rrd..' iface: '..iface.."\n") end
ret[#ret + 1] = singlerrd2json(iface, host, rrd, start_time, end_time, rickshaw_json, expand_interface_views)
if(ret[#ret].json ~= nil) then num = num + 1 end
end
end
end
@ -1593,13 +1608,13 @@ function rrd2json(ifid, host, rrdFile, start_time, end_time, rickshaw_json, expa
-- jsons for single interfaces, and not for the view. Since view statistics
-- are in ret[1], it suffices to aggregate jsons from index i >= 2
if expand_interface_views and ifstats.isView then
i = 2
i = 2
end
local json = "["
local first = true -- used to decide where to append commas
while i <= num do
if(debug_metric) then io.write("->"..i.."\n") end
if not first then json = json.."," end
if not first then json = json.."," end
json = json..ret[i].json
i = i + 1
first = false