diff --git a/README.rst b/README.rst index 09b691e..7ab115b 100644 --- a/README.rst +++ b/README.rst @@ -11,6 +11,7 @@ nfstream is a flexible and lightweight network data analysis library. * **Performance:** nfstream was designed to be fast, CPU savvy and small memory fingerprint. * **Layer-7 visibility:** nfstream dissection is based on nDPI_ (~300 applications including Tor, Messenger, WhatsApp, etc.). * **Flexibility:** add a flow metric in 2 lines of code using nfstream plugins method. +* **Machine Learning oriented:** add your trained model as an NFStream Classifier. **examples of use** @@ -22,10 +23,15 @@ nfstream is a flexible and lightweight network data analysis library. my_capture_streamer = Streamer(source="instagram.pcap", capacity=128000, active_timeout=120, - inactive_timeout=60) + inactive_timeout=60, + user_metrics=None, + user_classifiers=None, + enable_ndpi=True) + my_live_streamer = Streamer(source="eth1") # or capture from a network interface for flow in my_capture_streamer: # or for flow in my_live_streamer print(flow) # print, append to pandas Dataframe or whatever you want :)! + .. code-block:: json {"ip_src": "192.168.122.121", @@ -50,15 +56,19 @@ nfstream is a flexible and lightweight network data analysis library. from nfstream.streamer import Streamer def my_awesome_plugin(packet_information, flow): - if packet_information.size > 666: - flow.metrics['count_pkts_gt_666'] += 1 - return flow + old_value = flow.metrics['count_pkts_gt_666'] + if packet_information.size > 999: + old_value = flow.metrics['count_pkts_gt_666'] + new_value = old_value + 1 + return new_value + else: + return old_value streamer_awesome = Streamer(source='devil.pcap', user_metrics={'count_pkts_gt_666': my_awesome_plugin}) - for flow in streamer_awesome: + for export in streamer_awesome: # now you will see your created metric in generated flows - print(flow.metrics['count_pkts_gt_666']) + print(export.metrics['count_pkts_gt_666']) * More example and details are provided on the official Documentation_. diff --git a/docs/source/index.rst b/docs/source/index.rst index aec3f18..e3e01ef 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -16,6 +16,7 @@ nfstream documentation * **Performance:** nfstream was designed to be fast, CPU savvy and small memory fingerprint. * **Layer-7 visibility:** nfstream dissection is based on nDPI_ (~300 applications including Tor, Messenger, WhatsApp, etc.). * **Flexibility:** add a flow metric in 2 lines of code using nfstream plugins method. +* **Machine Learning oriented:** add your trained model as an NFStream Classifier. .. toctree:: diff --git a/docs/source/tutorials.rst b/docs/source/tutorials.rst index 1de49dc..2ab7045 100644 --- a/docs/source/tutorials.rst +++ b/docs/source/tutorials.rst @@ -14,7 +14,11 @@ nfstream make this path easier in few lines: my_capture_streamer = Streamer(source="instagram.pcap", capacity=128000, active_timeout=120, - inactive_timeout=60) + inactive_timeout=60, + user_metrics=None, + user_classifiers=None, + enable_ndpi=True) + my_live_streamer = Streamer(source="eth1") # or capture from a network interface for flow in my_capture_streamer: # or for flow in my_live_streamer print(flow) # print, append to pandas Dataframe or whatever you want :)! @@ -29,7 +33,12 @@ nfstream make this path easier in few lines: **inactive_timeout** flows that are inactive for more than this value in seconds will be exported. (Default: 120) - **user_metrics** dict with metric_name as key ans callback as value. (Default {}) + **user_metrics** dict with metric_name as key ans callback as value. (Default None) + + **user_classifiers** NFStream Classifier or list of NFStream Classifiers. (Default None) + + **enable_ndpi** enable nDPI classifier a Layer 7 visibility. (Default True) + This will print a json representation of nfstream flow object: @@ -78,16 +87,81 @@ Didn't find a specific flow feature? add it to Streamer as a plugin in few lines from nfstream.streamer import Streamer def my_awesome_plugin(packet_information, flow): - if packet_information.size > 666: - flow.metrics['count_pkts_gt_666'] += 1 - return flow + old_value = flow.metrics['count_pkts_gt_666'] + if packet_information.size > 999: + old_value = flow.metrics['count_pkts_gt_666'] + new_value = old_value + 1 + return new_value + else: + return old_value streamer_awesome = Streamer(source='devil.pcap', user_metrics={'count_pkts_gt_666': my_awesome_plugin}) - for flow in streamer_awesome: + for export in streamer_awesome: # now you will see your created metric in generated flows - print(flow.metrics['count_pkts_gt_666']) + print(export.metrics['count_pkts_gt_666']) .. warning:: **Plugin signature** - Your nfstream plugin must always update the received flow and **return the flow**. \ No newline at end of file + + * Your nfstream plugin must always **return the new value**. + * nfstream always set metrics to 0 (Default value). + +How if I want to log the size of the fourth packet from src -> dst ? + +.. code-block:: python + + from nfstream.streamer import Streamer + + def my_awesome_plugin(packet_information, flow): + if flow.src_to_dst_pkts == 4 and packet_information.direction == 0: + return packet_information.size + else: + return 0 + + streamer_awesome = Streamer(source='devil.pcap', + user_metrics={'fourth_src_to_dst_pkt_size': my_awesome_plugin}) + for export in streamer_awesome: + # now you will see your created metric in generated flows + print(export.metrics['fourth_src_to_dst_pkt_size']) + +Create your own Classifier +-------------------------- + +If you wan to add one or many classifiers to nfstream, you must create your classifier inheriting from +NFStreamClassifier. +Example, let's say that you have a trained Machine Learning Model and you want to use it to classify real traffic. +We suppose that your model takes as features the packet size of 3 first packets of a flow. + +.. code-block:: python + + class DummyClassifier(NFStreamClassifier) + def __init__(self, name): + NFStreamClassifier.__init__(self, name) + self.dummy_classifier = pickle.load(open('your_trained_model_file', "rb")) + + def on_flow_init(self, flow): # Initialize your flow features if needed + flow.classifiers[self.name]['1'] = 0 + flow.classifiers[self.name]['2'] = 0 + flow.classifiers[self.name]['3'] = 0 + + def on_flow_update(self, packet_information, flow): + number_packets = flow.src_to_dst_pkts + flow.dst_to_src_pkts + if number_packets == 1: + flow.classifiers[self.name]['1'] = packet_information.size + elif number_packets == 2: + flow.classifiers[self.name]['2'] = packet_information.size + elif number_packets == 3: + flow.classifiers[self.name]['3'] = packet_information.size + flow.metrics[self.name]['prediction'] = self.dummy_classifier.predict(flow.classifiers[self.name]['1'], + flow.classifiers[self.name]['2'], + flow.classifiers[self.name]['3']) + def on_flow_terminate(self, flow): + # Will be called when flow is expired by nfstream + return + + def on_exit(self): + # Will be called when nfstream is cleaning up. + return + + my_capture_streamer = Streamer(source="instagram.pcap", user_classifiers=DummyClassifier("my_dummy_classifier")) \ No newline at end of file