Simplify CQ handling by storing data as derivatives

This requires less complexity than integral handling as it can now be handled similarly as gauge metrics
This commit is contained in:
emanuele-f 2019-02-07 12:11:44 +01:00
parent b82aea611e
commit d28bd1e4e9
3 changed files with 26 additions and 36 deletions

View file

@ -126,13 +126,13 @@ end
-- returns schema_name, step
local function retentionPolicyToSchema(schema, rp)
if rp == "raw" then
if((rp == "raw") or (rp == "autogen")) then
rp = nil
end
if not isEmptyString(rp) then
-- counters become integrals
local metric_type = ternary(schema.options.metrics_type == ts_common.metrics.counter, ts_common.metrics.integral, schema.options.metrics_type)
-- counters become derivatives
local metric_type = ternary(schema.options.metrics_type == ts_common.metrics.counter, ts_common.metrics.derivative, schema.options.metrics_type)
return string.format('"%s"."%s"', rp, schema.name), ternary(rp == "1d", 86400, 3600), metric_type
end
@ -297,17 +297,8 @@ local function getTotalSerieQuery(schema, query_schema, raw_step, tstart, tend,
end
if time_step and (raw_step ~= time_step) then
local query_2 = 'SELECT (MEAN("'.. label ..'")'
if data_type == ts_common.metrics.integral then
query_2 = query_2 .. "/" .. raw_step
end
-- sample the points
query_2 = query_2 .. ') AS "'.. label ..'" FROM ('.. query ..') GROUP BY time('.. time_step ..'s)'
query = query_2
elseif data_type == ts_common.metrics.integral then
query = 'SELECT (' .. label .. ' / '.. raw_step ..') AS ' .. label .. ' FROM(' .. query .. ')'
query = 'SELECT MEAN("'.. label ..'") AS "'.. label ..'" FROM ('.. query ..') GROUP BY time('.. time_step ..'s)'
end
if data_type == ts_common.metrics.counter then
@ -392,9 +383,7 @@ function driver:query(schema, tstart, tend, tags, options)
-- NOTE: why we need to device by time_step ? is MEAN+GROUP BY TIME bugged?
if data_type == ts_common.metrics.counter then
metrics[i] = "(DERIVATIVE(MEAN(\"" .. metric .. "\")) / ".. time_step ..") as " .. metric
elseif data_type == ts_common.metrics.integral then
metrics[i] = "(MEAN(\"".. metric .."\") / ".. raw_step ..") as " .. metric
else -- gauge
else -- gauge / derivative
metrics[i] = "MEAN(\"".. metric .."\") as " .. metric
end
end
@ -437,13 +426,13 @@ function driver:query(schema, tstart, tend, tags, options)
end
if total_serie then
stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, schema.options.metrics_type)
stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, data_type)
if stats.total ~= nil then
-- override total and average
local stats_query = "(SELECT ".. table.concat(schema._metrics, " + ") .. ' AS value FROM ' .. query_schema ..
' ' ..getWhereClause(tags, tstart, tend, ternary((data_type == ts_common.metrics.integral), -1, unaligned_offset)) .. ")"
if data_type ~= ts_common.metrics.integral then
' ' ..getWhereClause(tags, tstart, tend, ternary((data_type == ts_common.metrics.derivate), -1, unaligned_offset)) .. ")"
if data_type == ts_common.metrics.counter then
stats_query = "(SELECT NON_NEGATIVE_DIFFERENCE(value) as value FROM " .. stats_query .. ")"
end
stats_query = "SELECT SUM(value) FROM " .. stats_query
@ -705,7 +694,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
for idx, metric in ipairs(schema._metrics) do
if data_type == "counter" then
derivate_metrics[idx] = 'NON_NEGATIVE_DIFFERENCE('.. metric .. ') as ' .. metric
else -- integral
else -- derivative
derivate_metrics[idx] = metric
end
sum_metrics[idx] = 'SUM('.. metric .. ') as ' .. metric
@ -784,7 +773,7 @@ function driver:topk(schema, tags, tstart, tend, options, top_tags)
local stats = nil
if options.calculate_stats and total_serie then
stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, schema.options.metrics_type)
stats = ts_common.calculateStatistics(total_serie, time_step, tend - tstart, data_type)
if stats.total ~= nil then
-- override total and average
@ -817,7 +806,6 @@ end
-- ##############################################
-- TODO check with integral type
function driver:queryTotal(schema, tstart, tend, tags, options)
local query_schema, raw_step, data_type = getQuerySchema(schema, tstart, tend, options)
local query
@ -838,14 +826,14 @@ function driver:queryTotal(schema, tstart, tend, tags, options)
'(SELECT ' .. table.concat(metrics, ", ") .. ' FROM '.. query_schema ..' WHERE ' ..
table.tconcat(tags, "=", " AND ", nil, "'") .. ' AND time >= ' .. tstart .. '000000000 AND time <= ' .. tend .. '000000000)'
else
-- gauge/integral
-- gauge/derivative
local metrics = {}
for i, metric in ipairs(schema._metrics) do
metrics[i] = "(SUM(" .. metric .. ")"
if data_type == ts_common.metrics.integral then
metrics[i] = metrics[i] .. "/" .. raw_step
if data_type == "derivative" then
metrics[i] = metrics[i] .. " * " .. raw_step
end
metrics[i] = metrics[i] .. ") as " .. metric
@ -877,10 +865,9 @@ end
-- ##############################################
-- TODO check with integral type
function driver:queryMean(schema, tags, tstart, tend)
function driver:queryMean(schema, tags, tstart, tend, options)
local metrics = {}
local query_schema = getQuerySchema(schema, tstart, tend)
local query_schema = getQuerySchema(schema, tstart, tend, options)
for i, metric in ipairs(schema._metrics) do
metrics[i] = "MEAN(" .. metric .. ") as " .. metric
@ -1096,20 +1083,21 @@ end
-- ##############################################
local function getCqQuery(dbname, tags, schema, source, dest, step, resemple)
local function getCqQuery(dbname, tags, schema, source, dest, step, dest_step, resemple)
local cq_name = string.format("%s__%s", schema.name, dest)
local resemple_s = ""
local _, _, data_type = retentionPolicyToSchema(schema, source)
if resemple then
resemple_s = "RESAMPLE FOR " .. resemple
end
if schema.options.metrics_type == ts_common.metrics.counter then
if data_type == ts_common.metrics.counter then
local sums = {}
local diffs = {}
for _, metric in ipairs(schema._metrics) do
sums[#sums + 1] = string.format('SUM(%s) as %s', metric, metric)
sums[#sums + 1] = string.format('(SUM(%s) / %u) as %s', metric, dest_step, metric)
diffs[#diffs + 1] = string.format('NON_NEGATIVE_DIFFERENCE(%s) as %s', metric, metric)
end
@ -1200,8 +1188,8 @@ function driver:setup(ts_utils)
goto continue
end
local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, "2h")
local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600)
local cq_1h = getCqQuery(self.db, tags, schema, "autogen", "1h", schema.options.step, 3600, "2h")
local cq_1d = getCqQuery(self.db, tags, schema, "1h", "1d", 3600, 86400)
-- TODO temporary fix to alter existing queries, remove after beta end
queries[#queries + 1] = string.format('DROP CONTINUOUS QUERY "%s__1d" ON %s', schema.name, self.db)