Reworked clickhouse retention

This commit is contained in:
Luca Deri 2023-05-31 11:39:05 +02:00
parent fa32379501
commit a4f731d61e
3 changed files with 32 additions and 3 deletions

View file

@ -890,7 +890,8 @@ end
-- ########################################################
-- Delete partitions older than data_retention (epoch)
function db_utils.clickhouseDeleteOldPartitions(data_retention)
function db_utils.clickhouseDeleteOldPartitions(data_retention, aggregated_retention)
-- [1] Non Aggregated Retention
local day_aligned_retention = data_retention - (data_retention % 86400)
-- Create a string that identifies the PARTITIONs name of the most recent partition that will be deleted
local retention_yyyymmdd = os.date("%Y%m%d", day_aligned_retention)
@ -900,9 +901,10 @@ function db_utils.clickhouseDeleteOldPartitions(data_retention)
-- Query the partitions that need to be deleted. Convert YYYYMMDD strings into integers so that
-- only relevant partitions can be queried and deleted
-- The last condition > 999999 prevents old partitions created as YYYMM to be deleted
local partitions_q = string.format("SELECT DISTINCT database, table, toUInt32(partition) drop_part FROM system.parts WHERE active AND database='%s' AND table!='l7_protocols' AND table!='flow_risks' AND table!='alert_severities' AND table!='l7_categories' AND table!='l4_protocols' AND drop_part <= %u AND drop_part > 999999", ntop.getPrefs().mysql_dbname or 'ntopng', retention_yyyymmdd)
local partitions_q = string.format("SELECT DISTINCT database, table, toUInt32(partition) drop_part FROM system.parts WHERE active AND database='%s' AND table != 'l7_protocols' AND table != 'flow_risks' AND table != 'alert_severities' AND table != 'l7_categories' AND table != 'l4_protocols' AND table != 'aggregated_flows' AND drop_part <= %u AND drop_part > 999999", ntop.getPrefs().mysql_dbname or 'ntopng', retention_yyyymmdd)
local partitions_res = interface.execSQLQuery(partitions_q)
if(partitions_res ~= nil) then
-- Iterate queried partitions and delete them (nil is returned if there is nothing to delete)
for _, partition_info in ipairs(partitions_res) do
@ -912,6 +914,24 @@ function db_utils.clickhouseDeleteOldPartitions(data_retention)
local delete_partition_res = interface.execSQLQuery(delete_partition_q)
end
end
-- [2] Aggregated flows
day_aligned_retention = aggregated_retention - (aggregated_retention % 86400)
retention_yyyymmdd = os.date("%Y%m%d", day_aligned_retention)
partitions_q = string.format("SELECT DISTINCT database, table, toUInt32(partition) drop_part FROM system.parts WHERE active AND database='%s' AND table = 'aggregated_flows' AND drop_part <= %u AND drop_part > 999999", ntop.getPrefs().mysql_dbname or 'ntopng', retention_yyyymmdd)
partitions_res = interface.execSQLQuery(partitions_q)
if(partitions_res ~= nil) then
-- Iterate queried partitions and delete them (nil is returned if there is nothing to delete)
for _, partition_info in ipairs(partitions_res) do
local delete_partition_q = string.format("ALTER TABLE %s.%s DROP PARTITION '%s'",
partition_info["database"], partition_info["table"], partition_info["drop_part"])
local delete_partition_res = interface.execSQLQuery(delete_partition_q)
end
end
end
-- ########################################################