Reworked nDPIsrvd.h C-API.

* nDPIsrvd.h: Provide nDPId thread storage.
 * nDPIsrvd.py: Fixed instance cleanup bug.
 * nDPIsrvd.h: Support for instance/thread user data and cleanup callback.
 * nDPIsrvd.h: Most recent flow time stored in thread ht instead of instance ht.
 * nDPId: Moved flow logger out the memory profilier into SIGUSR1 signal handling.
 * nDPId: Added signal fd to be usable within epoll's event handling (live-capture only!)
 * nDPId: Added information about ZLib compressions to daemon status/shutdown events.

Signed-off-by: Toni Uhlig <matzeton@googlemail.com>
This commit is contained in:
Toni Uhlig 2022-03-21 15:56:01 +01:00
parent daaaa61519
commit c0b7bdacbc
No known key found for this signature in database
GPG key ID: 22C5333D922537D2
273 changed files with 1911 additions and 1493 deletions

View file

@ -53,7 +53,7 @@ enum nDPIsrvd_read_return
{
READ_OK = CONNECT_LAST_ENUM_VALUE,
READ_PEER_DISCONNECT,
READ_ERROR,
READ_ERROR, /* check for errno */
READ_LAST_ENUM_VALUE
};
@ -118,18 +118,28 @@ struct nDPIsrvd_flow
{
nDPIsrvd_hashkey flow_key;
nDPIsrvd_ull id_as_ull;
nDPIsrvd_hashkey thread_id;
nDPIsrvd_ull last_seen;
nDPIsrvd_ull idle_time;
UT_hash_handle hh;
uint8_t flow_user_data[0];
};
struct nDPIsrvd_thread_data
{
nDPIsrvd_hashkey thread_key;
nDPIsrvd_ull most_recent_flow_time;
UT_hash_handle hh;
uint8_t thread_user_data[0];
};
struct nDPIsrvd_instance
{
nDPIsrvd_hashkey alias_source_key;
nDPIsrvd_ull most_recent_flow_time;
struct nDPIsrvd_flow * flow_table;
struct nDPIsrvd_thread_data * thread_data_table;
UT_hash_handle hh;
uint8_t instance_user_data[0];
};
struct nDPIsrvd_json_token
@ -154,9 +164,14 @@ extern void nDPIsrvd_memprof_log(char const * const format, ...);
typedef enum nDPIsrvd_callback_return (*json_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow);
typedef void (*instance_cleanup_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
enum nDPIsrvd_cleanup_reason reason);
typedef void (*flow_cleanup_callback)(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow,
enum nDPIsrvd_cleanup_reason reason);
@ -198,9 +213,12 @@ struct nDPIsrvd_socket
int fd;
struct nDPIsrvd_address address;
size_t instance_user_data_size;
size_t thread_user_data_size;
size_t flow_user_data_size;
struct nDPIsrvd_instance * instance_table;
json_callback json_callback;
instance_cleanup_callback instance_cleanup_callback;
flow_cleanup_callback flow_cleanup_callback;
struct nDPIsrvd_buffer buffer;
@ -373,8 +391,11 @@ static inline void nDPIsrvd_buffer_free(struct nDPIsrvd_buffer * const buffer)
}
static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_data_size,
size_t instance_user_data_size,
size_t thread_user_data_size,
size_t flow_user_data_size,
json_callback json_cb,
instance_cleanup_callback instance_cleanup_cb,
flow_cleanup_callback flow_cleanup_callback_cb)
{
static const UT_icd packet_data_icd = {sizeof(struct nDPIsrvd_json_token), NULL, NULL, NULL};
@ -393,9 +414,13 @@ static inline struct nDPIsrvd_socket * nDPIsrvd_socket_init(size_t global_user_d
goto error;
}
sock->address.raw.sa_family = -1;
sock->instance_user_data_size = instance_user_data_size;
sock->thread_user_data_size = thread_user_data_size;
sock->flow_user_data_size = flow_user_data_size;
sock->json_callback = json_cb;
sock->instance_cleanup_callback = instance_cleanup_cb;
sock->flow_cleanup_callback = flow_cleanup_callback_cb;
utarray_new(sock->json.tokens, &packet_data_icd);
@ -417,34 +442,72 @@ error:
static inline void nDPIsrvd_cleanup_flow(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const flow,
enum nDPIsrvd_cleanup_reason reason)
{
if (sock->flow_cleanup_callback != NULL)
{
sock->flow_cleanup_callback(sock, instance, flow, reason);
sock->flow_cleanup_callback(sock, instance, thread_data, flow, reason);
}
HASH_DEL(instance->flow_table, flow);
nDPIsrvd_free(flow);
}
static inline void nDPIsrvd_cleanup_flows(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
enum nDPIsrvd_cleanup_reason reason)
{
struct nDPIsrvd_flow * current_flow;
struct nDPIsrvd_flow * ftmp;
if (instance->flow_table != NULL)
{
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Cleaning up flows for instance 0x%x and thread %d.",
instance->alias_source_key,
thread_data->thread_key);
#endif
HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
{
if (current_flow->thread_id == thread_data->thread_key)
{
nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, reason);
}
}
}
}
static inline void nDPIsrvd_cleanup_instance(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
enum nDPIsrvd_cleanup_reason reason)
{
struct nDPIsrvd_flow * current_flow;
struct nDPIsrvd_flow * ftmp;
struct nDPIsrvd_thread_data * current_thread_data;
struct nDPIsrvd_thread_data * ttmp;
if (instance != NULL)
{
if (instance->flow_table != NULL)
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Cleaning up instance 0x%x.", instance->alias_source_key);
#endif
if (sock->instance_cleanup_callback != NULL)
{
HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
{
nDPIsrvd_cleanup_flow(sock, instance, current_flow, reason);
}
instance->flow_table = NULL;
sock->instance_cleanup_callback(sock, instance, reason);
}
if (instance->thread_data_table != NULL)
{
HASH_ITER(hh, instance->thread_data_table, current_thread_data, ttmp)
{
nDPIsrvd_cleanup_flows(sock, instance, current_thread_data, reason);
HASH_DEL(instance->thread_data_table, current_thread_data);
nDPIsrvd_free(current_thread_data);
}
instance->thread_data_table = NULL;
}
HASH_DEL(sock->instance_table, instance);
nDPIsrvd_free(instance);
}
@ -481,6 +544,7 @@ static inline void nDPIsrvd_socket_free(struct nDPIsrvd_socket ** const sock)
nDPIsrvd_cleanup_instance(*sock, current_instance, CLEANUP_REASON_APP_SHUTDOWN);
}
(*sock)->instance_table = NULL;
nDPIsrvd_buffer_free(&(*sock)->buffer);
nDPIsrvd_free(*sock);
@ -582,9 +646,7 @@ static inline enum nDPIsrvd_read_return nDPIsrvd_read(struct nDPIsrvd_socket * c
return READ_OK;
}
ssize_t bytes_read = read(sock->fd,
sock->buffer.ptr.raw + sock->buffer.used,
sock->buffer.max - sock->buffer.used);
ssize_t bytes_read = read(sock->fd, sock->buffer.ptr.raw + sock->buffer.used, sock->buffer.max - sock->buffer.used);
if (bytes_read == 0)
{
@ -799,7 +861,7 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s
if (instance == NULL)
{
instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance));
instance = (struct nDPIsrvd_instance *)nDPIsrvd_calloc(1, sizeof(*instance) + sock->instance_user_data_size);
if (instance == NULL)
{
return NULL;
@ -820,24 +882,100 @@ static inline struct nDPIsrvd_instance * nDPIsrvd_get_instance(struct nDPIsrvd_s
return instance;
}
static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance ** const instance)
static inline struct nDPIsrvd_thread_data * nDPIsrvd_get_thread_data(
struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_json_token const * const thread_id_token,
struct nDPIsrvd_json_token const * const ts_msec_token)
{
struct nDPIsrvd_flow * flow;
struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"),
TOKEN_GET_SZ(sock, "source"),
TOKEN_GET_SZ(sock, "flow_id"),
TOKEN_GET_SZ(sock, "thread_ts_msec"),
TOKEN_GET_SZ(sock, "flow_last_seen"),
TOKEN_GET_SZ(sock, "flow_idle_time")};
nDPIsrvd_hashkey flow_key;
struct nDPIsrvd_thread_data * thread_data;
nDPIsrvd_hashkey thread_id;
*instance = nDPIsrvd_get_instance(sock, tokens[0], tokens[1]);
if (*instance == NULL || nDPIsrvd_build_flow_key(tokens[2], &flow_key) != 0)
if (thread_id_token == NULL)
{
return NULL;
}
{
nDPIsrvd_ull thread_key;
TOKEN_VALUE_TO_ULL(thread_id_token, &thread_key);
thread_id = thread_key;
}
HASH_FIND_INT(instance->thread_data_table, &thread_id, thread_data);
if (thread_data == NULL)
{
thread_data =
(struct nDPIsrvd_thread_data *)nDPIsrvd_calloc(1, sizeof(*thread_data) + sock->thread_user_data_size);
if (thread_data == NULL)
{
return NULL;
}
thread_data->thread_key = thread_id;
HASH_ADD_INT(instance->thread_data_table, thread_key, thread_data);
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Thread Data %d added: %zu bytes.",
thread_data->thread_key,
sizeof(*thread_data) + sock->thread_user_data_size);
#endif
}
if (ts_msec_token != NULL)
{
nDPIsrvd_ull thread_ts_msec;
TOKEN_VALUE_TO_ULL(ts_msec_token, &thread_ts_msec);
if (thread_ts_msec > thread_data->most_recent_flow_time)
{
thread_data->most_recent_flow_time = thread_ts_msec;
}
}
return thread_data;
}
static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance ** const instance,
struct nDPIsrvd_thread_data ** const thread_data)
{
struct nDPIsrvd_flow * flow;
struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "alias"),
TOKEN_GET_SZ(sock, "source"),
TOKEN_GET_SZ(sock, "thread_id"),
TOKEN_GET_SZ(sock, "flow_id"),
TOKEN_GET_SZ(sock, "thread_ts_msec"),
TOKEN_GET_SZ(sock, "flow_last_seen"),
TOKEN_GET_SZ(sock, "flow_idle_time")};
enum
{
TOKEN_ALIAS = 0,
TOKEN_SOURCE,
TOKEN_THREAD_ID,
TOKEN_FLOW_ID,
TOKEN_THREAD_TS_MSEC,
TOKEN_FLOW_LAST_SEEN,
TOKEN_FLOW_IDLE_TIME
};
nDPIsrvd_hashkey flow_key;
*instance = nDPIsrvd_get_instance(sock, tokens[TOKEN_ALIAS], tokens[TOKEN_SOURCE]);
if (*instance == NULL)
{
return NULL;
}
*thread_data = nDPIsrvd_get_thread_data(sock, *instance, tokens[TOKEN_THREAD_ID], tokens[TOKEN_THREAD_TS_MSEC]);
if (*thread_data == NULL)
{
return NULL;
}
if (nDPIsrvd_build_flow_key(tokens[TOKEN_FLOW_ID], &flow_key) != 0)
{
return NULL;
}
HASH_FIND_INT((*instance)->flow_table, &flow_key, flow);
if (flow == NULL)
@ -849,34 +987,26 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
}
flow->flow_key = flow_key;
TOKEN_VALUE_TO_ULL(tokens[2], &flow->id_as_ull);
flow->thread_id = (*thread_data)->thread_key;
TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_ID], &flow->id_as_ull);
HASH_ADD_INT((*instance)->flow_table, flow_key, flow);
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Flow %llu added: %zu bytes.", flow->id_as_ull, sizeof(*flow) + sock->flow_user_data_size);
#endif
}
if (tokens[3] != NULL)
{
nDPIsrvd_ull thread_ts_msec;
TOKEN_VALUE_TO_ULL(tokens[3], &thread_ts_msec);
if (thread_ts_msec > (*instance)->most_recent_flow_time)
{
(*instance)->most_recent_flow_time = thread_ts_msec;
}
}
if (tokens[4] != NULL)
if (tokens[TOKEN_FLOW_LAST_SEEN] != NULL)
{
nDPIsrvd_ull flow_last_seen;
TOKEN_VALUE_TO_ULL(tokens[4], &flow_last_seen);
TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_LAST_SEEN], &flow_last_seen);
flow->last_seen = flow_last_seen;
}
if (tokens[5] != NULL)
if (tokens[TOKEN_FLOW_IDLE_TIME] != NULL)
{
nDPIsrvd_ull flow_idle_time = 0;
TOKEN_VALUE_TO_ULL(tokens[5], &flow_idle_time);
nDPIsrvd_ull flow_idle_time;
TOKEN_VALUE_TO_ULL(tokens[TOKEN_FLOW_IDLE_TIME], &flow_idle_time);
flow->idle_time = flow_idle_time;
}
@ -885,27 +1015,39 @@ static inline struct nDPIsrvd_flow * nDPIsrvd_get_flow(struct nDPIsrvd_socket *
static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
struct nDPIsrvd_instance * const instance,
struct nDPIsrvd_thread_data * const thread_data,
struct nDPIsrvd_flow * const current_flow)
{
if (instance == NULL || current_flow == NULL)
struct nDPIsrvd_json_token const * const tokens[] = {TOKEN_GET_SZ(sock, "daemon_event_name"),
TOKEN_GET_SZ(sock, "flow_event_name")};
enum
{
TOKEN_DAEMON_EVENT_NAME = 0,
TOKEN_FLOW_EVENT_NAME
};
if (instance == NULL)
{
return 0;
}
struct nDPIsrvd_json_token const * const daemon_event_name = TOKEN_GET_SZ(sock, "daemon_event_name");
if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "init") != 0)
if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "init") != 0)
{
nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_INIT);
nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_INIT);
}
if (TOKEN_VALUE_EQUALS_SZ(daemon_event_name, "shutdown") != 0)
if (TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_DAEMON_EVENT_NAME], "shutdown") != 0)
{
nDPIsrvd_cleanup_instance(sock, instance, CLEANUP_REASON_DAEMON_SHUTDOWN);
nDPIsrvd_cleanup_flows(sock, instance, thread_data, CLEANUP_REASON_DAEMON_SHUTDOWN);
}
if (current_flow == NULL)
{
return 0;
}
struct nDPIsrvd_json_token const * const flow_event_name = TOKEN_GET_SZ(sock, "flow_event_name");
int is_idle_flow;
if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(flow_event_name, "idle")) != 0 ||
TOKEN_VALUE_EQUALS_SZ(flow_event_name, "end") != 0)
if ((is_idle_flow = TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "idle")) != 0 ||
TOKEN_VALUE_EQUALS_SZ(tokens[TOKEN_FLOW_EVENT_NAME], "end") != 0)
{
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Flow %llu deleted: %zu bytes.",
@ -914,19 +1056,25 @@ static inline int nDPIsrvd_check_flow_end(struct nDPIsrvd_socket * const sock,
#endif
nDPIsrvd_cleanup_flow(sock,
instance,
thread_data,
current_flow,
(is_idle_flow != 0 ? CLEANUP_REASON_FLOW_IDLE : CLEANUP_REASON_FLOW_END));
}
else if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
else if (thread_data != NULL &&
current_flow->last_seen + current_flow->idle_time < thread_data->most_recent_flow_time)
{
#ifdef ENABLE_MEMORY_PROFILING
nDPIsrvd_memprof_log("Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: [%llu]",
current_flow->id_as_ull,
sizeof(*current_flow) + sock->flow_user_data_size,
current_flow->last_seen, current_flow->idle_time, instance->most_recent_flow_time,
instance->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time));
nDPIsrvd_memprof_log(
"Flow %llu timed out: %zu bytes. Last seen [%llu] + idle time [%llu] < most recent flow time [%llu]. Diff: "
"[%llu]",
current_flow->id_as_ull,
sizeof(*current_flow) + sock->flow_user_data_size,
current_flow->last_seen,
current_flow->idle_time,
thread_data->most_recent_flow_time,
thread_data->most_recent_flow_time - (current_flow->last_seen + current_flow->idle_time));
#endif
nDPIsrvd_cleanup_flow(sock, instance, current_flow, CLEANUP_REASON_FLOW_TIMEOUT);
nDPIsrvd_cleanup_flow(sock, instance, thread_data, current_flow, CLEANUP_REASON_FLOW_TIMEOUT);
}
return 0;
@ -1068,13 +1216,14 @@ static inline enum nDPIsrvd_parse_return nDPIsrvd_parse_all(struct nDPIsrvd_sock
}
struct nDPIsrvd_instance * instance = NULL;
struct nDPIsrvd_thread_data * thread_data = NULL;
struct nDPIsrvd_flow * flow = NULL;
flow = nDPIsrvd_get_flow(sock, &instance);
if (ret == PARSE_OK && sock->json_callback(sock, instance, flow) != CALLBACK_OK)
flow = nDPIsrvd_get_flow(sock, &instance, &thread_data);
if (ret == PARSE_OK && sock->json_callback(sock, instance, thread_data, flow) != CALLBACK_OK)
{
ret = PARSE_JSON_CALLBACK_ERROR;
}
if (nDPIsrvd_check_flow_end(sock, instance, flow) != 0)
if (nDPIsrvd_check_flow_end(sock, instance, thread_data, flow) != 0)
{
ret = PARSE_FLOW_MGMT_ERROR;
}
@ -1168,7 +1317,9 @@ static inline void nDPIsrvd_uthash_free(void * const freeable, size_t const size
#endif
static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instance,
void (*verify_cb)(struct nDPIsrvd_flow const *, void * user_data),
void (*verify_cb)(struct nDPIsrvd_thread_data const * const,
struct nDPIsrvd_flow const *,
void * user_data),
void * user_data)
{
int retval = 0;
@ -1177,11 +1328,30 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc
HASH_ITER(hh, instance->flow_table, current_flow, ftmp)
{
if (current_flow->last_seen + current_flow->idle_time < instance->most_recent_flow_time)
struct nDPIsrvd_thread_data * current_thread_data;
HASH_FIND_INT(instance->thread_data_table, &current_flow->thread_id, current_thread_data);
if (current_thread_data == NULL)
{
if (verify_cb != NULL)
{
verify_cb(current_flow, user_data);
verify_cb(current_thread_data, current_flow, user_data);
}
retval = 1;
}
else if (current_flow->thread_id != current_thread_data->thread_key)
{
if (verify_cb != NULL)
{
verify_cb(current_thread_data, current_flow, user_data);
}
retval = 1;
}
else if (current_flow->last_seen + current_flow->idle_time < current_thread_data->most_recent_flow_time)
{
if (verify_cb != NULL)
{
verify_cb(current_thread_data, current_flow, user_data);
}
retval = 1;
}
@ -1191,11 +1361,16 @@ static inline int nDPIsrvd_verify_flows(struct nDPIsrvd_instance * const instanc
}
static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock,
void (*info_cb)(struct nDPIsrvd_flow const *, void * user_data),
void (*info_cb)(struct nDPIsrvd_socket const *,
struct nDPIsrvd_instance const *,
struct nDPIsrvd_thread_data const *,
struct nDPIsrvd_flow const *,
void *),
void * user_data)
{
struct nDPIsrvd_instance const * current_instance;
struct nDPIsrvd_instance const * itmp;
struct nDPIsrvd_thread_data * current_thread_data;
struct nDPIsrvd_flow const * current_flow;
struct nDPIsrvd_flow const * ftmp;
@ -1207,7 +1382,8 @@ static inline void nDPIsrvd_flow_info(struct nDPIsrvd_socket const * const sock,
{
HASH_ITER(hh, current_instance->flow_table, current_flow, ftmp)
{
info_cb(current_flow, user_data);
HASH_FIND_INT(current_instance->thread_data_table, &current_flow->thread_id, current_thread_data);
info_cb(sock, current_instance, current_thread_data, current_flow, user_data);
}
}
}