ability to stop extractions while processing, time cap for pcap (1m by default)

This commit is contained in:
Alfredo Cardigliano 2018-10-19 10:24:30 +02:00
parent 9f8a70e279
commit 507fa3e972
8 changed files with 95 additions and 12 deletions

View file

@ -13,6 +13,7 @@ local n2disk_ctl = "/usr/local/bin/n2diskctl"
local ntopng_config_tool = "/usr/bin/ntopng-utils-manage-config"
local n2disk_ctl_cmd = "sudo "..n2disk_ctl
local extraction_queue_key = "ntopng.traffic_recording.extraction_queue"
local extraction_stop_queue_key = "ntopng.traffic_recording.extraction_stop_queue"
local extraction_seqnum_key = "ntopng.traffic_recording.extraction_seqnum"
local extraction_jobs_key = "ntopng.traffic_recording.extraction_jobs"
@ -228,6 +229,7 @@ function recording_utils.createConfig(ifid, params)
local defaults = {
buffer_size = 1024, -- Buffer size (MB)
max_file_size = 256, -- Max file length (MB)
max_file_duration = 60, -- Max file duration (sec)
max_disk_space = recording_utils.default_disk_space, -- Max disk space (MB)
snaplen = 1536, -- Capture length
writer_core = 0, -- Writer thread affinity
@ -327,6 +329,7 @@ function recording_utils.createConfig(ifid, params)
f:write("--timeline-dir="..timeline_path.."\n")
f:write("--buffer-len="..config.buffer_size.."\n")
f:write("--max-file-len="..config.max_file_size.."\n")
f:write("--max-file-duration="..config.max_file_duration.."\n")
f:write("--disk-limit="..config.max_disk_space.."\n")
f:write("--snaplen="..config.snaplen.."\n")
f:write("--writer-cpu-affinity="..config.writer_core.."\n")
@ -428,10 +431,10 @@ end
function recording_utils.deleteJob(job_id)
local job_json = ntop.getHashCache(extraction_jobs_key, job_id)
if not isEmptyString(job_json) then
ntop.delHashCache(extraction_jobs_key, tostring(job_id))
local job = json.decode(job_json)
local dir_path = getPcapFileDir(job.id, job.ifid)
ntop.rmdir(dir_path)
ntop.delHashCache(extraction_jobs_key, tostring(job_id))
end
end
@ -468,6 +471,11 @@ function recording_utils.getExtractionJobs(ifid)
return jobs
end
-- Stop an extraction
function recording_utils.stopJob(job_id)
ntop.rpushCache(extraction_stop_queue_key, job_id)
end
-- Schedule an extraction
-- Note: 'params' should contain 'time_from', 'time_to', 'filter'
-- 'time_*' format is epoch (number)
@ -523,21 +531,40 @@ local function setJobAsCompleted()
end
function recording_utils.checkExtractionJobs()
if not ntop.isExtractionRunning() then
-- stop extractions for stopped jobs, if any
if ntop.isExtractionRunning() then
local id = ntop.lpopCache(extraction_stop_queue_key)
if not isEmptyString(id) then
local job_json = ntop.getHashCache(extraction_jobs_key, id)
if not isEmptyString(job_json) then
local job = json.decode(job_json)
ntop.stopExtraction(job.id)
job.status = 'stopped'
ntop.setHashCache(extraction_jobs_key, job.id, json.encode(job))
end
end
end
if not ntop.isExtractionRunning() then
-- set the previous job as completed, if any
setJobAsCompleted()
-- run a new extraction job
local id = ntop.lpopCache(extraction_queue_key)
if not isEmptyString(id) then
local job_json = ntop.getHashCache(extraction_jobs_key, id)
local job = json.decode(job_json)
if not isEmptyString(job_json) then
local job = json.decode(job_json)
ntop.runExtraction(job.id, tonumber(job.ifid), tonumber(job.time_from), tonumber(job.time_to), job.filter)
ntop.runExtraction(job.id, tonumber(job.ifid), tonumber(job.time_from), tonumber(job.time_to), job.filter)
job.status = 'processing'
ntop.setHashCache(extraction_jobs_key, job.id, json.encode(job))
job.status = 'processing'
ntop.setHashCache(extraction_jobs_key, job.id, json.encode(job))
end
end
end
end