From 2ed89ad932abd2e185cb5fd7cd81aa2b520f3995 Mon Sep 17 00:00:00 2001 From: aouinizied Date: Fri, 15 Apr 2022 17:44:02 +0200 Subject: [PATCH] Fix QUIC detection on Windows. --- .github/workflows/build_test_aarch64.yml | 2 +- .github/workflows/build_test_armhf.yml | 2 +- .github/workflows/build_test_linux.yml | 4 +- .../workflows/build_test_publish_macos.yml | 4 +- .../workflows/build_test_publish_windows.yml | 4 +- dev_requirements.txt | 2 +- nfstream/engine/dependencies/nDPI | 2 +- nfstream/engine/engine_build.py | 4 + nfstream/engine/lib_engine.c | 28 +- tests.py | 733 ++++++++++-------- 10 files changed, 420 insertions(+), 365 deletions(-) diff --git a/.github/workflows/build_test_aarch64.yml b/.github/workflows/build_test_aarch64.yml index 9e7b264..835ecd3 100644 --- a/.github/workflows/build_test_aarch64.yml +++ b/.github/workflows/build_test_aarch64.yml @@ -43,7 +43,7 @@ jobs: - name: Test and generate coverage report run: | ${{ matrix.python-version }} -m pip install coverage - ${{ matrix.python-version }} -m coverage run -m pytest tests.py + ${{ matrix.python-version }} -m coverage run -m unittest tests.py ${{ matrix.python-version }} -m coverage combine - name: Upload coverage to Codecov diff --git a/.github/workflows/build_test_armhf.yml b/.github/workflows/build_test_armhf.yml index d0be169..412b12d 100644 --- a/.github/workflows/build_test_armhf.yml +++ b/.github/workflows/build_test_armhf.yml @@ -43,7 +43,7 @@ jobs: - name: Test and generate coverage report run: | ${{ matrix.python-version }} -m pip install coverage - ${{ matrix.python-version }} -m coverage run -m pytest tests.py + ${{ matrix.python-version }} -m coverage run -m unittest tests.py ${{ matrix.python-version }} -m coverage combine - name: Upload coverage to Codecov diff --git a/.github/workflows/build_test_linux.yml b/.github/workflows/build_test_linux.yml index 43e3535..2549ecc 100644 --- a/.github/workflows/build_test_linux.yml +++ b/.github/workflows/build_test_linux.yml @@ -71,14 +71,14 @@ jobs: - name: Test if: startsWith(matrix.os, 'ubuntu') && !startsWith(matrix.python-version, '3.10') run: | - python -m pytest tests.py + python tests.py - name: Test and generate coverage report # On version 3.10, we test and generate coverage report. if: startsWith(matrix.python-version, '3.10') run: | python -m pip install coverage - python -m coverage run -m pytest tests.py + python -m coverage run -m unittest tests.py python -m coverage combine - name: Upload coverage to Codecov diff --git a/.github/workflows/build_test_publish_macos.yml b/.github/workflows/build_test_publish_macos.yml index a8286df..bd69d3c 100644 --- a/.github/workflows/build_test_publish_macos.yml +++ b/.github/workflows/build_test_publish_macos.yml @@ -80,14 +80,14 @@ jobs: # On other versions then 3.9, we test only. (without coverage generation) if: startsWith(matrix.os, 'macos') && !startsWith(matrix.python-version, '3.9') run: | - python -m pytest tests.py + python tests.py - name: Test and generate coverage report # On version 3.9, we test and generate coverage report. if: startsWith(matrix.python-version, '3.9') run: | python -m pip install coverage - python -m coverage run -m pytest tests.py + python -m coverage run -m unittest tests.py python -m coverage combine - name: Upload coverage to Codecov diff --git a/.github/workflows/build_test_publish_windows.yml b/.github/workflows/build_test_publish_windows.yml index 63e3ec9..9db8123 100644 --- a/.github/workflows/build_test_publish_windows.yml +++ b/.github/workflows/build_test_publish_windows.yml @@ -75,14 +75,14 @@ jobs: # On other versions then 3.9, we test only. (without coverage generation) if: startsWith(matrix.os, 'windows') && !startsWith(matrix.python-version, '3.9') run: | - python -m pytest tests.py + python tests.py - name: Test and generate coverage report # On version 3.9, we test and generate coverage report. if: startsWith(matrix.python-version, '3.9') run: | python -m pip install coverage - python -m coverage run -m pytest tests.py + python -m coverage run -m unittest tests.py python -m coverage combine - name: Upload coverage to Codecov diff --git a/dev_requirements.txt b/dev_requirements.txt index ab9a100..f9e6a36 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -9,6 +9,6 @@ wheel>=0.37.0 twine>=3.4.2 setuptools>=57.4.0 codecov>=2.1.12 -pytest>=7.0.1 +termcolor>=1.1.0 tqdm>=4.63.0 auditwheel>=5.1.2 \ No newline at end of file diff --git a/nfstream/engine/dependencies/nDPI b/nfstream/engine/dependencies/nDPI index 939572c..fba75a3 160000 --- a/nfstream/engine/dependencies/nDPI +++ b/nfstream/engine/dependencies/nDPI @@ -1 +1 @@ -Subproject commit 939572c61ec27ea63b7e0f2d6a4d18f767e6195d +Subproject commit fba75a3cf5e06d2c9b8c7b1823cafd6e8ca636bc diff --git a/nfstream/engine/engine_build.py b/nfstream/engine/engine_build.py index c8c5192..7632a7a 100644 --- a/nfstream/engine/engine_build.py +++ b/nfstream/engine/engine_build.py @@ -60,6 +60,10 @@ if os.name != 'posix': # Windows EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR)) else: # best effort guess EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32")) + if os.path.exists(convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))): + EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR)) + else: # best effort guess + EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32")) with open(convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT))) as gcc_version_in: GCC_VERSION = gcc_version_in.read().split("\n")[0].split(")")[-1].strip() EXTRALINK_ARGS.append("{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format(root=ROOT, diff --git a/nfstream/engine/lib_engine.c b/nfstream/engine/lib_engine.c index 6e29946..e8306eb 100644 --- a/nfstream/engine/lib_engine.c +++ b/nfstream/engine/lib_engine.c @@ -1279,9 +1279,9 @@ static uint8_t flow_init_bidirectional(struct ndpi_detection_module_struct *diss flow->src_mac[3] = packet->src_mac[3]; flow->src_mac[4] = packet->src_mac[4]; flow->src_mac[5] = packet->src_mac[5]; - snprintf(flow->src_mac_str, sizeof(flow->src_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x", - packet->src_mac[0], packet->src_mac[1], packet->src_mac[2], - packet->src_mac[3], packet->src_mac[4], packet->src_mac[5]); + ndpi_snprintf(flow->src_mac_str, sizeof(flow->src_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x", + packet->src_mac[0], packet->src_mac[1], packet->src_mac[2], + packet->src_mac[3], packet->src_mac[4], packet->src_mac[5]); memcpy(flow->src_oui, flow->src_mac_str, 8); flow->src_port = packet->src_port; flow->dst_ip[0] = packet->dst_ip[0]; @@ -1292,9 +1292,9 @@ static uint8_t flow_init_bidirectional(struct ndpi_detection_module_struct *diss flow->dst_mac[3] = packet->dst_mac[3]; flow->dst_mac[4] = packet->dst_mac[4]; flow->dst_mac[5] = packet->dst_mac[5]; - snprintf(flow->dst_mac_str, sizeof(flow->dst_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x", - packet->dst_mac[0], packet->dst_mac[1], packet->dst_mac[2], - packet->dst_mac[3], packet->dst_mac[4], packet->dst_mac[5]); + ndpi_snprintf(flow->dst_mac_str, sizeof(flow->dst_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x", + packet->dst_mac[0], packet->dst_mac[1], packet->dst_mac[2], + packet->dst_mac[3], packet->dst_mac[4], packet->dst_mac[5]); memcpy(flow->dst_oui, flow->dst_mac_str, 8); flow->dst_port = packet->dst_port; flow->protocol = packet->protocol; @@ -1440,7 +1440,7 @@ pcap_t * capture_open(const uint8_t * pcap_file, int mode, char * child_error) { if (pcap_handle != NULL) { return pcap_handle; } else { - snprintf(child_error, 256, "%s", pcap_error_buffer); + ndpi_snprintf(child_error, 256, "%s", pcap_error_buffer); return NULL; } } @@ -1456,7 +1456,7 @@ int capture_set_fanout(pcap_t * pcap_handle, int mode, char * child_error, int g set_fanout = pcap_set_fanout_linux(pcap_handle, 1, 0x8000, (uint16_t) group_id); if (set_fanout != 0) { pcap_close(pcap_handle); - snprintf(child_error, 256, "%s", "Unable to setup fanout mode."); + ndpi_snprintf(child_error, 256, "%s", "Unable to setup fanout mode."); } #endif return set_fanout; @@ -1473,7 +1473,7 @@ int capture_activate(pcap_t * pcap_handle, int mode, char * child_error) { set_activate = pcap_activate(pcap_handle); if (set_activate != 0) { pcap_close(pcap_handle); - snprintf(child_error, 256, "%s", "Unable to activate source."); + ndpi_snprintf(child_error, 256, "%s", "Unable to activate source."); } return set_activate; } @@ -1489,7 +1489,7 @@ int capture_set_timeout(pcap_t * pcap_handle, int mode, char * child_error) { set_timeout = pcap_set_timeout(pcap_handle, 1000); if (set_timeout != 0) { pcap_close(pcap_handle); - snprintf(child_error, 256, "Unable to set buffer timeout."); + ndpi_snprintf(child_error, 256, "Unable to set buffer timeout."); } return set_timeout; } @@ -1505,7 +1505,7 @@ int capture_set_promisc(pcap_t * pcap_handle, int mode, char * child_error, int set_promisc = pcap_set_promisc(pcap_handle, promisc); if (set_promisc != 0) { pcap_close(pcap_handle); - snprintf(child_error, 256, "Unable to set promisc mode."); + ndpi_snprintf(child_error, 256, "Unable to set promisc mode."); } return set_promisc; } @@ -1521,7 +1521,7 @@ int capture_set_snaplen(pcap_t * pcap_handle, int mode, char * child_error, unsi set_snaplen = pcap_set_snaplen(pcap_handle, snaplen); if (set_snaplen != 0) { pcap_close(pcap_handle); - snprintf(child_error, 256, "Unable to set snaplen."); + ndpi_snprintf(child_error, 256, "Unable to set snaplen."); } return set_snaplen; } @@ -1534,12 +1534,12 @@ int capture_set_filter(pcap_t * pcap_handle, char * bpf_filter, char * child_err if (bpf_filter != NULL) { struct bpf_program fcode; if (pcap_compile(pcap_handle, &fcode, bpf_filter, 1, 0xFFFFFF00) < 0) { - snprintf(child_error, 256, "Unable to compile BPF filter."); + ndpi_snprintf(child_error, 256, "Unable to compile BPF filter."); pcap_close(pcap_handle); return 1; } else { if (pcap_setfilter(pcap_handle, &fcode) < 0) { - snprintf(child_error, 256, "Unable to compile BPF filter."); + ndpi_snprintf(child_error, 256, "Unable to compile BPF filter."); pcap_close(pcap_handle); return 1; } else { diff --git a/tests.py b/tests.py index 97d0a32..270fce9 100644 --- a/tests.py +++ b/tests.py @@ -13,12 +13,13 @@ If not, see . ------------------------------------------------------------------------------------------------------------------------ """ -from nfstream.plugins import SPLT, DHCP, FlowSlicer, MDNS -from nfstream import NFStreamer +import unittest import pandas as pd -import pytest import json import os +from nfstream import NFStreamer +from nfstream.plugins import SPLT, DHCP, FlowSlicer, MDNS +from termcolor import colored def get_files_list(path): @@ -31,367 +32,417 @@ def get_files_list(path): return files -def test_source_parameter(): - source = ["inexisting.pcap", "lo", 11] - for x in source: - with pytest.raises(ValueError): - NFStreamer(source=x).to_pandas() +class TestMethods(unittest.TestCase): + def test_source_parameter(self): + print("\n----------------------------------------------------------------------") + source = ["inexisting.pcap", "lo", 11] + for x in source: + with self.assertRaises(ValueError): + NFStreamer(source=x).to_pandas() + print("{}\t: {}".format(".Test source parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_decode_tunnels_parameter(): - decode_tunnels = [33, "True"] - for x in decode_tunnels: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - decode_tunnels=x) + def test_decode_tunnels_parameter(self): + print("\n----------------------------------------------------------------------") + decode_tunnels = [33, "True"] + for x in decode_tunnels: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + decode_tunnels=x) + print("{}\t: {}".format(".Test decode_tunnels parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_bpf_filter_parameter(self): + print("\n----------------------------------------------------------------------") + bpf_filter = ["my filter", 11] + for x in bpf_filter: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + bpf_filter=x).to_pandas() + print("{}\t: {}".format(".Test bpf_filter parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_bpf_filter_parameter(): - bpf_filter = ["my filter", 11] - for x in bpf_filter: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), bpf_filter=x).to_pandas() + def test_promiscuous_mode_parameter(self): + print("\n----------------------------------------------------------------------") + promiscuous_mode = ["yes", 89] + for x in promiscuous_mode: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + promiscuous_mode=x) + print("{}\t: {}".format(".Test promiscuous_mode parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_snapshot_length_parameter(self): + print("\n----------------------------------------------------------------------") + snapshot_length = ["largest", -1] + for x in snapshot_length: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + snapshot_length=x) + print("{}\t: {}".format(".Test snapshot_length parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_promiscuous_mode_parameter(): - promiscuous_mode = ["yes", 89] - for x in promiscuous_mode: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), promiscuous_mode=x) + def test_idle_timeout_parameter(self): + print("\n----------------------------------------------------------------------") + idle_timeout = [-1, "idle"] + for x in idle_timeout: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + idle_timeout=x) + print("{}\t: {}".format(".Test idle_timeout parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_active_timeout_parameter(self): + print("\n----------------------------------------------------------------------") + active_timeout = [-1, "active"] + for x in active_timeout: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + active_timeout=x) + print("{}\t: {}".format(".Test active_timeout parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_snapshot_length_parameter(): - snapshot_length = ["largest", -1] - for x in snapshot_length: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), snapshot_length=x) + def test_accounting_mode_parameter(self): + print("\n----------------------------------------------------------------------") + accounting_mode = [-1, 5, 'ip'] + for x in accounting_mode: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + accounting_mode=x) + print("{}\t: {}".format(".Test accounting_mode parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_udps_parameter(self): + print("\n----------------------------------------------------------------------") + udps = [lambda y: y+1, "NFPlugin"] + for x in udps: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + udps=x) + print("{}\t: {}".format(".Test udps parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_idle_timeout_parameter(): - idle_timeout = [-1, "idle"] - for x in idle_timeout: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=x) + def test_n_dissections_parameter(self): + print("\n----------------------------------------------------------------------") + n_dissections = ["yes", -1, 256] + for x in n_dissections: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + n_dissections=x) + print("{}\t: {}".format(".Test n_dissections parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_system_visibility_mode_parameter(self): + print("\n----------------------------------------------------------------------") + system_visibility_mode = ["yes", -1, 3] + for x in system_visibility_mode: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + system_visibility_mode=x) + print("{}\t: {}".format(".Test system_visibility_mode parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_active_timeout_parameter(): - active_timeout = [-1, "active"] - for x in active_timeout: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=x) + def test_system_visibility_extension_port(self): + print("\n----------------------------------------------------------------------") + system_visibility_mode = ["yes", -1, 88888] + for x in system_visibility_mode: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + system_visibility_extension_port=x) + print("{}\t: {}".format(".Test system_visibility_extension_port parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_system_visibility_poll_ms(self): + print("\n----------------------------------------------------------------------") + system_visibility_mode = ["yes", -1] + for x in system_visibility_mode: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + system_visibility_poll_ms=x) + print("{}\t: {}".format(".Test system_visibility_poll_ms parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_accounting_mode_parameter(): - print("\n----------------------------------------------------------------------") - accounting_mode = [-1, 5, 'ip'] - for x in accounting_mode: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), accounting_mode=x) + def test_statistical_analysis_parameter(self): + print("\n----------------------------------------------------------------------") + statistical_analysis = ["yes", 89] + for x in statistical_analysis: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + statistical_analysis=x) + print("{}\t: {}".format(".Test statistical_analysis parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_splt_analysis_parameter(self): + print("\n----------------------------------------------------------------------") + splt_analysis = [-1, 256, "yes"] + for x in splt_analysis: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + splt_analysis=x) + print("{}\t: {}".format(".Test splt_analysis parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_udps_parameter(): - udps = [lambda y: y+1, "NFPlugin"] - for x in udps: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x) + def test_n_meters_parameter(self): + print("\n----------------------------------------------------------------------") + n_meters = ["yes", -1] + for x in n_meters: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + n_meters=x) + print("{}\t: {}".format(".Test n_meters parameter".ljust(60, ' '), colored('OK', 'green'))) + def test_performance_report_parameter(self): + print("\n----------------------------------------------------------------------") + performance_report = ["yes", -1] + for x in performance_report: + with self.assertRaises(ValueError): + NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + performance_report=x) + print("{}\t: {}".format(".Test performance_report parameter".ljust(60, ' '), colored('OK', 'green'))) -def test_n_dissections_parameter(): - n_dissections = ["yes", -1, 256] - for x in n_dissections: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_dissections=x) + def test_expiration_management(self): + print("\n----------------------------------------------------------------------") + # Idle expiration + streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + idle_timeout=0) + last_id = 0 + for flow in streamer_expiration: + last_id = flow.id + self.assertEqual(last_id, 27) + # Active expiration + streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + active_timeout=0) + last_id = 0 + for flow in streamer_expiration: + last_id = flow.id + self.assertEqual(last_id, 27) + # Custom expiration + streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=1)) + last_id = 0 + for flow in streamer_expiration: + last_id = flow.id + self.assertEqual(last_id, 27) -def test_system_visibility_mode_parameter(): - system_visibility_mode = ["yes", -1, 3] - for x in system_visibility_mode: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_mode=x) + streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=4)) + last_id = 0 + for flow in streamer_expiration: + last_id = flow.id + self.assertEqual(last_id, 6) + print("{}\t: {}".format(".Test expiration management".ljust(60, ' '), colored('OK', 'green'))) -def test_system_visibility_extension_port(): - system_visibility_mode = ["yes", -1, 88888] - for x in system_visibility_mode: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_extension_port=x) + def test_tunnel_decoding(self): + print("\n----------------------------------------------------------------------") + decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"), + statistical_analysis=True, decode_tunnels=True) + for flow in decode_streamer: + self.assertEqual(flow.tunnel_id, 1) + decode_streamer.decode_tunnels = False + for flow in decode_streamer: + self.assertRaises(AttributeError, getattr, flow, "tunnel_id") + del decode_streamer + print("{}\t: {}".format(".Test tunnels decoding".ljust(60, ' '), colored('OK', 'green'))) - -def test_system_visibility_poll_ms(): - system_visibility_mode = ["yes", -1] - for x in system_visibility_mode: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_poll_ms=x) - - -def test_statistical_analysis_parameter(): - statistical_analysis = ["yes", 89] - for x in statistical_analysis: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), statistical_analysis=x) - - -def test_splt_analysis_parameter(): - splt_analysis = [-1, 256, "yes"] - for x in splt_analysis: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=x) - - -def test_n_meters_parameter(): - n_meters = ["yes", -1] - for x in n_meters: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - n_meters=x) - - -def test_performance_report_parameter(): - performance_report = ["yes", -1] - for x in performance_report: - with pytest.raises(ValueError): - NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), performance_report=x) - - -def test_expiration_management(): - # Idle expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0) - last_id = 0 - for flow in streamer_expiration: - last_id = flow.id - assert last_id == 27 - # Active expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0) - last_id = 0 - for flow in streamer_expiration: - last_id = flow.id - assert last_id == 27 - # Custom expiration - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=1)) - last_id = 0 - for flow in streamer_expiration: - last_id = flow.id - assert last_id == 27 - streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=4)) - last_id = 0 - for flow in streamer_expiration: - last_id = flow.id - assert last_id == 6 - - -def test_tunnel_decoding(): - decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"), - statistical_analysis=True, decode_tunnels=True) - for flow in decode_streamer: - assert flow.tunnel_id == 1 - decode_streamer.decode_tunnels = False - for flow in decode_streamer: - with pytest.raises(AttributeError): - getattr(flow, "tunnel_id") - del decode_streamer - - -def test_statistical(): - statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), - statistical_analysis=True, accounting_mode=1) - for flow in statistical_streamer: - assert flow.id == 0 - assert flow.expiration_id == 0 - assert flow.src_ip == '172.31.3.224' - assert flow.src_mac == '80:c6:ca:00:9e:9f' - assert flow.src_oui == '80:c6:ca' - assert flow.src_port == 42835 - assert flow.dst_ip == '216.58.212.100' - assert flow.dst_mac == '00:0e:8e:4d:b4:a8' - assert flow.dst_oui == '00:0e:8e' - assert flow.dst_port == 443 - assert flow.protocol == 6 - assert flow.ip_version == 4 - assert flow.vlan_id == 0 - assert flow.tunnel_id == 0 - assert flow.bidirectional_first_seen_ms == 1434443394683 - assert flow.bidirectional_last_seen_ms == 1434443401353 - assert flow.bidirectional_duration_ms == 6670 - assert flow.bidirectional_packets == 28 - assert flow.bidirectional_bytes == 8696 - assert flow.src2dst_first_seen_ms == 1434443394683 - assert flow.src2dst_last_seen_ms == 1434443401353 - assert flow.src2dst_duration_ms == 6670 - assert flow.src2dst_packets == 16 - assert flow.src2dst_bytes == 1288 - assert flow.dst2src_first_seen_ms == 1434443394717 - assert flow.dst2src_last_seen_ms == 1434443401308 - assert flow.dst2src_duration_ms == 6591 - assert flow.dst2src_packets == 12 - assert flow.dst2src_bytes == 7408 - assert flow.bidirectional_min_ps == 40 - assert flow.bidirectional_mean_ps == pytest.approx(310.571, 0.001) - assert flow.bidirectional_stddev_ps == pytest.approx(500.546, 0.001) - assert flow.bidirectional_max_ps == 1470 - assert flow.src2dst_min_ps == 40 - assert flow.src2dst_mean_ps == pytest.approx(80.499, 0.001) - assert flow.src2dst_stddev_ps == pytest.approx(89.555, 0.001) - assert flow.src2dst_max_ps == 354 - assert flow.dst2src_min_ps == 40 - assert flow.dst2src_mean_ps == pytest.approx(617.333, 0.001) - assert flow.dst2src_stddev_ps == pytest.approx(651.452, 0.001) - assert flow.dst2src_max_ps == 1470 - assert flow.bidirectional_min_piat_ms == 0 - assert flow.bidirectional_mean_piat_ms == pytest.approx(247.037, 0.001) - assert flow.bidirectional_stddev_piat_ms == pytest.approx(324.045, 0.001) - assert flow.bidirectional_max_piat_ms == 995 - assert flow.src2dst_min_piat_ms == 76 - assert flow.src2dst_mean_piat_ms == pytest.approx(444.666, 0.001) - assert flow.src2dst_stddev_piat_ms == pytest.approx(397.603, 0.001) - assert flow.src2dst_max_piat_ms == 1185 - assert flow.dst2src_min_piat_ms == 66 - assert flow.dst2src_mean_piat_ms == pytest.approx(599.181, 0.001) - assert flow.dst2src_stddev_piat_ms == pytest.approx(384.784, 0.001) - assert flow.dst2src_max_piat_ms == 1213 - assert flow.bidirectional_syn_packets == 2 - assert flow.bidirectional_cwr_packets == 0 - assert flow.bidirectional_ece_packets == 0 - assert flow.bidirectional_urg_packets == 0 - assert flow.bidirectional_ack_packets == 27 - assert flow.bidirectional_psh_packets == 8 - assert flow.bidirectional_rst_packets == 0 - assert flow.bidirectional_fin_packets == 2 - assert flow.src2dst_syn_packets == 1 - assert flow.src2dst_cwr_packets == 0 - assert flow.src2dst_ece_packets == 0 - assert flow.src2dst_urg_packets == 0 - assert flow.src2dst_ack_packets == 15 - assert flow.src2dst_psh_packets == 4 - assert flow.src2dst_rst_packets == 0 - assert flow.src2dst_fin_packets == 1 - assert flow.dst2src_syn_packets == 1 - assert flow.dst2src_cwr_packets == 0 - assert flow.dst2src_ece_packets == 0 - assert flow.dst2src_urg_packets == 0 - assert flow.dst2src_ack_packets == 12 - assert flow.dst2src_psh_packets == 4 - assert flow.dst2src_rst_packets == 0 - assert flow.dst2src_fin_packets == 1 + def test_statistical(self): + print("\n----------------------------------------------------------------------") + statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), + statistical_analysis=True, accounting_mode=1) + for flow in statistical_streamer: + self.assertEqual(flow.id, 0) + self.assertEqual(flow.expiration_id, 0) + self.assertEqual(flow.src_ip, '172.31.3.224') + self.assertEqual(flow.src_mac, '80:c6:ca:00:9e:9f') + self.assertEqual(flow.src_oui, '80:c6:ca') + self.assertEqual(flow.src_port, 42835) + self.assertEqual(flow.dst_ip, '216.58.212.100') + self.assertEqual(flow.dst_mac, '00:0e:8e:4d:b4:a8') + self.assertEqual(flow.dst_oui, '00:0e:8e') + self.assertEqual(flow.dst_port, 443) + self.assertEqual(flow.protocol, 6) + self.assertEqual(flow.ip_version, 4) + self.assertEqual(flow.vlan_id, 0) + self.assertEqual(flow.tunnel_id, 0) + self.assertEqual(flow.bidirectional_first_seen_ms, 1434443394683) + self.assertEqual(flow.bidirectional_last_seen_ms, 1434443401353) + self.assertEqual(flow.bidirectional_duration_ms, 6670) + self.assertEqual(flow.bidirectional_packets, 28) + self.assertEqual(flow.bidirectional_bytes, 8696) + self.assertEqual(flow.src2dst_first_seen_ms, 1434443394683) + self.assertEqual(flow.src2dst_last_seen_ms, 1434443401353) + self.assertEqual(flow.src2dst_duration_ms, 6670) + self.assertEqual(flow.src2dst_packets, 16) + self.assertEqual(flow.src2dst_bytes, 1288) + self.assertEqual(flow.dst2src_first_seen_ms, 1434443394717) + self.assertEqual(flow.dst2src_last_seen_ms, 1434443401308) + self.assertEqual(flow.dst2src_duration_ms, 6591) + self.assertEqual(flow.dst2src_packets, 12) + self.assertEqual(flow.dst2src_bytes, 7408) + self.assertEqual(flow.bidirectional_min_ps, 40) + self.assertAlmostEqual(flow.bidirectional_mean_ps, 310.57142857142856) + self.assertAlmostEqual(flow.bidirectional_stddev_ps, 500.54617788019937) + self.assertEqual(flow.bidirectional_max_ps, 1470) + self.assertEqual(flow.src2dst_min_ps, 40) + self.assertAlmostEqual(flow.src2dst_mean_ps, 80.49999999999999) + self.assertAlmostEqual(flow.src2dst_stddev_ps, 89.55519713189922) + self.assertEqual(flow.src2dst_max_ps, 354) + self.assertEqual(flow.dst2src_min_ps, 40) + self.assertAlmostEqual(flow.dst2src_mean_ps, 617.3333333333334) + self.assertAlmostEqual(flow.dst2src_stddev_ps, 651.4524099458397) + self.assertEqual(flow.dst2src_max_ps, 1470) + self.assertEqual(flow.bidirectional_min_piat_ms, 0) + self.assertAlmostEqual(flow.bidirectional_mean_piat_ms, 247.037037037037) + self.assertAlmostEqual(flow.bidirectional_stddev_piat_ms, 324.04599406227237) + self.assertEqual(flow.bidirectional_max_piat_ms, 995) + self.assertEqual(flow.src2dst_min_piat_ms, 76) + self.assertAlmostEqual(flow.src2dst_mean_piat_ms, 444.6666666666667) + self.assertAlmostEqual(flow.src2dst_stddev_piat_ms, 397.60329595261277) + self.assertEqual(flow.src2dst_max_piat_ms, 1185) + self.assertEqual(flow.dst2src_min_piat_ms, 66) + self.assertAlmostEqual(flow.dst2src_mean_piat_ms, 599.1818181818182) + self.assertAlmostEqual(flow.dst2src_stddev_piat_ms, 384.78456782511904) + self.assertEqual(flow.dst2src_max_piat_ms, 1213) + self.assertEqual(flow.bidirectional_syn_packets, 2) + self.assertEqual(flow.bidirectional_cwr_packets, 0) + self.assertEqual(flow.bidirectional_ece_packets, 0) + self.assertEqual(flow.bidirectional_urg_packets, 0) + self.assertEqual(flow.bidirectional_ack_packets, 27) + self.assertEqual(flow.bidirectional_psh_packets, 8) + self.assertEqual(flow.bidirectional_rst_packets, 0) + self.assertEqual(flow.bidirectional_fin_packets, 2) + self.assertEqual(flow.src2dst_syn_packets, 1) + self.assertEqual(flow.src2dst_cwr_packets, 0) + self.assertEqual(flow.src2dst_ece_packets, 0) + self.assertEqual(flow.src2dst_urg_packets, 0) + self.assertEqual(flow.src2dst_ack_packets, 15) + self.assertEqual(flow.src2dst_psh_packets, 4) + self.assertEqual(flow.src2dst_rst_packets, 0) + self.assertEqual(flow.src2dst_fin_packets, 1) + self.assertEqual(flow.dst2src_syn_packets, 1) + self.assertEqual(flow.dst2src_cwr_packets, 0) + self.assertEqual(flow.dst2src_ece_packets, 0) + self.assertEqual(flow.dst2src_urg_packets, 0) + self.assertEqual(flow.dst2src_ack_packets, 12) + self.assertEqual(flow.dst2src_psh_packets, 4) + self.assertEqual(flow.dst2src_rst_packets, 0) + self.assertEqual(flow.dst2src_fin_packets, 1) del statistical_streamer + print("{}\t: {}".format(".Test statistical extraction".ljust(60, ' '), colored('OK', 'green'))) + + def test_fingerprint_extraction(self): + print("\n----------------------------------------------------------------------") + fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), + statistical_analysis=True, accounting_mode=1) + for flow in fingerprint_streamer: + self.assertEqual(flow.application_name, 'TLS.Facebook') + self.assertEqual(flow.application_category_name, 'SocialNetwork') + self.assertEqual(flow.application_is_guessed, 0) + self.assertEqual(flow.application_confidence, 4) + self.assertTrue(flow.requested_server_name in ['facebook.com', 'www.facebook.com']) + self.assertTrue(flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840', + '5c60e71f1b8cd40e4d40ed5b6d666e3f']) + self.assertTrue(flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12', + '96681175a9547081bf3d417f1a572091']) + del fingerprint_streamer + print("{}\t: {}".format(".Test fingerprint extraction".ljust(60, ' '), colored('OK', 'green'))) + + def test_export(self): + print("\n----------------------------------------------------------------------") + df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, n_dissections=20).to_pandas() + df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, + n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"]) + self.assertEqual(df_anon.shape[0], df.shape[0]) + self.assertEqual(df_anon.shape[1], df.shape[1]) + self.assertEqual(df_anon['src_ip'].nunique(), df['src_ip'].nunique()) + self.assertEqual(df_anon['dst_ip'].nunique(), df['dst_ip'].nunique()) + + total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, n_dissections=20).to_csv() + df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) + total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), + statistical_analysis=True, n_dissections=20).to_csv() + df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) + os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv")) + self.assertEqual(total_flows, total_flows_anon) + self.assertEqual(total_flows, df_from_csv.shape[0]) + self.assertEqual(total_flows_anon, df_anon_from_csv.shape[0]) + self.assertEqual(total_flows, df.shape[0]) + self.assertEqual(total_flows_anon, df_anon.shape[0]) + print("{}\t: {}".format(".Test export interfaces".ljust(60, ' '), colored('OK', 'green'))) + + def test_bpf(self): + print("\n----------------------------------------------------------------------") + streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), + bpf_filter="src port 52066 or dst port 52066") + last_id = 0 + for flow in streamer_test: + last_id = flow.id + self.assertEqual(flow.src_port, 52066) + self.assertEqual(last_id, 0) + del streamer_test + print("{}\t: {}".format(".Test BPF".ljust(60, ' '), colored('OK', 'green'))) + + def test_ndpi_integration(self): + pcap_files = get_files_list(os.path.join("tests", "pcaps")) + result_files = get_files_list(os.path.join("tests", "results")) + failures = 0 + print("\n----------------------------------------------------------------------") + print(".Test nDPI integration on {} applications:".format(len(pcap_files))) + for file_idx, test_file in enumerate(pcap_files): + test_case_name = os.path.basename(test_file) + try: + test = NFStreamer(source=test_file, + n_dissections=20, + n_meters=1).to_pandas()[["id", + "bidirectional_packets", + "bidirectional_bytes", + "application_name", + "application_category_name", + "application_is_guessed", + "application_confidence"]].to_dict() + + true = pd.read_csv(result_files[file_idx]).to_dict() + self.assertEqual(test, true) + print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('OK', 'green'))) + except AssertionError: + failures += 1 + print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('KO', 'red'))) + self.assertEqual(failures, 0) + + def test_splt(self): + print("\n----------------------------------------------------------------------") + splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5, + udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas() + direction = json.loads(splt_df["udps.splt_direction"][0]) + ps = json.loads(splt_df["udps.splt_ps"][0]) + piat = json.loads(splt_df["udps.splt_piat_ms"][0]) + ndirection = json.loads(splt_df["splt_direction"][0]) + nps = json.loads(splt_df["splt_ps"][0]) + npiat = json.loads(splt_df["splt_piat_ms"][0]) + self.assertEqual(direction, [0, 1, 0, 0, 1]) + self.assertEqual(ps, [58, 60, 54, 180, 60]) + self.assertEqual(piat, [0, 34, 134, 144, 35]) + self.assertEqual(direction, ndirection) + self.assertEqual(ps, nps) + self.assertEqual(piat, npiat) + print("{}\t: {}".format(".Test SPLT analysis".ljust(60, ' '), colored('OK', 'green'))) + + def test_dhcp(self): + print("\n----------------------------------------------------------------------") + dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"), + n_dissections=0, + udps=DHCP() + ).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) + self.assertEqual(dhcp_df["udps.dhcp_msg_type"][0], "MsgType.DISCOVER") + self.assertEqual(dhcp_df["udps.dhcp_50"][1], "192.168.0.10") + self.assertEqual(dhcp_df["udps.dhcp_55"][1], "1,3,6,42") + self.assertEqual(dhcp_df["udps.dhcp_options"][1], "[53, 61, 50, 54, 55]") + self.assertEqual(dhcp_df["udps.dhcp_msg_type"][1], "MsgType.REQUEST") + self.assertEqual(dhcp_df["udps.dhcp_oui"][1], "00:0b:82") + self.assertEqual(dhcp_df.shape[0], 3) + print("{}\t: {}".format(".Test DHCP plugin".ljust(60, ' '), colored('OK', 'green'))) + + def test_mdns(self): + print("\n----------------------------------------------------------------------") + mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"), + n_dissections=0, + udps=MDNS() + ).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) + self.assertEqual(mdns_df["udps.mdns_ptr"][0], "['skynet.local', " + "'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', " + "'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', " + "'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', " + "'_workstation._tcp.local']") + print("{}\t: {}".format(".Test MDNS plugin".ljust(60, ' '), colored('OK', 'green'))) -def test_fingerprint_extraction(): - fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), - statistical_analysis=True, accounting_mode=1) - for flow in fingerprint_streamer: - assert flow.application_name == 'TLS.Facebook' - assert flow.application_category_name == 'SocialNetwork' - assert flow.application_is_guessed == 0 - assert flow.application_confidence == 4 - requested_server_name = flow.requested_server_name in ['facebook.com', 'www.facebook.com'] - assert int(requested_server_name) == 1 - client_fingerprint = flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840', - '5c60e71f1b8cd40e4d40ed5b6d666e3f'] - assert int(client_fingerprint) == 1 - server_fingerprint = flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12', - '96681175a9547081bf3d417f1a572091'] - assert int(server_fingerprint) == 1 - del fingerprint_streamer - - -def test_export(): - df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_pandas() - df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, - n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"]) - assert df_anon.shape[0] == df.shape[0] - assert df_anon.shape[1] == df.shape[1] - assert df_anon['src_ip'].nunique() == df['src_ip'].nunique() - assert df_anon['dst_ip'].nunique() == df['dst_ip'].nunique() - total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_csv() - df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) - total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"), - statistical_analysis=True, n_dissections=20).to_csv() - df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv")) - os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv")) - assert total_flows == total_flows_anon - assert total_flows == df_from_csv.shape[0] - assert total_flows_anon == df_anon_from_csv.shape[0] - assert total_flows == df.shape[0] - assert total_flows_anon == df_anon.shape[0] - - -def test_bpf(): - streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"), - bpf_filter="src port 52066 or dst port 52066") - last_id = 0 - for flow in streamer_test: - last_id = flow.id - assert flow.src_port == 52066 - assert last_id == 0 - - -def test_ndpi_integration(): - # For Windows platform, we have result mismatch on a subset of files - # We ignore these errors as Windows support is WIP in nDPI project: - # MR: https://github.com/ntop/nDPI/pull/1491 - pcap_files = get_files_list(os.path.join("tests", "pcaps")) - result_files = get_files_list(os.path.join("tests", "results")) - failures = 0 - for file_idx, test_file in enumerate(pcap_files): - try: - test = {} - test_df = NFStreamer(source=test_file, n_dissections=20, n_meters=1).to_pandas() - if test_df is not None: - test = test_df[["id", - "bidirectional_packets", - "bidirectional_bytes", - "application_name", - "application_category_name", - "application_is_guessed", - "application_confidence"]].to_dict() - - true = pd.read_csv(result_files[file_idx]).to_dict() - assert test == true - except AssertionError: - failures += 1 - if os.name != 'posix': # FIXME once nDPI Windows support is finalized. - assert failures == 8 - else: # Everything must be OK - assert failures == 0 - - -def test_splt(): - splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5, - udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas() - direction = json.loads(splt_df["udps.splt_direction"][0]) - ps = json.loads(splt_df["udps.splt_ps"][0]) - piat = json.loads(splt_df["udps.splt_piat_ms"][0]) - ndirection = json.loads(splt_df["splt_direction"][0]) - nps = json.loads(splt_df["splt_ps"][0]) - npiat = json.loads(splt_df["splt_piat_ms"][0]) - assert direction == [0, 1, 0, 0, 1] - assert ps == [58, 60, 54, 180, 60] - assert piat == [0, 34, 134, 144, 35] - assert direction == ndirection - assert ps == nps - assert piat == npiat - - -def test_dhcp(): - dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"), n_dissections=0, udps=DHCP())\ - .to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) - assert dhcp_df["udps.dhcp_msg_type"][0] == "MsgType.DISCOVER" - assert dhcp_df["udps.dhcp_50"][1] == "192.168.0.10" - assert dhcp_df["udps.dhcp_55"][1] == "1,3,6,42" - assert dhcp_df["udps.dhcp_options"][1] == "[53, 61, 50, 54, 55]" - assert dhcp_df["udps.dhcp_msg_type"][1] == "MsgType.REQUEST" - assert dhcp_df["udps.dhcp_oui"][1] == "00:0b:82" - assert dhcp_df.shape[0] == 3 - - -def test_mdns(): - mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"), n_dissections=0, udps=MDNS())\ - .to_pandas().sort_values(by=['src_ip']).reset_index(drop=True) - assert mdns_df["udps.mdns_ptr"][0] == "['skynet.local', "\ - "'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', "\ - "'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', "\ - "'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', "\ - "'_workstation._tcp.local']" +if __name__ == '__main__': + unittest.main() \ No newline at end of file