mirror of
https://github.com/vel21ripn/nDPI.git
synced 2026-05-01 16:30:17 +00:00
101 lines
4.2 KiB
Python
101 lines
4.2 KiB
Python
"""
|
|
------------------------------------------------------------------------------------------------------------------------
|
|
ndpi.py
|
|
Copyright (C) 2011-22 - ntop.org
|
|
This file is part of nDPI, an open source deep packet inspection library.
|
|
nDPI is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
|
|
License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
|
|
version.
|
|
nDPI is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
|
|
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
|
|
You should have received a copy of the GNU Lesser General Public License along with nDPI.
|
|
If not, see <http://www.gnu.org/licenses/>.
|
|
------------------------------------------------------------------------------------------------------------------------
|
|
"""
|
|
|
|
from collections import namedtuple
|
|
from _ndpi import ffi, lib
|
|
|
|
|
|
# Immutable record for a detection result: ``C`` carries the raw value returned
# by the nDPI call, the remaining fields are the values unpacked from it.
ndpi_protocol = namedtuple(
    'NDPIProtocol',
    ['C', 'master_protocol', 'app_protocol', 'category'],
)
|
|
|
|
# Immutable (id, name) pair describing a confidence value and its label.
ndpi_confidence = namedtuple(
    'NDPIConfidence',
    ['id', 'name'],
)
|
|
|
|
|
|
class NDPI(object):
    """Python wrapper around an nDPI detection module (via the ``_ndpi`` cffi bindings).

    The instance owns the pointer returned by ``lib.ndpi_init_detection_module``
    and passes it to ``lib.ndpi_exit_detection_module`` when it is finalized.
    """

    # "_api_version" and "_revision" are never assigned by this class (the
    # values are served by the properties below); the slot names are kept so
    # the attribute layout is unchanged for existing users.
    __slots__ = ("_api_version",
                 "_revision",
                 "_detection_module")

    def __init__(self):
        """Create the detection module and run the binding's setup helper on it.

        Raises:
            MemoryError: if ``ndpi_init_detection_module`` returns NULL.
        """
        self._detection_module = lib.ndpi_init_detection_module(0)
        if self._detection_module == ffi.NULL:
            raise MemoryError("Unable to instantiate NDPI object")
        lib.ndpi_py_setup_detection_module(self._detection_module)

    @property
    def api_version(self):
        """Value returned by ``lib.ndpi_get_api_version()``."""
        return lib.ndpi_get_api_version()

    @property
    def revision(self):
        """Revision string reported by ``lib.ndpi_revision()``.

        The C string is decoded as UTF-8; undecodable bytes are dropped.
        """
        return ffi.string(lib.ndpi_revision()).decode('utf-8', errors='ignore')

    def process_packet(self, flow, packet, packet_time_ms):
        """Pass one packet of ``flow`` to ``ndpi_detection_process_packet``.

        Args:
            flow: object exposing the C flow pointer as ``_C`` (as ``NDPIFlow`` does).
            packet: packet buffer; its length is taken with ``len(packet)``.
            packet_time_ms: packet timestamp, converted with ``int()`` before
                the call (presumably milliseconds, per the name - confirm
                against the C API).

        Returns:
            ndpi_protocol: the raw C result in ``C`` plus its ``master_protocol``,
            ``app_protocol`` and ``category`` fields.
        """
        p = lib.ndpi_detection_process_packet(self._detection_module,
                                              flow._C,
                                              packet,
                                              len(packet),
                                              int(packet_time_ms))
        return ndpi_protocol(C=p,
                             master_protocol=p.master_protocol,
                             app_protocol=p.app_protocol,
                             category=p.category)

    def giveup(self, flow, enable_guess=True):
        """Call ``ndpi_detection_giveup`` for ``flow`` and return its result.

        Args:
            flow: object exposing the C flow pointer as ``_C``.
            enable_guess: forwarded unchanged to the C function.

        Returns:
            ndpi_protocol: the raw C result in ``C`` plus its unpacked fields.
        """
        # The last argument is a freshly allocated uint8_t initialised to 0;
        # whatever the C function stores there is discarded.
        # NOTE(review): presumably the "protocol was guessed" out-flag - confirm.
        p = lib.ndpi_detection_giveup(self._detection_module,
                                      flow._C,
                                      enable_guess,
                                      ffi.new("uint8_t*", 0))
        return ndpi_protocol(C=p,
                             master_protocol=p.master_protocol,
                             app_protocol=p.app_protocol,
                             category=p.category)

    def protocol_name(self, protocol):
        """Return the name ``ndpi_protocol2name`` writes for ``protocol``.

        Args:
            protocol: an ``ndpi_protocol`` tuple; its ``C`` field is passed to nDPI.

        Returns:
            str: contents of the 40-byte output buffer decoded as UTF-8
            (undecodable bytes are dropped).
        """
        buf = ffi.new("char[40]")
        lib.ndpi_protocol2name(self._detection_module, protocol.C, buf, ffi.sizeof(buf))
        return ffi.string(buf).decode('utf-8', errors='ignore')

    def protocol_category_name(self, protocol):
        """Return the name ``ndpi_category_get_name`` reports for ``protocol.C.category``.

        The C string is decoded as UTF-8; undecodable bytes are dropped.
        """
        return ffi.string(lib.ndpi_category_get_name(self._detection_module,
                                                     protocol.C.category)).decode('utf-8',
                                                                                  errors='ignore')

    def __del__(self):
        """Release the detection module, if one is still held."""
        # __init__ may have failed before the slot was assigned; a finalizer
        # must not raise AttributeError in that case.
        module = getattr(self, "_detection_module", None)
        if module is None or module == ffi.NULL:
            return
        lib.ndpi_exit_detection_module(module)
        # Drop the pointer so a second __del__ call cannot free it twice
        # (mirrors what NDPIFlow.__del__ does with its own pointer).
        self._detection_module = ffi.NULL
|
|
|
|
|
|
class NDPIFlow(object):
    """Owner of one nDPI flow structure obtained from the ``_ndpi`` cffi bindings.

    The pointer returned by ``lib.ndpi_py_initialize_flow`` is kept in ``_C``
    and passed to ``lib.ndpi_flow_free`` when the object is finalized.
    """

    # Trailing comma makes this a real one-element tuple rather than a
    # parenthesised string; the resulting slot ("_C") is the same.
    __slots__ = ("_C",)

    def __init__(self):
        """Allocate the underlying flow structure.

        Raises:
            MemoryError: if ``ndpi_py_initialize_flow`` returns NULL.
        """
        self._C = lib.ndpi_py_initialize_flow()
        # Same NULL guard as NDPI.__init__ applies to its detection module, so
        # a NULL flow pointer is never handed to the detection calls.
        if self._C == ffi.NULL:
            raise MemoryError("Unable to instantiate NDPIFlow object")

    @property
    def confidence(self):
        """Confidence recorded on the flow, as an ``ndpi_confidence`` tuple.

        ``id`` is the raw ``confidence`` field of the C structure and ``name``
        is the string ``ndpi_confidence_get_name`` returns for it, decoded as
        UTF-8 with undecodable bytes dropped.
        """
        confidence = self._C.confidence
        label = ffi.string(lib.ndpi_confidence_get_name(confidence))
        return ndpi_confidence(id=confidence,
                               name=label.decode('utf-8', errors='ignore'))

    def __del__(self):
        """Free the flow structure, if one is still held."""
        # __init__ may have failed before "_C" was assigned; a finalizer must
        # not raise AttributeError in that case.
        flow = getattr(self, "_C", None)
        if flow is None or flow == ffi.NULL:
            return
        lib.ndpi_flow_free(flow)
        # Reset the pointer so a repeated __del__ call cannot free it twice.
        self._C = ffi.NULL
|
|
|