Add last-timestamp check to avoid adding invalid 0 points in the charts

The issue occurred in InfluxDB due to a delay between buffering and actual export.
This possible addresses #2853
This commit is contained in:
emanuele-f 2019-09-16 20:04:30 +02:00
parent ef7d8614d9
commit d29bb578c0
2 changed files with 36 additions and 4 deletions

View file

@ -295,9 +295,10 @@ end
-- ##############################################
local function influx2Series(schema, tstart, tend, tags, options, data, time_step)
local function influx2Series(schema, tstart, tend, tags, options, data, time_step, last_ts)
local series = {}
local max_vals = {}
last_ts = last_ts or os.time()
-- Create the columns
for i=2, #data.columns do
@ -325,6 +326,11 @@ local function influx2Series(schema, tstart, tend, tags, options, data, time_ste
goto continue
end
if(cur_t >= last_ts) then
-- skip values exceeding the last point
break
end
-- Fill the missing points
while((cur_t - next_t) >= time_step) do
for _, serie in pairs(series) do
@ -360,6 +366,11 @@ local function influx2Series(schema, tstart, tend, tags, options, data, time_ste
-- Fill the missing points at the end
while((tend - next_t) >= 0) do
if(next_t >= last_ts) then
-- skip values exceeding the last point
break
end
for _, serie in pairs(series) do
serie.data[series_idx] = options.fill_value
end
@ -530,15 +541,30 @@ function driver:query(schema, tstart, tend, tags, options)
local query = self:_makeSeriesQuery(query_schema, metrics, tags, tstart, tend + unaligned_offset, time_step, schema)
local url = self.url
local data = influx_query(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", query, self.username, self.password, options)
local data = {}
local series, count
-- Perform an additional query to determine the last point in the raw data
local last_ts_query = string.format('SELECT LAST('.. schema._metrics[1] ..') FROM %s', query_schema)
local jres = influx_query_multi(url .. "/query?db=".. getDatabaseName(schema, self.db) .."&epoch=s", string.format("%s;%s", query, last_ts_query), self.username, self.password, options)
local last_ts = os.time()
if(jres and jres.results and (#jres.results == 2)) then
if jres.results[1].series then
data = jres.results[1]
end
if jres.results[2].series and jres.results[2].series[1].values then
last_ts = jres.results[2].series[1].values[1][1]
end
end
if table.empty(data) then
series, count = ts_common.fillSeries(schema, tstart + time_step, tend, time_step, options.fill_value)
else
-- Note: we are working with intervals because of derivatives. The first interval ends at tstart + time_step
-- which is the first value returned by InfluxDB
series, count, tstart = influx2Series(schema, tstart + time_step, tend, tags, options, data.series[1], time_step)
series, count, tstart = influx2Series(schema, tstart + time_step, tend, tags, options, data.series[1], time_step, last_ts)
end
local total_serie = nil