From aed3c8eb273f35425baf0f5fc693fe89c3614bf7 Mon Sep 17 00:00:00 2001 From: aouinizied Date: Thu, 7 May 2020 01:55:10 +0200 Subject: [PATCH] Rework to_pandas export. --- examples/csv_generator.py | 5 ++--- nfstream/entry.py | 6 ++++++ nfstream/streamer.py | 38 ++++++++++++++++++++++++++++++++------ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/examples/csv_generator.py b/examples/csv_generator.py index d9082d1..5fe3515 100644 --- a/examples/csv_generator.py +++ b/examples/csv_generator.py @@ -26,8 +26,7 @@ path = sys.argv[1] output_file_name = path + ".csv" print("nfstream processing started. Use Ctrl+C to interrupt and save.") start = datetime.datetime.now() -df = NFStreamer(source=path, statistics=True, idle_timeout=1).to_pandas() +total_flows = NFStreamer(source=path, statistics=True).to_csv(path=output_file_name, sep=";") end = datetime.datetime.now() -df.to_csv(output_file_name) -print("nfstream processed {} flows and saved them in file: {}".format(df.shape[0], output_file_name)) +print("\nnfstream processed {} flows and saved them in file: {}".format(total_flows, output_file_name)) print("Processing time: {}".format(end - start)) diff --git a/nfstream/entry.py b/nfstream/entry.py index dc19262..7701310 100644 --- a/nfstream/entry.py +++ b/nfstream/entry.py @@ -73,4 +73,10 @@ class NFEntry(object): """ Convert NFEntry to json """ return json.dumps(self.__dict__) + def values(self): + return list(self.__dict__.values()) + + def keys(self): + return list(self.__dict__) + diff --git a/nfstream/streamer.py b/nfstream/streamer.py index f836b0c..625f856 100644 --- a/nfstream/streamer.py +++ b/nfstream/streamer.py @@ -34,6 +34,7 @@ class NFStreamer(object): account_ip_padding_size=False, enable_guess=True, decode_tunnels=True, bpf_filter=None, promisc=True ): NFStreamer.streamer_id += 1 + self._source = source now = str(tm.time()) self._nroots = 100 self.sock_name = "ipc:///tmp/nfstream-{pid}-{streamerid}-{ts}".format(pid=os.getpid(), @@ -84,12 
+85,47 @@ class NFStreamer(object):
         except RuntimeError:
             return None
 
+    def to_csv(self, sep=";", path=None):
+        """ Write every flow to a CSV file and return the number of flows written.
+
+        sep: string placed between columns (header row and value rows).
+        path: output file path; when None, "<source>.csv" is used.
+        Calls sys.exit if the output file already exists.
+        """
+        output_path = str(self._source) + '.csv' if path is None else path
+        if os.path.exists(output_path):
+            sys.exit("Output file exists: {}. Please specify a valid file path.".format(output_path))
+        total_flows = 0
+        with open(output_path, 'ab') as f:
+            for flow in self:
+                try:
+                    if total_flows == 0:  # header creation
+                        header = sep.join([str(i) for i in flow.keys()]) + "\n"
+                        f.write(header.encode('utf-8'))
+                    values = sep.join([str(i) for i in flow.values()]) + "\n"
+                    f.write(values.encode('utf-8'))
+                    total_flows += 1
+                except KeyboardInterrupt:
+                    # Ctrl+C: flag the streamer and its cache as stopped instead of aborting the export.
+                    if not self._stopped:
+                        self._stopped = True
+                        self.cache.stopped = True
+        return total_flows
+
     def to_pandas(self):
         """ streamer to pandas function """
-        data = []
-        for flow in self:
-            data.append(flow.to_namedtuple())
-        df = pd.DataFrame(data=data)
-        del data
-        return df
+        temp_file_path = self.sock_name.replace("ipc:///tmp/", "") + ".csv"
+        total_flows = self.to_csv(sep=";", path=temp_file_path)
+        try:
+            if total_flows == 0:
+                # Nothing was written and read_csv cannot parse an empty file.
+                return pd.DataFrame()
+            df = pd.read_csv(temp_file_path, sep=";")
+        finally:
+            # Always drop the temporary file, even when parsing fails.
+            if os.path.exists(temp_file_path):
+                os.remove(temp_file_path)
+        if total_flows == df.shape[0]:
+            return df
+        return None  # row count read back differs from the number of flows written