mirror of
https://github.com/ntop/ntopng.git
synced 2026-05-20 17:29:13 +00:00
520 lines
14 KiB
C++
520 lines
14 KiB
C++
/*
|
|
*
|
|
* (C) 2013-23 - ntop.org
|
|
*
|
|
*
|
|
* This program is free software; you can redistribute it and/or modify
|
|
* it under the terms of the GNU General Public License as published by
|
|
* the Free Software Foundation; either version 3 of the License, or
|
|
* (at your option) any later version.
|
|
*
|
|
* This program is distributed in the hope that it will be useful,
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
* GNU General Public License for more details.
|
|
*
|
|
* You should have received a copy of the GNU General Public License
|
|
* along with this program; if not, write to the Free Software Foundation,
|
|
* Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
|
|
*
|
|
*/
|
|
|
|
#include "ntop_includes.h"
|
|
|
|
#ifndef HAVE_NEDGE
|
|
|
|
/* **************************************************** */
|
|
|
|
bool SyslogCollectorInterface::openSocket(syslog_socket *ss, const char *server_address, int server_port, int protocol) {
|
|
struct sockaddr_in listen_addr;
|
|
int reuse = 1;
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Starting %s syslog collector on %s:%d",
|
|
protocol == SOCK_DGRAM ? "UDP" : "TCP", server_address, server_port);
|
|
|
|
ss->sock = Utils::openSocket(AF_INET, protocol, 0, "SyslogCollectorInterface");
|
|
|
|
if(ss->sock < 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "socket error");
|
|
return false;
|
|
}
|
|
|
|
/* Allow to re-bind in case previous instance died */
|
|
if(setsockopt(ss->sock, SOL_SOCKET, SO_REUSEADDR,
|
|
#ifdef WIN32
|
|
(const char*)
|
|
#endif
|
|
&reuse, sizeof(reuse)) != 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "setsockopt error");
|
|
return false;
|
|
}
|
|
|
|
memset(&listen_addr, 0, sizeof(listen_addr));
|
|
|
|
listen_addr.sin_family = AF_INET;
|
|
listen_addr.sin_addr.s_addr = inet_addr(server_address);
|
|
listen_addr.sin_port = htons(server_port);
|
|
|
|
if(::bind(ss->sock, (struct sockaddr *) &listen_addr, sizeof(struct sockaddr)) != 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "bind error");
|
|
return false;
|
|
}
|
|
|
|
if (protocol == SOCK_STREAM) {
|
|
if(listen(ss->sock, MAX_SYSLOG_SUBSCRIBERS) != 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "listen error");
|
|
return false;
|
|
}
|
|
|
|
memset(tcp_connections, 0, sizeof(tcp_connections));
|
|
}
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Accepting %s connections on %s:%d",
|
|
protocol == SOCK_DGRAM ? "UDP" : "TCP", server_address, server_port);
|
|
|
|
return true;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::closeSocket(syslog_socket *ss, int protocol) {
|
|
close(ss->sock);
|
|
|
|
if (protocol == SOCK_STREAM) {
|
|
for(int i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i)
|
|
if(tcp_connections[i].socket != 0)
|
|
Utils::closeSocket(tcp_connections[i].socket);
|
|
}
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
SyslogCollectorInterface::SyslogCollectorInterface(const char *_endpoint) : SyslogParserInterface(_endpoint) {
|
|
char *tmp, *pos, *port, *address, *protocol;
|
|
const char *server_address;
|
|
int server_port;
|
|
|
|
udp_socket.enable = true;
|
|
tcp_socket.enable = true;
|
|
|
|
endpoint = strdup(_endpoint);
|
|
|
|
if(endpoint == NULL)
|
|
throw("memory allocation error");
|
|
|
|
tmp = strdup(_endpoint);
|
|
|
|
if(tmp == NULL)
|
|
throw("memory allocation error");
|
|
|
|
/*
|
|
* Interface name format:
|
|
* syslog://<ip>:<port>[@{udp,tcp}]
|
|
*/
|
|
|
|
if(strncmp(tmp, (char*) "syslog://", 9) == 0) {
|
|
address = &tmp[9];
|
|
} else {
|
|
address = tmp;
|
|
}
|
|
|
|
pos = strchr(address, '@');
|
|
|
|
if (pos != NULL) {
|
|
pos[0] = '\0';
|
|
pos++;
|
|
protocol = pos;
|
|
|
|
if (strcmp(protocol, "udp") == 0)
|
|
tcp_socket.enable = false;
|
|
else if (strcmp(protocol, "tcp") == 0)
|
|
udp_socket.enable = false;
|
|
}
|
|
|
|
port = strchr(address, ':');
|
|
|
|
if(port != NULL) {
|
|
port[0] = '\0';
|
|
port++;
|
|
server_port = atoi(port);
|
|
} else {
|
|
throw("bad tcp bind address format");
|
|
}
|
|
|
|
if (strcmp(address, "*") == 0) {
|
|
/* any address */
|
|
server_address = "0.0.0.0";
|
|
} else {
|
|
server_address = address;
|
|
}
|
|
|
|
if (udp_socket.enable)
|
|
if (!openSocket(&udp_socket, server_address, server_port, SOCK_DGRAM))
|
|
throw("Error opening socket");
|
|
|
|
if (tcp_socket.enable)
|
|
if (!openSocket(&tcp_socket, server_address, server_port, SOCK_STREAM))
|
|
throw("Error opening socket");
|
|
|
|
free(tmp);
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
SyslogCollectorInterface::~SyslogCollectorInterface() {
|
|
if (udp_socket.enable)
|
|
closeSocket(&udp_socket, SOCK_DGRAM);
|
|
|
|
if (tcp_socket.enable)
|
|
closeSocket(&tcp_socket, SOCK_STREAM);
|
|
|
|
free(endpoint);
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
/* set FDs and returns the max sock */
|
|
int SyslogCollectorInterface::initFDSetsSocket(syslog_socket *ss,
|
|
fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, int protocol) {
|
|
int high_sock = ss->sock;
|
|
|
|
FD_SET(ss->sock, read_fds);
|
|
FD_SET(ss->sock, except_fds);
|
|
|
|
if (protocol == SOCK_STREAM) {
|
|
for(int i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) {
|
|
if(tcp_connections[i].socket != 0) {
|
|
FD_SET(tcp_connections[i].socket, read_fds);
|
|
FD_SET(tcp_connections[i].socket, except_fds);
|
|
if(tcp_connections[i].socket > high_sock)
|
|
high_sock = tcp_connections[i].socket;
|
|
}
|
|
}
|
|
}
|
|
|
|
return high_sock;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
/* set FDs and returns the max sock */
|
|
int SyslogCollectorInterface::initFDSets(fd_set *read_fds, fd_set *write_fds, fd_set *except_fds) {
|
|
int high_sock = 0;
|
|
|
|
FD_ZERO(read_fds);
|
|
FD_ZERO(write_fds);
|
|
FD_ZERO(except_fds);
|
|
|
|
if (udp_socket.enable) {
|
|
high_sock = initFDSetsSocket(&udp_socket, read_fds, write_fds, except_fds, SOCK_DGRAM);
|
|
}
|
|
|
|
if (tcp_socket.enable) {
|
|
int sock = initFDSetsSocket(&tcp_socket, read_fds, write_fds, except_fds, SOCK_STREAM);
|
|
if (sock > high_sock)
|
|
high_sock = sock;
|
|
}
|
|
|
|
return high_sock;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
int SyslogCollectorInterface::handleNewConnection() {
|
|
char client_ipv4_str[INET_ADDRSTRLEN];
|
|
struct sockaddr_in client_addr;
|
|
socklen_t client_len = sizeof(client_addr);
|
|
int new_client_sock;
|
|
int i;
|
|
|
|
memset(&client_addr, 0, sizeof(client_addr));
|
|
|
|
new_client_sock = accept(tcp_socket.sock, (struct sockaddr *) &client_addr, &client_len);
|
|
|
|
if(new_client_sock < 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "accept() failure");
|
|
return -1;
|
|
}
|
|
|
|
inet_ntop(AF_INET, &client_addr.sin_addr, client_ipv4_str, INET_ADDRSTRLEN);
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Incoming connection from %s:%d", client_ipv4_str, client_addr.sin_port);
|
|
|
|
for(i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) {
|
|
if(tcp_connections[i].socket == 0) {
|
|
tcp_connections[i].socket = new_client_sock;
|
|
tcp_connections[i].address = client_addr;
|
|
snprintf(tcp_connections[i].ip_str, sizeof(tcp_connections[i].ip_str), "%s", client_ipv4_str);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Too many connections. Closing connection from %s:%d",
|
|
client_ipv4_str, client_addr.sin_port);
|
|
|
|
close(new_client_sock);
|
|
|
|
return -1;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::closeConnection(syslog_client *client) {
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Closing client socket for %s:%d\n",
|
|
client->ip_str, client->address.sin_port);
|
|
|
|
close(client->socket);
|
|
client->socket = 0;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
#ifdef USE_RECVLINE
|
|
int SyslogCollectorInterface::recvLine(int socket, char *buffer, size_t n) {
|
|
ssize_t num_read;
|
|
size_t tot_read = 0;
|
|
char c;
|
|
|
|
while (tot_read < n - 1) {
|
|
|
|
num_read = read(socket, &c, 1);
|
|
|
|
if (num_read == -1) {
|
|
if (errno == EINTR) continue;
|
|
else return -1;
|
|
} else if (num_read == 0) { /* EOF */
|
|
if (tot_read == 0) return 0;
|
|
else break;
|
|
} else {
|
|
if (tot_read < n - 1) {
|
|
tot_read++;
|
|
*buffer++ = c;
|
|
}
|
|
if (c == '\n') break;
|
|
}
|
|
}
|
|
|
|
*buffer = '\0';
|
|
|
|
return tot_read;
|
|
}
|
|
#endif
|
|
|
|
/* **************************************************** */
|
|
|
|
int SyslogCollectorInterface::receive(int socket, char *client_ip, bool use_recvfrom) {
|
|
char buffer[8192];
|
|
int len, received_total = 0;
|
|
int buffer_size = sizeof(buffer) - 1;
|
|
char *line, *pos;
|
|
struct sockaddr_in client_addr;
|
|
socklen_t client_addr_len = sizeof(client_addr);
|
|
char ip_str[INET_ADDRSTRLEN];
|
|
|
|
// Note: do not loop to handle other clients in round robin
|
|
// and also handle purgeIdle
|
|
//do {
|
|
|
|
if (use_recvfrom)
|
|
len = recvfrom(socket,
|
|
#ifndef WIN32
|
|
(void *)
|
|
#endif
|
|
buffer, buffer_size,
|
|
#ifndef WIN32
|
|
MSG_DONTWAIT
|
|
#else
|
|
0
|
|
#endif
|
|
, (struct sockaddr *) &client_addr, &client_addr_len);
|
|
else
|
|
#ifdef USE_RECVLINE
|
|
/* Read single line to avoid splitting lines across chunks */
|
|
len = recvLine(socket, (char *) buffer, buffer_size);
|
|
#else
|
|
len = recv(socket, (char *) buffer, buffer_size,
|
|
#ifndef WIN32
|
|
MSG_DONTWAIT
|
|
#else
|
|
0
|
|
#endif
|
|
);
|
|
#endif
|
|
|
|
if(len < 0) {
|
|
if(errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
ntop->getTrace()->traceEvent(TRACE_INFO, "Client is not ready (%d)", errno);
|
|
goto done;
|
|
} else {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "Client error");
|
|
return -1;
|
|
}
|
|
|
|
} else if(len == 0) {
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Client shutdown");
|
|
return -1;
|
|
|
|
} else if(len > 0) {
|
|
if (client_ip == NULL && use_recvfrom) {
|
|
inet_ntop(AF_INET, &client_addr.sin_addr, ip_str, sizeof(ip_str));
|
|
client_ip = ip_str;
|
|
}
|
|
received_total += len;
|
|
buffer[len] = '\0';
|
|
line = strtok_r(buffer, "\n", &pos);
|
|
while (line) {
|
|
recvStats.num_flows += parseLog(line, client_ip);
|
|
line = strtok_r(NULL, "\n", &pos);
|
|
}
|
|
}
|
|
|
|
//} while (len > 0);
|
|
|
|
done:
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_INFO, "Total received bytes: %u", received_total);
|
|
|
|
return 0;
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
int SyslogCollectorInterface::receiveFromClient(syslog_client *client) {
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_INFO, "Trying to receive from %s:%d",
|
|
client->ip_str, client->address.sin_port);
|
|
|
|
return receive(client->socket, client->ip_str, false);
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::collect_events() {
|
|
u_int32_t max_num_polls_before_purge = MAX_SYSLOG_POLLS_BEFORE_PURGE;
|
|
fd_set read_fds, write_fds, except_fds;
|
|
struct timeval timeout;
|
|
time_t now, next_purge_idle = time(NULL) + FLOW_PURGE_FREQUENCY;;
|
|
int high_sock;
|
|
int i, rc;
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Collecting events on %s", ifname);
|
|
|
|
while (isRunning()) {
|
|
while(idle()) {
|
|
purgeIdle(time(NULL));
|
|
sleep(1);
|
|
if(ntop->getGlobals()->isShutdown()) return;
|
|
}
|
|
|
|
high_sock = initFDSets(&read_fds, &write_fds, &except_fds);
|
|
|
|
timeout.tv_sec = MAX_SYSLOG_POLL_WAIT_MS/1000;
|
|
timeout.tv_usec = (MAX_SYSLOG_POLL_WAIT_MS%1000)*1000;
|
|
|
|
rc = select(high_sock + 1, &read_fds, &write_fds, &except_fds, &timeout);
|
|
|
|
now = time(NULL);
|
|
max_num_polls_before_purge--;
|
|
if(rc == 0 || now >= next_purge_idle || max_num_polls_before_purge == 0) {
|
|
purgeIdle(now);
|
|
next_purge_idle = now + FLOW_PURGE_FREQUENCY;
|
|
max_num_polls_before_purge = MAX_SYSLOG_POLLS_BEFORE_PURGE;
|
|
}
|
|
|
|
if(rc > 0) {
|
|
|
|
if (udp_socket.enable){
|
|
if (FD_ISSET(udp_socket.sock, &read_fds)) {
|
|
if(receive(udp_socket.sock, NULL, true) != 0)
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "Error receiving from UDP socket fd");
|
|
}
|
|
|
|
if(FD_ISSET(udp_socket.sock, &except_fds))
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on listen UDP socket fd");
|
|
}
|
|
|
|
if (tcp_socket.enable) {
|
|
if (FD_ISSET(tcp_socket.sock, &read_fds)) {
|
|
handleNewConnection();
|
|
}
|
|
|
|
if(FD_ISSET(tcp_socket.sock, &except_fds))
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on listen TCP socket fd");
|
|
|
|
for(i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) {
|
|
if(tcp_connections[i].socket != 0 && FD_ISSET(tcp_connections[i].socket, &read_fds)) {
|
|
if(receiveFromClient(&tcp_connections[i]) != 0) {
|
|
closeConnection(&tcp_connections[i]);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if(tcp_connections[i].socket != 0 && FD_ISSET(tcp_connections[i].socket, &except_fds)) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on TCP client fd");
|
|
closeConnection(&tcp_connections[i]);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
ntop->getTrace()->traceEvent(TRACE_NORMAL, "Flow collection is over.");
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
static void* messagePollLoop(void* ptr) {
|
|
SyslogCollectorInterface *iface = (SyslogCollectorInterface*)ptr;
|
|
|
|
/* Wait until the initialization completes */
|
|
while(iface->isStartingUp()) sleep(1);
|
|
|
|
iface->collect_events();
|
|
|
|
return(NULL);
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::startPacketPolling() {
|
|
pthread_create(&pollLoop, NULL, messagePollLoop, (void*)this);
|
|
pollLoopCreated = true;
|
|
|
|
SyslogParserInterface::startPacketPolling();
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::shutdown() {
|
|
void *res;
|
|
|
|
if(running) {
|
|
NetworkInterface::shutdown();
|
|
pthread_join(pollLoop, &res);
|
|
}
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
bool SyslogCollectorInterface::set_packet_filter(char *filter) {
|
|
ntop->getTrace()->traceEvent(TRACE_ERROR,
|
|
"No filter can be set on a collector interface. Ignored %s", filter);
|
|
return(false);
|
|
}
|
|
|
|
/* **************************************************** */
|
|
|
|
void SyslogCollectorInterface::lua(lua_State* vm) {
|
|
SyslogParserInterface::lua(vm);
|
|
|
|
lua_push_bool_table_entry(vm, "isSyslog", true);
|
|
|
|
lua_newtable(vm);
|
|
lua_push_uint64_table_entry(vm, "flows", recvStats.num_flows);
|
|
lua_pushstring(vm, "syslogRecvStats");
|
|
lua_insert(vm, -2);
|
|
lua_settable(vm, -3);
|
|
}
|
|
|
|
#endif
|