diff --git a/.coveragerc b/.coveragerc index c01b51d..7781a1d 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,3 +1,2 @@ [run] -concurrency = multiprocessing -parallel = True \ No newline at end of file +concurrency = multiprocessing \ No newline at end of file diff --git a/nfstream/streamer.py b/nfstream/streamer.py index 1883647..f32267a 100644 --- a/nfstream/streamer.py +++ b/nfstream/streamer.py @@ -345,7 +345,7 @@ class NFStreamer(object): conn_cache = {} request_cache = {"chrome": RequestCache(timeout=(self.idle_timeout + self.active_timeout) * 1000), "firefox": RequestCache(timeout=(self.idle_timeout + self.active_timeout) * 1000)} - channel = self._mp_context.JoinableQueue(maxsize=32767) # Backpressure strategy. + channel = self._mp_context.Queue(maxsize=32767) # Backpressure strategy. # We set it to (2^15-1) to cope with OSX max semaphore value. n_meters = self.n_meters group_id = os.getpid() + self._idx # Used for fanout on Linux systems @@ -436,10 +436,9 @@ class NFStreamer(object): for i in range(n_meters): # We break workflow loop meters[i].terminate() break - while not channel.empty(): # Fix join hangs on Windows platforms when Queue is not empty. - channel.get() # https://docs.python.org/3.6/library/multiprocessing.html#pipes-and-queues for i in range(n_meters): - meters[i].join() # Join metering jobs + if meters[i].is_alive(): + meters[i].join() # Join metering jobs if self._mode == 1 and self.performance_report > 0: rt.stop() if self._mode == 1 and self.system_visibility_mode > 0: