--
-- (C) 2021-21 - ntop.org
--
-- Module to keep things in common across alert_store of various type

local dirs = ntop.getDirs()

-- Import the classes library.
local classes = require "classes"

require "lua_utils"
local json = require "dkjson"
local format_utils = require "format_utils"
local alert_consts = require "alert_consts"
local alert_utils = require "alert_utils"
local alert_severities = require "alert_severities"

-- ##############################################

local alert_store = classes.class()

-- ##############################################

-- Default number of time slots to be returned when aggregating by time
local NUM_TIME_SLOTS = 31

-- ##############################################

--@brief Initialize the query state of the store
--@param args Currently unused by this base implementation
function alert_store:init(args)
   -- "1 = 1" keeps the WHERE clause valid even when no filter is added
   self._where = { "1 = 1" }
   self._group_by = nil
end

-- ##############################################

--@brief Double single quotes so that a string can be embedded in a quoted SQL literal
--@param str The value to be escaped; nil/false yields the empty string
--@return The escaped string (exactly one value)
function alert_store:_escape(str)
   if not str then
      return ""
   end

   -- string.gsub also returns the number of substitutions performed: the
   -- outer parentheses truncate the result to the escaped string only, so
   -- that the counter never leaks into the argument list of the caller
   return (tostring(str):gsub("'", "''"))
end

-- ##############################################

--@brief Check if the submitted fields are valid (i.e., they are not injection attempts)
--@param fields A comma-separated string of fields
--@return True if all the fields are valid, false otherwise
function alert_store:_valid_fields(fields)
   local f = fields:split(",") or { fields }

   for _, field in pairs(f) do
      -- Only allow alphanumeric characters, underscores, parentheses, asterisks
      -- and spaces (this is what lets expressions such as "count(*) as count" pass)
      if not string.match(field, "^[%w_(*) ]+$") then
         -- Strip anything non-alphanumeric before logging to prevent stored injections
         local sanitized = field:gsub('%W', '')

         traceError(TRACE_ERROR, TRACE_CONSOLE, string.format("Invalid field found in query [%s]", sanitized))
         return false
      end
   end

   return true
end

-- ##############################################

--@brief Return the alert family name
--@return The `alert_store_name` of the alert entity, or nil when no entity is set
function alert_store:get_family()
   local family_name

   if self._alert_entity then
      family_name = self._alert_entity.alert_store_name
   end

   return family_name
end

-- ##############################################

--@brief Add filters on status (engaged or historical)
--@param engaged true to select engaged alerts
--@return True if set is successful, false otherwise
function alert_store:add_status_filter(engaged)
   self._engaged = engaged

   return true
end

-- ##############################################

--@brief Handle filter operator (eq, lt, gt, gte, lte)
--@param value A string in the form "<value>,<operator>"
--@return The value and the operator, or nil, nil when `value` is empty
function alert_store:strip_filter_operator(value)
   if isEmptyString(value) then
      return nil, nil
   end

   local filter = split(value, ",")

   return filter[1], filter[2]
end

-- ##############################################

--@brief Add filters on time
--@param epoch_begin The start timestamp
--@param epoch_end The end timestamp
--@return True if set is successful, false otherwise
function alert_store:add_time_filter(epoch_begin, epoch_end)
   local begin_ts = tonumber(epoch_begin)
   local end_ts = tonumber(epoch_end)

   -- Each bound is only set once: subsequent calls do not override it
   if not self._epoch_begin and begin_ts then
      self._epoch_begin = begin_ts
      self._where[#self._where + 1] = string.format("tstamp >= %u", begin_ts)
   end

   if not self._epoch_end and end_ts then
      self._epoch_end = end_ts
      self._where[#self._where + 1] = string.format("tstamp <= %u", end_ts)
   end

   return true
end

-- ##############################################

--@brief Add filters on alert id
--@param alert_id The id of an alert to be filtered, possibly followed by ",<operator>"
--@return True if set is successful, false otherwise
function alert_store:add_alert_id_filter(alert_id)
   if alert_id then
      -- Only the value is used: the operator is ignored and equality is always applied
      local id_value = self:strip_filter_operator(alert_id)
      local id = tonumber(id_value)

      if not self._alert_id and id then
         self._alert_id = id
         self._where[#self._where + 1] = string.format("alert_id = %u", id)

         return true
      end
   end

   return false
end

-- ##############################################

--@brief Add filters on alert severity
--@param alert_severity The severity of an alert to be filtered, possibly followed by ",<operator>"
--@return True if set is successful, false otherwise
function alert_store:add_alert_severity_filter(alert_severity)
   if alert_severity then
      -- Only the value is used: the operator is ignored and equality is always applied
      local severity_value = self:strip_filter_operator(alert_severity)
      local severity_id = tonumber(severity_value)

      if not self._alert_severity and severity_id then
         self._alert_severity = severity_id
         self._where[#self._where + 1] = string.format("severity = %u", severity_id)

         return true
      end
   end

   return false
end

-- ##############################################

--@brief Add filters on alert rowid
--@param rowid The rowid of an alert to be filtered
--@return True if set is successful, false otherwise
function alert_store:add_alert_rowid_filter(rowid)
   local id = tonumber(rowid)

   if id then
      self._where[#self._where + 1] = string.format("rowid = %u", id)

      return true
   end

   return false
end

-- ##############################################

--@brief Pagination options to fetch partial results
--@param limit The number of results to be returned
--@param offset The number of records to skip before returning results
--@return True if set is successful, false otherwise
function alert_store:add_limit(limit, offset)
   local num_limit = tonumber(limit)

   if not self._limit and num_limit then
      -- Store numbers (not the raw arguments): select_engaged compares these
      -- values against a numeric index, which would raise with numeric strings
      self._limit = num_limit

      local num_offset = tonumber(offset)
      if not self._offset and num_offset then
         self._offset = num_offset
      end

      return true
   end

   return false
end

-- ##############################################

--@brief Specify the sort criteria of the query
--@param sort_column The column to be used for sorting
--@param sort_order Order, either `asc` or `desc`
--@return True if set is successful, false otherwise
function alert_store:add_order_by(sort_column, sort_order)
   if not self._order_by
      and sort_column and self:_valid_fields(sort_column)
      and (sort_order == "asc" or sort_order == "desc") then
      self._order_by = {sort_column = sort_column, sort_order = sort_order}

      return true
   end

   return false
end

-- ##############################################

--@brief Specify the fields used to group results
--@param fields A comma-separated string of fields
--@return True if set is successful, false otherwise
function alert_store:group_by(fields)
   if not self._group_by and fields and self:_valid_fields(fields) then
      self._group_by = fields

      return true
   end

   return false
end

-- ##############################################

--@brief Base implementation of the insert: only traces the call
--@param alert The alert to be inserted (unused here)
--@return Always false
function alert_store:insert(alert)
   traceError(TRACE_NORMAL, TRACE_CONSOLE, "alert_store:insert")
   return false
end

-- ##############################################

--@brief Deletes data according to specified filters
--@return True when the query returned an empty result, a falsy value otherwise
function alert_store:delete()
   local where_clause = table.concat(self._where, " AND ")

   -- Prepare the final query
   local q = string.format("DELETE FROM `%s` WHERE %s ", self._table_name, where_clause)

   local res = interface.alert_store_query(q)

   return res and table.len(res) == 0
end

-- ##############################################

--@brief Selects historical alerts from the database using the filters currently set
--@param filter Currently unused
--@param fields A comma-separated string of fields to select (everything by default)
--@return The query result, or an empty table when `fields` does not pass validation
function alert_store:select_historical(filter, fields)
   local res = {}
   local group_by_clause = ''
   local order_by_clause = ''
   local limit_clause = ''
   local offset_clause = ''

   -- TODO handle fields (e.g. add entity value to WHERE)

   -- Select everything by default
   fields = fields or '*'

   if not self:_valid_fields(fields) then
      return res
   end

   local where_clause = table.concat(self._where, " AND ")

   -- [OPTIONAL] Add the group by
   if self._group_by then
      group_by_clause = string.format("GROUP BY %s", self._group_by)
   end

   -- [OPTIONAL] Add sort criteria
   if self._order_by then
      order_by_clause = string.format("ORDER BY %s %s", self._order_by.sort_column, self._order_by.sort_order)
   end

   -- [OPTIONAL] Add limit for pagination
   if self._limit then
      limit_clause = string.format("LIMIT %u", self._limit)
   end

   -- [OPTIONAL] Add offset for pagination
   if self._offset then
      offset_clause = string.format("OFFSET %u", self._offset)
   end

   -- Prepare the final query
   -- NOTE: entity_id is necessary as alert_utils.formatAlertMessage assumes it to always be present inside the alert
   local q = string.format(" SELECT %u entity_id, %s FROM `%s` WHERE %s %s %s %s %s",
      self._alert_entity.entity_id, fields, self._table_name, where_clause,
      group_by_clause, order_by_clause, limit_clause, offset_clause)

   res = interface.alert_store_query(q)

   return res
end

-- ##############################################

--@brief Selects engaged alerts from memory
--@param filter A filter on the entity value (falls back to `self._entity_value`)
--@return Selected engaged alerts, and the total number of engaged alerts
function alert_store:select_engaged(filter)
   local alert_id_filter = tonumber(self._alert_id)
   local severity_filter = tonumber(self._alert_severity)
   local entity_id_filter = tonumber(self._alert_entity and self._alert_entity.entity_id) -- Possibly set in subclasses constructor
   local entity_value_filter = filter or self._entity_value

   -- tprint(string.format("id=%s sev=%s entity=%s val=%s", alert_id_filter, severity_filter, entity_id_filter, entity_value_filter))
   -- Fall back to an empty table so that a missing result does not break the iteration
   local alerts = interface.getEngagedAlerts(entity_id_filter, entity_value_filter, alert_id_filter, severity_filter) or {}
   local total_rows = 0
   local sort_2_col = {}
   local sort_column = self._order_by and self._order_by.sort_column

   -- Sort and filtering
   for idx, alert in pairs(alerts) do
      local tstamp = tonumber(alert.tstamp)

      -- Exclude alerts falling outside requested time ranges
      local too_old = self._epoch_begin and tstamp < self._epoch_begin
      local too_new = self._epoch_end and tstamp > self._epoch_end

      if not too_old and not too_new then
         -- Sort by the requested column when the alert has it, by timestamp otherwise
         local sort_val = tstamp

         if sort_column and alert[sort_column] ~= nil then
            sort_val = tonumber(alert[sort_column]) or string.format("%s", alert[sort_column])
         end

         sort_2_col[#sort_2_col + 1] = {idx = idx, val = sort_val}
         total_rows = total_rows + 1
      end
   end

   -- Pagination
   local offset = self._offset or 0        -- The offset, or zero (start from the beginning) if no offset is set
   local limit = self._limit or total_rows -- The limit, or the actual number of records, ie., no limit

   local res = {}
   local i = 0
   local sort_fn = ternary(self._order_by and self._order_by.sort_order and self._order_by.sort_order == "asc", asc, rev)

   for _, val in pairsByField(sort_2_col, "val", sort_fn) do
      if i >= offset + limit then
         break
      end

      if i >= offset then
         res[#res + 1] = alerts[val.idx]
      end

      i = i + 1
   end

   return res, total_rows
end

-- ##############################################

--@brief Performs a query and counts the number of records
--@return The number of records, or 0 when the query returns no row
function alert_store:count()
   local count_query = self:select_historical(nil, "count(*) as count")

   -- select_historical can return an empty (or missing) result: do not index a nil row
   local first_row = count_query and count_query[1]
   if not first_row then
      return 0
   end

   return tonumber(first_row["count"])
end

-- ##############################################

--@brief Returns minimum and maximum timestamps and the time slot width to
-- be used for
-- queries performing group-by-time operations
--@return The aligned minimum slot, the maximum slot, and the width (in seconds) of each slot
function alert_store:_count_by_time_get_bounds()
   local now = os.time()
   -- Default to the last hour when no time filter has been set
   local min_slot = self._epoch_begin or (now - 3600)
   local max_slot = self._epoch_end or now
   local slot_width

   -- Compute the width to obtain a fixed number of points
   local slot_span = max_slot - min_slot

   if slot_span < NUM_TIME_SLOTS then
      -- Slot width is 1 second, can't be smaller than this
      -- (this branch also covers negative spans)
      slot_width = 1
   else
      -- Result is the floor to return an integer number
      slot_width = math.floor(slot_span / NUM_TIME_SLOTS)
   end

   -- Align the range using the width of the time slot to always return aligned data
   min_slot = min_slot - (min_slot % slot_width)
   max_slot = min_slot + slot_width * NUM_TIME_SLOTS

   return min_slot, max_slot, slot_width
end

-- ##############################################

--@brief Pad missing points with zeroes and prepare the series
--@param all_severities Table keyed by numeric severity id; each entry has an `all_slots` table mapping slot -> count
--@param min_slot The first time slot
--@param max_slot The last time slot
--@param time_slot_width The width of each time slot
--@return Table keyed by severity id with arrays of {slot in milliseconds, count};
--        when there is no series at all, a single zero-filled series under key 0
function alert_store:_prepare_count_by_severity_and_time_series(all_severities, min_slot, max_slot, time_slot_width)
   local res = {}

   if table.len(all_severities) == 0 then
      -- No series, add a dummy series for "no alerts"
      local noalert_res = {}

      for slot = min_slot, max_slot + 1, time_slot_width do
         noalert_res[#noalert_res + 1] = {slot * 1000 --[[ In milliseconds --]], 0}
      end

      res[0] = noalert_res

      return res
   end

   for _, severity in pairs(alert_severities) do
      local severity_id = tonumber(severity.severity_id)
      local series = all_severities[severity_id]

      -- Severities without a series are skipped
      if series then
         -- Pad missing points with zeroes
         for slot = min_slot, max_slot + 1, time_slot_width do
            if not series.all_slots[slot] then
               series.all_slots[slot] = 0
            end
         end

         -- Prepare the result as a Lua array ordered by time slot
         local severity_res = {}

         for slot, count in pairsByKeys(series.all_slots, asc) do
            severity_res[#severity_res + 1] = {slot * 1000 --[[ In milliseconds --]], count}
         end

         res[severity_id] = severity_res
      end
   end

   return res
end

-- ##############################################

--@brief Counts the number of engaged alerts in multiple time slots
--@param filter A filter on the entity value (falls back to `self._entity_value`)
--@param severity An optional severity id (falls back to the severity filter currently set)
--@return Alert counters divided into severity and time slots
function alert_store:count_by_severity_and_time_engaged(filter, severity)
   local min_slot, max_slot, time_slot_width = self:_count_by_time_get_bounds()

   local alert_id_filter = tonumber(self._alert_id)
   local severity_filter = tonumber(severity) or tonumber(self._alert_severity)
   local entity_id_filter = tonumber(self._alert_entity and self._alert_entity.entity_id) -- Possibly set in subclasses constructor
   local entity_value_filter = filter or self._entity_value

   -- tprint(string.format("id=%s sev=%s entity=%s val=%s", alert_id_filter, severity_filter, entity_id_filter, entity_value_filter))
   -- Fall back to an empty table so that a missing result does not break the iteration
   local alerts = interface.getEngagedAlerts(entity_id_filter, entity_value_filter, alert_id_filter, severity_filter) or {}

   local all_severities = {}

   for _, alert in ipairs(alerts) do
      -- Use a numeric key: the series builder looks severities up by number
      local severity_id = tonumber(alert.severity)
      local tstamp = tonumber(alert.tstamp)

      -- Alerts without a usable severity or timestamp cannot be placed in any slot
      if severity_id and tstamp then
         -- Exclude alerts falling outside requested time ranges
         local too_old = self._epoch_begin and tstamp < self._epoch_begin
         local too_new = self._epoch_end and tstamp > self._epoch_end

         if not too_old and not too_new then
            local cur_slot = tstamp - (tstamp % time_slot_width)

            if not all_severities[severity_id] then
               all_severities[severity_id] = { all_slots = {} }
            end

            local slots = all_severities[severity_id].all_slots
            slots[cur_slot] = (slots[cur_slot] or 0) + 1
         end
      end
   end

   return self:_prepare_count_by_severity_and_time_series(all_severities, min_slot, max_slot, time_slot_width)
end

-- ##############################################

--@brief Performs a query and counts the number of records in multiple time slots
--@param filter Currently unused, accepted to mirror count_by_severity_and_time_engaged
--@param severity An optional severity id used to further restrict the query
--@return Alert counters divided into severity and time slots
function alert_store:count_by_severity_and_time_historical(filter, severity)
   -- Preserve all the filters currently set
   local min_slot, max_slot, time_slot_width = self:_count_by_time_get_bounds()
   local where_clause = table.concat(self._where, " AND ")

   -- `severity` is an explicit (optional) argument: it must not be read from the global environment
   local severity_id = tonumber(severity)
   if severity_id then
      where_clause = string.format("severity = %u", severity_id) .. " AND " .. where_clause
   end

   -- Group by according to the timeslot, that is, the alert timestamp MODULO the slot width
   local q = string.format("SELECT severity, (tstamp - tstamp %% %u) as slot, count(*) count FROM %s WHERE %s GROUP BY severity, slot ORDER BY severity, slot ASC",
      time_slot_width, self._table_name, where_clause)

   -- Fall back to an empty table so that a failed query yields the "no alerts" series
   local q_res = interface.alert_store_query(q) or {}

   local all_severities = {}

   -- Read points from the query
   for _, p in ipairs(q_res) do
      local severity_key = tonumber(p.severity)

      if severity_key then
         if not all_severities[severity_key] then
            all_severities[severity_key] = { all_slots = {} }
         end

         -- Make sure slots are within the requested bounds
         local cur_slot = tonumber(p.slot)
         local cur_count = tonumber(p.count)

         if cur_slot and cur_slot >= min_slot and cur_slot <= max_slot then
            all_severities[severity_key].all_slots[cur_slot] = cur_count
         end
      end
   end

   return self:_prepare_count_by_severity_and_time_series(all_severities, min_slot, max_slot, time_slot_width)
end

-- ##############################################

--@brief Count from memory (engaged) or database (historical)
--@return Alert counters divided into severity and time slots
function alert_store:count_by_severity_and_time()
   -- Add filters
   self:add_request_filters()
   -- Add limits and sort criteria
   self:add_request_ranges()

   if self._engaged then -- Engaged
      return self:count_by_severity_and_time_engaged()
   else -- Historical
      return self:count_by_severity_and_time_historical()
   end
end
-- ##############################################
-- REST API Utility Functions
-- ##############################################

--@brief Handle count requests (GET) from memory (engaged) or database (historical)
--@return A table with two parallel arrays: `series` (name and data points) and `colors`
function alert_store:count_by_severity_and_time_request()
   local series = {}
   local colors = {}
   local counters = self:count_by_severity_and_time()

   -- One series per severity having data, visited by severity_id using the `rev` order
   for _, sev in pairsByField(alert_severities, "severity_id", rev) do
      local points = counters[sev.severity_id]

      if points ~= nil then
         series[#series + 1] = {
            name = i18n(sev.i18n_title),
            data = points,
         }
         colors[#colors + 1] = sev.color
      end
   end

   -- Nothing matched any severity: fall back to the series stored under key 0, if any
   local fallback = counters[0]

   if table.len(series) == 0 and fallback ~= nil then
      series[#series + 1] = {
         name = i18n("alerts_dashboard.no_alerts"),
         data = fallback,
      }
      colors[#colors + 1] = "#ccc"
   end

   return {
      series = series,
      colors = colors,
   }
end

-- ##############################################

--@brief Handle alerts select request (GET) from memory (engaged) or database (historical)
--@param filter A filter on the entity value (no filter by default)
--@param select_fields The fields to be returned (all by default or in any case for engaged)
--@return Selected alerts, and the total number of alerts
function alert_store:select_request(filter, select_fields)
   -- Filters always come first
   self:add_request_filters()

   if not self._engaged then
      -- Historical: the total is counted BEFORE limits and sort criteria are
      -- added, so that pagination does not affect it
      local total_row = self:count()

      self:add_request_ranges()

      local rows = self:select_historical(filter, select_fields)

      return rows, total_row
   end

   -- Engaged: limits and sort criteria are applied while selecting from memory
   self:add_request_ranges()

   local engaged_alerts, engaged_total = self:select_engaged(filter)

   return engaged_alerts, engaged_total
end

-- ##############################################

--@brief Hook for subclasses that need extra request filters; does nothing here
function alert_store:_add_additional_request_filters()
end
-- ##############################################

--@brief Read the filters from the REST API request (_GET) and apply them
function alert_store:add_request_filters()
   local begin_ts = tonumber(_GET["epoch_begin"])
   local end_ts = tonumber(_GET["epoch_end"])
   -- "alert_type" and "severity" are accepted as alternative parameter names (compatibility)
   local requested_id = _GET["alert_id"] or _GET["alert_type"]
   local requested_severity = _GET["alert_severity"] or _GET["severity"]
   local requested_rowid = _GET["row_id"]
   local requested_status = _GET["status"]

   self:add_time_filter(begin_ts, end_ts)
   self:add_alert_id_filter(requested_id)
   self:add_alert_severity_filter(requested_severity)
   self:add_status_filter(requested_status and requested_status == 'engaged')
   self:add_alert_rowid_filter(requested_rowid)

   -- Give subclasses the chance to add their own filters
   self:_add_additional_request_filters()
end

-- ##############################################

--@brief Hook for subclasses exposing extra available filters
--@return An empty table in this base implementation
function alert_store:_get_additional_available_filters()
   return {}
end

-- ##############################################

--@brief Get info about available filters
--@return The common filters merged with the additional ones
function alert_store:get_available_filters()
   local extra = self:_get_additional_available_filters()

   -- Note alert_id could have been defined here for all families,
   -- however this requires a migration of the 'other' alerts
   -- in order to list alerts by entity
   local common = {}
   common.severity = {
      value_type = 'severity',
   }

   return table.merge(common, extra)
end

-- ##############################################

--@brief Add offset, limit, and sort criteria according to what is specified inside the REST API
function alert_store:add_request_ranges()
   local offset = tonumber(_GET["start"])  -- The OFFSET: default no offset
   local limit = tonumber(_GET["length"])  -- The LIMIT: default no limit
   local column = _GET["sort"]
   local direction = _GET["order"]

   self:add_limit(limit, offset)
   self:add_order_by(column, direction)
end

-- ##############################################

--@brief Convert an alert coming from the DB
-- (value) to a record returned by the REST API
--@param value The alert record, as a table of column name -> value
--@param entity_id The entity id, forwarded to alert_consts.alertTypeLabel
--@param no_html True to produce labels without HTML
--@return The formatted record
function alert_store:format_record_common(value, entity_id, no_html)
   local record = {}

   -- Note: this record is rendered by
   -- httpdocs/templates/pages/alerts/families/{host,..}/table[.js].template

   record["family"] = self:get_family()
   record["row_id"] = value["rowid"]

   local score = tonumber(value["score"])
   local severity_id = ntop.mapScoreToSeverity(score)
   local severity = alert_consts.alertSeverityById(severity_id)
   -- The severity lookup can fail: never dereference it without a check
   local severity_color = severity and severity.color

   local tstamp = tonumber(value["alert_tstamp"] or value["tstamp"])
   record["tstamp"] = {
      value = tstamp,
      label = format_utils.formatPastEpochShort(tstamp),
      highlight = severity_color,
   }

   record["alert_id"] = {
      value = value["alert_id"],
      label = alert_consts.alertTypeLabel(tonumber(value["alert_id"]), no_html, entity_id),
   }

   record["score"] = {
      value = score,
      label = format_utils.formatValue(score),
      color = severity_color,
   }

   local severity_label = ""
   if severity and not no_html then
      -- NOTE(review): the HTML label is a single space here — confirm this is intended
      severity_label = " "
   elseif severity and no_html then
      severity_label = severity.i18n_title
   end

   record["severity"] = {
      value = severity_id,
      label = severity_label,
      color = severity_color,
   }

   -- Missing or non-numeric timestamps are treated as 0 (i.e., not set)
   local tstamp_begin = tonumber(value["tstamp"]) or 0
   local tstamp_end = tonumber(value["tstamp_end"]) or 0

   if tstamp_end > 0 and tstamp_begin > 0 then
      record["duration"] = tstamp_end - tstamp_begin
   elseif tstamp_begin > 0 then
      -- No end timestamp: the duration is computed up to now
      record["duration"] = os.time() - tstamp_begin
   end

   record["count"] = tonumber(value["count"]) or 1

   -- Only decode when a JSON string is present, and only index a decoded table
   local alert_json = value["json"] and json.decode(value["json"])
   if type(alert_json) ~= "table" then
      alert_json = {}
   end

   record["script_key"] = alert_json["alert_generation"] and alert_json["alert_generation"]["script_key"]

   return record
end

-- ##############################################

--@brief Deletes old data according to the configuration or up to a safe limit
function alert_store:housekeeping()
   local prefs = ntop.getPrefs() or {}

   -- By Number of records
   local max_entity_alerts = tonumber(prefs.max_entity_alerts)

   if max_entity_alerts then
      -- Keep only the most recent 80% of the maximum number of alerts, so that
      -- some room is freed beyond the configured maximum
      local limit = math.floor(max_entity_alerts * 0.8)
      local q = string.format("DELETE FROM `%s` WHERE rowid <= (SELECT rowid FROM `%s` ORDER BY rowid DESC LIMIT 1 OFFSET %u)",
         self._table_name, self._table_name, limit)

      interface.alert_store_query(q)
   end

   -- By Time
   local max_time_sec = tonumber(prefs.max_num_secs_before_delete_alert)

   if max_time_sec then
      local expiration_epoch = os.time() - max_time_sec
      local q = string.format("DELETE FROM `%s` WHERE tstamp < %u", self._table_name, expiration_epoch)

      interface.alert_store_query(q)
   end
end

-- ##############################################

return alert_store