From ab0e6713bc165f2cf5071554b551ea88cfb38313 Mon Sep 17 00:00:00 2001 From: Andrey Yakushin Date: Wed, 11 Mar 2026 09:48:37 +0000 Subject: [PATCH] Pull request 178: Fix sending deferred headers after StreamBlocked Squashed commit of the following: commit 7b93fde43d86b7398f6e684dd33c853483f7415a Author: boommy Date: Tue Mar 3 17:13:53 2026 +0400 Make flush do nothing by default commit 791d1906a53fe38747279fcf2e3788a71493da43 Author: boommy Date: Tue Mar 3 17:13:44 2026 +0400 Do not send deferred headers in write method commit 03075b729d787a13311b4a8006fa70c41014cf41 Author: boommy Date: Tue Mar 3 17:10:54 2026 +0400 Flush at the start of data exchange commit 13a420a4a73b1298a99e01cbfa6ea3e641495ce9 Author: boommy Date: Tue Mar 3 17:05:41 2026 +0400 Consume deferred headers in flush method and utilize WaitingWritable event for it commit 08ccb5b21613004e738746e906c65b6b6d1d2c76 Author: boommy Date: Tue Mar 3 17:04:15 2026 +0400 Remove consuming deferred headers from wait_writable commit fb1305d2646ce2c1c4f42492ce1dbeef62046a70 Author: boommy Date: Tue Mar 3 17:02:12 2026 +0400 Extend ulimit for bench remote container --- bench/single_host.sh | 1 + lib/src/http3_codec.rs | 33 ++++++++++++++------------------- lib/src/pipe.rs | 5 +++-- 3 files changed, 18 insertions(+), 21 deletions(-) diff --git a/bench/single_host.sh b/bench/single_host.sh index 7feba16..4f84629 100755 --- a/bench/single_host.sh +++ b/bench/single_host.sh @@ -188,6 +188,7 @@ run() { remote_container=$(docker run -d \ --hostname="$ENDPOINT_HOSTNAME" \ --network="$NETWORK_NAME" \ + --ulimit nofile=65536:65536 \ "$REMOTE_IMAGE") remote_ip=$(docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$remote_container") fi diff --git a/lib/src/http3_codec.rs b/lib/src/http3_codec.rs index 47fe893..df1ed9b 100644 --- a/lib/src/http3_codec.rs +++ b/lib/src/http3_codec.rs @@ -431,22 +431,6 @@ impl StreamSink { Ok(()) } - async fn consume_pending_response(&mut self) -> io::Result<()> { - while self.pending_response.is_some() { - self.try_send_pending_response()?; - if self.pending_response.is_some() { - self.codec_tx - .send(StreamMessage::WaitingWritable(self.stream_id)) - .map_err(|_| io::Error::from(ErrorKind::UnexpectedEof))?; - match self.writable_event_rx.recv().await { - None => return Err(io::Error::from(ErrorKind::UnexpectedEof)), - Some(_) => continue, - } - } - } - Ok(()) - } - async fn wait_body_capacity(&mut self) -> io::Result<()> { loop { match self.socket.stream_capacity(self.stream_id) { @@ -508,10 +492,9 @@ impl pipe::Sink for StreamSink { } fn write(&mut self, data: Bytes) -> io::Result { - self.try_send_pending_response()?; if self.pending_response.is_some() { log_id!( - debug, + error, self.id, "Body write deferred: response headers not yet sent" ); @@ -542,9 +525,21 @@ impl pipe::Sink for StreamSink { } async fn wait_writable(&mut self) -> io::Result<()> { - self.consume_pending_response().await?; self.wait_body_capacity().await } + + async fn flush(&mut self) -> io::Result<()> { + while self.pending_response.is_some() { + self.codec_tx + .send(StreamMessage::WaitingWritable(self.stream_id)) + .map_err(|_| io::Error::from(ErrorKind::UnexpectedEof))?; + match self.writable_event_rx.recv().await { + None => return Err(io::Error::from(ErrorKind::UnexpectedEof)), + Some(_) => self.try_send_pending_response()?, + } + } + Ok(()) + } } impl http_codec::DroppingSink for StreamSink { diff --git a/lib/src/pipe.rs b/lib/src/pipe.rs index 3e71203..f3b8b34 100644 --- a/lib/src/pipe.rs +++ b/lib/src/pipe.rs @@ -70,9 +70,9 @@ pub(crate) trait Sink: Send { async fn wait_writable(&mut self) -> io::Result<()>; /// Flush all intermediately buffered contents. - /// By default, just waits for the writable state. + /// By default, does nothing. async fn flush(&mut self) -> io::Result<()> { - self.wait_writable().await + Ok(()) } } @@ -149,6 +149,7 @@ impl SimplexPipe { self.last_activity = Instant::now(); let future = async { + self.sink.flush().await?; if self.pending_chunk.is_none() { let x = self.source.read().await?; log_dir!(trace, self.source.id(), self.direction, "TCP data: {}", x);