nDPI/example/ndpiSimpleIntegration.c
Ivan Nardi e6b332aa4a
Add support for flow client/server information (#1671)
In a lot of places in ndPI we use *packet* source/dest info
(address/port/direction) when we are interested in *flow* client/server
info, instead.

Add basic logic to autodetect this kind of information.

nDPI doesn't perform any "flow management" itself but this task is
delegated to the external application. It is then likely that the
application might provide more reliable hints about flow
client/server direction and about the TCP handshake presence: in that case,
these information might be (optionally) passed to the library, disabling
the internal "autodetect" logic.

These new fields have been used in some LRU caches and in the "guessing"
algorithm.
It is quite likely that some other code needs to be updated.
2022-07-24 17:46:24 +02:00

1176 lines
38 KiB
C

/*
*
* Copyright (C) 2011-22 - ntop.org
*
* nDPI is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* nDPI is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with nDPI. If not, see <http://www.gnu.org/licenses/>.
*
*/
#ifndef WIN32
#include <arpa/inet.h>
#include <netinet/in.h>
#endif
#include <errno.h>
#include <ndpi_api.h>
#include <ndpi_main.h>
#include <ndpi_typedefs.h>
#include <pcap/pcap.h>
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#ifdef WIN32
#include <windows.h>
#endif
//#define VERBOSE 1
#define MAX_FLOW_ROOTS_PER_THREAD 2048
#define MAX_IDLE_FLOWS_PER_THREAD 64
#define TICK_RESOLUTION 1000
#define MAX_READER_THREADS 4
#define IDLE_SCAN_PERIOD 10000 /* msec */
#define MAX_IDLE_TIME 300000 /* msec */
#define INITIAL_THREAD_HASH 0x03dd018b
#ifndef ETH_P_IP
#define ETH_P_IP 0x0800
#endif
#ifndef ETH_P_IPV6
#define ETH_P_IPV6 0x86DD
#endif
#ifndef ETH_P_ARP
#define ETH_P_ARP 0x0806
#endif
enum nDPI_l3_type {
L3_IP, L3_IP6
};
struct nDPI_flow_info {
uint32_t flow_id;
unsigned long long int packets_processed;
uint64_t first_seen;
uint64_t last_seen;
uint64_t hashval;
enum nDPI_l3_type l3_type;
union {
struct {
uint32_t src;
uint32_t pad_00[3];
uint32_t dst;
uint32_t pad_01[3];
} v4;
struct {
uint64_t src[2];
uint64_t dst[2];
} v6;
struct {
uint32_t src[4];
uint32_t dst[4];
} u32;
} ip_tuple;
unsigned long long int total_l4_data_len;
uint16_t src_port;
uint16_t dst_port;
uint8_t is_midstream_flow:1;
uint8_t flow_fin_ack_seen:1;
uint8_t flow_ack_seen:1;
uint8_t detection_completed:1;
uint8_t tls_client_hello_seen:1;
uint8_t tls_server_hello_seen:1;
uint8_t flow_info_printed:1;
uint8_t reserved_00:1;
uint8_t l4_protocol;
struct ndpi_proto detected_l7_protocol;
struct ndpi_proto guessed_protocol;
struct ndpi_flow_struct * ndpi_flow;
};
struct nDPI_workflow {
pcap_t * pcap_handle;
volatile long int error_or_eof;
unsigned long long int packets_captured;
unsigned long long int packets_processed;
unsigned long long int total_l4_data_len;
unsigned long long int detected_flow_protocols;
uint64_t last_idle_scan_time;
uint64_t last_time;
void ** ndpi_flows_active;
unsigned long long int max_active_flows;
unsigned long long int cur_active_flows;
unsigned long long int total_active_flows;
void ** ndpi_flows_idle;
unsigned long long int max_idle_flows;
unsigned long long int cur_idle_flows;
unsigned long long int total_idle_flows;
struct ndpi_detection_module_struct * ndpi_struct;
};
struct nDPI_reader_thread {
struct nDPI_workflow * workflow;
pthread_t thread_id;
int array_index;
};
static struct nDPI_reader_thread reader_threads[MAX_READER_THREADS] = {};
static int reader_thread_count = MAX_READER_THREADS;
static volatile long int main_thread_shutdown = 0;
static volatile long int flow_id = 0;
static void free_workflow(struct nDPI_workflow ** const workflow);
static struct nDPI_workflow * init_workflow(char const * const file_or_device)
{
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
struct nDPI_workflow * workflow = (struct nDPI_workflow *)ndpi_calloc(1, sizeof(*workflow));
const char *bpfFilter = "ip or ip6";
static struct bpf_program bpf_code;
static struct bpf_program *bpf_cfilter = NULL;
if (workflow == NULL) {
return NULL;
}
if (access(file_or_device, R_OK) != 0 && errno == ENOENT) {
workflow->pcap_handle = pcap_open_live(file_or_device, /* 1536 */ 65535, 1, 250, pcap_error_buffer);
} else {
#ifdef WIN32
workflow->pcap_handle = pcap_open_offline(file_or_device, pcap_error_buffer);
#else
workflow->pcap_handle = pcap_open_offline_with_tstamp_precision(file_or_device, PCAP_TSTAMP_PRECISION_MICRO,
pcap_error_buffer);
#endif
}
if (workflow->pcap_handle == NULL) {
fprintf(stderr, "pcap_open_live / pcap_open_offline: %.*s\n",
(int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
free_workflow(&workflow);
return NULL;
}
if(pcap_compile(workflow->pcap_handle, &bpf_code, bpfFilter, 1, 0xFFFFFF00) < 0) {
printf("pcap_compile error: '%s'\n", pcap_geterr(workflow->pcap_handle));
exit(-1);
}
bpf_cfilter = &bpf_code;
if(pcap_setfilter(workflow->pcap_handle, bpf_cfilter) < 0) {
printf("pcap_setfilter error: '%s'\n", pcap_geterr(workflow->pcap_handle));
}
ndpi_init_prefs init_prefs = ndpi_no_prefs;
workflow->ndpi_struct = ndpi_init_detection_module(init_prefs);
if (workflow->ndpi_struct == NULL) {
free_workflow(&workflow);
return NULL;
}
workflow->total_active_flows = 0;
workflow->max_active_flows = MAX_FLOW_ROOTS_PER_THREAD;
workflow->ndpi_flows_active = (void **)ndpi_calloc(workflow->max_active_flows, sizeof(void *));
if (workflow->ndpi_flows_active == NULL) {
free_workflow(&workflow);
return NULL;
}
workflow->total_idle_flows = 0;
workflow->max_idle_flows = MAX_IDLE_FLOWS_PER_THREAD;
workflow->ndpi_flows_idle = (void **)ndpi_calloc(workflow->max_idle_flows, sizeof(void *));
if (workflow->ndpi_flows_idle == NULL) {
free_workflow(&workflow);
return NULL;
}
NDPI_PROTOCOL_BITMASK protos;
NDPI_BITMASK_SET_ALL(protos);
ndpi_set_protocol_detection_bitmask2(workflow->ndpi_struct, &protos);
ndpi_finalize_initialization(workflow->ndpi_struct);
return workflow;
}
static void ndpi_flow_info_freer(void * const node)
{
struct nDPI_flow_info * const flow = (struct nDPI_flow_info *)node;
ndpi_flow_free(flow->ndpi_flow);
ndpi_free(flow);
}
static void free_workflow(struct nDPI_workflow ** const workflow)
{
struct nDPI_workflow * const w = *workflow;
if (w == NULL) {
return;
}
if (w->pcap_handle != NULL) {
pcap_close(w->pcap_handle);
w->pcap_handle = NULL;
}
if (w->ndpi_struct != NULL) {
ndpi_exit_detection_module(w->ndpi_struct);
}
for(size_t i = 0; i < w->max_active_flows; i++) {
ndpi_tdestroy(w->ndpi_flows_active[i], ndpi_flow_info_freer);
}
ndpi_free(w->ndpi_flows_active);
ndpi_free(w->ndpi_flows_idle);
ndpi_free(w);
*workflow = NULL;
}
static char * get_default_pcapdev(char *errbuf)
{
char * ifname;
pcap_if_t * all_devices = NULL;
if (pcap_findalldevs(&all_devices, errbuf) != 0)
{
return NULL;
}
ifname = strdup(all_devices[0].name);
pcap_freealldevs(all_devices);
return ifname;
}
static int setup_reader_threads(char const * const file_or_device)
{
char * file_or_default_device;
char pcap_error_buffer[PCAP_ERRBUF_SIZE];
if (reader_thread_count > MAX_READER_THREADS) {
return 1;
}
if (file_or_device == NULL) {
file_or_default_device = get_default_pcapdev(pcap_error_buffer);
if (file_or_default_device == NULL) {
fprintf(stderr, "pcap_findalldevs: %.*s\n", (int) PCAP_ERRBUF_SIZE, pcap_error_buffer);
return 1;
}
} else {
file_or_default_device = strdup(file_or_device);
if (file_or_default_device == NULL) {
return 1;
}
}
for (int i = 0; i < reader_thread_count; ++i) {
reader_threads[i].workflow = init_workflow(file_or_default_device);
if (reader_threads[i].workflow == NULL)
{
free(file_or_default_device);
return 1;
}
}
free(file_or_default_device);
return 0;
}
static int ip_tuple_to_string(struct nDPI_flow_info const * const flow,
char * const src_addr_str, size_t src_addr_len,
char * const dst_addr_str, size_t dst_addr_len)
{
switch (flow->l3_type) {
case L3_IP:
return inet_ntop(AF_INET, (struct sockaddr_in *)&flow->ip_tuple.v4.src,
src_addr_str, src_addr_len) != NULL &&
inet_ntop(AF_INET, (struct sockaddr_in *)&flow->ip_tuple.v4.dst,
dst_addr_str, dst_addr_len) != NULL;
case L3_IP6:
return inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.src[0],
src_addr_str, src_addr_len) != NULL &&
inet_ntop(AF_INET6, (struct sockaddr_in6 *)&flow->ip_tuple.v6.dst[0],
dst_addr_str, dst_addr_len) != NULL;
}
return 0;
}
#ifdef VERBOSE
static void print_packet_info(struct nDPI_reader_thread const * const reader_thread,
struct pcap_pkthdr const * const header,
uint32_t l4_data_len,
struct nDPI_flow_info const * const flow)
{
struct nDPI_workflow const * const workflow = reader_thread->workflow;
char src_addr_str[INET6_ADDRSTRLEN+1] = {0};
char dst_addr_str[INET6_ADDRSTRLEN+1] = {0};
char buf[256];
int used = 0, ret;
ret = ndpi_snprintf(buf, sizeof(buf), "[%8llu, %d, %4u] %4u bytes: ",
workflow->packets_captured, reader_thread->array_index,
flow->flow_id, header->caplen);
if (ret > 0) {
used += ret;
}
if (ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str)) != 0) {
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, "IP[%s -> %s]", src_addr_str, dst_addr_str);
} else {
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, "IP[ERROR]");
}
if (ret > 0) {
used += ret;
}
switch (flow->l4_protocol) {
case IPPROTO_UDP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> UDP[%u -> %u, %u bytes]",
flow->src_port, flow->dst_port, l4_data_len);
break;
case IPPROTO_TCP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> TCP[%u -> %u, %u bytes]",
flow->src_port, flow->dst_port, l4_data_len);
break;
case IPPROTO_ICMP:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP");
break;
case IPPROTO_ICMPV6:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP6");
break;
case IPPROTO_HOPOPTS:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> ICMP6 Hop-By-Hop");
break;
default:
ret = ndpi_snprintf(buf + used, sizeof(buf) - used, " -> Unknown[0x%X]", flow->l4_protocol);
break;
}
if (ret > 0) {
used += ret;
}
printf("%.*s\n", used, buf);
}
#endif
static int ip_tuples_compare(struct nDPI_flow_info const * const A, struct nDPI_flow_info const * const B)
{
// generate a warning if the enum changes
switch (A->l3_type)
{
case L3_IP:
case L3_IP6:
break;
}
if (A->l3_type == L3_IP && B->l3_type == L3_IP)
{
if (A->ip_tuple.v4.src < B->ip_tuple.v4.src)
{
return -1;
}
if (A->ip_tuple.v4.src > B->ip_tuple.v4.src)
{
return 1;
}
if (A->ip_tuple.v4.dst < B->ip_tuple.v4.dst)
{
return -1;
}
if (A->ip_tuple.v4.dst > B->ip_tuple.v4.dst)
{
return 1;
}
}
else if (A->l3_type == L3_IP6 && B->l3_type == L3_IP6)
{
if (A->ip_tuple.v6.src[0] < B->ip_tuple.v6.src[0] && A->ip_tuple.v6.src[1] < B->ip_tuple.v6.src[1])
{
return -1;
}
if (A->ip_tuple.v6.src[0] > B->ip_tuple.v6.src[0] && A->ip_tuple.v6.src[1] > B->ip_tuple.v6.src[1])
{
return 1;
}
if (A->ip_tuple.v6.dst[0] < B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] < B->ip_tuple.v6.dst[1])
{
return -1;
}
if (A->ip_tuple.v6.dst[0] > B->ip_tuple.v6.dst[0] && A->ip_tuple.v6.dst[1] > B->ip_tuple.v6.dst[1])
{
return 1;
}
}
if (A->src_port < B->src_port)
{
return -1;
}
if (A->src_port > B->src_port)
{
return 1;
}
if (A->dst_port < B->dst_port)
{
return -1;
}
if (A->dst_port > B->dst_port)
{
return 1;
}
return 0;
}
static void ndpi_idle_scan_walker(void const * const A, ndpi_VISIT which, int depth, void * const user_data)
{
struct nDPI_workflow * const workflow = (struct nDPI_workflow *)user_data;
struct nDPI_flow_info * const flow = *(struct nDPI_flow_info **)A;
(void)depth;
if (workflow == NULL || flow == NULL) {
return;
}
if (workflow->cur_idle_flows == MAX_IDLE_FLOWS_PER_THREAD) {
return;
}
if (which == ndpi_preorder || which == ndpi_leaf) {
if ((flow->flow_fin_ack_seen == 1 && flow->flow_ack_seen == 1) ||
flow->last_seen + MAX_IDLE_TIME < workflow->last_time)
{
char src_addr_str[INET6_ADDRSTRLEN+1];
char dst_addr_str[INET6_ADDRSTRLEN+1];
ip_tuple_to_string(flow, src_addr_str, sizeof(src_addr_str), dst_addr_str, sizeof(dst_addr_str));
workflow->ndpi_flows_idle[workflow->cur_idle_flows++] = flow;
workflow->total_idle_flows++;
}
}
}
static int ndpi_workflow_node_cmp(void const * const A, void const * const B) {
struct nDPI_flow_info const * const flow_info_a = (struct nDPI_flow_info *)A;
struct nDPI_flow_info const * const flow_info_b = (struct nDPI_flow_info *)B;
if (flow_info_a->hashval < flow_info_b->hashval) {
return(-1);
} else if (flow_info_a->hashval > flow_info_b->hashval) {
return(1);
}
/* Flows have the same hash */
if (flow_info_a->l4_protocol < flow_info_b->l4_protocol) {
return(-1);
} else if (flow_info_a->l4_protocol > flow_info_b->l4_protocol) {
return(1);
}
return ip_tuples_compare(flow_info_a, flow_info_b);
}
static void check_for_idle_flows(struct nDPI_workflow * const workflow)
{
if (workflow->last_idle_scan_time + IDLE_SCAN_PERIOD < workflow->last_time) {
for (size_t idle_scan_index = 0; idle_scan_index < workflow->max_active_flows; ++idle_scan_index) {
ndpi_twalk(workflow->ndpi_flows_active[idle_scan_index], ndpi_idle_scan_walker, workflow);
while (workflow->cur_idle_flows > 0) {
struct nDPI_flow_info * const f =
(struct nDPI_flow_info *)workflow->ndpi_flows_idle[--workflow->cur_idle_flows];
if (f->flow_fin_ack_seen == 1) {
printf("Free fin flow with id %u\n", f->flow_id);
} else {
printf("Free idle flow with id %u\n", f->flow_id);
}
ndpi_tdelete(f, &workflow->ndpi_flows_active[idle_scan_index],
ndpi_workflow_node_cmp);
ndpi_flow_info_freer(f);
workflow->cur_active_flows--;
}
}
workflow->last_idle_scan_time = workflow->last_time;
}
}
static void ndpi_process_packet(uint8_t * const args,
struct pcap_pkthdr const * const header,
uint8_t const * const packet)
{
struct nDPI_reader_thread * const reader_thread =
(struct nDPI_reader_thread *)args;
struct nDPI_workflow * workflow;
struct nDPI_flow_info flow = {};
size_t hashed_index;
void * tree_result;
struct nDPI_flow_info * flow_to_process;
const struct ndpi_ethhdr * ethernet;
const struct ndpi_iphdr * ip;
struct ndpi_ipv6hdr * ip6;
uint64_t time_ms;
const uint16_t eth_offset = 0;
uint16_t ip_offset;
uint16_t ip_size;
const uint8_t * l4_ptr = NULL;
uint16_t l4_len = 0;
uint16_t type;
int thread_index = INITIAL_THREAD_HASH; // generated with `dd if=/dev/random bs=1024 count=1 |& hd'
if (reader_thread == NULL) {
return;
}
workflow = reader_thread->workflow;
if (workflow == NULL) {
return;
}
workflow->packets_captured++;
time_ms = ((uint64_t) header->ts.tv_sec) * TICK_RESOLUTION + header->ts.tv_usec / (1000000 / TICK_RESOLUTION);
workflow->last_time = time_ms;
check_for_idle_flows(workflow);
/* process datalink layer */
switch (pcap_datalink(workflow->pcap_handle)) {
case DLT_NULL:
if (ntohl(*((uint32_t *)&packet[eth_offset])) == 0x00000002) {
type = ETH_P_IP;
} else {
type = ETH_P_IPV6;
}
ip_offset = 4 + eth_offset;
break;
case DLT_EN10MB:
if (header->len < sizeof(struct ndpi_ethhdr)) {
fprintf(stderr, "[%8llu, %d] Ethernet packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
ethernet = (struct ndpi_ethhdr *) &packet[eth_offset];
ip_offset = sizeof(struct ndpi_ethhdr) + eth_offset;
type = ntohs(ethernet->h_proto);
switch (type) {
case ETH_P_IP: /* IPv4 */
if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_iphdr)) {
fprintf(stderr, "[%8llu, %d] IP packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
break;
case ETH_P_IPV6: /* IPV6 */
if (header->len < sizeof(struct ndpi_ethhdr) + sizeof(struct ndpi_ipv6hdr)) {
fprintf(stderr, "[%8llu, %d] IP6 packet too short - skipping\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
break;
case ETH_P_ARP: /* ARP */
return;
default:
fprintf(stderr, "[%8llu, %d] Unknown Ethernet packet with type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
break;
default:
fprintf(stderr, "[%8llu, %d] Captured non IP/Ethernet packet with datalink type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, pcap_datalink(workflow->pcap_handle));
return;
}
if (type == ETH_P_IP) {
ip = (struct ndpi_iphdr *)&packet[ip_offset];
ip6 = NULL;
} else if (type == ETH_P_IPV6) {
ip = NULL;
ip6 = (struct ndpi_ipv6hdr *)&packet[ip_offset];
} else {
fprintf(stderr, "[%8llu, %d] Captured non IPv4/IPv6 packet with type 0x%X - skipping\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
ip_size = header->len - ip_offset;
if (type == ETH_P_IP && header->len >= ip_offset) {
if (header->caplen < header->len) {
fprintf(stderr, "[%8llu, %d] Captured packet size is smaller than packet size: %u < %u\n",
workflow->packets_captured, reader_thread->array_index, header->caplen, header->len);
}
}
/* process layer3 e.g. IPv4 / IPv6 */
if (ip != NULL && ip->version == 4) {
if (ip_size < sizeof(*ip)) {
fprintf(stderr, "[%8llu, %d] Packet smaller than IP4 header length: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(*ip));
return;
}
flow.l3_type = L3_IP;
if (ndpi_detection_get_l4((uint8_t*)ip, ip_size, &l4_ptr, &l4_len,
&flow.l4_protocol, NDPI_DETECTION_ONLY_IPV4) != 0)
{
fprintf(stderr, "[%8llu, %d] nDPI IPv4/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip));
return;
}
flow.ip_tuple.v4.src = ip->saddr;
flow.ip_tuple.v4.dst = ip->daddr;
uint32_t min_addr = (flow.ip_tuple.v4.src > flow.ip_tuple.v4.dst ?
flow.ip_tuple.v4.dst : flow.ip_tuple.v4.src);
thread_index = min_addr + ip->protocol;
} else if (ip6 != NULL) {
if (ip_size < sizeof(ip6->ip6_hdr)) {
fprintf(stderr, "[%8llu, %d] Packet smaller than IP6 header length: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size, sizeof(ip6->ip6_hdr));
return;
}
flow.l3_type = L3_IP6;
if (ndpi_detection_get_l4((uint8_t*)ip6, ip_size, &l4_ptr, &l4_len,
&flow.l4_protocol, NDPI_DETECTION_ONLY_IPV6) != 0)
{
fprintf(stderr, "[%8llu, %d] nDPI IPv6/L4 payload detection failed, L4 length: %zu\n",
workflow->packets_captured, reader_thread->array_index, ip_size - sizeof(*ip6));
return;
}
flow.ip_tuple.v6.src[0] = ip6->ip6_src.u6_addr.u6_addr64[0];
flow.ip_tuple.v6.src[1] = ip6->ip6_src.u6_addr.u6_addr64[1];
flow.ip_tuple.v6.dst[0] = ip6->ip6_dst.u6_addr.u6_addr64[0];
flow.ip_tuple.v6.dst[1] = ip6->ip6_dst.u6_addr.u6_addr64[1];
uint64_t min_addr[2];
if (flow.ip_tuple.v6.src[0] > flow.ip_tuple.v6.dst[0] &&
flow.ip_tuple.v6.src[1] > flow.ip_tuple.v6.dst[1])
{
min_addr[0] = flow.ip_tuple.v6.dst[0];
min_addr[1] = flow.ip_tuple.v6.dst[0];
} else {
min_addr[0] = flow.ip_tuple.v6.src[0];
min_addr[1] = flow.ip_tuple.v6.src[0];
}
thread_index = min_addr[0] + min_addr[1] + ip6->ip6_hdr.ip6_un1_nxt;
} else {
fprintf(stderr, "[%8llu, %d] Non IP/IPv6 protocol detected: 0x%X\n",
workflow->packets_captured, reader_thread->array_index, type);
return;
}
/* process layer4 e.g. TCP / UDP */
if (flow.l4_protocol == IPPROTO_TCP) {
const struct ndpi_tcphdr * tcp;
if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_tcphdr)) {
fprintf(stderr, "[%8llu, %d] Malformed TCP packet, packet size smaller than expected: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index,
header->len, (l4_ptr - packet) + sizeof(struct ndpi_tcphdr));
return;
}
tcp = (struct ndpi_tcphdr *)l4_ptr;
flow.is_midstream_flow = (tcp->syn == 0 ? 1 : 0);
flow.flow_fin_ack_seen = (tcp->fin == 1 && tcp->ack == 1 ? 1 : 0);
flow.flow_ack_seen = tcp->ack;
flow.src_port = ntohs(tcp->source);
flow.dst_port = ntohs(tcp->dest);
} else if (flow.l4_protocol == IPPROTO_UDP) {
const struct ndpi_udphdr * udp;
if (header->len < (l4_ptr - packet) + sizeof(struct ndpi_udphdr)) {
fprintf(stderr, "[%8llu, %d] Malformed UDP packet, packet size smaller than expected: %u < %zu\n",
workflow->packets_captured, reader_thread->array_index,
header->len, (l4_ptr - packet) + sizeof(struct ndpi_udphdr));
return;
}
udp = (struct ndpi_udphdr *)l4_ptr;
flow.src_port = ntohs(udp->source);
flow.dst_port = ntohs(udp->dest);
}
/* distribute flows to threads while keeping stability (same flow goes always to same thread) */
thread_index += (flow.src_port < flow.dst_port ? flow.dst_port : flow.src_port);
thread_index %= reader_thread_count;
if (thread_index != reader_thread->array_index) {
return;
}
workflow->packets_processed++;
workflow->total_l4_data_len += l4_len;
#ifdef VERBOSE
print_packet_info(reader_thread, header, l4_len, &flow);
#endif
/* calculate flow hash for btree find, search(insert) */
if (flow.l3_type == L3_IP) {
if (ndpi_flowv4_flow_hash(flow.l4_protocol, flow.ip_tuple.v4.src, flow.ip_tuple.v4.dst,
flow.src_port, flow.dst_port, 0, 0,
(uint8_t *)&flow.hashval, sizeof(flow.hashval)) != 0)
{
flow.hashval = flow.ip_tuple.v4.src + flow.ip_tuple.v4.dst; // fallback
}
} else if (flow.l3_type == L3_IP6) {
if (ndpi_flowv6_flow_hash(flow.l4_protocol, &ip6->ip6_src, &ip6->ip6_dst,
flow.src_port, flow.dst_port, 0, 0,
(uint8_t *)&flow.hashval, sizeof(flow.hashval)) != 0)
{
flow.hashval = flow.ip_tuple.v6.src[0] + flow.ip_tuple.v6.src[1];
flow.hashval += flow.ip_tuple.v6.dst[0] + flow.ip_tuple.v6.dst[1];
}
}
flow.hashval += flow.l4_protocol + flow.src_port + flow.dst_port;
hashed_index = flow.hashval % workflow->max_active_flows;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
if (tree_result == NULL) {
/* flow not found in btree: switch src <-> dst and try to find it again */
uint32_t orig_src_ip[4] = { flow.ip_tuple.u32.src[0], flow.ip_tuple.u32.src[1],
flow.ip_tuple.u32.src[2], flow.ip_tuple.u32.src[3] };
uint32_t orig_dst_ip[4] = { flow.ip_tuple.u32.dst[0], flow.ip_tuple.u32.dst[1],
flow.ip_tuple.u32.dst[2], flow.ip_tuple.u32.dst[3] };
uint16_t orig_src_port = flow.src_port;
uint16_t orig_dst_port = flow.dst_port;
flow.ip_tuple.u32.src[0] = orig_dst_ip[0];
flow.ip_tuple.u32.src[1] = orig_dst_ip[1];
flow.ip_tuple.u32.src[2] = orig_dst_ip[2];
flow.ip_tuple.u32.src[3] = orig_dst_ip[3];
flow.ip_tuple.u32.dst[0] = orig_src_ip[0];
flow.ip_tuple.u32.dst[1] = orig_src_ip[1];
flow.ip_tuple.u32.dst[2] = orig_src_ip[2];
flow.ip_tuple.u32.dst[3] = orig_src_ip[3];
flow.src_port = orig_dst_port;
flow.dst_port = orig_src_port;
tree_result = ndpi_tfind(&flow, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp);
flow.ip_tuple.u32.src[0] = orig_src_ip[0];
flow.ip_tuple.u32.src[1] = orig_src_ip[1];
flow.ip_tuple.u32.src[2] = orig_src_ip[2];
flow.ip_tuple.u32.src[3] = orig_src_ip[3];
flow.ip_tuple.u32.dst[0] = orig_dst_ip[0];
flow.ip_tuple.u32.dst[1] = orig_dst_ip[1];
flow.ip_tuple.u32.dst[2] = orig_dst_ip[2];
flow.ip_tuple.u32.dst[3] = orig_dst_ip[3];
flow.src_port = orig_src_port;
flow.dst_port = orig_dst_port;
}
if (tree_result == NULL) {
/* flow still not found, must be new */
if (workflow->cur_active_flows == workflow->max_active_flows) {
fprintf(stderr, "[%8llu, %d] max flows to track reached: %llu, idle: %llu\n",
workflow->packets_captured, reader_thread->array_index,
workflow->max_active_flows, workflow->cur_idle_flows);
return;
}
flow_to_process = (struct nDPI_flow_info *)ndpi_malloc(sizeof(*flow_to_process));
if (flow_to_process == NULL) {
fprintf(stderr, "[%8llu, %d] Not enough memory for flow info\n",
workflow->packets_captured, reader_thread->array_index);
return;
}
memcpy(flow_to_process, &flow, sizeof(*flow_to_process));
flow_to_process->flow_id = __sync_fetch_and_add(&flow_id, 1);
flow_to_process->ndpi_flow = (struct ndpi_flow_struct *)ndpi_flow_malloc(SIZEOF_FLOW_STRUCT);
if (flow_to_process->ndpi_flow == NULL) {
fprintf(stderr, "[%8llu, %d, %4u] Not enough memory for flow struct\n",
workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
return;
}
memset(flow_to_process->ndpi_flow, 0, SIZEOF_FLOW_STRUCT);
printf("[%8llu, %d, %4u] new %sflow\n", workflow->packets_captured, thread_index,
flow_to_process->flow_id,
(flow_to_process->is_midstream_flow != 0 ? "midstream-" : ""));
if (ndpi_tsearch(flow_to_process, &workflow->ndpi_flows_active[hashed_index], ndpi_workflow_node_cmp) == NULL) {
/* Possible Leak, but should not happen as we'd abort earlier. */
return;
}
workflow->cur_active_flows++;
workflow->total_active_flows++;
} else {
flow_to_process = *(struct nDPI_flow_info **)tree_result;
}
flow_to_process->packets_processed++;
flow_to_process->total_l4_data_len += l4_len;
/* update timestamps, important for timeout handling */
if (flow_to_process->first_seen == 0) {
flow_to_process->first_seen = time_ms;
}
flow_to_process->last_seen = time_ms;
/* current packet is an TCP-ACK? */
flow_to_process->flow_ack_seen = flow.flow_ack_seen;
/* TCP-FIN: indicates that at least one side wants to end the connection */
if (flow.flow_fin_ack_seen != 0 && flow_to_process->flow_fin_ack_seen == 0) {
flow_to_process->flow_fin_ack_seen = 1;
printf("[%8llu, %d, %4u] end of flow\n", workflow->packets_captured, thread_index,
flow_to_process->flow_id);
return;
}
/*
* This example tries to use maximum supported packets for detection:
* for uint8: 0xFF
*/
if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFF) {
return;
} else if (flow_to_process->ndpi_flow->num_processed_pkts == 0xFE) {
/* last chance to guess something, better then nothing */
uint8_t protocol_was_guessed = 0;
flow_to_process->guessed_protocol =
ndpi_detection_giveup(workflow->ndpi_struct,
flow_to_process->ndpi_flow,
1, &protocol_was_guessed);
if (protocol_was_guessed != 0) {
printf("[%8llu, %d, %4d][GUESSED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.master_protocol),
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.app_protocol),
ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->guessed_protocol.category));
} else {
printf("[%8llu, %d, %4d][FLOW NOT CLASSIFIED]\n",
workflow->packets_captured, reader_thread->array_index, flow_to_process->flow_id);
}
}
flow_to_process->detected_l7_protocol =
ndpi_detection_process_packet(workflow->ndpi_struct, flow_to_process->ndpi_flow,
ip != NULL ? (uint8_t *)ip : (uint8_t *)ip6,
ip_size, time_ms, NULL);
if (ndpi_is_protocol_detected(workflow->ndpi_struct,
flow_to_process->detected_l7_protocol) != 0 &&
flow_to_process->detection_completed == 0)
{
if (flow_to_process->detected_l7_protocol.master_protocol != NDPI_PROTOCOL_UNKNOWN ||
flow_to_process->detected_l7_protocol.app_protocol != NDPI_PROTOCOL_UNKNOWN)
{
flow_to_process->detection_completed = 1;
workflow->detected_flow_protocols++;
printf("[%8llu, %d, %4d][DETECTED] protocol: %s | app protocol: %s | category: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.master_protocol),
ndpi_get_proto_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.app_protocol),
ndpi_category_get_name(workflow->ndpi_struct, flow_to_process->detected_l7_protocol.category));
}
}
if (flow_to_process->ndpi_flow->num_extra_packets_checked <=
flow_to_process->ndpi_flow->max_extra_packets_to_check)
{
/*
* Your business logic starts here.
*
* This example does print some information about
* TLS client and server hellos if available.
*
* You could also use nDPI's built-in json serialization
* and send it to a high-level application for further processing.
*
* EoE - End of Example
*/
if (flow_to_process->flow_info_printed == 0)
{
char const * const flow_info = ndpi_get_flow_info(flow_to_process->ndpi_flow, &flow_to_process->detected_l7_protocol);
if (flow_info != NULL)
{
printf("[%8llu, %d, %4d] info: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
flow_info);
flow_to_process->flow_info_printed = 1;
}
}
if (flow_to_process->detected_l7_protocol.master_protocol == NDPI_PROTOCOL_TLS ||
flow_to_process->detected_l7_protocol.app_protocol == NDPI_PROTOCOL_TLS)
{
if (flow_to_process->tls_client_hello_seen == 0 &&
flow_to_process->ndpi_flow->protos.tls_quic.hello_processed != 0)
{
uint8_t unknown_tls_version = 0;
char buf_ver[16];
printf("[%8llu, %d, %4d][TLS-CLIENT-HELLO] version: %s | sni: %s | alpn: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_ssl_version2str(buf_ver, sizeof(buf_ver),
flow_to_process->ndpi_flow->protos.tls_quic.ssl_version,
&unknown_tls_version),
flow_to_process->ndpi_flow->host_server_name,
(flow_to_process->ndpi_flow->protos.tls_quic.alpn != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.alpn : "-"));
flow_to_process->tls_client_hello_seen = 1;
}
if (flow_to_process->tls_server_hello_seen == 0 &&
flow_to_process->ndpi_flow->l4.tcp.tls.certificate_processed != 0)
{
uint8_t unknown_tls_version = 0;
char buf_ver[16];
printf("[%8llu, %d, %4d][TLS-SERVER-HELLO] version: %s | common-name(s): %.*s | "
"issuer: %s | subject: %s\n",
workflow->packets_captured,
reader_thread->array_index,
flow_to_process->flow_id,
ndpi_ssl_version2str(buf_ver, sizeof(buf_ver),
flow_to_process->ndpi_flow->protos.tls_quic.ssl_version,
&unknown_tls_version),
(flow_to_process->ndpi_flow->protos.tls_quic.server_names_len == 0 ?
1 : flow_to_process->ndpi_flow->protos.tls_quic.server_names_len),
(flow_to_process->ndpi_flow->protos.tls_quic.server_names == NULL ?
"-" : flow_to_process->ndpi_flow->protos.tls_quic.server_names),
(flow_to_process->ndpi_flow->protos.tls_quic.issuerDN != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.issuerDN : "-"),
(flow_to_process->ndpi_flow->protos.tls_quic.subjectDN != NULL ?
flow_to_process->ndpi_flow->protos.tls_quic.subjectDN : "-"));
flow_to_process->tls_server_hello_seen = 1;
}
}
}
}
static void run_pcap_loop(struct nDPI_reader_thread const * const reader_thread)
{
if (reader_thread->workflow != NULL &&
reader_thread->workflow->pcap_handle != NULL) {
if (pcap_loop(reader_thread->workflow->pcap_handle, -1,
&ndpi_process_packet, (uint8_t *)reader_thread) == PCAP_ERROR) {
fprintf(stderr, "Error while reading pcap file: '%s'\n",
pcap_geterr(reader_thread->workflow->pcap_handle));
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
}
}
}
static void break_pcap_loop(struct nDPI_reader_thread * const reader_thread)
{
if (reader_thread->workflow != NULL &&
reader_thread->workflow->pcap_handle != NULL)
{
pcap_breakloop(reader_thread->workflow->pcap_handle);
}
}
static void * processing_thread(void * const ndpi_thread_arg)
{
struct nDPI_reader_thread const * const reader_thread =
(struct nDPI_reader_thread *)ndpi_thread_arg;
printf("Starting Thread %d\n", reader_thread->array_index);
run_pcap_loop(reader_thread);
__sync_fetch_and_add(&reader_thread->workflow->error_or_eof, 1);
return NULL;
}
static int processing_threads_error_or_eof(void)
{
for (int i = 0; i < reader_thread_count; ++i) {
if (__sync_fetch_and_add(&reader_threads[i].workflow->error_or_eof, 0) == 0) {
return 0;
}
}
return 1;
}
static int start_reader_threads(void)
{
#ifndef WIN32
sigset_t thread_signal_set, old_signal_set;
sigfillset(&thread_signal_set);
sigdelset(&thread_signal_set, SIGINT);
sigdelset(&thread_signal_set, SIGTERM);
if (pthread_sigmask(SIG_BLOCK, &thread_signal_set, &old_signal_set) != 0) {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
return 1;
}
#endif
for (int i = 0; i < reader_thread_count; ++i) {
reader_threads[i].array_index = i;
if (reader_threads[i].workflow == NULL) {
/* no more threads should be started */
break;
}
if (pthread_create(&reader_threads[i].thread_id, NULL,
processing_thread, &reader_threads[i]) != 0)
{
fprintf(stderr, "pthread_create: %s\n", strerror(errno));
return 1;
}
}
if (pthread_sigmask(SIG_BLOCK, &old_signal_set, NULL) != 0) {
fprintf(stderr, "pthread_sigmask: %s\n", strerror(errno));
return 1;
}
return 0;
}
static int stop_reader_threads(void)
{
unsigned long long int total_packets_captured = 0;
unsigned long long int total_packets_processed = 0;
unsigned long long int total_l4_data_len = 0;
unsigned long long int total_flows_captured = 0;
unsigned long long int total_flows_idle = 0;
unsigned long long int total_flows_detected = 0;
for (int i = 0; i < reader_thread_count; ++i) {
break_pcap_loop(&reader_threads[i]);
}
printf("------------------------------------ Stopping reader threads\n");
for (int i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {
continue;
}
if (pthread_join(reader_threads[i].thread_id, NULL) != 0) {
fprintf(stderr, "pthread_join: %s\n", strerror(errno));
}
total_packets_processed += reader_threads[i].workflow->packets_processed;
total_l4_data_len += reader_threads[i].workflow->total_l4_data_len;
total_flows_captured += reader_threads[i].workflow->total_active_flows;
total_flows_idle += reader_threads[i].workflow->total_idle_flows;
total_flows_detected += reader_threads[i].workflow->detected_flow_protocols;
printf("Stopping Thread %d, processed %10llu packets, %12llu bytes, total flows: %8llu, "
"idle flows: %8llu, detected flows: %8llu\n",
reader_threads[i].array_index, reader_threads[i].workflow->packets_processed,
reader_threads[i].workflow->total_l4_data_len, reader_threads[i].workflow->total_active_flows,
reader_threads[i].workflow->total_idle_flows, reader_threads[i].workflow->detected_flow_protocols);
}
/* total packets captured: same value for all threads as packet2thread distribution happens later */
total_packets_captured = reader_threads[0].workflow->packets_captured;
for (int i = 0; i < reader_thread_count; ++i) {
if (reader_threads[i].workflow == NULL) {
continue;
}
free_workflow(&reader_threads[i].workflow);
}
printf("Total packets captured.: %llu\n", total_packets_captured);
printf("Total packets processed: %llu\n", total_packets_processed);
printf("Total layer4 data size.: %llu\n", total_l4_data_len);
printf("Total flows captured...: %llu\n", total_flows_captured);
printf("Total flows timed out..: %llu\n", total_flows_idle);
printf("Total flows detected...: %llu\n", total_flows_detected);
return 0;
}
static void sighandler(int signum)
{
fprintf(stderr, "Received SIGNAL %d\n", signum);
if (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0) {
__sync_fetch_and_add(&main_thread_shutdown, 1);
} else {
fprintf(stderr, "Reader threads are already shutting down, please be patient.\n");
}
}
int main(int argc, char ** argv)
{
if (argc == 0) {
printf("usage: ndpiSimpleIntegration Mdevice name>\n");
return 1;
}
printf("usage: %s [PCAP-FILE-OR-INTERFACE]\n"
"----------------------------------\n"
"nDPI version: %s\n"
" API version: %u\n"
"libgcrypt...: %s\n"
"----------------------------------\n",
argv[0],
ndpi_revision(), ndpi_get_api_version(),
(ndpi_get_gcrypt_version() == NULL ? "-" : ndpi_get_gcrypt_version()));
if (setup_reader_threads((argc >= 2 ? argv[1] : NULL)) != 0) {
fprintf(stderr, "%s: setup_reader_threads failed\n", argv[0]);
return 1;
}
if (start_reader_threads() != 0) {
fprintf(stderr, "%s: start_reader_threads\n", argv[0]);
return 1;
}
signal(SIGINT, sighandler);
signal(SIGTERM, sighandler);
while (__sync_fetch_and_add(&main_thread_shutdown, 0) == 0 && processing_threads_error_or_eof() == 0) {
sleep(1);
}
if (stop_reader_threads() != 0) {
fprintf(stderr, "%s: stop_reader_threads\n", argv[0]);
return 1;
}
return 0;
}