mirror of
https://github.com/vel21ripn/nDPI.git
synced 2026-05-02 00:40:17 +00:00
Added DPDK support to ndpiReader
This commit is contained in:
parent
6d929bf4cc
commit
71b2c19cf2
7 changed files with 242 additions and 34 deletions
|
|
@ -204,7 +204,9 @@ typedef struct ndpi_id {
|
|||
|
||||
// used memory counters
|
||||
u_int32_t current_ndpi_memory = 0, max_ndpi_memory = 0;
|
||||
|
||||
#ifdef USE_DPDK
|
||||
static int dpdk_port_id = 0, dpdk_run_capture = 1;
|
||||
#endif
|
||||
|
||||
void test_lib(); /* Forward */
|
||||
|
||||
|
|
@ -227,7 +229,11 @@ static void setupDetection(u_int16_t thread_id, pcap_t * pcap_handle);
|
|||
static void help(u_int long_help) {
|
||||
printf("Welcome to nDPI %s\n\n", ndpi_revision());
|
||||
|
||||
printf("ndpiReader -i <file|device> [-f <filter>][-s <duration>][-m <duration>]\n"
|
||||
printf("ndpiReader "
|
||||
#ifndef USE_DPDK
|
||||
"-i <file|device> "
|
||||
#endif
|
||||
"[-f <filter>][-s <duration>][-m <duration>]\n"
|
||||
" [-p <protos>][-l <loops> [-q][-d][-h][-t][-v <level>]\n"
|
||||
" [-n <threads>][-w <file>][-c <file>][-j <file>][-x <file>]\n\n"
|
||||
"Usage:\n"
|
||||
|
|
@ -457,7 +463,18 @@ static void parseOptions(int argc, char **argv) {
|
|||
if(trace) fprintf(trace, " #### %s #### \n", __FUNCTION__);
|
||||
#endif
|
||||
|
||||
while ((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) {
|
||||
#ifdef USE_DPDK
|
||||
{
|
||||
int ret = rte_eal_init(argc, argv);
|
||||
|
||||
if(ret < 0)
|
||||
rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
|
||||
|
||||
argc -= ret, argv += ret;
|
||||
}
|
||||
#endif
|
||||
|
||||
while((opt = getopt_long(argc, argv, "c:df:g:i:hp:l:s:tv:V:n:j:rp:w:q0123:456:7:89:m:b:x:", longopts, &option_idx)) != EOF) {
|
||||
#ifdef DEBUG_TRACE
|
||||
if(trace) fprintf(trace, " #### -%c [%s] #### \n", opt, optarg ? optarg : "");
|
||||
#endif
|
||||
|
|
@ -603,7 +620,7 @@ static void parseOptions(int argc, char **argv) {
|
|||
|
||||
case '9':
|
||||
extcap_packet_filter = ndpi_get_proto_by_name(ndpi_info_mod, optarg);
|
||||
if (extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg);
|
||||
if(extcap_packet_filter == NDPI_PROTOCOL_UNKNOWN) extcap_packet_filter = atoi(optarg);
|
||||
break;
|
||||
|
||||
case 257:
|
||||
|
|
@ -616,6 +633,7 @@ static void parseOptions(int argc, char **argv) {
|
|||
}
|
||||
}
|
||||
|
||||
#ifndef USE_DPDK
|
||||
if(!bpf_filter_flag) {
|
||||
if(do_capture) {
|
||||
quiet_mode = 1;
|
||||
|
|
@ -630,7 +648,7 @@ static void parseOptions(int argc, char **argv) {
|
|||
if(strchr(_pcap_file[0], ',')) { /* multiple ingress interfaces */
|
||||
num_threads = 0; /* setting number of threads = number of interfaces */
|
||||
__pcap_file = strtok(_pcap_file[0], ",");
|
||||
while (__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) {
|
||||
while(__pcap_file != NULL && num_threads < MAX_NUM_READER_THREADS) {
|
||||
_pcap_file[num_threads++] = __pcap_file;
|
||||
__pcap_file = strtok(NULL, ",");
|
||||
}
|
||||
|
|
@ -647,13 +665,14 @@ static void parseOptions(int argc, char **argv) {
|
|||
if(num_cores > 1 && bind_mask != NULL) {
|
||||
char *core_id = strtok(bind_mask, ":");
|
||||
thread_id = 0;
|
||||
while (core_id != NULL && thread_id < num_threads) {
|
||||
while(core_id != NULL && thread_id < num_threads) {
|
||||
core_affinity[thread_id++] = atoi(core_id) % num_cores;
|
||||
core_id = strtok(NULL, ":");
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef DEBUG_TRACE
|
||||
if(trace) fclose(trace);
|
||||
|
|
@ -719,7 +738,7 @@ char* intoaV4(u_int32_t addr, char* buf, u_int16_t bufLen) {
|
|||
}
|
||||
*--cp = '.';
|
||||
addr >>= 8;
|
||||
} while (--n > 0);
|
||||
} while(--n > 0);
|
||||
|
||||
/* Convert the string to lowercase */
|
||||
retStr = (char*)(cp+1);
|
||||
|
|
@ -769,7 +788,7 @@ static void printFlow(u_int16_t id, struct ndpi_flow_info *flow, u_int16_t threa
|
|||
ndpi_get_proto_name(ndpi_thread_info[thread_id].workflow->ndpi_struct, flow->detected_protocol.app_protocol));
|
||||
|
||||
if(flow->detected_protocol.category != 0)
|
||||
fprintf(out, "[cat: %s/%u]",
|
||||
fprintf(out, "[cat: %s/%u]",
|
||||
ndpi_category_get_name(ndpi_thread_info[thread_id].workflow->ndpi_struct,
|
||||
flow->detected_protocol.category),
|
||||
(unsigned int)flow->detected_protocol.category);
|
||||
|
|
@ -924,7 +943,7 @@ static void node_proto_guess_walker(const void *node, ndpi_VISIT which, int dept
|
|||
}
|
||||
|
||||
process_ndpi_collected_info(ndpi_thread_info[thread_id].workflow, flow);
|
||||
|
||||
|
||||
ndpi_thread_info[thread_id].workflow->stats.protocol_counter[flow->detected_protocol.app_protocol] += flow->src2dst_packets + flow->dst2src_packets;
|
||||
ndpi_thread_info[thread_id].workflow->stats.protocol_counter_bytes[flow->detected_protocol.app_protocol] += flow->src2dst_bytes + flow->dst2src_bytes;
|
||||
ndpi_thread_info[thread_id].workflow->stats.protocol_flows[flow->detected_protocol.app_protocol]++;
|
||||
|
|
@ -985,7 +1004,7 @@ int updateIpTree(u_int32_t key, u_int8_t version,
|
|||
if(rootp == (addr_node **)0)
|
||||
return 0;
|
||||
|
||||
while (*rootp != (addr_node *)0) {
|
||||
while(*rootp != (addr_node *)0) {
|
||||
/* Knuth's T1: */
|
||||
if((version == (*rootp)->version) && (key == (*rootp)->addr)) {
|
||||
/* T2: */
|
||||
|
|
@ -1015,7 +1034,7 @@ int updateIpTree(u_int32_t key, u_int8_t version,
|
|||
/* *********************************************** */
|
||||
|
||||
void freeIpTree(addr_node *root) {
|
||||
if (root == NULL)
|
||||
if(root == NULL)
|
||||
return;
|
||||
|
||||
freeIpTree(root->left);
|
||||
|
|
@ -1210,9 +1229,9 @@ static void deleteReceivers(struct receiver *receivers) {
|
|||
/* *********************************************** */
|
||||
/* implementation of: https://jeroen.massar.ch/presentations/files/FloCon2010-TopK.pdf
|
||||
*
|
||||
* if (table1.size < max1 || acceptable){
|
||||
* if(table1.size < max1 || acceptable){
|
||||
* create new element and add to the table1
|
||||
* if (table1.size > max2) {
|
||||
* if(table1.size > max2) {
|
||||
* cut table1 back to max1
|
||||
* merge table 1 to table2
|
||||
* if(table2.size > max1)
|
||||
|
|
@ -2266,9 +2285,13 @@ free_stats:
|
|||
* @brief Force a pcap_dispatch() or pcap_loop() call to return
|
||||
*/
|
||||
static void breakPcapLoop(u_int16_t thread_id) {
|
||||
#ifdef USE_DPDK
|
||||
dpdk_run_capture = 0;
|
||||
#else
|
||||
if(ndpi_thread_info[thread_id].workflow->pcap_handle != NULL) {
|
||||
pcap_breakloop(ndpi_thread_info[thread_id].workflow->pcap_handle);
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -2335,15 +2358,26 @@ static void configurePcapHandle(pcap_t * pcap_handle) {
|
|||
* @brief Open a pcap file or a specified device - Always returns a valid pcap_t
|
||||
*/
|
||||
static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_file) {
|
||||
|
||||
u_int snaplen = 1536;
|
||||
int promisc = 1;
|
||||
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
|
||||
pcap_t * pcap_handle = NULL;
|
||||
|
||||
/* trying to open a live interface */
|
||||
if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen, promisc,
|
||||
500, pcap_error_buffer)) == NULL) {
|
||||
#ifdef USE_DPDK
|
||||
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS,
|
||||
MBUF_CACHE_SIZE, 0,
|
||||
RTE_MBUF_DEFAULT_BUF_SIZE,
|
||||
rte_socket_id());
|
||||
|
||||
if(mbuf_pool == NULL)
|
||||
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool: are hugepages ok?\n");
|
||||
|
||||
if(dpdk_port_init(dpdk_port_id, mbuf_pool) != 0)
|
||||
rte_exit(EXIT_FAILURE, "DPDK: Cannot init port %u: please see README.dpdk\n", dpdk_port_id);
|
||||
#else
|
||||
if((pcap_handle = pcap_open_live((char*)pcap_file, snaplen,
|
||||
promisc, 500, pcap_error_buffer)) == NULL) {
|
||||
capture_for = capture_until = 0;
|
||||
|
||||
live_capture = 0;
|
||||
|
|
@ -2370,11 +2404,17 @@ static pcap_t * openPcapFileOrDevice(u_int16_t thread_id, const u_char * pcap_fi
|
|||
} else {
|
||||
live_capture = 1;
|
||||
|
||||
if((!json_flag) && (!quiet_mode))
|
||||
if((!json_flag) && (!quiet_mode)) {
|
||||
#ifdef USE_DPDK
|
||||
printf("Capturing from DPDK (port 0)...\n");
|
||||
#else
|
||||
printf("Capturing live traffic from device %s...\n", pcap_file);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
configurePcapHandle(pcap_handle);
|
||||
#endif /* !DPDK */
|
||||
|
||||
if(capture_for > 0) {
|
||||
if((!json_flag) && (!quiet_mode))
|
||||
|
|
@ -2520,13 +2560,13 @@ static void runPcapLoop(u_int16_t thread_id) {
|
|||
* @brief Process a running thread
|
||||
*/
|
||||
void * processing_thread(void *_thread_id) {
|
||||
|
||||
long thread_id = (long) _thread_id;
|
||||
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
|
||||
|
||||
#if defined(linux) && defined(HAVE_PTHREAD_SETAFFINITY_NP)
|
||||
if(core_affinity[thread_id] >= 0) {
|
||||
cpu_set_t cpuset;
|
||||
|
||||
CPU_ZERO(&cpuset);
|
||||
CPU_SET(core_affinity[thread_id], &cpuset);
|
||||
|
||||
|
|
@ -2539,6 +2579,33 @@ void * processing_thread(void *_thread_id) {
|
|||
#endif
|
||||
if((!json_flag) && (!quiet_mode)) printf("Running thread %ld...\n", thread_id);
|
||||
|
||||
#ifdef USE_DPDK
|
||||
while(dpdk_run_capture) {
|
||||
struct rte_mbuf *bufs[BURST_SIZE];
|
||||
u_int16_t num = rte_eth_rx_burst(dpdk_port_id, 0, bufs, BURST_SIZE);
|
||||
u_int i;
|
||||
|
||||
if(num == 0) {
|
||||
usleep(1);
|
||||
continue;
|
||||
}
|
||||
|
||||
for(i = 0; i < PREFETCH_OFFSET && i < num; i++)
|
||||
rte_prefetch0(rte_pktmbuf_mtod(bufs[i], void *));
|
||||
|
||||
for(i = 0; i < num; i++) {
|
||||
char *data = rte_pktmbuf_mtod(bufs[i], char *);
|
||||
int len = rte_pktmbuf_pkt_len(bufs[i]);
|
||||
struct pcap_pkthdr h;
|
||||
|
||||
h.len = h.caplen = len;
|
||||
gettimeofday(&h.ts, NULL);
|
||||
|
||||
pcap_process_packet((u_char*)&thread_id, &h, (const u_char *)data);
|
||||
rte_pktmbuf_free(bufs[i]);
|
||||
}
|
||||
}
|
||||
#else
|
||||
pcap_loop:
|
||||
runPcapLoop(thread_id);
|
||||
|
||||
|
|
@ -2551,6 +2618,7 @@ pcap_loop:
|
|||
goto pcap_loop;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
@ -3239,7 +3307,7 @@ int orginal_main(int argc, char **argv) {
|
|||
automataUnitTest();
|
||||
|
||||
ndpi_info_mod = ndpi_init_detection_module();
|
||||
if (ndpi_info_mod == NULL) return -1;
|
||||
if(ndpi_info_mod == NULL) return -1;
|
||||
|
||||
memset(ndpi_thread_info, 0, sizeof(ndpi_thread_info));
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue