Pull request 178: Fix sending deferred headers after StreamBlocked

Squashed commit of the following:

commit 7b93fde43d
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:13:53 2026 +0400

    Make flush do nothing by default

commit 791d1906a5
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:13:44 2026 +0400

    Do not send deferred headers in write method

commit 03075b729d
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:10:54 2026 +0400

    Flush at the start of data exchange

commit 13a420a4a7
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:05:41 2026 +0400

    Consume deferred headers in flush method and utilize WaitingWritable event for it

commit 08ccb5b216
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:04:15 2026 +0400

    Remove consuming deferred headers from wait_writable

commit fb1305d264
Author: boommy <a.yakushin>
Date:   Tue Mar 3 17:02:12 2026 +0400

    Extend ulimit for bench remote container
This commit is contained in:
Andrey Yakushin 2026-03-11 09:48:37 +00:00
parent 220cbc4410
commit ab0e6713bc
3 changed files with 18 additions and 21 deletions

View file

@ -188,6 +188,7 @@ run() {
remote_container=$(docker run -d \
--hostname="$ENDPOINT_HOSTNAME" \
--network="$NETWORK_NAME" \
--ulimit nofile=65536:65536 \
"$REMOTE_IMAGE")
remote_ip=$(docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' "$remote_container")
fi

View file

@ -431,22 +431,6 @@ impl StreamSink {
Ok(())
}
async fn consume_pending_response(&mut self) -> io::Result<()> {
while self.pending_response.is_some() {
self.try_send_pending_response()?;
if self.pending_response.is_some() {
self.codec_tx
.send(StreamMessage::WaitingWritable(self.stream_id))
.map_err(|_| io::Error::from(ErrorKind::UnexpectedEof))?;
match self.writable_event_rx.recv().await {
None => return Err(io::Error::from(ErrorKind::UnexpectedEof)),
Some(_) => continue,
}
}
}
Ok(())
}
async fn wait_body_capacity(&mut self) -> io::Result<()> {
loop {
match self.socket.stream_capacity(self.stream_id) {
@ -508,10 +492,9 @@ impl pipe::Sink for StreamSink {
}
fn write(&mut self, data: Bytes) -> io::Result<Bytes> {
self.try_send_pending_response()?;
if self.pending_response.is_some() {
log_id!(
debug,
error,
self.id,
"Body write deferred: response headers not yet sent"
);
@ -542,9 +525,21 @@ impl pipe::Sink for StreamSink {
}
async fn wait_writable(&mut self) -> io::Result<()> {
self.consume_pending_response().await?;
self.wait_body_capacity().await
}
async fn flush(&mut self) -> io::Result<()> {
while self.pending_response.is_some() {
self.codec_tx
.send(StreamMessage::WaitingWritable(self.stream_id))
.map_err(|_| io::Error::from(ErrorKind::UnexpectedEof))?;
match self.writable_event_rx.recv().await {
None => return Err(io::Error::from(ErrorKind::UnexpectedEof)),
Some(_) => self.try_send_pending_response()?,
}
}
Ok(())
}
}
impl http_codec::DroppingSink for StreamSink {

View file

@ -70,9 +70,9 @@ pub(crate) trait Sink: Send {
async fn wait_writable(&mut self) -> io::Result<()>;
/// Flush all intermediately buffered contents.
/// By default, just waits for the writable state.
/// By default, does nothing.
async fn flush(&mut self) -> io::Result<()> {
self.wait_writable().await
Ok(())
}
}
@ -149,6 +149,7 @@ impl<F: Fn(SimplexDirection, usize) + Send> SimplexPipe<F> {
self.last_activity = Instant::now();
let future = async {
self.sink.flush().await?;
if self.pending_chunk.is_none() {
let x = self.source.read().await?;
log_dir!(trace, self.source.id(), self.direction, "TCP data: {}", x);