Implement optimized exists query in InfluxDB

This avoids returning too much results when using standard listSeries
This commit is contained in:
emanuele-f 2019-09-05 15:58:52 +02:00
parent 48f07c0f73
commit 1d248331f6
2 changed files with 63 additions and 30 deletions

View file

@ -1053,12 +1053,9 @@ local function processListSeriesResult(data, schema, tags_filter, wildcard_tags)
return res
end
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
if schema.options.influx_internal_query then
-- internal metrics always exist
return {{}} -- exists
end
-- ##############################################
function driver:listSeries(schema, tags_filter, wildcard_tags, start_time)
local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, start_time)
local url = self.url
local data = influx_query(url .. "/query?db=".. self.db, query, self.username, self.password)
@ -1068,6 +1065,24 @@ end
-- ##############################################
function driver:exists(schema, tags_filter, wildcard_tags)
if schema.options.influx_internal_query then
-- internal metrics always exist
return(true)
end
-- Ignore wildcard_tags to avoid exessive points returned due to group by
wildcard_tags = {}
local query = makeListSeriesQuery(schema, tags_filter, wildcard_tags, 0)
local url = self.url
local data = influx_query(url .. "/query?db=".. self.db, query, self.username, self.password)
return(not table.empty(processListSeriesResult(data, schema, tags_filter, wildcard_tags)))
end
-- ##############################################
function driver:listSeriesBatched(batch)
local max_batch_size = 30
local url = self.url

View file

@ -569,15 +569,9 @@ end
-- ##############################################
local function list_series(schema_name, tags_filter, start_time, batched)
local schema = ts_utils.getSchema(schema_name)
local function getWildcardTags(schema, tags_filter)
tags_filter = tags_filter or {}
if not schema then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return nil
end
local wildcard_tags = {}
local filter_tags = {}
@ -594,19 +588,7 @@ local function list_series(schema_name, tags_filter, start_time, batched)
end
end
if not batched then
local driver = ts_utils.getQueryDriver()
if not driver then
return nil
end
ts_common.clearLastError()
return driver:listSeries(schema, filter_tags, wildcard_tags, start_time)
else
return schema, filter_tags, wildcard_tags
end
return filter_tags, wildcard_tags
end
-- ##############################################
@ -617,7 +599,21 @@ end
--! @param start_time time filter. Only timeseries updated after start_time will be returned.
--! @return a (possibly empty) list of tags values for the matching timeseries on success, nil on error.
function ts_utils.listSeries(schema_name, tags_filter, start_time)
return list_series(schema_name, tags_filter, start_time, false --[[ not batched ]])
local schema = ts_utils.getSchema(schema_name)
local driver = ts_utils.getQueryDriver()
if(not schema) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return nil
end
if(not driver) then
return nil
end
local filter_tags, wildcard_tags = getWildcardTags(schema, tags_filter)
return driver:listSeries(schema, filter_tags, wildcard_tags, start_time)
end
-- ##############################################
@ -631,12 +627,15 @@ local pending_listseries_batch = {}
--! @return nil on error, otherwise a number is returned, indicating the item id into the batch
--! @note Call ts_utils.getBatchedListSeriesResult() to get the batch responses
function ts_utils.batchListSeries(schema_name, tags_filter, start_time)
local schema, filter_tags, wildcard_tags = list_series(schema_name, tags_filter, start_time, true --[[ batched ]])
local schema = ts_utils.getSchema(schema_name)
if not schema then
if(not schema) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return nil
end
local filter_tags, wildcard_tags = getWildcardTags(schema, tags_filter)
pending_listseries_batch[#pending_listseries_batch + 1] = {
schema = schema,
start_time = start_time,
@ -681,12 +680,31 @@ end
-- ##############################################
--! @brief A shortcut for ts_utils.listSeries to verify timeseries existance.
--! @brief Verify timeseries existance.
--! @param schema_name the schema identifier.
--! @param tags_filter a list of filter tags. Tags which are not specified are considered wildcard.
--! @return true if the specified series exist, false otherwise.
function ts_utils.exists(schema_name, tags_filter)
return not table.empty(ts_utils.listSeries(schema_name, tags_filter, 0))
local driver = ts_utils.getQueryDriver()
if not driver then
return nil
end
if(driver.exists == nil) then
-- No "exists" implementation found, use listSeries fallback
return not table.empty(ts_utils.listSeries(schema_name, tags_filter, 0))
end
local schema = ts_utils.getSchema(schema_name)
if(not schema) then
traceError(TRACE_ERROR, TRACE_CONSOLE, "Schema not found: " .. schema_name)
return nil
end
local filter_tags, wildcard_tags = getWildcardTags(schema, tags_filter)
return driver:exists(schema, filter_tags, wildcard_tags)
end
-- ##############################################