Implement Classifier abstract layer.

This commit is contained in:
aouinizied 2019-10-21 19:12:57 +02:00
parent bbc6196272
commit ba11caeea7
4 changed files with 143 additions and 78 deletions

97
nfstream/classifier.py Normal file
View file

@ -0,0 +1,97 @@
from .ndpi_bindings import ndpi, NDPI_PROTOCOL_BITMASK, ndpi_flow_struct, ndpi_protocol, ndpi_id_struct
from ctypes import pointer, memset, sizeof, cast, c_char_p, c_void_p, POINTER, c_uint8, addressof
class NFStreamClassifier:
def __init__(self, name):
self.name = name
def on_flow_init(self, flow):
return
def on_flow_update(self, packet_information, flow):
return
def on_flow_terminate(self, flow):
return
def on_exit(self):
return
class NDPIClassifier(NFStreamClassifier):
def __init__(self, name):
NFStreamClassifier.__init__(self, name)
self.mod = ndpi.ndpi_init_detection_module()
all = NDPI_PROTOCOL_BITMASK()
ndpi.ndpi_wrap_NDPI_BITMASK_SET_ALL(pointer(all))
ndpi.ndpi_set_protocol_detection_bitmask2(self.mod, pointer(all))
self.max_num_udp_dissected_pkts = 16
self.max_num_tcp_dissected_pkts = 10
def on_flow_init(self, flow):
flow.classifiers[self.name]['ndpi_flow'] = pointer(ndpi_flow_struct())
memset(flow.classifiers[self.name]['ndpi_flow'], 0, sizeof(ndpi_flow_struct))
flow.classifiers[self.name]['detected_protocol'] = ndpi_protocol()
flow.classifiers[self.name]['detection_completed'] = 0
flow.classifiers[self.name]['src_id'] = pointer(ndpi_id_struct())
flow.classifiers[self.name]['dst_id'] = pointer(ndpi_id_struct())
flow.classifiers[self.name]['application_name'] = ''
flow.classifiers[self.name]['category_name'] = ''
def on_flow_update(self, packet_information, flow):
if flow.classifiers[self.name]['detection_completed'] == 0:
flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_process_packet(
self.mod,
flow.classifiers[self.name]['ndpi_flow'],
cast(cast(c_char_p(packet_information.content), c_void_p), POINTER(c_uint8)),
len(packet_information.content),
packet_information.ts,
flow.classifiers[self.name]['src_id'],
flow.classifiers[self.name]['dst_id']
)
valid = False
if flow.ip_protocol == 6:
valid = (flow.src_to_dst_pkts + flow.dst_to_src_pkts) > self.max_num_tcp_dissected_pkts
elif flow.ip_protocol == 17:
valid = (flow.src_to_dst_pkts + flow.dst_to_src_pkts) > self.max_num_udp_dissected_pkts
if valid or flow.classifiers[self.name]['detected_protocol'].app_protocol != 0:
if valid or flow.classifiers[self.name]['detected_protocol'].master_protocol != 91:
flow.classifiers[self.name]['detection_completed'] = 1
if flow.classifiers[self.name]['detected_protocol'].app_protocol == 0:
flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_giveup(
self.mod,
flow.classifiers[self.name]['ndpi_flow'],
1,
cast(addressof(c_uint8(0)), POINTER(c_uint8))
)
def on_flow_terminate(self, flow):
if flow.classifiers[self.name]['detected_protocol'].app_protocol == 0:
flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_giveup(
self.mod,
flow.classifiers[self.name]['ndpi_flow'],
1,
cast(addressof(c_uint8(0)), POINTER(c_uint8))
)
master_name = cast(ndpi.ndpi_get_proto_name(self.mod,
flow.classifiers[self.name]['detected_protocol'].master_protocol),
c_char_p).value.decode('utf-8')
app_name = cast(ndpi.ndpi_get_proto_name(self.mod,
flow.classifiers[self.name]['detected_protocol'].app_protocol),
c_char_p).value.decode('utf-8')
category_name = cast(ndpi.ndpi_category_get_name(self.mod, flow.classifiers[self.name]['detected_protocol'].category),
c_char_p).value.decode('utf-8')
flow.classifiers[self.name]['application_name'] = master_name + '.' + app_name
flow.classifiers[self.name]['category_name'] = category_name
flow.classifiers[self.name]['app_id'] = flow.classifiers[self.name]['detected_protocol'].app_protocol
flow.classifiers[self.name]['master_id'] = flow.classifiers[self.name]['detected_protocol'].master_protocol
flow.classifiers[self.name]['ndpi_flow'] = None
# Now we do move some values to flow.metrics just to print purpose. If you are implementing your magic
# classifier, just do flow.classifiers['name_of_your_classifier]['name_of_your_feature']
# if we move it before, it will trigger metrics callback.
flow.metrics['application_name'] = flow.classifiers[self.name]['application_name']
flow.metrics['category_name'] = flow.classifiers[self.name]['category_name']
def on_exit(self):
ndpi.ndpi_exit_detection_module(self.mod)

View file

@ -4,12 +4,9 @@
from lru import LRU # for LRU streamer management
from collections import namedtuple
from .observer import Observer
from .classifier import NDPIClassifier, NFStreamClassifier
import socket
from .dpi import ndpi, NDPI_PROTOCOL_BITMASK, ndpi_flow_struct, ndpi_protocol, ndpi_id_struct
from ctypes import pointer, memset, sizeof, cast, c_char_p, c_void_p, POINTER, c_uint8, addressof
import json
max_num_udp_dissected_pkts = 16
max_num_tcp_dissected_pkts = 10
""" flow key structure """
FlowKey = namedtuple('FlowKey', ['ip_src', 'ip_dst', 'src_port', 'dst_port', 'ip_protocol'])
@ -47,7 +44,7 @@ def get_flow_key(pkt_info):
class Flow:
""" Flow entry structure """
def __init__(self, pkt_info, streamer_metrics):
def __init__(self, pkt_info, streamer_classifiers, streamer_metrics):
self.start_time = pkt_info.ts
self.end_time = pkt_info.ts
self.export_reason = -1
@ -65,21 +62,18 @@ class Flow:
self.dst_to_src_pkts = 0
self.src_to_dst_bytes = 0
self.dst_to_src_bytes = 0
self.ndpi_flow = pointer(ndpi_flow_struct())
memset(self.ndpi_flow, 0, sizeof(ndpi_flow_struct))
self.detected_protocol = ndpi_protocol()
self.detection_completed = 0
self.__src_id = pointer(ndpi_id_struct())
self.__dst_id = pointer(ndpi_id_struct())
self.application_name = ''
self.category_name = ''
self.metrics = {}
self.classifiers = {}
for name, metric in streamer_metrics.items():
self.metrics[name] = 0
for name, classifier in streamer_classifiers.items():
self.classifiers[classifier.name] = {}
classifier.on_flow_init(self)
def update(self, pkt_info, active_timeout, ndpi_info_mod, streamer_metrics):
def update(self, pkt_info, active_timeout, streamer_classifiers, streamer_metrics):
""" Update a flow from a packet and return status """
if (pkt_info.ts - self.end_time) >= (active_timeout*1000): # Active Expiration
return 2
else: # We start by core management
self.end_time = pkt_info.ts
@ -92,29 +86,10 @@ class Flow:
self.dst_to_src_pkts += 1
self.dst_to_src_bytes += pkt_info.size
pkt_info.direction = 1
if self.detection_completed == 0:
self.detected_protocol = ndpi.ndpi_detection_process_packet(ndpi_info_mod,
self.ndpi_flow,
cast(cast(c_char_p(pkt_info.content),
c_void_p), POINTER(c_uint8)),
len(pkt_info.content),
pkt_info.ts,
self.__src_id,
self.__dst_id)
valid = False
if self.ip_protocol == 6:
valid = (self.src_to_dst_pkts + self.dst_to_src_pkts) > max_num_tcp_dissected_pkts
elif self.ip_protocol == 17:
valid = (self.src_to_dst_pkts + self.dst_to_src_pkts) > max_num_udp_dissected_pkts
if valid or self.detected_protocol.app_protocol != 0:
if valid or self.detected_protocol.master_protocol != 91:
self.detection_completed = 1
if self.detected_protocol.app_protocol == 0:
self.detected_protocol = ndpi.ndpi_detection_giveup(ndpi_info_mod,
self.ndpi_flow,
1,
cast(addressof(c_uint8(0)),
POINTER(c_uint8)))
for name, classifier in streamer_classifiers.items():
classifier.on_flow_update(pkt_info, self)
for name, metric in self.metrics.items():
self = streamer_metrics[name](pkt_info, self)
return 0
@ -130,7 +105,7 @@ class Flow:
dst_to_src_pkts=self.dst_to_src_pkts,
src_to_dst_bytes=self.src_to_dst_bytes,
dst_to_src_bytes=self.dst_to_src_bytes,
ndpi_proto_num=str(self.detected_protocol.master_protocol) + '.' + str(self.detected_protocol.app_protocol)
ndpi_proto_num=str(self.classifiers['ndpi']['master_id']) + '.' + str(self.classifiers['ndpi']['app_id'])
)
def __str__(self):
@ -143,8 +118,6 @@ class Flow:
'dst_to_src_pkts': self.dst_to_src_pkts,
'src_to_dst_bytes': self.src_to_dst_bytes,
'dst_to_src_bytes': self.dst_to_src_bytes,
'application_name': self.application_name,
'category_name': self.category_name,
'start_time': self.start_time,
'end_time': self.end_time,
'export_reason': self.export_reason
@ -152,17 +125,13 @@ class Flow:
return json.dumps({**self.metrics, **metrics})
def initialize(ndpi_struct):
all = NDPI_PROTOCOL_BITMASK()
ndpi.ndpi_wrap_NDPI_BITMASK_SET_ALL(pointer(all))
ndpi.ndpi_set_protocol_detection_bitmask2(ndpi_struct, pointer(all))
class Streamer:
""" streamer for flows management """
num_streamers = 0
def __init__(self, source=None, capacity=128000, active_timeout=120, inactive_timeout=60, user_metrics=None):
def __init__(self, source=None, capacity=128000, active_timeout=120, inactive_timeout=60,
user_metrics=None, user_classifiers=None, enable_ndpi=True):
Streamer.num_streamers += 1
self.__exports = []
self.source = source
@ -173,11 +142,21 @@ class Streamer:
self.current_flows = 0 # counter for stored flows
self.current_tick = 0 # current timestamp
self.processed_packets = 0 # current timestamp
self.__inspector = ndpi.ndpi_init_detection_module()
initialize(self.__inspector)
if user_metrics is None:
self.user_metrics = {}
else:
self.user_classifiers = {}
if user_classifiers is not None:
try:
classifier_iterator = iter(user_classifiers)
for classifier in classifier_iterator:
if isinstance(classifier, NFStreamClassifier):
self.user_classifiers[classifier.name] = classifier
except TypeError:
self.user_classifiers[user_classifiers.name] = user_classifiers
self.user_metrics = {}
if enable_ndpi:
ndpi_classifier = NDPIClassifier('ndpi')
self.user_classifiers[ndpi_classifier.name] = ndpi_classifier
if user_metrics is not None:
self.user_metrics = user_metrics
def _get_capacity(self):
@ -199,28 +178,15 @@ class Streamer:
self.exporter(value, 2)
except TypeError:
remaining_flows = False
ndpi.ndpi_exit_detection_module(self.__inspector)
for classifier_name, classifier in self.user_classifiers.items():
self.user_classifiers[classifier_name].on_exit()
def exporter(self, flow, trigger_type):
""" export method for a flow trigger_type:0(inactive), 1(active), 2(flush) """
flow.export_reason = trigger_type
if flow.detected_protocol.app_protocol == 0: # short unidentified use caseflows
flow.detected_protocol = ndpi.ndpi_detection_giveup(self.__inspector,
flow.ndpi_flow,
1,
cast(addressof(c_uint8(0)),
POINTER(c_uint8)))
flow.detection_completed = 1
master_name = cast(ndpi.ndpi_get_proto_name(self.__inspector, flow.detected_protocol.master_protocol),
c_char_p).value.decode('utf-8')
app_name = cast(ndpi.ndpi_get_proto_name(self.__inspector, flow.detected_protocol.app_protocol),
c_char_p).value.decode('utf-8')
category_name = cast(ndpi.ndpi_category_get_name(self.__inspector, flow.detected_protocol.category),
c_char_p).value.decode('utf-8')
flow.category_name = category_name
flow.application_name = master_name + '.' + app_name
flow.ndpi_flow = None
for classifier_name, classifier in self.user_classifiers.items():
self.user_classifiers[classifier_name].on_flow_terminate(flow)
del self.__flows[flow.key]
self.current_flows -= 1
self.__exports.append(flow)
@ -247,16 +213,17 @@ class Streamer:
self.processed_packets += 1 # increment total processed packet counter
key = get_flow_key(pkt_info)
if key in self.__flows:
flow_status = self.__flows[key].update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics)
flow_status = self.__flows[key].update(pkt_info, self.active_timeout, self.user_classifiers,
self.user_metrics)
if flow_status == 2:
self.active_watcher(key)
flow = Flow(pkt_info, self.user_metrics)
flow = Flow(pkt_info, self.user_classifiers, self.user_metrics)
self.__flows[flow.key] = flow
self.__flows[flow.key].update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics)
self.__flows[flow.key].update(pkt_info, self.active_timeout, self.user_classifiers, self.user_metrics)
else:
self.current_flows += 1
flow = Flow(pkt_info, self.user_metrics)
flow.update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics)
flow = Flow(pkt_info, self.user_classifiers, self.user_metrics)
flow.update(pkt_info, self.active_timeout, self.user_classifiers, self.user_metrics)
self.__flows[flow.key] = flow
self.current_tick = flow.start_time
self.inactive_watcher()

View file

@ -40,9 +40,9 @@ class TestMethods(unittest.TestCase):
streamer_test = Streamer(source=file_path,
capacity=64000,
inactive_timeout=60000,
active_timeout=60000)
active_timeout=60000, enable_ndpi=True)
test_case_name = file_path.split('/')[-1].replace('.pcap', '')
# print(test_case_name + ': ')
print(test_case_name + ': ')
exports = []
for export in streamer_test:
exports.append(export.debug())
@ -50,7 +50,7 @@ class TestMethods(unittest.TestCase):
exports_ground_truth = flows_from_file(file)
del streamer_test
self.assertEqual(exports, exports_ground_truth)
print(Fore.BLUE + 'OK' + Style.RESET_ALL)
print(Fore.BLUE + 'OK' + Style.RESET_ALL)
def test_unsupported_packet(self):
print("\n----------------------------------------------------------------------")
@ -123,6 +123,7 @@ class TestMethods(unittest.TestCase):
active_timeout=120,
user_metrics={'test_src_to_dst_pkts': test_src_to_dst_pkts})
exports = list(streamer_test)
del streamer_test
for export in exports:
self.assertEqual(export.src_to_dst_pkts, export.metrics['test_src_to_dst_pkts'])
print('user defined metric addition: ' + Fore.BLUE + 'OK' + Style.RESET_ALL)