""" Interface ==================================== The Interface class can be used to access information about interface statistics through the REST API (https://www.ntop.org/guides/ntopng/api/rest/api_v2.html). """ from ntopng.host import Host from ntopng.historical import Historical class Interface: """ Interface provides information about a Network interface :param ntopng_obj: The ntopng handle :param ifid: The interface ID """ def __init__(self, ntopng_obj, ifid): """ Construct a new Interface object :param ntopng_obj: The ntopng handle :type ntopng_obj: Ntopng :param ifid: The interface ID :type ifid: int """ self.ntopng_obj = ntopng_obj self.ifid = ifid self.rest_v2_url = "/lua/rest/v2" self.rest_pro_v2_url = "/lua/pro/rest/v2" def get_data(self): """ Return information about a Network interface :return: Information about the interface :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/interface/data.lua", {"ifid": self.ifid})) def get_broadcast_domains(self): """ Return information about broadcast domains on an interface :return: Information about broadcast domains :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/interface/bcast_domains.lua", {"ifid": self.ifid})) def get_address(self): """ Return the interface IP address(es) :return: The interface address(es) :rtype: array """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/interface/address.lua", {"ifid": self.ifid})) def get_l7_stats(self, max_num_results): """ Return statistics about Layer 7 protocols seen on an interface :param max_num_results: The maximum number of results to limit the output :type max_num_results: int :return: Layer 7 protocol statistics :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/interface/l7/stats.lua", {"ifid": self.ifid, 'ndpistats_mode': 'count', 'breed': True, 'ndpi_category': True, 'all_values' : True, 'max_values': max_num_results, 'collapse_stats': False })) def get_dscp_stats(self): """ Return statistics about DSCP :return: DSCP statistics :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/interface/dscp/stats.lua", {"ifid": self.ifid})) def get_host(self, ip, vlan=None): """ Return an Host instance :param ifid: The interface ID :type ifid: int :param ip: The host IP address :type ip: string :param vlan: The host VLAN ID (if any) :type vlan: int :return: The host instance :rtype: ntopng.Host """ return Host(self.ntopng_obj, self.ifid, ip, vlan) def get_active_hosts(self): """ Retrieve the list of active hosts for the specified interface :return: All active hosts :rtype: array """ rsp = self.ntopng_obj.request(self.rest_v2_url + "/get/host/active.lua", {"ifid": self.ifid, "all": "true"}) return rsp["data"] def get_active_hosts_paginated(self, currentPage, perPage): return(self.ntopng_obj.request(self.rest_v2_url + "/get/host/active.lua", {"ifid": self.ifid, "currentPage": currentPage, "perPage": perPage})) def get_top_local_talkers(self): """ Return Top Local hosts generating more traffic :return: The top local hosts :rtype: array """ return(self.ntopng_obj.request(self.rest_pro_v2_url + "/get/interface/top/local/talkers.lua", { "ifid": self.ifid })) def get_top_remote_talkers(self): """ Return Top Remote hosts generating more traffic :return: The top remote hosts :rtype: array """ return(self.ntopng_obj.request(self.rest_pro_v2_url + "/get/interface/top/remote/talkers.lua", { "ifid": self.ifid })) def get_active_flows_paginated(self, currentPage, perPage): """ Retrieve the (paginated) list of active flows for the specified interface :param currentPage: The current page :type currentPage: int :param perPage: The number of results per page :type perPage: int :return: All active flows :rtype: array """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/flow/active.lua", {"ifid": self.ifid, "currentPage": currentPage, "perPage": perPage})) def get_active_l4_proto_flow_counters(self): """ Return statistics about active flows per Layer 4 protocol on an interface :return: Layer 4 protocol flows statistics :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/flow/l4/counters.lua", {"ifid": self.ifid })) def get_active_l7_proto_flow_counters(self): """ Return statistics about active flows per Layer 7 protocol on an interface :return: Layer 7 protocol flows statistics :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/flow/l7/counters.lua", {"ifid": self.ifid })) def get_alert_types_enum(self): """ Return the enum for all the alert types :return: Alert types enum :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/alert/type/consts.lua", {})) def get_alert_severities_enum(self): """ Return the enum for all the alert severities :return: Alert severity enum :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/alert/severity/consts.lua", {})) def get_alerts_counter_per_type(self, ifid_num): """ Return the number of alerts for each alert type :return: Number of alerts for each alert type :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/alert/type/counters.lua", {"ifid": ifid_num })) def get_alerts_counter_per_severity(self, ifid_num): """ Return the number of alerts for each severity value :return: Number of alerts for each severity value :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/alert/severity/counters.lua", {"ifid": ifid_num })) def get_l7_application_proto_enum(self): """ Return the enum for L7 application enum :return: Enum for L7 application enum :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/l7/application/consts.lua", {})) def get_l7_application_category_enum(self): """ Return the enum for L7 application category enum :return: Enum for L7 application category enum :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/l7/category/consts.lua", {})) def get_l4_protocols_enum(self): """ Return the enum for L4 protocols :return: Enum for L4 protocols :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/l4/protocol/consts.lua", {})) def get_active_hosts(self, ifid_num): """ Return all active hosts on the selected interface :return: Active hosts on the selected interface :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/host/active.lua", {"ifid": ifid_num })) def get_host_data(self, ifid_num, host_ip): """ Return all host data for the selected interface and IP :return: Host data for the specified interface and IP address :rtype: object """ return(self.ntopng_obj.request(self.rest_v2_url + "/get/host/data.lua", {"ifid": ifid_num, "host": str(host_ip) })) def get_historical(self): """ Return an Historical handle for the interface :return: The historical handle :rtype: ntopng.Historical """ return Historical(self.ntopng_obj, self.ifid) def get_all_alerts(self, ifid, epoch_begin, epoch_end, ip=None): """ Return alerts for specified interface and epoch_begin and epoch_end. By default it returns all alerts :return: Flow alerts for the specified IP (if present) :rtype: object """ request_params = {"ifid": ifid, "epoch_begin": epoch_begin, "epoch_end": epoch_end, "format": "json"} if (ip != None): request_params["cli_ip"] = str(ip) + ";eq" return self.ntopng_obj.request(self.rest_v2_url + "/get/flow/alert/list.lua", request_params) def self_test(self): print(self.get_data()) try: print("Broadcast Domains ----------------------------") print(self.get_broadcast_domains()) print("Address ----------------------------") print(self.get_address()) print("L7 Stats ----------------------------") max_num_records = 100 print(self.get_l7_stats(max_num_records)) print("DSCP Stats ----------------------------") print(self.get_dscp_stats()) print("Active Hosts ----------------------------") print(self.get_active_hosts()) print("Active Hosts (100) ----------------------------") print(self.get_active_hosts_paginated(1, 100)) print("Top Local Talkers ----------------------------") print(self.get_top_local_talkers()) print("Top Remote Talkers ----------------------------") print(self.get_top_remote_talkers()) print("Active Flows (100) ----------------------------") print(self.get_active_flows_paginated(1, 100)) print("L4 Flow Counters ----------------------------") print(self.get_active_l4_proto_flow_counters()) print("L7 Flow Counters ----------------------------") print(self.get_active_l7_proto_flow_counters()) print("----------------------------") except: raise ValueError("Invalid interface ID specified")