Properly handle CQ aggregated types for counters

This commit is contained in:
emanuele-f 2019-02-06 19:16:28 +01:00
parent 300e6c4672
commit b82aea611e
2 changed files with 48 additions and 20 deletions

View file

@ -131,11 +131,13 @@ local function retentionPolicyToSchema(schema, rp)
end
if not isEmptyString(rp) then
return string.format('"%s"."%s"', rp, schema.name), ternary(rp == "1d", 86400, 3600)
-- counters become integrals
local metric_type = ternary(schema.options.metrics_type == ts_common.metrics.counter, ts_common.metrics.integral, schema.options.metrics_type)
return string.format('"%s"."%s"', rp, schema.name), ternary(rp == "1d", 86400, 3600), metric_type
end
-- raw
return string.format('"%s"', schema.name), schema.options.step
return string.format('"%s"', schema.name), schema.options.step, schema.options.metrics_type
end
-- ##############################################
@ -295,8 +297,17 @@ local function getTotalSerieQuery(schema, query_schema, raw_step, tstart, tend,
end
if time_step and (raw_step ~= time_step) then
local query_2 = 'SELECT (MEAN("'.. label ..'")'
if data_type == ts_common.metrics.integral then
query_2 = query_2 .. "/" .. raw_step
end
-- sample the points
query = 'SELECT MEAN("'.. label ..'") AS "'.. label ..'" FROM ('.. query ..') GROUP BY time('.. time_step ..'s)'
query_2 = query_2 .. ') AS "'.. label ..'" FROM ('.. query ..') GROUP BY time('.. time_step ..'s)'
query = query_2
elseif data_type == ts_common.metrics.integral then
query = 'SELECT (' .. label .. ' / '.. raw_step ..') AS ' .. label .. ' FROM(' .. query .. ')'
end
if data_type == ts_common.metrics.counter then
@ -321,8 +332,7 @@ end
-- ##############################################
function driver:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset)
local data_type = schema.options.metrics_type
function driver:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
local query = getTotalSerieQuery(schema, query_schema, raw_step, tstart, tend + unaligned_offset, tags, time_step, data_type, label)
local data = influx_query(url .. "/query?db=".. self.db .."&epoch=s", query, self.username, self.password, options)
@ -371,9 +381,8 @@ end
function driver:query(schema, tstart, tend, tags, options)
local metrics = {}
local data_type = schema.options.metrics_type
local retention_policy = getSchemaRetentionPolicy(schema, tstart, tend, options)
local query_schema, raw_step = retentionPolicyToSchema(schema, retention_policy)
local query_schema, raw_step, data_type = retentionPolicyToSchema(schema, retention_policy)
local time_step = ts_common.calculateSampledTimeStep(raw_step, tstart, tend, options)
-- NOTE: this offset is necessary to fix graph edge points when data insertion is not aligned with tstep
@ -383,7 +392,9 @@ function driver:query(schema, tstart, tend, tags, options)
-- NOTE: why we need to device by time_step ? is MEAN+GROUP BY TIME bugged?
if data_type == ts_common.metrics.counter then
metrics[i] = "(DERIVATIVE(MEAN(\"" .. metric .. "\")) / ".. time_step ..") as " .. metric
else
elseif data_type == ts_common.metrics.integral then
metrics[i] = "(MEAN(\"".. metric .."\") / ".. raw_step ..") as " .. metric
else -- gauge
metrics[i] = "MEAN(\"".. metric .."\") as " .. metric
end
end
@ -422,7 +433,7 @@ function driver:query(schema, tstart, tend, tags, options)
else
-- try to inherit label from existing series
local label = series and series[1] and series[1].label
total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart + time_step, tend, tags, options, url, time_step, label, unaligned_offset)
total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart + time_step, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
end
if total_serie then
@ -431,8 +442,10 @@ function driver:query(schema, tstart, tend, tags, options)
if stats.total ~= nil then
-- override total and average
local stats_query = "(SELECT ".. table.concat(schema._metrics, " + ") .. ' AS value FROM ' .. query_schema ..
' ' ..getWhereClause(tags, tstart, tend, unaligned_offset) .. ")"
stats_query = "(SELECT NON_NEGATIVE_DIFFERENCE(value) as value FROM " .. stats_query .. ")"
' ' ..getWhereClause(tags, tstart, tend, ternary((data_type == ts_common.metrics.integral), -1, unaligned_offset)) .. ")"
if data_type ~= ts_common.metrics.integral then
stats_query = "(SELECT NON_NEGATIVE_DIFFERENCE(value) as value FROM " .. stats_query .. ")"
end
stats_query = "SELECT SUM(value) FROM " .. stats_query
stats = table.merge(stats, self:_performStatsQuery(stats_query, tstart, tend))
@ -466,7 +479,7 @@ function driver:query(schema, tstart, tend, tags, options)
if total_serie then
local label = series and series[1].label
local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset) or {options.fill_value}
local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset, data_type) or {options.fill_value}
table.insert(total_serie, 1, additional_pt[1])
end
@ -680,7 +693,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
local top_tag = top_tags[1]
local retention_policy = getSchemaRetentionPolicy(schema, tstart, tend, options)
local query_schema, raw_step = retentionPolicyToSchema(schema, retention_policy)
local query_schema, raw_step, data_type = retentionPolicyToSchema(schema, retention_policy)
-- NOTE: this offset is necessary to fix graph edge points when data insertion is not aligned with tstep
local unaligned_offset = raw_step - 1
@ -690,7 +703,11 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
local all_metrics = table.concat(schema._metrics, ", ")
for idx, metric in ipairs(schema._metrics) do
derivate_metrics[idx] = 'NON_NEGATIVE_DIFFERENCE('.. metric .. ') as ' .. metric
if data_type == "counter" then
derivate_metrics[idx] = 'NON_NEGATIVE_DIFFERENCE('.. metric .. ') as ' .. metric
else -- integral
derivate_metrics[idx] = metric
end
sum_metrics[idx] = 'SUM('.. metric .. ') as ' .. metric
end
@ -709,8 +726,10 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
' '.. getWhereClause(tags, tstart, tend, unaligned_offset) ..')'
-- Calculate difference between counter values
base_query = '(SELECT NON_NEGATIVE_DIFFERENCE(value) as value, '.. table.concat(derivate_metrics, ", ") ..
if data_type == "counter" then
base_query = '(SELECT NON_NEGATIVE_DIFFERENCE(value) as value, '.. table.concat(derivate_metrics, ", ") ..
' FROM ' .. base_query .. " GROUP BY ".. top_tag ..")"
end
-- Sum the traffic
base_query = '(SELECT SUM(value) AS value, '.. table.concat(sum_metrics, ", ") ..
@ -761,7 +780,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
local time_step = ts_common.calculateSampledTimeStep(raw_step, tstart, tend, options)
local label = series and series[1].label
local total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset)
local total_serie = self:_makeTotalSerie(schema, query_schema, raw_step, tstart, tend, tags, options, url, time_step, label, unaligned_offset, data_type)
local stats = nil
if options.calculate_stats and total_serie then
@ -778,7 +797,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
end
if options.initial_point and total_serie then
local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset) or {options.fill_value}
local additional_pt = self:_makeTotalSerie(schema, query_schema, raw_step, tstart-time_step, tstart, tags, options, url, time_step, label, unaligned_offset, data_type) or {options.fill_value}
table.insert(total_serie, 1, additional_pt[1])
end
@ -798,9 +817,9 @@ end
-- ##############################################
-- TODO check with integral type
function driver:queryTotal(schema, tstart, tend, tags, options)
local data_type = schema.options.metrics_type
local query_schema = getQuerySchema(schema, tstart, tend, options)
local query_schema, raw_step, data_type = getQuerySchema(schema, tstart, tend, options)
local query
if data_type == ts_common.metrics.counter then
@ -819,10 +838,17 @@ function driver:queryTotal(schema, tstart, tend, tags, options)
'(SELECT ' .. table.concat(metrics, ", ") .. ' FROM '.. query_schema ..' WHERE ' ..
table.tconcat(tags, "=", " AND ", nil, "'") .. ' AND time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000)'
else
-- gauge/integral
local metrics = {}
for i, metric in ipairs(schema._metrics) do
metrics[i] = "SUM(" .. metric .. ") as " .. metric
metrics[i] = "(SUM(" .. metric .. ")"
if data_type == ts_common.metrics.integral then
metrics[i] = metrics[i] .. "/" .. raw_step
end
metrics[i] = metrics[i] .. ") as " .. metric
end
query = 'SELECT ' .. table.concat(metrics, ", ") .. ' FROM ' .. query_schema ..' WHERE ' ..
@ -851,6 +877,7 @@ end
-- ##############################################
-- TODO check with integral type
function driver:queryMean(schema, tags, tstart, tend)
local metrics = {}
local query_schema = getQuerySchema(schema, tstart, tend)