From 9145838bbe2afce90b2a37d69665ba89f684b4d2 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Fri, 15 Aug 2025 11:55:59 +0800 Subject: [PATCH] use kanal instead of flume --- Cargo.lock | 64 +++++++------------ Cargo.toml | 1 + g3fcgen/Cargo.toml | 2 +- g3fcgen/src/backend/mod.rs | 9 ++- g3fcgen/src/frontend/mod.rs | 15 +++-- g3fcgen/src/lib.rs | 4 +- g3proxy/Cargo.toml | 2 +- g3proxy/src/audit/detour/connect.rs | 4 +- g3proxy/src/audit/detour/mod.rs | 21 ++---- g3proxy/src/audit/detour/pool.rs | 6 +- g3tiles/Cargo.toml | 2 +- g3tiles/src/backend/keyless_quic/connect.rs | 2 +- g3tiles/src/backend/keyless_quic/mod.rs | 24 +++---- g3tiles/src/backend/keyless_tcp/connect.rs | 4 +- g3tiles/src/backend/keyless_tcp/mod.rs | 24 +++---- .../module/keyless/backend/multiplex/mod.rs | 4 +- .../module/keyless/backend/multiplex/send.rs | 6 +- g3tiles/src/module/keyless/backend/pool.rs | 8 +-- lib/g3-fluentd/Cargo.toml | 2 +- lib/g3-fluentd/src/lib.rs | 11 ++-- lib/g3-icap-client/Cargo.toml | 2 +- lib/g3-icap-client/src/service/client.rs | 8 +-- lib/g3-icap-client/src/service/connection.rs | 6 +- lib/g3-icap-client/src/service/pool.rs | 14 ++-- lib/g3-journal/Cargo.toml | 2 +- lib/g3-journal/src/lib.rs | 6 +- lib/g3-resolver/Cargo.toml | 4 +- lib/g3-resolver/src/driver/hickory/client.rs | 4 +- .../src/driver/hickory/config/mod.rs | 2 +- lib/g3-resolver/src/driver/hickory/driver.rs | 6 +- lib/g3-stdlog/Cargo.toml | 2 +- lib/g3-stdlog/src/lib.rs | 9 ++- lib/g3-syslog/Cargo.toml | 2 +- lib/g3-syslog/src/async_streamer.rs | 15 ++--- lib/g3-types/Cargo.toml | 4 +- lib/g3-types/src/log/async_log.rs | 11 ++-- 36 files changed, 133 insertions(+), 179 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b7466be5..b3c48714 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -196,9 +196,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.88" +version = "0.1.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" +checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", @@ -847,18 +847,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "flume" -version = "0.11.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095" -dependencies = [ - "futures-core", - "futures-sink", - "nanorand", - "spin", -] - [[package]] name = "fnv" version = "1.0.7" @@ -1126,13 +1114,13 @@ dependencies = [ "anyhow", "chrono", "constant_time_eq 0.4.2", - "flume", "g3-compat", "g3-openssl", "g3-socket", "g3-types", "g3-yaml", "hex", + "kanal", "log", "rmp", "rmp-serde", @@ -1254,7 +1242,6 @@ dependencies = [ "atoi", "base64", "bytes", - "flume", "g3-h2", "g3-http", "g3-io-ext", @@ -1265,6 +1252,7 @@ dependencies = [ "h2", "http", "itoa", + "kanal", "memchr", "rustls-pki-types", "thiserror", @@ -1348,9 +1336,9 @@ name = "g3-journal" version = "0.4.0" dependencies = [ "anyhow", - "flume", "g3-types", "itoa", + "kanal", "memchr", "once_cell", "rustix 1.0.8", @@ -1444,7 +1432,6 @@ dependencies = [ "c-ares", "c-ares-resolver", "c-ares-sys", - "flume", "g3-hickory-client", "g3-socket", "g3-types", @@ -1452,6 +1439,7 @@ dependencies = [ "hickory-client", "hickory-proto", "indexmap", + "kanal", "log", "rustls", "rustls-pki-types", @@ -1555,10 +1543,10 @@ version = "0.3.0" dependencies = [ "anstyle", "chrono", - "flume", "g3-datetime", "g3-types", "itoa", + "kanal", "ryu", "slog", ] @@ -1569,13 +1557,13 @@ version = "0.8.0" dependencies = [ "anyhow", "chrono", - "flume", "g3-compat", "g3-datetime", "g3-io-sys", "g3-types", "g3-yaml", "itoa", + "kanal", "log", "ryu", "serde", @@ -1630,7 +1618,6 @@ dependencies = [ "constant_time_eq 0.4.2", "crc32fast", "fastrand", - "flume", "fnv", "foldhash", "g3-std-ext", @@ -1641,6 +1628,7 @@ dependencies = [ "indexmap", "ip_network", "ip_network_table", + "kanal", "libc", "log", "lru", @@ -1771,7 +1759,6 @@ version = "0.9.0" dependencies = [ "anyhow", "clap", - "flume", "g3-build-env", "g3-cert-agent", "g3-daemon", @@ -1782,6 +1769,7 @@ dependencies = [ "g3-tls-cert", "g3-types", "g3-yaml", + "kanal", "log", "tokio", "variant-ssl", @@ -1907,7 +1895,6 @@ dependencies = [ "clap_complete", "fastrand", "fixedbitset", - "flume", "fnv", "foldhash", "futures-util", @@ -1951,6 +1938,7 @@ dependencies = [ "ip_network", "ip_network_table", "itoa", + "kanal", "log", "lru", "memchr", @@ -2103,7 +2091,6 @@ dependencies = [ "chrono", "clap", "clap_complete", - "flume", "foldhash", "futures-util", "g3-build-env", @@ -2123,6 +2110,7 @@ dependencies = [ "g3tiles-proto", "governor", "itoa", + "kanal", "log", "openssl-probe", "quinn", @@ -2681,6 +2669,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kanal" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -2837,15 +2835,6 @@ dependencies = [ "pkg-config", ] -[[package]] -name = "nanorand" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" -dependencies = [ - "getrandom 0.2.16", -] - [[package]] name = "nibble_vec" version = "0.1.0" @@ -3573,15 +3562,6 @@ dependencies = [ "windows-sys 0.59.0", ] -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" -dependencies = [ - "lock_api", -] - [[package]] name = "spinning_top" version = "0.3.0" diff --git a/Cargo.toml b/Cargo.toml index 2c6e6b89..5c70c5a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,6 +166,7 @@ openssl = { package = "variant-ssl", version = "0.17.5" } openssl-sys = { package = "variant-ssl-sys", version = "0.17.5" } openssl-probe = "0.1" # +kanal = { version = "0.1.1", default-features = false } flume = { version = "0.11", default-features = false } # c-ares = { version = "11.0", default-features = false } diff --git a/g3fcgen/Cargo.toml b/g3fcgen/Cargo.toml index 0f710ed7..1a0f8d52 100644 --- a/g3fcgen/Cargo.toml +++ b/g3fcgen/Cargo.toml @@ -14,7 +14,7 @@ clap.workspace = true log = { workspace = true, features = ["max_level_trace", "release_max_level_debug"] } openssl.workspace = true tokio = { workspace = true, features = ["macros", "net", "io-util", "time", "signal"] } -flume = { workspace = true, features = ["async"] } +kanal = { workspace = true, features = ["async"] } yaml-rust.workspace = true g3-std-ext.workspace = true g3-types.workspace = true diff --git a/g3fcgen/src/backend/mod.rs b/g3fcgen/src/backend/mod.rs index 2cd50b2b..1924bad4 100644 --- a/g3fcgen/src/backend/mod.rs +++ b/g3fcgen/src/backend/mod.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::anyhow; -use flume::{Receiver, Sender}; use log::{debug, error, warn}; use openssl::pkey::{PKey, Private}; use openssl::x509::X509; @@ -131,8 +130,8 @@ impl OpensslBackend { mut self, handle: &Handle, id: usize, - req_receiver: Receiver, - rsp_sender: Sender, + req_receiver: kanal::AsyncReceiver, + rsp_sender: kanal::AsyncSender, ) { handle.spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(300)); @@ -144,7 +143,7 @@ impl OpensslBackend { warn!("failed to refresh backend: {e:?}"); } } - r = req_receiver.recv_async() => { + r = req_receiver.recv() => { let Ok(req) = r else { break }; @@ -154,7 +153,7 @@ impl OpensslBackend { match self.generate(&req.user_req) { Ok(data) => { debug!("{host} - [#{id}] cert generated"); - if let Err(e) = rsp_sender.send_async(req.into_response(data)).await { + if let Err(e) = rsp_sender.send(req.into_response(data)).await { error!("{host} - [#{id}] failed to send cert to frontend: {e}"); break; } diff --git a/g3fcgen/src/frontend/mod.rs b/g3fcgen/src/frontend/mod.rs index 30de770b..3663c1ec 100644 --- a/g3fcgen/src/frontend/mod.rs +++ b/g3fcgen/src/frontend/mod.rs @@ -32,14 +32,14 @@ pub(super) struct Frontend { io: UdpDgramIo, stats: Arc, duration_recorder: HistogramRecorder, - rsp_receiver: flume::Receiver, + rsp_receiver: kanal::AsyncReceiver, } impl Frontend { pub(super) fn new( listen_config: &UdpListenConfig, duration_recorder: HistogramRecorder, - rsp_receiver: flume::Receiver, + rsp_receiver: kanal::AsyncReceiver, ) -> anyhow::Result { let io = UdpDgramIo::new(listen_config)?; Ok(Frontend { @@ -54,7 +54,10 @@ impl Frontend { self.stats.clone() } - pub(super) async fn run(self, req_sender: flume::Sender) -> anyhow::Result<()> { + pub(super) async fn run( + self, + req_sender: kanal::AsyncSender, + ) -> anyhow::Result<()> { let mut clt_c = Box::pin(tokio::signal::ctrl_c()); let mut rcv_buf = [0u8; 16384]; @@ -68,7 +71,7 @@ impl Frontend { Ok(user_req) => { debug!("{} - request received", user_req.host()); let req = BackendRequest {user_req, peer, recv_time}; - if let Err(e) = req_sender.send_async(req).await { + if let Err(e) = req_sender.send(req).await { return Err(anyhow!("failed to send request to backend: {e}")); } } @@ -80,7 +83,7 @@ impl Frontend { Err(e) => return Err(anyhow!("frontend recv error: {e:?}")), } } - r = self.rsp_receiver.recv_async() => { + r = self.rsp_receiver.recv() => { match r { Ok(rsp) => self.handle_rsp(rsp).await, Err(e) => return Err(anyhow!("recv from backend failed: {e}")), @@ -98,7 +101,7 @@ impl Frontend { } drop(req_sender); - while let Ok(rsp) = self.rsp_receiver.recv_async().await { + while let Ok(rsp) = self.rsp_receiver.recv().await { self.handle_rsp(rsp).await; } diff --git a/g3fcgen/src/lib.rs b/g3fcgen/src/lib.rs index d5f5bb2f..1362c1fd 100644 --- a/g3fcgen/src/lib.rs +++ b/g3fcgen/src/lib.rs @@ -59,8 +59,8 @@ impl BackendResponse { } pub async fn run(proc_args: &ProcArgs) -> anyhow::Result<()> { - let (req_sender, req_receiver) = flume::bounded::(1024); - let (rsp_sender, rsp_receiver) = flume::bounded::(1024); + let (req_sender, req_receiver) = kanal::bounded_async::(1024); + let (rsp_sender, rsp_receiver) = kanal::bounded_async::(1024); let backend_config = config::get_backend_config().ok_or_else(|| anyhow!("no backend config available"))?; diff --git a/g3proxy/Cargo.toml b/g3proxy/Cargo.toml index aa69606f..89a2b016 100644 --- a/g3proxy/Cargo.toml +++ b/g3proxy/Cargo.toml @@ -57,7 +57,7 @@ rustc-hash.workspace = true fnv.workspace = true governor = { workspace = true, features = ["std", "jitter"] } rmpv.workspace = true -flume.workspace = true +kanal = { workspace = true, features = ["async"] } lru.workspace = true regex.workspace = true mlua = { workspace = true, features = ["send"], optional = true } diff --git a/g3proxy/src/audit/detour/connect.rs b/g3proxy/src/audit/detour/connect.rs index 44b6a2c0..627047db 100644 --- a/g3proxy/src/audit/detour/connect.rs +++ b/g3proxy/src/audit/detour/connect.rs @@ -82,7 +82,7 @@ impl StreamDetourConnector { pub(super) async fn run_new_connection( &self, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, idle_timeout: Duration, ) { let mut connection = match self.new_connection().await { @@ -104,7 +104,7 @@ impl StreamDetourConnector { debug!("detour connection closed unexpectedly: {e}"); return; } - r = req_receiver.recv_async() => { + r = req_receiver.recv() => { idle_sleep.as_mut().reset(Instant::now() + idle_timeout); match r { Ok(req) => { diff --git a/g3proxy/src/audit/detour/mod.rs b/g3proxy/src/audit/detour/mod.rs index fa826711..d684c991 100644 --- a/g3proxy/src/audit/detour/mod.rs +++ b/g3proxy/src/audit/detour/mod.rs @@ -65,13 +65,13 @@ impl StreamDetourContext<'_, SC> { pub(crate) struct StreamDetourClient { config: Arc, - req_sender: flume::Sender, + req_sender: kanal::AsyncSender, pool_handle: StreamDetourPoolHandle, } impl StreamDetourClient { pub(super) fn new(config: Arc) -> anyhow::Result { - let (req_sender, req_receiver) = flume::unbounded(); + let (req_sender, req_receiver) = kanal::unbounded_async(); let connector = StreamDetourConnector::new(config.clone())?; let pool_handle = StreamDetourPool::spawn(config.connection_pool, req_receiver, Arc::new(connector)); @@ -115,18 +115,11 @@ impl StreamDetourClient { let (sender, receiver) = oneshot::channel(); let req = StreamDetourRequest(sender); - if let Err(e) = self.req_sender.try_send(req) { - match e { - flume::TrySendError::Full(req) => { - self.pool_handle.request_new_connection(); - if self.req_sender.send_async(req).await.is_err() { - return Err(anyhow!("stream detour client is down")); - } - } - flume::TrySendError::Disconnected(_req) => { - return Err(anyhow!("stream detour client is down")); - } - } + if self.req_sender.is_full() { + self.pool_handle.request_new_connection(); + } + if self.req_sender.send(req).await.is_err() { + return Err(anyhow!("stream detour client is down")); } match tokio::time::timeout(self.config.stream_open_timeout, receiver).await { diff --git a/g3proxy/src/audit/detour/pool.rs b/g3proxy/src/audit/detour/pool.rs index 74bc9461..eff8595b 100644 --- a/g3proxy/src/audit/detour/pool.rs +++ b/g3proxy/src/audit/detour/pool.rs @@ -33,7 +33,7 @@ pub(super) struct StreamDetourPool { connector: Arc, stats: Arc, - client_req_receiver: flume::Receiver, + client_req_receiver: kanal::AsyncReceiver, connection_id: u64, connection_close_receiver: mpsc::Receiver, @@ -43,7 +43,7 @@ pub(super) struct StreamDetourPool { impl StreamDetourPool { fn new( config: ConnectionPoolConfig, - client_req_receiver: flume::Receiver, + client_req_receiver: kanal::AsyncReceiver, connector: Arc, ) -> Self { let (connection_close_sender, connection_close_receiver) = mpsc::channel(1); @@ -60,7 +60,7 @@ impl StreamDetourPool { pub(super) fn spawn( config: ConnectionPoolConfig, - client_cmd_receiver: flume::Receiver, + client_cmd_receiver: kanal::AsyncReceiver, connector: Arc, ) -> StreamDetourPoolHandle { let pool = StreamDetourPool::new(config, client_cmd_receiver, connector); diff --git a/g3tiles/Cargo.toml b/g3tiles/Cargo.toml index 7975d50a..71b94edc 100644 --- a/g3tiles/Cargo.toml +++ b/g3tiles/Cargo.toml @@ -38,7 +38,7 @@ governor = { workspace = true, features = ["std", "jitter"] } chrono = { workspace = true, features = ["clock"] } uuid.workspace = true bitflags.workspace = true -flume.workspace = true +kanal = { workspace = true, features = ["async"] } rustc-hash.workspace = true g3-macros.workspace = true g3-daemon = { workspace = true, features = ["event-log"] } diff --git a/g3tiles/src/backend/keyless_quic/connect.rs b/g3tiles/src/backend/keyless_quic/connect.rs index 723ed067..906b6291 100644 --- a/g3tiles/src/backend/keyless_quic/connect.rs +++ b/g3tiles/src/backend/keyless_quic/connect.rs @@ -105,7 +105,7 @@ impl KeylessUpstreamConnect for KeylessQuicUpstreamConnector { async fn new_connection( &self, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, idle_timeout: Duration, ) -> anyhow::Result { diff --git a/g3tiles/src/backend/keyless_quic/mod.rs b/g3tiles/src/backend/keyless_quic/mod.rs index 12ff3af7..545ed3f1 100644 --- a/g3tiles/src/backend/keyless_quic/mod.rs +++ b/g3tiles/src/backend/keyless_quic/mod.rs @@ -35,7 +35,7 @@ pub(crate) struct KeylessQuicBackend { peer_addrs: Arc>>>, discover_handle: Mutex>, pool_handle: KeylessConnectionPoolHandle, - keyless_request_sender: flume::Sender, + keyless_request_sender: kanal::AsyncSender, } impl KeylessQuicBackend { @@ -52,7 +52,7 @@ impl KeylessQuicBackend { duration_stats.set_extra_tags(config.extra_metrics_tags.clone()); let (keyless_request_sender, keyless_request_receiver) = - flume::bounded(config.request_buffer_size); + kanal::bounded_async(config.request_buffer_size); let connector = KeylessQuicUpstreamConnector::new( config.clone(), stats.clone(), @@ -183,20 +183,12 @@ impl Backend for KeylessQuicBackend { let (rsp_sender, rsp_receiver) = oneshot::channel(); let req = KeylessForwardRequest::new(req, rsp_sender); - if let Err(e) = self.keyless_request_sender.try_send(req) { - match e { - flume::TrySendError::Full(req) => { - self.pool_handle.request_new_connection(); - if self.keyless_request_sender.send_async(req).await.is_err() { - self.stats.add_request_drop(); - return KeylessResponse::Local(err); - } - } - flume::TrySendError::Disconnected(_req) => { - self.stats.add_request_drop(); - return KeylessResponse::Local(err); - } - } + if self.keyless_request_sender.is_full() { + self.pool_handle.request_new_connection(); + } + if self.keyless_request_sender.send(req).await.is_err() { + self.stats.add_request_drop(); + return KeylessResponse::Local(err); } rsp_receiver.await.unwrap_or(KeylessResponse::Local(err)) } diff --git a/g3tiles/src/backend/keyless_tcp/connect.rs b/g3tiles/src/backend/keyless_tcp/connect.rs index 72b8ca5b..3172d42e 100644 --- a/g3tiles/src/backend/keyless_tcp/connect.rs +++ b/g3tiles/src/backend/keyless_tcp/connect.rs @@ -85,7 +85,7 @@ impl KeylessUpstreamConnect for KeylessTcpUpstreamConnector { async fn new_connection( &self, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, _idle_timeout: Duration, ) -> anyhow::Result { @@ -129,7 +129,7 @@ impl KeylessUpstreamConnect for KeylessTlsUpstreamConnector { async fn new_connection( &self, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, _idle_timeout: Duration, ) -> anyhow::Result { diff --git a/g3tiles/src/backend/keyless_tcp/mod.rs b/g3tiles/src/backend/keyless_tcp/mod.rs index 9227ce89..bf0c70a1 100644 --- a/g3tiles/src/backend/keyless_tcp/mod.rs +++ b/g3tiles/src/backend/keyless_tcp/mod.rs @@ -35,7 +35,7 @@ pub(crate) struct KeylessTcpBackend { peer_addrs: Arc>>>, discover_handle: Mutex>, pool_handle: KeylessConnectionPoolHandle, - keyless_request_sender: flume::Sender, + keyless_request_sender: kanal::AsyncSender, } impl KeylessTcpBackend { @@ -52,7 +52,7 @@ impl KeylessTcpBackend { duration_stats.set_extra_tags(config.extra_metrics_tags.clone()); let (keyless_request_sender, keyless_request_receiver) = - flume::bounded(config.request_buffer_size); + kanal::bounded_async(config.request_buffer_size); let tcp_connector = KeylessTcpUpstreamConnector::new( config.clone(), stats.clone(), @@ -194,20 +194,12 @@ impl Backend for KeylessTcpBackend { let (rsp_sender, rsp_receiver) = oneshot::channel(); let req = KeylessForwardRequest::new(req, rsp_sender); - if let Err(e) = self.keyless_request_sender.try_send(req) { - match e { - flume::TrySendError::Full(req) => { - self.pool_handle.request_new_connection(); - if self.keyless_request_sender.send_async(req).await.is_err() { - self.stats.add_request_drop(); - return KeylessResponse::Local(err); - } - } - flume::TrySendError::Disconnected(_req) => { - self.stats.add_request_drop(); - return KeylessResponse::Local(err); - } - } + if self.keyless_request_sender.is_full() { + self.pool_handle.request_new_connection(); + } + if self.keyless_request_sender.send(req).await.is_err() { + self.stats.add_request_drop(); + return KeylessResponse::Local(err); } rsp_receiver.await.unwrap_or(KeylessResponse::Local(err)) } diff --git a/g3tiles/src/module/keyless/backend/multiplex/mod.rs b/g3tiles/src/module/keyless/backend/multiplex/mod.rs index 78af3375..4b6f4804 100644 --- a/g3tiles/src/module/keyless/backend/multiplex/mod.rs +++ b/g3tiles/src/module/keyless/backend/multiplex/mod.rs @@ -46,7 +46,7 @@ pub(crate) struct MultiplexedUpstreamConnection { duration_recorder: Arc, r: R, w: W, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, alive_channel_guard: KeylessBackendAliveChannelGuard, } @@ -58,7 +58,7 @@ impl MultiplexedUpstreamConnection { duration_recorder: Arc, ups_r: R, ups_w: W, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, ) -> Self { let alive_channel_guard = stats.inc_alive_channel(); diff --git a/g3tiles/src/module/keyless/backend/multiplex/send.rs b/g3tiles/src/module/keyless/backend/multiplex/send.rs index 6bb39674..d94b4222 100644 --- a/g3tiles/src/module/keyless/backend/multiplex/send.rs +++ b/g3tiles/src/module/keyless/backend/multiplex/send.rs @@ -24,7 +24,7 @@ pub(super) struct KeylessUpstreamSendTask { max_request_count: usize, max_alive_time: Duration, stats: Arc, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, shared_state: Arc, duration_recorder: Arc, @@ -36,7 +36,7 @@ impl KeylessUpstreamSendTask { max_request_count: usize, max_alive_time: Duration, stats: Arc, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, shared_state: Arc, duration_recorder: Arc, @@ -70,7 +70,7 @@ impl KeylessUpstreamSendTask { tokio::select! { biased; - r = self.req_receiver.recv_async() => { + r = self.req_receiver.recv() => { idle_sleep.as_mut().reset(Instant::now() + idle_timeout); match r { Ok(req) => { diff --git a/g3tiles/src/module/keyless/backend/pool.rs b/g3tiles/src/module/keyless/backend/pool.rs index 5d8c42ef..4e182581 100644 --- a/g3tiles/src/module/keyless/backend/pool.rs +++ b/g3tiles/src/module/keyless/backend/pool.rs @@ -25,7 +25,7 @@ pub(crate) trait KeylessUpstreamConnect { type Connection: KeylessUpstreamConnection; async fn new_connection( &self, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, quit_notifier: broadcast::Receiver<()>, idle_timeout: Duration, ) -> anyhow::Result; @@ -70,7 +70,7 @@ pub(crate) struct KeylessConnectionPool { connector: ArcKeylessUpstreamConnect, stats: Arc, - keyless_request_receiver: flume::Receiver, + keyless_request_receiver: kanal::AsyncReceiver, connection_id: u64, connection_close_receiver: mpsc::Receiver, @@ -87,7 +87,7 @@ where fn new( config: ConnectionPoolConfig, connector: ArcKeylessUpstreamConnect, - keyless_request_receiver: flume::Receiver, + keyless_request_receiver: kanal::AsyncReceiver, graceful_close_wait: Duration, ) -> Self { let (connection_close_sender, connection_close_receiver) = mpsc::channel(1); @@ -108,7 +108,7 @@ where pub(crate) fn spawn( config: ConnectionPoolConfig, connector: ArcKeylessUpstreamConnect, - keyless_request_receiver: flume::Receiver, + keyless_request_receiver: kanal::AsyncReceiver, graceful_close_wait: Duration, ) -> KeylessConnectionPoolHandle { let pool = KeylessConnectionPool::new( diff --git a/lib/g3-fluentd/Cargo.toml b/lib/g3-fluentd/Cargo.toml index 15cad00a..76fb549f 100644 --- a/lib/g3-fluentd/Cargo.toml +++ b/lib/g3-fluentd/Cargo.toml @@ -11,7 +11,7 @@ rust-version.workspace = true anyhow.workspace = true slog.workspace = true chrono = { workspace = true, features = ["clock"] } -flume = { workspace = true, features = ["async"] } +kanal = { workspace = true, features = ["async"] } rmp.workspace = true rmp-serde.workspace = true serde.workspace = true diff --git a/lib/g3-fluentd/src/lib.rs b/lib/g3-fluentd/src/lib.rs index c9890498..f0a3b819 100644 --- a/lib/g3-fluentd/src/lib.rs +++ b/lib/g3-fluentd/src/lib.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use anyhow::anyhow; -use flume::Receiver; use log::warn; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; @@ -32,14 +31,14 @@ pub fn new_async_logger( fluent_conf: &Arc, tag_name: String, ) -> AsyncLogger, FluentdFormatter> { - let (sender, receiver) = flume::bounded::>(async_conf.channel_capacity); + let (sender, receiver) = kanal::bounded::>(async_conf.channel_capacity); let stats = Arc::new(LogStats::default()); for i in 0..async_conf.thread_number { let io_thread = AsyncIoThread { config: Arc::clone(fluent_conf), - receiver: receiver.clone(), + receiver: receiver.clone_async(), stats: Arc::clone(&stats), retry_queue: VecDeque::with_capacity(fluent_conf.retry_queue_len), }; @@ -65,7 +64,7 @@ enum FluentdConnection { struct AsyncIoThread { config: Arc, - receiver: Receiver>, + receiver: kanal::AsyncReceiver>, stats: Arc, retry_queue: VecDeque>, } @@ -112,7 +111,7 @@ impl AsyncIoThread { let drop_count = Arc::new(AtomicUsize::new(0)); let drop_count_i = drop_count.clone(); match tokio::time::timeout(self.config.connect_delay, async { - while let Ok(data) = self.receiver.recv_async().await { + while let Ok(data) = self.receiver.recv().await { if self.push_to_retry(data).is_some() { drop_count_i.fetch_add(1, Ordering::Relaxed); } @@ -157,7 +156,7 @@ impl AsyncIoThread { loop { tokio::select! { - r = self.receiver.recv_async() => { + r = self.receiver.recv() => { match r { Ok(data) => { match tokio::time::timeout(self.config.write_timeout, connection.write_all(data.as_slice())).await { diff --git a/lib/g3-icap-client/Cargo.toml b/lib/g3-icap-client/Cargo.toml index 1a6502d2..0c5e4496 100644 --- a/lib/g3-icap-client/Cargo.toml +++ b/lib/g3-icap-client/Cargo.toml @@ -16,7 +16,7 @@ itoa.workspace = true url.workspace = true bytes.workspace = true base64.workspace = true -flume = { workspace = true, features = ["async"] } +kanal = { workspace = true, features = ["async"] } tokio = { workspace = true, features = ["time", "io-util", "sync", "macros", "rt"] } tokio-rustls.workspace = true rustls-pki-types.workspace = true diff --git a/lib/g3-icap-client/src/service/client.rs b/lib/g3-icap-client/src/service/client.rs index f9556b9e..dc77fe09 100644 --- a/lib/g3-icap-client/src/service/client.rs +++ b/lib/g3-icap-client/src/service/client.rs @@ -17,13 +17,13 @@ use crate::options::{IcapOptionsRequest, IcapServiceOptions}; pub struct IcapServiceClient { pub(crate) config: Arc, pub(crate) partial_request_header: Vec, - cmd_sender: flume::Sender, + cmd_sender: kanal::AsyncSender, conn_creator: Arc, } impl IcapServiceClient { pub fn new(config: Arc) -> anyhow::Result { - let (cmd_sender, cmd_receiver) = flume::unbounded(); + let (cmd_sender, cmd_receiver) = kanal::unbounded_async(); let conn_creator = IcapConnector::new(config.clone())?; let conn_creator = Arc::new(conn_creator); let pool = IcapServicePool::new(config.clone(), cmd_receiver, conn_creator.clone()); @@ -40,7 +40,7 @@ impl IcapServiceClient { async fn fetch_from_pool(&self) -> Option<(IcapClientConnection, Arc)> { let (rsp_sender, rsp_receiver) = oneshot::channel(); let cmd = IcapServiceClientCommand::FetchConnection(rsp_sender); - if self.cmd_sender.send_async(cmd).await.is_ok() { + if self.cmd_sender.send(cmd).await.is_ok() { rsp_receiver.await.ok() } else { None @@ -76,7 +76,7 @@ impl IcapServiceClient { let pool_sender = self.cmd_sender.clone(); tokio::spawn(async move { let _ = pool_sender - .send_async(IcapServiceClientCommand::SaveConnection(conn)) + .send(IcapServiceClientCommand::SaveConnection(conn)) .await; }); } diff --git a/lib/g3-icap-client/src/service/connection.rs b/lib/g3-icap-client/src/service/connection.rs index a768f594..93c93eb6 100644 --- a/lib/g3-icap-client/src/service/connection.rs +++ b/lib/g3-icap-client/src/service/connection.rs @@ -162,13 +162,13 @@ impl IcapConnectionPollRequest { pub(super) struct IcapConnectionEofPoller { conn: IcapClientConnection, - req_receiver: flume::Receiver, + req_receiver: kanal::AsyncReceiver, } impl IcapConnectionEofPoller { pub(super) fn new( conn: IcapClientConnection, - req_receiver: &flume::Receiver, + req_receiver: &kanal::AsyncReceiver, ) -> Option { if conn.reusable() { Some(IcapConnectionEofPoller { @@ -186,7 +186,7 @@ impl IcapConnectionEofPoller { tokio::select! { _ = self.conn.reader.fill_wait_data() => {} _ = idle_sleep => {} - r = self.req_receiver.recv_async() => { + r = self.req_receiver.recv() => { if let Ok(req) = r { let IcapConnectionPollRequest { client_sender, diff --git a/lib/g3-icap-client/src/service/pool.rs b/lib/g3-icap-client/src/service/pool.rs index 63992ca6..5338b094 100644 --- a/lib/g3-icap-client/src/service/pool.rs +++ b/lib/g3-icap-client/src/service/pool.rs @@ -33,25 +33,25 @@ pub(super) struct IcapServicePool { options: Arc, connector: Arc, check_interval: Interval, - client_cmd_receiver: flume::Receiver, + client_cmd_receiver: kanal::AsyncReceiver, pool_cmd_sender: mpsc::Sender, pool_cmd_receiver: mpsc::Receiver, - conn_req_sender: flume::Sender, - conn_req_receiver: flume::Receiver, + conn_req_sender: kanal::AsyncSender, + conn_req_receiver: kanal::AsyncReceiver, idle_conn_count: Arc, } impl IcapServicePool { pub(super) fn new( config: Arc, - client_cmd_receiver: flume::Receiver, + client_cmd_receiver: kanal::AsyncReceiver, connector: Arc, ) -> Self { let options = Arc::new(IcapServiceOptions::new_expired(config.method)); let check_interval = tokio::time::interval(config.connection_pool.check_interval()); let (pool_cmd_sender, pool_cmd_receiver) = mpsc::channel(POOL_CMD_CHANNEL_SIZE); let (conn_req_sender, conn_req_receiver) = - flume::bounded(config.connection_pool.max_idle_count()); + kanal::bounded_async(config.connection_pool.max_idle_count()); IcapServicePool { config, options, @@ -78,7 +78,7 @@ impl IcapServicePool { _ = self.check_interval.tick() => { self.check(); } - r = self.client_cmd_receiver.recv_async() => { + r = self.client_cmd_receiver.recv() => { match r { Ok(cmd) => self.handle_client_cmd(cmd), Err(_) => break, @@ -137,7 +137,7 @@ impl IcapServicePool { let options = self.options.clone(); tokio::spawn(async move { let _ = req_sender - .send_async(IcapConnectionPollRequest::new(sender, options)) + .send(IcapConnectionPollRequest::new(sender, options)) .await; }); } else { diff --git a/lib/g3-journal/Cargo.toml b/lib/g3-journal/Cargo.toml index 5098fba9..7e99ae19 100644 --- a/lib/g3-journal/Cargo.toml +++ b/lib/g3-journal/Cargo.toml @@ -17,5 +17,5 @@ once_cell.workspace = true memchr.workspace = true itoa.workspace = true ryu.workspace = true -flume.workspace = true +kanal.workspace = true g3-types = { workspace = true, features = ["async-log"] } diff --git a/lib/g3-journal/src/lib.rs b/lib/g3-journal/src/lib.rs index fdb63dc8..d2f477d7 100644 --- a/lib/g3-journal/src/lib.rs +++ b/lib/g3-journal/src/lib.rs @@ -5,8 +5,6 @@ use std::sync::Arc; -use flume::Receiver; - use g3_types::log::{AsyncLogConfig, AsyncLogger, LogStats}; #[macro_use] @@ -41,7 +39,7 @@ pub fn new_async_logger( async_conf: &AsyncLogConfig, journal_conf: JournalConfig, ) -> AsyncLogger, JournalFormatter> { - let (sender, receiver) = flume::bounded::>(async_conf.channel_capacity); + let (sender, receiver) = kanal::bounded::>(async_conf.channel_capacity); let stats = Arc::new(LogStats::default()); @@ -62,7 +60,7 @@ pub fn new_async_logger( } struct AsyncIoThread { - receiver: Receiver>, + receiver: kanal::Receiver>, stats: Arc, } diff --git a/lib/g3-resolver/Cargo.toml b/lib/g3-resolver/Cargo.toml index f87f83c9..6bf2ada6 100644 --- a/lib/g3-resolver/Cargo.toml +++ b/lib/g3-resolver/Cargo.toml @@ -22,7 +22,7 @@ hickory-client = { workspace = true, optional = true } hickory-proto = { workspace = true, optional = true, features = ["tokio"] } rustls = { workspace = true, optional = true } rustls-pki-types = { workspace = true, optional = true } -flume = { workspace = true, optional = true, features = ["async"] } +kanal = { workspace = true, optional = true, features = ["async"] } async-recursion = { workspace = true, optional = true } yaml-rust = { workspace = true, optional = true } g3-types = { workspace = true, optional = true } @@ -35,5 +35,5 @@ default = [] yaml = ["dep:yaml-rust", "dep:g3-yaml"] c-ares = ["dep:c-ares", "dep:c-ares-resolver", "dep:c-ares-sys"] vendored-c-ares = ["c-ares", "c-ares-resolver/vendored", "c-ares/vendored"] -hickory = ["dep:hickory-client", "dep:hickory-proto", "dep:flume", "dep:rustls", "dep:rustls-pki-types", "dep:async-recursion", "dep:g3-hickory-client", "g3-types/rustls", "dep:g3-socket"] +hickory = ["dep:hickory-client", "dep:hickory-proto", "dep:kanal", "dep:rustls", "dep:rustls-pki-types", "dep:async-recursion", "dep:g3-hickory-client", "g3-types/rustls", "dep:g3-socket"] quic = ["g3-types?/quic", "g3-hickory-client?/quic"] diff --git a/lib/g3-resolver/src/driver/hickory/client.rs b/lib/g3-resolver/src/driver/hickory/client.rs index 81229f2f..2f148850 100644 --- a/lib/g3-resolver/src/driver/hickory/client.rs +++ b/lib/g3-resolver/src/driver/hickory/client.rs @@ -78,7 +78,7 @@ impl HickoryClient { pub(super) async fn run( mut self, - req_receiver: flume::Receiver<(DnsRequest, mpsc::Sender)>, + req_receiver: kanal::AsyncReceiver<(DnsRequest, mpsc::Sender)>, ) { let (client_sender, mut client_receiver) = mpsc::channel(1); let mut check_interval = tokio::time::interval(Duration::from_secs(60)); @@ -86,7 +86,7 @@ impl HickoryClient { tokio::select! { biased; - r = req_receiver.recv_async() => { + r = req_receiver.recv() => { let Ok((req, rsp_sender)) = r else { break; }; diff --git a/lib/g3-resolver/src/driver/hickory/config/mod.rs b/lib/g3-resolver/src/driver/hickory/config/mod.rs index eb87ff39..c21775dc 100644 --- a/lib/g3-resolver/src/driver/hickory/config/mod.rs +++ b/lib/g3-resolver/src/driver/hickory/config/mod.rs @@ -146,7 +146,7 @@ impl HickoryDriverConfig { tcp_misc_opts: self.tcp_misc_opts.clone(), udp_misc_opts: self.udp_misc_opts, }; - let (req_sender, req_receiver) = flume::unbounded(); + let (req_sender, req_receiver) = kanal::unbounded_async(); driver.push_client(req_sender); tokio::spawn(async move { let client = HickoryClient::new(client_config).await.unwrap(); // TODO diff --git a/lib/g3-resolver/src/driver/hickory/driver.rs b/lib/g3-resolver/src/driver/hickory/driver.rs index 3f18e3d0..2120e7f0 100644 --- a/lib/g3-resolver/src/driver/hickory/driver.rs +++ b/lib/g3-resolver/src/driver/hickory/driver.rs @@ -19,7 +19,7 @@ pub struct HickoryResolver { each_timeout: Duration, retry_interval: Duration, negative_min_ttl: u32, - clients: Vec)>>, + clients: Vec)>>, } impl ResolveDriver for HickoryResolver { @@ -85,7 +85,7 @@ impl HickoryResolver { pub(super) fn push_client( &mut self, - req_sender: flume::Sender<(DnsRequest, mpsc::Sender)>, + req_sender: kanal::AsyncSender<(DnsRequest, mpsc::Sender)>, ) { self.clients.push(req_sender); } @@ -103,7 +103,7 @@ impl HickoryResolver { ); }; if client - .send_async((request.clone(), rsp_sender.clone())) + .send((request.clone(), rsp_sender.clone())) .await .is_err() { diff --git a/lib/g3-stdlog/Cargo.toml b/lib/g3-stdlog/Cargo.toml index ab56f7f1..a62e79d1 100644 --- a/lib/g3-stdlog/Cargo.toml +++ b/lib/g3-stdlog/Cargo.toml @@ -10,7 +10,7 @@ rust-version.workspace = true [dependencies] slog.workspace = true chrono = { workspace = true, features = ["clock"] } -flume.workspace = true +kanal.workspace = true anstyle = "1.0" itoa.workspace = true ryu.workspace = true diff --git a/lib/g3-stdlog/src/lib.rs b/lib/g3-stdlog/src/lib.rs index 47af2a50..be453392 100644 --- a/lib/g3-stdlog/src/lib.rs +++ b/lib/g3-stdlog/src/lib.rs @@ -7,7 +7,6 @@ use std::io::{self, IsTerminal, Write}; use std::sync::Arc; use chrono::Local; -use flume::Receiver; use slog::Level; use g3_types::log::{AsyncLogConfig, AsyncLogger, LogStats}; @@ -40,7 +39,7 @@ pub fn new_async_logger( append_code_position: bool, use_stdout: bool, ) -> AsyncLogger { - let (sender, receiver) = flume::bounded::(async_conf.channel_capacity); + let (sender, receiver) = kanal::bounded::(async_conf.channel_capacity); let stats = Arc::new(LogStats::default()); @@ -63,7 +62,7 @@ pub fn new_async_logger( } struct AsyncIoThread { - receiver: Receiver, + receiver: kanal::Receiver, stats: Arc, } @@ -100,7 +99,7 @@ impl AsyncIoThread { let _ = self.write_plain(&mut buf, v); self.write_buf(&mut io, &buf); - while let Ok(v) = self.receiver.try_recv() { + while let Ok(Some(v)) = self.receiver.try_recv() { buf.clear(); let _ = self.write_plain(&mut buf, v); self.write_buf(&mut io, &buf); @@ -132,7 +131,7 @@ impl AsyncIoThread { let _ = self.write_console(&mut buf, v); self.write_buf(&mut io, &buf); - while let Ok(v) = self.receiver.try_recv() { + while let Ok(Some(v)) = self.receiver.try_recv() { buf.clear(); let _ = self.write_console(&mut buf, v); self.write_buf(&mut io, &buf); diff --git a/lib/g3-syslog/Cargo.toml b/lib/g3-syslog/Cargo.toml index 2e109bd0..e26a80de 100644 --- a/lib/g3-syslog/Cargo.toml +++ b/lib/g3-syslog/Cargo.toml @@ -12,7 +12,7 @@ slog.workspace = true chrono = { workspace = true, features = ["clock"] } itoa.workspace = true ryu.workspace = true -flume.workspace = true +kanal= { workspace = true, features = ["std-mutex"] } serde.workspace = true serde_json.workspace = true log.workspace = true diff --git a/lib/g3-syslog/src/async_streamer.rs b/lib/g3-syslog/src/async_streamer.rs index bf73cce4..39da8bd6 100644 --- a/lib/g3-syslog/src/async_streamer.rs +++ b/lib/g3-syslog/src/async_streamer.rs @@ -7,7 +7,6 @@ use std::cell::RefCell; use std::sync::Arc; use std::time::{Duration, Instant}; -use flume::{Receiver, Sender, TrySendError}; use log::warn; use slog::{Drain, OwnedKVList, Record}; @@ -23,7 +22,7 @@ thread_local! { pub struct AsyncSyslogStreamer { header: SyslogHeader, - sender: Sender, + sender: kanal::Sender, formatter: BoxSyslogFormatter, stats: Arc, } @@ -35,7 +34,7 @@ impl AsyncSyslogStreamer { formatter: BoxSyslogFormatter, backend_builder: &SyslogBackendBuilder, ) -> Self { - let (sender, receiver) = flume::bounded::(config.channel_capacity); + let (sender, receiver) = kanal::bounded::(config.channel_capacity); let stats = Arc::new(LogStats::default()); @@ -85,9 +84,9 @@ impl Drain for AsyncSyslogStreamer { Ok(_) => { let s = unsafe { String::from_utf8_unchecked(buf.clone()) }; match self.sender.try_send(s) { - Ok(_) => {} - Err(TrySendError::Full(_)) => self.stats.drop.add_channel_overflow(), - Err(TrySendError::Disconnected(_)) => self.stats.drop.add_channel_closed(), + Ok(true) => {} + Ok(false) => self.stats.drop.add_channel_overflow(), + Err(_) => self.stats.drop.add_channel_closed(), } Ok(()) @@ -102,7 +101,7 @@ impl Drain for AsyncSyslogStreamer { } struct AsyncIoThread { - receiver: Receiver, + receiver: kanal::Receiver, backend_builder: SyslogBackendBuilder, stats: Arc, recv_buf: Vec, @@ -120,7 +119,7 @@ impl AsyncIoThread { fn recv_send_all(&mut self) { while self.recv_buf.len() < MAX_BATCH_SIZE { - let Ok(s) = self.receiver.try_recv() else { + let Ok(Some(s)) = self.receiver.try_recv() else { break; }; self.recv_buf.push(s); diff --git a/lib/g3-types/Cargo.toml b/lib/g3-types/Cargo.toml index 428295be..6a18303a 100644 --- a/lib/g3-types/Cargo.toml +++ b/lib/g3-types/Cargo.toml @@ -46,7 +46,7 @@ lru = { workspace = true, optional = true } bytes = { workspace = true, optional = true } http = { workspace = true, optional = true } base64 = { workspace = true, optional = true } -flume = { workspace = true, features = ["eventual-fairness"], optional = true } +kanal = { workspace = true, optional = true, features = ["std-mutex"] } slog = { workspace = true, optional = true } indexmap = { workspace = true, optional = true } brotli = { version = "8.0", optional = true, default-features = false, features = ["std"] } @@ -66,4 +66,4 @@ openssl = ["dep:openssl", "dep:openssl-sys", "dep:lru", "dep:bytes", "dep:ahash" acl-rule = ["resolve", "dep:ahash", "dep:ip_network", "dep:ip_network_table", "dep:regex", "dep:radix_trie"] http = ["dep:http", "dep:bytes", "dep:base64"] route = ["resolve", "dep:ahash", "dep:radix_trie", "dep:indexmap"] -async-log = ["dep:flume", "dep:slog"] +async-log = ["dep:kanal", "dep:slog"] diff --git a/lib/g3-types/src/log/async_log.rs b/lib/g3-types/src/log/async_log.rs index 28515006..b7e74782 100644 --- a/lib/g3-types/src/log/async_log.rs +++ b/lib/g3-types/src/log/async_log.rs @@ -5,7 +5,6 @@ use std::sync::Arc; -use flume::{Sender, TrySendError}; use slog::{Drain, OwnedKVList, Record}; use super::LogStats; @@ -41,7 +40,7 @@ pub struct AsyncLogger where F: AsyncLogFormatter, { - sender: Sender, + sender: kanal::Sender, formatter: F, stats: Arc, } @@ -50,7 +49,7 @@ impl AsyncLogger where F: AsyncLogFormatter, { - pub fn new(sender: Sender, formatter: F, stats: Arc) -> Self { + pub fn new(sender: kanal::Sender, formatter: F, stats: Arc) -> Self { AsyncLogger { sender, formatter, @@ -76,9 +75,9 @@ where match self.formatter.format_slog(record, logger_values) { Ok(v) => { match self.sender.try_send(v) { - Ok(_) => {} - Err(TrySendError::Full(_)) => self.stats.drop.add_channel_overflow(), - Err(TrySendError::Disconnected(_)) => self.stats.drop.add_channel_closed(), + Ok(true) => {} + Ok(false) => self.stats.drop.add_channel_overflow(), + Err(_) => self.stats.drop.add_channel_closed(), } Ok(()) }