Code cleanup and stubs for supporting ClickHouse (via -F clickhouse)

This commit is contained in:
Luca Deri 2021-10-21 11:58:08 +02:00
parent e10eb3b8be
commit f4c512d552
14 changed files with 90 additions and 40 deletions

View file

@ -38,7 +38,7 @@ void* MySQLDB::queryLoop() {
bool queue_not_empty = false;
while(!ntop->getGlobals()->isShutdown()
&& !MySQLDB::isDbCreated() /* wait until the db has been created */) {
&& !isDbCreated() /* wait until the db has been created */) {
sleep(1);
}
@ -61,7 +61,7 @@ void* MySQLDB::queryLoop() {
}
/* ******************************************* */
volatile bool MySQLDB::db_created = false;
bool MySQLDB::createDBSchema(bool set_db_created) {
char sql[CONST_MAX_SQL_QUERY_LEN];
@ -430,6 +430,7 @@ bool MySQLDB::createDBSchema(bool set_db_created) {
if(set_db_created)
db_created = true;
return true;
}
@ -488,10 +489,12 @@ bool MySQLDB::createNprobeDBView() {
/* ******************************************* */
MySQLDB::MySQLDB(NetworkInterface *_iface) : DB(_iface) {
MySQLDB::MySQLDB(NetworkInterface *_iface, bool _clickhouse_mode) : DB(_iface) {
clickhouse_mode = _clickhouse_mode;
mysqlEnqueuedFlows = 0;
log_fd = NULL;
open_log();
db_created = false;
connectToDB(&mysql, false);
mysql_alt_connected = connectToDB(&mysql_alt, true);
@ -594,7 +597,7 @@ char* MySQLDB::escapeAphostrophes(const char *unescaped) {
/* ******************************************* */
int MySQLDB::flow2InsertValues(Flow *f, char *json,
char *values_buf, size_t values_buf_len) const {
char *values_buf, size_t values_buf_len) {
char cli_str[64], srv_str[64], *json_buf, *info_buf, buf[64];
u_int32_t packets, first_seen, last_seen;
u_int64_t bytes_cli2srv, bytes_srv2cli;
@ -603,8 +606,8 @@ int MySQLDB::flow2InsertValues(Flow *f, char *json,
if(!values_buf || !values_buf_len || !f)
return -1;
json_buf = escapeAphostrophes(json);
info_buf = escapeAphostrophes(f->getFlowInfo(buf, sizeof(buf)));
json_buf = escapeAphostrophes((const char*)json);
info_buf = escapeAphostrophes((const char*)f->getFlowInfo(buf, sizeof(buf)));
/* Prevents ERROR 1406 (22001): Data too long for column 'INFO' at row 1 */
if(info_buf && strlen(info_buf) > 254)
@ -749,14 +752,14 @@ void MySQLDB::disconnectFromDB(MYSQL *conn) {
/* ******************************************* */
static MYSQL *mysql_try_connect(MYSQL *conn, const char *dbname) {
MYSQL* MySQLDB::mysql_try_connect(MYSQL *conn, const char *dbname) {
MYSQL *rc;
unsigned long flags = CLIENT_COMPRESS;
if(!ntop->getPrefs()->get_mysql_host())
return(NULL);
if(ntop->getPrefs()->get_mysql_host()[0] == '/') /* Use socketD */
if((!clickhouse_mode) && (ntop->getPrefs()->get_mysql_host()[0] == '/') /* Use socketD */)
rc = mysql_real_connect(conn,
NULL, /* Host */
ntop->getPrefs()->get_mysql_user(),
@ -841,7 +844,7 @@ bool MySQLDB::connectToDB(MYSQL *conn, bool select_db) {
mysql_error(conn),
ntop->getPrefs()->get_mysql_user(),
ntop->getPrefs()->get_mysql_host(),
ntop->getPrefs()->get_mysql_port());
ntop->getPrefs()->get_mysql_port());
m.unlock(__FILE__, __LINE__);
return(db_operational);
@ -867,20 +870,20 @@ int MySQLDB::exec_single_query(lua_State *vm, char *sql) {
bool result_ok = false;
if(mysql_init(&conn) != NULL) {
if((mysql_try_connect(&conn, NULL /* no db */) != NULL) &&
(mysql_query(&conn, sql) == 0) &&
((result = mysql_store_result(&conn)) != NULL)) {
if(mysql_field_count(&conn) != 0) {
mysql_result_to_lua(vm, result, false);
result_ok = true;
}
mysql_free_result(result);
if((mysql_try_connect(&conn, NULL /* no db */) != NULL)
&& (mysql_query(&conn, sql) == 0)
&& ((result = mysql_store_result(&conn)) != NULL)) {
if(mysql_field_count(&conn) != 0) {
mysql_result_to_lua(vm, result, false);
result_ok = true;
}
mysql_free_result(result);
}
mysql_close(&conn);
}
if(!result_ok) {
lua_pushnil(vm);
return(-1);
@ -959,7 +962,7 @@ int MySQLDB::exec_sql_query(lua_State *vm, char *sql, bool limitRows, bool wait_
MYSQL_RES *result;
int rc;
if((wait_for_db_created && !MySQLDB::db_created /* Make sure the db exists before doing queries */)
if((wait_for_db_created && !db_created /* Make sure the db exists before doing queries */)
|| !db_operational)
return(-2);