Add HR counters to CH schema. Dump HR counters.

This commit is contained in:
Alfredo Cardigliano 2026-04-27 11:48:30 +02:00
parent bfb8507371
commit e541c57bcd
5 changed files with 44 additions and 3 deletions

View file

@ -96,7 +96,9 @@ CREATE TABLE IF NOT EXISTS `flows` (
`SRC_PEER_ASN` UInt32 COMMENT 'BGP peer ASN upstream of the source IP',
`DST_PEER_ASN` UInt32 COMMENT 'BGP peer ASN upstream of the destination IP',
`REQUIRE_ATTENTION` Boolean COMMENT 'True if this flow/alert has been flagged as requiring manual review',
`NEXT_ADJACENT_ASN` UInt32 COMMENT 'BGP next adjacent ASN (BGP_NEXT_ADJACENT_ASN / IPFIX field 128)'
`NEXT_ADJACENT_ASN` UInt32 COMMENT 'BGP next adjacent ASN (BGP_NEXT_ADJACENT_ASN / IPFIX field 128)',
`HR_SRC2DST_BYTES` Array(UInt64) COMMENT '15-second delta byte counters src->dst from nProbe high-resolution counters',
`HR_DST2SRC_BYTES` Array(UInt64) COMMENT '15-second delta byte counters dst->src from nProbe high-resolution counters'
) ENGINE = MergeTree() PARTITION BY toYYYYMMDD(FIRST_SEEN) ORDER BY (FIRST_SEEN, IPV4_SRC_ADDR, IPV4_DST_ADDR)
COMMENT 'Per-flow telemetry records captured locally or received via NetFlow/sFlow/IPFIX. Each row represents one bidirectional network flow with 5-tuple (src/dst IP, src/dst port, protocol), byte/packet counters, L7 application identification, flow-risk bitmap, DSCP, NAT addresses, process info, and optional alert metadata. Partitioned by day on FIRST_SEEN.';
@
@ -203,6 +205,10 @@ ALTER TABLE flows DROP COLUMN IF EXISTS `PRE_NAT_SRC_PORT`;
ALTER TABLE flows DROP COLUMN IF EXISTS `PRE_NAT_IPV4_DST_ADDR`;
@
ALTER TABLE flows DROP COLUMN IF EXISTS `PRE_NAT_DST_PORT`;
@
ALTER TABLE flows ADD COLUMN IF NOT EXISTS `HR_SRC2DST_BYTES` Array(UInt64);
@
ALTER TABLE flows ADD COLUMN IF NOT EXISTS `HR_DST2SRC_BYTES` Array(UInt64);
@

View file

@ -100,7 +100,9 @@ CREATE TABLE IF NOT EXISTS `flows` ON CLUSTER '$CLUSTER' (
`SRC_PEER_ASN` UInt32 COMMENT 'BGP peer ASN upstream of the source IP',
`DST_PEER_ASN` UInt32 COMMENT 'BGP peer ASN upstream of the destination IP',
`REQUIRE_ATTENTION` Boolean COMMENT 'True if this flow/alert has been flagged as requiring manual review',
`NEXT_ADJACENT_ASN` UInt32 COMMENT 'BGP next adjacent ASN (BGP_NEXT_ADJACENT_ASN / IPFIX field 128)'
`NEXT_ADJACENT_ASN` UInt32 COMMENT 'BGP next adjacent ASN (BGP_NEXT_ADJACENT_ASN / IPFIX field 128)',
`HR_SRC2DST_BYTES` Array(UInt64) COMMENT '15-second delta byte counters src->dst from nProbe high-resolution counters',
`HR_DST2SRC_BYTES` Array(UInt64) COMMENT '15-second delta byte counters dst->src from nProbe high-resolution counters'
) ENGINE = ReplicatedMergeTree('/clickhouse/{cluster}/tables/{database}/{table}', '{replica}') PARTITION BY toYYYYMMDD(FIRST_SEEN) ORDER BY (FIRST_SEEN, IPV4_SRC_ADDR, IPV4_DST_ADDR)
COMMENT 'Per-flow telemetry records captured locally or received via NetFlow/sFlow/IPFIX. Each row represents one bidirectional network flow with 5-tuple (src/dst IP, src/dst port, protocol), byte/packet counters, L7 application identification, flow-risk bitmap, DSCP, NAT addresses, process info, and optional alert metadata. Partitioned by day on FIRST_SEEN.';
@
@ -210,6 +212,10 @@ ALTER TABLE flows DROP COLUMN IF EXISTS `PRE_NAT_DST_PORT`;
@
ALTER TABLE `flows` ON CLUSTER '$CLUSTER' ADD COLUMN IF NOT EXISTS `EXPORTER_SITE` UInt16;
@
ALTER TABLE `flows` ON CLUSTER '$CLUSTER' ADD COLUMN IF NOT EXISTS `HR_SRC2DST_BYTES` Array(UInt64);
@
ALTER TABLE `flows` ON CLUSTER '$CLUSTER' ADD COLUMN IF NOT EXISTS `HR_DST2SRC_BYTES` Array(UInt64);
@
ALTER TABLE flows ADD COLUMN IF NOT EXISTS `INTERFACE_ROLE` UInt8;
@

View file

@ -84,12 +84,13 @@ typedef struct {
struct {
char *src, *dst;
} bgp;
struct {
char* wlan_ssid;
u_int8_t wtp_mac_address[6];
} wifi;
struct {
/* IPv4 only, so a int32 bit is only needed */
u_int32_t src_ip_addr_post_nat, dst_ip_addr_post_nat;
@ -120,6 +121,7 @@ class Flow : public GenericHashEntry {
/* Data collected from nProbe */
std::string l7_json;
std::vector<uint64_t> hr_src2dst_bytes, hr_dst2src_bytes;
ICMPinfo* icmp_info;
char* category_list_name_shared_pointer; /* NOTE: this is a pointer handled by
Ntop::getPersistentCustomListNameById()
@ -1602,6 +1604,10 @@ class Flow : public GenericHashEntry {
void setServerBGPInfo(char* bgp_info);
inline char* getClientBGPInfo() { return((collection && collection->bgp.src) ? collection->bgp.src : (char*)""); }
inline char* getServerBGPInfo() { return((collection && collection->bgp.dst) ? collection->bgp.dst : (char*)""); }
/* High-resolution (HR) byte counters, one vector per flow direction.
   The setters copy the caller's vector into the flow; the getters return a
   const reference to the stored vector (no copy). */
inline void setHRSrc2DstBytes(const std::vector<uint64_t>& v) { hr_src2dst_bytes = v; }
inline void setHRDst2SrcBytes(const std::vector<uint64_t>& v) { hr_dst2src_bytes = v; }
inline const std::vector<uint64_t>& getHRSrc2DstBytes() const { return hr_src2dst_bytes; }
inline const std::vector<uint64_t>& getHRDst2SrcBytes() const { return hr_dst2src_bytes; }
char* getWLANSSID() {
return (collection ? collection->wifi.wlan_ssid : NULL);
};

View file

@ -429,6 +429,7 @@ using namespace std;
#if defined(NTOPNG_PRO) && defined(HAVE_CLICKHOUSE)
/* clickhouse-cpp */
#include <clickhouse/client.h>
#include <clickhouse/columns/array.h>
#include <clickhouse/columns/numeric.h>
#include <clickhouse/columns/string.h>
#include <cstring> /* std::memcpy*/

View file

@ -21,6 +21,25 @@
#include "ntop_includes.h"
/* True for the characters strtoull() skips as leading whitespace ("C" locale) */
static inline bool isHRBytesSpace(char c) {
  return (c == ' ' || c == '\t' || c == '\n' || c == '\r' || c == '\v' || c == '\f');
}

/*
 * Parser for the high-resolution counters string (JSON-like array of
 * unsigned integers, e.g. "[120,0,4096]").
 *
 * @param s NUL-terminated string; may be NULL.
 * @return The parsed values in order of appearance. An empty vector is
 *         returned when s is NULL or does not start with '['.
 *
 * Parsing is best-effort: it stops at the closing ']', at the end of the
 * string, or at the first token that is not a non-negative integer, and the
 * values parsed so far are returned. Whitespace around values and commas is
 * ignored. Values too large for 64 bits are stored as returned by strtoull().
 */
static std::vector<uint64_t> parseHRBytesString(const char *s) {
  std::vector<uint64_t> v;
  const char *p;

  if (s == NULL || s[0] != '[') return v;

  p = s + 1;

  for (;;) {
    char *end;
    uint64_t val;

    while (isHRBytesSpace(*p)) p++;

    if (*p == '\0' || *p == ']') break;

    /* strtoull() accepts a leading '-' and returns the negated value as an
       unsigned number: a negative counter is malformed, do not store it */
    if (*p == '-') break;

    val = strtoull(p, &end, 10);
    if (end == p) break; /* Not a number: stop parsing */

    v.push_back(val);
    p = end;

    /* Tolerate blanks between a value and its separator ("1 , 2") */
    while (isHRBytesSpace(*p)) p++;

    if (*p == ',') p++;
  }

  return v;
}
#ifndef HAVE_NEDGE
/* **************************************************** */
@ -740,6 +759,9 @@ bool ParserInterface::processFlow(ParsedFlow* zflow) {
flow->setQoE(zflow->getQoESrc2Dst(), zflow->getQoEDst2Src());
if (zflow->getHRSrcToDstBytes()) flow->setHRSrc2DstBytes(parseHRBytesString(zflow->getHRSrcToDstBytes()));
if (zflow->getHRDstToSrcBytes()) flow->setHRDst2SrcBytes(parseHRBytesString(zflow->getHRDstToSrcBytes()));
if (zflow->getOSHint() != ndpi_os_unknown) {
if (flow->get_cli_host() != NULL)
flow->get_cli_host()->setnDPIOS(zflow->getOSHint());