Logstash code improvements (#1143)

* Logstash code cleanup

* removed unnecesary trace warnings from LS code

* sync

* Logstash readme updated + code cleanup

* changed back to default bulk mechanism

* Logstash extension code improvements

* PR #1143 revision
This commit is contained in:
Bogdan Perian 2017-04-10 15:30:07 +03:00 committed by simonemainardi
parent 38c9ddbb34
commit dbcec19d2b
2 changed files with 121 additions and 57 deletions

View file

@ -23,7 +23,6 @@ Once started, ntopng will push to LS flows that are expired or periodically send
Logstash configuration example :
input {
tcp {
host => "localhost"

View file

@ -146,8 +146,30 @@ void Logstash::sendLSdata() {
char *portstr = NULL;
int sendTCP = 1;
struct sockaddr_in serv_addr;
int sockfd;
int sockfd = -1;
int portno;
int retval;
int skipDequeue = 0;
int sent = 0;
size_t sentLength = 0;
server = gethostbyname(ntop->getPrefs()->get_ls_host());
portstr = ntop->getPrefs()->get_ls_port();
if(server == NULL || portstr == NULL){
//can't send
return;
}
proto = ntop->getPrefs()->get_ls_proto();
if(proto && !strncmp(proto,"udp",3)){
sendTCP = 0;
}
portno = atoi(portstr);
bzero((char *) &serv_addr,sizeof(serv_addr));
serv_addr.sin_family = AF_INET;
bcopy((char *) server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length);
server = gethostbyname(ntop->getPrefs()->get_ls_host());
portstr = ntop->getPrefs()->get_ls_port();
@ -174,78 +196,121 @@ void Logstash::sendLSdata() {
u_int len, num_flows;
len = 0, num_flows = 0;
listMutex.lock(__FILE__, __LINE__);
for(u_int i=0; (i<watermark) && ((sizeof(postbuf)-len) > min_buf_size); i++) {
struct string_list *prev;
if(!(tail && tail->str)){
//No events in queue
break;
}
prev = tail->prev;
len += snprintf(&postbuf[len], sizeof(postbuf)-len, "%s\n", tail->str), num_flows++;
free(tail->str);
free(tail);
tail = prev;
num_queued_elems--;
if(num_queued_elems == 0)
head = NULL;
} /* for */
if(sockfd<0){
if(!sendTCP) { //UDP socket
sockfd = socket(AF_INET,SOCK_DGRAM, IPPROTO_UDP);
} else { //TCP socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
}
listMutex.unlock(__FILE__, __LINE__);
postbuf[len] = '\0';
if(sockfd < 0) {
ntop->getTrace()->traceEvent(TRACE_WARNING, "Unable to create socket. Skipping dequeue");
continue;
}
//Set nonblocking
retval = fcntl(sockfd, F_SETFL, fcntl(sockfd,F_GETFL,0) | O_NONBLOCK);
if(retval == -1){
ntop->getTrace()->traceEvent(TRACE_WARNING,"Error while setting NONBLOCK flag. Skipping dequeue.");
close(sockfd);
sockfd = -1;
sleep(1);
continue;
}
if(sendTCP
&& (connect(sockfd,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) <0)
&& (errno == ECONNREFUSED || errno == EALREADY || errno == EAGAIN
|| errno == ENETUNREACH || errno == ETIMEDOUT)
) {
ntop->getTrace()->traceEvent(TRACE_WARNING,"Could not connect to remote. Skipping dequeue..");
sleep(1);
//close socket and reinitialize for next loop
close(sockfd);
sockfd = -1;
continue;
}
}
if(skipDequeue == 1){
//Next loop should start dequeuing again if all goes well
skipDequeue = 0;
} else {
listMutex.lock(__FILE__, __LINE__);
//clear buffer to get rid of garbage bytes
memset(&postbuf[0],0,sizeof(postbuf));
for(u_int i=0; (i<watermark) && ((sizeof(postbuf)-len) > min_buf_size); i++) {
struct string_list *prev;
if(!(tail && tail->str)){
//No events in queue
break;
}
prev = tail->prev;
len += snprintf(&postbuf[len], sizeof(postbuf)-len, "%s\n", tail->str), num_flows++;
free(tail->str);
free(tail);
tail = prev;
num_queued_elems--;
if(num_queued_elems == 0)
head = NULL;
} /* for */
listMutex.unlock(__FILE__, __LINE__);
postbuf[len] = '\0';
}
if(postbuf[0]!='{') {
continue;
}
if(!sendTCP) { //UDP socket
sockfd = socket(AF_INET,SOCK_DGRAM, IPPROTO_UDP);
} else { //TCP socket
sockfd = socket(AF_INET, SOCK_STREAM, 0);
//try to send data
if(skipDequeue == 1){
//Continue with the leftover data
} else {
sent = 0;
sentLength = sizeof(postbuf);
}
if(sendTCP){
//TCP
while(sentLength > 0){
retval = send(sockfd,postbuf,sentLength,0);
if(retval<=0){
// Err occured
// don't clear postbuf as it hasn't been sent
// and skip dequeue next time
skipDequeue = 1;
break;
}
sent += retval;
sentLength -= retval;
}
}
else{
//UDP
retval = sendto(sockfd, postbuf, sizeof(postbuf), 0,
(struct sockaddr *)&serv_addr, sizeof(serv_addr));
if(retval<=0){
skipDequeue = 1;
break;
}
}
if(sockfd < 0) {
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to create socket. Dropping %u flow(s)", num_flows);
/* Post failure */
elkDroppedFlowsQueueTooLong += num_flows;
continue;
}
//Set nonblocking
fcntl(sockfd, F_SETFL, MSG_DONTWAIT);
if(sendTCP
&& (connect(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0)) {
if(skipDequeue == 1){
//Sending most likely failed.
sleep(1);
close(sockfd);
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to connect to socket. Dropping %u flow(s)", num_flows);
elkDroppedFlowsQueueTooLong += num_flows;
continue;
}
if(
(sendTCP
&& (send(sockfd, postbuf, sizeof(postbuf), 0) < 0))
||
(!sendTCP
&&
(sendto(sockfd, postbuf, sizeof(postbuf), 0,
(struct sockaddr*)&serv_addr, sizeof(serv_addr)) < 0)
)){
sleep(1);
close(sockfd);
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to send. Dropping %u flow(s)", num_flows);
elkDroppedFlowsQueueTooLong += num_flows;
continue;
continue;
}
//If all steps succeded, increment exported flows
elkExportedFlows += num_flows;
close(sockfd);
} else {
sleep(1);
}
} /* while */
close(sockfd);
}