Implements in-mem RRD writes queue, avoids json enc/dec

This commit is contained in:
Simone Mainardi 2020-02-29 22:30:00 +01:00
parent 9fd86de371
commit cce4dcf568
9 changed files with 216 additions and 69 deletions

View file

@ -12,11 +12,11 @@ local json = require("dkjson")
require("rrd_paths")
local use_hwpredict = false
local rrd_update_queue = "ntopng.rrd_update"
local max_rrd_queueLen = 100000
local curr_num_rrd_updates = 0
local ENABLE_EXPERIMENTAL_RRD_QUEUE = false
local EXPERIMENTAL_RRD_QUEUE_MAX_LOOPS = 10
local EXPERIMENTAL_RRD_QUEUE_MAX_DEQUEUES_PER_LOOP = 10000
local type_to_rrdtype = {
[ts_common.metrics.counter] = "DERIVE",
[ts_common.metrics.gauge] = "GAUGE",
@ -447,7 +447,7 @@ local function log_ts(rrdfile, schema, timestamp, tags, metrics)
critical = ""
end
io.write("[RRD]"..critical.."[Step: ".. schema.options.step .."]["..j.."]["..curr_num_rrd_updates.."]\n")
io.write("[RRD]"..critical.."[Step: ".. schema.options.step .."]["..j.."]\n")
return(j)
end
@ -460,32 +460,11 @@ function driver:append(schema, timestamp, tags, metrics)
if ENABLE_EXPERIMENTAL_RRD_QUEUE then
if not schema.options.is_critical_ts then
local j = json.encode({schema_name = schema.name, timestamp = timestamp, tags = tags, metrics = metrics})
ntop.lpushCache(rrd_update_queue, j)
curr_num_rrd_updates = curr_num_rrd_updates + 1
if(curr_num_rrd_updates == 100) then
-- Get the current number of points in the queue
-- (this can slightly change once it has been read
-- as the exporter thread could be reading points and other writers
-- could be writing points. However, the chance of this happening is pretty
-- low given that this section is short)
local cur_len = ntop.llenCache(rrd_update_queue)
local res = interface.rrd_enqueue(schema.name, timestamp, tags, metrics)
if cur_len > max_rrd_queueLen then
-- Calculate how many points are going to be dropped
local num_drops = cur_len - max_rrd_queueLen
-- Trim the queue and increase the counter
ntop.ltrimCache(rrd_update_queue, 0, max_rrd_queueLen)
ntop.rrd_inc_num_drops(num_drops)
end
curr_num_rrd_updates = 0
if not res then
ntop.rrd_inc_num_drops()
end
return true
end
end
@ -503,8 +482,6 @@ function driver:append(schema, timestamp, tags, metrics)
return false
end
end
curr_num_rrd_updates = curr_num_rrd_updates + 1
return update_rrd(schema, rrdfile, timestamp, metrics)
end
@ -1063,48 +1040,141 @@ end
-- ##############################################
-- Parses a line in line protocol format (https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/)
-- into measurement (schema name), tags, metrics and timestamp.
--
-- An example of line protocol is the following
--
--   weather,location=us-midwest temperature=82 1465839830100400200
--     |    -------------------- --------------  |
--     |             |             |             |
--     |             |             |             |
--   +-----------+--------+-+---------+-+---------+
--   |measurement|,tag_set| |field_set| |timestamp|
--   +-----------+--------+-+---------+-+---------+
--
-- NOTE: no need to worry about possible spaces in the tag values. The first
-- capture of the Lua pattern below is greedy, so the field set and the
-- timestamp are always the LAST two whitespace-separated tokens of the line
-- and everything before them is the measurement and tag set.
-- Examples that are correctly parsed are:
--
--   "iface:traffic_rxtx,ifid=0 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
--   "iface:traffic_rxtx,ifid=0,ndpi_category=My Category,ndpi_proto=Apple iTunes,host=1.2.3.4 bytes_sent=849500,bytes_rcvd=5134958 1583007829\n"
--   "iface:packets,ifid=0 packets=100 1583007829\n"   (single metric, no comma)
--
-- @param protocol_line the line to parse; the trailing newline is optional
-- @return a table {schema_name=, tags=, metrics=, timestamp=} where tag and
--         metric values are converted to numbers when possible, or nil when
--         the line cannot be split into the three expected sections
local function line_protocol_to_tags_and_metrics(protocol_line)
   if type(protocol_line) ~= "string" then
      return nil
   end

   local measurement_and_tag_set, field_set, timestamp =
      protocol_line:match("^(.+)%s(%S+)%s(%S+)%s*$")

   if not measurement_and_tag_set then
      -- Malformed line: let the caller decide what to do instead of
      -- failing on a nil index below
      return nil
   end

   -- Stores every well-formed "key=value" item of a comma-separated set into
   -- dest. Items without a key or without a value are skipped. This also
   -- handles sets made of a single item, i.e., sets without any comma.
   local function parse_key_values(set, dest)
      for item in set:gmatch("[^,]+") do
         local key, value = item:match("^([^=]+)=(.+)$")

         if key then
            dest[key] = tonumber(value) or value
         end
      end
   end

   -- The measurement is whatever precedes the first comma (the whole string
   -- when there is no tag set), tags are the remaining items
   local measurement, tag_set = measurement_and_tag_set:match("^([^,]*),?(.*)$")
   local tags = {}
   local metrics = {}

   parse_key_values(tag_set, tags)
   parse_key_values(field_set, metrics)

   return {
      schema_name = measurement,
      tags = tags,
      metrics = metrics,
      timestamp = tonumber(timestamp),
   }
end
-- ##############################################
function driver:export()
if not ENABLE_EXPERIMENTAL_RRD_QUEUE then
return -- Nothing to do
end
local ts_utils = require "ts_utils" -- required to get the schema from the schema name
-- Cap the number of exported points to the actual number of points in the queue
-- Possibly enforce a maximum time
local num_ts_points = ntop.llenCache(rrd_update_queue)
-- tprint("...dequeuing "..num_ts_points)
for i=1, num_ts_points do
-- use rpop to extract oldest points first
local ts_point = ntop.rpopCache(rrd_update_queue)
local ts_point_json = json.decode(ts_point)
local available_interfaces = interface.getIfNames()
-- Add the system interface to the available interfaces
available_interfaces[getSystemInterfaceId()] = getSystemInterfaceName()
if not ts_point or not ts_point_json then
break -- Should not happen
end
-- Set the name and a status to know when all the interfaces are done
local num_ifaces = 0
for cur_ifid, ifname in pairs(available_interfaces) do
available_interfaces[cur_ifid] = {ifname = ifname, completed = false}
num_ifaces = num_ifaces + 1
end
-- No need to do sanity checks on the schema. This queue is 'private' and should
-- only be written with valid data already checked.
local schema = ts_utils.getSchema(ts_point_json["schema_name"])
local timestamp = ts_point_json["timestamp"]
local tags = ts_point_json["tags"]
local metrics = ts_point_json["metrics"]
local base, rrd = schema_get_path(schema, tags)
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
local num_completed = 0 -- Number of interfaces with no more points to dequeue at any given loop
for cur_loop=1, EXPERIMENTAL_RRD_QUEUE_MAX_LOOPS do
-- Iterate all interfaces in a round-robin fashion to
-- make sure every one gets a chance to have their point written
-- in a fair way
if not ntop.notEmptyFile(rrdfile) then
ntop.mkdir(base)
if not create_rrd(schema, rrdfile) then
return false
num_completed = 0 -- reset it at every loop
for cur_ifid, iface in pairs(available_interfaces) do
if iface["completed"] then
-- Once an interface is marked as completed, do not reprocess it
-- until the next run, even if new points have arrived in the meanwhile
goto next_interface
end
for cur_dequeue=1, EXPERIMENTAL_RRD_QUEUE_MAX_DEQUEUES_PER_LOOP do
local ts_point = interface.rrd_dequeue(tonumber(cur_ifid))
if not ts_point then
iface["completed"] = true
break
end
local parsed_ts_point = line_protocol_to_tags_and_metrics(ts_point)
-- No need to do sanity checks on the schema. This queue is 'private' and should
-- only be written with valid data already checked.
local schema = ts_utils.getSchema(parsed_ts_point["schema_name"])
local timestamp = parsed_ts_point["timestamp"]
local tags = parsed_ts_point["tags"]
local metrics = parsed_ts_point["metrics"]
local base, rrd = schema_get_path(schema, tags)
local rrdfile = os_utils.fixPath(base .. "/" .. rrd .. ".rrd")
if not ntop.notEmptyFile(rrdfile) then
ntop.mkdir(base)
if not create_rrd(schema, rrdfile) then
return false
end
end
update_rrd(schema, rrdfile, timestamp, metrics)
end
::next_interface::
if iface["completed"] then
num_completed = num_completed + 1
end
end
update_rrd(schema, rrdfile, timestamp, metrics)
if num_completed == num_ifaces then
-- No more loops needed, dequeues completed for all interfaces
break
end
end
-- tprint("... dequeued")
end
-- ##############################################