diff --git a/doc/README.logstash b/doc/README.logstash index 244d380685..9ca8a61ea0 100644 --- a/doc/README.logstash +++ b/doc/README.logstash @@ -23,7 +23,6 @@ Once started, ntopng will push to LS flows that are expired or periodically send Logstash configuration example : - input { tcp { host => "localhost" diff --git a/src/Logstash.cpp b/src/Logstash.cpp index 81f03ded23..75dcb5f877 100644 --- a/src/Logstash.cpp +++ b/src/Logstash.cpp @@ -146,8 +146,30 @@ void Logstash::sendLSdata() { char *portstr = NULL; int sendTCP = 1; struct sockaddr_in serv_addr; - int sockfd; + int sockfd = -1; int portno; + int retval; + int skipDequeue = 0; + int sent = 0; + size_t sentLength = 0; + + server = gethostbyname(ntop->getPrefs()->get_ls_host()); + portstr = ntop->getPrefs()->get_ls_port(); + + if(server == NULL || portstr == NULL){ + //can't send + return; + } + + proto = ntop->getPrefs()->get_ls_proto(); + if(proto && !strncmp(proto,"udp",3)){ + sendTCP = 0; + } + portno = atoi(portstr); + + bzero((char *) &serv_addr,sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + bcopy((char *) server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length); server = gethostbyname(ntop->getPrefs()->get_ls_host()); portstr = ntop->getPrefs()->get_ls_port(); @@ -174,78 +196,121 @@ void Logstash::sendLSdata() { u_int len, num_flows; len = 0, num_flows = 0; - listMutex.lock(__FILE__, __LINE__); - for(u_int i=0; (i min_buf_size); i++) { - struct string_list *prev; - if(!(tail && tail->str)){ - //No events in queue - break; - } - prev = tail->prev; - len += snprintf(&postbuf[len], sizeof(postbuf)-len, "%s\n", tail->str), num_flows++; - free(tail->str); - free(tail); - tail = prev; - num_queued_elems--; - if(num_queued_elems == 0) - head = NULL; - } /* for */ + if(sockfd<0){ + if(!sendTCP) { //UDP socket + sockfd = socket(AF_INET,SOCK_DGRAM, IPPROTO_UDP); + } else { //TCP socket + sockfd = socket(AF_INET, SOCK_STREAM, 0); + } - listMutex.unlock(__FILE__, __LINE__); - postbuf[len] = '\0'; + if(sockfd < 0) { + ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to create socket. Skipping dequeue"); + continue; + } + + //Set nonblocking + retval = fcntl(sockfd, F_SETFL, fcntl(sockfd,F_GETFL,0) | O_NONBLOCK); + if(retval == -1){ + ntop->getTrace()->traceEvent(TRACE_WARNING,"Error while setting NONBLOCK flag. Skipping dequeue."); + close(sockfd); + sockfd = -1; + sleep(1); + continue; + } + if(sendTCP + && (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) <0) + && (errno == ECONNREFUSED || errno == EALREADY || errno == EAGAIN + || errno == ENETUNREACH || errno == ETIMEDOUT) + ) { + ntop->getTrace()->traceEvent(TRACE_WARNING,"Could not connect to remote. Skipping dequeue.."); + sleep(1); + //close socket and reinitialize for next loop + close(sockfd); + sockfd = -1; + continue; + } + + } + if(skipDequeue == 1){ + //Next loop should start dequeuing again if all goes well + skipDequeue = 0; + } else { + listMutex.lock(__FILE__, __LINE__); + //clear buffer to get rid of garbage bytes + memset(&postbuf[0],0,sizeof(postbuf)); + + for(u_int i=0; (i min_buf_size); i++) { + struct string_list *prev; + if(!(tail && tail->str)){ + //No events in queue + break; + } + prev = tail->prev; + len += snprintf(&postbuf[len], sizeof(postbuf)-len, "%s\n", tail->str), num_flows++; + free(tail->str); + free(tail); + tail = prev; + num_queued_elems--; + + if(num_queued_elems == 0) + head = NULL; + } /* for */ + + listMutex.unlock(__FILE__, __LINE__); + postbuf[len] = '\0'; + } if(postbuf[0]!='{') { continue; } - if(!sendTCP) { //UDP socket - sockfd = socket(AF_INET,SOCK_DGRAM, IPPROTO_UDP); - } else { //TCP socket - sockfd = socket(AF_INET, SOCK_STREAM, 0); + + //try to send data + if(skipDequeue == 1){ + //Continue with the leftover data + } else { + sent = 0; + sentLength = sizeof(postbuf); + } + if(sendTCP){ + //TCP + while(sentLength > 0){ + retval = send(sockfd,postbuf,sentLength,0); + if(retval<=0){ + // Err occured + // don't clear postbuf as it hasn't been sent + // and skip dequeue next time + skipDequeue = 1; + break; + } + sent += retval; + sentLength -= retval; + } + } + else{ + //UDP + retval = sendto(sockfd, postbuf, sizeof(postbuf), 0, + (struct sockaddr *)&serv_addr, sizeof(serv_addr)); + if(retval<=0){ + skipDequeue = 1; + break; + } } - if(sockfd < 0) { - ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to create socket. Dropping %u flow(s)", num_flows); - /* Post failure */ - elkDroppedFlowsQueueTooLong += num_flows; - continue; - } - - //Set nonblocking - fcntl(sockfd, F_SETFL, MSG_DONTWAIT); - - if(sendTCP - && (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0)) { + if(skipDequeue == 1){ + //Sending most likely failed. sleep(1); - close(sockfd); - ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to connect to socket. Dropping %u flow(s)", num_flows); - elkDroppedFlowsQueueTooLong += num_flows; - continue; - } - - if( - (sendTCP - && (send(sockfd, postbuf, sizeof(postbuf), 0) < 0)) - || - (!sendTCP - && - (sendto(sockfd, postbuf, sizeof(postbuf), 0, - (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0) - )){ - sleep(1); - close(sockfd); - ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to send. Dropping %u flow(s)", num_flows); - elkDroppedFlowsQueueTooLong += num_flows; - continue; + continue; } //If all steps succeded, increment exported flows elkExportedFlows += num_flows; - close(sockfd); } else { sleep(1); } } /* while */ + + close(sockfd); }