Rework to_pandas export.

This commit is contained in:
aouinizied 2020-05-07 01:55:10 +02:00
parent fe55d8724f
commit aed3c8eb27
3 changed files with 40 additions and 9 deletions

View file

@ -26,8 +26,7 @@ path = sys.argv[1]
output_file_name = path + ".csv"
print("nfstream processing started. Use Ctrl+C to interrupt and save.")
start = datetime.datetime.now()
df = NFStreamer(source=path, statistics=True, idle_timeout=1).to_pandas()
total_flows = NFStreamer(source=path, statistics=True).to_csv(path=output_file_name, sep=";")
end = datetime.datetime.now()
df.to_csv(output_file_name)
print("nfstream processed {} flows and saved them in file: {}".format(df.shape[0], output_file_name))
print("\nnfstream processed {} flows and saved them in file: {}".format(total_flows, output_file_name))
print("Processing time: {}".format(end - start))

View file

@ -73,4 +73,10 @@ class NFEntry(object):
""" Convert NFEntry to json """
return json.dumps(self.__dict__)
def values(self):
return list(self.__dict__.values())
def keys(self):
return list(self.__dict__)

View file

@ -34,6 +34,7 @@ class NFStreamer(object):
account_ip_padding_size=False, enable_guess=True, decode_tunnels=True, bpf_filter=None, promisc=True
):
NFStreamer.streamer_id += 1
self._source = source
now = str(tm.time())
self._nroots = 100
self.sock_name = "ipc:///tmp/nfstream-{pid}-{streamerid}-{ts}".format(pid=os.getpid(),
@ -84,12 +85,37 @@ class NFStreamer(object):
except RuntimeError:
return None
def to_csv(self, sep=";", path=None):
if path is None:
output_path = str(self._source) + '.csv'
else:
output_path = path
if os.path.exists(output_path):
sys.exit("Output file exists: {}. Please specify a valid file path.".format(output_path))
else:
total_flows = 0
with open(output_path, 'ab') as f:
for flow in self:
try:
if total_flows == 0: # header creation
header = sep.join([str(i) for i in flow.keys()]) + "\n"
f.write(header.encode('utf-8'))
values = sep.join([str(i) for i in flow.values()]) + "\n"
f.write(values.encode('utf-8'))
total_flows = total_flows + 1
except KeyboardInterup:
if not self._stopped:
self._stopped = True
self.cache.stopped = True
return total_flows
def to_pandas(self):
""" streamer to pandas function """
data = []
for flow in self:
data.append(flow.to_namedtuple())
df = pd.DataFrame(data=data)
del data
return df
temp_file_path = self.sock_name.replace("ipc:///tmp/", "") + ".csv"
total_flows = self.to_csv(path=temp_file_path)
df = pd.read_csv(temp_file_path, sep=";")
if os.path.exists(temp_file_path):
os.remove(temp_file_path)
if total_flows == df.shape[0]:
return df