mirror of
https://github.com/nfstream/nfstream.git
synced 2026-04-26 14:10:37 +00:00
Minor fixes.
This commit is contained in:
parent
1cf175d107
commit
38354b445c
8 changed files with 21 additions and 42 deletions
1
.github/workflows/build_and_publish.yml
vendored
1
.github/workflows/build_and_publish.yml
vendored
|
|
@ -11,7 +11,6 @@ on:
|
|||
schedule:
|
||||
# nightly build at 00:00.
|
||||
- cron: '0 10 * * *'
|
||||
|
||||
jobs:
|
||||
test:
|
||||
name: ${{ matrix.python-version }} on ${{ matrix.os }}
|
||||
|
|
|
|||
|
|
@ -1,9 +1,7 @@
|
|||
# Documentation: https://docs.codecov.io/docs/codecov-yaml
|
||||
|
||||
codecov:
|
||||
# Avoid "Missing base report" due to committing CHANGES.rst with "[CI skip]"
|
||||
# https://github.com/codecov/support/issues/363
|
||||
# https://docs.codecov.io/docs/comparing-commits
|
||||
allow_coverage_offsets: true
|
||||
|
||||
comment: off
|
||||
|
|
|
|||
|
|
@ -20,10 +20,8 @@ If not, see <http://www.gnu.org/licenses/>.
|
|||
from .streamer import NFStreamer
|
||||
from .plugin import NFPlugin
|
||||
|
||||
"""
|
||||
streamer module is the core module of nfstream package.
|
||||
"""
|
||||
|
||||
# streamer module is the core module of nfstream package.
|
||||
__author__ = """Zied Aouini"""
|
||||
__email__ = 'aouinizied@gmail.com'
|
||||
__version__ = '6.1.2'
|
||||
|
|
|
|||
|
|
@ -48,8 +48,6 @@ nf_packet = namedtuple('NFPacket', ['time',
|
|||
|
||||
class UDPS(object):
|
||||
""" dummy class that add udps slot the flexibility required for extensions """
|
||||
pass
|
||||
|
||||
|
||||
def pythonize_packet(packet, ffi):
|
||||
""" convert a cdata packet to a namedtuple """
|
||||
|
|
@ -284,14 +282,13 @@ class NFlow(object):
|
|||
if ret > 0: # If update done it will be zero, idle and active are matched to 1 and 2.
|
||||
self.expiration_id = ret - 1
|
||||
return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # expire it.
|
||||
else:
|
||||
if sync: # If running with Plugins
|
||||
self.sync(n_dissections, statistics, splt, ffi, lib, sync)
|
||||
# We need to copy computed values on C struct.
|
||||
for udp in udps: # Then call each plugin on_update entrypoint.
|
||||
udp.on_update(pythonize_packet(packet, ffi), self)
|
||||
if self.expiration_id == -1: # One of the plugins set expiration to custom value (-1)
|
||||
return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # Expire it.
|
||||
if sync: # If running with Plugins
|
||||
self.sync(n_dissections, statistics, splt, ffi, lib, sync)
|
||||
# We need to copy computed values on C struct.
|
||||
for udp in udps: # Then call each plugin on_update entrypoint.
|
||||
udp.on_update(pythonize_packet(packet, ffi), self)
|
||||
if self.expiration_id == -1: # One of the plugins set expiration to custom value (-1)
|
||||
return self.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector) # Expire it.
|
||||
|
||||
def expire(self, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector):
|
||||
""" NFlow expiration method """
|
||||
|
|
@ -412,10 +409,7 @@ class NFlow(object):
|
|||
|
||||
def is_idle(self, tick, idle_timeout):
|
||||
""" is_idle method to check if NFlow is idle accoring to configured timeout """
|
||||
if (tick - idle_timeout) >= self._C.bidirectional_last_seen_ms:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
return (tick - idle_timeout) >= self._C.bidirectional_last_seen_ms
|
||||
|
||||
def __str__(self):
|
||||
""" String representation of NFlow """
|
||||
|
|
|
|||
|
|
@ -139,9 +139,8 @@ def setup_dissector(ffi, lib, n_dissections):
|
|||
dissector = lib.dissector_init(checker)
|
||||
if dissector == ffi.NULL:
|
||||
raise ValueError("Error while initializing dissector.")
|
||||
else:
|
||||
# Configure it (activate bitmask to all protocols)
|
||||
lib.dissector_configure(dissector)
|
||||
# Configure it (activate bitmask to all protocols)
|
||||
lib.dissector_configure(dissector)
|
||||
else: # No dissection configured
|
||||
dissector = ffi.NULL
|
||||
return dissector
|
||||
|
|
|
|||
|
|
@ -39,7 +39,6 @@ class NFPlugin(object):
|
|||
flow.udps.packet_40_count = 0
|
||||
----------------------------------------------------------------
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_update(self, packet, flow):
|
||||
"""
|
||||
|
|
@ -49,7 +48,6 @@ class NFPlugin(object):
|
|||
flow.udps.packet_40_count += 1
|
||||
----------------------------------------------------------------
|
||||
"""
|
||||
pass
|
||||
|
||||
def on_expire(self, flow):
|
||||
"""
|
||||
|
|
@ -59,7 +57,6 @@ class NFPlugin(object):
|
|||
flow.udps.magic_message = "YES"
|
||||
----------------------------------------------------------------
|
||||
"""
|
||||
pass
|
||||
|
||||
def cleanup(self):
|
||||
"""
|
||||
|
|
@ -68,8 +65,6 @@ class NFPlugin(object):
|
|||
del self.large_dict_passed_as_plugin_attribute
|
||||
----------------------------------------------------------------
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
# A working example.
|
||||
class SPLT(NFPlugin):
|
||||
|
|
@ -92,12 +87,11 @@ class SPLT(NFPlugin):
|
|||
def _get_packet_size(packet, accounting_mode):
|
||||
if accounting_mode == 0:
|
||||
return packet.raw_size
|
||||
elif accounting_mode == 1:
|
||||
if accounting_mode == 1:
|
||||
return packet.ip_size
|
||||
elif accounting_mode == 2:
|
||||
if accounting_mode == 2:
|
||||
return packet.transport_size
|
||||
else:
|
||||
return packet.payload_size
|
||||
return packet.payload_size
|
||||
|
||||
def on_init(self, packet, flow):
|
||||
flow.udps.splt_direction = [-1] * self.sequence_length
|
||||
|
|
@ -113,6 +107,3 @@ class SPLT(NFPlugin):
|
|||
flow.udps.splt_direction[packet_index] = packet.direction
|
||||
flow.udps.splt_ps[packet_index] = self._get_packet_size(packet, self.accounting_mode)
|
||||
flow.udps.splt_piat_ms[packet_index] = packet.delta_time
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -357,8 +357,9 @@ class NFStreamer(object):
|
|||
chunk_flows += 1
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
if not f.closed:
|
||||
f.close()
|
||||
if f is not None:
|
||||
if not f.closed:
|
||||
f.close()
|
||||
return total_flows
|
||||
|
||||
def to_pandas(self, ip_anonymization=False):
|
||||
|
|
|
|||
|
|
@ -34,8 +34,7 @@ def csv_converter(values):
|
|||
def open_file(path, chunked, chunk_idx):
|
||||
if not chunked:
|
||||
return open(path, 'wb')
|
||||
else:
|
||||
return open(path.replace("csv", "{}.csv".format(chunk_idx)), 'wb')
|
||||
return open(path.replace("csv", "{}.csv".format(chunk_idx)), 'wb')
|
||||
|
||||
|
||||
def update_performances(performances, is_linux, flows_count):
|
||||
|
|
@ -92,13 +91,13 @@ def chunks(l, n):
|
|||
return (l[i:i+n] for i in range(0, len(l), n))
|
||||
|
||||
|
||||
def set_affinity(id):
|
||||
def set_affinity(mask):
|
||||
if platform.system() == "Linux":
|
||||
c_cores = psutil.cpu_count(logical=False)
|
||||
c_cpus = psutil.cpu_count(logical=True)
|
||||
if c_cpus == c_cores: # single threaded.
|
||||
psutil.Process().cpu_affinity([id,])
|
||||
psutil.Process().cpu_affinity([mask,])
|
||||
else:
|
||||
if c_cpus == (2*c_cores): # multi threaded
|
||||
temp = list(chunks(range(c_cpus), 2))
|
||||
psutil.Process().cpu_affinity(list(temp[id]))
|
||||
psutil.Process().cpu_affinity(list(temp[mask]))
|
||||
Loading…
Add table
Add a link
Reference in a new issue