/* * * (C) 2013-25 - ntop.org * * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software Foundation, * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * */ #include "ntop_includes.h" #ifndef HAVE_NEDGE /* **************************************************** */ bool SyslogCollectorInterface::openSocket(syslog_socket *ss, const char *server_address, int server_port, int protocol) { struct sockaddr_in listen_addr; int reuse = 1; if(trace_new_delete) ntop->getTrace()->traceEvent(TRACE_NORMAL, "[new] %s", __FILE__); ntop->getTrace()->traceEvent(TRACE_NORMAL, "Starting %s syslog collector on %s:%d", protocol == SOCK_DGRAM ? "UDP" : "TCP", server_address, server_port); ss->sock = Utils::openSocket(AF_INET, protocol, 0, "SyslogCollectorInterface"); if (ss->sock < 0) { ntop->getTrace()->traceEvent(TRACE_ERROR, "socket error"); return false; } /* Allow to re-bind in case previous instance died */ if (setsockopt(ss->sock, SOL_SOCKET, SO_REUSEADDR, #ifdef WIN32 (const char *) #endif &reuse, sizeof(reuse)) != 0) { ntop->getTrace()->traceEvent(TRACE_ERROR, "setsockopt error"); return false; } memset(&listen_addr, 0, sizeof(listen_addr)); listen_addr.sin_family = AF_INET; listen_addr.sin_addr.s_addr = inet_addr(server_address); listen_addr.sin_port = htons(server_port); if (::bind(ss->sock, (struct sockaddr *)&listen_addr, sizeof(struct sockaddr)) != 0) { ntop->getTrace()->traceEvent(TRACE_ERROR, "bind error"); return false; } if (protocol == SOCK_STREAM) { if (listen(ss->sock, MAX_SYSLOG_SUBSCRIBERS) != 0) { ntop->getTrace()->traceEvent(TRACE_ERROR, "listen error"); return false; } memset(tcp_connections, 0, sizeof(tcp_connections)); } ntop->getTrace()->traceEvent( TRACE_NORMAL, "Accepting %s connections on %s:%d", protocol == SOCK_DGRAM ? "UDP" : "TCP", server_address, server_port); return true; } /* **************************************************** */ void SyslogCollectorInterface::closeSocket(syslog_socket *ss, int protocol) { close(ss->sock); if (protocol == SOCK_STREAM) { for (int i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) if (tcp_connections[i].socket != 0) Utils::closeSocket(tcp_connections[i].socket); } } /* **************************************************** */ SyslogCollectorInterface::SyslogCollectorInterface(const char *_endpoint) : SyslogParserInterface(_endpoint) { char *tmp, *pos, *port, *address, *protocol; const char *server_address; int server_port; udp_socket.enable = true; tcp_socket.enable = true; endpoint = strdup(_endpoint); if (endpoint == NULL) throw("memory allocation error"); tmp = strdup(_endpoint); if (tmp == NULL) throw("memory allocation error"); /* * Interface name format: * syslog://:[@{udp,tcp}] */ if (strncmp(tmp, (char *)"syslog://", 9) == 0) { address = &tmp[9]; } else { address = tmp; } pos = strchr(address, '@'); if (pos != NULL) { pos[0] = '\0'; pos++; protocol = pos; if (strcmp(protocol, "udp") == 0) tcp_socket.enable = false; else if (strcmp(protocol, "tcp") == 0) udp_socket.enable = false; } port = strchr(address, ':'); if (port != NULL) { port[0] = '\0'; port++; server_port = atoi(port); } else { throw("bad tcp bind address format"); } if (strcmp(address, "*") == 0) { /* any address */ server_address = "0.0.0.0"; } else { server_address = address; } if (udp_socket.enable) if (!openSocket(&udp_socket, server_address, server_port, SOCK_DGRAM)) throw("Error opening socket"); if (tcp_socket.enable) if (!openSocket(&tcp_socket, server_address, server_port, SOCK_STREAM)) throw("Error opening socket"); free(tmp); } /* **************************************************** */ SyslogCollectorInterface::~SyslogCollectorInterface() { if (udp_socket.enable) closeSocket(&udp_socket, SOCK_DGRAM); if (tcp_socket.enable) closeSocket(&tcp_socket, SOCK_STREAM); free(endpoint); } /* **************************************************** */ /* set FDs and returns the max sock */ int SyslogCollectorInterface::initFDSetsSocket(syslog_socket *ss, fd_set *read_fds, fd_set *write_fds, fd_set *except_fds, int protocol) { int high_sock = ss->sock; FD_SET(ss->sock, read_fds); FD_SET(ss->sock, except_fds); if (protocol == SOCK_STREAM) { for (int i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) { if (tcp_connections[i].socket != 0) { FD_SET(tcp_connections[i].socket, read_fds); FD_SET(tcp_connections[i].socket, except_fds); if (tcp_connections[i].socket > high_sock) high_sock = tcp_connections[i].socket; } } } return high_sock; } /* **************************************************** */ /* set FDs and returns the max sock */ int SyslogCollectorInterface::initFDSets(fd_set *read_fds, fd_set *write_fds, fd_set *except_fds) { int high_sock = 0; FD_ZERO(read_fds); FD_ZERO(write_fds); FD_ZERO(except_fds); if (udp_socket.enable) { high_sock = initFDSetsSocket(&udp_socket, read_fds, write_fds, except_fds, SOCK_DGRAM); } if (tcp_socket.enable) { int sock = initFDSetsSocket(&tcp_socket, read_fds, write_fds, except_fds, SOCK_STREAM); if (sock > high_sock) high_sock = sock; } return high_sock; } /* **************************************************** */ int SyslogCollectorInterface::handleNewConnection() { char client_ipv4_str[INET_ADDRSTRLEN]; struct sockaddr_in client_addr; socklen_t client_len = sizeof(client_addr); int new_client_sock; int i; memset(&client_addr, 0, sizeof(client_addr)); new_client_sock = accept(tcp_socket.sock, (struct sockaddr *)&client_addr, &client_len); if (new_client_sock < 0) { ntop->getTrace()->traceEvent(TRACE_ERROR, "accept() failure"); return -1; } inet_ntop(AF_INET, &client_addr.sin_addr, client_ipv4_str, INET_ADDRSTRLEN); ntop->getTrace()->traceEvent(TRACE_NORMAL, "Incoming connection from %s:%d", client_ipv4_str, client_addr.sin_port); for (i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) { if (tcp_connections[i].socket == 0) { tcp_connections[i].socket = new_client_sock; tcp_connections[i].address = client_addr; snprintf(tcp_connections[i].ip_str, sizeof(tcp_connections[i].ip_str), "%s", client_ipv4_str); return 0; } } ntop->getTrace()->traceEvent( TRACE_NORMAL, "Too many connections. Closing connection from %s:%d", client_ipv4_str, client_addr.sin_port); close(new_client_sock); return -1; } /* **************************************************** */ void SyslogCollectorInterface::closeConnection(syslog_client *client) { ntop->getTrace()->traceEvent(TRACE_NORMAL, "Closing client socket for %s:%d\n", client->ip_str, client->address.sin_port); close(client->socket); client->socket = 0; } /* **************************************************** */ #ifdef USE_RECVLINE int SyslogCollectorInterface::recvLine(int socket, char *buffer, size_t n) { ssize_t num_read; size_t tot_read = 0; char c; while (tot_read < n - 1) { num_read = read(socket, &c, 1); if (num_read == -1) { if (errno == EINTR) continue; else return -1; } else if (num_read == 0) { /* EOF */ if (tot_read == 0) return 0; else break; } else { if (tot_read < n - 1) { tot_read++; *buffer++ = c; } if (c == '\n') break; } } *buffer = '\0'; return tot_read; } #endif /* **************************************************** */ int SyslogCollectorInterface::receive(int socket, char *client_ip, bool use_recvfrom) { char buffer[8192]; int len, received_total = 0; int buffer_size = sizeof(buffer) - 1; char *line, *pos; struct sockaddr_in client_addr; socklen_t client_addr_len = sizeof(client_addr); char ip_str[INET_ADDRSTRLEN]; // Note: do not loop to handle other clients in round robin // and also handle purgeIdle // do { if (use_recvfrom) len = recvfrom(socket, #ifndef WIN32 (void *) #endif buffer, buffer_size, #ifndef WIN32 MSG_DONTWAIT #else 0 #endif , (struct sockaddr *)&client_addr, &client_addr_len); else #ifdef USE_RECVLINE /* Read single line to avoid splitting lines across chunks */ len = recvLine(socket, (char *)buffer, buffer_size); #else len = recv(socket, (char *)buffer, buffer_size, #ifndef WIN32 MSG_DONTWAIT #else 0 #endif ); #endif if (len < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { ntop->getTrace()->traceEvent(TRACE_INFO, "Client is not ready (%d)", errno); goto done; } else { ntop->getTrace()->traceEvent(TRACE_ERROR, "Client error"); return -1; } } else if (len == 0) { ntop->getTrace()->traceEvent(TRACE_NORMAL, "Client shutdown"); return -1; } else if (len > 0) { if (client_ip == NULL && use_recvfrom) { inet_ntop(AF_INET, &client_addr.sin_addr, ip_str, sizeof(ip_str)); client_ip = ip_str; } received_total += len; buffer[len] = '\0'; line = strtok_r(buffer, "\n", &pos); while (line) { recvStats.num_flows += parseLog(line, client_ip); line = strtok_r(NULL, "\n", &pos); } } //} while (len > 0); done: ntop->getTrace()->traceEvent(TRACE_INFO, "Total received bytes: %u", received_total); return 0; } /* **************************************************** */ int SyslogCollectorInterface::receiveFromClient(syslog_client *client) { ntop->getTrace()->traceEvent(TRACE_INFO, "Trying to receive from %s:%d", client->ip_str, client->address.sin_port); return receive(client->socket, client->ip_str, false); } /* **************************************************** */ void SyslogCollectorInterface::collect_events() { u_int32_t max_num_polls_before_purge = MAX_SYSLOG_POLLS_BEFORE_PURGE; fd_set read_fds, write_fds, except_fds; struct timeval timeout; time_t now, next_purge_idle = time(NULL) + FLOW_PURGE_FREQUENCY; ; int high_sock; int i, rc; ntop->getTrace()->traceEvent(TRACE_NORMAL, "Collecting events on %s", ifname); while (isRunning()) { while (idle()) { purgeIdle(time(NULL)); sleep(1); if (ntop->getGlobals()->isShutdown()) return; } high_sock = initFDSets(&read_fds, &write_fds, &except_fds); timeout.tv_sec = MAX_SYSLOG_POLL_WAIT_MS / 1000; timeout.tv_usec = (MAX_SYSLOG_POLL_WAIT_MS % 1000) * 1000; rc = select(high_sock + 1, &read_fds, &write_fds, &except_fds, &timeout); now = time(NULL); max_num_polls_before_purge--; if (rc == 0 || now >= next_purge_idle || max_num_polls_before_purge == 0) { purgeIdle(now); next_purge_idle = now + FLOW_PURGE_FREQUENCY; max_num_polls_before_purge = MAX_SYSLOG_POLLS_BEFORE_PURGE; } if (rc > 0) { if (udp_socket.enable) { if (FD_ISSET(udp_socket.sock, &read_fds)) { if (receive(udp_socket.sock, NULL, true) != 0) ntop->getTrace()->traceEvent(TRACE_ERROR, "Error receiving from UDP socket fd"); } if (FD_ISSET(udp_socket.sock, &except_fds)) ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on listen UDP socket fd"); } if (tcp_socket.enable) { if (FD_ISSET(tcp_socket.sock, &read_fds)) { handleNewConnection(); } if (FD_ISSET(tcp_socket.sock, &except_fds)) ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on listen TCP socket fd"); for (i = 0; i < MAX_SYSLOG_SUBSCRIBERS; ++i) { if (tcp_connections[i].socket != 0 && FD_ISSET(tcp_connections[i].socket, &read_fds)) { if (receiveFromClient(&tcp_connections[i]) != 0) { closeConnection(&tcp_connections[i]); continue; } } if (tcp_connections[i].socket != 0 && FD_ISSET(tcp_connections[i].socket, &except_fds)) { ntop->getTrace()->traceEvent(TRACE_ERROR, "Exception on TCP client fd"); closeConnection(&tcp_connections[i]); } } } } } ntop->getTrace()->traceEvent(TRACE_NORMAL, "Flow collection is over."); } /* **************************************************** */ static void *messagePollLoop(void *ptr) { SyslogCollectorInterface *iface = (SyslogCollectorInterface *)ptr; /* Wait until the initialization completes */ while (iface->isStartingUp()) sleep(1); iface->collect_events(); return (NULL); } /* **************************************************** */ void SyslogCollectorInterface::startPacketPolling() { pthread_create(&pollLoop, NULL, messagePollLoop, (void *)this); pollLoopCreated = true; SyslogParserInterface::startPacketPolling(); } /* **************************************************** */ void SyslogCollectorInterface::shutdown() { void *res; if (running) { NetworkInterface::shutdown(); pthread_join(pollLoop, &res); } } /* **************************************************** */ bool SyslogCollectorInterface::set_packet_filter(char *filter) { ntop->getTrace()->traceEvent( TRACE_ERROR, "No filter can be set on a collector interface. Ignored %s", filter); return (false); } /* **************************************************** */ void SyslogCollectorInterface::lua(lua_State *vm, bool fullStats) { SyslogParserInterface::lua(vm, fullStats); lua_push_bool_table_entry(vm, "isSyslog", true); lua_newtable(vm); lua_push_uint64_table_entry(vm, "flows", recvStats.num_flows); lua_pushstring(vm, "syslogRecvStats"); lua_insert(vm, -2); lua_settable(vm, -3); } #endif