black formatting

This commit is contained in:
Zied Aouini 2024-05-10 18:46:03 +02:00
parent 4942c65696
commit 5481132981
No known key found for this signature in database
22 changed files with 1641 additions and 759 deletions

View file

@ -19,45 +19,55 @@ import json
import sys
NFRequest = namedtuple('NFRequest', ['browser',
'timestamp',
'remote_ip',
'tab_id',
'request_id',
'tab_is_active',
'tab_url'])
NFRequest = namedtuple(
"NFRequest",
[
"browser",
"timestamp",
"remote_ip",
"tab_id",
"request_id",
"tab_is_active",
"tab_url",
],
)
class NFRequestHandler(BaseHTTPRequestHandler):
""" Handler for HTTP request from browser extension """
"""Handler for HTTP request from browser extension"""
def _set_headers(self):
""" headers setter """
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
"""headers setter"""
self.send_header("Access-Control-Allow-Origin", "*")
self.send_header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
self.send_header("Access-Control-Allow-Headers", "X-Requested-With")
self.send_header("Access-Control-Allow-Headers", "content-type")
self.end_headers()
def do_OPTIONS(self):
""" OPTIONS handler """
"""OPTIONS handler"""
self.send_response(200)
self._set_headers()
def do_POST(self):
""" POST handler """
if self.path.endswith('.json') and (self.path.startswith('/nfstream-chrome') or
self.path.startswith('/nfstream-firefox')):
length = self.headers['content-length']
"""POST handler"""
if self.path.endswith(".json") and (
self.path.startswith("/nfstream-chrome")
or self.path.startswith("/nfstream-firefox")
):
length = self.headers["content-length"]
data = json.loads(self.rfile.read(int(length)))
self.send_response(200)
self._set_headers()
request = NFRequest(data["browser"],
float(data["timestamp"]),
data["ip_address"],
data["tab_id"],
data["req_id"],
data["tab_is_active"],
data["tab_url"])
request = NFRequest(
data["browser"],
float(data["timestamp"]),
data["ip_address"],
data["tab_id"],
data["req_id"],
data["tab_is_active"],
data["tab_url"],
)
# For sake of brevity, we print it only
print(request)
@ -72,19 +82,20 @@ class NFRequestHandler(BaseHTTPRequestHandler):
class NFRequestServer(HTTPServer):
""" NFRequest HTTP server"""
"""NFRequest HTTP server"""
def __init__(self, *args):
HTTPServer.__init__(self, *args)
self.stopped = False
# self.channel = channel
if __name__ == '__main__': # Mandatory if you are running on Windows Platform
if __name__ == "__main__": # Mandatory if you are running on Windows Platform
try:
port = int(sys.argv[1])
except IndexError: # not specified
port = 28314
server_address = ('', port) # localhost with configurable port
server_address = ("", port) # localhost with configurable port
server = NFRequestServer(server_address, NFRequestHandler)
try:
while not server.stopped:

View file

@ -17,10 +17,9 @@ from nfstream import NFStreamer
import sys
if __name__ == '__main__': # Mandatory if you are running on Windows Platform
if __name__ == "__main__": # Mandatory if you are running on Windows Platform
path = sys.argv[1]
print("nfstream processing started. Use Ctrl+C to interrupt and save.")
total_flows = NFStreamer(source=path,
statistical_analysis=True,
splt_analysis=10,
performance_report=1).to_csv()
total_flows = NFStreamer(
source=path, statistical_analysis=True, splt_analysis=10, performance_report=1
).to_csv()

View file

@ -17,16 +17,16 @@ from nfstream import NFStreamer
import sys
if __name__ == '__main__': # Mandatory if you are running on Windows Platform
if __name__ == "__main__": # Mandatory if you are running on Windows Platform
input_filepaths = []
for path in sys.argv[1:]:
input_filepaths.append(path)
if len(input_filepaths) == 1: # Single file / Interface
input_filepaths = input_filepaths[0]
flow_streamer = NFStreamer(source=input_filepaths,
statistical_analysis=False,
idle_timeout=1)
flow_streamer = NFStreamer(
source=input_filepaths, statistical_analysis=False, idle_timeout=1
)
result = {}
try:
for flow in flow_streamer:

View file

@ -23,11 +23,12 @@ except ImportError:
sys.exit(1)
if __name__ == '__main__': # Mandatory if you are running on Windows Platform
if __name__ == "__main__": # Mandatory if you are running on Windows Platform
path = sys.argv[1]
print("nfstream processing started. Use Ctrl+C to interrupt and save.")
streamer = NFStreamer(source=path, active_timeout=41, udps=WFPlugin(active_timeout=41, levels=12))
streamer = NFStreamer(
source=path, active_timeout=41, udps=WFPlugin(active_timeout=41, levels=12)
)
print("Converting to pandas...")
df = streamer.to_pandas()
print("Dataframe: ")

View file

@ -24,22 +24,26 @@ def get_files_list(path):
files = []
for r, d, f in os.walk(path):
for file in f:
if '.pcap' == file[-5:] or ".pcapng" == file[-7:]: # Pick out only pcaps files
if (
".pcap" == file[-5:] or ".pcapng" == file[-7:]
): # Pick out only pcaps files
files.append(os.path.join(r, file))
files.sort()
return files
if __name__ == '__main__': # Mandatory if you are running on Windows Platform
if __name__ == "__main__": # Mandatory if you are running on Windows Platform
pcap_files = get_files_list(os.path.join("tests", "pcaps"))
for pcap_file in tqdm(pcap_files):
df = NFStreamer(source=pcap_file, n_dissections=20, n_meters=1).to_pandas()[["id",
"bidirectional_packets",
"bidirectional_bytes",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence"]]
df.to_csv(pcap_file.replace("pcaps",
"results"),
index=False)
df = NFStreamer(source=pcap_file, n_dissections=20, n_meters=1).to_pandas()[
[
"id",
"bidirectional_packets",
"bidirectional_bytes",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence",
]
]
df.to_csv(pcap_file.replace("pcaps", "results"), index=False)

View file

@ -20,5 +20,5 @@ from .plugin import NFPlugin
# streamer module is the core module of nfstream package.
__author__ = """Zied Aouini"""
__email__ = 'aouinizied@gmail.com'
__version__ = '6.5.4'
__email__ = "aouinizied@gmail.com"
__version__ = "6.5.4"

View file

@ -19,14 +19,12 @@ import secrets
class NFAnonymizer(object):
"""
NFAnonymizer: NFStream anonymization implementation.
Anonymizer is initiated at each time to_csv or to_pandas is called with a random secret key (64 bytes).
Each specified column is anonymized using blake2b algorithm (digest_size: 64 bytes).
NFAnonymizer: NFStream anonymization implementation.
Anonymizer is initiated at each time to_csv or to_pandas is called with a random secret key (64 bytes).
Each specified column is anonymized using blake2b algorithm (digest_size: 64 bytes).
"""
__slots__ = ('_secret',
'_cols_names',
'_cols_index',
"_enabled")
__slots__ = ("_secret", "_cols_names", "_cols_index", "_enabled")
def __init__(self, cols_names):
self._secret = secrets.token_bytes(64)
@ -38,19 +36,23 @@ class NFAnonymizer(object):
def process(self, flow):
if self._enabled:
if self._cols_index is None: # First flow, we extract indexes of cols to anonymize.
if (
self._cols_index is None
): # First flow, we extract indexes of cols to anonymize.
self._cols_index = []
for col_name in self._cols_names:
keys = flow.keys()
try:
self._cols_index.append(keys.index(col_name))
except ValueError:
print("WARNING: NFlow do not have {} attribute. Skipping anonymization.")
print(
"WARNING: NFlow do not have {} attribute. Skipping anonymization."
)
values = flow.values()
for col_idx in self._cols_index:
if values[col_idx] is not None:
values[col_idx] = blake2b(str(values[col_idx]).encode(),
digest_size=64,
key=self._secret).hexdigest()
values[col_idx] = blake2b(
str(values[col_idx]).encode(), digest_size=64, key=self._secret
).hexdigest()
return values
return flow.values()

View file

@ -16,36 +16,48 @@ If not, see <http://www.gnu.org/licenses/>.
from _lib_engine import ffi, lib
def setup_capture(ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size):
capture = lib.capture_open(bytes(source, 'utf-8'), int(mode), error_child, socket_buffer_size)
def setup_capture(
ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size
):
capture = lib.capture_open(
bytes(source, "utf-8"), int(mode), error_child, socket_buffer_size
)
if capture == ffi.NULL:
return
fanout_set_failed = lib.capture_set_fanout(capture, int(mode), error_child, group_id)
fanout_set_failed = lib.capture_set_fanout(
capture, int(mode), error_child, group_id
)
if fanout_set_failed:
return
timeout_set_failed = lib.capture_set_timeout(capture, int(mode), error_child)
if timeout_set_failed:
return
promisc_set_failed = lib.capture_set_promisc(capture, int(mode), error_child, int(promisc))
promisc_set_failed = lib.capture_set_promisc(
capture, int(mode), error_child, int(promisc)
)
if promisc_set_failed:
return
snaplen_set_failed = lib.capture_set_snaplen(capture, int(mode), error_child, snaplen)
snaplen_set_failed = lib.capture_set_snaplen(
capture, int(mode), error_child, snaplen
)
if snaplen_set_failed:
return
return capture
def setup_filter(capture, lib, error_child, bpf_filter):
""" Compile and setup BPF filter """
"""Compile and setup BPF filter"""
if bpf_filter is not None:
filter_set_failed = lib.capture_set_filter(capture, bytes(bpf_filter, 'utf-8'), error_child)
filter_set_failed = lib.capture_set_filter(
capture, bytes(bpf_filter, "utf-8"), error_child
)
if filter_set_failed:
return False
return True
def activate_capture(capture, lib, error_child, bpf_filter, mode):
""" Capture activation function """
"""Capture activation function"""
activation_failed = lib.capture_activate(capture, int(mode), error_child)
if activation_failed:
return False
@ -53,7 +65,7 @@ def activate_capture(capture, lib, error_child, bpf_filter, mode):
def setup_dissector(ffi, lib, n_dissections):
""" Setup dissector according to n_dissections value """
"""Setup dissector according to n_dissections value"""
if n_dissections: # Dissection activated
# Check that headers and loaded library match and initiate dissector.
checker = ffi.new("struct dissector_checker *")
@ -70,13 +82,13 @@ def setup_dissector(ffi, lib, n_dissections):
def is_interface(val):
""" Check if val is a valid interface name and return it if true else None """
intf = lib.capture_get_interface(val.encode('ascii'))
"""Check if val is a valid interface name and return it if true else None"""
intf = lib.capture_get_interface(val.encode("ascii"))
if intf == ffi.NULL:
return None
return ffi.string(intf).decode('ascii', 'ignore')
return ffi.string(intf).decode("ascii", "ignore")
def create_engine():
""" engine creation function, return the loaded native nfstream engine and it's ffi interface"""
return ffi, lib
"""engine creation function, return the loaded native nfstream engine and it's ffi interface"""
return ffi, lib

View file

@ -20,13 +20,13 @@ import os
def cdef_to_replace(cdef):
""" helper function that replaces unsupported definitions for cffi """
"""helper function that replaces unsupported definitions for cffi"""
to_rep = []
cdef_list = cdef.split("static inline")
for idx, sub_def in enumerate(cdef_list):
end = sub_def.find("}")
if end and idx:
to_rep.append(sub_def[:end+1])
to_rep.append(sub_def[: end + 1])
to_rep.append("typedef __builtin_va_list __darwin_va_list;")
to_rep.append("typedef __signed char int8_t;")
to_rep.append(" __attribute__((__packed__))")
@ -36,8 +36,8 @@ def cdef_to_replace(cdef):
def convert_path(p):
""" dummy path converter """
if os.name == 'posix':
"""dummy path converter"""
if os.name == "posix":
return p
return p.replace("/", "\\")
@ -46,7 +46,7 @@ def convert_path(p):
ROOT = ""
USR = "usr"
USR_LOCAL = "usr/local"
if os.name != 'posix': # Windows case, we must take into account msys2 path tree.
if os.name != "posix": # Windows case, we must take into account msys2 path tree.
MSYS2_PATH = os.getenv("MSYS2_PATH") # If user have custom msys2 installation
if MSYS2_PATH is None: # User didn't set this path, we use default one
os.environ["MSYS2_PATH"] = "C:/msys64"
@ -55,64 +55,109 @@ if os.name != 'posix': # Windows case, we must take into account msys2 path tre
USR_LOCAL = "mingw64"
BUILD_SCRIPT_PATH = str(pathlib.Path(__file__).parent.resolve().joinpath("scripts").joinpath("build"))
BUILD_SCRIPT_PATH = str(
pathlib.Path(__file__).parent.resolve().joinpath("scripts").joinpath("build")
)
# Patched path as it is passed to msys2 bash
ENGINE_PATH = str(pathlib.Path(__file__).parent.resolve()).replace("\\", "/")
if os.name != 'posix': # Windows case
if os.name != "posix": # Windows case
os.environ["MSYSTEM"] = "MINGW64"
BUILD_CMD = r"""'{}'""".format(str(BUILD_SCRIPT_PATH) + "_windows.sh")
subprocess.check_call(["{msys}/usr/bin/bash".format(msys=ROOT).replace("/", "\\"),
"-l",
BUILD_CMD,
ENGINE_PATH],
shell=True)
subprocess.check_call(
[
"{msys}/usr/bin/bash".format(msys=ROOT).replace("/", "\\"),
"-l",
BUILD_CMD,
ENGINE_PATH,
],
shell=True,
)
else: # Linux, MacOS
subprocess.check_call([str(BUILD_SCRIPT_PATH) + ".sh"], shell=True)
INCLUDE_DIRS = ["{root}/tmp/nfstream_build/{usr}/include/ndpi".format(root=ROOT, usr=USR),
"{root}/tmp/nfstream_build/{usr}/include".format(root=ROOT, usr=USR_LOCAL)]
EXTRALINK_ARGS = ["{root}/tmp/nfstream_build/{usr}/lib/libndpi.a".format(root=ROOT, usr=USR)]
INCLUDE_DIRS = [
"{root}/tmp/nfstream_build/{usr}/include/ndpi".format(root=ROOT, usr=USR),
"{root}/tmp/nfstream_build/{usr}/include".format(root=ROOT, usr=USR_LOCAL),
]
EXTRALINK_ARGS = [
"{root}/tmp/nfstream_build/{usr}/lib/libndpi.a".format(root=ROOT, usr=USR)
]
if os.name != 'posix': # Windows
if os.name != "posix": # Windows
INCLUDE_DIRS.append("{root}/tmp/nfstream_build/npcap/Include".format(root=ROOT))
if os.path.exists(convert_path("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR))):
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR))
if os.path.exists(
convert_path("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR))
):
EXTRALINK_ARGS.append(
"{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR)
)
else: # best effort guess
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmingwex.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32"))
if os.path.exists(convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))):
EXTRALINK_ARGS.append(
"{root}/{usr}/lib/libmingwex.a".format(
root=ROOT, usr=USR + "/x86_64-w64-mingw32"
)
)
if os.path.exists(
convert_path("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))
):
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR))
else: # best effort guess
EXTRALINK_ARGS.append("{root}/{usr}/lib/libmsvcrt.a".format(root=ROOT, usr=USR+"/x86_64-w64-mingw32"))
with open(convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT))) as gcc_version_in:
EXTRALINK_ARGS.append(
"{root}/{usr}/lib/libmsvcrt.a".format(
root=ROOT, usr=USR + "/x86_64-w64-mingw32"
)
)
with open(
convert_path("{root}/tmp/nfstream_build/gcc_version.in".format(root=ROOT))
) as gcc_version_in:
GCC_VERSION = gcc_version_in.read().split("\n")[0].split(")")[-1].strip()
EXTRALINK_ARGS.append("{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format(root=ROOT,
usr=USR,
version=GCC_VERSION))
EXTRALINK_ARGS.append(
"{root}/{usr}/lib/gcc/x86_64-w64-mingw32/{version}/libgcc.a".format(
root=ROOT, usr=USR, version=GCC_VERSION
)
)
# IMPORTANT: We link with wpcap.lib from downloaded SDK in order to not bundle npcap OEM binaries.
# Consequently, the generated extension will still look for these binaries on the host machine.
# Instructions on how to install npcap binaries are provided in README (Windows Note).
EXTRALINK_ARGS.append("{root}/tmp/nfstream_build/npcap/Lib/x64/wpcap.lib".format(root=ROOT))
EXTRALINK_ARGS.append(
"{root}/tmp/nfstream_build/npcap/Lib/x64/wpcap.lib".format(root=ROOT)
)
# And finally socket stuff
if os.path.exists(convert_path("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR))):
if os.path.exists(
convert_path("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR))
):
EXTRALINK_ARGS.append("{root}/{usr}/lib/libws2_32.a".format(root=ROOT, usr=USR))
else: # best effort guess
EXTRALINK_ARGS.append("{root}/usr/lib/w32api/libws2_32.a".format(root=ROOT))
else:
EXTRALINK_ARGS.append("{root}/tmp/nfstream_build/{usr}/lib/libpcap.a".format(root=ROOT, usr=USR_LOCAL))
EXTRALINK_ARGS.append(
"{root}/tmp/nfstream_build/{usr}/lib/libpcap.a".format(root=ROOT, usr=USR_LOCAL)
)
with open(convert_path("{root}/tmp/nfstream_build/lib_engine_cdefinitions.c".format(root=ROOT))) as engine_cdef:
with open(
convert_path(
"{root}/tmp/nfstream_build/lib_engine_cdefinitions.c".format(root=ROOT)
)
) as engine_cdef:
ENGINE_CDEF = engine_cdef.read()
for to_replace in cdef_to_replace(ENGINE_CDEF):
ENGINE_CDEF = ENGINE_CDEF.replace(to_replace, "")
with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(root=ROOT))) as ndpi_cdefs:
with open(
convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(root=ROOT))
) as ndpi_cdefs:
NDPI_CDEF = ndpi_cdefs.read()
try:
APPLE_SILICON_DEFS = NDPI_CDEF.split("/* Generic byte swapping functions. */")[1].\
split("/* Generic little endian to host endianess byte swapping functions. */")[0]
APPLE_SILICON_DEFS = NDPI_CDEF.split("/* Generic byte swapping functions. */")[
1
].split(
"/* Generic little endian to host endianess byte swapping functions. */"
)[
0
]
NDPI_CDEF = NDPI_CDEF.replace(APPLE_SILICON_DEFS, "")
except IndexError:
pass
@ -121,7 +166,11 @@ with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions.h".format(ro
NDPI_MODULE_STRUCT_CDEF = NDPI_CDEF.split("//CFFI.NDPI_MODULE_STRUCT")[1]
with open(convert_path("{root}/tmp/nfstream_build/ndpi_cdefinitions_packed.h".format(root=ROOT))) as ndpi_cdefs_pack:
with open(
convert_path(
"{root}/tmp/nfstream_build/ndpi_cdefinitions_packed.h".format(root=ROOT)
)
) as ndpi_cdefs_pack:
NDPI_PACKED = ndpi_cdefs_pack.read()
for to_replace in cdef_to_replace(NDPI_PACKED):
NDPI_PACKED = NDPI_PACKED.replace(to_replace, "")
@ -133,12 +182,17 @@ NDPI_PACKED_STRUCTURES = NDPI_PACKED.split("//CFFI.NDPI_PACKED_STRUCTURES")[1]
# As cdef do not support if-def, yet we fix it by simple string replacement
SOCK_INCLUDES = """#include <unistd.h>\n#include <netinet/in.h>\n#include <sys/time.h>"""
if os.name != 'posix':
SOCK_INCLUDES = (
"""#include <unistd.h>\n#include <netinet/in.h>\n#include <sys/time.h>"""
)
if os.name != "posix":
SOCK_INCLUDES = """#include <winsock2.h>\n#include <process.h>\n#include <io.h>"""
ENGINE_INCLUDES = """
ENGINE_INCLUDES = (
"""
#include <stdlib.h>
""" + SOCK_INCLUDES + """
"""
+ SOCK_INCLUDES
+ """
#include <math.h>
#include <stdint.h>
#include <string.h>
@ -147,6 +201,7 @@ ENGINE_INCLUDES = """
#include <ndpi_api.h>
#include <pcap.h>
"""
)
ENGINE_SOURCE = ENGINE_INCLUDES + NDPI_MODULE_STRUCT_CDEF + ENGINE_CDEF
ENGINE_APIS = """
char * capture_get_interface(char * intf_name);
@ -181,12 +236,15 @@ const char *engine_lib_pcap_version(void);
ffi_builder = FFI()
ffi_builder.set_source("_lib_engine",
ENGINE_SOURCE,
include_dirs=[convert_path(d) for d in INCLUDE_DIRS],
extra_link_args=[convert_path(a) for a in EXTRALINK_ARGS])
ffi_builder.set_source(
"_lib_engine",
ENGINE_SOURCE,
include_dirs=[convert_path(d) for d in INCLUDE_DIRS],
extra_link_args=[convert_path(a) for a in EXTRALINK_ARGS],
)
ffi_builder.cdef("""
ffi_builder.cdef(
"""
typedef uint64_t u_int64_t;
typedef uint32_t u_int32_t;
typedef uint16_t u_int16_t;
@ -201,7 +259,8 @@ struct in6_addr {
};
struct pcap;
typedef struct pcap pcap_t;
""")
"""
)
ffi_builder.cdef(NDPI_PACKED_STRUCTURES, packed=True)
ffi_builder.cdef(NDPI_CDEF, override=True)
ffi_builder.cdef(ENGINE_CDEF.split("//CFFI_SHARED_STRUCTURES")[1])

View file

@ -18,42 +18,47 @@ from math import sqrt
from .utils import NFEvent
# When NFStream is extended with plugins, packer C structure is pythonized using the following namedtuple.
nf_packet = namedtuple('NFPacket', ['time',
'delta_time',
'direction',
'raw_size',
'ip_size',
'transport_size',
'payload_size',
'src_ip',
'src_mac',
'src_oui',
'dst_ip',
'dst_mac',
'dst_oui',
'src_port',
'dst_port',
'protocol',
'vlan_id',
'ip_version',
'ip_packet',
'syn',
'cwr',
'ece',
'urg',
'ack',
'psh',
'rst',
'fin',
'tunnel_id'])
nf_packet = namedtuple(
"NFPacket",
[
"time",
"delta_time",
"direction",
"raw_size",
"ip_size",
"transport_size",
"payload_size",
"src_ip",
"src_mac",
"src_oui",
"dst_ip",
"dst_mac",
"dst_oui",
"src_port",
"dst_port",
"protocol",
"vlan_id",
"ip_version",
"ip_packet",
"syn",
"cwr",
"ece",
"urg",
"ack",
"psh",
"rst",
"fin",
"tunnel_id",
],
)
class UDPS(object):
""" dummy class that add udps slot the flexibility required for extensions """
"""dummy class that add udps slot the flexibility required for extensions"""
def pythonize_packet(packet, ffi, flow):
""" convert a cdata packet to a namedtuple """
"""convert a cdata packet to a namedtuple"""
src_ip = flow.src_ip
dst_ip = flow.dst_ip
src_mac = flow.src_mac
@ -68,157 +73,179 @@ def pythonize_packet(packet, ffi, flow):
src_oui = flow.dst_oui
dst_oui = flow.src_oui
return nf_packet(time=packet.time,
delta_time=packet.delta_time,
direction=packet.direction,
raw_size=packet.raw_size,
ip_size=packet.ip_size,
transport_size=packet.transport_size,
payload_size=packet.payload_size,
src_ip=src_ip,
src_mac=src_mac,
src_oui=src_oui,
dst_ip=dst_ip,
dst_mac=dst_mac,
dst_oui=dst_oui,
src_port=packet.src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
vlan_id=packet.vlan_id,
ip_version=packet.ip_version,
ip_packet=bytes(ffi.buffer(packet.ip_content, packet.ip_content_len)),
syn=packet.syn,
cwr=packet.cwr,
ece=packet.ece,
urg=packet.urg,
ack=packet.ack,
psh=packet.psh,
rst=packet.rst,
fin=packet.fin,
tunnel_id=packet.tunnel_id)
return nf_packet(
time=packet.time,
delta_time=packet.delta_time,
direction=packet.direction,
raw_size=packet.raw_size,
ip_size=packet.ip_size,
transport_size=packet.transport_size,
payload_size=packet.payload_size,
src_ip=src_ip,
src_mac=src_mac,
src_oui=src_oui,
dst_ip=dst_ip,
dst_mac=dst_mac,
dst_oui=dst_oui,
src_port=packet.src_port,
dst_port=packet.dst_port,
protocol=packet.protocol,
vlan_id=packet.vlan_id,
ip_version=packet.ip_version,
ip_packet=bytes(ffi.buffer(packet.ip_content, packet.ip_content_len)),
syn=packet.syn,
cwr=packet.cwr,
ece=packet.ece,
urg=packet.urg,
ack=packet.ack,
psh=packet.psh,
rst=packet.rst,
fin=packet.fin,
tunnel_id=packet.tunnel_id,
)
class NFlow(object):
"""
NFlow is NFStream representation of a network flow.
It is a slotted class for performances reasons, and slots are initiated according to NFStream detected mode.
If nfstream is used with extension, we refer to it as sync mode, and we need to update slots from C structure.
If not, nfstream will compute all configured metrics within C structure and update it only at init and expire.
Such logic allows us to provide maximum performances when running without extensions. When set with extension
we pay the cost of flexibility with attributes access/update.
NFlow is NFStream representation of a network flow.
It is a slotted class for performances reasons, and slots are initiated according to NFStream detected mode.
If nfstream is used with extension, we refer to it as sync mode, and we need to update slots from C structure.
If not, nfstream will compute all configured metrics within C structure and update it only at init and expire.
Such logic allows us to provide maximum performances when running without extensions. When set with extension
we pay the cost of flexibility with attributes access/update.
"""
__slots__ = ('id',
'expiration_id',
'src_ip',
'src_mac',
'src_oui',
'src_port',
'dst_ip',
'dst_mac',
'dst_oui',
'dst_port',
'protocol',
'ip_version',
'vlan_id',
'tunnel_id',
'bidirectional_first_seen_ms',
'bidirectional_last_seen_ms',
'bidirectional_duration_ms',
'bidirectional_packets',
'bidirectional_bytes',
'src2dst_first_seen_ms',
'src2dst_last_seen_ms',
'src2dst_duration_ms',
'src2dst_packets',
'src2dst_bytes',
'dst2src_first_seen_ms',
'dst2src_last_seen_ms',
'dst2src_duration_ms',
'dst2src_packets',
'dst2src_bytes',
'bidirectional_min_ps',
'bidirectional_mean_ps',
'bidirectional_stddev_ps',
'bidirectional_max_ps',
'src2dst_min_ps',
'src2dst_mean_ps',
'src2dst_stddev_ps',
'src2dst_max_ps',
'dst2src_min_ps',
'dst2src_mean_ps',
'dst2src_stddev_ps',
'dst2src_max_ps',
'bidirectional_min_piat_ms',
'bidirectional_mean_piat_ms',
'bidirectional_stddev_piat_ms',
'bidirectional_max_piat_ms',
'src2dst_min_piat_ms',
'src2dst_mean_piat_ms',
'src2dst_stddev_piat_ms',
'src2dst_max_piat_ms',
'dst2src_min_piat_ms',
'dst2src_mean_piat_ms',
'dst2src_stddev_piat_ms',
'dst2src_max_piat_ms',
'bidirectional_syn_packets',
'bidirectional_cwr_packets',
'bidirectional_ece_packets',
'bidirectional_urg_packets',
'bidirectional_ack_packets',
'bidirectional_psh_packets',
'bidirectional_rst_packets',
'bidirectional_fin_packets',
'src2dst_syn_packets',
'src2dst_cwr_packets',
'src2dst_ece_packets',
'src2dst_urg_packets',
'src2dst_ack_packets',
'src2dst_psh_packets',
'src2dst_rst_packets',
'src2dst_fin_packets',
'dst2src_syn_packets',
'dst2src_cwr_packets',
'dst2src_ece_packets',
'dst2src_urg_packets',
'dst2src_ack_packets',
'dst2src_psh_packets',
'dst2src_rst_packets',
'dst2src_fin_packets',
'splt_direction',
'splt_ps',
'splt_piat_ms',
'application_name',
'application_category_name',
'application_is_guessed',
'application_confidence',
'requested_server_name',
'client_fingerprint',
'server_fingerprint',
'user_agent',
'content_type',
'_C',
'udps',
'system_process_pid',
'system_process_name',
'system_browser_tab')
def __init__(self, packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt, dissector,
decode_tunnels, system_visibility_mode):
self.id = NFEvent.FLOW # id set to NFLOW for internal communications and handled (incremented) by NFStreamer.
__slots__ = (
"id",
"expiration_id",
"src_ip",
"src_mac",
"src_oui",
"src_port",
"dst_ip",
"dst_mac",
"dst_oui",
"dst_port",
"protocol",
"ip_version",
"vlan_id",
"tunnel_id",
"bidirectional_first_seen_ms",
"bidirectional_last_seen_ms",
"bidirectional_duration_ms",
"bidirectional_packets",
"bidirectional_bytes",
"src2dst_first_seen_ms",
"src2dst_last_seen_ms",
"src2dst_duration_ms",
"src2dst_packets",
"src2dst_bytes",
"dst2src_first_seen_ms",
"dst2src_last_seen_ms",
"dst2src_duration_ms",
"dst2src_packets",
"dst2src_bytes",
"bidirectional_min_ps",
"bidirectional_mean_ps",
"bidirectional_stddev_ps",
"bidirectional_max_ps",
"src2dst_min_ps",
"src2dst_mean_ps",
"src2dst_stddev_ps",
"src2dst_max_ps",
"dst2src_min_ps",
"dst2src_mean_ps",
"dst2src_stddev_ps",
"dst2src_max_ps",
"bidirectional_min_piat_ms",
"bidirectional_mean_piat_ms",
"bidirectional_stddev_piat_ms",
"bidirectional_max_piat_ms",
"src2dst_min_piat_ms",
"src2dst_mean_piat_ms",
"src2dst_stddev_piat_ms",
"src2dst_max_piat_ms",
"dst2src_min_piat_ms",
"dst2src_mean_piat_ms",
"dst2src_stddev_piat_ms",
"dst2src_max_piat_ms",
"bidirectional_syn_packets",
"bidirectional_cwr_packets",
"bidirectional_ece_packets",
"bidirectional_urg_packets",
"bidirectional_ack_packets",
"bidirectional_psh_packets",
"bidirectional_rst_packets",
"bidirectional_fin_packets",
"src2dst_syn_packets",
"src2dst_cwr_packets",
"src2dst_ece_packets",
"src2dst_urg_packets",
"src2dst_ack_packets",
"src2dst_psh_packets",
"src2dst_rst_packets",
"src2dst_fin_packets",
"dst2src_syn_packets",
"dst2src_cwr_packets",
"dst2src_ece_packets",
"dst2src_urg_packets",
"dst2src_ack_packets",
"dst2src_psh_packets",
"dst2src_rst_packets",
"dst2src_fin_packets",
"splt_direction",
"splt_ps",
"splt_piat_ms",
"application_name",
"application_category_name",
"application_is_guessed",
"application_confidence",
"requested_server_name",
"client_fingerprint",
"server_fingerprint",
"user_agent",
"content_type",
"_C",
"udps",
"system_process_pid",
"system_process_name",
"system_browser_tab",
)
def __init__(
self,
packet,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
):
self.id = (
NFEvent.FLOW
) # id set to NFLOW for internal communications and handled (incremented) by NFStreamer.
self.expiration_id = 0
# Initialize C structure.
self._C = lib.meter_initialize_flow(packet, accounting_mode, statistics, splt, n_dissections, dissector, sync)
self._C = lib.meter_initialize_flow(
packet, accounting_mode, statistics, splt, n_dissections, dissector, sync
)
if self._C == ffi.NULL: # raise OSError in order to be handled by meter.
raise OSError("Not enough memory for new flow creation.")
# Here we go for the first copy in order to make defined slots available
self.src_ip = ffi.string(self._C.src_ip_str).decode('utf-8', errors='ignore')
self.src_mac = ffi.string(self._C.src_mac_str).decode('utf-8', errors='ignore')
self.src_oui = ffi.string(self._C.src_oui).decode('utf-8', errors='ignore')
self.src_ip = ffi.string(self._C.src_ip_str).decode("utf-8", errors="ignore")
self.src_mac = ffi.string(self._C.src_mac_str).decode("utf-8", errors="ignore")
self.src_oui = ffi.string(self._C.src_oui).decode("utf-8", errors="ignore")
self.src_port = self._C.src_port
self.dst_ip = ffi.string(self._C.dst_ip_str).decode('utf-8', errors='ignore')
self.dst_mac = ffi.string(self._C.dst_mac_str).decode('utf-8', errors='ignore')
self.dst_oui = ffi.string(self._C.dst_oui).decode('utf-8', errors='ignore')
self.dst_ip = ffi.string(self._C.dst_ip_str).decode("utf-8", errors="ignore")
self.dst_mac = ffi.string(self._C.dst_mac_str).decode("utf-8", errors="ignore")
self.dst_oui = ffi.string(self._C.dst_oui).decode("utf-8", errors="ignore")
self.dst_port = self._C.dst_port
self.protocol = self._C.protocol
self.ip_version = self._C.ip_version
@ -291,15 +318,29 @@ class NFlow(object):
self.dst2src_fin_packets = self._C.dst2src_fin_packets
if n_dissections: # Same for dissection when > 0
if sync:
self.application_name = ffi.string(self._C.application_name).decode('utf-8', errors='ignore')
self.application_category_name = ffi.string(self._C.category_name).decode('utf-8', errors='ignore')
self.application_name = ffi.string(self._C.application_name).decode(
"utf-8", errors="ignore"
)
self.application_category_name = ffi.string(
self._C.category_name
).decode("utf-8", errors="ignore")
self.application_is_guessed = self._C.guessed
self.application_confidence = self._C.confidence
self.requested_server_name = ffi.string(self._C.requested_server_name).decode('utf-8', errors='ignore')
self.client_fingerprint = ffi.string(self._C.c_hash).decode('utf-8', errors='ignore')
self.server_fingerprint = ffi.string(self._C.s_hash).decode('utf-8', errors='ignore')
self.user_agent = ffi.string(self._C.user_agent).decode('utf-8', errors='ignore')
self.content_type = ffi.string(self._C.content_type).decode('utf-8', errors='ignore')
self.requested_server_name = ffi.string(
self._C.requested_server_name
).decode("utf-8", errors="ignore")
self.client_fingerprint = ffi.string(self._C.c_hash).decode(
"utf-8", errors="ignore"
)
self.server_fingerprint = ffi.string(self._C.s_hash).decode(
"utf-8", errors="ignore"
)
self.user_agent = ffi.string(self._C.user_agent).decode(
"utf-8", errors="ignore"
)
self.content_type = ffi.string(self._C.content_type).decode(
"utf-8", errors="ignore"
)
else:
self.application_name = None
self.application_category_name = None
@ -324,25 +365,56 @@ class NFlow(object):
if system_visibility_mode == 2:
self.system_browser_tab = ""
def update(self, packet, idle_timeout, active_timeout, ffi, lib, udps, sync, accounting_mode,
n_dissections, statistics, splt, dissector):
""" NFlow update method """
def update(
self,
packet,
idle_timeout,
active_timeout,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
):
"""NFlow update method"""
# First, we update internal C structure.
ret = lib.meter_update_flow(self._C, packet, idle_timeout, active_timeout, accounting_mode, statistics, splt,
n_dissections, dissector, sync)
if ret > 0: # If update done it will be zero, idle and active are matched to 1 and 2.
ret = lib.meter_update_flow(
self._C,
packet,
idle_timeout,
active_timeout,
accounting_mode,
statistics,
splt,
n_dissections,
dissector,
sync,
)
if (
ret > 0
): # If update done it will be zero, idle and active are matched to 1 and 2.
self.expiration_id = ret - 1
return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # expire it.
return self.expire(
udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
) # expire it.
if sync: # If running with Plugins
self.sync(n_dissections, statistics, splt, ffi, lib, sync)
# We need to copy computed values on C struct.
for udp in udps: # Then call each plugin on_update entrypoint.
udp.on_update(pythonize_packet(packet, ffi, self), self)
if self.expiration_id == -1: # One of the plugins set expiration to custom value (-1)
return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # Expire it.
if (
self.expiration_id == -1
): # One of the plugins set expiration to custom value (-1)
return self.expire(
udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
) # Expire it.
def expire(self, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector):
""" NFlow expiration method """
"""NFlow expiration method"""
# Call expiration of C structure.
lib.meter_expire_flow(self._C, n_dissections, dissector)
# Then sync (second copy in case of non sync mode)
@ -379,34 +451,46 @@ class NFlow(object):
bidirectional_packets = self.bidirectional_packets
# NOTE: We need the root square of the variance to provide sample stddev (Var**0.5)/(n-1)
if bidirectional_packets > 1:
self.bidirectional_stddev_ps = sqrt(self._C.bidirectional_stddev_ps/(bidirectional_packets - 1))
self.bidirectional_stddev_ps = sqrt(
self._C.bidirectional_stddev_ps / (bidirectional_packets - 1)
)
self.bidirectional_max_ps = self._C.bidirectional_max_ps
self.src2dst_min_ps = self._C.src2dst_min_ps
self.src2dst_mean_ps = self._C.src2dst_mean_ps
src2dst_packets = self.src2dst_packets
if src2dst_packets > 1:
self.src2dst_stddev_ps = sqrt(self._C.src2dst_stddev_ps/(src2dst_packets - 1))
self.src2dst_stddev_ps = sqrt(
self._C.src2dst_stddev_ps / (src2dst_packets - 1)
)
self.src2dst_max_ps = self._C.src2dst_max_ps
self.dst2src_min_ps = self._C.dst2src_min_ps
self.dst2src_mean_ps = self._C.dst2src_mean_ps
dst2src_packets = self.dst2src_packets
if dst2src_packets > 1:
self.dst2src_stddev_ps = sqrt(self._C.dst2src_stddev_ps / (dst2src_packets - 1))
self.dst2src_stddev_ps = sqrt(
self._C.dst2src_stddev_ps / (dst2src_packets - 1)
)
self.dst2src_max_ps = self._C.dst2src_max_ps
self.bidirectional_min_piat_ms = self._C.bidirectional_min_piat_ms
self.bidirectional_mean_piat_ms = self._C.bidirectional_mean_piat_ms
if bidirectional_packets > 2:
self.bidirectional_stddev_piat_ms = sqrt(self._C.bidirectional_stddev_piat_ms/(bidirectional_packets-2))
self.bidirectional_stddev_piat_ms = sqrt(
self._C.bidirectional_stddev_piat_ms / (bidirectional_packets - 2)
)
self.bidirectional_max_piat_ms = self._C.bidirectional_max_piat_ms
self.src2dst_min_piat_ms = self._C.src2dst_min_piat_ms
self.src2dst_mean_piat_ms = self._C.src2dst_mean_piat_ms
if src2dst_packets > 2:
self.src2dst_stddev_piat_ms = sqrt(self._C.src2dst_stddev_piat_ms/(src2dst_packets - 2))
self.src2dst_stddev_piat_ms = sqrt(
self._C.src2dst_stddev_piat_ms / (src2dst_packets - 2)
)
self.src2dst_max_piat_ms = self._C.src2dst_max_piat_ms
self.dst2src_min_piat_ms = self._C.dst2src_min_piat_ms
self.dst2src_mean_piat_ms = self._C.dst2src_mean_piat_ms
if dst2src_packets > 2:
self.dst2src_stddev_piat_ms = sqrt(self._C.dst2src_stddev_piat_ms/(dst2src_packets - 2))
self.dst2src_stddev_piat_ms = sqrt(
self._C.dst2src_stddev_piat_ms / (dst2src_packets - 2)
)
self.dst2src_max_piat_ms = self._C.dst2src_max_piat_ms
self.bidirectional_syn_packets = self._C.bidirectional_syn_packets
self.bidirectional_cwr_packets = self._C.bidirectional_cwr_packets
@ -435,24 +519,44 @@ class NFlow(object):
if n_dissections: # If dissection set (>0)
# We minimize updates to a single one, when detection completed.
if self._C.detection_completed < 2:
self.application_name = ffi.string(self._C.application_name).decode('utf-8', errors='ignore')
self.application_category_name = ffi.string(self._C.category_name).decode('utf-8', errors='ignore')
self.requested_server_name = ffi.string(self._C.requested_server_name).decode('utf-8', errors='ignore')
self.client_fingerprint = ffi.string(self._C.c_hash).decode('utf-8', errors='ignore')
self.server_fingerprint = ffi.string(self._C.s_hash).decode('utf-8', errors='ignore')
self.user_agent = ffi.string(self._C.user_agent).decode('utf-8', errors='ignore')
self.content_type = ffi.string(self._C.content_type).decode('utf-8', errors='ignore')
self.application_name = ffi.string(self._C.application_name).decode(
"utf-8", errors="ignore"
)
self.application_category_name = ffi.string(
self._C.category_name
).decode("utf-8", errors="ignore")
self.requested_server_name = ffi.string(
self._C.requested_server_name
).decode("utf-8", errors="ignore")
self.client_fingerprint = ffi.string(self._C.c_hash).decode(
"utf-8", errors="ignore"
)
self.server_fingerprint = ffi.string(self._C.s_hash).decode(
"utf-8", errors="ignore"
)
self.user_agent = ffi.string(self._C.user_agent).decode(
"utf-8", errors="ignore"
)
self.content_type = ffi.string(self._C.content_type).decode(
"utf-8", errors="ignore"
)
self.application_is_guessed = self._C.guessed
self.application_confidence = self._C.confidence
if splt:
if sync_mode: # Same for splt, once we reach splt limit, there is no need to sync it anymore.
if (
sync_mode
): # Same for splt, once we reach splt limit, there is no need to sync it anymore.
if self._C.bidirectional_packets <= splt:
self.splt_direction = ffi.unpack(self._C.splt_direction, splt)
self.splt_ps = ffi.unpack(self._C.splt_ps, splt)
self.splt_piat_ms = ffi.unpack(self._C.splt_piat_ms, splt)
else:
if self._C.splt_closed == 0: # we also release the memory to keep only the obtained list.
lib.meter_free_flow(self._C, n_dissections, splt, 0) # free SPLT
if (
self._C.splt_closed == 0
): # we also release the memory to keep only the obtained list.
lib.meter_free_flow(
self._C, n_dissections, splt, 0
) # free SPLT
else:
self.splt_direction = ffi.unpack(self._C.splt_direction, splt)
self.splt_ps = ffi.unpack(self._C.splt_ps, splt)
@ -460,11 +564,11 @@ class NFlow(object):
# Memory will be released by freer.
def is_idle(self, tick, idle_timeout):
""" is_idle method to check if NFlow is idle accoring to configured timeout """
"""is_idle method to check if NFlow is idle accoring to configured timeout"""
return (tick - idle_timeout) >= self._C.bidirectional_last_seen_ms
def __str__(self):
""" String representation of NFlow """
"""String representation of NFlow"""
started = False
printable = "NFlow("
for attr_name in self.__slots__:
@ -473,27 +577,38 @@ class NFlow(object):
printable += attr_name + "=" + str(getattr(self, attr_name))
started = True
else:
if attr_name == 'udps':
if attr_name == "udps":
for udp_name in self.udps.__dict__.keys():
printable += ',\n ' + attr_name + '.' + udp_name + "=" + str(getattr(self.udps,
udp_name))
printable += (
",\n "
+ attr_name
+ "."
+ udp_name
+ "="
+ str(getattr(self.udps, udp_name))
)
else:
printable += ',\n ' + attr_name + "=" + str(getattr(self, attr_name))
printable += (
",\n "
+ attr_name
+ "="
+ str(getattr(self, attr_name))
)
except AttributeError:
pass
printable += ")"
return printable
def keys(self):
""" get NFlow keys"""
"""get NFlow keys"""
# Note we transform udps to udps.value_name as preprocessing for csv/pandas interfaces
ret = []
for attr_name in self.__slots__:
try:
getattr(self, attr_name)
if attr_name == 'udps':
if attr_name == "udps":
for udp_name in self.udps.__dict__.keys():
ret.append(attr_name + '.' + udp_name)
ret.append(attr_name + "." + udp_name)
else:
ret.append(attr_name)
except AttributeError:
@ -501,13 +616,13 @@ class NFlow(object):
return ret
def values(self):
""" get flow values """
"""get flow values"""
# Note: same indexing as keys.
ret = []
for attr_name in self.__slots__:
try:
attr_value = getattr(self, attr_name)
if attr_name == 'udps':
if attr_name == "udps":
for udp_value in self.udps.__dict__.values():
ret.append(udp_value)
else:

View file

@ -31,12 +31,13 @@ FLOW_KEY = "{}:{}:{}:{}:{}:{}:{}:{}:{}"
class NFCache(OrderedDict):
""" LRU Flow Cache
"""LRU Flow Cache
The NFCache object is used to cache flows entries such as MRU entries are kept on the end and LRU entries
will be at the start. Note that we use OrderedDict which leverages classical python dict combined with a doubly
linked list with sentinel nodes to track order.
By doing so, we can access in an efficient way idle connections entries that need to expired based on a timeout.
"""
def __init__(self, *args, **kwds):
super().__init__(*args, **kwds)
@ -54,20 +55,38 @@ class NFCache(OrderedDict):
return next(iter(self))
def meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections, statistics, splt, ffi, lib,
dissector):
def meter_scan(
meter_tick,
cache,
idle_timeout,
channel,
udps,
sync,
n_dissections,
statistics,
splt,
ffi,
lib,
dissector,
):
"""Checks flow cache for expired flow.
Expired flows are identified, added to channel and then removed from the cache.
"""
remaining = True # We suppose that there is something to expire
scanned = 0
while remaining and scanned < 1000: # idle scan budget (each 10ms we scan 1000 as maximum)
while (
remaining and scanned < 1000
): # idle scan budget (each 10ms we scan 1000 as maximum)
try:
flow_key = cache.get_lru_key() # will return the LRU flow key.
flow = cache[flow_key]
if flow.is_idle(meter_tick, idle_timeout): # idle, expire it.
channel.put(flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector))
channel.put(
flow.expire(
udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
)
)
del cache[flow_key]
del flow
scanned += 1
@ -79,52 +98,114 @@ def meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissectio
def get_flow_key(src_ip, src_port, dst_ip, dst_port, protocol, vlan_id, tunnel_id):
    """Create a consistent direction agnostic flow key.

    Endpoints are ordered lexicographically on (ip_version, ip, port) so that
    both directions of the same conversation hash to the same 9-tuple key.
    On a full tie (same ip, same port) the source endpoint is kept first.
    """
    # src_ip/dst_ip are (ip, ip_version) pairs; compare version first, then
    # address, then port — this reproduces the original nested comparisons.
    if (src_ip[1], src_ip[0], src_port) <= (dst_ip[1], dst_ip[0], dst_port):
        first_ip, first_port = src_ip, src_port
        second_ip, second_port = dst_ip, dst_port
    else:
        first_ip, first_port = dst_ip, dst_port
        second_ip, second_port = src_ip, src_port
    return (
        first_ip[0],
        first_ip[1],
        first_port,
        second_ip[0],
        second_ip[1],
        second_port,
        protocol,
        vlan_id,
        tunnel_id,
    )
def get_flow_key_from_pkt(packet):
    """Create flow key from packet information (7-tuple).

    A flow key uniquely determines a flow using source ip, destination ip,
    source port, destination port, TCP/UDP protocol, VLAN ID and tunnel ID
    of the packets.
    """
    fields = (
        packet.src_ip,
        packet.src_port,
        packet.dst_ip,
        packet.dst_port,
        packet.protocol,
        packet.vlan_id,
        packet.tunnel_id,
    )
    return get_flow_key(*fields)
def consume(packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps, sync, accounting_mode, n_dissections,
statistics, splt, dissector, decode_tunnels, system_visibility_mode):
""" consume a packet and produce flow """
def consume(
packet,
cache,
active_timeout,
idle_timeout,
channel,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
):
"""consume a packet and produce flow"""
# We maintain state for active flows computation 1 for creation, 0 for update/cut, -1 for custom expire
flow_key = get_flow_key_from_pkt(packet)
try: # update flow
flow = cache[flow_key].update(packet, idle_timeout, active_timeout, ffi, lib, udps, sync, accounting_mode,
n_dissections, statistics, splt, dissector)
flow = cache[flow_key].update(
packet,
idle_timeout,
active_timeout,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
)
if flow is not None:
if flow.expiration_id < 0: # custom expiration
channel.put(flow)
@ -136,52 +217,123 @@ def consume(packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps
del cache[flow_key]
del flow
try:
cache[flow_key] = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections,
statistics, splt, dissector, decode_tunnels, system_visibility_mode)
if cache[flow_key].expiration_id == -1: # A user Plugin forced expiration on the first packet
cache[flow_key] = NFlow(
packet,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
)
if (
cache[flow_key].expiration_id == -1
): # A user Plugin forced expiration on the first packet
channel.put(
cache[flow_key].expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector))
cache[flow_key].expire(
udps,
sync,
n_dissections,
statistics,
splt,
ffi,
lib,
dissector,
)
)
del cache[flow_key]
state = 0
except OSError:
print("WARNING: Failed to allocate memory space for flow creation. Flow creation aborted.")
print(
"WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
)
state = 0
else:
state = 0
except KeyError: # create flow
try:
if sync:
flow = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt, dissector,
decode_tunnels, system_visibility_mode)
if flow.expiration_id == -1: # A user Plugin forced expiration on the first packet
channel.put(flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector))
flow = NFlow(
packet,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
)
if (
flow.expiration_id == -1
): # A user Plugin forced expiration on the first packet
channel.put(
flow.expire(
udps,
sync,
n_dissections,
statistics,
splt,
ffi,
lib,
dissector,
)
)
del flow
state = 0
else:
cache[flow_key] = flow
state = 1
else:
cache[flow_key] = NFlow(packet, ffi, lib, udps, sync, accounting_mode, n_dissections, statistics, splt,
dissector, decode_tunnels, system_visibility_mode)
cache[flow_key] = NFlow(
packet,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
)
state = 1
except OSError:
print("WARNING: Failed to allocate memory space for flow creation. Flow creation aborted.")
print(
"WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
)
state = 0
return state
def meter_cleanup(
    cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
):
    """Cleanup all entries in NFCache.

    Every remaining flow is expired, pushed on the channel and removed
    from the cache (called at end of metering to flush the cache).
    """
    # Snapshot the keys: entries are deleted while we walk the cache.
    for flow_key in list(cache.keys()):
        entry = cache[flow_key]
        # Expire and push it on channel.
        channel.put(
            entry.expire(
                udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
            )
        )
        cache.pop(flow_key)
        del entry
def capture_track(lib, capture, mode, interface_stats, tracker, processed, ignored):
""" Update shared performance values """
"""Update shared performance values"""
lib.capture_stats(capture, interface_stats, mode)
tracker[0].value = interface_stats.dropped
tracker[1].value = processed
@ -193,17 +345,44 @@ def send_error(root_idx, channel, msg):
channel.put(InternalError(NFEvent.ERROR, msg))
def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots, root_idx, mode,
idle_timeout, active_timeout, accounting_mode, udps, n_dissections, statistics, splt,
channel, tracker, lock, group_id, system_visibility_mode, socket_buffer_size):
""" Metering workflow """
set_affinity(root_idx+1)
def meter_workflow(
source,
snaplen,
decode_tunnels,
bpf_filter,
promisc,
n_roots,
root_idx,
mode,
idle_timeout,
active_timeout,
accounting_mode,
udps,
n_dissections,
statistics,
splt,
channel,
tracker,
lock,
group_id,
system_visibility_mode,
socket_buffer_size,
):
"""Metering workflow"""
set_affinity(root_idx + 1)
ffi, lib = create_engine()
if lib is None:
send_error(root_idx, channel, ENGINE_LOAD_ERR)
return
meter_tick, meter_scan_tick, meter_track_tick = 0, 0, 0 # meter, idle scan and perf track timelines
meter_scan_interval, meter_track_interval = 10, 1000 # we scan each 10 msecs and update perf each sec.
meter_tick, meter_scan_tick, meter_track_tick = (
0,
0,
0,
) # meter, idle scan and perf track timelines
meter_scan_interval, meter_track_interval = (
10,
1000,
) # we scan each 10 msecs and update perf each sec.
cache = NFCache()
dissector = setup_dissector(ffi, lib, n_dissections)
if dissector == ffi.NULL and n_dissections:
@ -229,19 +408,39 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots
for source_idx, source in enumerate(sources):
error_child = ffi.new("char[256]")
capture = setup_capture(ffi, lib, source, snaplen, promisc, mode, error_child, group_id, socket_buffer_size)
capture = setup_capture(
ffi,
lib,
source,
snaplen,
promisc,
mode,
error_child,
group_id,
socket_buffer_size,
)
if capture is None:
send_error(root_idx, channel, ffi.string(error_child).decode('utf-8', errors='ignore'))
send_error(
root_idx,
channel,
ffi.string(error_child).decode("utf-8", errors="ignore"),
)
return
# Here the last operation, BPF filtering setup and activation.
if not activate_capture(capture, lib, error_child, bpf_filter, mode):
send_error(root_idx, channel, ffi.string(error_child).decode('utf-8', errors='ignore'))
send_error(
root_idx,
channel,
ffi.string(error_child).decode("utf-8", errors="ignore"),
)
return
remaining_packets = True
while remaining_packets:
nf_packet = ffi.new("struct nf_packet *")
ret = lib.capture_next(capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode))
ret = lib.capture_next(
capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode)
)
if ret > 0: # Valid must be processed by meter
packet_time = nf_packet.time
if packet_time > meter_tick:
@ -255,18 +454,57 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots
go_scan = True # Activate scan
meter_scan_tick = meter_tick
# Consume packet and return diff
diff = consume(nf_packet, cache, active_timeout, idle_timeout, channel, ffi, lib, udps, sync,
accounting_mode, n_dissections, statistics, splt, dissector, decode_tunnels,
system_visibility_mode)
diff = consume(
nf_packet,
cache,
active_timeout,
idle_timeout,
channel,
ffi,
lib,
udps,
sync,
accounting_mode,
n_dissections,
statistics,
splt,
dissector,
decode_tunnels,
system_visibility_mode,
)
active_flows += diff
if go_scan:
idles = meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections,
statistics, splt, ffi, lib, dissector)
idles = meter_scan(
meter_tick,
cache,
idle_timeout,
channel,
udps,
sync,
n_dissections,
statistics,
splt,
ffi,
lib,
dissector,
)
active_flows -= idles
else: # time ticker
if meter_tick - meter_scan_tick >= meter_scan_interval:
idles = meter_scan(meter_tick, cache, idle_timeout, channel, udps, sync, n_dissections,
statistics, splt, ffi, lib, dissector)
idles = meter_scan(
meter_tick,
cache,
idle_timeout,
channel,
udps,
sync,
n_dissections,
statistics,
splt,
ffi,
lib,
dissector,
)
active_flows -= idles
meter_scan_tick = meter_tick
elif ret == 0: # Ignored packet
@ -275,14 +513,26 @@ def meter_workflow(source, snaplen, decode_tunnels, bpf_filter, promisc, n_roots
pass
else: # End of file
remaining_packets = False # end of loop
if meter_tick - meter_track_tick >= meter_track_interval: # Performance tracking
capture_track(lib, capture, mode, interface_stats, tracker, processed_packets, ignored_packets)
if (
meter_tick - meter_track_tick >= meter_track_interval
): # Performance tracking
capture_track(
lib,
capture,
mode,
interface_stats,
tracker,
processed_packets,
ignored_packets,
)
meter_track_tick = meter_tick
# Close capture
lib.capture_close(capture)
# Expire all remaining flows in the cache.
meter_cleanup(cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)
meter_cleanup(
cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
)
# Clean dissector
lib.dissector_cleanup(dissector)
# Release engine library

View file

@ -15,7 +15,8 @@ If not, see <http://www.gnu.org/licenses/>.
class NFPlugin(object):
""" NFPlugin class: Main entry point to extend NFStream """
"""NFPlugin class: Main entry point to extend NFStream"""
def __init__(self, **kwargs):
"""
NFPlugin Parameters:

View file

@ -32,10 +32,10 @@ class MsgType(Enum):
class DHCP(NFPlugin):
""" DHCP plugin
"""DHCP plugin
This plugin extracts client information from DHCP sessions, and split flow
on transaction completion to prevent several sessions to be considered as
on transaction completion to prevent several sessions to be considered as
the same flow. The following information are retrieved:
- dhcp_12 (Option 12, hostname): hostname is decoded as utf8 with special characters being replaced.
@ -48,6 +48,7 @@ class DHCP(NFPlugin):
- dhcp_addr: The IP address allocated to the client
"""
def on_init(self, packet, flow):
flow.udps.dhcp_12 = None # Sometimes hostname is missing from ndpi
flow.udps.dhcp_50 = None # must be anonymized on export
@ -70,19 +71,19 @@ class DHCP(NFPlugin):
for opt in dhcp.opts:
if opt[0] == 12: # Hostname
hostname = opt[1].decode('utf-8', errors='replace')
hostname = opt[1].decode("utf-8", errors="replace")
if len(hostname) > 0:
flow.udps.dhcp_12 = hostname
elif opt[0] == 53: # Msg type
msg_type = MsgType(int.from_bytes(opt[1], "big"))
elif opt[0] == 60: # Vendor class identifier
flow.udps.dhcp_60 = opt[1].decode('utf-8')
flow.udps.dhcp_60 = opt[1].decode("utf-8")
elif opt[0] == 77: # User class id
flow.udps.dhcp_77 = opt[1].decode('utf-8')
flow.udps.dhcp_77 = opt[1].decode("utf-8")
elif opt[0] == 57: # Maximum DHCP Message Size
flow.udps.dhcp_57 = int.from_bytes(opt[1], "big")
elif opt[0] == 55: # parameter request list (aka fingerprint)
opt55 = ','.join(str(i) for i in opt[1])
opt55 = ",".join(str(i) for i in opt[1])
elif opt[0] == 50: # requested ip
opt50 = ipaddress.ip_address(int.from_bytes(opt[1], "big"))
options.append(opt[0])
@ -101,8 +102,10 @@ class DHCP(NFPlugin):
msg_type, options, opt50, opt55 = self._process_options(flow, dhcp)
if msg_type == MsgType.REQUEST:
mac = struct.unpack('BBBBBB', dhcp.chaddr)
flow.udps.dhcp_oui = '{:02x}:{:02x}:{:02x}'.format(mac[0], mac[1], mac[2])
mac = struct.unpack("BBBBBB", dhcp.chaddr)
flow.udps.dhcp_oui = "{:02x}:{:02x}:{:02x}".format(
mac[0], mac[1], mac[2]
)
flow.udps.dhcp_options = options
flow.udps.dhcp_55 = opt55 if opt55 is not None else None
flow.udps.dhcp_50 = str(opt50) if opt50 is not None else None
@ -110,7 +113,12 @@ class DHCP(NFPlugin):
if ciaddr != ipaddress.ip_address(0):
flow.udps.dhcp_addr = str(ciaddr)
if msg_type in [MsgType.ACK, MsgType.NACK, MsgType.INFORM, MsgType.DECLINE] or flow.src_ip == str(ipaddress.ip_address(0)):
if msg_type in [
MsgType.ACK,
MsgType.NACK,
MsgType.INFORM,
MsgType.DECLINE,
] or flow.src_ip == str(ipaddress.ip_address(0)):
flow.expiration_id = -1
if flow.udps.dhcp_msg_type is None:

View file

@ -18,13 +18,14 @@ import dpkt
class MDNS(NFPlugin):
""" MDNS plugin
"""MDNS plugin
This plugin extracts answer information from MDNS requests. The following information are retrieved:
- mdns_ptr: An ordered list of PTR answsers.
"""
def on_init(self, packet, flow):
flow.udps.mdns_ptr = []
self.on_update(packet, flow)
@ -41,6 +42,6 @@ class MDNS(NFPlugin):
if len(dns.an) > 0:
for answer in dns.an:
if answer.type == 12: # PTR
ptr = answer.ptrname.replace(',', ' ')
ptr = answer.ptrname.replace(",", " ")
if ptr not in flow.udps.mdns_ptr:
flow.udps.mdns_ptr.append(ptr)

View file

@ -17,10 +17,11 @@ from nfstream import NFPlugin
class FlowSlicer(NFPlugin):
""" FlowSlicer plugin
"""FlowSlicer plugin
This plugin implements a custom flow expiration logic based on a packets count limit.
"""
def on_init(self, packet, flow):
if self.limit == 1:
flow.expiration_id = -1

View file

@ -32,6 +32,7 @@ class SPLT(NFPlugin):
- splt_ipt: Array with inter packet arrival time in milliseconds.
Note: Tail will be set with default value -1.
"""
@staticmethod
def _get_packet_size(packet, accounting_mode):
if accounting_mode == 0:
@ -54,5 +55,7 @@ class SPLT(NFPlugin):
if flow.bidirectional_packets <= self.sequence_length:
packet_index = flow.bidirectional_packets - 1
flow.udps.splt_direction[packet_index] = packet.direction
flow.udps.splt_ps[packet_index] = self._get_packet_size(packet, self.accounting_mode)
flow.udps.splt_ps[packet_index] = self._get_packet_size(
packet, self.accounting_mode
)
flow.udps.splt_piat_ms[packet_index] = packet.delta_time

View file

@ -22,7 +22,7 @@ from nfstream import NFPlugin
class WFPlugin(NFPlugin):
"""Wavelet-based Features plugin. This plugin attempts to recreate wavelet-based features from [1].
Features are calculated from `ip_size`, that is binned on packet timestamps into timeseries
of len 2**`levels` (spanning `active_timeout`).
@ -49,17 +49,17 @@ class WFPlugin(NFPlugin):
assert hasattr(self, "levels")
assert hasattr(self, "active_timeout")
# Pywt requires vector of length 2**level as input
# Pywt requires vector of length 2**level as input
# set nbins to that number:
self.nbins = 2**self.levels
# Given `active_timeout` as max flow length calculate size of each bin:
self.bin_size = (self.active_timeout / 2**self.levels * 1000)
self.bin_size = self.active_timeout / 2**self.levels * 1000
# Reserve a empty vectors for data i forward and backward direction
flow.udps.forward = np.zeros(self.nbins).tolist()
flow.udps.forward = np.zeros(self.nbins).tolist()
flow.udps.backward = np.zeros(self.nbins).tolist()
# Save first packet timestamp that will be used to calc index
# of time series bin
flow.udps.first_packet_timestamp = packet.time # timestamp in ms
@ -67,7 +67,7 @@ class WFPlugin(NFPlugin):
def on_update(self, packet, flow):
# Calculate time in ms from first packet
mstime_since_first_packet: int = packet.time - flow.udps.first_packet_timestamp
# Calculate index of bin to put data into
ibin, _ = divmod(mstime_since_first_packet, self.bin_size)
@ -133,12 +133,12 @@ class WFPlugin(NFPlugin):
E_k = np.sum(np.power(np.abs(d), 2), axis=0)
E_total = np.sum(E_k, axis=0)
# Relarive wavelet energy
p_k = E_k / (E_total + 1e-7)
p_k = E_k / (E_total + 1e-7)
p_n = np.power(d, 2) / (E_k + 1e-7)
# Shannon entropy
S_k = -np.sum(p_n * np.log(p_n + 1e-7), axis=0)
S_k = -np.sum(p_n * np.log(p_n + 1e-7), axis=0)
# Absolute mean of coefficients
u_k = np.mean(np.abs(d), axis=0)
# Std. deviation of coeficcients

View file

@ -26,8 +26,21 @@ from .meter import meter_workflow
from .anonymizer import NFAnonymizer
from .engine import is_interface
from .plugin import NFPlugin
from .utils import csv_converter, open_file, RepeatedTimer, update_performances, set_affinity, available_cpus_count
from .utils import validate_flows_per_file, NFMode, create_csv_file_path, NFEvent, validate_rotate_files
from .utils import (
csv_converter,
open_file,
RepeatedTimer,
update_performances,
set_affinity,
available_cpus_count,
)
from .utils import (
validate_flows_per_file,
NFMode,
create_csv_file_path,
NFEvent,
validate_rotate_files,
)
from .system import system_socket_worflow, match_flow_conn
@ -48,25 +61,27 @@ class NFStreamer(object):
"""
def __init__(self,
source=None,
decode_tunnels=True,
bpf_filter=None,
promiscuous_mode=True,
snapshot_length=1536,
socket_buffer_size=0,
idle_timeout=120, # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
active_timeout=1800,
accounting_mode=0,
udps=None,
n_dissections=20,
statistical_analysis=False,
splt_analysis=0,
n_meters=0,
max_nflows=0,
performance_report=0,
system_visibility_mode=0,
system_visibility_poll_ms=100):
def __init__(
self,
source=None,
decode_tunnels=True,
bpf_filter=None,
promiscuous_mode=True,
snapshot_length=1536,
socket_buffer_size=0,
idle_timeout=120, # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
active_timeout=1800,
accounting_mode=0,
udps=None,
n_dissections=20,
statistical_analysis=False,
splt_analysis=0,
n_meters=0,
max_nflows=0,
performance_report=0,
system_visibility_mode=0,
system_visibility_poll_ms=100,
):
with NFStreamer.glock:
NFStreamer.streamer_id += 1
self._idx = NFStreamer.streamer_id
@ -115,13 +130,17 @@ class NFStreamer(object):
if not isfile(value[i]):
raise TypeError
except TypeError:
raise ValueError("Invalid pcap file path at index: " + str(i) + ".")
raise ValueError(
"Invalid pcap file path at index: " + str(i) + "."
)
self._mode = NFMode.MULTIPLE_FILES
else:
try:
value = str(os.fspath(value))
except TypeError:
raise ValueError("Please specify a pcap file path or a valid network interface name as source.")
raise ValueError(
"Please specify a pcap file path or a valid network interface name as source."
)
if isfile(value):
self._mode = NFMode.SINGLE_FILE
else:
@ -130,7 +149,9 @@ class NFStreamer(object):
self._mode = NFMode.INTERFACE
value = interface
else:
raise ValueError("Please specify a pcap file path or a valid network interface name as source.")
raise ValueError(
"Please specify a pcap file path or a valid network interface name as source."
)
self._source = value
@property
@ -140,7 +161,9 @@ class NFStreamer(object):
@decode_tunnels.setter
def decode_tunnels(self, value):
if not isinstance(value, bool):
raise ValueError("Please specify a valid decode_tunnels parameter (possible values: True, False).")
raise ValueError(
"Please specify a valid decode_tunnels parameter (possible values: True, False)."
)
self._decode_tunnels = value
@property
@ -160,7 +183,9 @@ class NFStreamer(object):
@promiscuous_mode.setter
def promiscuous_mode(self, value):
if not isinstance(value, bool):
raise ValueError("Please specify a valid promiscuous_mode parameter (possible values: True, False).")
raise ValueError(
"Please specify a valid promiscuous_mode parameter (possible values: True, False)."
)
self._promiscuous_mode = value
@property
@ -170,7 +195,9 @@ class NFStreamer(object):
@snapshot_length.setter
def snapshot_length(self, value):
if not isinstance(value, int) or value <= 0:
raise ValueError("Please specify a valid snapshot_length parameter (positive integer).")
raise ValueError(
"Please specify a valid snapshot_length parameter (positive integer)."
)
self._snapshot_length = value
@property
@ -179,8 +206,10 @@ class NFStreamer(object):
@socket_buffer_size.setter
def socket_buffer_size(self, value):
if not isinstance(value, int) or (value < 0 or value > 2**31-1):
raise ValueError("Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1).")
if not isinstance(value, int) or (value < 0 or value > 2**31 - 1):
raise ValueError(
"Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1)."
)
self._socket_buffer_size = value
@property
@ -189,8 +218,12 @@ class NFStreamer(object):
@idle_timeout.setter
def idle_timeout(self, value):
if not isinstance(value, int) or ((value < 0) or (value * 1000) > 18446744073709551615): # max uint64_t
raise ValueError("Please specify a valid idle_timeout parameter (positive integer in seconds).")
if not isinstance(value, int) or (
(value < 0) or (value * 1000) > 18446744073709551615
): # max uint64_t
raise ValueError(
"Please specify a valid idle_timeout parameter (positive integer in seconds)."
)
self._idle_timeout = value
@property
@ -199,8 +232,12 @@ class NFStreamer(object):
@active_timeout.setter
def active_timeout(self, value):
if not isinstance(value, int) or ((value < 0) or (value * 1000) > 18446744073709551615): # max uint64_t
raise ValueError("Please specify a valid active_timeout parameter (positive integer in seconds).")
if not isinstance(value, int) or (
(value < 0) or (value * 1000) > 18446744073709551615
): # max uint64_t
raise ValueError(
"Please specify a valid active_timeout parameter (positive integer in seconds)."
)
self._active_timeout = value
@property
@ -210,7 +247,9 @@ class NFStreamer(object):
@accounting_mode.setter
def accounting_mode(self, value):
if not isinstance(value, int) or (value not in [0, 1, 2, 3]):
raise ValueError("Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3).")
raise ValueError(
"Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3)."
)
self._accounting_mode = value
@property
@ -225,7 +264,9 @@ class NFStreamer(object):
if isinstance(plugin, NFPlugin):
pass
else:
raise ValueError("User defined plugins must inherit from NFPlugin type.")
raise ValueError(
"User defined plugins must inherit from NFPlugin type."
)
self._udps = value
else:
if isinstance(value, NFPlugin):
@ -234,7 +275,9 @@ class NFStreamer(object):
if value is None:
self._udps = ()
else:
raise ValueError("User defined plugins must inherit from NFPlugin type.")
raise ValueError(
"User defined plugins must inherit from NFPlugin type."
)
@property
def n_dissections(self):
@ -243,7 +286,9 @@ class NFStreamer(object):
@n_dissections.setter
def n_dissections(self, value):
if not isinstance(value, int) or (value < 0 or value > 255):
raise ValueError("Please specify a valid n_dissections parameter (possible values in : [0,...,255]).")
raise ValueError(
"Please specify a valid n_dissections parameter (possible values in : [0,...,255])."
)
self._n_dissections = value
@property
@ -253,7 +298,9 @@ class NFStreamer(object):
@statistical_analysis.setter
def statistical_analysis(self, value):
if not isinstance(value, bool):
raise ValueError("Please specify a valid statistical_analysis parameter (possible values: True, False).")
raise ValueError(
"Please specify a valid statistical_analysis parameter (possible values: True, False)."
)
self._statistical_analysis = value
@property
@ -263,9 +310,13 @@ class NFStreamer(object):
@splt_analysis.setter
def splt_analysis(self, value):
if not isinstance(value, int) or (value < 0 or value > 65535):
raise ValueError("Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])")
raise ValueError(
"Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])"
)
if value > 255:
print("[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool.")
print(
"[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool."
)
self._splt_analysis = value
@property
@ -277,17 +328,25 @@ class NFStreamer(object):
if isinstance(value, int) and value >= 0:
pass
else:
raise ValueError("Please specify a valid n_meters parameter (>=1 or 0 for auto scaling).")
raise ValueError(
"Please specify a valid n_meters parameter (>=1 or 0 for auto scaling)."
)
c_cpus, c_cores = available_cpus_count(), psutil.cpu_count(logical=False)
if c_cores is None: # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078)
if (
c_cores is None
): # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078)
c_cores = c_cpus
if value == 0:
if platform.system() == "Linux" and self._mode == NFMode.INTERFACE:
self._n_meters = c_cpus - 1 # We are in live capture mode and kernel fanout will be available
self._n_meters = (
c_cpus - 1
) # We are in live capture mode and kernel fanout will be available
# only on Linux, we set the n_meters to detected logical CPUs -1
else: # Windows, MacOS, offline capture
if c_cpus >= c_cores:
if c_cpus == 2 * c_cores or c_cpus == c_cores: # multi-thread or single threaded
if (
c_cpus == 2 * c_cores or c_cpus == c_cores
): # multi-thread or single threaded
self._n_meters = c_cores - 1
else:
self._n_meters = int(divmod(c_cpus / 2, 1)[0]) - 1
@ -297,7 +356,11 @@ class NFStreamer(object):
if (value + 1) <= c_cpus:
self._n_meters = value
else: # avoid contention
print("WARNING: n_meters set to :{} in order to avoid contention.".format(c_cpus - 1))
print(
"WARNING: n_meters set to :{} in order to avoid contention.".format(
c_cpus - 1
)
)
self._n_meters = c_cpus - 1
if self._n_meters == 0: # one CPU case
self._n_meters = 1
@ -322,8 +385,10 @@ class NFStreamer(object):
if isinstance(value, int) and value >= 0:
pass
else:
raise ValueError("Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)"
" or 0 to disable). [Available only for Live capture]")
raise ValueError(
"Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)"
" or 0 to disable). [Available only for Live capture]"
)
self._performance_report = value
@property
@ -334,16 +399,20 @@ class NFStreamer(object):
def system_visibility_mode(self, value):
if isinstance(value, int) and value in [0, 1]:
if self._mode == NFMode.SINGLE_FILE and value > 0:
print("WARNING: system_visibility_mode switched to 0 in offline capture "
"(available only for live capture)")
print(
"WARNING: system_visibility_mode switched to 0 in offline capture "
"(available only for live capture)"
)
value = 0
else:
pass
else:
raise ValueError("Please specify a valid system_visibility_mode parameter\n"
"0: disable\n"
"1: process information\n"
"[Available only for live capture on the system generating the traffic]")
raise ValueError(
"Please specify a valid system_visibility_mode parameter\n"
"0: disable\n"
"1: process information\n"
"[Available only for live capture on the system generating the traffic]"
)
self._system_visibility_mode = value
@property
@ -355,8 +424,10 @@ class NFStreamer(object):
if isinstance(value, int) and value >= 0:
pass
else:
raise ValueError("Please specify a valid system_visibility_poll_ms parameter "
"(positive integer in milliseconds)")
raise ValueError(
"Please specify a valid system_visibility_poll_ms parameter "
"(positive integer in milliseconds)"
)
self._system_visibility_poll_ms = value
def __iter__(self):
@ -374,50 +445,76 @@ class NFStreamer(object):
# To avoid issues on PyPy on Windows (See https://foss.heptapod.net/pypy/pypy/-/issues/3488), All
# multiprocessing Value invocation must be performed before the call to Queue.
n_meters = self.n_meters
idx_generator = self._mp_context.Value('i', 0)
idx_generator = self._mp_context.Value("i", 0)
for i in range(n_meters):
performances.append([self._mp_context.Value('I', 0),
self._mp_context.Value('I', 0),
self._mp_context.Value('I', 0)])
performances.append(
[
self._mp_context.Value("I", 0),
self._mp_context.Value("I", 0),
self._mp_context.Value("I", 0),
]
)
channel = self._mp_context.Queue(maxsize=32767) # Backpressure strategy.
# We set it to (2^15-1) to cope with OSX max semaphore value.
group_id = os.getpid() + self._idx # Used for fanout on Linux systems
try:
for i in range(n_meters):
meters.append(self._mp_context.Process(target=meter_workflow,
args=(self.source,
self.snapshot_length,
self.decode_tunnels,
self.bpf_filter,
self.promiscuous_mode,
n_meters,
i,
self._mode,
self.idle_timeout * 1000,
self.active_timeout * 1000,
self.accounting_mode,
self.udps,
self.n_dissections,
self.statistical_analysis,
self.splt_analysis,
channel,
performances[i],
lock,
group_id,
self.system_visibility_mode,
self.socket_buffer_size,)))
meters.append(
self._mp_context.Process(
target=meter_workflow,
args=(
self.source,
self.snapshot_length,
self.decode_tunnels,
self.bpf_filter,
self.promiscuous_mode,
n_meters,
i,
self._mode,
self.idle_timeout * 1000,
self.active_timeout * 1000,
self.accounting_mode,
self.udps,
self.n_dissections,
self.statistical_analysis,
self.splt_analysis,
channel,
performances[i],
lock,
group_id,
self.system_visibility_mode,
self.socket_buffer_size,
),
)
)
meters[i].daemon = True # demonize meter
meters[i].start()
if self._mode == NFMode.INTERFACE and self.performance_report > 0:
if platform.system() == "Linux":
rt = RepeatedTimer(self.performance_report, update_performances, performances, True, idx_generator)
rt = RepeatedTimer(
self.performance_report,
update_performances,
performances,
True,
idx_generator,
)
else:
rt = RepeatedTimer(self.performance_report, update_performances, performances, False, idx_generator)
rt = RepeatedTimer(
self.performance_report,
update_performances,
performances,
False,
idx_generator,
)
if self._mode == NFMode.INTERFACE and self.system_visibility_mode:
socket_listener = self._mp_context.Process(target=system_socket_worflow,
args=(channel,
self.idle_timeout * 1000,
self.system_visibility_poll_ms / 1000,))
socket_listener = self._mp_context.Process(
target=system_socket_worflow,
args=(
channel,
self.idle_timeout * 1000,
self.system_visibility_poll_ms / 1000,
),
)
socket_listener.daemon = True # demonize socket_listener
socket_listener.start()
@ -435,7 +532,9 @@ class NFStreamer(object):
child_error = recv.message
break
elif recv.id == NFEvent.ALL_AFFINITY_SET:
set_affinity(0) # we pin streamer to core 0 as it's the less intensive task and several services runs
set_affinity(
0
) # we pin streamer to core 0 as it's the less intensive task and several services runs
# by default on this core.
elif recv.id == NFEvent.SOCKET_CREATE:
conn_cache[recv.key] = [recv.process_name, recv.process_pid]
@ -444,7 +543,10 @@ class NFStreamer(object):
else: # NFEvent.FLOW
recv.id = idx_generator.value # Unify ID
idx_generator.value = idx_generator.value + 1
if self._mode == NFMode.INTERFACE and self.system_visibility_mode:
if (
self._mode == NFMode.INTERFACE
and self.system_visibility_mode
):
recv = match_flow_conn(conn_cache, recv)
yield recv
if recv.id == self.max_nflows:
@ -464,10 +566,14 @@ class NFStreamer(object):
channel.join_thread() # and we join its thread
if child_error is not None:
raise ValueError(child_error)
except ValueError as observer_error: # job initiation failed due to some bad observer parameters.
except (
ValueError
) as observer_error: # job initiation failed due to some bad observer parameters.
raise ValueError(observer_error)
def to_csv(self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0):
def to_csv(
self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0
):
validate_flows_per_file(flows_per_file)
validate_rotate_files(rotate_files)
chunked, chunk_idx = True, -1
@ -479,18 +585,20 @@ class NFStreamer(object):
f = None
for flow in self:
try:
if total_flows == 0 or (chunked and (chunk_flows > flows_per_file)): # header creation
if total_flows == 0 or (
chunked and (chunk_flows > flows_per_file)
): # header creation
if f is not None:
f.close()
chunk_flows = 1
chunk_idx += 1
f = open_file(output_path, chunked, chunk_idx, rotate_files)
header = ','.join([str(i) for i in flow.keys()]) + "\n"
f.write(header.encode('utf-8'))
header = ",".join([str(i) for i in flow.keys()]) + "\n"
f.write(header.encode("utf-8"))
values = anon.process(flow)
csv_converter(values)
to_export = ','.join([str(i) for i in values]) + "\n"
f.write(to_export.encode('utf-8'))
to_export = ",".join([str(i) for i in values]) + "\n"
f.write(to_export.encode("utf-8"))
total_flows = total_flows + 1
chunk_flows += 1
except KeyboardInterrupt:
@ -501,16 +609,26 @@ class NFStreamer(object):
return total_flows
def to_pandas(self, columns_to_anonymize=()):
""" streamer to pandas function """
temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format(pid=os.getpid(),
iid=NFStreamer.streamer_id,
ts=tm.time())
total_flows = self.to_csv(path=temp_file_path, columns_to_anonymize=columns_to_anonymize, flows_per_file=0)
"""streamer to pandas function"""
temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format(
pid=os.getpid(), iid=NFStreamer.streamer_id, ts=tm.time()
)
total_flows = self.to_csv(
path=temp_file_path,
columns_to_anonymize=columns_to_anonymize,
flows_per_file=0,
)
if total_flows > 0: # If there is flows, return Dataframe else return None.
df = pd.read_csv(temp_file_path, engine="c") # Use C engine for superior performance (non-experimental)
df = pd.read_csv(
temp_file_path, engine="c"
) # Use C engine for superior performance (non-experimental)
if total_flows != df.shape[0]:
print("WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() "
"method if drops are critical.".format(abs(df.shape[0] - total_flows)))
print(
"WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() "
"method if drops are critical.".format(
abs(df.shape[0] - total_flows)
)
)
else:
df = None
if os.path.exists(temp_file_path):

View file

@ -21,19 +21,17 @@ from .utils import NFEvent
import time
NFSocket = namedtuple('NFSocket', ['id',
'key',
'process_pid',
'process_name'])
NFSocket = namedtuple("NFSocket", ["id", "key", "process_pid", "process_name"])
class ConnCache(OrderedDict):
""" LRU Connections Cache
"""LRU Connections Cache
The ConnCache object is used to cache connections entries such as MRU entries are kept on the end and LRU entries
will be at the start. Note that we use OrderedDict which leverages classical python dict combined with a doubly
linked list with sentinel nodes to track order.
By doing so, we can access in an efficient way idle connections entries that need to expired based on a timeout.
"""
def __init__(self, channel, timeout, *args, **kwds):
self.channel = channel
self.timeout = timeout + 5000
@ -54,17 +52,21 @@ class ConnCache(OrderedDict):
return next(iter(self))
def scan(self, current_time):
""" Scan and delete LRU entries based on a defined timeout """
"""Scan and delete LRU entries based on a defined timeout"""
if (current_time - self.last_scan_time) > 10:
remaining = True # We suppose that there is something to expire
scanned = 0
while remaining and scanned <= 1000: # Each 10 ms we scan with 1000 entries budget
while (
remaining and scanned <= 1000
): # Each 10 ms we scan with 1000 entries budget
try:
lru_key = self.get_lru_key() # will return the LRU conn key.
lru_last_update_time = self[lru_key]
if current_time - lru_last_update_time >= self.timeout:
del self[lru_key]
self.channel.put(NFSocket(NFEvent.SOCKET_REMOVE, lru_key, None, None)) # Send to streamer
self.channel.put(
NFSocket(NFEvent.SOCKET_REMOVE, lru_key, None, None)
) # Send to streamer
scanned += 1
else:
remaining = False # LRU flow is not yet idle.
@ -74,7 +76,7 @@ class ConnCache(OrderedDict):
def simplify_protocol(protocol):
""" Transform protocol IDs to 3 unique values: 6 for TCP, 17 for UDP and 0 for others """
"""Transform protocol IDs to 3 unique values: 6 for TCP, 17 for UDP and 0 for others"""
if protocol == 6:
return protocol
if protocol == 17:
@ -83,17 +85,14 @@ def simplify_protocol(protocol):
def get_conn_key_from_flow(f):
""" Compute a conn key from NFlow object attributes """
return get_flow_key(f.src_ip,
f.src_port,
f.dst_ip,
f.dst_port,
simplify_protocol(f.protocol),
0, 0)
"""Compute a conn key from NFlow object attributes"""
return get_flow_key(
f.src_ip, f.src_port, f.dst_ip, f.dst_port, simplify_protocol(f.protocol), 0, 0
)
def match_flow_conn(conn_cache, flow):
""" Match a flow with a connection entry based on a shared key"""
"""Match a flow with a connection entry based on a shared key"""
if len(conn_cache) > 0:
flow_key = get_conn_key_from_flow(flow)
try:
@ -106,23 +105,27 @@ def match_flow_conn(conn_cache, flow):
def get_conn_key(c):
""" Create a 5-tuple connection key tuple """
"""Create a 5-tuple connection key tuple"""
if c.raddr != () and c.pid is not None:
if c.type == SocketKind.SOCK_STREAM: # TCP protocol
return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 6, 0, 0)
return get_flow_key(
c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 6, 0, 0
)
if c.type == SocketKind.SOCK_DGRAM: # UDP protocol
return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 17, 0, 0)
return get_flow_key(
c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 17, 0, 0
)
return get_flow_key(c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, 0, 0, 0)
return None
def system_socket_worflow(channel, idle_timeout, poll_period):
""" Host ground-truth generation workflow """
"""Host ground-truth generation workflow"""
conn_cache = ConnCache(channel=channel, timeout=idle_timeout)
try:
while True:
current_time = time.time() * 1000
for conn in net_connections(kind='inet'):
for conn in net_connections(kind="inet"):
# IMPORTANT: The rationale behind the usage of an active polling approach (net_connections call):
# System process visibility is intended to generate the most accurate ground truth for traffic
# classification end-host-based research experiments, as reported in the literature [1].
@ -140,7 +143,9 @@ def system_socket_worflow(channel, idle_timeout, poll_period):
if key not in conn_cache: # Create and send
process_name = Process(conn.pid).name()
conn_cache[key] = current_time
channel.put(NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name)) # Send to streamer
channel.put(
NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name)
) # Send to streamer
else: # update time
conn_cache[key] = current_time
conn_cache.scan(current_time)

View file

@ -35,56 +35,58 @@ class NFMode(IntEnum):
MULTIPLE_FILES = 2
InternalError = namedtuple('InternalError', ['id', 'message'])
InternalError = namedtuple("InternalError", ["id", "message"])
InternalState = namedtuple('InternalState', ['id'])
InternalState = namedtuple("InternalState", ["id"])
def validate_flows_per_file(n):
""" Simple parameter validator """
"""Simple parameter validator"""
if not isinstance(n, int) or isinstance(n, int) and n < 0:
raise ValueError("Please specify a valid flows_per_file parameter (>= 0).")
def validate_rotate_files(n):
""" Simple parameter validator """
"""Simple parameter validator"""
if not isinstance(n, int) or isinstance(n, int) and n < 0:
raise ValueError("Please specify a valid rotate_files parameter (>= 0).")
def create_csv_file_path(path, source):
""" File path creator """
"""File path creator"""
if path is None:
if type(source) == list:
return str(source[0]) + '.csv'
return str(source) + '.csv'
return str(source[0]) + ".csv"
return str(source) + ".csv"
return path
def csv_converter(values):
""" Convert non numeric values to string using their __str__ method and ensure proper quoting """
"""Convert non numeric values to string using their __str__ method and ensure proper quoting"""
for idx, value in enumerate(values):
if not isinstance(value, float) and not isinstance(value, int):
if value is None:
values[idx] = ""
else:
values[idx] = str(values[idx])
values[idx] = values[idx].replace('\"', '\\"')
values[idx] = "\"" + values[idx] + "\""
values[idx] = values[idx].replace('"', '\\"')
values[idx] = '"' + values[idx] + '"'
def open_file(path, chunked, chunk_idx, rotate_files):
""" File opener taking chunk mode into consideration """
"""File opener taking chunk mode into consideration"""
if not chunked:
return open(path, 'wb')
return open(path, "wb")
else:
if rotate_files:
return open(path.replace("csv", "{}.csv".format(chunk_idx % rotate_files)), 'wb')
return open(path.replace("csv", "{}.csv".format(chunk_idx)), 'wb')
return open(
path.replace("csv", "{}.csv".format(chunk_idx % rotate_files)), "wb"
)
return open(path.replace("csv", "{}.csv".format(chunk_idx)), "wb")
def update_performances(performances, is_linux, flows_count):
""" Update performance report and check platform for consistency """
"""Update performance report and check platform for consistency"""
drops = 0
processed = 0
ignored = 0
@ -98,15 +100,22 @@ def update_performances(performances, is_linux, flows_count):
ignored = max(meter[2].value, ignored)
processed += meter[1].value
load.append(meter[1].value)
print(json.dumps({"flows_expired": flows_count.value,
"packets_processed": processed,
"packets_ignored": ignored,
"packets_dropped_filtered_by_kernel": drops,
"meters_packets_processing_balance": load}))
print(
json.dumps(
{
"flows_expired": flows_count.value,
"packets_processed": processed,
"packets_ignored": ignored,
"packets_dropped_filtered_by_kernel": drops,
"meters_packets_processing_balance": load,
}
)
)
class RepeatedTimer(object):
""" Repeated timer thread """
"""Repeated timer thread"""
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
@ -131,14 +140,15 @@ class RepeatedTimer(object):
self._timer.cancel()
self.is_running = False
def chunks_of_list(lst, n):
""" create list of chunks of size n from a list"""
"""create list of chunks of size n from a list"""
for i in range(0, len(lst), n):
yield lst[i:i + n]
yield lst[i : i + n]
def set_affinity(idx):
""" CPU affinity setter """
"""CPU affinity setter"""
if platform.system() == "Linux":
c_cpus = psutil.Process().cpu_affinity()
temp = list(chunks_of_list(c_cpus, 2))

View file

@ -24,15 +24,14 @@ THIS_DIRECTORY = str(pathlib.Path(__file__).parent.resolve())
if (not sys.version_info[0] == 3) and (not sys.version_info[1] >= 6):
sys.exit("Sorry, nfstream requires Python3.6+ versions.")
with open(os.path.join(THIS_DIRECTORY, 'README.md'), encoding='utf-8') as f:
with open(os.path.join(THIS_DIRECTORY, "README.md"), encoding="utf-8") as f:
LONG_DESCRIPTION = f.read()
INSTALL_REQUIRES = ['cffi>=1.15.0',
'psutil>=5.8.0',
'dpkt>=1.9.7',
'numpy>=1.19.5']
INSTALL_REQUIRES = ["cffi>=1.15.0", "psutil>=5.8.0", "dpkt>=1.9.7", "numpy>=1.19.5"]
if platform.python_implementation() == 'PyPy': # This is mandatory to fix pandas issues with PyPy
if (
platform.python_implementation() == "PyPy"
): # This is mandatory to fix pandas issues with PyPy
INSTALL_REQUIRES.append("pandas<=1.2.5")
else:
INSTALL_REQUIRES.append("pandas>=1.1.5")
@ -40,40 +39,40 @@ else:
setup(
name="nfstream",
version='6.5.4',
url='https://www.nfstream.org/',
license='LGPLv3',
version="6.5.4",
url="https://www.nfstream.org/",
license="LGPLv3",
description="A Flexible Network Data Analysis Framework",
long_description=LONG_DESCRIPTION,
long_description_content_type='text/markdown',
author='Zied Aouini',
author_email='aouinizied@gmail.com',
packages=['nfstream', 'nfstream.plugins', 'nfstream.engine'],
long_description_content_type="text/markdown",
author="Zied Aouini",
author_email="aouinizied@gmail.com",
packages=["nfstream", "nfstream.plugins", "nfstream.engine"],
setup_requires=["cffi>=1.15.0"],
cffi_modules=["nfstream/engine/engine_build.py:ffi_builder"],
install_requires=INSTALL_REQUIRES,
include_package_data=True,
platforms=["Linux", "Mac OS-X", "Windows", "Unix"],
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)',
'Intended Audience :: Telecommunications Industry',
'Intended Audience :: Information Technology',
'Intended Audience :: System Administrators',
'Intended Audience :: Science/Research',
'Intended Audience :: Developers',
'Programming Language :: Python :: 3 :: Only',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'Topic :: Security',
'Topic :: Internet :: Log Analysis',
'Topic :: System :: Networking :: Monitoring',
'Topic :: Scientific/Engineering :: Artificial Intelligence'
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: GNU Lesser General Public License v3 or later (LGPLv3+)",
"Intended Audience :: Telecommunications Industry",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
"Intended Audience :: Science/Research",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Topic :: Security",
"Topic :: Internet :: Log Analysis",
"Topic :: System :: Networking :: Monitoring",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
],
project_urls={
'GitHub': 'https://github.com/nfstream/nfstream',
}
"GitHub": "https://github.com/nfstream/nfstream",
},
)

583
tests.py
View file

@ -25,7 +25,9 @@ def get_files_list(path):
files = []
for r, d, f in os.walk(path):
for file in f:
if '.pcap' == file[-5:] or ".pcapng" == file[-7:]: # Pick out only pcaps files
if (
".pcap" == file[-5:] or ".pcapng" == file[-7:]
): # Pick out only pcaps files
files.append(os.path.join(r, file))
files.sort()
return files
@ -34,7 +36,9 @@ def get_files_list(path):
class NFStreamTest(object):
@staticmethod
def test_source_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
source = ["inexisting.pcap", "lo", 11]
for x in source:
@ -43,268 +47,444 @@ class NFStreamTest(object):
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_source_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_source_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_decode_tunnels_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
decode_tunnels = [33, "True"]
for x in decode_tunnels:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
decode_tunnels=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
decode_tunnels=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_decode_tunnels_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_decode_tunnels_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_bpf_filter_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
bpf_filter = ["my filter", 11]
for x in bpf_filter:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), bpf_filter=x).to_pandas()
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
bpf_filter=x,
).to_pandas()
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_bpf_filter_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_bpf_filter_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_promiscuous_mode_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
promiscuous_mode = ["yes", 89]
for x in promiscuous_mode:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), promiscuous_mode=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
promiscuous_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_promiscuous_mode_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_promiscuous_mode_parameter".ljust(60, " "),
colored("OK", "green"),
)
)
@staticmethod
def test_snapshot_length_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
snapshot_length = ["largest", -1]
for x in snapshot_length:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), snapshot_length=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
snapshot_length=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_snapshot_length_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_snapshot_length_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_socket_buffer_size_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
socket_buffer_size = ["largest", -1, 2**31]
for x in socket_buffer_size:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), socket_buffer_size=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
socket_buffer_size=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_socket_buffer_size_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_socket_buffer_size_parameter".ljust(60, " "),
colored("OK", "green"),
)
)
@staticmethod
def test_idle_timeout_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
idle_timeout = [-1, "idle"]
for x in idle_timeout:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
idle_timeout=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_idle_timeout_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_idle_timeout_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_active_timeout_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
active_timeout = [-1, "active"]
for x in active_timeout:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
active_timeout=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_active_timeout_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_active_timeout_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_accounting_mode_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
accounting_mode = [-1, 5, 'ip']
accounting_mode = [-1, 5, "ip"]
for x in accounting_mode:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), accounting_mode=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
accounting_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_accounting_mode_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_accounting_mode_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_udps_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
udps = [lambda y: y + 1, "NFPlugin"]
for x in udps:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), udps=x
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_udps_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_udps_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_n_dissections_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
n_dissections = ["yes", -1, 256]
for x in n_dissections:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_dissections=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_dissections=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_n_dissections_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_n_dissections_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_system_visibility_mode_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
system_visibility_mode = ["yes", -1, 3]
for x in system_visibility_mode:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_mode=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_mode=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_system_visibility_mode_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_system_visibility_mode_parameter".ljust(60, " "),
colored("OK", "green"),
)
)
@staticmethod
def test_system_visibility_poll_ms():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
system_visibility_mode = ["yes", -1]
for x in system_visibility_mode:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), system_visibility_poll_ms=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
system_visibility_poll_ms=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_system_visibility_poll_ms".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_system_visibility_poll_ms".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_statistical_analysis_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
statistical_analysis = ["yes", 89]
for x in statistical_analysis:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), statistical_analysis=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
statistical_analysis=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_statistical_analysis_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_statistical_analysis_parameter".ljust(60, " "),
colored("OK", "green"),
)
)
@staticmethod
def test_splt_analysis_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
splt_analysis = [-1, 70000, "yes"]
for x in splt_analysis:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
splt_analysis=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 3
print("{}\t: {}".format(".test_splt_analysis_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_splt_analysis_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_n_meters_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
n_meters = ["yes", -1]
for x in n_meters:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
n_meters=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), n_meters=x
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_n_meters_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_n_meters_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_max_nflows_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
max_nflows = ["yes", -1]
for x in max_nflows:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
max_nflows=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
max_nflows=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_max_nflows_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_max_nflows_parameter".ljust(60, " "), colored("OK", "green")
)
)
@staticmethod
def test_performance_report_parameter():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
n_exceptions = 0
performance_report = ["yes", -1]
for x in performance_report:
try:
NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), performance_report=x)
NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
performance_report=x,
)
except ValueError:
n_exceptions += 1
assert n_exceptions == 2
print("{}\t: {}".format(".test_performance_report_parameter".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_performance_report_parameter".ljust(60, " "),
colored("OK", "green"),
)
)
@staticmethod
def test_expiration_management():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
# Idle expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0)
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), idle_timeout=0
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Active expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0)
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"), active_timeout=0
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
# Custom expiration
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=1))
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=1),
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 27
streamer_expiration = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=4))
streamer_expiration = NFStreamer(
source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
udps=FlowSlicer(limit=4),
)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
assert last_id == 6
print("{}\t: {}".format(".test_expiration_management".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_expiration_management".ljust(60, " "), colored("OK", "green")
)
)
    @staticmethod
    def test_tunnel_decoding():
        # NOTE(review): this method is corrupted by a botched merge of the black
        # formatting commit: it contains both pre- and post-black copies of several
        # statements AND a raw, unapplied diff hunk header below, meaning the
        # original body lines between the marker and "n_exceptions += 1" are
        # missing from this file. Recover this method from VCS history.
        print("\n----------------------------------------------------------------------")
        print(
            "\n----------------------------------------------------------------------"
        )
        n_exceptions = 0
        decode_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "gtp-u.pcap"),
                                     statistical_analysis=True, decode_tunnels=True)
        decode_streamer = NFStreamer(
            source=os.path.join("tests", "pcaps", "gtp-u.pcap"),
            statistical_analysis=True,
            decode_tunnels=True,
        )
        # With tunnel decoding on, every flow in the GTP-U capture carries tunnel_id 1.
        for flow in decode_streamer:
            assert flow.tunnel_id == 1
        decode_streamer.decode_tunnels = False
# NOTE(review): unapplied diff hunk header — original lines are missing here.
@ -315,23 +495,32 @@ class NFStreamTest(object):
                n_exceptions += 1
        assert n_exceptions == 1
        del decode_streamer
        print("{}\t: {}".format(".test_tunnel_decoding".ljust(60, ' '), colored('OK', 'green')))
        print(
            "{}\t: {}".format(
                ".test_tunnel_decoding".ljust(60, " "), colored("OK", "green")
            )
        )
    @staticmethod
    def test_statistical():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — the
        # assertions between ip_version and dst2src_rst_packets are missing from
        # this file. Recover this method from VCS history.
        print("\n----------------------------------------------------------------------")
        statistical_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
                                          statistical_analysis=True, accounting_mode=1)
        print(
            "\n----------------------------------------------------------------------"
        )
        statistical_streamer = NFStreamer(
            source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
            statistical_analysis=True,
            accounting_mode=1,
        )
        # Single-flow capture: validate addressing and statistical fields.
        for flow in statistical_streamer:
            assert flow.id == 0
            assert flow.expiration_id == 0
            assert flow.src_ip == '172.31.3.224'
            assert flow.src_mac == '80:c6:ca:00:9e:9f'
            assert flow.src_oui == '80:c6:ca'
            assert flow.src_ip == "172.31.3.224"
            assert flow.src_mac == "80:c6:ca:00:9e:9f"
            assert flow.src_oui == "80:c6:ca"
            assert flow.src_port == 42835
            assert flow.dst_ip == '216.58.212.100'
            assert flow.dst_mac == '00:0e:8e:4d:b4:a8'
            assert flow.dst_oui == '00:0e:8e'
            assert flow.dst_ip == "216.58.212.100"
            assert flow.dst_mac == "00:0e:8e:4d:b4:a8"
            assert flow.dst_oui == "00:0e:8e"
            assert flow.dst_port == 443
            assert flow.protocol == 6
            assert flow.ip_version == 4
# NOTE(review): unapplied diff hunk header — original assertion lines are missing here.
@ -401,46 +590,79 @@ class NFStreamTest(object):
            assert flow.dst2src_rst_packets == 0
            assert flow.dst2src_fin_packets == 1
        del statistical_streamer
        print("{}\t: {}".format(".test_statistical".ljust(60, ' '), colored('OK', 'green')))
        print(
            "{}\t: {}".format(
                ".test_statistical".ljust(60, " "), colored("OK", "green")
            )
        )
@staticmethod
def test_fingerprint_extraction():
print("\n----------------------------------------------------------------------")
fingerprint_streamer = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
statistical_analysis=True, accounting_mode=1)
print(
"\n----------------------------------------------------------------------"
)
fingerprint_streamer = NFStreamer(
source=os.path.join("tests", "pcaps", "facebook.pcap"),
statistical_analysis=True,
accounting_mode=1,
)
for flow in fingerprint_streamer:
assert flow.application_name == 'TLS.Facebook'
assert flow.application_category_name == 'SocialNetwork'
assert flow.application_name == "TLS.Facebook"
assert flow.application_category_name == "SocialNetwork"
assert flow.application_is_guessed == 0
assert flow.application_confidence == 6
requested_server_name = flow.requested_server_name in ['facebook.com', 'www.facebook.com']
requested_server_name = flow.requested_server_name in [
"facebook.com",
"www.facebook.com",
]
assert int(requested_server_name) == 1
client_fingerprint = flow.client_fingerprint in ['bfcc1a3891601edb4f137ab7ab25b840',
'5c60e71f1b8cd40e4d40ed5b6d666e3f']
client_fingerprint = flow.client_fingerprint in [
"bfcc1a3891601edb4f137ab7ab25b840",
"5c60e71f1b8cd40e4d40ed5b6d666e3f",
]
assert int(client_fingerprint) == 1
server_fingerprint = flow.server_fingerprint in ['2d1eb5817ece335c24904f516ad5da12',
'96681175a9547081bf3d417f1a572091']
server_fingerprint = flow.server_fingerprint in [
"2d1eb5817ece335c24904f516ad5da12",
"96681175a9547081bf3d417f1a572091",
]
assert int(server_fingerprint) == 1
del fingerprint_streamer
print("{}\t: {}".format(".test_fingerprint_extraction".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(
".test_fingerprint_extraction".ljust(60, " "), colored("OK", "green")
)
)
    @staticmethod
    def test_export():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — the
        # assertions between "total_flows == total_flows_anon" and
        # "total_flows_anon == df_anon_from_csv.shape[0]" are missing from this
        # file. Recover this method from VCS history.
        print("\n----------------------------------------------------------------------")
        df = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
                        statistical_analysis=True, n_dissections=20).to_pandas()
        df_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
                             statistical_analysis=True,
                             n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
        print(
            "\n----------------------------------------------------------------------"
        )
        df = NFStreamer(
            source=os.path.join("tests", "pcaps", "steam.pcap"),
            statistical_analysis=True,
            n_dissections=20,
        ).to_pandas()
        df_anon = NFStreamer(
            source=os.path.join("tests", "pcaps", "steam.pcap"),
            statistical_analysis=True,
            n_dissections=20,
        ).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
        # Anonymization must preserve shape and the cardinality of IP columns.
        assert df_anon.shape[0] == df.shape[0]
        assert df_anon.shape[1] == df.shape[1]
        assert df_anon['src_ip'].nunique() == df['src_ip'].nunique()
        assert df_anon['dst_ip'].nunique() == df['dst_ip'].nunique()
        total_flows = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
                                 statistical_analysis=True, n_dissections=20).to_csv()
        assert df_anon["src_ip"].nunique() == df["src_ip"].nunique()
        assert df_anon["dst_ip"].nunique() == df["dst_ip"].nunique()
        total_flows = NFStreamer(
            source=os.path.join("tests", "pcaps", "steam.pcap"),
            statistical_analysis=True,
            n_dissections=20,
        ).to_csv()
        df_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
        total_flows_anon = NFStreamer(source=os.path.join("tests", "pcaps", "steam.pcap"),
                                      statistical_analysis=True, n_dissections=20).to_csv()
        total_flows_anon = NFStreamer(
            source=os.path.join("tests", "pcaps", "steam.pcap"),
            statistical_analysis=True,
            n_dissections=20,
        ).to_csv()
        df_anon_from_csv = pd.read_csv(os.path.join("tests", "pcaps", "steam.pcap.csv"))
        os.remove(os.path.join("tests", "pcaps", "steam.pcap.csv"))
        assert total_flows == total_flows_anon
# NOTE(review): unapplied diff hunk header — original assertion lines are missing here.
@ -448,24 +670,29 @@ class NFStreamTest(object):
        assert total_flows_anon == df_anon_from_csv.shape[0]
        assert total_flows == df.shape[0]
        assert total_flows_anon == df_anon.shape[0]
        print("{}\t: {}".format(".test_export".ljust(60, ' '), colored('OK', 'green')))
        print("{}\t: {}".format(".test_export".ljust(60, " "), colored("OK", "green")))
@staticmethod
def test_bpf():
print("\n----------------------------------------------------------------------")
streamer_test = NFStreamer(source=os.path.join("tests", "pcaps", "facebook.pcap"),
bpf_filter="src port 52066 or dst port 52066")
print(
"\n----------------------------------------------------------------------"
)
streamer_test = NFStreamer(
source=os.path.join("tests", "pcaps", "facebook.pcap"),
bpf_filter="src port 52066 or dst port 52066",
)
last_id = 0
for flow in streamer_test:
last_id = flow.id
assert flow.src_port == 52066
assert last_id == 0
print("{}\t: {}".format(".test_bpf".ljust(60, ' '), colored('OK', 'green')))
print("{}\t: {}".format(".test_bpf".ljust(60, " "), colored("OK", "green")))
    @staticmethod
    def test_ndpi_integration():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — the
        # lines between "failures = 0" and the enumerate loop are missing from
        # this file. Recover this method from VCS history. Also note the
        # duplicated "test = ..." assignment inside the try block.
        print("\n----------------------------------------------------------------------")
        print(
            "\n----------------------------------------------------------------------"
        )
        pcap_files = get_files_list(os.path.join("tests", "pcaps"))
        result_files = get_files_list(os.path.join("tests", "results"))
        failures = 0
# NOTE(review): unapplied diff hunk header — original lines are missing here.
@ -473,30 +700,49 @@ class NFStreamTest(object):
        # Compare each pcap's dissection output against its recorded CSV result.
        for file_idx, test_file in enumerate(pcap_files):
            test_case_name = os.path.basename(test_file)
            try:
                test = NFStreamer(source=test_file,
                                  n_dissections=20,
                                  n_meters=1).to_pandas()[["id",
                                                           "bidirectional_packets",
                                                           "bidirectional_bytes",
                                                           "application_name",
                                                           "application_category_name",
                                                           "application_is_guessed",
                                                           "application_confidence"]].to_dict()
                test = (
                    NFStreamer(source=test_file, n_dissections=20, n_meters=1)
                    .to_pandas()[
                        [
                            "id",
                            "bidirectional_packets",
                            "bidirectional_bytes",
                            "application_name",
                            "application_category_name",
                            "application_is_guessed",
                            "application_confidence",
                        ]
                    ]
                    .to_dict()
                )
                true = pd.read_csv(result_files[file_idx]).to_dict()
                assert test == true
                print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('OK', 'green')))
                print(
                    "{}\t: {}".format(
                        test_case_name.ljust(60, " "), colored("OK", "green")
                    )
                )
            except AssertionError:
                failures += 1
                print("{}\t: {}".format(test_case_name.ljust(60, ' '), colored('KO', 'red')))
                print(
                    "{}\t: {}".format(
                        test_case_name.ljust(60, " "), colored("KO", "red")
                    )
                )
        # Everything must be OK
        assert failures == 0
    @staticmethod
    def test_splt():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — the
        # definitions of ndirection/nps/npiat used in the assertions are missing
        # from this file. Recover this method from VCS history.
        print("\n----------------------------------------------------------------------")
        splt_df = NFStreamer(source=os.path.join("tests", "pcaps", "google_ssl.pcap"), splt_analysis=5,
                             udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas()
        print(
            "\n----------------------------------------------------------------------"
        )
        splt_df = NFStreamer(
            source=os.path.join("tests", "pcaps", "google_ssl.pcap"),
            splt_analysis=5,
            udps=SPLT(sequence_length=5, accounting_mode=0),
        ).to_pandas()
        # Native splt columns are JSON-encoded lists; decode for comparison.
        direction = json.loads(splt_df["udps.splt_direction"][0])
        ps = json.loads(splt_df["udps.splt_ps"][0])
        piat = json.loads(splt_df["udps.splt_piat_ms"][0])
# NOTE(review): unapplied diff hunk header — original lines are missing here.
@ -509,13 +755,23 @@ class NFStreamTest(object):
        assert direction == ndirection
        assert ps == nps
        assert piat == npiat
        print("{}\t: {}".format(".test_splt".ljust(60, ' '), colored('OK', 'green')))
        print("{}\t: {}".format(".test_splt".ljust(60, " "), colored("OK", "green")))
    @staticmethod
    def test_dhcp():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — the
        # lines between the dhcp_55 assertion and the msg_type[1] assertion are
        # missing from this file. Recover this method from VCS history.
        print("\n----------------------------------------------------------------------")
        dhcp_df = NFStreamer(source=os.path.join("tests", "pcaps", "dhcp.pcap"), n_dissections=0, udps=DHCP()) \
            .to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
        print(
            "\n----------------------------------------------------------------------"
        )
        dhcp_df = (
            NFStreamer(
                source=os.path.join("tests", "pcaps", "dhcp.pcap"),
                n_dissections=0,
                udps=DHCP(),
            )
            .to_pandas()
            .sort_values(by=["src_ip"])
            .reset_index(drop=True)
        )
        # Rows are sorted by src_ip, so indices 0/1 are deterministic.
        assert dhcp_df["udps.dhcp_msg_type"][0] == "MsgType.DISCOVER"
        assert dhcp_df["udps.dhcp_50"][1] == "192.168.0.10"
        assert dhcp_df["udps.dhcp_55"][1] == "1,3,6,42"
# NOTE(review): unapplied diff hunk header — original lines are missing here.
@ -523,27 +779,43 @@ class NFStreamTest(object):
        assert dhcp_df["udps.dhcp_msg_type"][1] == "MsgType.REQUEST"
        assert dhcp_df["udps.dhcp_oui"][1] == "00:0b:82"
        assert dhcp_df.shape[0] == 3
        print("{}\t: {}".format(".test_dhcp".ljust(60, ' '), colored('OK', 'green')))
        print("{}\t: {}".format(".test_dhcp".ljust(60, " "), colored("OK", "green")))
@staticmethod
def test_mdns():
print("\n----------------------------------------------------------------------")
mdns_df = NFStreamer(source=os.path.join("tests", "pcaps", "mdns.pcap"), n_dissections=0, udps=MDNS()) \
.to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
assert mdns_df["udps.mdns_ptr"][0] == "['skynet.local', " \
"'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', " \
"'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', " \
"'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', " \
"'_workstation._tcp.local']"
print("{}\t: {}".format(".test_mdns".ljust(60, ' '), colored('OK', 'green')))
print(
"\n----------------------------------------------------------------------"
)
mdns_df = (
NFStreamer(
source=os.path.join("tests", "pcaps", "mdns.pcap"),
n_dissections=0,
udps=MDNS(),
)
.to_pandas()
.sort_values(by=["src_ip"])
.reset_index(drop=True)
)
assert (
mdns_df["udps.mdns_ptr"][0] == "['skynet.local', "
"'skynet [00:1a:ef:17:c3:05]._workstation._tcp.local', "
"'recombinator_mpd._mpd._tcp.local', '_mpd._tcp.local', "
"'skynet._udisks-ssh._tcp.local', '_udisks-ssh._tcp.local', "
"'_workstation._tcp.local']"
)
print("{}\t: {}".format(".test_mdns".ljust(60, " "), colored("OK", "green")))
    @staticmethod
    def test_multi_files():
        # NOTE(review): corrupted by a botched merge of the black formatting commit:
        # duplicated pre/post-black lines plus a raw diff hunk header below — most
        # of the per-flow assertions are missing from this file. Recover this
        # method from VCS history.
        print("\n----------------------------------------------------------------------")
        multi_files = [os.path.join("tests", "pcaps", "one_flow_1_5.pcap"),
                       os.path.join("tests", "pcaps", "one_flow_6_10.pcap"),
                       os.path.join("tests", "pcaps", "one_flow_11_15.pcap"),
                       os.path.join("tests", "pcaps", "one_flow_16_19.pcap")]
        print(
            "\n----------------------------------------------------------------------"
        )
        multi_files = [
            os.path.join("tests", "pcaps", "one_flow_1_5.pcap"),
            os.path.join("tests", "pcaps", "one_flow_6_10.pcap"),
            os.path.join("tests", "pcaps", "one_flow_11_15.pcap"),
            os.path.join("tests", "pcaps", "one_flow_16_19.pcap"),
        ]
        # The four captures are slices of one flow: merged, they must yield flow id 0.
        for flow in NFStreamer(source=multi_files):
            assert flow.id == 0
            assert flow.expiration_id == 0
# NOTE(review): unapplied diff hunk header — original assertion lines are missing here.
@ -583,22 +855,33 @@ class NFStreamTest(object):
            assert flow.server_fingerprint == "2d1eb5817ece335c24904f516ad5da12"
            assert flow.user_agent == ""
            assert flow.content_type == ""
        print("{}\t: {}".format(".test_multi_files".ljust(60, ' '), colored('OK', 'green')))
        print(
            "{}\t: {}".format(
                ".test_multi_files".ljust(60, " "), colored("OK", "green")
            )
        )
@staticmethod
def test_max_nflows():
print("\n----------------------------------------------------------------------")
print(
"\n----------------------------------------------------------------------"
)
df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap")).to_pandas()
assert df.shape[0] == 294
df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=100).to_pandas()
df = NFStreamer(
source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=100
).to_pandas()
assert df.shape[0] == 100
df = NFStreamer(source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=0).to_pandas()
df = NFStreamer(
source=os.path.join("tests", "pcaps", "skype.pcap"), max_nflows=0
).to_pandas()
assert df.shape[0] == 294
print("{}\t: {}".format(".test_max_nflows".ljust(60, ' '), colored('OK', 'green')))
print(
"{}\t: {}".format(".test_max_nflows".ljust(60, " "), colored("OK", "green"))
)
if __name__ == '__main__':
if __name__ == "__main__":
# IMPORTANT: As NFStream input is network bytes, we rely on fuzzing techniques to ensure robustness.
# Fuzzing testing is part of Google OSS-Fuzz project.
# Github: https://github.com/google/oss-fuzz/tree/master/projects/nfstream