Handle end time on timeseries. (#7438) (#7466)

This commit is contained in:
Nicolò Maio 2023-05-29 15:20:11 +02:00 committed by GitHub
parent c09c3ab09c
commit fcb564b7c7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 153 additions and 144 deletions

View file

@ -711,7 +711,7 @@ end
-- *Limitation*
-- tags_filter is expected to contain all the tags of the schema except the last
-- one. For such tag, a list of available values will be returned.
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time, end_time)
if #wildcard_tags > 1 then
tprint(debug.traceback())
tprint({
@ -757,7 +757,7 @@ function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
if ((v ~= nil) and (#v == 1)) then
local last_update = ntop.rrd_lastupdate(fpath)
if last_update ~= nil and last_update >= start_time then
if last_update ~= nil and last_update >= start_time and ((end_time and last_update <= end_time) or (end_time == nil)) then
local value = v[1]
local toadd = false
@ -806,7 +806,7 @@ function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
local last_update = ntop.rrd_lastupdate(fpath)
if last_update ~= nil and last_update >= start_time then
if last_update ~= nil and last_update >= start_time and ((end_time and last_update <= end_time) or (end_time == nil)) then
res[#res + 1] = table.merge(tags_filter, {
[wildcard_tag] = f
})
@ -939,148 +939,145 @@ end
-- ##############################################
function driver:topk(schema, tags, tstart, tend, options, top_tags)
if #top_tags > 1 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
return nil
end
if #top_tags > 1 then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver does not support topk on multiple tags")
return nil
end
local top_tag = top_tags[1]
local top_tag = top_tags[1]
if top_tag ~= schema._tags[#schema._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE,
"RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
return nil
end
if top_tag ~= schema._tags[#schema._tags] then
traceError(TRACE_ERROR, TRACE_CONSOLE, "RRD driver only support topk with topk tag in the last tag, got topk on '" .. (top_tag or "") .. "'")
return nil
end
local series = driver:listSeries(schema, tags, top_tags, tstart)
if not series then
return nil
end
local series = driver:listSeries(schema, tags, top_tags, tstart, tend)
if not series then
return nil
end
local items = {}
local tag_2_series = {}
local total_series = {}
local total_valid = true
local step = 0
local query_start = tstart
local cf = getConsolidationFunction(schema)
local items = {}
local tag_2_series = {}
local total_series = {}
local total_valid = true
local step = 0
local query_start = tstart
local cf = getConsolidationFunction(schema)
if options.initial_point then
query_start = tstart - schema.options.step
end
if options.initial_point then
query_start = tstart - schema.options.step
end
for _, serie_tags in pairs(series) do
local rrdfile = driver.schema_get_full_path(schema, serie_tags)
for _, serie_tags in pairs(series) do
local rrdfile = driver.schema_get_full_path(schema, serie_tags)
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE,
string.format("RRD_FETCH[topk] schema=%s %s[%s] -> (%s): last_update=%u", schema.name,
table.tconcat(tags, "=", ","), table.concat(top_tags, ","), rrdfile, ntop.rrd_lastupdate(rrdfile)))
end
if isDebugEnabled() then
traceError(TRACE_NORMAL, TRACE_CONSOLE, string.format("RRD_FETCH[topk] schema=%s %s[%s] -> (%s): last_update=%u",
schema.name, table.tconcat(tags, "=", ","), table.concat(top_tags, ","), rrdfile, ntop.rrd_lastupdate(rrdfile)))
end
touchRRD(rrdfile)
touchRRD(rrdfile)
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, cf, query_start, tend)
local sum = 0
local fstart, fstep, fdata, fend, fcount, names = ntop.rrd_fetch_columns(rrdfile, cf, query_start, tend)
local sum = 0
if fdata == nil then
goto continue
end
if fdata == nil then
goto continue
end
step = fstep
step = fstep
local partials = {}
local partials = {}
local serie_idx = 0
for _, _ in pairs(fdata) do
serie_idx = serie_idx + 1 -- the first id is 1
local name = schema._metrics[serie_idx]
local fdata_name = names[serie_idx]
serie = fdata[fdata_name]
local serie_idx = 0
for _,_ in pairs(fdata) do
serie_idx = serie_idx + 1 -- the first id is 1
local name = schema._metrics[serie_idx]
local fdata_name = names[serie_idx]
serie = fdata[fdata_name]
local max_val = ts_common.getMaxPointValue(schema, name, serie_tags)
partials[name] = 0
local max_val = ts_common.getMaxPointValue(schema, name, serie_tags)
partials[name] = 0
-- Remove the last value: RRD seems to give an additional point
serie[#serie] = nil
-- Remove the last value: RRD seems to give an additional point
serie[#serie] = nil
if (#total_series ~= 0) and #total_series ~= #serie then
-- NOTE: even if touchRRD is used, series can still have a different number
-- of points when the tend parameter does not correspond to the current time
-- e.g. when comparing with the past or manually zooming.
-- In this case, total serie il discarded as it's incorrect
total_valid = false
end
if (#total_series ~= 0) and #total_series ~= #serie then
-- NOTE: even if touchRRD is used, series can still have a different number
-- of points when the tend parameter does not correspond to the current time
-- e.g. when comparing with the past or manually zooming.
-- In this case, total serie il discarded as it's incorrect
total_valid = false
end
for i = #total_series + 1, #serie do
-- init
total_series[i] = 0
end
for i=#total_series + 1, #serie do
-- init
total_series[i] = 0
end
for i, v in pairs(serie) do
local v = ts_common.normalizeVal(v, max_val, options)
for i, v in pairs(serie) do
local v = ts_common.normalizeVal(v, max_val, options)
if type(v) == "number" then
sum = sum + v
partials[name] = partials[name] + v * step
total_series[i] = total_series[i] + v
end
end
end
if type(v) == "number" then
sum = sum + v
partials[name] = partials[name] + v * step
total_series[i] = total_series[i] + v
end
end
end
items[serie_tags[top_tag]] = sum * step
tag_2_series[serie_tags[top_tag]] = {serie_tags, partials}
items[serie_tags[top_tag]] = sum * step
tag_2_series[serie_tags[top_tag]] = {serie_tags, partials}
::continue::
end
::continue::
end
local topk = {}
local topk = {}
for top_item, value in pairsByValues(items, rev) do
if value > 0 then
topk[#topk + 1] = {
tags = tag_2_series[top_item][1],
value = value,
partials = tag_2_series[top_item][2]
}
end
for top_item, value in pairsByValues(items, rev) do
if value > 0 then
topk[#topk + 1] = {
tags = tag_2_series[top_item][1],
value = value,
partials = tag_2_series[top_item][2],
}
end
if #topk >= options.top then
break
end
end
if #topk >= options.top then
break
end
end
local stats = nil
local stats = nil
-- table.clone needed as augumented_total can be modified below (sampleSeries works on it in-place)
local augumented_total = table.clone(total_series)
-- table.clone needed as augumented_total can be modified below (sampleSeries works on it in-place)
local augumented_total = table.clone(total_series)
if options.initial_point and total_series then
-- remove initial point to avoid stats calculation on it
table.remove(total_series, 1)
end
if options.initial_point and total_series then
-- remove initial point to avoid stats calculation on it
table.remove(total_series, 1)
end
local fstep, count = sampleSeries(schema, #augumented_total, step, options.max_num_points, {{
data = augumented_total
}})
local fstep, count = sampleSeries(schema, #augumented_total, step, options.max_num_points, {{data=augumented_total}})
if options.calculate_stats then
stats = ts_common.calculateStatistics(total_series, step, tend - tstart, schema.options.metrics_type)
stats = table.merge(stats, ts_common.calculateMinMax(augumented_total))
end
if options.calculate_stats then
stats = ts_common.calculateStatistics(total_series, step, tend - tstart, schema.options.metrics_type)
stats = table.merge(stats, ts_common.calculateMinMax(augumented_total))
end
if not total_valid then
total_series = nil
augumented_total = nil
end
if not total_valid then
total_series = nil
augumented_total = nil
end
return {
topk = topk,
additional_series = {
total = augumented_total,
},
statistics = stats,
}
return {
topk = topk,
additional_series = {
total = augumented_total
},
statistics = stats
}
end
-- ##############################################