diff --git a/README.rst b/README.rst index 55c2645..a5ca182 100644 --- a/README.rst +++ b/README.rst @@ -54,7 +54,7 @@ How to use it? .. code-block:: python - NFFlow( + NFEntry( flow_id=0, first_seen=1472393122365, last_seen=1472393123665, diff --git a/docs/source/get_started.rst b/docs/source/get_started.rst index 2be5c6c..b06cb08 100644 --- a/docs/source/get_started.rst +++ b/docs/source/get_started.rst @@ -58,13 +58,13 @@ NFStreamer object - Maximum per flow UDP packets to dissect (ignored when dissect=False). -NFStreamer returns an iterator of **NFFlow** object. +NFStreamer returns an iterator of **NFEntry** object. -************* -NFFlow object -************* +************** +NFEntry object +************** -.. list-table:: NFFlow object +.. list-table:: NFEntry object :widths: 25 25 50 :header-rows: 1 @@ -159,7 +159,7 @@ NFFlow object - str - J3A_ server fingerprint. -**NFFlow** is an aggregation of **NFPacket** objects. +**NFEntry** is an aggregation of **NFPacket** objects. *************** NFPacket object diff --git a/docs/source/plugins.rst b/docs/source/plugins.rst index be12b38..7ab603f 100644 --- a/docs/source/plugins.rst +++ b/docs/source/plugins.rst @@ -39,15 +39,15 @@ NFPlugin methods **************** * ``on_init(self, obs)`` [default= ``return 0`` ] - - Method called at entry creation). When aggregating packets into flows, this method is called on ``NFFlow`` object creation based on first ``NFPacket`` object belonging to it. + - Method called at entry creation). When aggregating packets into flows, this method is called on ``NFEntry`` object creation based on first ``NFPacket`` object belonging to it. * ``on_update(self, obs, entry)`` [default= ``pass`` ] - - Method called to update each entry with its belonging obs. When aggregating packets into flows, the entry is an ``NFFlow`` object and the obs is an ``NFPacket`` object. + - Method called to update each entry with its belonging obs. When aggregating packets into flows, the entry is an ``NFEntry`` object and the obs is an ``NFPacket`` object. * ``on_expire(self, entry)`` [default= ``pass`` ] - - Method called at entry expiration. When aggregating packets into flows, the entry is an ``NFFlow`` + - Method called at entry expiration. When aggregating packets into flows, the entry is an ``NFEntry`` * ``cleanup(self)`` [default= ``pass`` ] diff --git a/nfstream/cache.py b/nfstream/cache.py index bda0c47..69432c7 100644 --- a/nfstream/cache.py +++ b/nfstream/cache.py @@ -19,7 +19,7 @@ If not, see . from .plugin import nfstream_core_plugins, nfplugins_validator, ndpi_infos_plugins, nDPI from collections import OrderedDict -from .flow import NFFlow +from .entry import NFEntry from .ndpi import NDPI import time as tm import zmq @@ -46,7 +46,7 @@ class LRU(OrderedDict): class NFCache(object): - """ NFCache for flows management """ + """ NFCache for entries management """ def __init__(self, observer=None, idle_timeout=30, active_timeout=300, nroots=512, core_plugins=nfstream_core_plugins, user_plugins=(), dissect=True, max_tcp_dissections=10, max_udp_dissections=16, sock_name=None): @@ -57,7 +57,7 @@ class NFCache(object): self.producer.bind(sock_name) except zmq.error.ZMQError: raise OSError("NFStreamer failed to bind socket (producer).") - self._roots = [] # root structure for flow caching: dict of LRUs + self._roots = [] # root structure for entries caching: dict of LRUs self.nroots = nroots self.idle_timeout = idle_timeout * 1000 self.active_timeout = active_timeout * 1000 @@ -69,7 +69,7 @@ class NFCache(object): self._roots.append(LRU(idle_timeout=self.idle_timeout)) self.current_tick = 0 self.last_visited_root_idx = 0 - self.active_flows = 0 + self.active_entries = 0 self.idx_generator = 0 self.processed_pkts = 0 self.performances = [0, 0] @@ -103,10 +103,10 @@ class NFCache(object): if idle_item is not None: # idle self.producer.send_pyobj(idle_item) del self._roots[self.last_visited_root_idx][idle_item.nfhash] - self.active_flows -= 1 # remove it + self.active_entries -= 1 # remove it scanned += 1 else: - remaining = False # no idle flows to poll + remaining = False # no idle entries to poll except StopIteration: remaining = False # root is empty self.last_visited_root_idx += 1 # we move to next root @@ -119,7 +119,7 @@ class NFCache(object): for h in list(self._roots[root_idx].keys()): f = self._roots[root_idx][h].clean(self.core_plugins, self.user_plugins) self.producer.send_pyobj(f) - self.active_flows -= 1 + self.active_entries -= 1 del self._roots[root_idx][h] for plugin in self.core_plugins: plugin.cleanup() @@ -128,47 +128,47 @@ class NFCache(object): self.observer.packet_generator.close() # close generator self.producer.send_pyobj(None) - def consume(self, ppkt): - """ consume a parsed packet and produce flow """ + def consume(self, obs): + """ consume an observable and produce entry """ # classical create/update - try: # update flow - flow = self._roots[ppkt.root_idx][ppkt.nfhash].update(ppkt, - self.core_plugins, - self.user_plugins, - self.active_timeout) - if flow is not None: - if flow.expiration_id < 0: # custom expiration - self.producer.send_pyobj(flow) - del self._roots[ppkt.root_idx][flow.nfhash] - self.active_flows -= 1 + try: # update entry + entry = self._roots[obs.root_idx][obs.nfhash].update(obs, + self.core_plugins, + self.user_plugins, + self.active_timeout) + if entry is not None: + if entry.expiration_id < 0: # custom expiration + self.producer.send_pyobj(entry) + del self._roots[obs.root_idx][entry.nfhash] + self.active_entries -= 1 else: # active expiration - parent_id = flow.id - self.producer.send_pyobj(flow) - del self._roots[ppkt.root_idx][flow.nfhash] - self._roots[ppkt.root_idx][ppkt.nfhash] = NFFlow(ppkt, - self.core_plugins, - self.user_plugins, - parent_id) - except KeyError: # create flow - self._roots[ppkt.root_idx][ppkt.nfhash] = NFFlow(ppkt, - self.core_plugins, - self.user_plugins, - self.idx_generator) - self.active_flows += 1 + parent_id = entry.id + self.producer.send_pyobj(entry) + del self._roots[obs.root_idx][entry.nfhash] + self._roots[obs.root_idx][obs.nfhash] = NFEntry(obs, + self.core_plugins, + self.user_plugins, + parent_id) + except KeyError: # create entry + self._roots[obs.root_idx][obs.nfhash] = NFEntry(obs, + self.core_plugins, + self.user_plugins, + self.idx_generator) + self.active_entries += 1 self.idx_generator += 1 def run(self): """ run NFCache main processing loop """ - for parsed_packet in self.observer: + for observable in self.observer: if not self.stopped: - if parsed_packet is not None: + if observable is not None: go_scan = False - if parsed_packet.time - self.idle_scan_tick >= self.idle_scan_period: + if observable.time - self.idle_scan_tick >= self.idle_scan_period: go_scan = True - self.idle_scan_tick = parsed_packet.time - if parsed_packet.time >= self.current_tick: - self.current_tick = parsed_packet.time - self.consume(parsed_packet) + self.idle_scan_tick = observable.time + if observable.time >= self.current_tick: + self.current_tick = observable.time + self.consume(observable) if go_scan: self.idle_scan() # perform a micro scan else: diff --git a/nfstream/flow.py b/nfstream/entry.py similarity index 80% rename from nfstream/flow.py rename to nfstream/entry.py index e77af3c..3fc5205 100644 --- a/nfstream/flow.py +++ b/nfstream/entry.py @@ -19,14 +19,14 @@ If not, see . from collections import namedtuple -class NFFlow(object): - """ Flow entry class """ - def __init__(self, ppkt, core, user, idx): +class NFEntry(object): + """ NFEntry base class """ + def __init__(self, obs, core, user, idx): self.id = idx for plugin in core: # for each NFCache core plugin, we init and update - setattr(self, plugin.name, plugin.on_init(ppkt)) + setattr(self, plugin.name, plugin.on_init(obs)) for plugin in user: # for each NFCache core plugin, we init and update - setattr(self, plugin.name, plugin.on_init(ppkt)) + setattr(self, plugin.name, plugin.on_init(obs)) def clean(self, core, user): """ Volatile attributes cleaner """ @@ -40,16 +40,16 @@ class NFFlow(object): delattr(self, plugin.name) return self - def update(self, ppkt, core, user, to): + def update(self, obs, core, user, to): """ Update a flow from a packet """ - if ppkt.time - getattr(self, 'first_seen') >= to: + if obs.time - getattr(self, 'first_seen') >= to: setattr(self, 'expiration_id', 1) return self.clean(core, user) else: for plugin in core: # for each NFCache core plugin, we update - plugin.on_update(ppkt, self) + plugin.on_update(obs, self) for plugin in user: # for each NFCache core plugin, we update - plugin.on_update(ppkt, self) + plugin.on_update(obs, self) if getattr(self, 'expiration_id') < -1: # custom export return self.clean(core, user) @@ -62,4 +62,4 @@ class NFFlow(object): def __str__(self): """ String representation of flow """ - return str(namedtuple('NFFlow', self.__dict__.keys())(*self.__dict__.values())) \ No newline at end of file + return str(namedtuple(type(self).__name__, self.__dict__.keys())(*self.__dict__.values())) diff --git a/nfstream/observer.py b/nfstream/observer.py index 904ae8f..0784cb2 100644 --- a/nfstream/observer.py +++ b/nfstream/observer.py @@ -439,7 +439,6 @@ class _PcapFfi(object): proto = 0 time = 0 time = (header.tv_sec * TICK_RESOLUTION) + (header.tv_usec / (1000000 / TICK_RESOLUTION)) - datalink_type = self._libpcap.pcap_datalink(xdev) datalink_check = True while datalink_check: @@ -504,28 +503,34 @@ class _PcapFfi(object): else: return None - if type == 0x8100: - vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF - type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3] - ip_offset += 4 - if type == 0x8100: # Double tagging for 802.1Q + ether_type_check = True + while ether_type_check: + ether_type_check = False + if type == 0x8100: vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3] ip_offset += 4 - elif (type == 0x8847) or (type == 0x8848): - tmp_u32 = self._ffi.cast('struct pp_32 *', packet + ip_offset) - mpls.u32 = int(ntohl(tmp_u32.value)) - type = 0x0800 - ip_offset += 4 - while not mpls.mpls.s: - tmp_u32_loop = self._ffi.cast('struct pp_32 *', packet + ip_offset) - mpls.u32 = int(ntohl(tmp_u32_loop.value)) + while type == 0x8100 and ip_offset < header.caplen: # Double tagging for 802.1Q + vlan_id = ((packet[ip_offset] << 8) + packet[ip_offset + 1]) & 0xFFF + type = (packet[ip_offset + 2] << 8) + packet[ip_offset + 3] + ip_offset += 4 + ether_type_check = True + elif (type == 0x8847) or (type == 0x8848): + tmp_u32 = self._ffi.cast('struct pp_32 *', packet + ip_offset) + mpls.u32 = int(ntohl(tmp_u32.value)) + type = 0x0800 ip_offset += 4 - elif type == 0x8864: - type = 0x0800 - ip_offset += 8 - else: - pass + while not mpls.mpls.s: + tmp_u32_loop = self._ffi.cast('struct pp_32 *', packet + ip_offset) + mpls.u32 = int(ntohl(tmp_u32_loop.value)) + ip_offset += 4 + ether_type_check = True + elif type == 0x8864: + type = 0x0800 + ip_offset += 8 + ether_type_check = True + else: + pass ip_check = True while ip_check: diff --git a/nfstream/streamer.py b/nfstream/streamer.py index 3081b80..44507bb 100644 --- a/nfstream/streamer.py +++ b/nfstream/streamer.py @@ -27,7 +27,7 @@ import sys class NFStreamer(object): """ Network Flow Streamer """ - def __init__(self, source=None, bpf_filter=None, snaplen=65535, idle_timeout=30, active_timeout=300, + def __init__(self, source=None, snaplen=65535, idle_timeout=30, active_timeout=300, plugins=(), dissect=True, max_tcp_dissections=10, max_udp_dissections=16): self._consumer = zmq.Context().socket(zmq.PULL) self._nroots = 512 @@ -72,3 +72,5 @@ class NFStreamer(object): return None + +