Major cleanup.

This commit is contained in:
aouinizied 2019-12-06 19:03:51 +01:00
parent 489db8e175
commit 26b484d1e7
7 changed files with 86 additions and 79 deletions

View file

@ -54,7 +54,7 @@ How to use it?
.. code-block:: python
NFFlow(
NFEntry(
flow_id=0,
first_seen=1472393122365,
last_seen=1472393123665,

View file

@ -58,13 +58,13 @@ NFStreamer object
- Maximum per flow UDP packets to dissect (ignored when dissect=False).
NFStreamer returns an iterator of **NFFlow** object.
NFStreamer returns an iterator of **NFEntry** object.
*************
NFFlow object
*************
**************
NFEntry object
**************
.. list-table:: NFFlow object
.. list-table:: NFEntry object
:widths: 25 25 50
:header-rows: 1
@ -159,7 +159,7 @@ NFFlow object
- str
- J3A_ server fingerprint.
**NFFlow** is an aggregation of **NFPacket** objects.
**NFEntry** is an aggregation of **NFPacket** objects.
***************
NFPacket object

View file

@ -39,15 +39,15 @@ NFPlugin methods
****************
* ``on_init(self, obs)`` [default= ``return 0`` ]
- Method called at entry creation). When aggregating packets into flows, this method is called on ``NFFlow`` object creation based on first ``NFPacket`` object belonging to it.
- Method called at entry creation). When aggregating packets into flows, this method is called on ``NFEntry`` object creation based on first ``NFPacket`` object belonging to it.
* ``on_update(self, obs, entry)`` [default= ``pass`` ]
- Method called to update each entry with its belonging obs. When aggregating packets into flows, the entry is an ``NFFlow`` object and the obs is an ``NFPacket`` object.
- Method called to update each entry with its belonging obs. When aggregating packets into flows, the entry is an ``NFEntry`` object and the obs is an ``NFPacket`` object.
* ``on_expire(self, entry)`` [default= ``pass`` ]
- Method called at entry expiration. When aggregating packets into flows, the entry is an ``NFFlow``
- Method called at entry expiration. When aggregating packets into flows, the entry is an ``NFEntry``
* ``cleanup(self)`` [default= ``pass`` ]

View file

@ -19,7 +19,7 @@ If not, see <http://www.gnu.org/licenses/>.
from .plugin import nfstream_core_plugins, nfplugins_validator, ndpi_infos_plugins, nDPI
from collections import OrderedDict
from .flow import NFFlow
from .entry import NFEntry
from .ndpi import NDPI
import time as tm
import zmq
@ -46,7 +46,7 @@ class LRU(OrderedDict):
class NFCache(object):
""" NFCache for flows management """
""" NFCache for entries management """
def __init__(self, observer=None, idle_timeout=30, active_timeout=300, nroots=512,
core_plugins=nfstream_core_plugins, user_plugins=(),
dissect=True, max_tcp_dissections=10, max_udp_dissections=16, sock_name=None):
@ -57,7 +57,7 @@ class NFCache(object):
self.producer.bind(sock_name)
except zmq.error.ZMQError:
raise OSError("NFStreamer failed to bind socket (producer).")
self._roots = [] # root structure for flow caching: dict of LRUs
self._roots = [] # root structure for entries caching: dict of LRUs
self.nroots = nroots
self.idle_timeout = idle_timeout * 1000
self.active_timeout = active_timeout * 1000
@ -69,7 +69,7 @@ class NFCache(object):
self._roots.append(LRU(idle_timeout=self.idle_timeout))
self.current_tick = 0
self.last_visited_root_idx = 0
self.active_flows = 0
self.active_entries = 0
self.idx_generator = 0
self.processed_pkts = 0
self.performances = [0, 0]
@ -103,10 +103,10 @@ class NFCache(object):
if idle_item is not None: # idle
self.producer.send_pyobj(idle_item)
del self._roots[self.last_visited_root_idx][idle_item.nfhash]
self.active_flows -= 1 # remove it
self.active_entries -= 1 # remove it
scanned += 1
else:
remaining = False # no idle flows to poll
remaining = False # no idle entries to poll
except StopIteration:
remaining = False # root is empty
self.last_visited_root_idx += 1 # we move to next root
@ -119,7 +119,7 @@ class NFCache(object):
for h in list(self._roots[root_idx].keys()):
f = self._roots[root_idx][h].clean(self.core_plugins, self.user_plugins)
self.producer.send_pyobj(f)
self.active_flows -= 1
self.active_entries -= 1
del self._roots[root_idx][h]
for plugin in self.core_plugins:
plugin.cleanup()
@ -128,47 +128,47 @@ class NFCache(object):
self.observer.packet_generator.close() # close generator
self.producer.send_pyobj(None)
def consume(self, ppkt):
""" consume a parsed packet and produce flow """
def consume(self, obs):
""" consume an observable and produce entry """
# classical create/update
try: # update flow
flow = self._roots[ppkt.root_idx][ppkt.nfhash].update(ppkt,
self.core_plugins,
self.user_plugins,
self.active_timeout)
if flow is not None:
if flow.expiration_id < 0: # custom expiration
self.producer.send_pyobj(flow)
del self._roots[ppkt.root_idx][flow.nfhash]
self.active_flows -= 1
try: # update entry
entry = self._roots[obs.root_idx][obs.nfhash].update(obs,
self.core_plugins,
self.user_plugins,
self.active_timeout)
if entry is not None:
if entry.expiration_id < 0: # custom expiration
self.producer.send_pyobj(entry)
del self._roots[obs.root_idx][entry.nfhash]
self.active_entries -= 1
else: # active expiration
parent_id = flow.id
self.producer.send_pyobj(flow)
del self._roots[ppkt.root_idx][flow.nfhash]
self._roots[ppkt.root_idx][ppkt.nfhash] = NFFlow(ppkt,
self.core_plugins,
self.user_plugins,
parent_id)
except KeyError: # create flow
self._roots[ppkt.root_idx][ppkt.nfhash] = NFFlow(ppkt,
self.core_plugins,
self.user_plugins,
self.idx_generator)
self.active_flows += 1
parent_id = entry.id
self.producer.send_pyobj(entry)
del self._roots[obs.root_idx][entry.nfhash]
self._roots[obs.root_idx][obs.nfhash] = NFEntry(obs,
self.core_plugins,
self.user_plugins,
parent_id)
except KeyError: # create entry
self._roots[obs.root_idx][obs.nfhash] = NFEntry(obs,
self.core_plugins,
self.user_plugins,
self.idx_generator)
self.active_entries += 1
self.idx_generator += 1
def run(self):
""" run NFCache main processing loop """
for parsed_packet in self.observer:
for observable in self.observer:
if not self.stopped:
if parsed_packet is not None:
if observable is not None:
go_scan = False
if parsed_packet.time - self.idle_scan_tick >= self.idle_scan_period:
if observable.time - self.idle_scan_tick >= self.idle_scan_period:
go_scan = True
self.idle_scan_tick = parsed_packet.time
if parsed_packet.time >= self.current_tick:
self.current_tick = parsed_packet.time
self.consume(parsed_packet)
self.idle_scan_tick = observable.time
if observable.time >= self.current_tick:
self.current_tick = observable.time
self.consume(observable)
if go_scan:
self.idle_scan() # perform a micro scan
else:

View file

@ -19,14 +19,14 @@ If not, see <http://www.gnu.org/licenses/>.
from collections import namedtuple
class NFFlow(object):
""" Flow entry class """
def __init__(self, ppkt, core, user, idx):
class NFEntry(object):
""" NFEntry base class """
def __init__(self, obs, core, user, idx):
self.id = idx
for plugin in core: # for each NFCache core plugin, we init and update
setattr(self, plugin.name, plugin.on_init(ppkt))
setattr(self, plugin.name, plugin.on_init(obs))
for plugin in user: # for each NFCache core plugin, we init and update
setattr(self, plugin.name, plugin.on_init(ppkt))
setattr(self, plugin.name, plugin.on_init(obs))
def clean(self, core, user):
""" Volatile attributes cleaner """
@ -40,16 +40,16 @@ class NFFlow(object):
delattr(self, plugin.name)
return self
def update(self, ppkt, core, user, to):
def update(self, obs, core, user, to):
""" Update a flow from a packet """
if ppkt.time - getattr(self, 'first_seen') >= to:
if obs.time - getattr(self, 'first_seen') >= to:
setattr(self, 'expiration_id', 1)
return self.clean(core, user)
else:
for plugin in core: # for each NFCache core plugin, we update
plugin.on_update(ppkt, self)
plugin.on_update(obs, self)
for plugin in user: # for each NFCache core plugin, we update
plugin.on_update(ppkt, self)
plugin.on_update(obs, self)
if getattr(self, 'expiration_id') < -1: # custom export
return self.clean(core, user)
@ -62,4 +62,4 @@ class NFFlow(object):
def __str__(self):
""" String representation of flow """
return str(namedtuple('NFFlow', self.__dict__.keys())(*self.__dict__.values()))
return str(namedtuple(type(self).__name__, self.__dict__.keys())(*self.__dict__.values()))

View file

@ -439,7 +439,6 @@ class _PcapFfi(object):
proto = 0
time = 0
time = (header.tv_sec * TICK_RESOLUTION) + (header.tv_usec / (1000000 / TICK_RESOLUTION))
datalink_type = self._libpcap.pcap_datalink(xdev)
datalink_check = True
while datalink_check:
@ -504,28 +503,34 @@ class _PcapFfi(object):
else:
return None
if type == 0x8100:
vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF
type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3]
ip_offset += 4
if type == 0x8100: # Double tagging for 802.1Q
ether_type_check = True
while ether_type_check:
ether_type_check = False
if type == 0x8100:
vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF
type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3]
ip_offset += 4
elif (type == 0x8847) or (type == 0x8848):
tmp_u32 = self._ffi.cast('struct pp_32 *', packet + ip_offset)
mpls.u32 = int(ntohl(tmp_u32.value))
type = 0x0800
ip_offset += 4
while not mpls.mpls.s:
tmp_u32_loop = self._ffi.cast('struct pp_32 *', packet + ip_offset)
mpls.u32 = int(ntohl(tmp_u32_loop.value))
while type == 0x8100 and ip_offset < header.caplen: # Double tagging for 802.1Q
vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF
type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3]
ip_offset += 4
ether_type_check = True
elif (type == 0x8847) or (type == 0x8848):
tmp_u32 = self._ffi.cast('struct pp_32 *', packet + ip_offset)
mpls.u32 = int(ntohl(tmp_u32.value))
type = 0x0800
ip_offset += 4
elif type == 0x8864:
type = 0x0800
ip_offset += 8
else:
pass
while not mpls.mpls.s:
tmp_u32_loop = self._ffi.cast('struct pp_32 *', packet + ip_offset)
mpls.u32 = int(ntohl(tmp_u32_loop.value))
ip_offset += 4
ether_type_check = True
elif type == 0x8864:
type = 0x0800
ip_offset += 8
ether_type_check = True
else:
pass
ip_check = True
while ip_check:

View file

@ -27,7 +27,7 @@ import sys
class NFStreamer(object):
""" Network Flow Streamer """
def __init__(self, source=None, bpf_filter=None, snaplen=65535, idle_timeout=30, active_timeout=300,
def __init__(self, source=None, snaplen=65535, idle_timeout=30, active_timeout=300,
plugins=(), dissect=True, max_tcp_dissections=10, max_udp_dissections=16):
self._consumer = zmq.Context().socket(zmq.PULL)
self._nroots = 512
@ -72,3 +72,5 @@ class NFStreamer(object):
return None