Add native clickhouse support for queries

This commit is contained in:
Alfredo Cardigliano 2025-05-16 11:14:18 +02:00
parent c0d42f2343
commit bb810adeff
15 changed files with 296 additions and 8 deletions

3
.gitmodules vendored
View file

@ -6,3 +6,6 @@
path = httpdocs/dist
url = https://github.com/ntop/ntopng-dist.git
branch = dev
[submodule "third-party/clickhouse-cpp"]
path = third-party/clickhouse-cpp
url = https://github.com/ClickHouse/clickhouse-cpp.git

View file

@ -34,6 +34,10 @@ LUA_HOME=${PWD}/third-party/lua-5.4.6
LUA_INC=-I$(LUA_HOME)/src
LUA_LIB=$(LUA_HOME)/src/liblua.a
CLICKHOUSE_HOME=${PWD}/third-party/clickhouse-cpp
CLICKHOUSE_INC=-I$(CLICKHOUSE_HOME) -I$(CLICKHOUSE_HOME)/contrib/absl
CLICKHOUSE_LIB=$(CLICKHOUSE_HOME)/build/clickhouse/libclickhouse-cpp-lib.a $(CLICKHOUSE_HOME)/build/contrib/cityhash/cityhash/libcityhash.a $(CLICKHOUSE_HOME)/build/contrib/lz4/lz4/liblz4.a
ifeq ($(OS),Linux)
LUA_PLATFORM=linux
else ifeq ($(OS),Darwin)
@ -43,7 +47,6 @@ else ifeq ($(OS), $(filter $(OS), FreeBSD))
LUA_LIB=/usr/local/lib/liblua-5.3.a
endif
######
LIBRRDTOOL_HOME=${PWD}/third-party/rrdtool-1.4.8
HAS_LIBRRDTOOL=$(shell $(PKG_CONFIG) --atleast-version=1.4.8 librrd && echo 0)
@ -86,12 +89,12 @@ endif
######
TARGET = ntopng
LIBS = $(NDPI_LIB) $(LIBPCAP) $(LUA_LIB) $(LIBRRDTOOL_LIB) $(ZEROMQ_LIB) $(JSON_LIB) $(SNMP_LIB) @MAXMINDDB_LIBS@ $(SODIUM_LIB) @NTOPCLOUD_LIB@ @HIREDIS_LIB@ @SQLITE_LIB@ @MYSQL_LIB@ @RADCLI_LIB@ @EXPAT_LIB@ @SSL_LIB@ @ZMQ_LIBS@ @LINK_OPTS@ @LDFLAGS@ @PRO_LIBS@ @LIBS@ $(ZSTD_LIB) @JE@ -lm -lpthread
LIBS = $(NDPI_LIB) $(LIBPCAP) $(LUA_LIB) $(CLICKHOUSE_LIB) $(LIBRRDTOOL_LIB) $(ZEROMQ_LIB) $(JSON_LIB) $(SNMP_LIB) @MAXMINDDB_LIBS@ $(SODIUM_LIB) @NTOPCLOUD_LIB@ @HIREDIS_LIB@ @SQLITE_LIB@ @MYSQL_LIB@ @RADCLI_LIB@ @EXPAT_LIB@ @SSL_LIB@ @ZMQ_LIBS@ @LINK_OPTS@ @LDFLAGS@ @PRO_LIBS@ @LIBS@ $(ZSTD_LIB) @JE@ -lm -lpthread
CC := @CC@
CXX := @CXX@
CPPFLAGS := @CPPFLAGS@
CFLAGS := @CFLAGS@
CXXFLAGS = $(CFLAGS) -std=c++1y @HIREDIS_INC@ @MAXMINDDB_CFLAGS@ $(MONGOOSE_INC) $(JSON_INC) $(SNMP_INC) $(SODIUM_INC) $(NDPI_INC) $(LUA_INC) $(LIBRRDTOOL_INC) $(ZEROMQ_INC) @MYSQL_INC@ @ZMQ_CFLAGS@ @CXXFLAGS@ -I$(HTTPCLIENT_INC) @SSL_INC@ @PRO_INCS@ -DDATA_DIR='"$(datadir)"' # -D_GLIBCPP_DEBUG
CXXFLAGS = $(CFLAGS) -std=c++17 @HIREDIS_INC@ @MAXMINDDB_CFLAGS@ $(MONGOOSE_INC) $(JSON_INC) $(SNMP_INC) $(SODIUM_INC) $(NDPI_INC) $(LUA_INC) $(CLICKHOUSE_INC) $(LIBRRDTOOL_INC) $(ZEROMQ_INC) @MYSQL_INC@ @ZMQ_CFLAGS@ @CXXFLAGS@ -I$(HTTPCLIENT_INC) @SSL_INC@ @PRO_INCS@ -DDATA_DIR='"$(datadir)"' # -D_GLIBCPP_DEBUG
######
# ntopng-1.0_1234.x86_64.rpm
PLATFORM = `uname -p`
@ -248,7 +251,7 @@ Makefile: configure Makefile.in
# ./autogen.sh
cppcheck:
cppcheck --template='{file}:{line}:{severity}:{message}' --quiet --enable=all --force -I include/ @HIREDIS_INC@ $(MONGOOSE_INC) $(JSON_INC) $(NDPI_INC) $(LUA_INC) $(LIBRRDTOOL_INC) $(ZEROMQ_INC) src/*.cpp
cppcheck --template='{file}:{line}:{severity}:{message}' --quiet --enable=all --force -I include/ @HIREDIS_INC@ $(MONGOOSE_INC) $(JSON_INC) $(NDPI_INC) $(LUA_INC) $(CLICKHOUSE_INC) $(LIBRRDTOOL_INC) $(ZEROMQ_INC) src/*.cpp
test: test_version

View file

@ -80,6 +80,11 @@ git submodule update --remote
# git submodule update --init --recursive
cd third-party/clickhouse-cpp
cmake -B build .
cmake --build build
cd ../../
echo "Wait please..."
autoreconf -if
echo ""

View file

@ -68,6 +68,9 @@ class MySQLDB : public DB {
void *cb_user_data = NULL);
int exec_sql_query(lua_State *vm, char *sql, bool limitRows,
bool wait_for_db_created);
#if defined(NTOPNG_PRO) && defined(HAVE_CLICKHOUSE)
int exec_sql_query_ch(lua_State *vm, char *sql, bool limitRows);
#endif
virtual bool startQueryLoop();
void shutdown();
int exec_single_query(lua_State *vm, char *sql);

View file

@ -210,6 +210,7 @@ class Prefs {
bool snmp_polling;
bool active_monitoring, network_discovery, starttls;
bool dump_pcap_to_clickhouse; /* Dump pcap-interface flows to ClickHouse */
bool use_native_clickhouse_client;
InterfacesSet lan_interfaces, wan_interfaces;
@ -407,6 +408,7 @@ class Prefs {
inline bool do_tls_quic_hostnaming() { return (tls_quic_hostnaming); };
inline bool email_starttls_enabled() { return (starttls); };
inline bool dump_pcap_to_clickhouse_enabled() { return (dump_pcap_to_clickhouse); };
inline bool native_clickhouse_client_enabled() { return (use_native_clickhouse_client); };
inline char* get_cpu_affinity() { return (cpu_affinity); };
inline char* get_other_cpu_affinity() { return (other_cpu_affinity); };
#ifdef __linux__

View file

@ -676,6 +676,8 @@
NTOPNG_PREFS_PREFIX ".starttls"
#define CONST_PREFS_ENABLE_DUMP_PCAP_TO_CLICKHOUSE \
NTOPNG_PREFS_PREFIX ".dump_pcap_to_clickhouse"
#define CONST_PREFS_ENABLE_NATIVE_CLICKHOUSE_CLIENT \
NTOPNG_PREFS_PREFIX ".use_native_clickhouse_client"
#define CONST_PREFS_ENABLE_ACCESS_LOG NTOPNG_PREFS_PREFIX ".enable_access_log"
#define CONST_PREFS_ENABLE_ASSETS_LOG NTOPNG_PREFS_PREFIX ".enable_assets_log"
#define CONST_PREFS_ENABLE_SQL_LOG NTOPNG_PREFS_PREFIX ".enable_sql_log"

View file

@ -290,7 +290,7 @@ using namespace std;
#endif
#ifdef NTOPNG_PRO
#include "Profile.h"
#include "GenericProfile.h"
#include "Profiles.h"
#include "CountMinSketch.h"
#include "AlertExclusionsInfo.h"
@ -418,6 +418,12 @@ using namespace std;
#endif
#endif
#if defined(NTOPNG_PRO) && defined(HAVE_CLICKHOUSE)
/* clickhouse-cpp */
#include <clickhouse/client.h>
#include <iomanip> /* std::setfill */
using namespace clickhouse;
#include "ClickHouseImport.h"
#include "ClickHouseFlowDB.h"
#include "ClickHouseAlertStore.h"

View file

@ -7722,6 +7722,8 @@ local lang = {
["toggle_traffic_rrd_creation_title"] = "Traffic",
["toggle_use_mac_in_flow_key_description"] = "Use the MAC Address to generate the flow key. This is suggested in case the same IP can be seen using different MAC Addresses (e.g. a load balancer).",
["toggle_use_mac_in_flow_key_title"] = "Use MAC Address in Flow Key",
["toggle_use_native_clickhouse_client_description"] = "Enable queries through the native ClickHouse TCP socket.",
["toggle_use_native_clickhouse_client_title"] = "Native ClickHouse Socket",
["toggle_users_rrds_description"] = "Toggle the creation of bytes and applications timeseries for defined users.",
["toggle_users_rrds_title"] = "Users",
["toggle_vlan_rrds_description"] = "Toggle the creation of bytes and applications timeseries for VLANs.",

View file

@ -2175,6 +2175,13 @@ if auth.has_capability(auth.capabilities.preferences) then
max = 5000
})
prefsToggleButton(subpage_active, {
field = "toggle_use_native_clickhouse_client",
default = "0",
pref = "use_native_clickhouse_client",
hidden = not showAggregateFlowsPrefs
})
prefsToggleButton(subpage_active, {
field = "toggle_flow_aggregated_alerted_flows",
default = "0",

View file

@ -1101,6 +1101,8 @@ end
-- NOTE noLimit should be used only with small deltas between epoch_end - epoch_begin to not overload the db
function alert_store:select_historical(filter, fields, download --[[ Available only with ClickHouse ]], noLimit)
local prefs = ntop.getPrefs()
local table_name = self:get_table_name()
local res = {}
local where_clause = ''
@ -1203,12 +1205,10 @@ function alert_store:select_historical(filter, fields, download --[[ Available o
return ""
end
-- tprint(q)
-- res = interface.alert_store_query(q, true)
res = interface.alert_store_query(q, true, true) -- Limit results to the max set in the backend
if ntop.isClickHouseEnabled() then
if ntop.isClickHouseEnabled() and not prefs.native_clickhouse_client_enabled then
-- convert DATETIME to epoch
for _, record in ipairs(res or {}) do
if record.tstamp_epoch then

View file

@ -2301,6 +2301,7 @@ local known_parameters = {
["toggle_fingerprint_stats"] = validateBool,
["toggle_starttls"] = validateBool,
["toggle_dump_pcap_to_clickhouse"] = validateBool,
["toggle_use_native_clickhouse_client"] = validateBool,
["behaviour_analysis_learning_period"] = validateNumber,
["behaviour_analysis_learning_status_during_learning"] = validateNumber,
["behaviour_analysis_learning_status_post_learning"] = validateNumber,

View file

@ -233,6 +233,10 @@ local menu_subpages = {{
toggle_dump_pcap_to_clickhouse = {
title = i18n("prefs.toggle_dump_pcap_to_clickhouse_title"),
description = i18n("prefs.toggle_dump_pcap_to_clickhouse_description")
},
toggle_use_native_clickhouse_client = {
title = i18n("prefs.toggle_use_native_clickhouse_client_title"),
description = i18n("prefs.toggle_use_native_clickhouse_client_description")
}
}
}, {

View file

@ -1109,6 +1109,245 @@ int MySQLDB::exec_sql_query(const char *sql, bool doReconnect,
/* ******************************************* */
#if defined(NTOPNG_PRO) && defined(HAVE_CLICKHOUSE)
std::string uuid_to_string_ch(const std::pair<uint64_t, uint64_t>& uuid) {
uint8_t bytes[16];
std::memcpy(bytes, &uuid.first, 8);
std::memcpy(bytes + 8, &uuid.second, 8);
std::ostringstream ss;
ss << std::hex << std::setfill('0');
for (int i = 0; i < 16; ++i) {
ss << std::setw(2) << static_cast<int>(bytes[i]);
if (i == 3 || i == 5 || i == 7 || i == 9) ss << "-";
}
return ss.str();
}
/* ******************************************* */
void print_block_ch(const Block& block) {
size_t rows = block.GetRowCount();
size_t cols = block.GetColumnCount();
for (size_t i = 0; i < rows; ++i) {
std::cout << "#" << i << " ";
for (size_t j = 0; j < cols; ++j) {
auto col = block[j];
if (!col) {
std::cout << "(null)";
continue;
}
Type::Code type_code = col->Type()->GetCode();
switch (type_code) {
case Type::Code::UInt64: {
auto c = col->As<ColumnUInt64>();
std::cout << block.GetColumnName(j) << "=" << (*c)[i];
break;
}
case Type::Code::Int64: {
auto c = col->As<ColumnInt64>();
std::cout << block.GetColumnName(j) << "=" << (*c)[i];
break;
}
case Type::Code::Float64: {
auto c = col->As<ColumnFloat64>();
std::cout << block.GetColumnName(j) << "=" << (*c)[i];
break;
}
case Type::Code::String: {
auto c = col->As<ColumnString>();
std::cout << block.GetColumnName(j) << "=" << c->At(i);
break;
}
default:
std::cout << "[unsupported " << block.GetColumnName(j) << " type]";
break;
}
if (j != cols - 1)
std::cout << ", ";
}
std::cout << "\n";
}
}
/* ******************************************* */
void result_to_lua_ch(lua_State *vm, const Block& block, bool limitRows, int& count) {
size_t rows = block.GetRowCount();
size_t cols = block.GetColumnCount();
if (count >= MYSQL_MAX_NUM_ROWS)
return;
if (count + rows > MYSQL_MAX_NUM_ROWS) {
static bool warning_shown = false;
if (!warning_shown) {
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Too many results returned from ClickHouse, reduce query result set");
warning_shown = true;
}
if (limitRows) rows = MYSQL_MAX_NUM_ROWS - count;
}
for (size_t i = 0; i < rows; ++i) {
lua_newtable(vm);
for (size_t j = 0; j < cols; ++j) {
auto col = block[j];
std::string value;
if (!col)
continue;
Type::Code type_code = col->Type()->GetCode();
switch (type_code) {
case Type::Code::UInt8: {
/* Note: Boolean is stored as UInt8 */
auto c = col->As<ColumnUInt8>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::UInt16: {
auto c = col->As<ColumnUInt16>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::UInt32: {
auto c = col->As<ColumnUInt32>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::UInt64: {
auto c = col->As<ColumnUInt64>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::Int8: {
auto c = col->As<ColumnInt8>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::Int16: {
auto c = col->As<ColumnInt16>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::Int32: {
auto c = col->As<ColumnInt32>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::Int64: {
auto c = col->As<ColumnInt64>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::Float64: {
auto c = col->As<ColumnFloat64>();
value = std::to_string((*c)[i]);
break;
}
case Type::Code::String: {
auto c = col->As<ColumnString>();
value = std::string(c->At(i));
break;
}
case Type::Code::DateTime: {
#if 1
/* Return as epoch (same as mysql) */
auto c = col->As<ColumnDateTime>();
value = std::to_string((*c)[i]);
#else
/* Return as Y-m-d J:M:S */
auto c = col->As<ColumnDateTime>();
std::time_t t = (*c)[i];
std::tm* tm_ptr = std::gmtime(&t); // UTC time
char buf[20];
std::strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", tm_ptr);
value = std::string(buf);
#endif
break;
}
case Type::Code::UUID: {
auto c = col->As<ColumnUUID>();
clickhouse::UUID uuid = (*c)[i];
value = uuid_to_string_ch(uuid);
break;
}
case Type::Code::IPv6: {
auto c = col->As<ColumnIPv6>();
in6_addr ipv6 = (*c)[i];
char str[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &ipv6, str, sizeof(str));
value = std::string(str);
break;
}
case Type::Code::Nullable: {
break;
}
default:
std::cout << "Unsupported " << block.GetColumnName(j) << " type in record\n";
break;
}
lua_push_str_table_entry(vm, (const char *) block.GetColumnName(j).c_str(), value.c_str());
}
lua_pushinteger(vm, ++count);
lua_insert(vm, -2);
lua_settable(vm, -3);
}
}
/* ******************************************* */
int MySQLDB::exec_sql_query_ch(lua_State *vm, char *sql, bool limitRows) {
char *user = ntop->getPrefs()->get_ch_user() ?
ntop->getPrefs()->get_ch_user() : /* Using CH Cloud */
ntop->getPrefs()->get_mysql_user(); /* Using standard CH */
int count = 0;
int rc = -1;
lua_newtable(vm);
try {
Client client(ClientOptions()
.SetHost(ntop->getPrefs()->get_mysql_host())
.SetPort(ntop->getPrefs()->get_clickhouse_tcp_port())
.SetUser(user)
.SetPassword(ntop->getPrefs()->get_mysql_pw())
.SetDefaultDatabase(ntop->getPrefs()->get_mysql_dbname())
);
client.Select(sql, [vm, limitRows, &count](const Block& block) {
/* Print records to console (debug) */
//print_block_ch(block);
/* Convert records to lua table */
result_to_lua_ch(vm, block, limitRows, count);
});
rc = 0;
} catch (...) {
ntop->getTrace()->traceEvent(TRACE_ERROR, "Clickhouse select exception");
}
return rc;
}
#endif
/* ******************************************* */
int MySQLDB::exec_sql_query(lua_State *vm, char *sql, bool limitRows,
bool wait_for_db_created) {
MYSQL_RES *result;
@ -1124,6 +1363,13 @@ int MySQLDB::exec_sql_query(lua_State *vm, char *sql, bool limitRows,
}
}
#if defined(NTOPNG_PRO) && defined(HAVE_CLICKHOUSE)
if (ntop->getPrefs()->do_dump_flows_on_clickhouse() &&
ntop->getPrefs()->native_clickhouse_client_enabled()) {
return exec_sql_query_ch(vm, sql, limitRows);
}
#endif
if (!db_operational) {
if (!connectToDB(&mysql, true)) return (-3);
}

View file

@ -171,6 +171,7 @@ Prefs::Prefs(Ntop *_ntop) {
snmp_polling = true;
active_monitoring = network_discovery = starttls = false;
dump_pcap_to_clickhouse = false;
use_native_clickhouse_client = false;
vs_max_num_scans = 4;
vs_slow_scan = false;
tls_quic_hostnaming = false;
@ -989,6 +990,7 @@ void Prefs::reloadPrefsFromRedis() {
network_discovery = getDefaultBoolPrefsValue(CONST_PREFS_ENABLE_NETWORK_DISCOVERY, false);
starttls = getDefaultBoolPrefsValue(CONST_PREFS_ENABLE_STARTTLS, false);
dump_pcap_to_clickhouse = getDefaultBoolPrefsValue(CONST_PREFS_ENABLE_DUMP_PCAP_TO_CLICKHOUSE, false);
use_native_clickhouse_client = getDefaultBoolPrefsValue(CONST_PREFS_ENABLE_NATIVE_CLICKHOUSE_CLIENT, false);
enable_arp_matrix_generation =
getDefaultBoolPrefsValue(CONST_DEFAULT_ARP_MATRIX_GENERATION, false),
@ -2760,6 +2762,7 @@ void Prefs::lua(lua_State *vm) {
lua_push_bool_table_entry(vm, "network_discovery", network_discovery);
lua_push_bool_table_entry(vm, "starttls", starttls);
lua_push_bool_table_entry(vm, "dump_pcap_to_clickhouse", dump_pcap_to_clickhouse);
lua_push_bool_table_entry(vm, "native_clickhouse_client_enabled", use_native_clickhouse_client);
lua_push_bool_table_entry(vm, "tls_quic_hostnaming", tls_quic_hostnaming);
#ifdef HAVE_NEDGE

1
third-party/clickhouse-cpp vendored Submodule

@ -0,0 +1 @@
Subproject commit fbd7945f7e241c734e351dd72c5c4bad821e275c