g3bench: use atomic-waker for keyless multiplex connection

This commit is contained in:
Zhang Jingqiang 2023-11-13 11:08:49 +08:00
parent 516c8f0605
commit bebb4eeeeb
4 changed files with 17 additions and 49 deletions

1
Cargo.lock generated
View file

@ -1350,6 +1350,7 @@ dependencies = [
"ahash",
"anyhow",
"async-trait",
"atomic-waker",
"bytes",
"cadence-with-flush",
"clap",

View file

@ -24,6 +24,7 @@ h3-quinn.workspace = true
quinn = { workspace = true, features = ["tls-rustls", "runtime-tokio"] }
bytes.workspace = true
futures-util.workspace = true
atomic-waker.workspace = true
tokio-openssl.workspace = true
openssl.workspace = true
openssl-probe = { workspace = true, optional = true }

View file

@ -19,10 +19,11 @@ use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex};
use std::task::{ready, Context, Poll, Waker};
use std::time::Duration;
use atomic_waker::AtomicWaker;
use concurrent_queue::{ConcurrentQueue, PopError, PushError};
use rustc_hash::FxHashMap;
use tokio::io::{AsyncRead, AsyncWrite};
@ -58,7 +59,7 @@ impl ResponseValue {
}
struct SharedState {
write_waker: RwLock<Option<Waker>>,
write_waker: AtomicWaker,
next_req_id: AtomicU32,
req_queue: ConcurrentQueue<(KeylessRequest, Waker)>,
rsp_table: Mutex<FxHashMap<u32, ResponseValue>>,
@ -95,15 +96,14 @@ impl SharedState {
}
fn take_write_waker(&self) -> Option<Waker> {
let mut guard = self.write_waker.write().unwrap();
guard.take()
self.write_waker.take()
}
}
impl Default for SharedState {
fn default() -> Self {
SharedState {
write_waker: RwLock::new(None),
write_waker: AtomicWaker::new(),
next_req_id: AtomicU32::new(0),
req_queue: ConcurrentQueue::bounded(1024),
rsp_table: Mutex::new(FxHashMap::default()),
@ -113,7 +113,6 @@ impl Default for SharedState {
}
struct UnderlyingWriterState {
init: bool,
shared: Arc<SharedState>,
current_offset: usize,
current_request: Option<KeylessRequest>,
@ -126,12 +125,7 @@ impl UnderlyingWriterState {
where
W: AsyncWrite + Unpin,
{
if self.init {
let mut waker = self.shared.write_waker.write().unwrap();
*waker = Some(cx.waker().clone());
drop(waker);
self.init = false;
}
self.shared.write_waker.register(cx.waker());
let mut do_flush = false;
loop {
@ -221,33 +215,8 @@ where
}
}
struct WaitWriteWaker {
shared: Arc<SharedState>,
local_addr: SocketAddr,
}
impl Future for WaitWriteWaker {
type Output = MultiplexTransfer;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let underlying_waker_guard = self.shared.write_waker.read().unwrap();
match &*underlying_waker_guard {
Some(waker) => Poll::Ready(MultiplexTransfer {
shared: self.shared.clone(),
writer_waker: waker.clone(),
local_addr: self.local_addr,
}),
None => {
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
}
pub(crate) struct SendRequest {
shared: Arc<SharedState>,
writer_waker: Waker,
request: Option<KeylessRequest>,
rsp_id: u32,
}
@ -262,7 +231,7 @@ impl Future for SendRequest {
req.set_id(id);
match self.shared.req_queue.push((req, rsp_waker)) {
Ok(_) => {
self.writer_waker.wake_by_ref();
self.shared.write_waker.wake();
self.rsp_id = id;
Poll::Pending
}
@ -291,7 +260,6 @@ impl Future for SendRequest {
pub(crate) struct MultiplexTransfer {
shared: Arc<SharedState>,
writer_waker: Waker,
local_addr: SocketAddr,
}
@ -317,7 +285,6 @@ impl MultiplexTransfer {
pub(crate) fn send_request(&self, req: KeylessRequest) -> SendRequest {
SendRequest {
shared: self.shared.clone(),
writer_waker: self.writer_waker.clone(),
request: Some(req),
rsp_id: 0,
}
@ -328,7 +295,7 @@ impl MultiplexTransfer {
guard.clone()
}
pub(crate) async fn start<R, W>(
pub(crate) fn start<R, W>(
mut r: R,
w: W,
local_addr: SocketAddr,
@ -339,11 +306,14 @@ impl MultiplexTransfer {
W: AsyncWrite + Send + Unpin + 'static,
{
let shared = Arc::new(SharedState::default());
let handle = MultiplexTransfer {
shared: shared.clone(),
local_addr,
};
let underlying_w = UnderlyingWriter {
writer: w,
state: UnderlyingWriterState {
init: true,
shared: Arc::clone(&shared),
current_offset: 0,
current_request: None,
@ -352,10 +322,6 @@ impl MultiplexTransfer {
},
};
tokio::spawn(underlying_w);
let wait_waiter = WaitWriteWaker {
shared: shared.clone(),
local_addr,
};
let clean_shared = shared.clone();
tokio::spawn(async move {
@ -409,6 +375,6 @@ impl MultiplexTransfer {
}
});
wait_waiter.await
handle
}
}

View file

@ -96,10 +96,10 @@ impl KeylessCloudflareArgs {
if let Some(tls_client) = &self.tls.client {
let ssl_stream = self.tls_connect_to_target(tls_client, tcp_stream).await?;
let (r, w) = tokio::io::split(ssl_stream);
Ok(MultiplexTransfer::start(r, w, local_addr, self.timeout).await)
Ok(MultiplexTransfer::start(r, w, local_addr, self.timeout))
} else {
let (r, w) = tcp_stream.into_split();
Ok(MultiplexTransfer::start(r, w, local_addr, self.timeout).await)
Ok(MultiplexTransfer::start(r, w, local_addr, self.timeout))
}
}