diff --git a/include/ZMQParserInterface.h b/include/ZMQParserInterface.h index 80e377286e..a4fc470b31 100755 --- a/include/ZMQParserInterface.h +++ b/include/ZMQParserInterface.h @@ -44,8 +44,8 @@ class ZMQParserInterface : public ParserInterface { void preprocessFlow(ParsedFlow *flow, NetworkInterface *iface); bool getKeyId(char *sym, u_int32_t * const pen, u_int32_t * const field) const; void addMapping(const char *sym, u_int32_t num, u_int32_t pen = 0); - bool parsePENZeroField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int32_t ivalue) const; - bool parsePENNtopField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int32_t ivalue) const; + bool parsePENZeroField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int64_t ivalue) const; + bool parsePENNtopField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int64_t ivalue) const; static bool parseContainerInfo(json_object *jo, ContainerInfo * const container_info); bool parseNProbeMiniField(ParsedFlow * const flow, const char * const key, const char * const value, json_object * const jvalue) const; void parseSingleJSONFlow(json_object *o, u_int8_t source_id, NetworkInterface *iface); diff --git a/src/ZMQCollectorInterface.cpp b/src/ZMQCollectorInterface.cpp index 12cb4d6342..b17506c506 100644 --- a/src/ZMQCollectorInterface.cpp +++ b/src/ZMQCollectorInterface.cpp @@ -188,7 +188,11 @@ void ZMQCollectorInterface::collect_flows() { /* Legacy version */ msg_id = 0, source_id = 0; publisher_version = h0.version; - } else if((size != sizeof(struct zmq_msg_hdr)) || ((h->version != ZMQ_MSG_VERSION) && (h->version != ZMQ_COMPATIBILITY_MSG_VERSION))) { + } else if(size != sizeof(struct zmq_msg_hdr) || ( + h->version != ZMQ_MSG_VERSION && + h->version != ZMQ_MSG_VERSION_TLV && + h->version != ZMQ_COMPATIBILITY_MSG_VERSION + )) { ntop->getTrace()->traceEvent(TRACE_WARNING, "Unsupported publisher version: your nProbe sender is outdated? [%u][%u][%u][%u][%u]", size, sizeof(struct zmq_msg_hdr), h->version, ZMQ_MSG_VERSION, ZMQ_COMPATIBILITY_MSG_VERSION); diff --git a/src/ZMQParserInterface.cpp b/src/ZMQParserInterface.cpp index c6c5ef4754..ee35be3773 100755 --- a/src/ZMQParserInterface.cpp +++ b/src/ZMQParserInterface.cpp @@ -356,7 +356,7 @@ u_int8_t ZMQParserInterface::parseEvent(const char * const payload, int payload_ /* **************************************************** */ -bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int32_t ivalue) const { +bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int64_t ivalue) const { IpAddress ip_aux; /* used to check empty IPs */ switch(field) { @@ -414,7 +414,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t fi if (value) flow->src_port = htons(atoi(value)); else - flow->src_port = htons(ivalue); + flow->src_port = htons((u_int32_t) ivalue); } break; case L4_DST_PORT: @@ -422,7 +422,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t fi if (value) flow->dst_port = htons(atoi(value)); else - flow->dst_port = htons(ivalue); + flow->dst_port = htons((u_int32_t) ivalue); } break; case SRC_VLAN: @@ -547,7 +547,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t fi if (value) flow->src_port = htons(atoi(value)); else - flow->src_port = htons(ivalue); + flow->src_port = htons((u_int16_t) ivalue); } break; case POST_NAPT_DST_TRANSPORT_PORT: @@ -555,7 +555,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t fi if (value) flow->dst_port = htons(atoi(value)); else - flow->dst_port = htons(ivalue); + flow->dst_port = htons((u_int16_t) ivalue); } break; case INGRESS_VRFID: @@ -582,7 +582,7 @@ bool ZMQParserInterface::parsePENZeroField(ParsedFlow * const flow, u_int32_t fi /* **************************************************** */ -bool ZMQParserInterface::parsePENNtopField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int32_t ivalue) const { +bool ZMQParserInterface::parsePENNtopField(ParsedFlow * const flow, u_int32_t field, const char * const value, u_int64_t ivalue) const { switch(field) { case L7_PROTO: if (value) { @@ -988,6 +988,7 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, while ((et = ndpi_deserialize_get_nextitem_type(deserializer)) != ndpi_serialization_unknown) { u_int32_t pen = 0, key_id; u_int32_t v32 = 0; + u_int64_t v64 = 0; ndpi_string key, vs; char key_str[64]; u_int8_t vbkp, kbkp; @@ -997,9 +998,15 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, switch(et) { case ndpi_serialization_uint32_uint32: ndpi_deserialize_uint32_uint32(deserializer, &key_id, &v32); + v64 = v32; if (debug_tlv) printf("%u=%u ", key_id, v32); break; + case ndpi_serialization_uint32_uint64: + ndpi_deserialize_uint32_uint64(deserializer, &key_id, &v64); + if (debug_tlv) printf("%u=%llu ", key_id, (unsigned long long) v64); + break; + case ndpi_serialization_uint32_string: ndpi_deserialize_uint32_string(deserializer, &key_id, &vs); vbkp = vs.str[vs.str_len]; @@ -1008,6 +1015,25 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, if (debug_tlv) printf("%u='%s' ", key_id, vs.str); break; + case ndpi_serialization_string_uint32: + ndpi_deserialize_string_uint32(deserializer, &key, &v32); + v64 = v32; + kbkp = key.str[key.str_len]; + key.str[key.str_len] = '\0'; + getKeyId(key.str, &pen, &key_id); + key_is_string = true; + if (debug_tlv) printf("%s=%u ", key.str, v32); + break; + + case ndpi_serialization_string_uint64: + ndpi_deserialize_string_uint64(deserializer, &key, &v64); + kbkp = key.str[key.str_len]; + key.str[key.str_len] = '\0'; + getKeyId(key.str, &pen, &key_id); + key_is_string = true; + if (debug_tlv) printf("%s=%llu ", key.str, (unsigned long long) v64); + break; + case ndpi_serialization_string_string: ndpi_deserialize_string_string(deserializer, &key, &vs); kbkp = key.str[key.str_len], vbkp = vs.str[vs.str_len]; @@ -1017,15 +1043,6 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, if (debug_tlv) printf("%s='%s' ", key.str, vs.str); break; - case ndpi_serialization_string_uint32: - ndpi_deserialize_string_uint32(deserializer, &key, &v32); - kbkp = key.str[key.str_len]; - key.str[key.str_len] = '\0'; - getKeyId(key.str, &pen, &key_id); - key_is_string = true; - if (debug_tlv) printf("%s=%u ", key.str, v32); - break; - case ndpi_serialization_end_of_record: ndpi_deserialize_end_of_record(deserializer); if (debug_tlv) printf("EOR "); @@ -1042,12 +1059,12 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, switch(pen) { case 0: /* No PEN */ - rc = parsePENZeroField(&flow, key_id, value_is_string ? vs.str : NULL, v32); + rc = parsePENZeroField(&flow, key_id, value_is_string ? vs.str : NULL, v64); if (rc) break; /* Dont'break when rc == false for backward compatibility: attempt to parse Zero-PEN as Ntop-PEN */ case NTOP_PEN: - rc = parsePENNtopField(&flow, key_id, value_is_string ? vs.str : NULL, v32); + rc = parsePENNtopField(&flow, key_id, value_is_string ? vs.str : NULL, v64); break; case UNKNOWN_PEN: default: @@ -1089,7 +1106,7 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, case UNKNOWN_FLOW_ELEMENT: #if 0 // TODO /* Attempt to parse it as an nProbe mini field */ - if(parseNProbeMiniField(&flow, key_str, value_is_string ? vs.str : NULL, v32)) { + if(parseNProbeMiniField(&flow, key_str, value_is_string ? vs.str : NULL, v64)) { if(!flow.hasParsedeBPF()) { flow.setParsedeBPF(); flow.absolute_packet_octet_counters = true; @@ -1100,7 +1117,7 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, default: #ifdef NTOPNG_PRO if(custom_app_maps || (custom_app_maps = new(std::nothrow) CustomAppMaps())) - custom_app_maps->checkCustomApp(key_str, vs.str, &flow); // TODO pass v32 for integer values + custom_app_maps->checkCustomApp(key_str, vs.str, &flow); // TODO pass v64 for integer values #endif ntop->getTrace()->traceEvent(TRACE_DEBUG, "Not handled ZMQ field %u.%u", pen, key_id); add_to_additional_fields = true; @@ -1110,7 +1127,7 @@ int ZMQParserInterface::parseSingleTLVFlow(ndpi_deserializer *deserializer, if (add_to_additional_fields) { json_object_object_add(flow.additional_fields, key_str, - value_is_string ? json_object_new_string(vs.str) : json_object_new_int(v32)); + value_is_string ? json_object_new_string(vs.str) : json_object_new_int64(v64)); } switch(et) {