diff --git a/nfstream/classifier.py b/nfstream/classifier.py new file mode 100644 index 0000000..f10b6b4 --- /dev/null +++ b/nfstream/classifier.py @@ -0,0 +1,97 @@ +from .ndpi_bindings import ndpi, NDPI_PROTOCOL_BITMASK, ndpi_flow_struct, ndpi_protocol, ndpi_id_struct +from ctypes import pointer, memset, sizeof, cast, c_char_p, c_void_p, POINTER, c_uint8, addressof + + +class NFStreamClassifier: + def __init__(self, name): + self.name = name + + def on_flow_init(self, flow): + return + + def on_flow_update(self, packet_information, flow): + return + + def on_flow_terminate(self, flow): + return + + def on_exit(self): + return + + +class NDPIClassifier(NFStreamClassifier): + def __init__(self, name): + NFStreamClassifier.__init__(self, name) + self.mod = ndpi.ndpi_init_detection_module() + all = NDPI_PROTOCOL_BITMASK() + ndpi.ndpi_wrap_NDPI_BITMASK_SET_ALL(pointer(all)) + ndpi.ndpi_set_protocol_detection_bitmask2(self.mod, pointer(all)) + self.max_num_udp_dissected_pkts = 16 + self.max_num_tcp_dissected_pkts = 10 + + def on_flow_init(self, flow): + flow.classifiers[self.name]['ndpi_flow'] = pointer(ndpi_flow_struct()) + memset(flow.classifiers[self.name]['ndpi_flow'], 0, sizeof(ndpi_flow_struct)) + flow.classifiers[self.name]['detected_protocol'] = ndpi_protocol() + flow.classifiers[self.name]['detection_completed'] = 0 + flow.classifiers[self.name]['src_id'] = pointer(ndpi_id_struct()) + flow.classifiers[self.name]['dst_id'] = pointer(ndpi_id_struct()) + flow.classifiers[self.name]['application_name'] = '' + flow.classifiers[self.name]['category_name'] = '' + + def on_flow_update(self, packet_information, flow): + if flow.classifiers[self.name]['detection_completed'] == 0: + flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_process_packet( + self.mod, + flow.classifiers[self.name]['ndpi_flow'], + cast(cast(c_char_p(packet_information.content), c_void_p), POINTER(c_uint8)), + len(packet_information.content), + packet_information.ts, + flow.classifiers[self.name]['src_id'], + flow.classifiers[self.name]['dst_id'] + ) + valid = False + if flow.ip_protocol == 6: + valid = (flow.src_to_dst_pkts + flow.dst_to_src_pkts) > self.max_num_tcp_dissected_pkts + elif flow.ip_protocol == 17: + valid = (flow.src_to_dst_pkts + flow.dst_to_src_pkts) > self.max_num_udp_dissected_pkts + if valid or flow.classifiers[self.name]['detected_protocol'].app_protocol != 0: + if valid or flow.classifiers[self.name]['detected_protocol'].master_protocol != 91: + flow.classifiers[self.name]['detection_completed'] = 1 + if flow.classifiers[self.name]['detected_protocol'].app_protocol == 0: + flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_giveup( + self.mod, + flow.classifiers[self.name]['ndpi_flow'], + 1, + cast(addressof(c_uint8(0)), POINTER(c_uint8)) + ) + + def on_flow_terminate(self, flow): + if flow.classifiers[self.name]['detected_protocol'].app_protocol == 0: + flow.classifiers[self.name]['detected_protocol'] = ndpi.ndpi_detection_giveup( + self.mod, + flow.classifiers[self.name]['ndpi_flow'], + 1, + cast(addressof(c_uint8(0)), POINTER(c_uint8)) + ) + master_name = cast(ndpi.ndpi_get_proto_name(self.mod, + flow.classifiers[self.name]['detected_protocol'].master_protocol), + c_char_p).value.decode('utf-8') + app_name = cast(ndpi.ndpi_get_proto_name(self.mod, + flow.classifiers[self.name]['detected_protocol'].app_protocol), + c_char_p).value.decode('utf-8') + category_name = cast(ndpi.ndpi_category_get_name(self.mod, flow.classifiers[self.name]['detected_protocol'].category), + c_char_p).value.decode('utf-8') + flow.classifiers[self.name]['application_name'] = master_name + '.' + app_name + flow.classifiers[self.name]['category_name'] = category_name + flow.classifiers[self.name]['app_id'] = flow.classifiers[self.name]['detected_protocol'].app_protocol + flow.classifiers[self.name]['master_id'] = flow.classifiers[self.name]['detected_protocol'].master_protocol + flow.classifiers[self.name]['ndpi_flow'] = None + # Now we do move some values to flow.metrics just to print purpose. If you are implementing your magic + # classifier, just do flow.classifiers['name_of_your_classifier]['name_of_your_feature'] + # if we move it before, it will trigger metrics callback. + flow.metrics['application_name'] = flow.classifiers[self.name]['application_name'] + flow.metrics['category_name'] = flow.classifiers[self.name]['category_name'] + + def on_exit(self): + ndpi.ndpi_exit_detection_module(self.mod) \ No newline at end of file diff --git a/nfstream/dpi.py b/nfstream/ndpi_bindings.py similarity index 100% rename from nfstream/dpi.py rename to nfstream/ndpi_bindings.py diff --git a/nfstream/streamer.py b/nfstream/streamer.py index 8f8c26d..f6aa63c 100644 --- a/nfstream/streamer.py +++ b/nfstream/streamer.py @@ -4,12 +4,9 @@ from lru import LRU # for LRU streamer management from collections import namedtuple from .observer import Observer +from .classifier import NDPIClassifier, NFStreamClassifier import socket -from .dpi import ndpi, NDPI_PROTOCOL_BITMASK, ndpi_flow_struct, ndpi_protocol, ndpi_id_struct -from ctypes import pointer, memset, sizeof, cast, c_char_p, c_void_p, POINTER, c_uint8, addressof import json -max_num_udp_dissected_pkts = 16 -max_num_tcp_dissected_pkts = 10 """ flow key structure """ FlowKey = namedtuple('FlowKey', ['ip_src', 'ip_dst', 'src_port', 'dst_port', 'ip_protocol']) @@ -47,7 +44,7 @@ def get_flow_key(pkt_info): class Flow: """ Flow entry structure """ - def __init__(self, pkt_info, streamer_metrics): + def __init__(self, pkt_info, streamer_classifiers, streamer_metrics): self.start_time = pkt_info.ts self.end_time = pkt_info.ts self.export_reason = -1 @@ -65,21 +62,18 @@ class Flow: self.dst_to_src_pkts = 0 self.src_to_dst_bytes = 0 self.dst_to_src_bytes = 0 - self.ndpi_flow = pointer(ndpi_flow_struct()) - memset(self.ndpi_flow, 0, sizeof(ndpi_flow_struct)) - self.detected_protocol = ndpi_protocol() - self.detection_completed = 0 - self.__src_id = pointer(ndpi_id_struct()) - self.__dst_id = pointer(ndpi_id_struct()) - self.application_name = '' - self.category_name = '' self.metrics = {} + self.classifiers = {} for name, metric in streamer_metrics.items(): self.metrics[name] = 0 + for name, classifier in streamer_classifiers.items(): + self.classifiers[classifier.name] = {} + classifier.on_flow_init(self) - def update(self, pkt_info, active_timeout, ndpi_info_mod, streamer_metrics): + def update(self, pkt_info, active_timeout, streamer_classifiers, streamer_metrics): """ Update a flow from a packet and return status """ if (pkt_info.ts - self.end_time) >= (active_timeout*1000): # Active Expiration + return 2 else: # We start by core management self.end_time = pkt_info.ts @@ -92,29 +86,10 @@ class Flow: self.dst_to_src_pkts += 1 self.dst_to_src_bytes += pkt_info.size pkt_info.direction = 1 - if self.detection_completed == 0: - self.detected_protocol = ndpi.ndpi_detection_process_packet(ndpi_info_mod, - self.ndpi_flow, - cast(cast(c_char_p(pkt_info.content), - c_void_p), POINTER(c_uint8)), - len(pkt_info.content), - pkt_info.ts, - self.__src_id, - self.__dst_id) - valid = False - if self.ip_protocol == 6: - valid = (self.src_to_dst_pkts + self.dst_to_src_pkts) > max_num_tcp_dissected_pkts - elif self.ip_protocol == 17: - valid = (self.src_to_dst_pkts + self.dst_to_src_pkts) > max_num_udp_dissected_pkts - if valid or self.detected_protocol.app_protocol != 0: - if valid or self.detected_protocol.master_protocol != 91: - self.detection_completed = 1 - if self.detected_protocol.app_protocol == 0: - self.detected_protocol = ndpi.ndpi_detection_giveup(ndpi_info_mod, - self.ndpi_flow, - 1, - cast(addressof(c_uint8(0)), - POINTER(c_uint8))) + + for name, classifier in streamer_classifiers.items(): + classifier.on_flow_update(pkt_info, self) + for name, metric in self.metrics.items(): self = streamer_metrics[name](pkt_info, self) return 0 @@ -130,7 +105,7 @@ class Flow: dst_to_src_pkts=self.dst_to_src_pkts, src_to_dst_bytes=self.src_to_dst_bytes, dst_to_src_bytes=self.dst_to_src_bytes, - ndpi_proto_num=str(self.detected_protocol.master_protocol) + '.' + str(self.detected_protocol.app_protocol) + ndpi_proto_num=str(self.classifiers['ndpi']['master_id']) + '.' + str(self.classifiers['ndpi']['app_id']) ) def __str__(self): @@ -143,8 +118,6 @@ class Flow: 'dst_to_src_pkts': self.dst_to_src_pkts, 'src_to_dst_bytes': self.src_to_dst_bytes, 'dst_to_src_bytes': self.dst_to_src_bytes, - 'application_name': self.application_name, - 'category_name': self.category_name, 'start_time': self.start_time, 'end_time': self.end_time, 'export_reason': self.export_reason @@ -152,17 +125,13 @@ class Flow: return json.dumps({**self.metrics, **metrics}) -def initialize(ndpi_struct): - all = NDPI_PROTOCOL_BITMASK() - ndpi.ndpi_wrap_NDPI_BITMASK_SET_ALL(pointer(all)) - ndpi.ndpi_set_protocol_detection_bitmask2(ndpi_struct, pointer(all)) - - class Streamer: """ streamer for flows management """ num_streamers = 0 - def __init__(self, source=None, capacity=128000, active_timeout=120, inactive_timeout=60, user_metrics=None): + def __init__(self, source=None, capacity=128000, active_timeout=120, inactive_timeout=60, + user_metrics=None, user_classifiers=None, enable_ndpi=True): + Streamer.num_streamers += 1 self.__exports = [] self.source = source @@ -173,11 +142,21 @@ class Streamer: self.current_flows = 0 # counter for stored flows self.current_tick = 0 # current timestamp self.processed_packets = 0 # current timestamp - self.__inspector = ndpi.ndpi_init_detection_module() - initialize(self.__inspector) - if user_metrics is None: - self.user_metrics = {} - else: + self.user_classifiers = {} + if user_classifiers is not None: + try: + classifier_iterator = iter(user_classifiers) + for classifier in classifier_iterator: + if isinstance(classifier, NFStreamClassifier): + self.user_classifiers[classifier.name] = classifier + except TypeError: + self.user_classifiers[user_classifiers.name] = user_classifiers + self.user_metrics = {} + if enable_ndpi: + ndpi_classifier = NDPIClassifier('ndpi') + self.user_classifiers[ndpi_classifier.name] = ndpi_classifier + + if user_metrics is not None: self.user_metrics = user_metrics def _get_capacity(self): @@ -199,28 +178,15 @@ class Streamer: self.exporter(value, 2) except TypeError: remaining_flows = False - ndpi.ndpi_exit_detection_module(self.__inspector) + + for classifier_name, classifier in self.user_classifiers.items(): + self.user_classifiers[classifier_name].on_exit() def exporter(self, flow, trigger_type): """ export method for a flow trigger_type:0(inactive), 1(active), 2(flush) """ flow.export_reason = trigger_type - if flow.detected_protocol.app_protocol == 0: # short unidentified use caseflows - flow.detected_protocol = ndpi.ndpi_detection_giveup(self.__inspector, - flow.ndpi_flow, - 1, - cast(addressof(c_uint8(0)), - POINTER(c_uint8))) - flow.detection_completed = 1 - master_name = cast(ndpi.ndpi_get_proto_name(self.__inspector, flow.detected_protocol.master_protocol), - c_char_p).value.decode('utf-8') - app_name = cast(ndpi.ndpi_get_proto_name(self.__inspector, flow.detected_protocol.app_protocol), - c_char_p).value.decode('utf-8') - - category_name = cast(ndpi.ndpi_category_get_name(self.__inspector, flow.detected_protocol.category), - c_char_p).value.decode('utf-8') - flow.category_name = category_name - flow.application_name = master_name + '.' + app_name - flow.ndpi_flow = None + for classifier_name, classifier in self.user_classifiers.items(): + self.user_classifiers[classifier_name].on_flow_terminate(flow) del self.__flows[flow.key] self.current_flows -= 1 self.__exports.append(flow) @@ -247,16 +213,17 @@ class Streamer: self.processed_packets += 1 # increment total processed packet counter key = get_flow_key(pkt_info) if key in self.__flows: - flow_status = self.__flows[key].update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics) + flow_status = self.__flows[key].update(pkt_info, self.active_timeout, self.user_classifiers, + self.user_metrics) if flow_status == 2: self.active_watcher(key) - flow = Flow(pkt_info, self.user_metrics) + flow = Flow(pkt_info, self.user_classifiers, self.user_metrics) self.__flows[flow.key] = flow - self.__flows[flow.key].update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics) + self.__flows[flow.key].update(pkt_info, self.active_timeout, self.user_classifiers, self.user_metrics) else: self.current_flows += 1 - flow = Flow(pkt_info, self.user_metrics) - flow.update(pkt_info, self.active_timeout, self.__inspector, self.user_metrics) + flow = Flow(pkt_info, self.user_classifiers, self.user_metrics) + flow.update(pkt_info, self.active_timeout, self.user_classifiers, self.user_metrics) self.__flows[flow.key] = flow self.current_tick = flow.start_time self.inactive_watcher() diff --git a/tests.py b/tests.py index 0f8b124..6b2df6a 100644 --- a/tests.py +++ b/tests.py @@ -40,9 +40,9 @@ class TestMethods(unittest.TestCase): streamer_test = Streamer(source=file_path, capacity=64000, inactive_timeout=60000, - active_timeout=60000) + active_timeout=60000, enable_ndpi=True) test_case_name = file_path.split('/')[-1].replace('.pcap', '') - # print(test_case_name + ': ') + print(test_case_name + ': ') exports = [] for export in streamer_test: exports.append(export.debug()) @@ -50,7 +50,7 @@ class TestMethods(unittest.TestCase): exports_ground_truth = flows_from_file(file) del streamer_test self.assertEqual(exports, exports_ground_truth) - print(Fore.BLUE + 'OK' + Style.RESET_ALL) + print(Fore.BLUE + 'OK' + Style.RESET_ALL) def test_unsupported_packet(self): print("\n----------------------------------------------------------------------") @@ -123,6 +123,7 @@ class TestMethods(unittest.TestCase): active_timeout=120, user_metrics={'test_src_to_dst_pkts': test_src_to_dst_pkts}) exports = list(streamer_test) + del streamer_test for export in exports: self.assertEqual(export.src_to_dst_pkts, export.metrics['test_src_to_dst_pkts']) print('user defined metric addition: ' + Fore.BLUE + 'OK' + Style.RESET_ALL)