json2tlv now sends multiple records per message using the end_of_frame delimiter

This commit is contained in:
Alfredo Cardigliano 2019-07-17 18:01:25 +02:00
parent b62f7e2113
commit e2001d69c6

View file

@ -109,6 +109,8 @@ void json_to_tlv(json_object * jobj, ndpi_serializer *serializer) {
if (rc == -1)
printf("Serialization error: %d\n", rc);
}
ndpi_serialize_end_of_record(serializer);
}
/* *************************************** */
@ -126,12 +128,13 @@ int main(int argc, char *argv[]) {
void *zmq_sock = NULL;
void *zmq_context = NULL;
int enc_repeat = 1, dec_repeat = 1;
int batch_size = 32;
int verbose = 0;
struct timeval t1, t2;
uint64_t total_time_usec;
ndpi_serializer *serializer;
ndpi_serializer deserializer;
int rc, i, num_elements, exported_flows = 0;
int rc, i, j, num_records, tlv_msgs = 0, exported_msgs = 0;
char c;
while ((c = getopt(argc, argv,"hi:vz:E:D:")) != '?') {
@ -207,19 +210,20 @@ int main(int argc, char *argv[]) {
f = json_tokener_parse_verbose(buffer, &jerr);
if (json_object_get_type(f) == json_type_array)
num_elements = json_object_array_length(f);
num_records = json_object_array_length(f);
else
num_elements = 1;
num_records = 1;
printf("%u records found\n", num_elements);
printf("%u records found\n", num_records);
free(buffer);
/* nDPI TLV Serialization */
serializer = (ndpi_serializer *) calloc(num_elements, sizeof(ndpi_serializer));
tlv_msgs = (num_records/batch_size)+1;
serializer = (ndpi_serializer *) calloc(tlv_msgs, sizeof(ndpi_serializer));
for (i = 0; i < num_elements; i++)
for (i = 0; i < tlv_msgs; i++)
ndpi_init_serializer(&serializer[i], ndpi_serialization_format_tlv);
total_time_usec = 0;
@ -228,18 +232,30 @@ int main(int argc, char *argv[]) {
gettimeofday(&t1, NULL);
if(json_object_get_type(f) == json_type_array) {
for(i = 0; i < num_elements; i++) {
ndpi_reset_serializer(&serializer[i]);
json_to_tlv(json_object_array_get_idx(f, i), &serializer[i]);
/* Converting from JSON to TLV records */
tlv_msgs = 0;
if (json_object_get_type(f) == json_type_array) {
i = 0;
while (i < num_records) {
ndpi_reset_serializer(&serializer[tlv_msgs]);
j = 0;
while (i < num_records && j < batch_size) {
json_to_tlv(json_object_array_get_idx(f, i), &serializer[tlv_msgs]);
j++, i++;
}
tlv_msgs++;
}
} else {
ndpi_reset_serializer(&serializer[0]);
json_to_tlv(f, &serializer[0]);
ndpi_reset_serializer(&serializer[tlv_msgs]);
json_to_tlv(f, &serializer[tlv_msgs]);
tlv_msgs++;
}
/* Sending TLV records over ZMQ */
if (zmq_sock) {
for(i = 0; i < num_elements; i++) {
for(i = 0; i < tlv_msgs; i++) {
struct zmq_msg_hdr msg_hdr;
strncpy(msg_hdr.url, "flow", sizeof(msg_hdr.url));
msg_hdr.version = 3;
@ -247,7 +263,7 @@ int main(int argc, char *argv[]) {
zmq_send(zmq_sock, &msg_hdr, sizeof(msg_hdr), ZMQ_SNDMORE);
rc = zmq_send(zmq_sock, serializer[i].buffer, msg_hdr.size, 0);
if (rc > 0)
exported_flows++;
exported_msgs++;
}
}
@ -268,7 +284,10 @@ int main(int argc, char *argv[]) {
gettimeofday(&t1, NULL);
for (i = 0; i < num_elements; i++) {
for (i = 0; i < tlv_msgs; i++) {
if (verbose) printf("\n[Message %u]\n\n", i);
rc = ndpi_init_deserializer(&deserializer, &serializer[i]);
if (rc == -1) {
@ -312,15 +331,21 @@ int main(int argc, char *argv[]) {
ks.str[ks.str_len] = bkpk;
break;
case ndpi_serialization_end_of_record:
ndpi_deserialize_end_of_record(&deserializer);
if (verbose) printf("EOR\n");
break;
default:
goto close_record;
if (verbose) printf("Unsupported type %u\n", et);
goto close_message;
break;
}
}
close_record:
close_message:
if (verbose) printf("\n");
if (verbose) printf("\n---\n");
}
gettimeofday(&t2, NULL);
@ -331,9 +356,9 @@ int main(int argc, char *argv[]) {
printf("Deserialization perf: %.3f msec total time for %u iterations\n", (double) total_time_usec/1000, dec_repeat);
if (zmq_sock)
printf("%u total flows exported over ZMQ\n", exported_flows);
printf("%u messages (max %u records each) sent over ZMQ\n", exported_msgs, batch_size);
for (i = 0; i < num_elements; i++)
for (i = 0; i < tlv_msgs; i++)
ndpi_term_serializer(&serializer[i]);
if (zmq_sock != NULL) zmq_close(zmq_sock);