From fa85749fe8d089bbd2114384b564900ecc97504c Mon Sep 17 00:00:00 2001 From: emanuele-f Date: Thu, 16 Nov 2017 17:42:29 +0100 Subject: [PATCH] Selectively compress checkpoints json data Compression is currently enabled only on hosts --- include/Checkpointable.h | 10 +++- include/NetworkInterface.h | 2 +- src/Checkpointable.cpp | 97 ++++++++++++++++++++++++++++++++++---- src/Host.cpp | 8 ++-- src/NetworkInterface.cpp | 6 +-- 5 files changed, 104 insertions(+), 19 deletions(-) diff --git a/include/Checkpointable.h b/include/Checkpointable.h index b09b6e3407..d2fba73907 100644 --- a/include/Checkpointable.h +++ b/include/Checkpointable.h @@ -28,10 +28,16 @@ class Checkpointable { private: char *checkpoints[CONST_MAX_NUM_CHECKPOINTS]; /* controllable json serializations */ +#ifdef HAVE_ZLIB + bool compression_enabled; + uLongf *compressed_lengths; + uLongf *uncompressed_lengths; +#endif + public: - Checkpointable(); + Checkpointable(bool compress=false); ~Checkpointable(); - void checkpoint(lua_State* vm, u_int8_t checkpoint_id); + bool checkpoint(lua_State* vm, u_int8_t checkpoint_id); /* This function must return a serialization of the entity information needed * for the checkpoint. The returned string is dynamically allocated and will be diff --git a/include/NetworkInterface.h b/include/NetworkInterface.h index 883655a3b7..1cdea30c89 100644 --- a/include/NetworkInterface.h +++ b/include/NetworkInterface.h @@ -288,7 +288,7 @@ class NetworkInterface : public Checkpointable { inline void incLostPkts(u_int32_t num) { tcpPacketStats.incLost(num); }; bool checkPointHostCounters(lua_State* vm, u_int8_t checkpoint_id, char *host_ip, u_int16_t vlan_id); bool checkPointNetworkCounters(lua_State* vm, u_int8_t checkpoint_id, u_int8_t network_id); - inline bool checkPointInterfaceCounters(lua_State* vm, u_int8_t checkpoint_id) { checkpoint(vm, checkpoint_id); return true; } + inline bool checkPointInterfaceCounters(lua_State* vm, u_int8_t checkpoint_id) { return checkpoint(vm, checkpoint_id); } void checkPointCounters(bool drops_only); virtual char* serializeCheckpoint(); diff --git a/src/Checkpointable.cpp b/src/Checkpointable.cpp index 91d403a2cd..0b15ae7ed9 100644 --- a/src/Checkpointable.cpp +++ b/src/Checkpointable.cpp @@ -23,34 +23,110 @@ /* *************************************** */ -Checkpointable::Checkpointable() { +Checkpointable::Checkpointable(bool compress) { memset(&checkpoints, 0, sizeof(checkpoints)); + +#ifdef HAVE_ZLIB + compression_enabled = compress; + + if(compression_enabled) { + compressed_lengths = (uLongf *) calloc(CONST_MAX_NUM_CHECKPOINTS, sizeof(uLongf)); + uncompressed_lengths = (uLongf *) calloc(CONST_MAX_NUM_CHECKPOINTS, sizeof(uLongf)); + } else + compressed_lengths = uncompressed_lengths = NULL; +#endif } /* *************************************** */ -void Checkpointable::checkpoint(lua_State* vm, u_int8_t checkpoint_id) { +bool Checkpointable::checkpoint(lua_State* vm, u_int8_t checkpoint_id) { + char *new_data; + if(checkpoint_id >= CONST_MAX_NUM_CHECKPOINTS) { if(vm) lua_pushnil(vm); - return; + return false; } if(vm) { lua_newtable(vm); - if(checkpoints[checkpoint_id]) + if(checkpoints[checkpoint_id]) { +#ifdef HAVE_ZLIB + if(compression_enabled) { + char *uncompressed = (char*) malloc(uncompressed_lengths[checkpoint_id] + 1); + + if(uncompressed == NULL) { + ntop->getTrace()->traceEvent(TRACE_ERROR, "Cannot allocate decompression buffer"); + return false; + } + + int err; + + if((err = uncompress((Bytef*)uncompressed, &uncompressed_lengths[checkpoint_id], (Bytef*)checkpoints[checkpoint_id], compressed_lengths[checkpoint_id])) != Z_OK) { + ntop->getTrace()->traceEvent(TRACE_ERROR, "Uncompress error [%d][len: %u]", err, uncompressed_lengths[checkpoint_id]); + free(uncompressed); + return false; + } + + uncompressed[uncompressed_lengths[checkpoint_id]] = '\0'; + lua_push_str_table_entry(vm, (char*)"previous", uncompressed); + free(uncompressed); + } else + lua_push_str_table_entry(vm, (char*)"previous", checkpoints[checkpoint_id]); +#else lua_push_str_table_entry(vm, (char*)"previous", checkpoints[checkpoint_id]); +#endif + } } if(checkpoints[checkpoint_id]) free(checkpoints[checkpoint_id]); - checkpoints[checkpoint_id] = serializeCheckpoint(); + new_data = serializeCheckpoint(); - if(vm) { - if(checkpoints[checkpoint_id]) - lua_push_str_table_entry(vm, (char*)"current", checkpoints[checkpoint_id]); + if(new_data) { +#ifdef HAVE_ZLIB + if(compression_enabled) { + uLongf sourceLen = strlen(new_data); + uLongf destLen = compressBound(sourceLen); + + checkpoints[checkpoint_id] = (char *) malloc(destLen); + if(checkpoints[checkpoint_id] == NULL) { + ntop->getTrace()->traceEvent(TRACE_ERROR, "Cannot allocate compression buffer"); + free(new_data); + return false; + } + + compress((Bytef*)checkpoints[checkpoint_id], &destLen, (Bytef*)new_data, sourceLen); + uncompressed_lengths[checkpoint_id] = sourceLen; + compressed_lengths[checkpoint_id] = destLen; + +#ifdef CHECKPOINT_COMPRESSION_DEBUG + /* Note: 2 * uLongf is the space needed to hold compression metadata */ + /* Negative values means compression is not worth it! */ + uLongf occupied_len = destLen + 2 * sizeof(uLongf); + float save_ratio = (1 - (occupied_len * 1.f / sourceLen)) * 100; + ntop->getTrace()->traceEvent(TRACE_NORMAL, "Checkpoint compress: [%u/%u bytes] %.2f%% save", + occupied_len, sourceLen, save_ratio); +#endif + } else + checkpoints[checkpoint_id] = new_data; +#else + checkpoints[checkpoint_id] = new_data; +#endif + + if(vm) + lua_push_str_table_entry(vm, (char*)"current", new_data); + +#ifdef HAVE_ZLIB + if(compression_enabled) + free(new_data); +#endif + } else { + if(vm) lua_pushnil(vm); } + + return true; } /* *************************************** */ @@ -59,4 +135,9 @@ Checkpointable::~Checkpointable() { for(int i = 0; i < CONST_MAX_NUM_CHECKPOINTS; i++) { if(checkpoints[i]) free(checkpoints[i]); } + +#ifdef HAVE_ZLIB + if (compressed_lengths) free(compressed_lengths); + if (uncompressed_lengths) free(uncompressed_lengths); +#endif } diff --git a/src/Host.cpp b/src/Host.cpp index 0f4fcb600f..d10b7c34be 100644 --- a/src/Host.cpp +++ b/src/Host.cpp @@ -23,13 +23,13 @@ /* *************************************** */ -Host::Host(NetworkInterface *_iface) : GenericHost(_iface) { +Host::Host(NetworkInterface *_iface) : GenericHost(_iface), Checkpointable(true) { initialize(NULL, 0, false); } /* *************************************** */ -Host::Host(NetworkInterface *_iface, char *ipAddress, u_int16_t _vlanId) : GenericHost(_iface) { +Host::Host(NetworkInterface *_iface, char *ipAddress, u_int16_t _vlanId) : GenericHost(_iface), Checkpointable(true) { ip.set(ipAddress); initialize(NULL, _vlanId, true); } @@ -37,7 +37,7 @@ Host::Host(NetworkInterface *_iface, char *ipAddress, u_int16_t _vlanId) : Gener /* *************************************** */ Host::Host(NetworkInterface *_iface, Mac *_mac, - u_int16_t _vlanId, IpAddress *_ip) : GenericHost(_iface) { + u_int16_t _vlanId, IpAddress *_ip) : GenericHost(_iface), Checkpointable(true) { ip.set(_ip); #ifdef BROADCAST_DEBUG @@ -50,7 +50,7 @@ Host::Host(NetworkInterface *_iface, Mac *_mac, /* *************************************** */ -Host::Host(NetworkInterface *_iface, Mac *_mac, u_int16_t _vlanId) : GenericHost(_iface) { +Host::Host(NetworkInterface *_iface, Mac *_mac, u_int16_t _vlanId) : GenericHost(_iface), Checkpointable(true) { initialize(_mac, _vlanId, true); } diff --git a/src/NetworkInterface.cpp b/src/NetworkInterface.cpp index e92d717e46..2b47f06251 100644 --- a/src/NetworkInterface.cpp +++ b/src/NetworkInterface.cpp @@ -2984,7 +2984,7 @@ bool NetworkInterface::checkPointHostCounters(lua_State* vm, u_int8_t checkpoint bool ret = false; if(host_ip && (h = getHost(host_ip, vlan_id))) - h->checkpoint(vm, checkpoint_id), ret = true; + ret = h->checkpoint(vm, checkpoint_id); return ret; } @@ -2998,9 +2998,7 @@ bool NetworkInterface::checkPointNetworkCounters(lua_State* vm, u_int8_t checkpo if (stats == NULL) return false; - stats->checkpoint(vm, checkpoint_id); - - return true; + return stats->checkpoint(vm, checkpoint_id); } /* **************************************************** */