Disable temporary coverage on macos.

This commit is contained in:
Zied Aouini 2021-04-14 15:11:52 +02:00
parent b29a08afbf
commit c2eea5802f
3 changed files with 13 additions and 31 deletions

View file

@ -98,10 +98,10 @@ jobs:
run: |
ls dist/
- name: Testing and coverage report generation
# Disable temporary coverage on MACOS
if: startsWith(matrix.os, 'ubuntu')
run: |
echo "Testing ..."
python -m coverage run tests.py
echo "Combining ..."
python -m coverage combine
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1

View file

@ -267,7 +267,6 @@ class NFStreamer(object):
channel = mp.Queue(maxsize=32767) # Backpressure strategy.
# We set it to (2^15-1) to cope with OSX maximum semaphore value.
n_meters = self.n_meters
print("DEBUG: {}".format(n_meters))
try:
for i in range(n_meters):
performances.append([mp.Value('I', 0), mp.Value('I', 0), mp.Value('I', 0)])

View file

@ -240,30 +240,26 @@ class TestMethods(unittest.TestCase):
def test_expiration_management(self):
print("\n----------------------------------------------------------------------")
# Idle expiration
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', idle_timeout=0,
n_meters=int(os.getenv('MAX_NFMETERS', 0)))
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', idle_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
# Active expiration
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', active_timeout=0,
n_meters=int(os.getenv('MAX_NFMETERS', 0)))
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', active_timeout=0)
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
# Custom expiration
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', udps=FlowSlicer(limit=1),
n_meters=int(os.getenv('MAX_NFMETERS', 0)))
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', udps=FlowSlicer(limit=1))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
self.assertEqual(last_id, 27)
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', udps=FlowSlicer(limit=4),
n_meters=int(os.getenv('MAX_NFMETERS', 0)))
streamer_expiration = NFStreamer(source='tests/google_ssl.pcap', udps=FlowSlicer(limit=4))
last_id = 0
for flow in streamer_expiration:
last_id = flow.id
@ -273,8 +269,7 @@ class TestMethods(unittest.TestCase):
def test_statistical(self):
print("\n----------------------------------------------------------------------")
statistical_streamer = NFStreamer(source='tests/google_ssl.pcap', statistical_analysis=True,
accounting_mode=1, n_meters=int(os.getenv('MAX_NFMETERS', 0)))
statistical_streamer = NFStreamer(source='tests/google_ssl.pcap', statistical_analysis=True, accounting_mode=1)
for flow in statistical_streamer:
self.assertEqual(flow.id, 0)
self.assertEqual(flow.expiration_id, 0)
@ -357,8 +352,7 @@ class TestMethods(unittest.TestCase):
def test_fingerprint_extraction(self):
print("\n----------------------------------------------------------------------")
fingerprint_streamer = NFStreamer(source='tests/facebook.pcap', statistical_analysis=True,
accounting_mode=1, n_meters=int(os.getenv('MAX_NFMETERS', 0)))
fingerprint_streamer = NFStreamer(source='tests/facebook.pcap', statistical_analysis=True, accounting_mode=1)
for flow in fingerprint_streamer:
self.assertEqual(flow.application_name, 'TLS.Facebook')
self.assertEqual(flow.application_category_name, 'SocialNetwork')
@ -373,24 +367,18 @@ class TestMethods(unittest.TestCase):
def test_export(self):
print("\n----------------------------------------------------------------------")
df = NFStreamer(source='tests/steam.pcap',
statistical_analysis=True, n_meters=int(os.getenv('MAX_NFMETERS', 0)),
n_dissections=20).to_pandas()
df = NFStreamer(source='tests/steam.pcap', statistical_analysis=True, n_dissections=20).to_pandas()
df_anon = NFStreamer(source='tests/steam.pcap',
statistical_analysis=True, n_meters=int(os.getenv('MAX_NFMETERS', 0)),
statistical_analysis=True,
n_dissections=20).to_pandas(columns_to_anonymize=["src_ip", "dst_ip"])
self.assertEqual(df_anon.shape[0], df.shape[0])
self.assertEqual(df_anon.shape[1], df.shape[1])
self.assertEqual(df_anon['src_ip'].nunique(), df['src_ip'].nunique())
self.assertEqual(df_anon['dst_ip'].nunique(), df['dst_ip'].nunique())
total_flows = NFStreamer(source='tests/steam.pcap',
statistical_analysis=True, n_meters=int(os.getenv('MAX_NFMETERS', 0)),
n_dissections=20).to_csv()
total_flows = NFStreamer(source='tests/steam.pcap', statistical_analysis=True, n_dissections=20).to_csv()
df_from_csv = pd.read_csv('tests/steam.pcap.csv')
total_flows_anon = NFStreamer(source='tests/steam.pcap',
statistical_analysis=True, n_meters=int(os.getenv('MAX_NFMETERS', 0)),
n_dissections=20).to_csv()
total_flows_anon = NFStreamer(source='tests/steam.pcap', statistical_analysis=True, n_dissections=20).to_csv()
df_anon_from_csv = pd.read_csv('tests/steam.pcap.csv')
os.remove('tests/steam.pcap.csv')
self.assertEqual(total_flows, total_flows_anon)
@ -402,9 +390,7 @@ class TestMethods(unittest.TestCase):
def test_bpf(self):
print("\n----------------------------------------------------------------------")
streamer_test = NFStreamer(source='tests/facebook.pcap',
bpf_filter="src port 52066 or dst port 52066",
n_meters=int(os.getenv('MAX_NFMETERS', 0)))
streamer_test = NFStreamer(source='tests/facebook.pcap', bpf_filter="src port 52066 or dst port 52066")
last_id = 0
for flow in streamer_test:
last_id = flow.id
@ -442,7 +428,6 @@ class TestMethods(unittest.TestCase):
def test_splt(self):
print("\n----------------------------------------------------------------------")
splt_df = NFStreamer(source='tests/google_ssl.pcap', splt_analysis=5,
n_meters=int(os.getenv('MAX_NFMETERS', 0)),
udps=SPLT(sequence_length=5, accounting_mode=0)).to_pandas()
direction = json.loads(splt_df["udps.splt_direction"][0])
ps = json.loads(splt_df["udps.splt_ps"][0])
@ -462,7 +447,6 @@ class TestMethods(unittest.TestCase):
print("\n----------------------------------------------------------------------")
dhcp_df = NFStreamer(source='tests/dhcp.pcap',
n_dissections=0,
n_meters=int(os.getenv('MAX_NFMETERS', 0)),
udps=DHCP()
).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
self.assertEqual(dhcp_df["udps.dhcp_msg_type"][0], "MsgType.DISCOVER")
@ -478,7 +462,6 @@ class TestMethods(unittest.TestCase):
print("\n----------------------------------------------------------------------")
mdns_df = NFStreamer(source='tests/mdns.pcap',
n_dissections=0,
n_meters=int(os.getenv('MAX_NFMETERS', 0)),
udps=MDNS()
).to_pandas().sort_values(by=['src_ip']).reset_index(drop=True)
self.assertEqual(mdns_df["udps.mdns_ptr"][0], "['skynet.local', "