From 5481132981548dcdd97716f36958403634504ef4 Mon Sep 17 00:00:00 2001 From: Zied Aouini Date: Fri, 10 May 2024 18:46:03 +0200 Subject: [PATCH] black formatting --- browser/extension_server.py | 63 ++-- examples/csv_generator.py | 9 +- examples/flow_printer.py | 8 +- examples/wfeatures_pandas.py | 7 +- generate_results.py | 28 +- nfstream/__init__.py | 4 +- nfstream/anonymizer.py | 26 +- nfstream/engine/engine.py | 40 ++- nfstream/engine/engine_build.py | 145 +++++--- nfstream/flow.py | 539 +++++++++++++++++------------ nfstream/meter.py | 386 +++++++++++++++++---- nfstream/plugin.py | 3 +- nfstream/plugins/dhcp.py | 26 +- nfstream/plugins/mdns.py | 5 +- nfstream/plugins/slicer.py | 3 +- nfstream/plugins/splt.py | 5 +- nfstream/plugins/wfeatures.py | 20 +- nfstream/streamer.py | 326 ++++++++++++------ nfstream/system.py | 51 +-- nfstream/utils.py | 60 ++-- setup.py | 63 ++-- tests.py | 583 ++++++++++++++++++++++++-------- 22 files changed, 1641 insertions(+), 759 deletions(-) diff --git a/browser/extension_server.py b/browser/extension_server.py index 4ff7b9b..01b7cec 100644 --- a/browser/extension_server.py +++ b/browser/extension_server.py @@ -19,45 +19,55 @@ import json import sys -NFRequest = namedtuple('NFRequest', ['browser', - 'timestamp', - 'remote_ip', - 'tab_id', - 'request_id', - 'tab_is_active', - 'tab_url']) +NFRequest = namedtuple( + "NFRequest", + [ + "browser", + "timestamp", + "remote_ip", + "tab_id", + "request_id", + "tab_is_active", + "tab_url", + ], +) class NFRequestHandler(BaseHTTPRequestHandler): - """ Handler for HTTP request from browser extension """ + """Handler for HTTP request from browser extension""" + def _set_headers(self): - """ headers setter """ - self.send_header('Access-Control-Allow-Origin', '*') - self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS') + """headers setter""" + self.send_header("Access-Control-Allow-Origin", "*") + self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") self.send_header("Access-Control-Allow-Headers", "X-Requested-With") self.send_header("Access-Control-Allow-Headers", "content-type") self.end_headers() def do_OPTIONS(self): - """ OPTIONS handler """ + """OPTIONS handler""" self.send_response(200) self._set_headers() def do_POST(self): - """ POST handler """ - if self.path.endswith('.json') and (self.path.startswith('/nfstream-chrome') or - self.path.startswith('/nfstream-firefox')): - length = self.headers['content-length'] + """POST handler""" + if self.path.endswith(".json") and ( + self.path.startswith("/nfstream-chrome") + or self.path.startswith("/nfstream-firefox") + ): + length = self.headers["content-length"] data = json.loads(self.rfile.read(int(length))) self.send_response(200) self._set_headers() - request = NFRequest(data["browser"], - float(data["timestamp"]), - data["ip_address"], - data["tab_id"], - data["req_id"], - data["tab_is_active"], - data["tab_url"]) + request = NFRequest( + data["browser"], + float(data["timestamp"]), + data["ip_address"], + data["tab_id"], + data["req_id"], + data["tab_is_active"], + data["tab_url"], + ) # For sake of brevity, we print it only print(request) @@ -72,19 +82,20 @@ class NFRequestHandler(BaseHTTPRequestHandler): class NFRequestServer(HTTPServer): - """ NFRequest HTTP server""" + """NFRequest HTTP server""" + def __init__(self, *args): HTTPServer.__init__(self, *args) self.stopped = False # self.channel = channel -if __name__ == '__main__': # Mandatory if you are running on Windows Platform +if __name__ == "__main__": # Mandatory if you are running on Windows Platform try: port = int(sys.argv[1]) except IndexError: # not specified port = 28314 - server_address = ('', port) # localhost with configurable port + server_address = ("", port) # localhost with configurable port server = NFRequestServer(server_address, NFRequestHandler) try: while not server.stopped: diff --git a/examples/csv_generator.py b/examples/csv_generator.py index f9f5014..9e93c39 100644 --- a/examples/csv_generator.py +++ b/examples/csv_generator.py @@ -17,10 +17,9 @@ from nfstream import NFStreamer import sys -if __name__ == '__main__': # Mandatory if you are running on Windows Platform +if __name__ == "__main__": # Mandatory if you are running on Windows Platform path = sys.argv[1] print("nfstream processing started. Use Ctrl+C to interrupt and save.") - total_flows = NFStreamer(source=path, - statistical_analysis=True, - splt_analysis=10, - performance_report=1).to_csv() + total_flows = NFStreamer( + source=path, statistical_analysis=True, splt_analysis=10, performance_report=1 + ).to_csv() diff --git a/examples/flow_printer.py b/examples/flow_printer.py index 4838e84..5337141 100644 --- a/examples/flow_printer.py +++ b/examples/flow_printer.py @@ -17,16 +17,16 @@ from nfstream import NFStreamer import sys -if __name__ == '__main__': # Mandatory if you are running on Windows Platform +if __name__ == "__main__": # Mandatory if you are running on Windows Platform input_filepaths = [] for path in sys.argv[1:]: input_filepaths.append(path) if len(input_filepaths) == 1: # Single file / Interface input_filepaths = input_filepaths[0] - flow_streamer = NFStreamer(source=input_filepaths, - statistical_analysis=False, - idle_timeout=1) + flow_streamer = NFStreamer( + source=input_filepaths, statistical_analysis=False, idle_timeout=1 + ) result = {} try: for flow in flow_streamer: diff --git a/examples/wfeatures_pandas.py b/examples/wfeatures_pandas.py index 172c893..3e29cda 100644 --- a/examples/wfeatures_pandas.py +++ b/examples/wfeatures_pandas.py @@ -23,11 +23,12 @@ except ImportError: sys.exit(1) - -if __name__ == '__main__': # Mandatory if you are running on Windows Platform +if __name__ == "__main__": # Mandatory if you are running on Windows Platform path = sys.argv[1] print("nfstream processing started. Use Ctrl+C to interrupt and save.") - streamer = NFStreamer(source=path, active_timeout=41, udps=WFPlugin(active_timeout=41, levels=12)) + streamer = NFStreamer( + source=path, active_timeout=41, udps=WFPlugin(active_timeout=41, levels=12) + ) print("Converting to pandas...") df = streamer.to_pandas() print("Dataframe: ") diff --git a/generate_results.py b/generate_results.py index dade840..dfc16f7 100644 --- a/generate_results.py +++ b/generate_results.py @@ -24,22 +24,26 @@ def get_files_list(path): files = [] for r, d, f in os.walk(path): for file in f: - if '.pcap' == file[-5:] or ".pcapng" == file[-7:]: # Pick out only pcaps files + if ( + ".pcap" == file[-5:] or ".pcapng" == file[-7:] + ): # Pick out only pcaps files files.append(os.path.join(r, file)) files.sort() return files -if __name__ == '__main__': # Mandatory if you are running on Windows Platform +if __name__ == "__main__": # Mandatory if you are running on Windows Platform pcap_files = get_files_list(os.path.join("tests", "pcaps")) for pcap_file in tqdm(pcap_files): - df = NFStreamer(source=pcap_file, n_dissections=20, n_meters=1).to_pandas()[["id", - "bidirectional_packets", - "bidirectional_bytes", - "application_name", - "application_category_name", - "application_is_guessed", - "application_confidence"]] - df.to_csv(pcap_file.replace("pcaps", - "results"), - index=False) + df = NFStreamer(source=pcap_file, n_dissections=20, n_meters=1).to_pandas()[ + [ + "id", + "bidirectional_packets", + "bidirectional_bytes", + "application_name", + "application_category_name", + "application_is_guessed", + "application_confidence", + ] + ] + df.to_csv(pcap_file.replace("pcaps", "results"), index=False) diff --git a/nfstream/__init__.py b/nfstream/__init__.py index cc661cb..6e63387 100644 --- a/nfstream/__init__.py +++ b/nfstream/__init__.py @@ -20,5 +20,5 @@ from .plugin import NFPlugin # streamer module is the core module of nfstream package. __author__ = """Zied Aouini""" -__email__ = 'aouinizied@gmail.com' -__version__ = '6.5.4' +__email__ = "aouinizied@gmail.com" +__version__ = "6.5.4" diff --git a/nfstream/anonymizer.py b/nfstream/anonymizer.py index 736f7f0..fac9a57 100644 --- a/nfstream/anonymizer.py +++ b/nfstream/anonymizer.py @@ -19,14 +19,12 @@ import secrets class NFAnonymizer(object): """ - NFAnonymizer: NFStream anonymization implementation. - Anonymizer is initiated at each time to_csv or to_pandas is called with a random secret key (64 bytes). - Each specified column is anonymized using blake2b algorithm (digest_size: 64 bytes). + NFAnonymizer: NFStream anonymization implementation. + Anonymizer is initiated at each time to_csv or to_pandas is called with a random secret key (64 bytes). + Each specified column is anonymized using blake2b algorithm (digest_size: 64 bytes). """ - __slots__ = ('_secret', - '_cols_names', - '_cols_index', - "_enabled") + + __slots__ = ("_secret", "_cols_names", "_cols_index", "_enabled") def __init__(self, cols_names): self._secret = secrets.token_bytes(64) @@ -38,19 +36,23 @@ class NFAnonymizer(object): def process(self, flow): if self._enabled: - if self._cols_index is None: # First flow, we extract indexes of cols to anonymize. + if ( + self._cols_index is None + ): # First flow, we extract indexes of cols to anonymize. self._cols_index = [] for col_name in self._cols_names: keys = flow.keys() try: self._cols_index.append(keys.index(col_name)) except ValueError: - print("WARNING: NFlow do not have {} attribute. Skipping anonymization.") + print( + "WARNING: NFlow do not have {} attribute. Skipping anonymization." + ) values = flow.values() for col_idx in self._cols_index: if values[col_idx] is not None: - values[col_idx] = blake2b(str(values[col_idx]).encode(), - digest_size=64, - key=self._secret).hexdigest() + values[col_idx] = blake2b( + str(values[col_idx]).encode(), digest_size=64, key=self._secret + ).hexdigest() return values return flow.values() diff --git a/nfstream/engine/engine.py b/nfstream/engine/engine.py index 76cd450..bb07315 100644 --- a/nfstream/engine/engine.py +++ b/nfstream/engine/engine.py @@ -16,36 +16,48 @@ If not, see . from _lib_engine import ffi, lib -def setup_capture(ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size): - capture = lib.capture_open(bytes(source, 'utf-8'), int(mode), error_child, socket_buffer_size) +def setup_capture( + ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size +): + capture = lib.capture_open( + bytes(source, "utf-8"), int(mode), error_child, socket_buffer_size + ) if capture == ffi.NULL: return - fanout_set_failed = lib.capture_set_fanout(capture, int(mode), error_child, group_id) + fanout_set_failed = lib.capture_set_fanout( + capture, int(mode), error_child, group_id + ) if fanout_set_failed: return timeout_set_failed = lib.capture_set_timeout(capture, int(mode), error_child) if timeout_set_failed: return - promisc_set_failed = lib.capture_set_promisc(capture, int(mode), error_child, int(promisc)) + promisc_set_failed = lib.capture_set_promisc( + capture, int(mode), error_child, int(promisc) + ) if promisc_set_failed: return - snaplen_set_failed = lib.capture_set_snaplen(capture, int(mode), error_child, snaplen) + snaplen_set_failed = lib.capture_set_snaplen( + capture, int(mode), error_child, snaplen + ) if snaplen_set_failed: return return capture def setup_filter(capture, lib, error_child, bpf_filter): - """ Compile and setup BPF filter """ + """Compile and setup BPF filter""" if bpf_filter is not None: - filter_set_failed = lib.capture_set_filter(capture, bytes(bpf_filter, 'utf-8'), error_child) + filter_set_failed = lib.capture_set_filter( + capture, bytes(bpf_filter, "utf-8"), error_child + ) if filter_set_failed: return False return True def activate_capture(capture, lib, error_child, bpf_filter, mode): - """ Capture activation function """ + """Capture activation function""" activation_failed = lib.capture_activate(capture, int(mode), error_child) if activation_failed: return False @@ -53,7 +65,7 @@ def activate_capture(capture, lib, error_child, bpf_filter, mode): def setup_dissector(ffi, lib, n_dissections): - """ Setup dissector according to n_dissections value """ + """Setup dissector according to n_dissections value""" if n_dissections: # Dissection activated # Check that headers and loaded library match and initiate dissector. checker = ffi.new("struct dissector_checker *") @@ -70,13 +82,13 @@ def setup_dissector(ffi, lib, n_dissections): def is_interface(val): - """ Check if val is a valid interface name and return it if true else None """ - intf = lib.capture_get_interface(val.encode('ascii')) + """Check if val is a valid interface name and return it if true else None""" + intf = lib.capture_get_interface(val.encode("ascii")) if intf == ffi.NULL: return None - return ffi.string(intf).decode('ascii', 'ignore') + return ffi.string(intf).decode("ascii", "ignore") def create_engine(): - """ engine creation function, return the loaded native nfstream engine and it's ffi interface""" - return ffi, lib \ No newline at end of file + """engine creation function, return the loaded native nfstream engine and it's ffi interface""" + return ffi, lib diff --git a/nfstream/engine/engine_build.py b/nfstream/engine/engine_build.py index 0411e47..c3bf6a9 100644 --- a/nfstream/engine/engine_build.py +++ b/nfstream/engine/engine_build.py @@ -20,13 +20,13 @@ import os def cdef_to_replace(cdef): - """ helper function that replaces unsupported definitions for cffi """ + """helper function that replaces unsupported definitions for cffi""" to_rep = [] cdef_list = cdef.split("static inline") for idx, sub_def in enumerate(cdef_list): end = sub_def.find("}") if end and idx: - to_rep.append(sub_def[:end+1]) + to_rep.append(sub_def[: end + 1]) to_rep.append("typedef __builtin_va_list __darwin_va_list;") to_rep.append("typedef __signed char int8_t;") to_rep.append(" __attribute__((__packed__))") @@ -36,8 +36,8 @@ def cdef_to_replace(cdef): def convert_path(p): - """ dummy path converter """ - if os.name == 'posix': + """dummy path converter""" + if os.name == "posix": return p return p.replace("/", "\\") @@ -46,7 +46,7 @@ def convert_path(p): ROOT = "" USR = "usr" USR_LOCAL = "usr/local" -if os.name != 'posix': # Windows case, we must take into account msys2 path tree. +if os.name != "posix": # Windows case, we must take into account msys2 path tree. MSYS2_PATH = os.getenv("MSYS2_PATH") # If user have custom msys2 installation if MSYS2_PATH is None: # User didn't set this path, we use default one os.environ["MSYS2_PATH"] = "C:/msys64" @@ -55,64 +55,109 @@ if os.name != 'posix': # Windows case, we must take into account msys2 path tre USR_LOCAL = "mingw64" -BUILD_SCRIPT_PATH = str(pathlib.Path(__file__).parent.resolve().joinpath("scripts").joinpath("build")) +BUILD_SCRIPT_PATH = str( + pathlib.Path(__file__).parent.resolve().joinpath("scripts").joinpath("build") +) # Patched path as it is passed to msys2 bash ENGINE_PATH = str(pathlib.Path(__file__).parent.resolve()).replace("\\", "/") -if os.name != 'posix': # Windows case +if os.name != "posix": # Windows case os.environ["MSYSTEM"] = "MINGW64" BUILD_CMD = r"""'{}'""".format(str(BUILD_SCRIPT_PATH) + "_windows.sh") - subprocess.check_call(["{msys}/usr/bin/bash".format(msys=ROOT).replace("/", "\\"), - "-l", - BUILD_CMD, - ENGINE_PATH], - shell=True) + subprocess.check_call( + [ + "{msys}/usr/bin/bash".format(msys=ROOT).replace("/", "\\"), + "-l", + BUILD_CMD, + ENGINE_PATH, + ], + shell=True, + ) else: # Linux, MacOS subprocess.check_call([str(BUILD_SCRIPT_PATH) + ".sh"], shell=True) -INCLUDE_DIRS = ["{root}/tmp/nfstream_build/{usr}/include/ndpi".format(root=ROOT, usr=USR), - "{root}/tmp/nfstream_build/{usr}/include".format(root=ROOT, usr=USR_LOCAL)] -EXTRALINK_ARGS = ["{root}/tmp/nfstream_build/{usr}/lib/libndpi.a".format(root=ROOT, usr=USR)] +INCLUDE_DIRS = [ + "{root}/tmp/nfstream_build/{usr}/include/ndpi".format(root=ROOT, usr=USR), + "{root}/tmp/nfstream_build/{usr}/include".format(root=ROOT, usr=USR_LOCAL), +] +EXTRALINK_ARGS = [ + "{root}/tmp/nfstream_build/{usr}/lib/libndpi.a".format(root=ROOT, usr=USR) +] -if os.name != 'posix': # Windows +if os.name != "posix": # Windows INCLUDE_DIRS.append("{root}/tmp/nfstream_build/npcap/Include".format(root=ROOT)) - if os.path.exists(convert_path("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR))): - EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR)) + if os.path.exists( + convert_path("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR)) + ): + EXTRALINK_ARGS.append( + "{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR) + ) else: # best effort guess - EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32")) - if os.path.exists(convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))): + EXTRALINK_ARGS.append( + "{root}/{usr}/lib/libmingwex.a".format( + root=ROOT, usr=USR + "/x86_64-w64-mingw32" + ) + ) + if os.path.exists( + convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR)) + ): EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR)) else: # best effort guess - EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32")) - with open(convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT))) as gcc_version_in: + EXTRALINK_ARGS.append( + "{root}/{usr}/lib/libmsvcrt.a".format( + root=ROOT, usr=USR + "/x86_64-w64-mingw32" + ) + ) + with open( + convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT)) + ) as gcc_version_in: GCC_VERSION = gcc_version_in.read().split("\n")[0].split(")")[-1].strip() - EXTRALINK_ARGS.append("{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format(root=ROOT, - usr=USR, - version=GCC_VERSION)) + EXTRALINK_ARGS.append( + "{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format( + root=ROOT, usr=USR, version=GCC_VERSION + ) + ) # IMPORTANT: We link with wpcap.lib from downloaded SDK in order to not bundle npcap OEM binaries. # Consequently, the generated extension will still look for these binaries on the host machine. # Instructions on how to install npcap binaries are provided in README (Windows Note). - EXTRALINK_ARGS.append("{root}/tmp/nfstream_build/npcap/Lib/x64/wpcap.lib".format(root=ROOT)) + EXTRALINK_ARGS.append( + "{root}/tmp/nfstream_build/npcap/Lib/x64/wpcap.lib".format(root=ROOT) + ) # And finally socket stuff - if os.path.exists(convert_path("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR))): + if os.path.exists( + convert_path("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR)) + ): EXTRALINK_ARGS.append("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR)) else: # best effort guess EXTRALINK_ARGS.append("{root}/usr/lib/w32api/libws2_32.a".format(root=ROOT)) else: - EXTRALINK_ARGS.append("{root}/tmp/nfstream_build/{usr}/lib/libpcap.a".format(root=ROOT, usr=USR_LOCAL)) + EXTRALINK_ARGS.append( + "{root}/tmp/nfstream_build/{usr}/lib/libpcap.a".format(root=ROOT, usr=USR_LOCAL) + ) -with open(convert_path("{root}/tmp/nfstream_build/lib_engine_cdefinitions.c".format(root=ROOT))) as engine_cdef: +with open( + convert_path( + "{root}/tmp/nfstream_build/lib_engine_cdefinitions.c".format(root=ROOT) + ) +) as engine_cdef: ENGINE_CDEF = engine_cdef.read() for to_replace in cdef_to_replace(ENGINE_CDEF): ENGINE_CDEF = ENGINE_CDEF.replace(to_replace, "") -with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(root=ROOT))) as ndpi_cdefs: +with open( + convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(root=ROOT)) +) as ndpi_cdefs: NDPI_CDEF = ndpi_cdefs.read() try: - APPLE_SILICON_DEFS = NDPI_CDEF.split("/* Generic byte swapping functions. */")[1].\ - split("/* Generic little endian to host endianess byte swapping functions. */")[0] + APPLE_SILICON_DEFS = NDPI_CDEF.split("/* Generic byte swapping functions. */")[ + 1 + ].split( + "/* Generic little endian to host endianess byte swapping functions. */" + )[ + 0 + ] NDPI_CDEF = NDPI_CDEF.replace(APPLE_SILICON_DEFS, "") except IndexError: pass @@ -121,7 +166,11 @@ with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(ro NDPI_MODULE_STRUCT_CDEF = NDPI_CDEF.split("//CFFI.NDPI_MODULE_STRUCT")[1] -with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions_packed.h".format(root=ROOT))) as ndpi_cdefs_pack: +with open( + convert_path( + "{root}/tmp/nfstream_build/ndpi_cdefinitions_packed.h".format(root=ROOT) + ) +) as ndpi_cdefs_pack: NDPI_PACKED = ndpi_cdefs_pack.read() for to_replace in cdef_to_replace(NDPI_PACKED): NDPI_PACKED = NDPI_PACKED.replace(to_replace, "") @@ -133,12 +182,17 @@ NDPI_PACKED_STRUCTURES = NDPI_PACKED.split("//CFFI.NDPI_PACKED_STRUCTURES")[1] # As cdef do not support if-def, yet we fix it by simple string replacement -SOCK_INCLUDES = """#include \n#include \n#include """ -if os.name != 'posix': +SOCK_INCLUDES = ( + """#include \n#include \n#include """ +) +if os.name != "posix": SOCK_INCLUDES = """#include \n#include \n#include """ -ENGINE_INCLUDES = """ +ENGINE_INCLUDES = ( + """ #include -""" + SOCK_INCLUDES + """ +""" + + SOCK_INCLUDES + + """ #include #include #include @@ -147,6 +201,7 @@ ENGINE_INCLUDES = """ #include #include """ +) ENGINE_SOURCE = ENGINE_INCLUDES + NDPI_MODULE_STRUCT_CDEF + ENGINE_CDEF ENGINE_APIS = """ char * capture_get_interface(char * intf_name); @@ -181,12 +236,15 @@ const char *engine_lib_pcap_version(void); ffi_builder = FFI() -ffi_builder.set_source("_lib_engine", - ENGINE_SOURCE, - include_dirs=[convert_path(d) for d in INCLUDE_DIRS], - extra_link_args=[convert_path(a) for a in EXTRALINK_ARGS]) +ffi_builder.set_source( + "_lib_engine", + ENGINE_SOURCE, + include_dirs=[convert_path(d) for d in INCLUDE_DIRS], + extra_link_args=[convert_path(a) for a in EXTRALINK_ARGS], +) -ffi_builder.cdef(""" +ffi_builder.cdef( + """ typedef uint64_t u_int64_t; typedef uint32_t u_int32_t; typedef uint16_t u_int16_t; @@ -201,7 +259,8 @@ struct in6_addr { }; struct pcap; typedef struct pcap pcap_t; -""") +""" +) ffi_builder.cdef(NDPI_PACKED_STRUCTURES, packed=True) ffi_builder.cdef(NDPI_CDEF, override=True) ffi_builder.cdef(ENGINE_CDEF.split("//CFFI_SHARED_STRUCTURES")[1]) diff --git a/nfstream/flow.py b/nfstream/flow.py index 73ddfd5..4316e49 100644 --- a/nfstream/flow.py +++ b/nfstream/flow.py @@ -18,42 +18,47 @@ from math import sqrt from .utils import NFEvent # When NFStream is extended with plugins, packer C structure is pythonized using the following namedtuple. -nf_packet = namedtuple('NFPacket', ['time', - 'delta_time', - 'direction', - 'raw_size', - 'ip_size', - 'transport_size', - 'payload_size', - 'src_ip', - 'src_mac', - 'src_oui', - 'dst_ip', - 'dst_mac', - 'dst_oui', - 'src_port', - 'dst_port', - 'protocol', - 'vlan_id', - 'ip_version', - 'ip_packet', - 'syn', - 'cwr', - 'ece', - 'urg', - 'ack', - 'psh', - 'rst', - 'fin', - 'tunnel_id']) +nf_packet = namedtuple( + "NFPacket", + [ + "time", + "delta_time", + "direction", + "raw_size", + "ip_size", + "transport_size", + "payload_size", + "src_ip", + "src_mac", + "src_oui", + "dst_ip", + "dst_mac", + "dst_oui", + "src_port", + "dst_port", + "protocol", + "vlan_id", + "ip_version", + "ip_packet", + "syn", + "cwr", + "ece", + "urg", + "ack", + "psh", + "rst", + "fin", + "tunnel_id", + ], +) class UDPS(object): - """ dummy class that add udps slot the flexibility required for extensions """ + """dummy class that add udps slot the flexibility required for extensions""" def pythonize_packet(packet, ffi, flow): - """ convert a cdata packet to a namedtuple """ + """convert a cdata packet to a namedtuple""" src_ip = flow.src_ip dst_ip = flow.dst_ip src_mac = flow.src_mac @@ -68,157 +73,179 @@ def pythonize_packet(packet, ffi, flow): src_oui = flow.dst_oui dst_oui = flow.src_oui - return nf_packet(time=packet.time, - delta_time=packet.delta_time, - direction=packet.direction, - raw_size=packet.raw_size, - ip_size=packet.ip_size, - transport_size=packet.transport_size, - payload_size=packet.payload_size, - src_ip=src_ip, - src_mac=src_mac, - src_oui=src_oui, - dst_ip=dst_ip, - dst_mac=dst_mac, - dst_oui=dst_oui, - src_port=packet.src_port, - dst_port=packet.dst_port, - protocol=packet.protocol, - vlan_id=packet.vlan_id, - ip_version=packet.ip_version, - ip_packet=bytes(ffi.buffer(packet.ip_content, packet.ip_content_len)), - syn=packet.syn, - cwr=packet.cwr, - ece=packet.ece, - urg=packet.urg, - ack=packet.ack, - psh=packet.psh, - rst=packet.rst, - fin=packet.fin, - tunnel_id=packet.tunnel_id) + return nf_packet( + time=packet.time, + delta_time=packet.delta_time, + direction=packet.direction, + raw_size=packet.raw_size, + ip_size=packet.ip_size, + transport_size=packet.transport_size, + payload_size=packet.payload_size, + src_ip=src_ip, + src_mac=src_mac, + src_oui=src_oui, + dst_ip=dst_ip, + dst_mac=dst_mac, + dst_oui=dst_oui, + src_port=packet.src_port, + dst_port=packet.dst_port, + protocol=packet.protocol, + vlan_id=packet.vlan_id, + ip_version=packet.ip_version, + ip_packet=bytes(ffi.buffer(packet.ip_content, packet.ip_content_len)), + syn=packet.syn, + cwr=packet.cwr, + ece=packet.ece, + urg=packet.urg, + ack=packet.ack, + psh=packet.psh, + rst=packet.rst, + fin=packet.fin, + tunnel_id=packet.tunnel_id, + ) class NFlow(object): """ - NFlow is NFStream representation of a network flow. - It is a slotted class for performances reasons, and slots are initiated according to NFStream detected mode. - If nfstream is used with extension, we refer to it as sync mode, and we need to update slots from C structure. - If not, nfstream will compute all configured metrics within C structure and update it only at init and expire. - Such logic allows us to provide maximum performances when running without extensions. When set with extension - we pay the cost of flexibility with attributes access/update. + NFlow is NFStream representation of a network flow. + It is a slotted class for performances reasons, and slots are initiated according to NFStream detected mode. + If nfstream is used with extension, we refer to it as sync mode, and we need to update slots from C structure. + If not, nfstream will compute all configured metrics within C structure and update it only at init and expire. + Such logic allows us to provide maximum performances when running without extensions. When set with extension + we pay the cost of flexibility with attributes access/update. """ - __slots__ = ('id', - 'expiration_id', - 'src_ip', - 'src_mac', - 'src_oui', - 'src_port', - 'dst_ip', - 'dst_mac', - 'dst_oui', - 'dst_port', - 'protocol', - 'ip_version', - 'vlan_id', - 'tunnel_id', - 'bidirectional_first_seen_ms', - 'bidirectional_last_seen_ms', - 'bidirectional_duration_ms', - 'bidirectional_packets', - 'bidirectional_bytes', - 'src2dst_first_seen_ms', - 'src2dst_last_seen_ms', - 'src2dst_duration_ms', - 'src2dst_packets', - 'src2dst_bytes', - 'dst2src_first_seen_ms', - 'dst2src_last_seen_ms', - 'dst2src_duration_ms', - 'dst2src_packets', - 'dst2src_bytes', - 'bidirectional_min_ps', - 'bidirectional_mean_ps', - 'bidirectional_stddev_ps', - 'bidirectional_max_ps', - 'src2dst_min_ps', - 'src2dst_mean_ps', - 'src2dst_stddev_ps', - 'src2dst_max_ps', - 'dst2src_min_ps', - 'dst2src_mean_ps', - 'dst2src_stddev_ps', - 'dst2src_max_ps', - 'bidirectional_min_piat_ms', - 'bidirectional_mean_piat_ms', - 'bidirectional_stddev_piat_ms', - 'bidirectional_max_piat_ms', - 'src2dst_min_piat_ms', - 'src2dst_mean_piat_ms', - 'src2dst_stddev_piat_ms', - 'src2dst_max_piat_ms', - 'dst2src_min_piat_ms', - 'dst2src_mean_piat_ms', - 'dst2src_stddev_piat_ms', - 'dst2src_max_piat_ms', - 'bidirectional_syn_packets', - 'bidirectional_cwr_packets', - 'bidirectional_ece_packets', - 'bidirectional_urg_packets', - 'bidirectional_ack_packets', - 'bidirectional_psh_packets', - 'bidirectional_rst_packets', - 'bidirectional_fin_packets', - 'src2dst_syn_packets', - 'src2dst_cwr_packets', - 'src2dst_ece_packets', - 'src2dst_urg_packets', - 'src2dst_ack_packets', - 'src2dst_psh_packets', - 'src2dst_rst_packets', - 'src2dst_fin_packets', - 'dst2src_syn_packets', - 'dst2src_cwr_packets', - 'dst2src_ece_packets', - 'dst2src_urg_packets', - 'dst2src_ack_packets', - 'dst2src_psh_packets', - 'dst2src_rst_packets', - 'dst2src_fin_packets', - 'splt_direction', - 'splt_ps', - 'splt_piat_ms', - 'application_name', - 'application_category_name', - 'application_is_guessed', - 'application_confidence', - 'requested_server_name', - 'client_fingerprint', - 'server_fingerprint', - 'user_agent', - 'content_type', - '_C', - 'udps', - 'system_process_pid', - 'system_process_name', - 'system_browser_tab') - def __init__(self, packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt, dissector, - decode_tunnels, system_visibility_mode): - self.id = NFEvent.FLOW # id set to NFLOW for internal communications and handled (incremented) by NFStreamer. + __slots__ = ( + "id", + "expiration_id", + "src_ip", + "src_mac", + "src_oui", + "src_port", + "dst_ip", + "dst_mac", + "dst_oui", + "dst_port", + "protocol", + "ip_version", + "vlan_id", + "tunnel_id", + "bidirectional_first_seen_ms", + "bidirectional_last_seen_ms", + "bidirectional_duration_ms", + "bidirectional_packets", + "bidirectional_bytes", + "src2dst_first_seen_ms", + "src2dst_last_seen_ms", + "src2dst_duration_ms", + "src2dst_packets", + "src2dst_bytes", + "dst2src_first_seen_ms", + "dst2src_last_seen_ms", + "dst2src_duration_ms", + "dst2src_packets", + "dst2src_bytes", + "bidirectional_min_ps", + "bidirectional_mean_ps", + "bidirectional_stddev_ps", + "bidirectional_max_ps", + "src2dst_min_ps", + "src2dst_mean_ps", + "src2dst_stddev_ps", + "src2dst_max_ps", + "dst2src_min_ps", + "dst2src_mean_ps", + "dst2src_stddev_ps", + "dst2src_max_ps", + "bidirectional_min_piat_ms", + "bidirectional_mean_piat_ms", + "bidirectional_stddev_piat_ms", + "bidirectional_max_piat_ms", + "src2dst_min_piat_ms", + "src2dst_mean_piat_ms", + "src2dst_stddev_piat_ms", + "src2dst_max_piat_ms", + "dst2src_min_piat_ms", + "dst2src_mean_piat_ms", + "dst2src_stddev_piat_ms", + "dst2src_max_piat_ms", + "bidirectional_syn_packets", + "bidirectional_cwr_packets", + "bidirectional_ece_packets", + "bidirectional_urg_packets", + "bidirectional_ack_packets", + "bidirectional_psh_packets", + "bidirectional_rst_packets", + "bidirectional_fin_packets", + "src2dst_syn_packets", + "src2dst_cwr_packets", + "src2dst_ece_packets", + "src2dst_urg_packets", + "src2dst_ack_packets", + "src2dst_psh_packets", + "src2dst_rst_packets", + "src2dst_fin_packets", + "dst2src_syn_packets", + "dst2src_cwr_packets", + "dst2src_ece_packets", + "dst2src_urg_packets", + "dst2src_ack_packets", + "dst2src_psh_packets", + "dst2src_rst_packets", + "dst2src_fin_packets", + "splt_direction", + "splt_ps", + "splt_piat_ms", + "application_name", + "application_category_name", + "application_is_guessed", + "application_confidence", + "requested_server_name", + "client_fingerprint", + "server_fingerprint", + "user_agent", + "content_type", + "_C", + "udps", + "system_process_pid", + "system_process_name", + "system_browser_tab", + ) + + def __init__( + self, + packet, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, + ): + self.id = ( + NFEvent.FLOW + ) # id set to NFLOW for internal communications and handled (incremented) by NFStreamer. self.expiration_id = 0 # Initialize C structure. - self._C = lib.meter_initialize_flow(packet, accounting_mode, statistics, splt, n_dissections, dissector, sync) + self._C = lib.meter_initialize_flow( + packet, accounting_mode, statistics, splt, n_dissections, dissector, sync + ) if self._C == ffi.NULL: # raise OSError in order to be handled by meter. raise OSError("Not enough memory for new flow creation.") # Here we go for the first copy in order to make defined slots available - self.src_ip = ffi.string(self._C.src_ip_str).decode('utf-8', errors='ignore') - self.src_mac = ffi.string(self._C.src_mac_str).decode('utf-8', errors='ignore') - self.src_oui = ffi.string(self._C.src_oui).decode('utf-8', errors='ignore') + self.src_ip = ffi.string(self._C.src_ip_str).decode("utf-8", errors="ignore") + self.src_mac = ffi.string(self._C.src_mac_str).decode("utf-8", errors="ignore") + self.src_oui = ffi.string(self._C.src_oui).decode("utf-8", errors="ignore") self.src_port = self._C.src_port - self.dst_ip = ffi.string(self._C.dst_ip_str).decode('utf-8', errors='ignore') - self.dst_mac = ffi.string(self._C.dst_mac_str).decode('utf-8', errors='ignore') - self.dst_oui = ffi.string(self._C.dst_oui).decode('utf-8', errors='ignore') + self.dst_ip = ffi.string(self._C.dst_ip_str).decode("utf-8", errors="ignore") + self.dst_mac = ffi.string(self._C.dst_mac_str).decode("utf-8", errors="ignore") + self.dst_oui = ffi.string(self._C.dst_oui).decode("utf-8", errors="ignore") self.dst_port = self._C.dst_port self.protocol = self._C.protocol self.ip_version = self._C.ip_version @@ -291,15 +318,29 @@ class NFlow(object): self.dst2src_fin_packets = self._C.dst2src_fin_packets if n_dissections: # Same for dissection when > 0 if sync: - self.application_name = ffi.string(self._C.application_name).decode('utf-8', errors='ignore') - self.application_category_name = ffi.string(self._C.category_name).decode('utf-8', errors='ignore') + self.application_name = ffi.string(self._C.application_name).decode( + "utf-8", errors="ignore" + ) + self.application_category_name = ffi.string( + self._C.category_name + ).decode("utf-8", errors="ignore") self.application_is_guessed = self._C.guessed self.application_confidence = self._C.confidence - self.requested_server_name = ffi.string(self._C.requested_server_name).decode('utf-8', errors='ignore') - self.client_fingerprint = ffi.string(self._C.c_hash).decode('utf-8', errors='ignore') - self.server_fingerprint = ffi.string(self._C.s_hash).decode('utf-8', errors='ignore') - self.user_agent = ffi.string(self._C.user_agent).decode('utf-8', errors='ignore') - self.content_type = ffi.string(self._C.content_type).decode('utf-8', errors='ignore') + self.requested_server_name = ffi.string( + self._C.requested_server_name + ).decode("utf-8", errors="ignore") + self.client_fingerprint = ffi.string(self._C.c_hash).decode( + "utf-8", errors="ignore" + ) + self.server_fingerprint = ffi.string(self._C.s_hash).decode( + "utf-8", errors="ignore" + ) + self.user_agent = ffi.string(self._C.user_agent).decode( + "utf-8", errors="ignore" + ) + self.content_type = ffi.string(self._C.content_type).decode( + "utf-8", errors="ignore" + ) else: self.application_name = None self.application_category_name = None @@ -324,25 +365,56 @@ class NFlow(object): if system_visibility_mode == 2: self.system_browser_tab = "" - def update(self, packet, idle_timeout, active_timeout, ffi, lib, udps, sync, accounting_mode, - n_dissections, statistics, splt, dissector): - """ NFlow update method """ + def update( + self, + packet, + idle_timeout, + active_timeout, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + ): + """NFlow update method""" # First, we update internal C structure. - ret = lib.meter_update_flow(self._C, packet, idle_timeout, active_timeout, accounting_mode, statistics, splt, - n_dissections, dissector, sync) - if ret > 0: # If update done it will be zero, idle and active are matched to 1 and 2. + ret = lib.meter_update_flow( + self._C, + packet, + idle_timeout, + active_timeout, + accounting_mode, + statistics, + splt, + n_dissections, + dissector, + sync, + ) + if ( + ret > 0 + ): # If update done it will be zero, idle and active are matched to 1 and 2. self.expiration_id = ret - 1 - return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # expire it. + return self.expire( + udps, sync, n_dissections, statistics, splt, ffi, lib, dissector + ) # expire it. if sync: # If running with Plugins self.sync(n_dissections, statistics, splt, ffi, lib, sync) # We need to copy computed values on C struct. for udp in udps: # Then call each plugin on_update entrypoint. udp.on_update(pythonize_packet(packet, ffi, self), self) - if self.expiration_id == -1: # One of the plugins set expiration to custom value (-1) - return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # Expire it. + if ( + self.expiration_id == -1 + ): # One of the plugins set expiration to custom value (-1) + return self.expire( + udps, sync, n_dissections, statistics, splt, ffi, lib, dissector + ) # Expire it. def expire(self, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector): - """ NFlow expiration method """ + """NFlow expiration method""" # Call expiration of C structure. lib.meter_expire_flow(self._C, n_dissections, dissector) # Then sync (second copy in case of non sync mode) @@ -379,34 +451,46 @@ class NFlow(object): bidirectional_packets = self.bidirectional_packets # NOTE: We need the root square of the variance to provide sample stddev (Var**0.5)/(n-1) if bidirectional_packets > 1: - self.bidirectional_stddev_ps = sqrt(self._C.bidirectional_stddev_ps/(bidirectional_packets - 1)) + self.bidirectional_stddev_ps = sqrt( + self._C.bidirectional_stddev_ps / (bidirectional_packets - 1) + ) self.bidirectional_max_ps = self._C.bidirectional_max_ps self.src2dst_min_ps = self._C.src2dst_min_ps self.src2dst_mean_ps = self._C.src2dst_mean_ps src2dst_packets = self.src2dst_packets if src2dst_packets > 1: - self.src2dst_stddev_ps = sqrt(self._C.src2dst_stddev_ps/(src2dst_packets - 1)) + self.src2dst_stddev_ps = sqrt( + self._C.src2dst_stddev_ps / (src2dst_packets - 1) + ) self.src2dst_max_ps = self._C.src2dst_max_ps self.dst2src_min_ps = self._C.dst2src_min_ps self.dst2src_mean_ps = self._C.dst2src_mean_ps dst2src_packets = self.dst2src_packets if dst2src_packets > 1: - self.dst2src_stddev_ps = sqrt(self._C.dst2src_stddev_ps / (dst2src_packets - 1)) + self.dst2src_stddev_ps = sqrt( + self._C.dst2src_stddev_ps / (dst2src_packets - 1) + ) self.dst2src_max_ps = self._C.dst2src_max_ps self.bidirectional_min_piat_ms = self._C.bidirectional_min_piat_ms self.bidirectional_mean_piat_ms = self._C.bidirectional_mean_piat_ms if bidirectional_packets > 2: - self.bidirectional_stddev_piat_ms = sqrt(self._C.bidirectional_stddev_piat_ms/(bidirectional_packets-2)) + self.bidirectional_stddev_piat_ms = sqrt( + self._C.bidirectional_stddev_piat_ms / (bidirectional_packets - 2) + ) self.bidirectional_max_piat_ms = self._C.bidirectional_max_piat_ms self.src2dst_min_piat_ms = self._C.src2dst_min_piat_ms self.src2dst_mean_piat_ms = self._C.src2dst_mean_piat_ms if src2dst_packets > 2: - self.src2dst_stddev_piat_ms = sqrt(self._C.src2dst_stddev_piat_ms/(src2dst_packets - 2)) + self.src2dst_stddev_piat_ms = sqrt( + self._C.src2dst_stddev_piat_ms / (src2dst_packets - 2) + ) self.src2dst_max_piat_ms = self._C.src2dst_max_piat_ms self.dst2src_min_piat_ms = self._C.dst2src_min_piat_ms self.dst2src_mean_piat_ms = self._C.dst2src_mean_piat_ms if dst2src_packets > 2: - self.dst2src_stddev_piat_ms = sqrt(self._C.dst2src_stddev_piat_ms/(dst2src_packets - 2)) + self.dst2src_stddev_piat_ms = sqrt( + self._C.dst2src_stddev_piat_ms / (dst2src_packets - 2) + ) self.dst2src_max_piat_ms = self._C.dst2src_max_piat_ms self.bidirectional_syn_packets = self._C.bidirectional_syn_packets self.bidirectional_cwr_packets = self._C.bidirectional_cwr_packets @@ -435,24 +519,44 @@ class NFlow(object): if n_dissections: # If dissection set (>0) # We minimize updates to a single one, when detection completed. if self._C.detection_completed < 2: - self.application_name = ffi.string(self._C.application_name).decode('utf-8', errors='ignore') - self.application_category_name = ffi.string(self._C.category_name).decode('utf-8', errors='ignore') - self.requested_server_name = ffi.string(self._C.requested_server_name).decode('utf-8', errors='ignore') - self.client_fingerprint = ffi.string(self._C.c_hash).decode('utf-8', errors='ignore') - self.server_fingerprint = ffi.string(self._C.s_hash).decode('utf-8', errors='ignore') - self.user_agent = ffi.string(self._C.user_agent).decode('utf-8', errors='ignore') - self.content_type = ffi.string(self._C.content_type).decode('utf-8', errors='ignore') + self.application_name = ffi.string(self._C.application_name).decode( + "utf-8", errors="ignore" + ) + self.application_category_name = ffi.string( + self._C.category_name + ).decode("utf-8", errors="ignore") + self.requested_server_name = ffi.string( + self._C.requested_server_name + ).decode("utf-8", errors="ignore") + self.client_fingerprint = ffi.string(self._C.c_hash).decode( + "utf-8", errors="ignore" + ) + self.server_fingerprint = ffi.string(self._C.s_hash).decode( + "utf-8", errors="ignore" + ) + self.user_agent = ffi.string(self._C.user_agent).decode( + "utf-8", errors="ignore" + ) + self.content_type = ffi.string(self._C.content_type).decode( + "utf-8", errors="ignore" + ) self.application_is_guessed = self._C.guessed self.application_confidence = self._C.confidence if splt: - if sync_mode: # Same for splt, once we reach splt limit, there is no need to sync it anymore. + if ( + sync_mode + ): # Same for splt, once we reach splt limit, there is no need to sync it anymore. if self._C.bidirectional_packets <= splt: self.splt_direction = ffi.unpack(self._C.splt_direction, splt) self.splt_ps = ffi.unpack(self._C.splt_ps, splt) self.splt_piat_ms = ffi.unpack(self._C.splt_piat_ms, splt) else: - if self._C.splt_closed == 0: # we also release the memory to keep only the obtained list. - lib.meter_free_flow(self._C, n_dissections, splt, 0) # free SPLT + if ( + self._C.splt_closed == 0 + ): # we also release the memory to keep only the obtained list. + lib.meter_free_flow( + self._C, n_dissections, splt, 0 + ) # free SPLT else: self.splt_direction = ffi.unpack(self._C.splt_direction, splt) self.splt_ps = ffi.unpack(self._C.splt_ps, splt) @@ -460,11 +564,11 @@ class NFlow(object): # Memory will be released by freer. def is_idle(self, tick, idle_timeout): - """ is_idle method to check if NFlow is idle accoring to configured timeout """ + """is_idle method to check if NFlow is idle accoring to configured timeout""" return (tick - idle_timeout) >= self._C.bidirectional_last_seen_ms def __str__(self): - """ String representation of NFlow """ + """String representation of NFlow""" started = False printable = "NFlow(" for attr_name in self.__slots__: @@ -473,27 +577,38 @@ class NFlow(object): printable += attr_name + "=" + str(getattr(self, attr_name)) started = True else: - if attr_name == 'udps': + if attr_name == "udps": for udp_name in self.udps.__dict__.keys(): - printable += ',\n ' + attr_name + '.' + udp_name + "=" + str(getattr(self.udps, - udp_name)) + printable += ( + ",\n " + + attr_name + + "." + + udp_name + + "=" + + str(getattr(self.udps, udp_name)) + ) else: - printable += ',\n ' + attr_name + "=" + str(getattr(self, attr_name)) + printable += ( + ",\n " + + attr_name + + "=" + + str(getattr(self, attr_name)) + ) except AttributeError: pass printable += ")" return printable def keys(self): - """ get NFlow keys""" + """get NFlow keys""" # Note we transform udps to udps.value_name as preprocessing for csv/pandas interfaces ret = [] for attr_name in self.__slots__: try: getattr(self, attr_name) - if attr_name == 'udps': + if attr_name == "udps": for udp_name in self.udps.__dict__.keys(): - ret.append(attr_name + '.' + udp_name) + ret.append(attr_name + "." + udp_name) else: ret.append(attr_name) except AttributeError: @@ -501,13 +616,13 @@ class NFlow(object): return ret def values(self): - """ get flow values """ + """get flow values""" # Note: same indexing as keys. ret = [] for attr_name in self.__slots__: try: attr_value = getattr(self, attr_name) - if attr_name == 'udps': + if attr_name == "udps": for udp_value in self.udps.__dict__.values(): ret.append(udp_value) else: diff --git a/nfstream/meter.py b/nfstream/meter.py index f7f6d9d..e247ad0 100644 --- a/nfstream/meter.py +++ b/nfstream/meter.py @@ -31,12 +31,13 @@ FLOW_KEY = "{}:{}:{}:{}:{}:{}:{}:{}:{}" class NFCache(OrderedDict): - """ LRU Flow Cache + """LRU Flow Cache The NFCache object is used to cache flows entries such as MRU entries are kept on the end and LRU entries will be at the start. Note that we use OrderedDict which leverages classical python dict combined with a doubly linked list with sentinel nodes to track order. By doing so, we can access in an efficient way idle connections entries that need to expired based on a timeout. """ + def __init__(self, *args, **kwds): super().__init__(*args, **kwds) @@ -54,20 +55,38 @@ class NFCache(OrderedDict): return next(iter(self)) -def meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, - dissector): +def meter_scan( + meter_tick, + cache, + idle_timeout, + channel, + udps, + sync, + n_dissections, + statistics, + splt, + ffi, + lib, + dissector, +): """Checks flow cache for expired flow. Expired flows are identified, added to channel and then removed from the cache. """ remaining = True # We suppose that there is something to expire scanned = 0 - while remaining and scanned < 1000: # idle scan budget (each 10ms we scan 1000 as maximum) + while ( + remaining and scanned < 1000 + ): # idle scan budget (each 10ms we scan 1000 as maximum) try: flow_key = cache.get_lru_key() # will return the LRU flow key. flow = cache[flow_key] if flow.is_idle(meter_tick, idle_timeout): # idle, expire it. - channel.put(flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)) + channel.put( + flow.expire( + udps, sync, n_dissections, statistics, splt, ffi, lib, dissector + ) + ) del cache[flow_key] del flow scanned += 1 @@ -79,52 +98,114 @@ def meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissectio def get_flow_key(src_ip, src_port, dst_ip, dst_port, protocol, vlan_id, tunnel_id): - """ Create a consistent direction agnostic flow key """ + """Create a consistent direction agnostic flow key""" if src_ip[1] < dst_ip[1] or ((src_ip[1] == dst_ip[1]) and (src_ip[0] < dst_ip[0])): - key = (src_ip[0], src_ip[1], src_port, - dst_ip[0], dst_ip[1], dst_port, - protocol, vlan_id, tunnel_id) + key = ( + src_ip[0], + src_ip[1], + src_port, + dst_ip[0], + dst_ip[1], + dst_port, + protocol, + vlan_id, + tunnel_id, + ) else: if src_ip[0] == dst_ip[0] and src_ip[1] == dst_ip[1]: if src_port <= dst_port: - key = (src_ip[0], src_ip[1], src_port, - dst_ip[0], dst_ip[1], dst_port, - protocol, vlan_id, tunnel_id) + key = ( + src_ip[0], + src_ip[1], + src_port, + dst_ip[0], + dst_ip[1], + dst_port, + protocol, + vlan_id, + tunnel_id, + ) else: - key = (dst_ip[0], dst_ip[1], dst_port, - src_ip[0], src_ip[1], src_port, - protocol, vlan_id, tunnel_id) + key = ( + dst_ip[0], + dst_ip[1], + dst_port, + src_ip[0], + src_ip[1], + src_port, + protocol, + vlan_id, + tunnel_id, + ) else: - key = (dst_ip[0], dst_ip[1], dst_port, - src_ip[0], src_ip[1], src_port, - protocol, vlan_id, tunnel_id) + key = ( + dst_ip[0], + dst_ip[1], + dst_port, + src_ip[0], + src_ip[1], + src_port, + protocol, + vlan_id, + tunnel_id, + ) return key def get_flow_key_from_pkt(packet): - """ Create flow key from packet information (7-tuple) + """Create flow key from packet information (7-tuple) A flow key uniquely determines a flow using source ip, destination ip, source port, destination port, TCP/UDP protocol, VLAN ID and tunnel ID of the packets. """ - return get_flow_key(packet.src_ip, - packet.src_port, - packet.dst_ip, - packet.dst_port, - packet.protocol, - packet.vlan_id, - packet.tunnel_id) + return get_flow_key( + packet.src_ip, + packet.src_port, + packet.dst_ip, + packet.dst_port, + packet.protocol, + packet.vlan_id, + packet.tunnel_id, + ) -def consume(packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps, sync, accounting_mode, n_dissections, - statistics, splt, dissector, decode_tunnels, system_visibility_mode): - """ consume a packet and produce flow """ +def consume( + packet, + cache, + active_timeout, + idle_timeout, + channel, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, +): + """consume a packet and produce flow""" # We maintain state for active flows computation 1 for creation, 0 for update/cut, -1 for custom expire flow_key = get_flow_key_from_pkt(packet) try: # update flow - flow = cache[flow_key].update(packet, idle_timeout, active_timeout, ffi, lib, udps, sync, accounting_mode, - n_dissections, statistics, splt, dissector) + flow = cache[flow_key].update( + packet, + idle_timeout, + active_timeout, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + ) if flow is not None: if flow.expiration_id < 0: # custom expiration channel.put(flow) @@ -136,52 +217,123 @@ def consume(packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps del cache[flow_key] del flow try: - cache[flow_key] = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections, - statistics, splt, dissector, decode_tunnels, system_visibility_mode) - if cache[flow_key].expiration_id == -1: # A user Plugin forced expiration on the first packet + cache[flow_key] = NFlow( + packet, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, + ) + if ( + cache[flow_key].expiration_id == -1 + ): # A user Plugin forced expiration on the first packet channel.put( - cache[flow_key].expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)) + cache[flow_key].expire( + udps, + sync, + n_dissections, + statistics, + splt, + ffi, + lib, + dissector, + ) + ) del cache[flow_key] state = 0 except OSError: - print("WARNING: Failed to allocate memory space for flow creation. Flow creation aborted.") + print( + "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted." + ) state = 0 else: state = 0 except KeyError: # create flow try: if sync: - flow = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt, dissector, - decode_tunnels, system_visibility_mode) - if flow.expiration_id == -1: # A user Plugin forced expiration on the first packet - channel.put(flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)) + flow = NFlow( + packet, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, + ) + if ( + flow.expiration_id == -1 + ): # A user Plugin forced expiration on the first packet + channel.put( + flow.expire( + udps, + sync, + n_dissections, + statistics, + splt, + ffi, + lib, + dissector, + ) + ) del flow state = 0 else: cache[flow_key] = flow state = 1 else: - cache[flow_key] = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt, - dissector, decode_tunnels, system_visibility_mode) + cache[flow_key] = NFlow( + packet, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, + ) state = 1 except OSError: - print("WARNING: Failed to allocate memory space for flow creation. Flow creation aborted.") + print( + "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted." + ) state = 0 return state -def meter_cleanup(cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector): - """ cleanup all entries in NFCache """ +def meter_cleanup( + cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector +): + """cleanup all entries in NFCache""" for flow_key in list(cache.keys()): flow = cache[flow_key] # Push it on channel. - channel.put(flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)) + channel.put( + flow.expire( + udps, sync, n_dissections, statistics, splt, ffi, lib, dissector + ) + ) del cache[flow_key] del flow def capture_track(lib, capture, mode, interface_stats, tracker, processed, ignored): - """ Update shared performance values """ + """Update shared performance values""" lib.capture_stats(capture, interface_stats, mode) tracker[0].value = interface_stats.dropped tracker[1].value = processed @@ -193,17 +345,44 @@ def send_error(root_idx, channel, msg): channel.put(InternalError(NFEvent.ERROR, msg)) -def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots, root_idx, mode, - idle_timeout, active_timeout, accounting_mode, udps, n_dissections, statistics, splt, - channel, tracker, lock, group_id, system_visibility_mode, socket_buffer_size): - """ Metering workflow """ - set_affinity(root_idx+1) +def meter_workflow( + source, + snaplen, + decode_tunnels, + bpf_filter, + promisc, + n_roots, + root_idx, + mode, + idle_timeout, + active_timeout, + accounting_mode, + udps, + n_dissections, + statistics, + splt, + channel, + tracker, + lock, + group_id, + system_visibility_mode, + socket_buffer_size, +): + """Metering workflow""" + set_affinity(root_idx + 1) ffi, lib = create_engine() if lib is None: send_error(root_idx, channel, ENGINE_LOAD_ERR) return - meter_tick, meter_scan_tick, meter_track_tick = 0, 0, 0 # meter, idle scan and perf track timelines - meter_scan_interval, meter_track_interval = 10, 1000 # we scan each 10 msecs and update perf each sec. + meter_tick, meter_scan_tick, meter_track_tick = ( + 0, + 0, + 0, + ) # meter, idle scan and perf track timelines + meter_scan_interval, meter_track_interval = ( + 10, + 1000, + ) # we scan each 10 msecs and update perf each sec. cache = NFCache() dissector = setup_dissector(ffi, lib, n_dissections) if dissector == ffi.NULL and n_dissections: @@ -229,19 +408,39 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots for source_idx, source in enumerate(sources): error_child = ffi.new("char[256]") - capture = setup_capture(ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size) + capture = setup_capture( + ffi, + lib, + source, + snaplen, + promisc, + mode, + error_child, + group_id, + socket_buffer_size, + ) if capture is None: - send_error(root_idx, channel, ffi.string(error_child).decode('utf-8', errors='ignore')) + send_error( + root_idx, + channel, + ffi.string(error_child).decode("utf-8", errors="ignore"), + ) return # Here the last operation, BPF filtering setup and activation. if not activate_capture(capture, lib, error_child, bpf_filter, mode): - send_error(root_idx, channel, ffi.string(error_child).decode('utf-8', errors='ignore')) + send_error( + root_idx, + channel, + ffi.string(error_child).decode("utf-8", errors="ignore"), + ) return remaining_packets = True while remaining_packets: nf_packet = ffi.new("struct nf_packet *") - ret = lib.capture_next(capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode)) + ret = lib.capture_next( + capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode) + ) if ret > 0: # Valid must be processed by meter packet_time = nf_packet.time if packet_time > meter_tick: @@ -255,18 +454,57 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots go_scan = True # Activate scan meter_scan_tick = meter_tick # Consume packet and return diff - diff = consume(nf_packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps, sync, - accounting_mode, n_dissections, statistics, splt, dissector, decode_tunnels, - system_visibility_mode) + diff = consume( + nf_packet, + cache, + active_timeout, + idle_timeout, + channel, + ffi, + lib, + udps, + sync, + accounting_mode, + n_dissections, + statistics, + splt, + dissector, + decode_tunnels, + system_visibility_mode, + ) active_flows += diff if go_scan: - idles = meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections, - statistics, splt, ffi, lib, dissector) + idles = meter_scan( + meter_tick, + cache, + idle_timeout, + channel, + udps, + sync, + n_dissections, + statistics, + splt, + ffi, + lib, + dissector, + ) active_flows -= idles else: # time ticker if meter_tick - meter_scan_tick >= meter_scan_interval: - idles = meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections, - statistics, splt, ffi, lib, dissector) + idles = meter_scan( + meter_tick, + cache, + idle_timeout, + channel, + udps, + sync, + n_dissections, + statistics, + splt, + ffi, + lib, + dissector, + ) active_flows -= idles meter_scan_tick = meter_tick elif ret == 0: # Ignored packet @@ -275,14 +513,26 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots pass else: # End of file remaining_packets = False # end of loop - if meter_tick - meter_track_tick >= meter_track_interval: # Performance tracking - capture_track(lib, capture, mode, interface_stats, tracker, processed_packets, ignored_packets) + if ( + meter_tick - meter_track_tick >= meter_track_interval + ): # Performance tracking + capture_track( + lib, + capture, + mode, + interface_stats, + tracker, + processed_packets, + ignored_packets, + ) meter_track_tick = meter_tick # Close capture lib.capture_close(capture) # Expire all remaining flows in the cache. - meter_cleanup(cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) + meter_cleanup( + cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector + ) # Clean dissector lib.dissector_cleanup(dissector) # Release engine library diff --git a/nfstream/plugin.py b/nfstream/plugin.py index 3ffd9a5..707c4ac 100644 --- a/nfstream/plugin.py +++ b/nfstream/plugin.py @@ -15,7 +15,8 @@ If not, see . class NFPlugin(object): - """ NFPlugin class: Main entry point to extend NFStream """ + """NFPlugin class: Main entry point to extend NFStream""" + def __init__(self, **kwargs): """ NFPlugin Parameters: diff --git a/nfstream/plugins/dhcp.py b/nfstream/plugins/dhcp.py index af94acd..2d2234b 100644 --- a/nfstream/plugins/dhcp.py +++ b/nfstream/plugins/dhcp.py @@ -32,10 +32,10 @@ class MsgType(Enum): class DHCP(NFPlugin): - """ DHCP plugin + """DHCP plugin This plugin extracts client information from DHCP sessions, and split flow - on transaction completion to prevent several sessions to be considered as + on transaction completion to prevent several sessions to be considered as the same flow. The following information are retrieved: - dhcp_12 (Option 12, hostname): hostname is decoded as utf8 with special characters being replaced. @@ -48,6 +48,7 @@ class DHCP(NFPlugin): - dhcp_addr: The IP address allocated to the client """ + def on_init(self, packet, flow): flow.udps.dhcp_12 = None # Sometimes hostname is missing from ndpi flow.udps.dhcp_50 = None # must be anonymized on export @@ -70,19 +71,19 @@ class DHCP(NFPlugin): for opt in dhcp.opts: if opt[0] == 12: # Hostname - hostname = opt[1].decode('utf-8', errors='replace') + hostname = opt[1].decode("utf-8", errors="replace") if len(hostname) > 0: flow.udps.dhcp_12 = hostname elif opt[0] == 53: # Msg type msg_type = MsgType(int.from_bytes(opt[1], "big")) elif opt[0] == 60: # Vendor class identifier - flow.udps.dhcp_60 = opt[1].decode('utf-8') + flow.udps.dhcp_60 = opt[1].decode("utf-8") elif opt[0] == 77: # User class id - flow.udps.dhcp_77 = opt[1].decode('utf-8') + flow.udps.dhcp_77 = opt[1].decode("utf-8") elif opt[0] == 57: # Maximum DHCP Message Size flow.udps.dhcp_57 = int.from_bytes(opt[1], "big") elif opt[0] == 55: # parameter request list (aka fingerprint) - opt55 = ','.join(str(i) for i in opt[1]) + opt55 = ",".join(str(i) for i in opt[1]) elif opt[0] == 50: # requested ip opt50 = ipaddress.ip_address(int.from_bytes(opt[1], "big")) options.append(opt[0]) @@ -101,8 +102,10 @@ class DHCP(NFPlugin): msg_type, options, opt50, opt55 = self._process_options(flow, dhcp) if msg_type == MsgType.REQUEST: - mac = struct.unpack('BBBBBB', dhcp.chaddr) - flow.udps.dhcp_oui = '{:02x}:{:02x}:{:02x}'.format(mac[0], mac[1], mac[2]) + mac = struct.unpack("BBBBBB", dhcp.chaddr) + flow.udps.dhcp_oui = "{:02x}:{:02x}:{:02x}".format( + mac[0], mac[1], mac[2] + ) flow.udps.dhcp_options = options flow.udps.dhcp_55 = opt55 if opt55 is not None else None flow.udps.dhcp_50 = str(opt50) if opt50 is not None else None @@ -110,7 +113,12 @@ class DHCP(NFPlugin): if ciaddr != ipaddress.ip_address(0): flow.udps.dhcp_addr = str(ciaddr) - if msg_type in [MsgType.ACK, MsgType.NACK, MsgType.INFORM, MsgType.DECLINE] or flow.src_ip == str(ipaddress.ip_address(0)): + if msg_type in [ + MsgType.ACK, + MsgType.NACK, + MsgType.INFORM, + MsgType.DECLINE, + ] or flow.src_ip == str(ipaddress.ip_address(0)): flow.expiration_id = -1 if flow.udps.dhcp_msg_type is None: diff --git a/nfstream/plugins/mdns.py b/nfstream/plugins/mdns.py index 5d08c29..32b87f9 100644 --- a/nfstream/plugins/mdns.py +++ b/nfstream/plugins/mdns.py @@ -18,13 +18,14 @@ import dpkt class MDNS(NFPlugin): - """ MDNS plugin + """MDNS plugin This plugin extracts answer information from MDNS requests. The following information are retrieved: - mdns_ptr: An ordered list of PTR answsers. """ + def on_init(self, packet, flow): flow.udps.mdns_ptr = [] self.on_update(packet, flow) @@ -41,6 +42,6 @@ class MDNS(NFPlugin): if len(dns.an) > 0: for answer in dns.an: if answer.type == 12: # PTR - ptr = answer.ptrname.replace(',', ' ') + ptr = answer.ptrname.replace(",", " ") if ptr not in flow.udps.mdns_ptr: flow.udps.mdns_ptr.append(ptr) diff --git a/nfstream/plugins/slicer.py b/nfstream/plugins/slicer.py index 23da3a0..17bdb4d 100644 --- a/nfstream/plugins/slicer.py +++ b/nfstream/plugins/slicer.py @@ -17,10 +17,11 @@ from nfstream import NFPlugin class FlowSlicer(NFPlugin): - """ FlowSlicer plugin + """FlowSlicer plugin This plugin implements a custom flow expiration logic based on a packets count limit. """ + def on_init(self, packet, flow): if self.limit == 1: flow.expiration_id = -1 diff --git a/nfstream/plugins/splt.py b/nfstream/plugins/splt.py index dbf3a54..9d47342 100644 --- a/nfstream/plugins/splt.py +++ b/nfstream/plugins/splt.py @@ -32,6 +32,7 @@ class SPLT(NFPlugin): - splt_ipt: Array with inter packet arrival time in milliseconds. Note: Tail will be set with default value -1. """ + @staticmethod def _get_packet_size(packet, accounting_mode): if accounting_mode == 0: @@ -54,5 +55,7 @@ class SPLT(NFPlugin): if flow.bidirectional_packets <= self.sequence_length: packet_index = flow.bidirectional_packets - 1 flow.udps.splt_direction[packet_index] = packet.direction - flow.udps.splt_ps[packet_index] = self._get_packet_size(packet, self.accounting_mode) + flow.udps.splt_ps[packet_index] = self._get_packet_size( + packet, self.accounting_mode + ) flow.udps.splt_piat_ms[packet_index] = packet.delta_time diff --git a/nfstream/plugins/wfeatures.py b/nfstream/plugins/wfeatures.py index eeacc32..8d6836d 100644 --- a/nfstream/plugins/wfeatures.py +++ b/nfstream/plugins/wfeatures.py @@ -22,7 +22,7 @@ from nfstream import NFPlugin class WFPlugin(NFPlugin): """Wavelet-based Features plugin. This plugin attempts to recreate wavelet-based features from [1]. - + Features are calculated from `ip_size`, that is binned on packet timestamps into timeseries of len 2**`levels` (spanning `active_timeout`). @@ -49,17 +49,17 @@ class WFPlugin(NFPlugin): assert hasattr(self, "levels") assert hasattr(self, "active_timeout") - # Pywt requires vector of length 2**level as input + # Pywt requires vector of length 2**level as input # set nbins to that number: self.nbins = 2**self.levels - + # Given `active_timeout` as max flow length calculate size of each bin: - self.bin_size = (self.active_timeout / 2**self.levels * 1000) + self.bin_size = self.active_timeout / 2**self.levels * 1000 # Reserve a empty vectors for data i forward and backward direction - flow.udps.forward = np.zeros(self.nbins).tolist() + flow.udps.forward = np.zeros(self.nbins).tolist() flow.udps.backward = np.zeros(self.nbins).tolist() - + # Save first packet timestamp that will be used to calc index # of time series bin flow.udps.first_packet_timestamp = packet.time # timestamp in ms @@ -67,7 +67,7 @@ class WFPlugin(NFPlugin): def on_update(self, packet, flow): # Calculate time in ms from first packet mstime_since_first_packet: int = packet.time - flow.udps.first_packet_timestamp - + # Calculate index of bin to put data into ibin, _ = divmod(mstime_since_first_packet, self.bin_size) @@ -133,12 +133,12 @@ class WFPlugin(NFPlugin): E_k = np.sum(np.power(np.abs(d), 2), axis=0) E_total = np.sum(E_k, axis=0) - + # Relarive wavelet energy - p_k = E_k / (E_total + 1e-7) + p_k = E_k / (E_total + 1e-7) p_n = np.power(d, 2) / (E_k + 1e-7) # Shannon entropy - S_k = -np.sum(p_n * np.log(p_n + 1e-7), axis=0) + S_k = -np.sum(p_n * np.log(p_n + 1e-7), axis=0) # Absolute mean of coefficients u_k = np.mean(np.abs(d), axis=0) # Std. deviation of coeficcients diff --git a/nfstream/streamer.py b/nfstream/streamer.py index 6666c2a..9f28b9d 100644 --- a/nfstream/streamer.py +++ b/nfstream/streamer.py @@ -26,8 +26,21 @@ from .meter import meter_workflow from .anonymizer import NFAnonymizer from .engine import is_interface from .plugin import NFPlugin -from .utils import csv_converter, open_file, RepeatedTimer, update_performances, set_affinity, available_cpus_count -from .utils import validate_flows_per_file, NFMode, create_csv_file_path, NFEvent, validate_rotate_files +from .utils import ( + csv_converter, + open_file, + RepeatedTimer, + update_performances, + set_affinity, + available_cpus_count, +) +from .utils import ( + validate_flows_per_file, + NFMode, + create_csv_file_path, + NFEvent, + validate_rotate_files, +) from .system import system_socket_worflow, match_flow_conn @@ -48,25 +61,27 @@ class NFStreamer(object): """ - def __init__(self, - source=None, - decode_tunnels=True, - bpf_filter=None, - promiscuous_mode=True, - snapshot_length=1536, - socket_buffer_size=0, - idle_timeout=120, # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt - active_timeout=1800, - accounting_mode=0, - udps=None, - n_dissections=20, - statistical_analysis=False, - splt_analysis=0, - n_meters=0, - max_nflows=0, - performance_report=0, - system_visibility_mode=0, - system_visibility_poll_ms=100): + def __init__( + self, + source=None, + decode_tunnels=True, + bpf_filter=None, + promiscuous_mode=True, + snapshot_length=1536, + socket_buffer_size=0, + idle_timeout=120, # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt + active_timeout=1800, + accounting_mode=0, + udps=None, + n_dissections=20, + statistical_analysis=False, + splt_analysis=0, + n_meters=0, + max_nflows=0, + performance_report=0, + system_visibility_mode=0, + system_visibility_poll_ms=100, + ): with NFStreamer.glock: NFStreamer.streamer_id += 1 self._idx = NFStreamer.streamer_id @@ -115,13 +130,17 @@ class NFStreamer(object): if not isfile(value[i]): raise TypeError except TypeError: - raise ValueError("Invalid pcap file path at index: " + str(i) + ".") + raise ValueError( + "Invalid pcap file path at index: " + str(i) + "." + ) self._mode = NFMode.MULTIPLE_FILES else: try: value = str(os.fspath(value)) except TypeError: - raise ValueError("Please specify a pcap file path or a valid network interface name as source.") + raise ValueError( + "Please specify a pcap file path or a valid network interface name as source." + ) if isfile(value): self._mode = NFMode.SINGLE_FILE else: @@ -130,7 +149,9 @@ class NFStreamer(object): self._mode = NFMode.INTERFACE value = interface else: - raise ValueError("Please specify a pcap file path or a valid network interface name as source.") + raise ValueError( + "Please specify a pcap file path or a valid network interface name as source." + ) self._source = value @property @@ -140,7 +161,9 @@ class NFStreamer(object): @decode_tunnels.setter def decode_tunnels(self, value): if not isinstance(value, bool): - raise ValueError("Please specify a valid decode_tunnels parameter (possible values: True, False).") + raise ValueError( + "Please specify a valid decode_tunnels parameter (possible values: True, False)." + ) self._decode_tunnels = value @property @@ -160,7 +183,9 @@ class NFStreamer(object): @promiscuous_mode.setter def promiscuous_mode(self, value): if not isinstance(value, bool): - raise ValueError("Please specify a valid promiscuous_mode parameter (possible values: True, False).") + raise ValueError( + "Please specify a valid promiscuous_mode parameter (possible values: True, False)." + ) self._promiscuous_mode = value @property @@ -170,7 +195,9 @@ class NFStreamer(object): @snapshot_length.setter def snapshot_length(self, value): if not isinstance(value, int) or value <= 0: - raise ValueError("Please specify a valid snapshot_length parameter (positive integer).") + raise ValueError( + "Please specify a valid snapshot_length parameter (positive integer)." + ) self._snapshot_length = value @property @@ -179,8 +206,10 @@ class NFStreamer(object): @socket_buffer_size.setter def socket_buffer_size(self, value): - if not isinstance(value, int) or (value < 0 or value > 2**31-1): - raise ValueError("Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1).") + if not isinstance(value, int) or (value < 0 or value > 2**31 - 1): + raise ValueError( + "Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1)." + ) self._socket_buffer_size = value @property @@ -189,8 +218,12 @@ class NFStreamer(object): @idle_timeout.setter def idle_timeout(self, value): - if not isinstance(value, int) or ((value < 0) or (value * 1000) > 18446744073709551615): # max uint64_t - raise ValueError("Please specify a valid idle_timeout parameter (positive integer in seconds).") + if not isinstance(value, int) or ( + (value < 0) or (value * 1000) > 18446744073709551615 + ): # max uint64_t + raise ValueError( + "Please specify a valid idle_timeout parameter (positive integer in seconds)." + ) self._idle_timeout = value @property @@ -199,8 +232,12 @@ class NFStreamer(object): @active_timeout.setter def active_timeout(self, value): - if not isinstance(value, int) or ((value < 0) or (value * 1000) > 18446744073709551615): # max uint64_t - raise ValueError("Please specify a valid active_timeout parameter (positive integer in seconds).") + if not isinstance(value, int) or ( + (value < 0) or (value * 1000) > 18446744073709551615 + ): # max uint64_t + raise ValueError( + "Please specify a valid active_timeout parameter (positive integer in seconds)." + ) self._active_timeout = value @property @@ -210,7 +247,9 @@ class NFStreamer(object): @accounting_mode.setter def accounting_mode(self, value): if not isinstance(value, int) or (value not in [0, 1, 2, 3]): - raise ValueError("Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3).") + raise ValueError( + "Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3)." + ) self._accounting_mode = value @property @@ -225,7 +264,9 @@ class NFStreamer(object): if isinstance(plugin, NFPlugin): pass else: - raise ValueError("User defined plugins must inherit from NFPlugin type.") + raise ValueError( + "User defined plugins must inherit from NFPlugin type." + ) self._udps = value else: if isinstance(value, NFPlugin): @@ -234,7 +275,9 @@ class NFStreamer(object): if value is None: self._udps = () else: - raise ValueError("User defined plugins must inherit from NFPlugin type.") + raise ValueError( + "User defined plugins must inherit from NFPlugin type." + ) @property def n_dissections(self): @@ -243,7 +286,9 @@ class NFStreamer(object): @n_dissections.setter def n_dissections(self, value): if not isinstance(value, int) or (value < 0 or value > 255): - raise ValueError("Please specify a valid n_dissections parameter (possible values in : [0,...,255]).") + raise ValueError( + "Please specify a valid n_dissections parameter (possible values in : [0,...,255])." + ) self._n_dissections = value @property @@ -253,7 +298,9 @@ class NFStreamer(object): @statistical_analysis.setter def statistical_analysis(self, value): if not isinstance(value, bool): - raise ValueError("Please specify a valid statistical_analysis parameter (possible values: True, False).") + raise ValueError( + "Please specify a valid statistical_analysis parameter (possible values: True, False)." + ) self._statistical_analysis = value @property @@ -263,9 +310,13 @@ class NFStreamer(object): @splt_analysis.setter def splt_analysis(self, value): if not isinstance(value, int) or (value < 0 or value > 65535): - raise ValueError("Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])") + raise ValueError( + "Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])" + ) if value > 255: - print("[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool.") + print( + "[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool." + ) self._splt_analysis = value @property @@ -277,17 +328,25 @@ class NFStreamer(object): if isinstance(value, int) and value >= 0: pass else: - raise ValueError("Please specify a valid n_meters parameter (>=1 or 0 for auto scaling).") + raise ValueError( + "Please specify a valid n_meters parameter (>=1 or 0 for auto scaling)." + ) c_cpus, c_cores = available_cpus_count(), psutil.cpu_count(logical=False) - if c_cores is None: # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078) + if ( + c_cores is None + ): # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078) c_cores = c_cpus if value == 0: if platform.system() == "Linux" and self._mode == NFMode.INTERFACE: - self._n_meters = c_cpus - 1 # We are in live capture mode and kernel fanout will be available + self._n_meters = ( + c_cpus - 1 + ) # We are in live capture mode and kernel fanout will be available # only on Linux, we set the n_meters to detected logical CPUs -1 else: # Windows, MacOS, offline capture if c_cpus >= c_cores: - if c_cpus == 2 * c_cores or c_cpus == c_cores: # multi-thread or single threaded + if ( + c_cpus == 2 * c_cores or c_cpus == c_cores + ): # multi-thread or single threaded self._n_meters = c_cores - 1 else: self._n_meters = int(divmod(c_cpus / 2, 1)[0]) - 1 @@ -297,7 +356,11 @@ class NFStreamer(object): if (value + 1) <= c_cpus: self._n_meters = value else: # avoid contention - print("WARNING: n_meters set to :{} in order to avoid contention.".format(c_cpus - 1)) + print( + "WARNING: n_meters set to :{} in order to avoid contention.".format( + c_cpus - 1 + ) + ) self._n_meters = c_cpus - 1 if self._n_meters == 0: # one CPU case self._n_meters = 1 @@ -322,8 +385,10 @@ class NFStreamer(object): if isinstance(value, int) and value >= 0: pass else: - raise ValueError("Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)" - " or 0 to disable). [Available only for Live capture]") + raise ValueError( + "Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)" + " or 0 to disable). [Available only for Live capture]" + ) self._performance_report = value @property @@ -334,16 +399,20 @@ class NFStreamer(object): def system_visibility_mode(self, value): if isinstance(value, int) and value in [0, 1]: if self._mode == NFMode.SINGLE_FILE and value > 0: - print("WARNING: system_visibility_mode switched to 0 in offline capture " - "(available only for live capture)") + print( + "WARNING: system_visibility_mode switched to 0 in offline capture " + "(available only for live capture)" + ) value = 0 else: pass else: - raise ValueError("Please specify a valid system_visibility_mode parameter\n" - "0: disable\n" - "1: process information\n" - "[Available only for live capture on the system generating the traffic]") + raise ValueError( + "Please specify a valid system_visibility_mode parameter\n" + "0: disable\n" + "1: process information\n" + "[Available only for live capture on the system generating the traffic]" + ) self._system_visibility_mode = value @property @@ -355,8 +424,10 @@ class NFStreamer(object): if isinstance(value, int) and value >= 0: pass else: - raise ValueError("Please specify a valid system_visibility_poll_ms parameter " - "(positive integer in milliseconds)") + raise ValueError( + "Please specify a valid system_visibility_poll_ms parameter " + "(positive integer in milliseconds)" + ) self._system_visibility_poll_ms = value def __iter__(self): @@ -374,50 +445,76 @@ class NFStreamer(object): # To avoid issues on PyPy on Windows (See https://foss.heptapod.net/pypy/pypy/-/issues/3488), All # multiprocessing Value invocation must be performed before the call to Queue. n_meters = self.n_meters - idx_generator = self._mp_context.Value('i', 0) + idx_generator = self._mp_context.Value("i", 0) for i in range(n_meters): - performances.append([self._mp_context.Value('I', 0), - self._mp_context.Value('I', 0), - self._mp_context.Value('I', 0)]) + performances.append( + [ + self._mp_context.Value("I", 0), + self._mp_context.Value("I", 0), + self._mp_context.Value("I", 0), + ] + ) channel = self._mp_context.Queue(maxsize=32767) # Backpressure strategy. # We set it to (2^15-1) to cope with OSX max semaphore value. group_id = os.getpid() + self._idx # Used for fanout on Linux systems try: for i in range(n_meters): - meters.append(self._mp_context.Process(target=meter_workflow, - args=(self.source, - self.snapshot_length, - self.decode_tunnels, - self.bpf_filter, - self.promiscuous_mode, - n_meters, - i, - self._mode, - self.idle_timeout * 1000, - self.active_timeout * 1000, - self.accounting_mode, - self.udps, - self.n_dissections, - self.statistical_analysis, - self.splt_analysis, - channel, - performances[i], - lock, - group_id, - self.system_visibility_mode, - self.socket_buffer_size,))) + meters.append( + self._mp_context.Process( + target=meter_workflow, + args=( + self.source, + self.snapshot_length, + self.decode_tunnels, + self.bpf_filter, + self.promiscuous_mode, + n_meters, + i, + self._mode, + self.idle_timeout * 1000, + self.active_timeout * 1000, + self.accounting_mode, + self.udps, + self.n_dissections, + self.statistical_analysis, + self.splt_analysis, + channel, + performances[i], + lock, + group_id, + self.system_visibility_mode, + self.socket_buffer_size, + ), + ) + ) meters[i].daemon = True # demonize meter meters[i].start() if self._mode == NFMode.INTERFACE and self.performance_report > 0: if platform.system() == "Linux": - rt = RepeatedTimer(self.performance_report, update_performances, performances, True, idx_generator) + rt = RepeatedTimer( + self.performance_report, + update_performances, + performances, + True, + idx_generator, + ) else: - rt = RepeatedTimer(self.performance_report, update_performances, performances, False, idx_generator) + rt = RepeatedTimer( + self.performance_report, + update_performances, + performances, + False, + idx_generator, + ) if self._mode == NFMode.INTERFACE and self.system_visibility_mode: - socket_listener = self._mp_context.Process(target=system_socket_worflow, - args=(channel, - self.idle_timeout * 1000, - self.system_visibility_poll_ms / 1000,)) + socket_listener = self._mp_context.Process( + target=system_socket_worflow, + args=( + channel, + self.idle_timeout * 1000, + self.system_visibility_poll_ms / 1000, + ), + ) socket_listener.daemon = True # demonize socket_listener socket_listener.start() @@ -435,7 +532,9 @@ class NFStreamer(object): child_error = recv.message break elif recv.id == NFEvent.ALL_AFFINITY_SET: - set_affinity(0) # we pin streamer to core 0 as it's the less intensive task and several services runs + set_affinity( + 0 + ) # we pin streamer to core 0 as it's the less intensive task and several services runs # by default on this core. elif recv.id == NFEvent.SOCKET_CREATE: conn_cache[recv.key] = [recv.process_name, recv.process_pid] @@ -444,7 +543,10 @@ class NFStreamer(object): else: # NFEvent.FLOW recv.id = idx_generator.value # Unify ID idx_generator.value = idx_generator.value + 1 - if self._mode == NFMode.INTERFACE and self.system_visibility_mode: + if ( + self._mode == NFMode.INTERFACE + and self.system_visibility_mode + ): recv = match_flow_conn(conn_cache, recv) yield recv if recv.id == self.max_nflows: @@ -464,10 +566,14 @@ class NFStreamer(object): channel.join_thread() # and we join its thread if child_error is not None: raise ValueError(child_error) - except ValueError as observer_error: # job initiation failed due to some bad observer parameters. + except ( + ValueError + ) as observer_error: # job initiation failed due to some bad observer parameters. raise ValueError(observer_error) - def to_csv(self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0): + def to_csv( + self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0 + ): validate_flows_per_file(flows_per_file) validate_rotate_files(rotate_files) chunked, chunk_idx = True, -1 @@ -479,18 +585,20 @@ class NFStreamer(object): f = None for flow in self: try: - if total_flows == 0 or (chunked and (chunk_flows > flows_per_file)): # header creation + if total_flows == 0 or ( + chunked and (chunk_flows > flows_per_file) + ): # header creation if f is not None: f.close() chunk_flows = 1 chunk_idx += 1 f = open_file(output_path, chunked, chunk_idx, rotate_files) - header = ','.join([str(i) for i in flow.keys()]) + "\n" - f.write(header.encode('utf-8')) + header = ",".join([str(i) for i in flow.keys()]) + "\n" + f.write(header.encode("utf-8")) values = anon.process(flow) csv_converter(values) - to_export = ','.join([str(i) for i in values]) + "\n" - f.write(to_export.encode('utf-8')) + to_export = ",".join([str(i) for i in values]) + "\n" + f.write(to_export.encode("utf-8")) total_flows = total_flows + 1 chunk_flows += 1 except KeyboardInterrupt: @@ -501,16 +609,26 @@ class NFStreamer(object): return total_flows def to_pandas(self, columns_to_anonymize=()): - """ streamer to pandas function """ - temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format(pid=os.getpid(), - iid=NFStreamer.streamer_id, - ts=tm.time()) - total_flows = self.to_csv(path=temp_file_path, columns_to_anonymize=columns_to_anonymize, flows_per_file=0) + """streamer to pandas function""" + temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format( + pid=os.getpid(), iid=NFStreamer.streamer_id, ts=tm.time() + ) + total_flows = self.to_csv( + path=temp_file_path, + columns_to_anonymize=columns_to_anonymize, + flows_per_file=0, + ) if total_flows > 0: # If there is flows, return Dataframe else return None. - df = pd.read_csv(temp_file_path, engine="c") # Use C engine for superior performance (non-experimental) + df = pd.read_csv( + temp_file_path, engine="c" + ) # Use C engine for superior performance (non-experimental) if total_flows != df.shape[0]: - print("WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() " - "method if drops are critical.".format(abs(df.shape[0] - total_flows))) + print( + "WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() " + "method if drops are critical.".format( + abs(df.shape[0] - total_flows) + ) + ) else: df = None if os.path.exists(temp_file_path): diff --git a/nfstream/system.py b/nfstream/system.py index c7c6f67..281a8a5 100644 --- a/nfstream/system.py +++ b/nfstream/system.py @@ -21,19 +21,17 @@ from .utils import NFEvent import time -NFSocket = namedtuple('NFSocket', ['id', - 'key', - 'process_pid', - 'process_name']) +NFSocket = namedtuple("NFSocket", ["id", "key", "process_pid", "process_name"]) class ConnCache(OrderedDict): - """ LRU Connections Cache + """LRU Connections Cache The ConnCache object is used to cache connections entries such as MRU entries are kept on the end and LRU entries will be at the start. Note that we use OrderedDict which leverages classical python dict combined with a doubly linked list with sentinel nodes to track order. By doing so, we can access in an efficient way idle connections entries that need to expired based on a timeout. """ + def __init__(self, channel, timeout, *args, **kwds): self.channel = channel self.timeout = timeout + 5000 @@ -54,17 +52,21 @@ class ConnCache(OrderedDict): return next(iter(self)) def scan(self, current_time): - """ Scan and delete LRU entries based on a defined timeout """ + """Scan and delete LRU entries based on a defined timeout""" if (current_time - self.last_scan_time) > 10: remaining = True # We suppose that there is something to expire scanned = 0 - while remaining and scanned <= 1000: # Each 10 ms we scan with 1000 entries budget + while ( + remaining and scanned <= 1000 + ): # Each 10 ms we scan with 1000 entries budget try: lru_key = self.get_lru_key() # will return the LRU conn key. lru_last_update_time = self[lru_key] if current_time - lru_last_update_time >= self.timeout: del self[lru_key] - self.channel.put(NFSocket(NFEvent.SOCKET_REMOVE, lru_key, None, None)) # Send to streamer + self.channel.put( + NFSocket(NFEvent.SOCKET_REMOVE, lru_key, None, None) + ) # Send to streamer scanned += 1 else: remaining = False # LRU flow is not yet idle. @@ -74,7 +76,7 @@ class ConnCache(OrderedDict): def simplify_protocol(protocol): - """ Transform protocol IDs to 3 unique values: 6 for TCP, 17 for UDP and 0 for others """ + """Transform protocol IDs to 3 unique values: 6 for TCP, 17 for UDP and 0 for others""" if protocol == 6: return protocol if protocol == 17: @@ -83,17 +85,14 @@ def simplify_protocol(protocol): def get_conn_key_from_flow(f): - """ Compute a conn key from NFlow object attributes """ - return get_flow_key(f.src_ip, - f.src_port, - f.dst_ip, - f.dst_port, - simplify_protocol(f.protocol), - 0, 0) + """Compute a conn key from NFlow object attributes""" + return get_flow_key( + f.src_ip, f.src_port, f.dst_ip, f.dst_port, simplify_protocol(f.protocol), 0, 0 + ) def match_flow_conn(conn_cache, flow): - """ Match a flow with a connection entry based on a shared key""" + """Match a flow with a connection entry based on a shared key""" if len(conn_cache) > 0: flow_key = get_conn_key_from_flow(flow) try: @@ -106,23 +105,27 @@ def match_flow_conn(conn_cache, flow): def get_conn_key(c): - """ Create a 5-tuple connection key tuple """ + """Create a 5-tuple connection key tuple""" if c.raddr != () and c.pid is not None: if c.type == SocketKind.SOCK_STREAM: # TCP protocol - return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 6, 0, 0) + return get_flow_key( + c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 6, 0, 0 + ) if c.type == SocketKind.SOCK_DGRAM: # UDP protocol - return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 17, 0, 0) + return get_flow_key( + c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 17, 0, 0 + ) return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 0, 0, 0) return None def system_socket_worflow(channel, idle_timeout, poll_period): - """ Host ground-truth generation workflow """ + """Host ground-truth generation workflow""" conn_cache = ConnCache(channel=channel, timeout=idle_timeout) try: while True: current_time = time.time() * 1000 - for conn in net_connections(kind='inet'): + for conn in net_connections(kind="inet"): # IMPORTANT: The rationale behind the usage of an active polling approach (net_connections call): # System process visibility is intended to generate the most accurate ground truth for traffic # classification end-host-based research experiments, as reported in the literature [1]. @@ -140,7 +143,9 @@ def system_socket_worflow(channel, idle_timeout, poll_period): if key not in conn_cache: # Create and send process_name = Process(conn.pid).name() conn_cache[key] = current_time - channel.put(NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name)) # Send to streamer + channel.put( + NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name) + ) # Send to streamer else: # update time conn_cache[key] = current_time conn_cache.scan(current_time) diff --git a/nfstream/utils.py b/nfstream/utils.py index e72315f..fb9c883 100644 --- a/nfstream/utils.py +++ b/nfstream/utils.py @@ -35,56 +35,58 @@ class NFMode(IntEnum): MULTIPLE_FILES = 2 -InternalError = namedtuple('InternalError', ['id', 'message']) +InternalError = namedtuple("InternalError", ["id", "message"]) -InternalState = namedtuple('InternalState', ['id']) +InternalState = namedtuple("InternalState", ["id"]) def validate_flows_per_file(n): - """ Simple parameter validator """ + """Simple parameter validator""" if not isinstance(n, int) or isinstance(n, int) and n < 0: raise ValueError("Please specify a valid flows_per_file parameter (>= 0).") def validate_rotate_files(n): - """ Simple parameter validator """ + """Simple parameter validator""" if not isinstance(n, int) or isinstance(n, int) and n < 0: raise ValueError("Please specify a valid rotate_files parameter (>= 0).") def create_csv_file_path(path, source): - """ File path creator """ + """File path creator""" if path is None: if type(source) == list: - return str(source[0]) + '.csv' - return str(source) + '.csv' + return str(source[0]) + ".csv" + return str(source) + ".csv" return path def csv_converter(values): - """ Convert non numeric values to string using their __str__ method and ensure proper quoting """ + """Convert non numeric values to string using their __str__ method and ensure proper quoting""" for idx, value in enumerate(values): if not isinstance(value, float) and not isinstance(value, int): if value is None: values[idx] = "" else: values[idx] = str(values[idx]) - values[idx] = values[idx].replace('\"', '\\"') - values[idx] = "\"" + values[idx] + "\"" + values[idx] = values[idx].replace('"', '\\"') + values[idx] = '"' + values[idx] + '"' def open_file(path, chunked, chunk_idx, rotate_files): - """ File opener taking chunk mode into consideration """ + """File opener taking chunk mode into consideration""" if not chunked: - return open(path, 'wb') + return open(path, "wb") else: if rotate_files: - return open(path.replace("csv", "{}.csv".format(chunk_idx % rotate_files)), 'wb') - return open(path.replace("csv", "{}.csv".format(chunk_idx)), 'wb') + return open( + path.replace("csv", "{}.csv".format(chunk_idx % rotate_files)), "wb" + ) + return open(path.replace("csv", "{}.csv".format(chunk_idx)), "wb") def update_performances(performances, is_linux, flows_count): - """ Update performance report and check platform for consistency """ + """Update performance report and check platform for consistency""" drops = 0 processed = 0 ignored = 0 @@ -98,15 +100,22 @@ def update_performances(performances, is_linux, flows_count): ignored = max(meter[2].value, ignored) processed += meter[1].value load.append(meter[1].value) - print(json.dumps({"flows_expired": flows_count.value, - "packets_processed": processed, - "packets_ignored": ignored, - "packets_dropped_filtered_by_kernel": drops, - "meters_packets_processing_balance": load})) + print( + json.dumps( + { + "flows_expired": flows_count.value, + "packets_processed": processed, + "packets_ignored": ignored, + "packets_dropped_filtered_by_kernel": drops, + "meters_packets_processing_balance": load, + } + ) + ) class RepeatedTimer(object): - """ Repeated timer thread """ + """Repeated timer thread""" + def __init__(self, interval, function, *args, **kwargs): self._timer = None self.interval = interval @@ -131,14 +140,15 @@ class RepeatedTimer(object): self._timer.cancel() self.is_running = False - + def chunks_of_list(lst, n): - """ create list of chunks of size n from a list""" + """create list of chunks of size n from a list""" for i in range(0, len(lst), n): - yield lst[i:i + n] + yield lst[i : i + n] + def set_affinity(idx): - """ CPU affinity setter """ + """CPU affinity setter""" if platform.system() == "Linux": c_cpus = psutil.Process().cpu_affinity() temp = list(chunks_of_list(c_cpus, 2)) diff --git a/setup.py b/setup.py index 81af20d..00d9f00 100644 --- a/setup.py +++ b/setup.py @@ -24,15 +24,14 @@ THIS_DIRECTORY = str(pathlib.Path(__file__).parent.resolve()) if (not sys.version_info[0] == 3) and (not sys.version_info[1] >= 6): sys.exit("Sorry, nfstream requires Python3.6+ versions.") -with open(os.path.join(THIS_DIRECTORY, 'README.md'), encoding='utf-8') as f: +with open(os.path.join(THIS_DIRECTORY, "README.md"), encoding="utf-8") as f: LONG_DESCRIPTION = f.read() -INSTALL_REQUIRES = ['cffi>=1.15.0', - 'psutil>=5.8.0', - 'dpkt>=1.9.7', - 'numpy>=1.19.5'] +INSTALL_REQUIRES = ["cffi>=1.15.0", "psutil>=5.8.0", "dpkt>=1.9.7", "numpy>=1.19.5"] -if platform.python_implementation() == 'PyPy': # This is mandatory to fix pandas issues with PyPy +if ( + platform.python_implementation() == "PyPy" +): # This is mandatory to fix pandas issues with PyPy INSTALL_REQUIRES.append("pandas<=1.2.5") else: INSTALL_REQUIRES.append("pandas>=1.1.5") @@ -40,40 +39,40 @@ else: setup( name="nfstream", - version='6.5.4', - url='https://www.nfstream.org/', - license='LGPLv3', + version="6.5.4", + url="https://www.nfstream.org/", + license="LGPLv3", description="A Flexible Network Data Analysis Framework", long_description=LONG_DESCRIPTION, - long_description_content_type='text/markdown', - author='Zied Aouini', - author_email='aouinizied@gmail.com', - packages=['nfstream', 'nfstream.plugins', 'nfstream.engine'], + long_description_content_type="text/markdown", + author="Zied Aouini", + author_email="aouinizied@gmail.com", + packages=["nfstream", "nfstream.plugins", "nfstream.engine"], setup_requires=["cffi>=1.15.0"], cffi_modules=["nfstream/engine/engine_build.py:ffi_builder"], install_requires=INSTALL_REQUIRES, include_package_data=True, platforms=["Linux", "Mac OS-X", "Windows", "Unix"], classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)', - 'Intended Audience :: Telecommunications Industry', - 'Intended Audience :: Information Technology', - 'Intended Audience :: System Administrators', - 'Intended Audience :: Science/Research', - 'Intended Audience :: Developers', - 'Programming Language :: Python :: 3 :: Only', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: 3.9', - 'Programming Language :: Python :: 3.10', - 'Programming Language :: Python :: 3.11', - 'Topic :: Security', - 'Topic :: Internet :: Log Analysis', - 'Topic :: System :: Networking :: Monitoring', - 'Topic :: Scientific/Engineering :: Artificial Intelligence' + "Development Status :: 5 - Production/Stable", + "License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)", + "Intended Audience :: Telecommunications Industry", + "Intended Audience :: Information Technology", + "Intended Audience :: System Administrators", + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Topic :: Security", + "Topic :: Internet :: Log Analysis", + "Topic :: System :: Networking :: Monitoring", + "Topic :: Scientific/Engineering :: Artificial Intelligence", ], project_urls={ - 'GitHub': 'https://github.com/nfstream/nfstream', - } + "GitHub": "https://github.com/nfstream/nfstream", + }, ) diff --git a/tests.py b/tests.py index 0ad33b1..f668159 100644 --- a/tests.py +++ b/tests.py @@ -25,7 +25,9 @@ def get_files_list(path): files = [] for r, d, f in os.walk(path): for file in f: - if '.pcap' == file[-5:] or ".pcapng" == file[-7:]: # Pick out only pcaps files + if ( + ".pcap" == file[-5:] or ".pcapng" == file[-7:] + ): # Pick out only pcaps files files.append(os.path.join(r, file)) files.sort() return files @@ -34,7 +36,9 @@ def get_files_list(path): class NFStreamTest(object): @staticmethod def test_source_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 source = ["inexisting.pcap", "lo", 11] for x in source: @@ -43,268 +47,444 @@ class NFStreamTest(object): except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_source_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_source_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_decode_tunnels_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 decode_tunnels = [33, "True"] for x in decode_tunnels: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - decode_tunnels=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + decode_tunnels=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_decode_tunnels_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_decode_tunnels_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_bpf_filter_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 bpf_filter = ["my filter", 11] for x in bpf_filter: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), bpf_filter=x).to_pandas() + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + bpf_filter=x, + ).to_pandas() except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_bpf_filter_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_bpf_filter_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_promiscuous_mode_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 promiscuous_mode = ["yes", 89] for x in promiscuous_mode: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), promiscuous_mode=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + promiscuous_mode=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_promiscuous_mode_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_promiscuous_mode_parameter".ljust(60, " "), + colored("OK", "green"), + ) + ) @staticmethod def test_snapshot_length_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 snapshot_length = ["largest", -1] for x in snapshot_length: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), snapshot_length=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + snapshot_length=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_snapshot_length_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_snapshot_length_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_socket_buffer_size_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 socket_buffer_size = ["largest", -1, 2**31] for x in socket_buffer_size: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), socket_buffer_size=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + socket_buffer_size=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_socket_buffer_size_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_socket_buffer_size_parameter".ljust(60, " "), + colored("OK", "green"), + ) + ) @staticmethod def test_idle_timeout_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 idle_timeout = [-1, "idle"] for x in idle_timeout: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + idle_timeout=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_idle_timeout_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_idle_timeout_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_active_timeout_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 active_timeout = [-1, "active"] for x in active_timeout: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + active_timeout=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_active_timeout_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_active_timeout_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_accounting_mode_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 - accounting_mode = [-1, 5, 'ip'] + accounting_mode = [-1, 5, "ip"] for x in accounting_mode: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), accounting_mode=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + accounting_mode=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_accounting_mode_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_accounting_mode_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_udps_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 udps = [lambda y: y + 1, "NFPlugin"] for x in udps: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_udps_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_udps_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_n_dissections_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 n_dissections = ["yes", -1, 256] for x in n_dissections: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_dissections=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + n_dissections=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_n_dissections_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_n_dissections_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_system_visibility_mode_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 system_visibility_mode = ["yes", -1, 3] for x in system_visibility_mode: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_mode=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + system_visibility_mode=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_system_visibility_mode_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_system_visibility_mode_parameter".ljust(60, " "), + colored("OK", "green"), + ) + ) @staticmethod def test_system_visibility_poll_ms(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 system_visibility_mode = ["yes", -1] for x in system_visibility_mode: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_poll_ms=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + system_visibility_poll_ms=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_system_visibility_poll_ms".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_system_visibility_poll_ms".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_statistical_analysis_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 statistical_analysis = ["yes", 89] for x in statistical_analysis: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), statistical_analysis=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + statistical_analysis=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_statistical_analysis_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_statistical_analysis_parameter".ljust(60, " "), + colored("OK", "green"), + ) + ) @staticmethod def test_splt_analysis_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 splt_analysis = [-1, 70000, "yes"] for x in splt_analysis: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + splt_analysis=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 3 - print("{}\t: {}".format(".test_splt_analysis_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_splt_analysis_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_n_meters_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 n_meters = ["yes", -1] for x in n_meters: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - n_meters=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_meters=x + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_n_meters_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_n_meters_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_max_nflows_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 max_nflows = ["yes", -1] for x in max_nflows: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - max_nflows=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + max_nflows=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_max_nflows_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_max_nflows_parameter".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_performance_report_parameter(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 performance_report = ["yes", -1] for x in performance_report: try: - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), performance_report=x) + NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + performance_report=x, + ) except ValueError: n_exceptions += 1 assert n_exceptions == 2 - print("{}\t: {}".format(".test_performance_report_parameter".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_performance_report_parameter".ljust(60, " "), + colored("OK", "green"), + ) + ) @staticmethod def test_expiration_management(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) # Idle expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0) + streamer_expiration = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0 + ) last_id = 0 for flow in streamer_expiration: last_id = flow.id assert last_id == 27 # Active expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0) + streamer_expiration = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0 + ) last_id = 0 for flow in streamer_expiration: last_id = flow.id assert last_id == 27 # Custom expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - udps=FlowSlicer(limit=1)) + streamer_expiration = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + udps=FlowSlicer(limit=1), + ) last_id = 0 for flow in streamer_expiration: last_id = flow.id assert last_id == 27 - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - udps=FlowSlicer(limit=4)) + streamer_expiration = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + udps=FlowSlicer(limit=4), + ) last_id = 0 for flow in streamer_expiration: last_id = flow.id assert last_id == 6 - print("{}\t: {}".format(".test_expiration_management".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_expiration_management".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_tunnel_decoding(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) n_exceptions = 0 - decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"), - statistical_analysis=True, decode_tunnels=True) + decode_streamer = NFStreamer( + source=os.path.join("tests", "pcaps", "gtp-u.pcap"), + statistical_analysis=True, + decode_tunnels=True, + ) for flow in decode_streamer: assert flow.tunnel_id == 1 decode_streamer.decode_tunnels = False @@ -315,23 +495,32 @@ class NFStreamTest(object): n_exceptions += 1 assert n_exceptions == 1 del decode_streamer - print("{}\t: {}".format(".test_tunnel_decoding".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_tunnel_decoding".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_statistical(): - print("\n----------------------------------------------------------------------") - statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - statistical_analysis=True, accounting_mode=1) + print( + "\n----------------------------------------------------------------------" + ) + statistical_streamer = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + statistical_analysis=True, + accounting_mode=1, + ) for flow in statistical_streamer: assert flow.id == 0 assert flow.expiration_id == 0 - assert flow.src_ip == '172.31.3.224' - assert flow.src_mac == '80:c6:ca:00:9e:9f' - assert flow.src_oui == '80:c6:ca' + assert flow.src_ip == "172.31.3.224" + assert flow.src_mac == "80:c6:ca:00:9e:9f" + assert flow.src_oui == "80:c6:ca" assert flow.src_port == 42835 - assert flow.dst_ip == '216.58.212.100' - assert flow.dst_mac == '00:0e:8e:4d:b4:a8' - assert flow.dst_oui == '00:0e:8e' + assert flow.dst_ip == "216.58.212.100" + assert flow.dst_mac == "00:0e:8e:4d:b4:a8" + assert flow.dst_oui == "00:0e:8e" assert flow.dst_port == 443 assert flow.protocol == 6 assert flow.ip_version == 4 @@ -401,46 +590,79 @@ class NFStreamTest(object): assert flow.dst2src_rst_packets == 0 assert flow.dst2src_fin_packets == 1 del statistical_streamer - print("{}\t: {}".format(".test_statistical".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_statistical".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_fingerprint_extraction(): - print("\n----------------------------------------------------------------------") - fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), - statistical_analysis=True, accounting_mode=1) + print( + "\n----------------------------------------------------------------------" + ) + fingerprint_streamer = NFStreamer( + source=os.path.join("tests", "pcaps", "facebook.pcap"), + statistical_analysis=True, + accounting_mode=1, + ) for flow in fingerprint_streamer: - assert flow.application_name == 'TLS.Facebook' - assert flow.application_category_name == 'SocialNetwork' + assert flow.application_name == "TLS.Facebook" + assert flow.application_category_name == "SocialNetwork" assert flow.application_is_guessed == 0 assert flow.application_confidence == 6 - requested_server_name = flow.requested_server_name in ['facebook.com', 'www.facebook.com'] + requested_server_name = flow.requested_server_name in [ + "facebook.com", + "www.facebook.com", + ] assert int(requested_server_name) == 1 - client_fingerprint = flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840', - '5c60e71f1b8cd40e4d40ed5b6d666e3f'] + client_fingerprint = flow.client_fingerprint in [ + "bfcc1a3891601edb4f137ab7ab25b840", + "5c60e71f1b8cd40e4d40ed5b6d666e3f", + ] assert int(client_fingerprint) == 1 - server_fingerprint = flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12', - '96681175a9547081bf3d417f1a572091'] + server_fingerprint = flow.server_fingerprint in [ + "2d1eb5817ece335c24904f516ad5da12", + "96681175a9547081bf3d417f1a572091", + ] assert int(server_fingerprint) == 1 del fingerprint_streamer - print("{}\t: {}".format(".test_fingerprint_extraction".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_fingerprint_extraction".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_export(): - print("\n----------------------------------------------------------------------") - df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_pandas() - df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, - n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"]) + print( + "\n----------------------------------------------------------------------" + ) + df = NFStreamer( + source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, + n_dissections=20, + ).to_pandas() + df_anon = NFStreamer( + source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, + n_dissections=20, + ).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"]) assert df_anon.shape[0] == df.shape[0] assert df_anon.shape[1] == df.shape[1] - assert df_anon['src_ip'].nunique() == df['src_ip'].nunique() - assert df_anon['dst_ip'].nunique() == df['dst_ip'].nunique() - total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_csv() + assert df_anon["src_ip"].nunique() == df["src_ip"].nunique() + assert df_anon["dst_ip"].nunique() == df["dst_ip"].nunique() + total_flows = NFStreamer( + source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, + n_dissections=20, + ).to_csv() df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) - total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_csv() + total_flows_anon = NFStreamer( + source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, + n_dissections=20, + ).to_csv() df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv")) assert total_flows == total_flows_anon @@ -448,24 +670,29 @@ class NFStreamTest(object): assert total_flows_anon == df_anon_from_csv.shape[0] assert total_flows == df.shape[0] assert total_flows_anon == df_anon.shape[0] - print("{}\t: {}".format(".test_export".ljust(60, ' '), colored('OK', 'green'))) + print("{}\t: {}".format(".test_export".ljust(60, " "), colored("OK", "green"))) @staticmethod def test_bpf(): - print("\n----------------------------------------------------------------------") - streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), - bpf_filter="src port 52066 or dst port 52066") + print( + "\n----------------------------------------------------------------------" + ) + streamer_test = NFStreamer( + source=os.path.join("tests", "pcaps", "facebook.pcap"), + bpf_filter="src port 52066 or dst port 52066", + ) last_id = 0 for flow in streamer_test: last_id = flow.id assert flow.src_port == 52066 assert last_id == 0 - print("{}\t: {}".format(".test_bpf".ljust(60, ' '), colored('OK', 'green'))) - + print("{}\t: {}".format(".test_bpf".ljust(60, " "), colored("OK", "green"))) @staticmethod def test_ndpi_integration(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) pcap_files = get_files_list(os.path.join("tests", "pcaps")) result_files = get_files_list(os.path.join("tests", "results")) failures = 0 @@ -473,30 +700,49 @@ class NFStreamTest(object): for file_idx, test_file in enumerate(pcap_files): test_case_name = os.path.basename(test_file) try: - test = NFStreamer(source=test_file, - n_dissections=20, - n_meters=1).to_pandas()[["id", - "bidirectional_packets", - "bidirectional_bytes", - "application_name", - "application_category_name", - "application_is_guessed", - "application_confidence"]].to_dict() + test = ( + NFStreamer(source=test_file, n_dissections=20, n_meters=1) + .to_pandas()[ + [ + "id", + "bidirectional_packets", + "bidirectional_bytes", + "application_name", + "application_category_name", + "application_is_guessed", + "application_confidence", + ] + ] + .to_dict() + ) true = pd.read_csv(result_files[file_idx]).to_dict() assert test == true - print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + test_case_name.ljust(60, " "), colored("OK", "green") + ) + ) except AssertionError: failures += 1 - print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('KO', 'red'))) + print( + "{}\t: {}".format( + test_case_name.ljust(60, " "), colored("KO", "red") + ) + ) # Everything must be OK assert failures == 0 @staticmethod def test_splt(): - print("\n----------------------------------------------------------------------") - splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5, - udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas() + print( + "\n----------------------------------------------------------------------" + ) + splt_df = NFStreamer( + source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + splt_analysis=5, + udps=SPLT(sequence_length=5, accounting_mode=0), + ).to_pandas() direction = json.loads(splt_df["udps.splt_direction"][0]) ps = json.loads(splt_df["udps.splt_ps"][0]) piat = json.loads(splt_df["udps.splt_piat_ms"][0]) @@ -509,13 +755,23 @@ class NFStreamTest(object): assert direction == ndirection assert ps == nps assert piat == npiat - print("{}\t: {}".format(".test_splt".ljust(60, ' '), colored('OK', 'green'))) + print("{}\t: {}".format(".test_splt".ljust(60, " "), colored("OK", "green"))) @staticmethod def test_dhcp(): - print("\n----------------------------------------------------------------------") - dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"), n_dissections=0, udps=DHCP()) \ - .to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) + print( + "\n----------------------------------------------------------------------" + ) + dhcp_df = ( + NFStreamer( + source=os.path.join("tests", "pcaps", "dhcp.pcap"), + n_dissections=0, + udps=DHCP(), + ) + .to_pandas() + .sort_values(by=["src_ip"]) + .reset_index(drop=True) + ) assert dhcp_df["udps.dhcp_msg_type"][0] == "MsgType.DISCOVER" assert dhcp_df["udps.dhcp_50"][1] == "192.168.0.10" assert dhcp_df["udps.dhcp_55"][1] == "1,3,6,42" @@ -523,27 +779,43 @@ class NFStreamTest(object): assert dhcp_df["udps.dhcp_msg_type"][1] == "MsgType.REQUEST" assert dhcp_df["udps.dhcp_oui"][1] == "00:0b:82" assert dhcp_df.shape[0] == 3 - print("{}\t: {}".format(".test_dhcp".ljust(60, ' '), colored('OK', 'green'))) + print("{}\t: {}".format(".test_dhcp".ljust(60, " "), colored("OK", "green"))) @staticmethod def test_mdns(): - print("\n----------------------------------------------------------------------") - mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"), n_dissections=0, udps=MDNS()) \ - .to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) - assert mdns_df["udps.mdns_ptr"][0] == "['skynet.local', " \ - "'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', " \ - "'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', " \ - "'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', " \ - "'_workstation._tcp.local']" - print("{}\t: {}".format(".test_mdns".ljust(60, ' '), colored('OK', 'green'))) + print( + "\n----------------------------------------------------------------------" + ) + mdns_df = ( + NFStreamer( + source=os.path.join("tests", "pcaps", "mdns.pcap"), + n_dissections=0, + udps=MDNS(), + ) + .to_pandas() + .sort_values(by=["src_ip"]) + .reset_index(drop=True) + ) + assert ( + mdns_df["udps.mdns_ptr"][0] == "['skynet.local', " + "'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', " + "'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', " + "'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', " + "'_workstation._tcp.local']" + ) + print("{}\t: {}".format(".test_mdns".ljust(60, " "), colored("OK", "green"))) @staticmethod def test_multi_files(): - print("\n----------------------------------------------------------------------") - multi_files = [os.path.join("tests", "pcaps", "one_flow_1_5.pcap"), - os.path.join("tests", "pcaps", "one_flow_6_10.pcap"), - os.path.join("tests", "pcaps", "one_flow_11_15.pcap"), - os.path.join("tests", "pcaps", "one_flow_16_19.pcap")] + print( + "\n----------------------------------------------------------------------" + ) + multi_files = [ + os.path.join("tests", "pcaps", "one_flow_1_5.pcap"), + os.path.join("tests", "pcaps", "one_flow_6_10.pcap"), + os.path.join("tests", "pcaps", "one_flow_11_15.pcap"), + os.path.join("tests", "pcaps", "one_flow_16_19.pcap"), + ] for flow in NFStreamer(source=multi_files): assert flow.id == 0 assert flow.expiration_id == 0 @@ -583,22 +855,33 @@ class NFStreamTest(object): assert flow.server_fingerprint == "2d1eb5817ece335c24904f516ad5da12" assert flow.user_agent == "" assert flow.content_type == "" - print("{}\t: {}".format(".test_multi_files".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format( + ".test_multi_files".ljust(60, " "), colored("OK", "green") + ) + ) @staticmethod def test_max_nflows(): - print("\n----------------------------------------------------------------------") + print( + "\n----------------------------------------------------------------------" + ) df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap")).to_pandas() assert df.shape[0] == 294 - df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=100).to_pandas() + df = NFStreamer( + source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=100 + ).to_pandas() assert df.shape[0] == 100 - df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=0).to_pandas() + df = NFStreamer( + source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=0 + ).to_pandas() assert df.shape[0] == 294 - print("{}\t: {}".format(".test_max_nflows".ljust(60, ' '), colored('OK', 'green'))) + print( + "{}\t: {}".format(".test_max_nflows".ljust(60, " "), colored("OK", "green")) + ) -if __name__ == '__main__': - +if __name__ == "__main__": # IMPORTANT: As NFStream input is network bytes, we rely on fuzzing techniques to ensure robustness. # Fuzzing testing is part of Google OSS-Fuzz project. # Github: https://github.com/google/oss-fuzz/tree/master/projects/nfstream