Fix QUIC detection on Windows.

This commit is contained in:
aouinizied 2022-04-15 17:44:02 +02:00
parent 8c118aedd5
commit 2ed89ad932
10 changed files with 420 additions and 365 deletions

View file

@ -43,7 +43,7 @@ jobs:
- name: Test and generate coverage report
run: |
${{ matrix.python-version }} -m pip install coverage
${{ matrix.python-version }} -m coverage run -m pytest tests.py
${{ matrix.python-version }} -m coverage run -m unittest tests.py
${{ matrix.python-version }} -m coverage combine
- name: Upload coverage to Codecov

View file

@ -43,7 +43,7 @@ jobs:
- name: Test and generate coverage report
run: |
${{ matrix.python-version }} -m pip install coverage
${{ matrix.python-version }} -m coverage run -m pytest tests.py
${{ matrix.python-version }} -m coverage run -m unittest tests.py
${{ matrix.python-version }} -m coverage combine
- name: Upload coverage to Codecov

View file

@ -71,14 +71,14 @@ jobs:
- name: Test
if: startsWith(matrix.os, 'ubuntu') && !startsWith(matrix.python-version, '3.10')
run: |
python -m pytest tests.py
python tests.py
- name: Test and generate coverage report
# On version 3.10, we test and generate coverage report.
if: startsWith(matrix.python-version, '3.10')
run: |
python -m pip install coverage
python -m coverage run -m pytest tests.py
python -m coverage run -m unittest tests.py
python -m coverage combine
- name: Upload coverage to Codecov

View file

@ -80,14 +80,14 @@ jobs:
# On other versions then 3.9, we test only. (without coverage generation)
if: startsWith(matrix.os, 'macos') && !startsWith(matrix.python-version, '3.9')
run: |
python -m pytest tests.py
python tests.py
- name: Test and generate coverage report
# On version 3.9, we test and generate coverage report.
if: startsWith(matrix.python-version, '3.9')
run: |
python -m pip install coverage
python -m coverage run -m pytest tests.py
python -m coverage run -m unittest tests.py
python -m coverage combine
- name: Upload coverage to Codecov

View file

@ -75,14 +75,14 @@ jobs:
# On other versions then 3.9, we test only. (without coverage generation)
if: startsWith(matrix.os, 'windows') && !startsWith(matrix.python-version, '3.9')
run: |
python -m pytest tests.py
python tests.py
- name: Test and generate coverage report
# On version 3.9, we test and generate coverage report.
if: startsWith(matrix.python-version, '3.9')
run: |
python -m pip install coverage
python -m coverage run -m pytest tests.py
python -m coverage run -m unittest tests.py
python -m coverage combine
- name: Upload coverage to Codecov

View file

@ -9,6 +9,6 @@ wheel>=0.37.0
twine>=3.4.2
setuptools>=57.4.0
codecov>=2.1.12
pytest>=7.0.1
termcolor>=1.1.0
tqdm>=4.63.0
auditwheel>=5.1.2

@ -1 +1 @@
Subproject commit 939572c61ec27ea63b7e0f2d6a4d18f767e6195d
Subproject commit fba75a3cf5e06d2c9b8c7b1823cafd6e8ca636bc

View file

@ -60,6 +60,10 @@ if os.name != 'posix': # Windows
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR))
else: # best effort guess
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32"))
if os.path.exists(convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))):
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))
else: # best effort guess
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32"))
with open(convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT))) as gcc_version_in:
GCC_VERSION = gcc_version_in.read().split("\n")[0].split(")")[-1].strip()
EXTRALINK_ARGS.append("{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format(root=ROOT,

View file

@ -1279,9 +1279,9 @@ static uint8_t flow_init_bidirectional(struct ndpi_detection_module_struct *diss
flow->src_mac[3] = packet->src_mac[3];
flow->src_mac[4] = packet->src_mac[4];
flow->src_mac[5] = packet->src_mac[5];
snprintf(flow->src_mac_str, sizeof(flow->src_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x",
packet->src_mac[0], packet->src_mac[1], packet->src_mac[2],
packet->src_mac[3], packet->src_mac[4], packet->src_mac[5]);
ndpi_snprintf(flow->src_mac_str, sizeof(flow->src_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x",
packet->src_mac[0], packet->src_mac[1], packet->src_mac[2],
packet->src_mac[3], packet->src_mac[4], packet->src_mac[5]);
memcpy(flow->src_oui, flow->src_mac_str, 8);
flow->src_port = packet->src_port;
flow->dst_ip[0] = packet->dst_ip[0];
@ -1292,9 +1292,9 @@ static uint8_t flow_init_bidirectional(struct ndpi_detection_module_struct *diss
flow->dst_mac[3] = packet->dst_mac[3];
flow->dst_mac[4] = packet->dst_mac[4];
flow->dst_mac[5] = packet->dst_mac[5];
snprintf(flow->dst_mac_str, sizeof(flow->dst_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x",
packet->dst_mac[0], packet->dst_mac[1], packet->dst_mac[2],
packet->dst_mac[3], packet->dst_mac[4], packet->dst_mac[5]);
ndpi_snprintf(flow->dst_mac_str, sizeof(flow->dst_mac_str), "%02x:%02x:%02x:%02x:%02x:%02x",
packet->dst_mac[0], packet->dst_mac[1], packet->dst_mac[2],
packet->dst_mac[3], packet->dst_mac[4], packet->dst_mac[5]);
memcpy(flow->dst_oui, flow->dst_mac_str, 8);
flow->dst_port = packet->dst_port;
flow->protocol = packet->protocol;
@ -1440,7 +1440,7 @@ pcap_t * capture_open(const uint8_t * pcap_file, int mode, char * child_error) {
if (pcap_handle != NULL) {
return pcap_handle;
} else {
snprintf(child_error, 256, "%s", pcap_error_buffer);
ndpi_snprintf(child_error, 256, "%s", pcap_error_buffer);
return NULL;
}
}
@ -1456,7 +1456,7 @@ int capture_set_fanout(pcap_t * pcap_handle, int mode, char * child_error, int g
set_fanout = pcap_set_fanout_linux(pcap_handle, 1, 0x8000, (uint16_t) group_id);
if (set_fanout != 0) {
pcap_close(pcap_handle);
snprintf(child_error, 256, "%s", "Unable to setup fanout mode.");
ndpi_snprintf(child_error, 256, "%s", "Unable to setup fanout mode.");
}
#endif
return set_fanout;
@ -1473,7 +1473,7 @@ int capture_activate(pcap_t * pcap_handle, int mode, char * child_error) {
set_activate = pcap_activate(pcap_handle);
if (set_activate != 0) {
pcap_close(pcap_handle);
snprintf(child_error, 256, "%s", "Unable to activate source.");
ndpi_snprintf(child_error, 256, "%s", "Unable to activate source.");
}
return set_activate;
}
@ -1489,7 +1489,7 @@ int capture_set_timeout(pcap_t * pcap_handle, int mode, char * child_error) {
set_timeout = pcap_set_timeout(pcap_handle, 1000);
if (set_timeout != 0) {
pcap_close(pcap_handle);
snprintf(child_error, 256, "Unable to set buffer timeout.");
ndpi_snprintf(child_error, 256, "Unable to set buffer timeout.");
}
return set_timeout;
}
@ -1505,7 +1505,7 @@ int capture_set_promisc(pcap_t * pcap_handle, int mode, char * child_error, int
set_promisc = pcap_set_promisc(pcap_handle, promisc);
if (set_promisc != 0) {
pcap_close(pcap_handle);
snprintf(child_error, 256, "Unable to set promisc mode.");
ndpi_snprintf(child_error, 256, "Unable to set promisc mode.");
}
return set_promisc;
}
@ -1521,7 +1521,7 @@ int capture_set_snaplen(pcap_t * pcap_handle, int mode, char * child_error, unsi
set_snaplen = pcap_set_snaplen(pcap_handle, snaplen);
if (set_snaplen != 0) {
pcap_close(pcap_handle);
snprintf(child_error, 256, "Unable to set snaplen.");
ndpi_snprintf(child_error, 256, "Unable to set snaplen.");
}
return set_snaplen;
}
@ -1534,12 +1534,12 @@ int capture_set_filter(pcap_t * pcap_handle, char * bpf_filter, char * child_err
if (bpf_filter != NULL) {
struct bpf_program fcode;
if (pcap_compile(pcap_handle, &fcode, bpf_filter, 1, 0xFFFFFF00) < 0) {
snprintf(child_error, 256, "Unable to compile BPF filter.");
ndpi_snprintf(child_error, 256, "Unable to compile BPF filter.");
pcap_close(pcap_handle);
return 1;
} else {
if (pcap_setfilter(pcap_handle, &fcode) < 0) {
snprintf(child_error, 256, "Unable to compile BPF filter.");
ndpi_snprintf(child_error, 256, "Unable to compile BPF filter.");
pcap_close(pcap_handle);
return 1;
} else {

733
tests.py
View file

@ -13,12 +13,13 @@ If not, see <http://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------------------------------------------
"""
from nfstream.plugins import SPLT, DHCP, FlowSlicer, MDNS
from nfstream import NFStreamer
import unittest
import pandas as pd
import pytest
import json
import os
from nfstream import NFStreamer
from nfstream.plugins import SPLT, DHCP, FlowSlicer, MDNS
from termcolor import colored
def get_files_list(path):
@ -31,367 +32,417 @@ def get_files_list(path):
return files
def test_source_parameter():
source = ["inexisting.pcap", "lo", 11]
for x in source:
with pytest.raises(ValueError):
NFStreamer(source=x).to_pandas()
class TestMethods(unittest.TestCase):
def test_source_parameter(self):
print("\n----------------------------------------------------------------------")
source = ["inexisting.pcap", "lo", 11]
for x in source:
with self.assertRaises(ValueError):
NFStreamer(source=x).to_pandas()
print("{}\t: {}".format(".Test source parameter".ljust(60, ' '), colored('OK', 'green')))
def test_decode_tunnels_parameter():
decode_tunnels = [33, "True"]
for x in decode_tunnels:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
decode_tunnels=x)
def test_decode_tunnels_parameter(self):
print("\n----------------------------------------------------------------------")
decode_tunnels = [33, "True"]
for x in decode_tunnels:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
decode_tunnels=x)
print("{}\t: {}".format(".Test decode_tunnels parameter".ljust(60, ' '), colored('OK', 'green')))
def test_bpf_filter_parameter(self):
print("\n----------------------------------------------------------------------")
bpf_filter = ["my filter", 11]
for x in bpf_filter:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
bpf_filter=x).to_pandas()
print("{}\t: {}".format(".Test bpf_filter parameter".ljust(60, ' '), colored('OK', 'green')))
def test_bpf_filter_parameter():
bpf_filter = ["my filter", 11]
for x in bpf_filter:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), bpf_filter=x).to_pandas()
def test_promiscuous_mode_parameter(self):
print("\n----------------------------------------------------------------------")
promiscuous_mode = ["yes", 89]
for x in promiscuous_mode:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
promiscuous_mode=x)
print("{}\t: {}".format(".Test promiscuous_mode parameter".ljust(60, ' '), colored('OK', 'green')))
def test_snapshot_length_parameter(self):
print("\n----------------------------------------------------------------------")
snapshot_length = ["largest", -1]
for x in snapshot_length:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
snapshot_length=x)
print("{}\t: {}".format(".Test snapshot_length parameter".ljust(60, ' '), colored('OK', 'green')))
def test_promiscuous_mode_parameter():
promiscuous_mode = ["yes", 89]
for x in promiscuous_mode:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), promiscuous_mode=x)
def test_idle_timeout_parameter(self):
print("\n----------------------------------------------------------------------")
idle_timeout = [-1, "idle"]
for x in idle_timeout:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
idle_timeout=x)
print("{}\t: {}".format(".Test idle_timeout parameter".ljust(60, ' '), colored('OK', 'green')))
def test_active_timeout_parameter(self):
print("\n----------------------------------------------------------------------")
active_timeout = [-1, "active"]
for x in active_timeout:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
active_timeout=x)
print("{}\t: {}".format(".Test active_timeout parameter".ljust(60, ' '), colored('OK', 'green')))
def test_snapshot_length_parameter():
snapshot_length = ["largest", -1]
for x in snapshot_length:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), snapshot_length=x)
def test_accounting_mode_parameter(self):
print("\n----------------------------------------------------------------------")
accounting_mode = [-1, 5, 'ip']
for x in accounting_mode:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
accounting_mode=x)
print("{}\t: {}".format(".Test accounting_mode parameter".ljust(60, ' '), colored('OK', 'green')))
def test_udps_parameter(self):
print("\n----------------------------------------------------------------------")
udps = [lambda y: y+1, "NFPlugin"]
for x in udps:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=x)
print("{}\t: {}".format(".Test udps parameter".ljust(60, ' '), colored('OK', 'green')))
def test_idle_timeout_parameter():
idle_timeout = [-1, "idle"]
for x in idle_timeout:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=x)
def test_n_dissections_parameter(self):
print("\n----------------------------------------------------------------------")
n_dissections = ["yes", -1, 256]
for x in n_dissections:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_dissections=x)
print("{}\t: {}".format(".Test n_dissections parameter".ljust(60, ' '), colored('OK', 'green')))
def test_system_visibility_mode_parameter(self):
print("\n----------------------------------------------------------------------")
system_visibility_mode = ["yes", -1, 3]
for x in system_visibility_mode:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_mode=x)
print("{}\t: {}".format(".Test system_visibility_mode parameter".ljust(60, ' '), colored('OK', 'green')))
def test_active_timeout_parameter():
active_timeout = [-1, "active"]
for x in active_timeout:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=x)
def test_system_visibility_extension_port(self):
print("\n----------------------------------------------------------------------")
system_visibility_mode = ["yes", -1, 88888]
for x in system_visibility_mode:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_extension_port=x)
print("{}\t: {}".format(".Test system_visibility_extension_port parameter".ljust(60, ' '), colored('OK', 'green')))
def test_system_visibility_poll_ms(self):
print("\n----------------------------------------------------------------------")
system_visibility_mode = ["yes", -1]
for x in system_visibility_mode:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_poll_ms=x)
print("{}\t: {}".format(".Test system_visibility_poll_ms parameter".ljust(60, ' '), colored('OK', 'green')))
def test_accounting_mode_parameter():
print("\n----------------------------------------------------------------------")
accounting_mode = [-1, 5, 'ip']
for x in accounting_mode:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), accounting_mode=x)
def test_statistical_analysis_parameter(self):
print("\n----------------------------------------------------------------------")
statistical_analysis = ["yes", 89]
for x in statistical_analysis:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=x)
print("{}\t: {}".format(".Test statistical_analysis parameter".ljust(60, ' '), colored('OK', 'green')))
def test_splt_analysis_parameter(self):
print("\n----------------------------------------------------------------------")
splt_analysis = [-1, 256, "yes"]
for x in splt_analysis:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
splt_analysis=x)
print("{}\t: {}".format(".Test splt_analysis parameter".ljust(60, ' '), colored('OK', 'green')))
def test_udps_parameter():
udps = [lambda y: y+1, "NFPlugin"]
for x in udps:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x)
def test_n_meters_parameter(self):
print("\n----------------------------------------------------------------------")
n_meters = ["yes", -1]
for x in n_meters:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_meters=x)
print("{}\t: {}".format(".Test n_meters parameter".ljust(60, ' '), colored('OK', 'green')))
def test_performance_report_parameter(self):
print("\n----------------------------------------------------------------------")
performance_report = ["yes", -1]
for x in performance_report:
with self.assertRaises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
performance_report=x)
print("{}\t: {}".format(".Test performance_report parameter".ljust(60, ' '), colored('OK', 'green')))
def test_n_dissections_parameter():
n_dissections = ["yes", -1, 256]
for x in n_dissections:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_dissections=x)
def test_expiration_management(self):
print("\n----------------------------------------------------------------------")
# Idle expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
idle_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
# Active expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
active_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
# Custom expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=1))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
def test_system_visibility_mode_parameter():
system_visibility_mode = ["yes", -1, 3]
for x in system_visibility_mode:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_mode=x)
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=4))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 6)
print("{}\t: {}".format(".Test expiration management".ljust(60, ' '), colored('OK', 'green')))
def test_system_visibility_extension_port():
system_visibility_mode = ["yes", -1, 88888]
for x in system_visibility_mode:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_extension_port=x)
def test_tunnel_decoding(self):
print("\n----------------------------------------------------------------------")
decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"),
statistical_analysis=True, decode_tunnels=True)
for flow in decode_streamer:
self.assertEqual(flow.tunnel_id, 1)
decode_streamer.decode_tunnels = False
for flow in decode_streamer:
self.assertRaises(AttributeError, getattr, flow, "tunnel_id")
del decode_streamer
print("{}\t: {}".format(".Test tunnels decoding".ljust(60, ' '), colored('OK', 'green')))
def test_system_visibility_poll_ms():
system_visibility_mode = ["yes", -1]
for x in system_visibility_mode:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_poll_ms=x)
def test_statistical_analysis_parameter():
statistical_analysis = ["yes", 89]
for x in statistical_analysis:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), statistical_analysis=x)
def test_splt_analysis_parameter():
splt_analysis = [-1, 256, "yes"]
for x in splt_analysis:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=x)
def test_n_meters_parameter():
n_meters = ["yes", -1]
for x in n_meters:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_meters=x)
def test_performance_report_parameter():
performance_report = ["yes", -1]
for x in performance_report:
with pytest.raises(ValueError):
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), performance_report=x)
def test_expiration_management():
# Idle expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Active expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Custom expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=1))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=FlowSlicer(limit=4))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 6
def test_tunnel_decoding():
decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"),
statistical_analysis=True, decode_tunnels=True)
for flow in decode_streamer:
assert flow.tunnel_id == 1
decode_streamer.decode_tunnels = False
for flow in decode_streamer:
with pytest.raises(AttributeError):
getattr(flow, "tunnel_id")
del decode_streamer
def test_statistical():
statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=True, accounting_mode=1)
for flow in statistical_streamer:
assert flow.id == 0
assert flow.expiration_id == 0
assert flow.src_ip == '172.31.3.224'
assert flow.src_mac == '80:c6:ca:00:9e:9f'
assert flow.src_oui == '80:c6:ca'
assert flow.src_port == 42835
assert flow.dst_ip == '216.58.212.100'
assert flow.dst_mac == '00:0e:8e:4d:b4:a8'
assert flow.dst_oui == '00:0e:8e'
assert flow.dst_port == 443
assert flow.protocol == 6
assert flow.ip_version == 4
assert flow.vlan_id == 0
assert flow.tunnel_id == 0
assert flow.bidirectional_first_seen_ms == 1434443394683
assert flow.bidirectional_last_seen_ms == 1434443401353
assert flow.bidirectional_duration_ms == 6670
assert flow.bidirectional_packets == 28
assert flow.bidirectional_bytes == 8696
assert flow.src2dst_first_seen_ms == 1434443394683
assert flow.src2dst_last_seen_ms == 1434443401353
assert flow.src2dst_duration_ms == 6670
assert flow.src2dst_packets == 16
assert flow.src2dst_bytes == 1288
assert flow.dst2src_first_seen_ms == 1434443394717
assert flow.dst2src_last_seen_ms == 1434443401308
assert flow.dst2src_duration_ms == 6591
assert flow.dst2src_packets == 12
assert flow.dst2src_bytes == 7408
assert flow.bidirectional_min_ps == 40
assert flow.bidirectional_mean_ps == pytest.approx(310.571, 0.001)
assert flow.bidirectional_stddev_ps == pytest.approx(500.546, 0.001)
assert flow.bidirectional_max_ps == 1470
assert flow.src2dst_min_ps == 40
assert flow.src2dst_mean_ps == pytest.approx(80.499, 0.001)
assert flow.src2dst_stddev_ps == pytest.approx(89.555, 0.001)
assert flow.src2dst_max_ps == 354
assert flow.dst2src_min_ps == 40
assert flow.dst2src_mean_ps == pytest.approx(617.333, 0.001)
assert flow.dst2src_stddev_ps == pytest.approx(651.452, 0.001)
assert flow.dst2src_max_ps == 1470
assert flow.bidirectional_min_piat_ms == 0
assert flow.bidirectional_mean_piat_ms == pytest.approx(247.037, 0.001)
assert flow.bidirectional_stddev_piat_ms == pytest.approx(324.045, 0.001)
assert flow.bidirectional_max_piat_ms == 995
assert flow.src2dst_min_piat_ms == 76
assert flow.src2dst_mean_piat_ms == pytest.approx(444.666, 0.001)
assert flow.src2dst_stddev_piat_ms == pytest.approx(397.603, 0.001)
assert flow.src2dst_max_piat_ms == 1185
assert flow.dst2src_min_piat_ms == 66
assert flow.dst2src_mean_piat_ms == pytest.approx(599.181, 0.001)
assert flow.dst2src_stddev_piat_ms == pytest.approx(384.784, 0.001)
assert flow.dst2src_max_piat_ms == 1213
assert flow.bidirectional_syn_packets == 2
assert flow.bidirectional_cwr_packets == 0
assert flow.bidirectional_ece_packets == 0
assert flow.bidirectional_urg_packets == 0
assert flow.bidirectional_ack_packets == 27
assert flow.bidirectional_psh_packets == 8
assert flow.bidirectional_rst_packets == 0
assert flow.bidirectional_fin_packets == 2
assert flow.src2dst_syn_packets == 1
assert flow.src2dst_cwr_packets == 0
assert flow.src2dst_ece_packets == 0
assert flow.src2dst_urg_packets == 0
assert flow.src2dst_ack_packets == 15
assert flow.src2dst_psh_packets == 4
assert flow.src2dst_rst_packets == 0
assert flow.src2dst_fin_packets == 1
assert flow.dst2src_syn_packets == 1
assert flow.dst2src_cwr_packets == 0
assert flow.dst2src_ece_packets == 0
assert flow.dst2src_urg_packets == 0
assert flow.dst2src_ack_packets == 12
assert flow.dst2src_psh_packets == 4
assert flow.dst2src_rst_packets == 0
assert flow.dst2src_fin_packets == 1
def test_statistical(self):
print("\n----------------------------------------------------------------------")
statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=True, accounting_mode=1)
for flow in statistical_streamer:
self.assertEqual(flow.id, 0)
self.assertEqual(flow.expiration_id, 0)
self.assertEqual(flow.src_ip, '172.31.3.224')
self.assertEqual(flow.src_mac, '80:c6:ca:00:9e:9f')
self.assertEqual(flow.src_oui, '80:c6:ca')
self.assertEqual(flow.src_port, 42835)
self.assertEqual(flow.dst_ip, '216.58.212.100')
self.assertEqual(flow.dst_mac, '00:0e:8e:4d:b4:a8')
self.assertEqual(flow.dst_oui, '00:0e:8e')
self.assertEqual(flow.dst_port, 443)
self.assertEqual(flow.protocol, 6)
self.assertEqual(flow.ip_version, 4)
self.assertEqual(flow.vlan_id, 0)
self.assertEqual(flow.tunnel_id, 0)
self.assertEqual(flow.bidirectional_first_seen_ms, 1434443394683)
self.assertEqual(flow.bidirectional_last_seen_ms, 1434443401353)
self.assertEqual(flow.bidirectional_duration_ms, 6670)
self.assertEqual(flow.bidirectional_packets, 28)
self.assertEqual(flow.bidirectional_bytes, 8696)
self.assertEqual(flow.src2dst_first_seen_ms, 1434443394683)
self.assertEqual(flow.src2dst_last_seen_ms, 1434443401353)
self.assertEqual(flow.src2dst_duration_ms, 6670)
self.assertEqual(flow.src2dst_packets, 16)
self.assertEqual(flow.src2dst_bytes, 1288)
self.assertEqual(flow.dst2src_first_seen_ms, 1434443394717)
self.assertEqual(flow.dst2src_last_seen_ms, 1434443401308)
self.assertEqual(flow.dst2src_duration_ms, 6591)
self.assertEqual(flow.dst2src_packets, 12)
self.assertEqual(flow.dst2src_bytes, 7408)
self.assertEqual(flow.bidirectional_min_ps, 40)
self.assertAlmostEqual(flow.bidirectional_mean_ps, 310.57142857142856)
self.assertAlmostEqual(flow.bidirectional_stddev_ps, 500.54617788019937)
self.assertEqual(flow.bidirectional_max_ps, 1470)
self.assertEqual(flow.src2dst_min_ps, 40)
self.assertAlmostEqual(flow.src2dst_mean_ps, 80.49999999999999)
self.assertAlmostEqual(flow.src2dst_stddev_ps, 89.55519713189922)
self.assertEqual(flow.src2dst_max_ps, 354)
self.assertEqual(flow.dst2src_min_ps, 40)
self.assertAlmostEqual(flow.dst2src_mean_ps, 617.3333333333334)
self.assertAlmostEqual(flow.dst2src_stddev_ps, 651.4524099458397)
self.assertEqual(flow.dst2src_max_ps, 1470)
self.assertEqual(flow.bidirectional_min_piat_ms, 0)
self.assertAlmostEqual(flow.bidirectional_mean_piat_ms, 247.037037037037)
self.assertAlmostEqual(flow.bidirectional_stddev_piat_ms, 324.04599406227237)
self.assertEqual(flow.bidirectional_max_piat_ms, 995)
self.assertEqual(flow.src2dst_min_piat_ms, 76)
self.assertAlmostEqual(flow.src2dst_mean_piat_ms, 444.6666666666667)
self.assertAlmostEqual(flow.src2dst_stddev_piat_ms, 397.60329595261277)
self.assertEqual(flow.src2dst_max_piat_ms, 1185)
self.assertEqual(flow.dst2src_min_piat_ms, 66)
self.assertAlmostEqual(flow.dst2src_mean_piat_ms, 599.1818181818182)
self.assertAlmostEqual(flow.dst2src_stddev_piat_ms, 384.78456782511904)
self.assertEqual(flow.dst2src_max_piat_ms, 1213)
self.assertEqual(flow.bidirectional_syn_packets, 2)
self.assertEqual(flow.bidirectional_cwr_packets, 0)
self.assertEqual(flow.bidirectional_ece_packets, 0)
self.assertEqual(flow.bidirectional_urg_packets, 0)
self.assertEqual(flow.bidirectional_ack_packets, 27)
self.assertEqual(flow.bidirectional_psh_packets, 8)
self.assertEqual(flow.bidirectional_rst_packets, 0)
self.assertEqual(flow.bidirectional_fin_packets, 2)
self.assertEqual(flow.src2dst_syn_packets, 1)
self.assertEqual(flow.src2dst_cwr_packets, 0)
self.assertEqual(flow.src2dst_ece_packets, 0)
self.assertEqual(flow.src2dst_urg_packets, 0)
self.assertEqual(flow.src2dst_ack_packets, 15)
self.assertEqual(flow.src2dst_psh_packets, 4)
self.assertEqual(flow.src2dst_rst_packets, 0)
self.assertEqual(flow.src2dst_fin_packets, 1)
self.assertEqual(flow.dst2src_syn_packets, 1)
self.assertEqual(flow.dst2src_cwr_packets, 0)
self.assertEqual(flow.dst2src_ece_packets, 0)
self.assertEqual(flow.dst2src_urg_packets, 0)
self.assertEqual(flow.dst2src_ack_packets, 12)
self.assertEqual(flow.dst2src_psh_packets, 4)
self.assertEqual(flow.dst2src_rst_packets, 0)
self.assertEqual(flow.dst2src_fin_packets, 1)
del statistical_streamer
print("{}\t: {}".format(".Test statistical extraction".ljust(60, ' '), colored('OK', 'green')))
def test_fingerprint_extraction(self):
print("\n----------------------------------------------------------------------")
fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
statistical_analysis=True, accounting_mode=1)
for flow in fingerprint_streamer:
self.assertEqual(flow.application_name, 'TLS.Facebook')
self.assertEqual(flow.application_category_name, 'SocialNetwork')
self.assertEqual(flow.application_is_guessed, 0)
self.assertEqual(flow.application_confidence, 4)
self.assertTrue(flow.requested_server_name in ['facebook.com', 'www.facebook.com'])
self.assertTrue(flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840',
'5c60e71f1b8cd40e4d40ed5b6d666e3f'])
self.assertTrue(flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12',
'96681175a9547081bf3d417f1a572091'])
del fingerprint_streamer
print("{}\t: {}".format(".Test fingerprint extraction".ljust(60, ' '), colored('OK', 'green')))
def test_export(self):
print("\n----------------------------------------------------------------------")
df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_pandas()
df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True,
n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
self.assertEqual(df_anon.shape[0], df.shape[0])
self.assertEqual(df_anon.shape[1], df.shape[1])
self.assertEqual(df_anon['src_ip'].nunique(), df['src_ip'].nunique())
self.assertEqual(df_anon['dst_ip'].nunique(), df['dst_ip'].nunique())
total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_csv()
df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_csv()
df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv"))
self.assertEqual(total_flows, total_flows_anon)
self.assertEqual(total_flows, df_from_csv.shape[0])
self.assertEqual(total_flows_anon, df_anon_from_csv.shape[0])
self.assertEqual(total_flows, df.shape[0])
self.assertEqual(total_flows_anon, df_anon.shape[0])
print("{}\t: {}".format(".Test export interfaces".ljust(60, ' '), colored('OK', 'green')))
def test_bpf(self):
print("\n----------------------------------------------------------------------")
streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
bpf_filter="src port 52066 or dst port 52066")
last_id = 0
for flow in streamer_test:
last_id = flow.id
self.assertEqual(flow.src_port, 52066)
self.assertEqual(last_id, 0)
del streamer_test
print("{}\t: {}".format(".Test BPF".ljust(60, ' '), colored('OK', 'green')))
def test_ndpi_integration(self):
pcap_files = get_files_list(os.path.join("tests", "pcaps"))
result_files = get_files_list(os.path.join("tests", "results"))
failures = 0
print("\n----------------------------------------------------------------------")
print(".Test nDPI integration on {} applications:".format(len(pcap_files)))
for file_idx, test_file in enumerate(pcap_files):
test_case_name = os.path.basename(test_file)
try:
test = NFStreamer(source=test_file,
n_dissections=20,
n_meters=1).to_pandas()[["id",
"bidirectional_packets",
"bidirectional_bytes",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence"]].to_dict()
true = pd.read_csv(result_files[file_idx]).to_dict()
self.assertEqual(test, true)
print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('OK', 'green')))
except AssertionError:
failures += 1
print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('KO', 'red')))
self.assertEqual(failures, 0)
def test_splt(self):
print("\n----------------------------------------------------------------------")
splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5,
udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas()
direction = json.loads(splt_df["udps.splt_direction"][0])
ps = json.loads(splt_df["udps.splt_ps"][0])
piat = json.loads(splt_df["udps.splt_piat_ms"][0])
ndirection = json.loads(splt_df["splt_direction"][0])
nps = json.loads(splt_df["splt_ps"][0])
npiat = json.loads(splt_df["splt_piat_ms"][0])
self.assertEqual(direction, [0, 1, 0, 0, 1])
self.assertEqual(ps, [58, 60, 54, 180, 60])
self.assertEqual(piat, [0, 34, 134, 144, 35])
self.assertEqual(direction, ndirection)
self.assertEqual(ps, nps)
self.assertEqual(piat, npiat)
print("{}\t: {}".format(".Test SPLT analysis".ljust(60, ' '), colored('OK', 'green')))
def test_dhcp(self):
print("\n----------------------------------------------------------------------")
dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"),
n_dissections=0,
udps=DHCP()
).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
self.assertEqual(dhcp_df["udps.dhcp_msg_type"][0], "MsgType.DISCOVER")
self.assertEqual(dhcp_df["udps.dhcp_50"][1], "192.168.0.10")
self.assertEqual(dhcp_df["udps.dhcp_55"][1], "1,3,6,42")
self.assertEqual(dhcp_df["udps.dhcp_options"][1], "[53, 61, 50, 54, 55]")
self.assertEqual(dhcp_df["udps.dhcp_msg_type"][1], "MsgType.REQUEST")
self.assertEqual(dhcp_df["udps.dhcp_oui"][1], "00:0b:82")
self.assertEqual(dhcp_df.shape[0], 3)
print("{}\t: {}".format(".Test DHCP plugin".ljust(60, ' '), colored('OK', 'green')))
def test_mdns(self):
print("\n----------------------------------------------------------------------")
mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"),
n_dissections=0,
udps=MDNS()
).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
self.assertEqual(mdns_df["udps.mdns_ptr"][0], "['skynet.local', "
"'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', "
"'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', "
"'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', "
"'_workstation._tcp.local']")
print("{}\t: {}".format(".Test MDNS plugin".ljust(60, ' '), colored('OK', 'green')))
def test_fingerprint_extraction():
fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
statistical_analysis=True, accounting_mode=1)
for flow in fingerprint_streamer:
assert flow.application_name == 'TLS.Facebook'
assert flow.application_category_name == 'SocialNetwork'
assert flow.application_is_guessed == 0
assert flow.application_confidence == 4
requested_server_name = flow.requested_server_name in ['facebook.com', 'www.facebook.com']
assert int(requested_server_name) == 1
client_fingerprint = flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840',
'5c60e71f1b8cd40e4d40ed5b6d666e3f']
assert int(client_fingerprint) == 1
server_fingerprint = flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12',
'96681175a9547081bf3d417f1a572091']
assert int(server_fingerprint) == 1
del fingerprint_streamer
def test_export():
df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_pandas()
df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True,
n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
assert df_anon.shape[0] == df.shape[0]
assert df_anon.shape[1] == df.shape[1]
assert df_anon['src_ip'].nunique() == df['src_ip'].nunique()
assert df_anon['dst_ip'].nunique() == df['dst_ip'].nunique()
total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_csv()
df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
statistical_analysis=True, n_dissections=20).to_csv()
df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv"))
assert total_flows == total_flows_anon
assert total_flows == df_from_csv.shape[0]
assert total_flows_anon == df_anon_from_csv.shape[0]
assert total_flows == df.shape[0]
assert total_flows_anon == df_anon.shape[0]
def test_bpf():
streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
bpf_filter="src port 52066 or dst port 52066")
last_id = 0
for flow in streamer_test:
last_id = flow.id
assert flow.src_port == 52066
assert last_id == 0
def test_ndpi_integration():
# For Windows platform, we have result mismatch on a subset of files
# We ignore these errors as Windows support is WIP in nDPI project:
# MR: https://github.com/ntop/nDPI/pull/1491
pcap_files = get_files_list(os.path.join("tests", "pcaps"))
result_files = get_files_list(os.path.join("tests", "results"))
failures = 0
for file_idx, test_file in enumerate(pcap_files):
try:
test = {}
test_df = NFStreamer(source=test_file, n_dissections=20, n_meters=1).to_pandas()
if test_df is not None:
test = test_df[["id",
"bidirectional_packets",
"bidirectional_bytes",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence"]].to_dict()
true = pd.read_csv(result_files[file_idx]).to_dict()
assert test == true
except AssertionError:
failures += 1
if os.name != 'posix': # FIXME once nDPI Windows support is finalized.
assert failures == 8
else: # Everything must be OK
assert failures == 0
def test_splt():
splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5,
udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas()
direction = json.loads(splt_df["udps.splt_direction"][0])
ps = json.loads(splt_df["udps.splt_ps"][0])
piat = json.loads(splt_df["udps.splt_piat_ms"][0])
ndirection = json.loads(splt_df["splt_direction"][0])
nps = json.loads(splt_df["splt_ps"][0])
npiat = json.loads(splt_df["splt_piat_ms"][0])
assert direction == [0, 1, 0, 0, 1]
assert ps == [58, 60, 54, 180, 60]
assert piat == [0, 34, 134, 144, 35]
assert direction == ndirection
assert ps == nps
assert piat == npiat
def test_dhcp():
dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"), n_dissections=0, udps=DHCP())\
.to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
assert dhcp_df["udps.dhcp_msg_type"][0] == "MsgType.DISCOVER"
assert dhcp_df["udps.dhcp_50"][1] == "192.168.0.10"
assert dhcp_df["udps.dhcp_55"][1] == "1,3,6,42"
assert dhcp_df["udps.dhcp_options"][1] == "[53, 61, 50, 54, 55]"
assert dhcp_df["udps.dhcp_msg_type"][1] == "MsgType.REQUEST"
assert dhcp_df["udps.dhcp_oui"][1] == "00:0b:82"
assert dhcp_df.shape[0] == 3
def test_mdns():
mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"), n_dissections=0, udps=MDNS())\
.to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
assert mdns_df["udps.mdns_ptr"][0] == "['skynet.local', "\
"'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', "\
"'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', "\
"'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', "\
"'_workstation._tcp.local']"
if __name__ == '__main__':
unittest.main()