Use schema name in timeseries API

This commit is contained in:
emanuele-f 2018-06-27 17:18:23 +02:00
parent 39a7375727
commit 5d50622341
9 changed files with 86 additions and 108 deletions

View file

@ -466,24 +466,24 @@ end
function host_pools_utils.updateRRDs(ifid, dump_ndpi, verbose)
local ts_utils = require "ts_utils"
local ts_schemas = require "ts_5min"
require "ts_5min"
-- NOTE: requires graph_utils
for pool_id, pool_stats in pairs(interface.getHostPoolsStats() or {}) do
ts_utils.append(ts_schemas.host_pool_traffic, {ifid=ifid, pool=pool_id,
ts_utils.append("host_pool:traffic", {ifid=ifid, pool=pool_id,
bytes_sent=pool_stats["bytes.sent"], bytes_rcvd=pool_stats["bytes.rcvd"]}, when)
if pool_id ~= tonumber(host_pools_utils.DEFAULT_POOL_ID) then
local flows_dropped = pool_stats["flows.dropped"] or 0
ts_utils.append(ts_schemas.host_pool_blocked_flows, {ifid=ifid, pool=pool_id,
ts_utils.append("host_pool:blocked_flows", {ifid=ifid, pool=pool_id,
num_flows=flows_dropped}, when)
end
-- nDPI stats
if dump_ndpi then
for proto,v in pairs(pool_stats["ndpi"] or {}) do
ts_utils.append(ts_schemas.host_pool_ndpi, {ifid=ifid, pool=pool_id, protocol=proto,
ts_utils.append("host_pool:ndpi", {ifid=ifid, pool=pool_id, protocol=proto,
bytes_sent=pool_stats["bytes.sent"], bytes_rcvd=pool_stats["bytes.rcvd"]}, when)
end
end

View file

@ -6,7 +6,7 @@ local host_pools_utils = require "host_pools_utils"
local callback_utils = require "callback_utils"
local os_utils = require "os_utils"
local ts_utils = require "ts_utils"
local ts_schemas = require "ts_5min"
require "ts_5min"
local dirs = ntop.getDirs()
local rrd_dump = {}
@ -14,18 +14,18 @@ local rrd_dump = {}
-- ########################################################
function rrd_dump.host_update_stats_rrds(when, hostname, hostbase, host, ifstats, verbose)
ts_utils.append(ts_schemas.host_traffic, {ifid=ifstats.id, host=hostname,
ts_utils.append("host:traffic", {ifid=ifstats.id, host=hostname,
bytes_sent=host["bytes.sent"], bytes_rcvd=host["bytes.rcvd"]}, when, verbose)
-- Number of flows
ts_utils.append(ts_schemas.host_flows, {ifid=ifstats.id, host=hostname,
ts_utils.append("host:flows", {ifid=ifstats.id, host=hostname,
num_flows=host["active_flows.as_client"] + host["active_flows.as_server"]}, when, verbose)
-- L4 Protocols
for id, _ in ipairs(l4_keys) do
k = l4_keys[id][2]
if((host[k..".bytes.sent"] ~= nil) and (host[k..".bytes.rcvd"] ~= nil)) then
ts_utils.append(ts_schemas.host_l4protos, {ifid=ifstats.id, host=hostname,
ts_utils.append("host:l4protos", {ifid=ifstats.id, host=hostname,
l4proto=tostring(k), bytes_sent=host[k..".bytes.sent"], bytes_rcvd=host[k..".bytes.rcvd"]}, when, verbose)
else
-- L2 host
@ -37,7 +37,7 @@ end
function rrd_dump.host_update_ndpi_rrds(when, hostname, hostbase, host, ifstats, verbose)
-- nDPI Protocols
for k in pairs(host["ndpi"] or {}) do
ts_utils.append(ts_schemas.host_ndpi, {ifid=ifstats.id, host=hostname, protocol=k,
ts_utils.append("host:ndpi", {ifid=ifstats.id, host=hostname, protocol=k,
bytes_sent=host["ndpi"][k]["bytes.sent"], bytes_rcvd=host["ndpi"][k]["bytes.rcvd"]}, when, verbose)
end
end
@ -45,7 +45,7 @@ end
function rrd_dump.host_update_categories_rrds(when, hostname, hostbase, host, ifstats, verbose)
-- nDPI Protocol CATEGORIES
for k, cat in pairs(host["ndpi_categories"] or {}) do
ts_utils.append(ts_schemas.host_ndpi_categories, {ifid=ifstats.id, host=hostname, category=k,
ts_utils.append("host:ndpi_categories", {ifid=ifstats.id, host=hostname, category=k,
bytes=cat["bytes"]}, when, verbose)
end
end
@ -55,13 +55,13 @@ end
function rrd_dump.l2_device_update_categories_rrds(when, devicename, device, devicebase, ifstats, verbose)
-- nDPI Protocol CATEGORIES
for k, cat in pairs(device["ndpi_categories"] or {}) do
ts_utils.append(ts_schemas.mac_ndpi_categories, {ifid=ifstats.id, mac=devicename, category=k,
ts_utils.append("mac:ndpi_categories", {ifid=ifstats.id, mac=devicename, category=k,
bytes=cat["bytes"]}, when, verbose)
end
end
function rrd_dump.l2_device_update_stats_rrds(when, devicename, device, devicebase, ifstats, verbose)
ts_utils.append(ts_schemas.mac_traffic, {ifid=ifstats.id, mac=devicename,
ts_utils.append("mac:traffic", {ifid=ifstats.id, mac=devicename,
bytes_sent=device["bytes.sent"], bytes_rcvd=device["bytes.rcvd"]}, when, verbose)
end
@ -74,19 +74,19 @@ function rrd_dump.asn_update_rrds(when, ifstats, verbose)
local asn = asn_stats["asn"]
-- Save ASN bytes
ts_utils.append(ts_schemas.asn_traffic, {ifid=ifstats.id, asn=asn,
ts_utils.append("asn:traffic", {ifid=ifstats.id, asn=asn,
bytes_sent=asn_stats["bytes.sent"], bytes_rcvd=asn_stats["bytes.rcvd"]}, when)
-- Save ASN ndpi stats
if asn_stats["ndpi"] ~= nil then
for proto_name, proto_stats in pairs(asn_stats["ndpi"]) do
ts_utils.append(ts_schemas.asn_ndpi, {ifid=ifstats.id, asn=asn, protocol=proto_name,
ts_utils.append("asn:ndpi", {ifid=ifstats.id, asn=asn, protocol=proto_name,
bytes_sent=proto_stats["bytes.sent"], bytes_rcvd=proto_stats["bytes.rcvd"]}, when, verbose)
end
end
-- Save ASN RTT stats
ts_utils.append(ts_schemas.asn_rtt, {ifid=ifstats.id, asn=asn,
ts_utils.append("asn:rtt", {ifid=ifstats.id, asn=asn,
millis_rtt=asn_stats["round_trip_time"]}, when, verbose)
end
end
@ -100,7 +100,7 @@ function rrd_dump.country_update_rrds(when, ifstats, verbose)
for _, country_stats in ipairs(countries_info["Countries"] or {}) do
local country = country_stats.country
ts_utils.append(ts_schemas.country_traffic, {ifid=ifstats.id, country=country,
ts_utils.append("country:traffic", {ifid=ifstats.id, country=country,
bytes_ingress=country_stats["ingress"], bytes_egress=country_stats["egress"],
bytes_inner=country_stats["inner"]}, when, verbose)
end
@ -116,13 +116,13 @@ function rrd_dump.vlan_update_rrds(when, ifstats, verbose)
for _, vlan_stats in pairs(vlan_info["VLANs"]) do
local vlan_id = vlan_stats["vlan_id"]
ts_utils.append(ts_schemas.vlan_traffic, {ifid=ifstats.id, vlan=vlan_id,
ts_utils.append("vlan:traffic", {ifid=ifstats.id, vlan=vlan_id,
bytes_sent=vlan_stats["bytes.sent"], bytes_rcvd=vlan_stats["bytes.rcvd"]}, when, verbose)
-- Save VLAN ndpi stats
if vlan_stats["ndpi"] ~= nil then
for proto_name, proto_stats in pairs(vlan_stats["ndpi"]) do
ts_utils.append(ts_schemas.vlan_ndpi, {ifid=ifstats.id, vlan=vlan_id, protocol=proto_name,
ts_utils.append("vlan:ndpi", {ifid=ifstats.id, vlan=vlan_id, protocol=proto_name,
bytes_sent=proto_stats["bytes.sent"], bytes_rcvd=proto_stats["bytes.rcvd"]}, when, verbose)
end
end
@ -143,7 +143,7 @@ function rrd_dump.sflow_device_update_rrds(when, ifstats, verbose)
end
for port_idx,port_value in pairs(ports) do
ts_utils.append(ts_schemas.sflowdev_port_traffic, {ifid=ifstats.id, device=flow_device_ip, port=port_idx,
ts_utils.append("sflowdev_port:traffic", {ifid=ifstats.id, device=flow_device_ip, port=port_idx,
bytes_sent=port_value.ifOutOctets, bytes_rcvd=port_value.ifInOctets}, when, verbose)
end
end
@ -160,7 +160,7 @@ function rrd_dump.flow_device_update_rrds(when, ifstats, verbose)
if(verbose) then print ("["..__FILE__()..":"..__LINE__().."] Processing flow device "..flow_device_ip.."\n") end
for port_idx,port_value in pairs(ports) do
ts_utils.append(ts_schemas.flowdev_port_traffic, {ifid=ifstats.id, device=flow_device_ip, port=port_idx,
ts_utils.append("flowdev_port:traffic", {ifid=ifstats.id, device=flow_device_ip, port=port_idx,
bytes_sent=port_value["bytes.out_bytes"], bytes_rcvd=port_value["bytes.in_bytes"]}, when, verbose)
end
end

View file

@ -8,7 +8,7 @@ require "rrd_utils"
local os_utils = require "os_utils"
local top_talkers_utils = require "top_talkers_utils"
local ts_utils = require("ts_utils")
local ts_schemas = require("ts_minute")
require("ts_minute")
local rrd_dump = {}
@ -19,7 +19,7 @@ function rrd_dump.iface_update_ndpi_rrds(when, basedir, _ifname, ifstats, verbos
local v = ifstats["ndpi"][k]["bytes.sent"]+ifstats["ndpi"][k]["bytes.rcvd"]
if(verbose) then print("["..__FILE__()..":"..__LINE__().."] ".._ifname..": "..k.."="..v.."\n") end
ts_utils.append(ts_schemas.iface_ndpi, {ifid=ifstats.id, protocol=k, bytes=v}, when, verbose)
ts_utils.append("iface:ndpi", {ifid=ifstats.id, protocol=k, bytes=v}, when, verbose)
end
end
@ -30,7 +30,7 @@ function rrd_dump.iface_update_categories_rrds(when, basedir, _ifname, ifstats,
v = v["bytes"]
if(verbose) then print("["..__FILE__()..":"..__LINE__().."] ".._ifname..": "..k.."="..v.."\n") end
ts_utils.append(ts_schemas.iface_ndpi_categories, {ifid=ifstats.id, category=k, bytes=v}, when, verbose)
ts_utils.append("iface:ndpi_categories", {ifid=ifstats.id, category=k, bytes=v}, when, verbose)
end
end
@ -39,11 +39,11 @@ end
function rrd_dump.iface_update_stats_rrds(when, basedir, _ifname, ifstats, verbose)
-- IN/OUT counters
if(ifstats["localstats"]["bytes"]["local2remote"] > 0) then
ts_utils.append(ts_schemas.iface_local2remote, {ifid=ifstats.id, bytes=ifstats["localstats"]["bytes"]["local2remote"]}, when, verbose)
ts_utils.append("iface:local2remote", {ifid=ifstats.id, bytes=ifstats["localstats"]["bytes"]["local2remote"]}, when, verbose)
end
if(ifstats["localstats"]["bytes"]["remote2local"] > 0) then
ts_utils.append(ts_schemas.iface_remote2local, {ifid=ifstats.id, bytes=ifstats["localstats"]["bytes"]["remote2local"]}, when, verbose)
ts_utils.append("iface:remote2local", {ifid=ifstats.id, bytes=ifstats["localstats"]["bytes"]["remote2local"]}, when, verbose)
end
end
@ -60,11 +60,11 @@ function rrd_dump.subnet_update_rrds(when, ifstats, basedir, verbose)
ntop.mkdir(rrdpath)
end
ts_utils.append(ts_schemas.subnet_traffic, {ifid=ifstats.id, subnet=subnet,
ts_utils.append("subnet:traffic", {ifid=ifstats.id, subnet=subnet,
bytes_ingress=sstats["ingress"], bytes_egress=sstats["egress"],
bytes_inner=sstats["inner"]}, when)
ts_utils.append(ts_schemas.subnet_broadcast_traffic, {ifid=ifstats.id, subnet=subnet,
ts_utils.append("subnet:broadcast_traffic", {ifid=ifstats.id, subnet=subnet,
bytes_ingress=sstats["broadcast"]["ingress"], bytes_egress=sstats["broadcast"]["egress"],
bytes_inner=sstats["broadcast"]["inner"]}, when, verbose)
end
@ -74,23 +74,23 @@ end
function rrd_dump.iface_update_general_stats(when, ifstats, basedir, verbose)
-- General stats
ts_utils.append(ts_schemas.iface_hosts, {ifid=ifstats.id, num_hosts=ifstats.stats.hosts}, when, verbose)
ts_utils.append(ts_schemas.iface_devices, {ifid=ifstats.id, num_devices=ifstats.stats.devices}, when, verbose)
ts_utils.append(ts_schemas.iface_flows, {ifid=ifstats.id, num_flows=ifstats.stats.flows}, when, verbose)
ts_utils.append(ts_schemas.iface_http_hosts, {ifid=ifstats.id, num_hosts=ifstats.stats.http_hosts}, when, verbose)
ts_utils.append("iface:hosts", {ifid=ifstats.id, num_hosts=ifstats.stats.hosts}, when, verbose)
ts_utils.append("iface:devices", {ifid=ifstats.id, num_devices=ifstats.stats.devices}, when, verbose)
ts_utils.append("iface:flows", {ifid=ifstats.id, num_flows=ifstats.stats.flows}, when, verbose)
ts_utils.append("iface:http_hosts", {ifid=ifstats.id, num_hosts=ifstats.stats.http_hosts}, when, verbose)
end
function rrd_dump.iface_update_tcp_stats(when, ifstats, basedir, verbose)
ts_utils.append(ts_schemas.iface_tcp_retransmissions, {ifid=ifstats.id, packets=ifstats.tcpPacketStats.retransmissions}, when, verbose)
ts_utils.append(ts_schemas.iface_tcp_out_of_order, {ifid=ifstats.id, packets=ifstats.tcpPacketStats.out_of_order}, when, verbose)
ts_utils.append(ts_schemas.iface_tcp_lost, {ifid=ifstats.id, packets=ifstats.tcpPacketStats.lost}, when, verbose)
ts_utils.append("iface:tcp_retransmissions", {ifid=ifstats.id, packets=ifstats.tcpPacketStats.retransmissions}, when, verbose)
ts_utils.append("iface:tcp_out_of_order", {ifid=ifstats.id, packets=ifstats.tcpPacketStats.out_of_order}, when, verbose)
ts_utils.append("iface:tcp_lost", {ifid=ifstats.id, packets=ifstats.tcpPacketStats.lost}, when, verbose)
end
function rrd_dump.iface_update_tcp_flags(when, ifstats, basedir, verbose)
ts_utils.append(ts_schemas.iface_tcp_syn, {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.syn}, when, verbose)
ts_utils.append(ts_schemas.iface_tcp_synack, {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.synack}, when, verbose)
ts_utils.append(ts_schemas.iface_tcp_finack, {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.finack}, when, verbose)
ts_utils.append(ts_schemas.iface_tcp_rst, {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.rst}, when, verbose)
ts_utils.append("iface:tcp_syn", {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.syn}, when, verbose)
ts_utils.append("iface:tcp_synack", {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.synack}, when, verbose)
ts_utils.append("iface:tcp_finack", {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.finack}, when, verbose)
ts_utils.append("iface:tcp_rst", {ifid=ifstats.id, packets=ifstats.pktSizeDistribution.rst}, when, verbose)
end
-- ########################################################
@ -99,7 +99,7 @@ function rrd_dump.profiles_update_stats(when, ifstats, basedir, verbose)
local basedir = os_utils.fixPath(dirs.workingdir .. "/" .. ifstats.id..'/profilestats')
for pname, ptraffic in pairs(ifstats.profiles) do
ts_utils.append(ts_schemas.profile_traffic, {ifid=ifstats.id, profile=pname, bytes=ptraffic}, when, verbose)
ts_utils.append("profile:traffic", {ifid=ifstats.id, profile=pname, bytes=ptraffic}, when, verbose)
end
end
@ -161,7 +161,7 @@ function rrd_dump.run_min_dump(_ifname, ifstats, config, when, verbose)
if ntop.isnEdge() and ifstats.type == "netfilter" and ifstats.netfilter then
local st = ifstats.netfilter.nfq or {}
ts_utils.append(ts_schemas.iface_nfq_pct, {ifid=ifstats.id, num_nfq_pct = st.queue_pct}, when, verbose)
ts_utils.append("iface:nfq_pct", {ifid=ifstats.id, num_nfq_pct = st.queue_pct}, when, verbose)
end
ts_utils.flush()

View file

@ -105,7 +105,7 @@ function find_schema(rrdFile, rrdfname, tags, ts_utils)
end
end
for _, schema in pairs(ts_utils.getLoadedSchemas()) do
for schema_name, schema in pairs(ts_utils.getLoadedSchemas()) do
-- verify tags compatibility
for tag in pairs(schema.tags) do
if tags[tag] == nil then
@ -116,7 +116,7 @@ function find_schema(rrdFile, rrdfname, tags, ts_utils)
local full_path = schema_get_full_path(schema, tags)
if full_path == rrdFile then
return schema
return schema_name
end
::next_schema::

View file

@ -3,7 +3,7 @@
--
local ts_utils = require "ts_utils"
local ts_schemas = {}
local schema
-- TODO: remove rrd_fname after new paths migration
-- NOTE: when rrd_fname is empty, the last tag value is used as file name
@ -18,7 +18,6 @@ schema:addTag("ifid")
schema:addTag("mac")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.mac_traffic = schema
-- ##############################################
@ -28,7 +27,6 @@ schema:addTag("ifid")
schema:addTag("mac")
schema:addTag("category")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.mac_ndpi_categories = schema
-------------------------------------------------------
-- HOST POOLS SCHEMAS
@ -39,7 +37,6 @@ schema:addTag("ifid")
schema:addTag("pool")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.host_pool_traffic = schema
-- ##############################################
@ -47,7 +44,6 @@ schema = ts_utils.newSchema("host_pool:blocked_flows", {step=300, rrd_fname="blo
schema:addTag("ifid")
schema:addTag("pool")
schema:addMetric("num_flows", ts_utils.metrics.counter)
ts_schemas.host_pool_blocked_flows = schema
-- ##############################################
@ -57,7 +53,6 @@ schema:addTag("pool")
schema:addTag("protocol")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.host_pool_ndpi = schema
-------------------------------------------------------
-- ASN SCHEMAS
@ -68,7 +63,6 @@ schema:addTag("ifid")
schema:addTag("asn")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.asn_traffic = schema
-- ##############################################
@ -78,7 +72,6 @@ schema:addTag("asn")
schema:addTag("protocol")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.asn_ndpi = schema
-- ##############################################
@ -86,7 +79,6 @@ schema = ts_utils.newSchema("asn:rtt", {step=300, rrd_fname="num_ms_rtt"})
schema:addTag("ifid")
schema:addTag("asn")
schema:addMetric("millis_rtt", ts_utils.metrics.gauge)
ts_schemas.asn_rtt = schema
-------------------------------------------------------
-- COUNTRIES SCHEMAS
@ -98,7 +90,6 @@ schema:addTag("country")
schema:addMetric("bytes_ingress", ts_utils.metrics.counter)
schema:addMetric("bytes_egress", ts_utils.metrics.counter)
schema:addMetric("bytes_inner", ts_utils.metrics.counter)
ts_schemas.country_traffic = schema
-------------------------------------------------------
-- VLAN SCHEMAS
@ -109,7 +100,6 @@ schema:addTag("ifid")
schema:addTag("vlan")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.vlan_traffic = schema
-- ##############################################
@ -119,7 +109,6 @@ schema:addTag("vlan")
schema:addTag("protocol")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.vlan_ndpi = schema
-------------------------------------------------------
-- FLOW DEVICES SCHEMAS
@ -131,7 +120,6 @@ schema:addTag("device")
schema:addTag("port")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.sflowdev_port_traffic = schema
-- ##############################################
@ -141,7 +129,6 @@ schema:addTag("device")
schema:addTag("port")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.flowdev_port_traffic = schema
-------------------------------------------------------
-- SNMP SCHEMAS
@ -153,7 +140,6 @@ schema:addTag("device")
schema:addTag("if_index")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.snmp_if_traffic = schema
-------------------------------------------------------
-- HOSTS SCHEMAS
@ -164,7 +150,6 @@ schema:addTag("ifid")
schema:addTag("host")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.host_traffic = schema
-- ##############################################
@ -172,7 +157,6 @@ schema = ts_utils.newSchema("host:flows", {step=300, rrd_fname="num_flows"})
schema:addTag("ifid")
schema:addTag("host")
schema:addMetric("num_flows", ts_utils.metrics.gauge)
ts_schemas.host_flows = schema
-- ##############################################
@ -183,7 +167,6 @@ schema:addTag("host")
schema:addTag("l4proto")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.host_l4protos = schema
-- ##############################################
@ -193,7 +176,6 @@ schema:addTag("host")
schema:addTag("protocol")
schema:addMetric("bytes_sent", ts_utils.metrics.counter)
schema:addMetric("bytes_rcvd", ts_utils.metrics.counter)
ts_schemas.host_ndpi = schema
-- ##############################################
@ -202,8 +184,3 @@ schema:addTag("ifid")
schema:addTag("host")
schema:addTag("category")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.host_ndpi_categories = schema
-- ##############################################
return ts_schemas

View file

@ -3,8 +3,6 @@
--
local ts_utils = require "ts_utils"
local ts_schemas = {}
local schema
-------------------------------------------------------
@ -15,7 +13,6 @@ schema = ts_utils.newSchema("profile:traffic", {step=60, rrd_fname="bytes"})
schema:addTag("ifid")
schema:addTag("profile")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.profile_traffic = schema
-------------------------------------------------------
-- SUBNETS SCHEMAS
@ -27,7 +24,6 @@ schema:addTag("subnet")
schema:addMetric("bytes_ingress", ts_utils.metrics.counter)
schema:addMetric("bytes_egress", ts_utils.metrics.counter)
schema:addMetric("bytes_inner", ts_utils.metrics.counter)
ts_schemas.subnet_traffic = schema
-- ##############################################
@ -37,7 +33,6 @@ schema:addTag("subnet")
schema:addMetric("bytes_ingress", ts_utils.metrics.counter)
schema:addMetric("bytes_egress", ts_utils.metrics.counter)
schema:addMetric("bytes_inner", ts_utils.metrics.counter)
ts_schemas.subnet_broadcast_traffic = schema
-------------------------------------------------------
-- INTERFACES SCHEMAS
@ -47,7 +42,6 @@ schema = ts_utils.newSchema("iface:ndpi", {step=60})
schema:addTag("ifid")
schema:addTag("protocol")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.iface_ndpi = schema
-- ##############################################
@ -55,7 +49,6 @@ schema = ts_utils.newSchema("iface:ndpi_categories", {step=60})
schema:addTag("ifid")
schema:addTag("category")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.iface_ndpi_categories = schema
-- ##############################################
@ -63,7 +56,6 @@ ts_schemas.iface_ndpi_categories = schema
schema = ts_utils.newSchema("iface:local2remote", {step=60, rrd_fname="local2remote"})
schema:addTag("ifid")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.iface_local2remote = schema
-- ##############################################
@ -71,90 +63,75 @@ ts_schemas.iface_local2remote = schema
schema = ts_utils.newSchema("iface:remote2local", {step=60, rrd_fname="remote2local"})
schema:addTag("ifid")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.iface_remote2local = schema
-- ##############################################
schema = ts_utils.newSchema("iface:hosts", {step=60, rrd_fname="num_hosts"})
schema:addTag("ifid")
schema:addMetric("num_hosts", ts_utils.metrics.gauge)
ts_schemas.iface_hosts = schema
-- ##############################################
schema = ts_utils.newSchema("iface:devices", {step=60, rrd_fname="num_devices"})
schema:addTag("ifid")
schema:addMetric("num_devices", ts_utils.metrics.gauge)
ts_schemas.iface_devices = schema
-- ##############################################
schema = ts_utils.newSchema("iface:flows", {step=60, rrd_fname="num_flows"})
schema:addTag("ifid")
schema:addMetric("num_flows", ts_utils.metrics.gauge)
ts_schemas.iface_flows = schema
-- ##############################################
schema = ts_utils.newSchema("iface:http_hosts", {step=60, rrd_fname="num_http_hosts"})
schema:addTag("ifid")
schema:addMetric("num_hosts", ts_utils.metrics.gauge)
ts_schemas.iface_http_hosts = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_retransmissions", {step=60, rrd_fname="tcp_retransmissions"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_retransmissions = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_out_of_order", {step=60, rrd_fname="tcp_ooo"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_out_of_order = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_lost", {step=60, rrd_fname="tcp_lost"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_lost = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_syn", {step=60, rrd_fname="tcp_syn"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_syn = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_synack", {step=60, rrd_fname="tcp_synack"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_synack = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_finack", {step=60, rrd_fname="tcp_finack"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_finack = schema
-- ##############################################
schema = ts_utils.newSchema("iface:tcp_rst", {step=60, rrd_fname="tcp_rst"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_tcp_rst = schema
-- ##############################################
schema = ts_utils.newSchema("iface:nfq_pct", {step=60, rrd_fname="num_nfq_pct"})
schema:addTag("ifid")
schema:addMetric("num_nfq_pct", ts_utils.metrics.gauge)
ts_schemas.iface_nfq_pct = schema
return ts_schemas

View file

@ -3,8 +3,6 @@
--
local ts_utils = require "ts_utils"
local ts_schemas = {}
local schema
-- ##############################################
@ -12,29 +10,21 @@ local schema
schema = ts_utils.newSchema("iface:traffic", {step=1, rrd_fname="bytes"})
schema:addTag("ifid")
schema:addMetric("bytes", ts_utils.metrics.counter)
ts_schemas.iface_traffic = schema
-- ##############################################
schema = ts_utils.newSchema("iface:packets", {step=1, rrd_fname="packets"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_packets = schema
-- ##############################################
schema = ts_utils.newSchema("iface:zmq_recv_flows", {step=1, rrd_fname="num_zmq_rcvd_flows"})
schema:addTag("ifid")
schema:addMetric("num_flows", ts_utils.metrics.gauge)
ts_schemas.iface_zmq_recv_flows = schema
-- ##############################################
schema = ts_utils.newSchema("iface:drops", {step=1, rrd_fname="drops"})
schema:addTag("ifid")
schema:addMetric("packets", ts_utils.metrics.counter)
ts_schemas.iface_drops = schema
-- ##############################################
return ts_schemas

View file

@ -40,6 +40,12 @@ end
function ts_utils.newSchema(name, options)
local schema = ts_utils.schema:new(name, options)
if loaded_schemas[name] ~= nil then
traceError(TRACE_WARNING, TRACE_CONSOLE, "Schema already defined: " .. name)
return loaded_schemas[name]
end
loaded_schemas[name] = schema
return schema
@ -84,8 +90,15 @@ end
-----------------------------------------------------------------------
function ts_utils.append(schema, tags_and_metrics, timestamp, verbose)
function ts_utils.append(schema_name, tags_and_metrics, timestamp, verbose)
timestamp = timestamp or os.time()
local schema = ts_utils.getSchema(schema_name)
if not schema then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return false
end
local tags, data = schema:verifyTagsAndMetrics(tags_and_metrics)
if not tags then
@ -155,13 +168,20 @@ end
-----------------------------------------------------------------------
function ts_utils.query(schema, tags, tstart, tend, options)
function ts_utils.query(schema_name, tags, tstart, tend, options)
local query_options = table.merge({
fill_value = 0, -- e.g. 0/0 for nan
min_value = 0, -- minimum value of a data point
max_value = math.huge, -- maximum value for a data point
}, options or {})
local schema = ts_utils.getSchema(schema_name)
if not schema then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return false
end
if not schema:verifyTags(tags) then
return nil
end
@ -200,7 +220,14 @@ end
-- List all the data series matching the given filter.
-- Only data series updated after start_time will be returned.
-- Returns a list of expanded tags based on the matches.
function ts_utils.listSeries(schema, tags_filter, start_time)
function ts_utils.listSeries(schema_name, tags_filter, start_time)
local schema = ts_utils.getSchema(schema_name)
if not schema then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return false
end
local driver = ts_utils.getQueryDriver()
if not driver then
@ -232,7 +259,14 @@ end
-----------------------------------------------------------------------
function ts_utils.delete(schema, tags)
function ts_utils.delete(schema_name, tags)
local schema = ts_utils.getSchema(schema_name)
if not schema then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return false
end
if not schema:verifyTags(data) then
return false
end