nfstream/tests.py
Adrian Pekar b1b2ee87e2
Upgrade nDPI from 4.7.0 to 5.0 with Windows build fix (#230)
* Upgrade nDPI from 4.7.0 to 5.0 with backward compatibility

- Upgrade nDPI submodule from 4.7.0 to 5.0
- Fix all 9 nDPI 5.0 API breaking changes:
  * Protocol structure: .app_protocol -> .proto.app_protocol
  * Protocol structure: .master_protocol -> .proto.master_protocol
  * ndpi_protocol2name() signature change
  * ndpi_detection_giveup() signature change (removed enable_guess/guessed params)
  * ndpi_extra_dissection_possible() replaced with state check
  * ndpi_init_detection_module() now expects NULL
  * Protocol bitmask removed (all protocols enabled by default)
  * TCP/UDP struct size checks removed
  * Fingerprint extraction: ja3_client -> ja4_client

- Maintain backward compatibility by keeping field names:
  * client_fingerprint: Now JA4 (TLS), HASSH client (SSH), DHCP fingerprint
  * server_fingerprint: Now JA3S (TLS), HASSH server (SSH)

- Update build system for nDPI 5.0:
  * Add --with-only-libndpi flag to build.sh
  * Make CFFI marker extraction optional in engine_build.py

- Update tests for nDPI 5.0:
  * Update fingerprint assertions (JA3 MD5 -> JA4 format)
  * Regenerate all 228 baseline test results
  * Remove invalid test file (memcached.cap)

- All 29 tests passing

* Fix memcpy buffer overreads and enable DNS subclassification

- Fix buffer overread in fingerprint memcpy calls by using source size
  instead of destination size:
  - HASSH client/server: 33 bytes (was reading 48)
  - JA4 client: 37 bytes (was reading 48)
  - JA3 server: 33 bytes (was reading 48)
  - DHCP: 48 bytes (consistent with others)

- Enable DNS subclassification (disabled by default in nDPI 5.0)
  to restore detection of DNS.Apple, DNS.Google, etc.

- Regenerate test baselines with fixes applied

* Replace outdated Steam test pcaps with nDPI 5.0 version

- Remove steam.pcap and steam_datagram_relay_ping.pcapng (obsolete)
- Add steam.pcapng from nDPI 5.0 test suite
- nDPI 5.0 reworked Steam detection (ntop/nDPI#2264)

* Fix Windows build: add missing configure step for nDPI 5.0

The Windows build script was missing the ./configure --with-only-libndpi
step before make, causing nDPI headers to not be properly installed.
This resulted in the CFFI preprocessing marker //CFFI.NDPI_PACKED_STRUCTURES
being absent from ndpi_cdefinitions_packed.h, crashing engine_build.py.

Aligns build_windows.sh with build.sh which already had this step.
2026-02-14 17:48:00 +01:00

920 lines
34 KiB
Python

"""
------------------------------------------------------------------------------------------------------------------------
tests.py
Copyright (C) 2019-22 - NFStream Developers
This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
version.
NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along with NFStream.
If not, see <http://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------------------------------------------
"""
import pandas as pd
import json
import os
from nfstream import NFStreamer
from nfstream.plugins import SPLT, DHCP, FlowSlicer, MDNS
def get_files_list(path):
files = []
for r, d, f in os.walk(path):
for file in f:
if (
".pcap" == file[-5:] or ".pcapng" == file[-7:]
): # Pick out only pcaps files
files.append(os.path.join(r, file))
files.sort()
return files
class NFStreamTest(object):
@staticmethod
def test_source_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
source = ["inexisting.pcap", "lo", 11]
for x in source:
try:
NFStreamer(source=x).to_pandas()
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_source_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_decode_tunnels_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
decode_tunnels = [33, "True"]
for x in decode_tunnels:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
decode_tunnels=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_decode_tunnels_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_bpf_filter_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
bpf_filter = ["my filter", 11]
for x in bpf_filter:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
bpf_filter=x,
).to_pandas()
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_bpf_filter_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_promiscuous_mode_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
promiscuous_mode = ["yes", 89]
for x in promiscuous_mode:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
promiscuous_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_promiscuous_mode_parameter".ljust(60, " "),
"OK",
)
)
@staticmethod
def test_snapshot_length_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
snapshot_length = ["largest", -1]
for x in snapshot_length:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
snapshot_length=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_snapshot_length_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_socket_buffer_size_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
socket_buffer_size = ["largest", -1, 2**31]
for x in socket_buffer_size:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
socket_buffer_size=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_socket_buffer_size_parameter".ljust(60, " "),
"OK",
)
)
@staticmethod
def test_idle_timeout_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
idle_timeout = [-1, "idle"]
for x in idle_timeout:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
idle_timeout=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_idle_timeout_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_active_timeout_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
active_timeout = [-1, "active"]
for x in active_timeout:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
active_timeout=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_active_timeout_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_accounting_mode_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
accounting_mode = [-1, 5, "ip"]
for x in accounting_mode:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
accounting_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_accounting_mode_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_udps_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
udps = [lambda y: y + 1, "NFPlugin"]
for x in udps:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_udps_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_n_dissections_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
n_dissections = ["yes", -1, 256]
for x in n_dissections:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_dissections=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_n_dissections_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_system_visibility_mode_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
system_visibility_mode = ["yes", -1, 3]
for x in system_visibility_mode:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_system_visibility_mode_parameter".ljust(60, " "),
"OK",
)
)
@staticmethod
def test_system_visibility_poll_ms():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
system_visibility_mode = ["yes", -1]
for x in system_visibility_mode:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_poll_ms=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_system_visibility_poll_ms".ljust(60, " "), "OK"
)
)
@staticmethod
def test_statistical_analysis_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
statistical_analysis = ["yes", 89]
for x in statistical_analysis:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_statistical_analysis_parameter".ljust(60, " "),
"OK",
)
)
@staticmethod
def test_splt_analysis_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
splt_analysis = [-1, 70000, "yes"]
for x in splt_analysis:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
splt_analysis=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print(
"{}\t: {}".format(
".test_splt_analysis_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_n_meters_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
n_meters = ["yes", -1]
for x in n_meters:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_meters=x
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_n_meters_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_max_nflows_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
max_nflows = ["yes", -1]
for x in max_nflows:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
max_nflows=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_max_nflows_parameter".ljust(60, " "), "OK"
)
)
@staticmethod
def test_performance_report_parameter():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
performance_report = ["yes", -1]
for x in performance_report:
try:
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
performance_report=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print(
"{}\t: {}".format(
".test_performance_report_parameter".ljust(60, " "),
"OK",
)
)
@staticmethod
def test_expiration_management():
print(
"\n----------------------------------------------------------------------"
)
# Idle expiration
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Active expiration
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Custom expiration
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=1),
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=4),
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 6
print(
"{}\t: {}".format(
".test_expiration_management".ljust(60, " "), "OK"
)
)
@staticmethod
def test_tunnel_decoding():
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
decode_streamer = NFStreamer(
source=os.path.join("tests", "pcaps", "gtp-u.pcap"),
statistical_analysis=True,
decode_tunnels=True,
)
for flow in decode_streamer:
assert flow.tunnel_id == 1
decode_streamer.decode_tunnels = False
for flow in decode_streamer:
try:
getattr(flow, "tunnel_id")
except AttributeError:
n_exceptions += 1
assert n_exceptions == 1
del decode_streamer
print(
"{}\t: {}".format(
".test_tunnel_decoding".ljust(60, " "), "OK"
)
)
@staticmethod
def test_statistical():
print(
"\n----------------------------------------------------------------------"
)
statistical_streamer = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=True,
accounting_mode=1,
)
for flow in statistical_streamer:
assert flow.id == 0
assert flow.expiration_id == 0
assert flow.src_ip == "172.31.3.224"
assert flow.src_mac == "80:c6:ca:00:9e:9f"
assert flow.src_oui == "80:c6:ca"
assert flow.src_port == 42835
assert flow.dst_ip == "216.58.212.100"
assert flow.dst_mac == "00:0e:8e:4d:b4:a8"
assert flow.dst_oui == "00:0e:8e"
assert flow.dst_port == 443
assert flow.protocol == 6
assert flow.ip_version == 4
assert flow.vlan_id == 0
assert flow.tunnel_id == 0
assert flow.bidirectional_first_seen_ms == 1434443394683
assert flow.bidirectional_last_seen_ms == 1434443401353
assert flow.bidirectional_duration_ms == 6670
assert flow.bidirectional_packets == 28
assert flow.bidirectional_bytes == 8696
assert flow.src2dst_first_seen_ms == 1434443394683
assert flow.src2dst_last_seen_ms == 1434443401353
assert flow.src2dst_duration_ms == 6670
assert flow.src2dst_packets == 16
assert flow.src2dst_bytes == 1288
assert flow.dst2src_first_seen_ms == 1434443394717
assert flow.dst2src_last_seen_ms == 1434443401308
assert flow.dst2src_duration_ms == 6591
assert flow.dst2src_packets == 12
assert flow.dst2src_bytes == 7408
assert flow.bidirectional_min_ps == 40
assert (flow.bidirectional_mean_ps - 310.571) < 0.001
assert (flow.bidirectional_stddev_ps - 500.546) < 0.001
assert flow.bidirectional_max_ps == 1470
assert flow.src2dst_min_ps == 40
assert (flow.src2dst_mean_ps - 80.499) < 0.001
assert (flow.src2dst_stddev_ps - 89.555) < 0.001
assert flow.src2dst_max_ps == 354
assert flow.dst2src_min_ps == 40
assert (flow.dst2src_mean_ps - 617.333) < 0.001
assert (flow.dst2src_stddev_ps - 651.452) < 0.001
assert flow.dst2src_max_ps == 1470
assert flow.bidirectional_min_piat_ms == 0
assert (flow.bidirectional_mean_piat_ms - 247.037) < 0.001
assert (flow.bidirectional_stddev_piat_ms - 324.045) < 0.001
assert flow.bidirectional_max_piat_ms == 995
assert flow.src2dst_min_piat_ms == 76
assert (flow.src2dst_mean_piat_ms - 444.666) < 0.001
assert (flow.src2dst_stddev_piat_ms - 397.603) < 0.001
assert flow.src2dst_max_piat_ms == 1185
assert flow.dst2src_min_piat_ms == 66
assert (flow.dst2src_mean_piat_ms - 599.181) < 0.001
assert (flow.dst2src_stddev_piat_ms - 384.784) < 0.001
assert flow.dst2src_max_piat_ms == 1213
assert flow.bidirectional_syn_packets == 2
assert flow.bidirectional_cwr_packets == 0
assert flow.bidirectional_ece_packets == 0
assert flow.bidirectional_urg_packets == 0
assert flow.bidirectional_ack_packets == 27
assert flow.bidirectional_psh_packets == 8
assert flow.bidirectional_rst_packets == 0
assert flow.bidirectional_fin_packets == 2
assert flow.src2dst_syn_packets == 1
assert flow.src2dst_cwr_packets == 0
assert flow.src2dst_ece_packets == 0
assert flow.src2dst_urg_packets == 0
assert flow.src2dst_ack_packets == 15
assert flow.src2dst_psh_packets == 4
assert flow.src2dst_rst_packets == 0
assert flow.src2dst_fin_packets == 1
assert flow.dst2src_syn_packets == 1
assert flow.dst2src_cwr_packets == 0
assert flow.dst2src_ece_packets == 0
assert flow.dst2src_urg_packets == 0
assert flow.dst2src_ack_packets == 12
assert flow.dst2src_psh_packets == 4
assert flow.dst2src_rst_packets == 0
assert flow.dst2src_fin_packets == 1
del statistical_streamer
print(
"{}\t: {}".format(
".test_statistical".ljust(60, " "), "OK"
)
)
@staticmethod
def test_fingerprint_extraction():
print(
"\n----------------------------------------------------------------------"
)
fingerprint_streamer = NFStreamer(
source=os.path.join("tests", "pcaps", "facebook.pcap"),
statistical_analysis=True,
accounting_mode=1,
)
for flow in fingerprint_streamer:
assert flow.application_name == "TLS.Facebook"
assert flow.application_category_name == "SocialNetwork"
assert flow.application_is_guessed == 0
assert flow.application_confidence == 6
requested_server_name = flow.requested_server_name in [
"facebook.com",
"www.facebook.com",
]
assert int(requested_server_name) == 1
# nDPI 5.0: client_fingerprint now contains JA4 (not JA3), server_fingerprint still JA3S
client_fingerprint = flow.client_fingerprint in [
"t12d1311h2_27a29bd8d6e6_c4623e4f4474", # JA4 for www.facebook.com
"t12d1310h2_27a29bd8d6e6_85173d161f9a", # JA4 for facebook.com
]
assert int(client_fingerprint) == 1
server_fingerprint = flow.server_fingerprint in [
"2d1eb5817ece335c24904f516ad5da12", # JA3S unchanged
"96681175a9547081bf3d417f1a572091", # JA3S unchanged
]
assert int(server_fingerprint) == 1
del fingerprint_streamer
print(
"{}\t: {}".format(
".test_fingerprint_extraction".ljust(60, " "), "OK"
)
)
@staticmethod
def test_export():
print(
"\n----------------------------------------------------------------------"
)
df = NFStreamer(
source=os.path.join("tests", "pcaps", "steam.pcapng"),
statistical_analysis=True,
n_dissections=20,
).to_pandas()
df_anon = NFStreamer(
source=os.path.join("tests", "pcaps", "steam.pcapng"),
statistical_analysis=True,
n_dissections=20,
).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
assert df_anon.shape[0] == df.shape[0]
assert df_anon.shape[1] == df.shape[1]
assert df_anon["src_ip"].nunique() == df["src_ip"].nunique()
assert df_anon["dst_ip"].nunique() == df["dst_ip"].nunique()
total_flows = NFStreamer(
source=os.path.join("tests", "pcaps", "steam.pcapng"),
statistical_analysis=True,
n_dissections=20,
).to_csv()
df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcapng.csv"))
total_flows_anon = NFStreamer(
source=os.path.join("tests", "pcaps", "steam.pcapng"),
statistical_analysis=True,
n_dissections=20,
).to_csv()
df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcapng.csv"))
os.remove(os.path.join("tests", "pcaps", "steam.pcapng.csv"))
assert total_flows == total_flows_anon
assert total_flows == df_from_csv.shape[0]
assert total_flows_anon == df_anon_from_csv.shape[0]
assert total_flows == df.shape[0]
assert total_flows_anon == df_anon.shape[0]
print("{}\t: {}".format(".test_export".ljust(60, " "), "OK"))
@staticmethod
def test_bpf():
print(
"\n----------------------------------------------------------------------"
)
streamer_test = NFStreamer(
source=os.path.join("tests", "pcaps", "facebook.pcap"),
bpf_filter="src port 52066 or dst port 52066",
)
last_id = 0
for flow in streamer_test:
last_id = flow.id
assert flow.src_port == 52066
assert last_id == 0
print("{}\t: {}".format(".test_bpf".ljust(60, " "), "OK"))
@staticmethod
def test_ndpi_integration():
print(
"\n----------------------------------------------------------------------"
)
pcap_files = get_files_list(os.path.join("tests", "pcaps"))
result_files = get_files_list(os.path.join("tests", "results"))
failures = 0
print(".Test nDPI integration on {} applications:".format(len(pcap_files)))
for file_idx, test_file in enumerate(pcap_files):
test_case_name = os.path.basename(test_file)
try:
test = (
NFStreamer(source=test_file, n_dissections=20, n_meters=1)
.to_pandas()[
[
"id",
"bidirectional_packets",
"bidirectional_bytes",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence",
]
]
.to_dict()
)
true = pd.read_csv(result_files[file_idx]).to_dict()
assert test == true
print(
"{}\t: {}".format(
test_case_name.ljust(60, " "), "OK"
)
)
except AssertionError:
failures += 1
print(
"{}\t: {}".format(
test_case_name.ljust(60, " "), "KO"
)
)
# Everything must be OK
assert failures == 0
@staticmethod
def test_splt():
print(
"\n----------------------------------------------------------------------"
)
splt_df = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
splt_analysis=5,
udps=SPLT(sequence_length=5, accounting_mode=0),
).to_pandas()
direction = json.loads(splt_df["udps.splt_direction"][0])
ps = json.loads(splt_df["udps.splt_ps"][0])
piat = json.loads(splt_df["udps.splt_piat_ms"][0])
ndirection = json.loads(splt_df["splt_direction"][0])
nps = json.loads(splt_df["splt_ps"][0])
npiat = json.loads(splt_df["splt_piat_ms"][0])
assert direction == [0, 1, 0, 0, 1]
assert ps == [58, 60, 54, 180, 60]
assert piat == [0, 34, 134, 144, 35]
assert direction == ndirection
assert ps == nps
assert piat == npiat
print("{}\t: {}".format(".test_splt".ljust(60, " "), "OK"))
@staticmethod
def test_dhcp():
print(
"\n----------------------------------------------------------------------"
)
dhcp_df = (
NFStreamer(
source=os.path.join("tests", "pcaps", "dhcp.pcap"),
n_dissections=0,
udps=DHCP(),
)
.to_pandas()
.sort_values(by=["src_ip"])
.reset_index(drop=True)
)
assert dhcp_df["udps.dhcp_msg_type"][0] == "MsgType.DISCOVER"
assert dhcp_df["udps.dhcp_50"][1] == "192.168.0.10"
assert dhcp_df["udps.dhcp_55"][1] == "1,3,6,42"
assert dhcp_df["udps.dhcp_options"][1] == "[53, 61, 50, 54, 55]"
assert dhcp_df["udps.dhcp_msg_type"][1] == "MsgType.REQUEST"
assert dhcp_df["udps.dhcp_oui"][1] == "00:0b:82"
assert dhcp_df.shape[0] == 3
print("{}\t: {}".format(".test_dhcp".ljust(60, " "), "OK"))
@staticmethod
def test_mdns():
print(
"\n----------------------------------------------------------------------"
)
mdns_df = (
NFStreamer(
source=os.path.join("tests", "pcaps", "mdns.pcap"),
n_dissections=0,
udps=MDNS(),
)
.to_pandas()
.sort_values(by=["src_ip"])
.reset_index(drop=True)
)
assert (
mdns_df["udps.mdns_ptr"][0] == "['skynet.local', "
"'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', "
"'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', "
"'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', "
"'_workstation._tcp.local']"
)
print("{}\t: {}".format(".test_mdns".ljust(60, " "), "OK"))
@staticmethod
def test_multi_files():
print(
"\n----------------------------------------------------------------------"
)
multi_files = [
os.path.join("tests", "pcaps", "one_flow_1_5.pcap"),
os.path.join("tests", "pcaps", "one_flow_6_10.pcap"),
os.path.join("tests", "pcaps", "one_flow_11_15.pcap"),
os.path.join("tests", "pcaps", "one_flow_16_19.pcap"),
]
for flow in NFStreamer(source=multi_files):
assert flow.id == 0
assert flow.expiration_id == 0
assert flow.src_ip == "192.168.43.18"
assert flow.src_mac == "30:52:cb:6c:9c:1b"
assert flow.src_oui == "30:52:cb"
assert flow.src_port == 52066
assert flow.dst_ip == "66.220.156.68"
assert flow.dst_mac == "98:0c:82:d3:3c:7c"
assert flow.dst_oui == "98:0c:82"
assert flow.dst_port == 443
assert flow.protocol == 6
assert flow.ip_version == 4
assert flow.vlan_id == 0
assert flow.tunnel_id == 0
assert flow.bidirectional_first_seen_ms == 1472393122365
assert flow.bidirectional_last_seen_ms == 1472393123665
assert flow.bidirectional_duration_ms == 1300
assert flow.bidirectional_packets == 19
assert flow.bidirectional_bytes == 5745
assert flow.src2dst_first_seen_ms == 1472393122365
assert flow.src2dst_last_seen_ms == 1472393123408
assert flow.src2dst_duration_ms == 1043
assert flow.src2dst_packets == 9
assert flow.src2dst_bytes == 1345
assert flow.dst2src_first_seen_ms == 1472393122668
assert flow.dst2src_last_seen_ms == 1472393123665
assert flow.dst2src_duration_ms == 997
assert flow.dst2src_packets == 10
assert flow.dst2src_bytes == 4400
assert flow.application_name == "TLS.Facebook"
assert flow.application_category_name == "SocialNetwork"
assert flow.application_is_guessed == 0
assert flow.application_confidence == 6
assert flow.requested_server_name == "facebook.com"
# nDPI 5.0: client_fingerprint is now JA4 instead of JA3
assert flow.client_fingerprint == "t12d1310h2_27a29bd8d6e6_85173d161f9a"
assert flow.server_fingerprint == "2d1eb5817ece335c24904f516ad5da12"
assert flow.user_agent == ""
assert flow.content_type == ""
print(
"{}\t: {}".format(
".test_multi_files".ljust(60, " "), "OK"
)
)
@staticmethod
def test_max_nflows():
print(
"\n----------------------------------------------------------------------"
)
df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap")).to_pandas()
assert df.shape[0] == 294
df = NFStreamer(
source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=100
).to_pandas()
assert df.shape[0] == 100
df = NFStreamer(
source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=0
).to_pandas()
assert df.shape[0] == 294
print(
"{}\t: {}".format(".test_max_nflows".ljust(60, " "), "OK")
)
if __name__ == "__main__":
# IMPORTANT: As NFStream input is network bytes, we rely on fuzzing techniques to ensure robustness.
# Fuzzing testing is part of Google OSS-Fuzz project.
# Github: https://github.com/google/oss-fuzz/tree/master/projects/nfstream
# Build status: https://oss-fuzz-build-logs.storage.googleapis.com/index.html#nfstream
NFStreamTest.test_source_parameter()
NFStreamTest.test_decode_tunnels_parameter()
NFStreamTest.test_bpf_filter_parameter()
NFStreamTest.test_promiscuous_mode_parameter()
NFStreamTest.test_snapshot_length_parameter()
NFStreamTest.test_socket_buffer_size_parameter()
NFStreamTest.test_idle_timeout_parameter()
NFStreamTest.test_active_timeout_parameter()
NFStreamTest.test_accounting_mode_parameter()
NFStreamTest.test_udps_parameter()
NFStreamTest.test_n_dissections_parameter()
NFStreamTest.test_system_visibility_mode_parameter()
NFStreamTest.test_system_visibility_poll_ms()
NFStreamTest.test_statistical_analysis_parameter()
NFStreamTest.test_splt_analysis_parameter()
NFStreamTest.test_n_meters_parameter()
NFStreamTest.test_max_nflows_parameter()
NFStreamTest.test_performance_report_parameter()
NFStreamTest.test_expiration_management()
NFStreamTest.test_tunnel_decoding()
NFStreamTest.test_statistical()
NFStreamTest.test_fingerprint_extraction()
NFStreamTest.test_export()
NFStreamTest.test_bpf()
NFStreamTest.test_ndpi_integration()
NFStreamTest.test_splt()
NFStreamTest.test_dhcp()
NFStreamTest.test_mdns()
NFStreamTest.test_multi_files()
NFStreamTest.test_max_nflows()