mirror of
https://github.com/bytedance/g3.git
synced 2026-05-05 23:41:57 +00:00
use kanal instead of flume
Some checks failed
CrossCompiling / Build (push) Has been cancelled
CodeCoverage / g3bench test (push) Has been cancelled
CodeCoverage / lib unit test (push) Has been cancelled
CodeCoverage / g3mkcert test (push) Has been cancelled
CodeCoverage / g3keymess test (push) Has been cancelled
CodeCoverage / g3proxy test (push) Has been cancelled
CodeCoverage / g3statsd test (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (java-kotlin) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
CodeQL Advanced / Analyze (rust) (push) Has been cancelled
Linux-CI / Build (push) Has been cancelled
Linux-CI / Clippy (push) Has been cancelled
Linux-CI / Build vendored (push) Has been cancelled
Linux-CI / Build with OpenSSL Async Job (push) Has been cancelled
MacOS-CI / Build (push) Has been cancelled
MacOS-CI / Build vendored (push) Has been cancelled
StaticLinking / musl (push) Has been cancelled
StaticLinking / msvc (push) Has been cancelled
Windows-CI / Build (push) Has been cancelled
Windows-CI / Build vendored (push) Has been cancelled
Some checks failed
CrossCompiling / Build (push) Has been cancelled
CodeCoverage / g3bench test (push) Has been cancelled
CodeCoverage / lib unit test (push) Has been cancelled
CodeCoverage / g3mkcert test (push) Has been cancelled
CodeCoverage / g3keymess test (push) Has been cancelled
CodeCoverage / g3proxy test (push) Has been cancelled
CodeCoverage / g3statsd test (push) Has been cancelled
CodeQL Advanced / Analyze (actions) (push) Has been cancelled
CodeQL Advanced / Analyze (java-kotlin) (push) Has been cancelled
CodeQL Advanced / Analyze (python) (push) Has been cancelled
CodeQL Advanced / Analyze (rust) (push) Has been cancelled
Linux-CI / Build (push) Has been cancelled
Linux-CI / Clippy (push) Has been cancelled
Linux-CI / Build vendored (push) Has been cancelled
Linux-CI / Build with OpenSSL Async Job (push) Has been cancelled
MacOS-CI / Build (push) Has been cancelled
MacOS-CI / Build vendored (push) Has been cancelled
StaticLinking / musl (push) Has been cancelled
StaticLinking / msvc (push) Has been cancelled
Windows-CI / Build (push) Has been cancelled
Windows-CI / Build vendored (push) Has been cancelled
This commit is contained in:
parent
9341010650
commit
9145838bbe
36 changed files with 133 additions and 179 deletions
64
Cargo.lock
generated
64
Cargo.lock
generated
|
|
@ -196,9 +196,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "async-trait"
|
||||
version = "0.1.88"
|
||||
version = "0.1.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5"
|
||||
checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
|
|
@ -847,18 +847,6 @@ dependencies = [
|
|||
"miniz_oxide",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "flume"
|
||||
version = "0.11.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"nanorand",
|
||||
"spin",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "fnv"
|
||||
version = "1.0.7"
|
||||
|
|
@ -1126,13 +1114,13 @@ dependencies = [
|
|||
"anyhow",
|
||||
"chrono",
|
||||
"constant_time_eq 0.4.2",
|
||||
"flume",
|
||||
"g3-compat",
|
||||
"g3-openssl",
|
||||
"g3-socket",
|
||||
"g3-types",
|
||||
"g3-yaml",
|
||||
"hex",
|
||||
"kanal",
|
||||
"log",
|
||||
"rmp",
|
||||
"rmp-serde",
|
||||
|
|
@ -1254,7 +1242,6 @@ dependencies = [
|
|||
"atoi",
|
||||
"base64",
|
||||
"bytes",
|
||||
"flume",
|
||||
"g3-h2",
|
||||
"g3-http",
|
||||
"g3-io-ext",
|
||||
|
|
@ -1265,6 +1252,7 @@ dependencies = [
|
|||
"h2",
|
||||
"http",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"memchr",
|
||||
"rustls-pki-types",
|
||||
"thiserror",
|
||||
|
|
@ -1348,9 +1336,9 @@ name = "g3-journal"
|
|||
version = "0.4.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"flume",
|
||||
"g3-types",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"memchr",
|
||||
"once_cell",
|
||||
"rustix 1.0.8",
|
||||
|
|
@ -1444,7 +1432,6 @@ dependencies = [
|
|||
"c-ares",
|
||||
"c-ares-resolver",
|
||||
"c-ares-sys",
|
||||
"flume",
|
||||
"g3-hickory-client",
|
||||
"g3-socket",
|
||||
"g3-types",
|
||||
|
|
@ -1452,6 +1439,7 @@ dependencies = [
|
|||
"hickory-client",
|
||||
"hickory-proto",
|
||||
"indexmap",
|
||||
"kanal",
|
||||
"log",
|
||||
"rustls",
|
||||
"rustls-pki-types",
|
||||
|
|
@ -1555,10 +1543,10 @@ version = "0.3.0"
|
|||
dependencies = [
|
||||
"anstyle",
|
||||
"chrono",
|
||||
"flume",
|
||||
"g3-datetime",
|
||||
"g3-types",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"ryu",
|
||||
"slog",
|
||||
]
|
||||
|
|
@ -1569,13 +1557,13 @@ version = "0.8.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"flume",
|
||||
"g3-compat",
|
||||
"g3-datetime",
|
||||
"g3-io-sys",
|
||||
"g3-types",
|
||||
"g3-yaml",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"log",
|
||||
"ryu",
|
||||
"serde",
|
||||
|
|
@ -1630,7 +1618,6 @@ dependencies = [
|
|||
"constant_time_eq 0.4.2",
|
||||
"crc32fast",
|
||||
"fastrand",
|
||||
"flume",
|
||||
"fnv",
|
||||
"foldhash",
|
||||
"g3-std-ext",
|
||||
|
|
@ -1641,6 +1628,7 @@ dependencies = [
|
|||
"indexmap",
|
||||
"ip_network",
|
||||
"ip_network_table",
|
||||
"kanal",
|
||||
"libc",
|
||||
"log",
|
||||
"lru",
|
||||
|
|
@ -1771,7 +1759,6 @@ version = "0.9.0"
|
|||
dependencies = [
|
||||
"anyhow",
|
||||
"clap",
|
||||
"flume",
|
||||
"g3-build-env",
|
||||
"g3-cert-agent",
|
||||
"g3-daemon",
|
||||
|
|
@ -1782,6 +1769,7 @@ dependencies = [
|
|||
"g3-tls-cert",
|
||||
"g3-types",
|
||||
"g3-yaml",
|
||||
"kanal",
|
||||
"log",
|
||||
"tokio",
|
||||
"variant-ssl",
|
||||
|
|
@ -1907,7 +1895,6 @@ dependencies = [
|
|||
"clap_complete",
|
||||
"fastrand",
|
||||
"fixedbitset",
|
||||
"flume",
|
||||
"fnv",
|
||||
"foldhash",
|
||||
"futures-util",
|
||||
|
|
@ -1951,6 +1938,7 @@ dependencies = [
|
|||
"ip_network",
|
||||
"ip_network_table",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"log",
|
||||
"lru",
|
||||
"memchr",
|
||||
|
|
@ -2103,7 +2091,6 @@ dependencies = [
|
|||
"chrono",
|
||||
"clap",
|
||||
"clap_complete",
|
||||
"flume",
|
||||
"foldhash",
|
||||
"futures-util",
|
||||
"g3-build-env",
|
||||
|
|
@ -2123,6 +2110,7 @@ dependencies = [
|
|||
"g3tiles-proto",
|
||||
"governor",
|
||||
"itoa",
|
||||
"kanal",
|
||||
"log",
|
||||
"openssl-probe",
|
||||
"quinn",
|
||||
|
|
@ -2681,6 +2669,16 @@ dependencies = [
|
|||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "kanal"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9e3953adf0cd667798b396c2fa13552d6d9b3269d7dd1154c4c416442d1ff574"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "lazy_static"
|
||||
version = "1.5.0"
|
||||
|
|
@ -2837,15 +2835,6 @@ dependencies = [
|
|||
"pkg-config",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nanorand"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
|
||||
dependencies = [
|
||||
"getrandom 0.2.16",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "nibble_vec"
|
||||
version = "0.1.0"
|
||||
|
|
@ -3573,15 +3562,6 @@ dependencies = [
|
|||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spin"
|
||||
version = "0.9.8"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
|
||||
dependencies = [
|
||||
"lock_api",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "spinning_top"
|
||||
version = "0.3.0"
|
||||
|
|
|
|||
|
|
@ -166,6 +166,7 @@ openssl = { package = "variant-ssl", version = "0.17.5" }
|
|||
openssl-sys = { package = "variant-ssl-sys", version = "0.17.5" }
|
||||
openssl-probe = "0.1"
|
||||
#
|
||||
kanal = { version = "0.1.1", default-features = false }
|
||||
flume = { version = "0.11", default-features = false }
|
||||
#
|
||||
c-ares = { version = "11.0", default-features = false }
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ clap.workspace = true
|
|||
log = { workspace = true, features = ["max_level_trace", "release_max_level_debug"] }
|
||||
openssl.workspace = true
|
||||
tokio = { workspace = true, features = ["macros", "net", "io-util", "time", "signal"] }
|
||||
flume = { workspace = true, features = ["async"] }
|
||||
kanal = { workspace = true, features = ["async"] }
|
||||
yaml-rust.workspace = true
|
||||
g3-std-ext.workspace = true
|
||||
g3-types.workspace = true
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ use std::sync::Arc;
|
|||
use std::time::Duration;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use flume::{Receiver, Sender};
|
||||
use log::{debug, error, warn};
|
||||
use openssl::pkey::{PKey, Private};
|
||||
use openssl::x509::X509;
|
||||
|
|
@ -131,8 +130,8 @@ impl OpensslBackend {
|
|||
mut self,
|
||||
handle: &Handle,
|
||||
id: usize,
|
||||
req_receiver: Receiver<BackendRequest>,
|
||||
rsp_sender: Sender<BackendResponse>,
|
||||
req_receiver: kanal::AsyncReceiver<BackendRequest>,
|
||||
rsp_sender: kanal::AsyncSender<BackendResponse>,
|
||||
) {
|
||||
handle.spawn(async move {
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(300));
|
||||
|
|
@ -144,7 +143,7 @@ impl OpensslBackend {
|
|||
warn!("failed to refresh backend: {e:?}");
|
||||
}
|
||||
}
|
||||
r = req_receiver.recv_async() => {
|
||||
r = req_receiver.recv() => {
|
||||
let Ok(req) = r else {
|
||||
break
|
||||
};
|
||||
|
|
@ -154,7 +153,7 @@ impl OpensslBackend {
|
|||
match self.generate(&req.user_req) {
|
||||
Ok(data) => {
|
||||
debug!("{host} - [#{id}] cert generated");
|
||||
if let Err(e) = rsp_sender.send_async(req.into_response(data)).await {
|
||||
if let Err(e) = rsp_sender.send(req.into_response(data)).await {
|
||||
error!("{host} - [#{id}] failed to send cert to frontend: {e}");
|
||||
break;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -32,14 +32,14 @@ pub(super) struct Frontend {
|
|||
io: UdpDgramIo,
|
||||
stats: Arc<FrontendStats>,
|
||||
duration_recorder: HistogramRecorder<u64>,
|
||||
rsp_receiver: flume::Receiver<BackendResponse>,
|
||||
rsp_receiver: kanal::AsyncReceiver<BackendResponse>,
|
||||
}
|
||||
|
||||
impl Frontend {
|
||||
pub(super) fn new(
|
||||
listen_config: &UdpListenConfig,
|
||||
duration_recorder: HistogramRecorder<u64>,
|
||||
rsp_receiver: flume::Receiver<BackendResponse>,
|
||||
rsp_receiver: kanal::AsyncReceiver<BackendResponse>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let io = UdpDgramIo::new(listen_config)?;
|
||||
Ok(Frontend {
|
||||
|
|
@ -54,7 +54,10 @@ impl Frontend {
|
|||
self.stats.clone()
|
||||
}
|
||||
|
||||
pub(super) async fn run(self, req_sender: flume::Sender<BackendRequest>) -> anyhow::Result<()> {
|
||||
pub(super) async fn run(
|
||||
self,
|
||||
req_sender: kanal::AsyncSender<BackendRequest>,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut clt_c = Box::pin(tokio::signal::ctrl_c());
|
||||
|
||||
let mut rcv_buf = [0u8; 16384];
|
||||
|
|
@ -68,7 +71,7 @@ impl Frontend {
|
|||
Ok(user_req) => {
|
||||
debug!("{} - request received", user_req.host());
|
||||
let req = BackendRequest {user_req, peer, recv_time};
|
||||
if let Err(e) = req_sender.send_async(req).await {
|
||||
if let Err(e) = req_sender.send(req).await {
|
||||
return Err(anyhow!("failed to send request to backend: {e}"));
|
||||
}
|
||||
}
|
||||
|
|
@ -80,7 +83,7 @@ impl Frontend {
|
|||
Err(e) => return Err(anyhow!("frontend recv error: {e:?}")),
|
||||
}
|
||||
}
|
||||
r = self.rsp_receiver.recv_async() => {
|
||||
r = self.rsp_receiver.recv() => {
|
||||
match r {
|
||||
Ok(rsp) => self.handle_rsp(rsp).await,
|
||||
Err(e) => return Err(anyhow!("recv from backend failed: {e}")),
|
||||
|
|
@ -98,7 +101,7 @@ impl Frontend {
|
|||
}
|
||||
|
||||
drop(req_sender);
|
||||
while let Ok(rsp) = self.rsp_receiver.recv_async().await {
|
||||
while let Ok(rsp) = self.rsp_receiver.recv().await {
|
||||
self.handle_rsp(rsp).await;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -59,8 +59,8 @@ impl BackendResponse {
|
|||
}
|
||||
|
||||
pub async fn run(proc_args: &ProcArgs) -> anyhow::Result<()> {
|
||||
let (req_sender, req_receiver) = flume::bounded::<BackendRequest>(1024);
|
||||
let (rsp_sender, rsp_receiver) = flume::bounded::<BackendResponse>(1024);
|
||||
let (req_sender, req_receiver) = kanal::bounded_async::<BackendRequest>(1024);
|
||||
let (rsp_sender, rsp_receiver) = kanal::bounded_async::<BackendResponse>(1024);
|
||||
|
||||
let backend_config =
|
||||
config::get_backend_config().ok_or_else(|| anyhow!("no backend config available"))?;
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ rustc-hash.workspace = true
|
|||
fnv.workspace = true
|
||||
governor = { workspace = true, features = ["std", "jitter"] }
|
||||
rmpv.workspace = true
|
||||
flume.workspace = true
|
||||
kanal = { workspace = true, features = ["async"] }
|
||||
lru.workspace = true
|
||||
regex.workspace = true
|
||||
mlua = { workspace = true, features = ["send"], optional = true }
|
||||
|
|
|
|||
|
|
@ -82,7 +82,7 @@ impl StreamDetourConnector {
|
|||
|
||||
pub(super) async fn run_new_connection(
|
||||
&self,
|
||||
req_receiver: flume::Receiver<StreamDetourRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<StreamDetourRequest>,
|
||||
idle_timeout: Duration,
|
||||
) {
|
||||
let mut connection = match self.new_connection().await {
|
||||
|
|
@ -104,7 +104,7 @@ impl StreamDetourConnector {
|
|||
debug!("detour connection closed unexpectedly: {e}");
|
||||
return;
|
||||
}
|
||||
r = req_receiver.recv_async() => {
|
||||
r = req_receiver.recv() => {
|
||||
idle_sleep.as_mut().reset(Instant::now() + idle_timeout);
|
||||
match r {
|
||||
Ok(req) => {
|
||||
|
|
|
|||
|
|
@ -65,13 +65,13 @@ impl<SC> StreamDetourContext<'_, SC> {
|
|||
|
||||
pub(crate) struct StreamDetourClient {
|
||||
config: Arc<AuditStreamDetourConfig>,
|
||||
req_sender: flume::Sender<StreamDetourRequest>,
|
||||
req_sender: kanal::AsyncSender<StreamDetourRequest>,
|
||||
pool_handle: StreamDetourPoolHandle,
|
||||
}
|
||||
|
||||
impl StreamDetourClient {
|
||||
pub(super) fn new(config: Arc<AuditStreamDetourConfig>) -> anyhow::Result<Self> {
|
||||
let (req_sender, req_receiver) = flume::unbounded();
|
||||
let (req_sender, req_receiver) = kanal::unbounded_async();
|
||||
let connector = StreamDetourConnector::new(config.clone())?;
|
||||
let pool_handle =
|
||||
StreamDetourPool::spawn(config.connection_pool, req_receiver, Arc::new(connector));
|
||||
|
|
@ -115,18 +115,11 @@ impl StreamDetourClient {
|
|||
let (sender, receiver) = oneshot::channel();
|
||||
let req = StreamDetourRequest(sender);
|
||||
|
||||
if let Err(e) = self.req_sender.try_send(req) {
|
||||
match e {
|
||||
flume::TrySendError::Full(req) => {
|
||||
self.pool_handle.request_new_connection();
|
||||
if self.req_sender.send_async(req).await.is_err() {
|
||||
return Err(anyhow!("stream detour client is down"));
|
||||
}
|
||||
}
|
||||
flume::TrySendError::Disconnected(_req) => {
|
||||
return Err(anyhow!("stream detour client is down"));
|
||||
}
|
||||
}
|
||||
if self.req_sender.is_full() {
|
||||
self.pool_handle.request_new_connection();
|
||||
}
|
||||
if self.req_sender.send(req).await.is_err() {
|
||||
return Err(anyhow!("stream detour client is down"));
|
||||
}
|
||||
|
||||
match tokio::time::timeout(self.config.stream_open_timeout, receiver).await {
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ pub(super) struct StreamDetourPool {
|
|||
connector: Arc<StreamDetourConnector>,
|
||||
stats: Arc<ConnectionPoolStats>,
|
||||
|
||||
client_req_receiver: flume::Receiver<StreamDetourRequest>,
|
||||
client_req_receiver: kanal::AsyncReceiver<StreamDetourRequest>,
|
||||
|
||||
connection_id: u64,
|
||||
connection_close_receiver: mpsc::Receiver<u64>,
|
||||
|
|
@ -43,7 +43,7 @@ pub(super) struct StreamDetourPool {
|
|||
impl StreamDetourPool {
|
||||
fn new(
|
||||
config: ConnectionPoolConfig,
|
||||
client_req_receiver: flume::Receiver<StreamDetourRequest>,
|
||||
client_req_receiver: kanal::AsyncReceiver<StreamDetourRequest>,
|
||||
connector: Arc<StreamDetourConnector>,
|
||||
) -> Self {
|
||||
let (connection_close_sender, connection_close_receiver) = mpsc::channel(1);
|
||||
|
|
@ -60,7 +60,7 @@ impl StreamDetourPool {
|
|||
|
||||
pub(super) fn spawn(
|
||||
config: ConnectionPoolConfig,
|
||||
client_cmd_receiver: flume::Receiver<StreamDetourRequest>,
|
||||
client_cmd_receiver: kanal::AsyncReceiver<StreamDetourRequest>,
|
||||
connector: Arc<StreamDetourConnector>,
|
||||
) -> StreamDetourPoolHandle {
|
||||
let pool = StreamDetourPool::new(config, client_cmd_receiver, connector);
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ governor = { workspace = true, features = ["std", "jitter"] }
|
|||
chrono = { workspace = true, features = ["clock"] }
|
||||
uuid.workspace = true
|
||||
bitflags.workspace = true
|
||||
flume.workspace = true
|
||||
kanal = { workspace = true, features = ["async"] }
|
||||
rustc-hash.workspace = true
|
||||
g3-macros.workspace = true
|
||||
g3-daemon = { workspace = true, features = ["event-log"] }
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ impl KeylessUpstreamConnect for KeylessQuicUpstreamConnector {
|
|||
|
||||
async fn new_connection(
|
||||
&self,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
idle_timeout: Duration,
|
||||
) -> anyhow::Result<Self::Connection> {
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ pub(crate) struct KeylessQuicBackend {
|
|||
peer_addrs: Arc<ArcSwapOption<SelectiveVec<WeightedValue<SocketAddr>>>>,
|
||||
discover_handle: Mutex<Option<AbortHandle>>,
|
||||
pool_handle: KeylessConnectionPoolHandle,
|
||||
keyless_request_sender: flume::Sender<KeylessForwardRequest>,
|
||||
keyless_request_sender: kanal::AsyncSender<KeylessForwardRequest>,
|
||||
}
|
||||
|
||||
impl KeylessQuicBackend {
|
||||
|
|
@ -52,7 +52,7 @@ impl KeylessQuicBackend {
|
|||
duration_stats.set_extra_tags(config.extra_metrics_tags.clone());
|
||||
|
||||
let (keyless_request_sender, keyless_request_receiver) =
|
||||
flume::bounded(config.request_buffer_size);
|
||||
kanal::bounded_async(config.request_buffer_size);
|
||||
let connector = KeylessQuicUpstreamConnector::new(
|
||||
config.clone(),
|
||||
stats.clone(),
|
||||
|
|
@ -183,20 +183,12 @@ impl Backend for KeylessQuicBackend {
|
|||
|
||||
let (rsp_sender, rsp_receiver) = oneshot::channel();
|
||||
let req = KeylessForwardRequest::new(req, rsp_sender);
|
||||
if let Err(e) = self.keyless_request_sender.try_send(req) {
|
||||
match e {
|
||||
flume::TrySendError::Full(req) => {
|
||||
self.pool_handle.request_new_connection();
|
||||
if self.keyless_request_sender.send_async(req).await.is_err() {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
}
|
||||
flume::TrySendError::Disconnected(_req) => {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
}
|
||||
if self.keyless_request_sender.is_full() {
|
||||
self.pool_handle.request_new_connection();
|
||||
}
|
||||
if self.keyless_request_sender.send(req).await.is_err() {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
rsp_receiver.await.unwrap_or(KeylessResponse::Local(err))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -85,7 +85,7 @@ impl KeylessUpstreamConnect for KeylessTcpUpstreamConnector {
|
|||
|
||||
async fn new_connection(
|
||||
&self,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
_idle_timeout: Duration,
|
||||
) -> anyhow::Result<Self::Connection> {
|
||||
|
|
@ -129,7 +129,7 @@ impl KeylessUpstreamConnect for KeylessTlsUpstreamConnector {
|
|||
|
||||
async fn new_connection(
|
||||
&self,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
_idle_timeout: Duration,
|
||||
) -> anyhow::Result<Self::Connection> {
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ pub(crate) struct KeylessTcpBackend {
|
|||
peer_addrs: Arc<ArcSwapOption<SelectiveVec<WeightedValue<SocketAddr>>>>,
|
||||
discover_handle: Mutex<Option<AbortHandle>>,
|
||||
pool_handle: KeylessConnectionPoolHandle,
|
||||
keyless_request_sender: flume::Sender<KeylessForwardRequest>,
|
||||
keyless_request_sender: kanal::AsyncSender<KeylessForwardRequest>,
|
||||
}
|
||||
|
||||
impl KeylessTcpBackend {
|
||||
|
|
@ -52,7 +52,7 @@ impl KeylessTcpBackend {
|
|||
duration_stats.set_extra_tags(config.extra_metrics_tags.clone());
|
||||
|
||||
let (keyless_request_sender, keyless_request_receiver) =
|
||||
flume::bounded(config.request_buffer_size);
|
||||
kanal::bounded_async(config.request_buffer_size);
|
||||
let tcp_connector = KeylessTcpUpstreamConnector::new(
|
||||
config.clone(),
|
||||
stats.clone(),
|
||||
|
|
@ -194,20 +194,12 @@ impl Backend for KeylessTcpBackend {
|
|||
|
||||
let (rsp_sender, rsp_receiver) = oneshot::channel();
|
||||
let req = KeylessForwardRequest::new(req, rsp_sender);
|
||||
if let Err(e) = self.keyless_request_sender.try_send(req) {
|
||||
match e {
|
||||
flume::TrySendError::Full(req) => {
|
||||
self.pool_handle.request_new_connection();
|
||||
if self.keyless_request_sender.send_async(req).await.is_err() {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
}
|
||||
flume::TrySendError::Disconnected(_req) => {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
}
|
||||
if self.keyless_request_sender.is_full() {
|
||||
self.pool_handle.request_new_connection();
|
||||
}
|
||||
if self.keyless_request_sender.send(req).await.is_err() {
|
||||
self.stats.add_request_drop();
|
||||
return KeylessResponse::Local(err);
|
||||
}
|
||||
rsp_receiver.await.unwrap_or(KeylessResponse::Local(err))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ pub(crate) struct MultiplexedUpstreamConnection<R, W> {
|
|||
duration_recorder: Arc<KeylessUpstreamDurationRecorder>,
|
||||
r: R,
|
||||
w: W,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
alive_channel_guard: KeylessBackendAliveChannelGuard,
|
||||
}
|
||||
|
|
@ -58,7 +58,7 @@ impl<R, W> MultiplexedUpstreamConnection<R, W> {
|
|||
duration_recorder: Arc<KeylessUpstreamDurationRecorder>,
|
||||
ups_r: R,
|
||||
ups_w: W,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
) -> Self {
|
||||
let alive_channel_guard = stats.inc_alive_channel();
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ pub(super) struct KeylessUpstreamSendTask {
|
|||
max_request_count: usize,
|
||||
max_alive_time: Duration,
|
||||
stats: Arc<KeylessBackendStats>,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
shared_state: Arc<StreamSharedState>,
|
||||
duration_recorder: Arc<KeylessUpstreamDurationRecorder>,
|
||||
|
|
@ -36,7 +36,7 @@ impl KeylessUpstreamSendTask {
|
|||
max_request_count: usize,
|
||||
max_alive_time: Duration,
|
||||
stats: Arc<KeylessBackendStats>,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
shared_state: Arc<StreamSharedState>,
|
||||
duration_recorder: Arc<KeylessUpstreamDurationRecorder>,
|
||||
|
|
@ -70,7 +70,7 @@ impl KeylessUpstreamSendTask {
|
|||
tokio::select! {
|
||||
biased;
|
||||
|
||||
r = self.req_receiver.recv_async() => {
|
||||
r = self.req_receiver.recv() => {
|
||||
idle_sleep.as_mut().reset(Instant::now() + idle_timeout);
|
||||
match r {
|
||||
Ok(req) => {
|
||||
|
|
|
|||
|
|
@ -25,7 +25,7 @@ pub(crate) trait KeylessUpstreamConnect {
|
|||
type Connection: KeylessUpstreamConnection;
|
||||
async fn new_connection(
|
||||
&self,
|
||||
req_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
quit_notifier: broadcast::Receiver<()>,
|
||||
idle_timeout: Duration,
|
||||
) -> anyhow::Result<Self::Connection>;
|
||||
|
|
@ -70,7 +70,7 @@ pub(crate) struct KeylessConnectionPool<C: KeylessUpstreamConnection> {
|
|||
connector: ArcKeylessUpstreamConnect<C>,
|
||||
stats: Arc<ConnectionPoolStats>,
|
||||
|
||||
keyless_request_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
keyless_request_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
|
||||
connection_id: u64,
|
||||
connection_close_receiver: mpsc::Receiver<u64>,
|
||||
|
|
@ -87,7 +87,7 @@ where
|
|||
fn new(
|
||||
config: ConnectionPoolConfig,
|
||||
connector: ArcKeylessUpstreamConnect<C>,
|
||||
keyless_request_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
keyless_request_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
graceful_close_wait: Duration,
|
||||
) -> Self {
|
||||
let (connection_close_sender, connection_close_receiver) = mpsc::channel(1);
|
||||
|
|
@ -108,7 +108,7 @@ where
|
|||
pub(crate) fn spawn(
|
||||
config: ConnectionPoolConfig,
|
||||
connector: ArcKeylessUpstreamConnect<C>,
|
||||
keyless_request_receiver: flume::Receiver<KeylessForwardRequest>,
|
||||
keyless_request_receiver: kanal::AsyncReceiver<KeylessForwardRequest>,
|
||||
graceful_close_wait: Duration,
|
||||
) -> KeylessConnectionPoolHandle {
|
||||
let pool = KeylessConnectionPool::new(
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ rust-version.workspace = true
|
|||
anyhow.workspace = true
|
||||
slog.workspace = true
|
||||
chrono = { workspace = true, features = ["clock"] }
|
||||
flume = { workspace = true, features = ["async"] }
|
||||
kanal = { workspace = true, features = ["async"] }
|
||||
rmp.workspace = true
|
||||
rmp-serde.workspace = true
|
||||
serde.workspace = true
|
||||
|
|
|
|||
|
|
@ -8,7 +8,6 @@ use std::sync::Arc;
|
|||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
use anyhow::anyhow;
|
||||
use flume::Receiver;
|
||||
use log::warn;
|
||||
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use tokio::net::TcpStream;
|
||||
|
|
@ -32,14 +31,14 @@ pub fn new_async_logger(
|
|||
fluent_conf: &Arc<FluentdClientConfig>,
|
||||
tag_name: String,
|
||||
) -> AsyncLogger<Vec<u8>, FluentdFormatter> {
|
||||
let (sender, receiver) = flume::bounded::<Vec<u8>>(async_conf.channel_capacity);
|
||||
let (sender, receiver) = kanal::bounded::<Vec<u8>>(async_conf.channel_capacity);
|
||||
|
||||
let stats = Arc::new(LogStats::default());
|
||||
|
||||
for i in 0..async_conf.thread_number {
|
||||
let io_thread = AsyncIoThread {
|
||||
config: Arc::clone(fluent_conf),
|
||||
receiver: receiver.clone(),
|
||||
receiver: receiver.clone_async(),
|
||||
stats: Arc::clone(&stats),
|
||||
retry_queue: VecDeque::with_capacity(fluent_conf.retry_queue_len),
|
||||
};
|
||||
|
|
@ -65,7 +64,7 @@ enum FluentdConnection {
|
|||
|
||||
struct AsyncIoThread {
|
||||
config: Arc<FluentdClientConfig>,
|
||||
receiver: Receiver<Vec<u8>>,
|
||||
receiver: kanal::AsyncReceiver<Vec<u8>>,
|
||||
stats: Arc<LogStats>,
|
||||
retry_queue: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
|
@ -112,7 +111,7 @@ impl AsyncIoThread {
|
|||
let drop_count = Arc::new(AtomicUsize::new(0));
|
||||
let drop_count_i = drop_count.clone();
|
||||
match tokio::time::timeout(self.config.connect_delay, async {
|
||||
while let Ok(data) = self.receiver.recv_async().await {
|
||||
while let Ok(data) = self.receiver.recv().await {
|
||||
if self.push_to_retry(data).is_some() {
|
||||
drop_count_i.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
|
@ -157,7 +156,7 @@ impl AsyncIoThread {
|
|||
|
||||
loop {
|
||||
tokio::select! {
|
||||
r = self.receiver.recv_async() => {
|
||||
r = self.receiver.recv() => {
|
||||
match r {
|
||||
Ok(data) => {
|
||||
match tokio::time::timeout(self.config.write_timeout, connection.write_all(data.as_slice())).await {
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@ itoa.workspace = true
|
|||
url.workspace = true
|
||||
bytes.workspace = true
|
||||
base64.workspace = true
|
||||
flume = { workspace = true, features = ["async"] }
|
||||
kanal = { workspace = true, features = ["async"] }
|
||||
tokio = { workspace = true, features = ["time", "io-util", "sync", "macros", "rt"] }
|
||||
tokio-rustls.workspace = true
|
||||
rustls-pki-types.workspace = true
|
||||
|
|
|
|||
|
|
@ -17,13 +17,13 @@ use crate::options::{IcapOptionsRequest, IcapServiceOptions};
|
|||
pub struct IcapServiceClient {
|
||||
pub(crate) config: Arc<IcapServiceConfig>,
|
||||
pub(crate) partial_request_header: Vec<u8>,
|
||||
cmd_sender: flume::Sender<IcapServiceClientCommand>,
|
||||
cmd_sender: kanal::AsyncSender<IcapServiceClientCommand>,
|
||||
conn_creator: Arc<IcapConnector>,
|
||||
}
|
||||
|
||||
impl IcapServiceClient {
|
||||
pub fn new(config: Arc<IcapServiceConfig>) -> anyhow::Result<Self> {
|
||||
let (cmd_sender, cmd_receiver) = flume::unbounded();
|
||||
let (cmd_sender, cmd_receiver) = kanal::unbounded_async();
|
||||
let conn_creator = IcapConnector::new(config.clone())?;
|
||||
let conn_creator = Arc::new(conn_creator);
|
||||
let pool = IcapServicePool::new(config.clone(), cmd_receiver, conn_creator.clone());
|
||||
|
|
@ -40,7 +40,7 @@ impl IcapServiceClient {
|
|||
async fn fetch_from_pool(&self) -> Option<(IcapClientConnection, Arc<IcapServiceOptions>)> {
|
||||
let (rsp_sender, rsp_receiver) = oneshot::channel();
|
||||
let cmd = IcapServiceClientCommand::FetchConnection(rsp_sender);
|
||||
if self.cmd_sender.send_async(cmd).await.is_ok() {
|
||||
if self.cmd_sender.send(cmd).await.is_ok() {
|
||||
rsp_receiver.await.ok()
|
||||
} else {
|
||||
None
|
||||
|
|
@ -76,7 +76,7 @@ impl IcapServiceClient {
|
|||
let pool_sender = self.cmd_sender.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = pool_sender
|
||||
.send_async(IcapServiceClientCommand::SaveConnection(conn))
|
||||
.send(IcapServiceClientCommand::SaveConnection(conn))
|
||||
.await;
|
||||
});
|
||||
}
|
||||
|
|
|
|||
|
|
@ -162,13 +162,13 @@ impl IcapConnectionPollRequest {
|
|||
|
||||
pub(super) struct IcapConnectionEofPoller {
|
||||
conn: IcapClientConnection,
|
||||
req_receiver: flume::Receiver<IcapConnectionPollRequest>,
|
||||
req_receiver: kanal::AsyncReceiver<IcapConnectionPollRequest>,
|
||||
}
|
||||
|
||||
impl IcapConnectionEofPoller {
|
||||
pub(super) fn new(
|
||||
conn: IcapClientConnection,
|
||||
req_receiver: &flume::Receiver<IcapConnectionPollRequest>,
|
||||
req_receiver: &kanal::AsyncReceiver<IcapConnectionPollRequest>,
|
||||
) -> Option<Self> {
|
||||
if conn.reusable() {
|
||||
Some(IcapConnectionEofPoller {
|
||||
|
|
@ -186,7 +186,7 @@ impl IcapConnectionEofPoller {
|
|||
tokio::select! {
|
||||
_ = self.conn.reader.fill_wait_data() => {}
|
||||
_ = idle_sleep => {}
|
||||
r = self.req_receiver.recv_async() => {
|
||||
r = self.req_receiver.recv() => {
|
||||
if let Ok(req) = r {
|
||||
let IcapConnectionPollRequest {
|
||||
client_sender,
|
||||
|
|
|
|||
|
|
@ -33,25 +33,25 @@ pub(super) struct IcapServicePool {
|
|||
options: Arc<IcapServiceOptions>,
|
||||
connector: Arc<IcapConnector>,
|
||||
check_interval: Interval,
|
||||
client_cmd_receiver: flume::Receiver<IcapServiceClientCommand>,
|
||||
client_cmd_receiver: kanal::AsyncReceiver<IcapServiceClientCommand>,
|
||||
pool_cmd_sender: mpsc::Sender<IcapServicePoolCommand>,
|
||||
pool_cmd_receiver: mpsc::Receiver<IcapServicePoolCommand>,
|
||||
conn_req_sender: flume::Sender<IcapConnectionPollRequest>,
|
||||
conn_req_receiver: flume::Receiver<IcapConnectionPollRequest>,
|
||||
conn_req_sender: kanal::AsyncSender<IcapConnectionPollRequest>,
|
||||
conn_req_receiver: kanal::AsyncReceiver<IcapConnectionPollRequest>,
|
||||
idle_conn_count: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
impl IcapServicePool {
|
||||
pub(super) fn new(
|
||||
config: Arc<IcapServiceConfig>,
|
||||
client_cmd_receiver: flume::Receiver<IcapServiceClientCommand>,
|
||||
client_cmd_receiver: kanal::AsyncReceiver<IcapServiceClientCommand>,
|
||||
connector: Arc<IcapConnector>,
|
||||
) -> Self {
|
||||
let options = Arc::new(IcapServiceOptions::new_expired(config.method));
|
||||
let check_interval = tokio::time::interval(config.connection_pool.check_interval());
|
||||
let (pool_cmd_sender, pool_cmd_receiver) = mpsc::channel(POOL_CMD_CHANNEL_SIZE);
|
||||
let (conn_req_sender, conn_req_receiver) =
|
||||
flume::bounded(config.connection_pool.max_idle_count());
|
||||
kanal::bounded_async(config.connection_pool.max_idle_count());
|
||||
IcapServicePool {
|
||||
config,
|
||||
options,
|
||||
|
|
@ -78,7 +78,7 @@ impl IcapServicePool {
|
|||
_ = self.check_interval.tick() => {
|
||||
self.check();
|
||||
}
|
||||
r = self.client_cmd_receiver.recv_async() => {
|
||||
r = self.client_cmd_receiver.recv() => {
|
||||
match r {
|
||||
Ok(cmd) => self.handle_client_cmd(cmd),
|
||||
Err(_) => break,
|
||||
|
|
@ -137,7 +137,7 @@ impl IcapServicePool {
|
|||
let options = self.options.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = req_sender
|
||||
.send_async(IcapConnectionPollRequest::new(sender, options))
|
||||
.send(IcapConnectionPollRequest::new(sender, options))
|
||||
.await;
|
||||
});
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -17,5 +17,5 @@ once_cell.workspace = true
|
|||
memchr.workspace = true
|
||||
itoa.workspace = true
|
||||
ryu.workspace = true
|
||||
flume.workspace = true
|
||||
kanal.workspace = true
|
||||
g3-types = { workspace = true, features = ["async-log"] }
|
||||
|
|
|
|||
|
|
@ -5,8 +5,6 @@
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use flume::Receiver;
|
||||
|
||||
use g3_types::log::{AsyncLogConfig, AsyncLogger, LogStats};
|
||||
|
||||
#[macro_use]
|
||||
|
|
@ -41,7 +39,7 @@ pub fn new_async_logger(
|
|||
async_conf: &AsyncLogConfig,
|
||||
journal_conf: JournalConfig,
|
||||
) -> AsyncLogger<Vec<u8>, JournalFormatter> {
|
||||
let (sender, receiver) = flume::bounded::<Vec<u8>>(async_conf.channel_capacity);
|
||||
let (sender, receiver) = kanal::bounded::<Vec<u8>>(async_conf.channel_capacity);
|
||||
|
||||
let stats = Arc::new(LogStats::default());
|
||||
|
||||
|
|
@ -62,7 +60,7 @@ pub fn new_async_logger(
|
|||
}
|
||||
|
||||
struct AsyncIoThread {
|
||||
receiver: Receiver<Vec<u8>>,
|
||||
receiver: kanal::Receiver<Vec<u8>>,
|
||||
stats: Arc<LogStats>,
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ hickory-client = { workspace = true, optional = true }
|
|||
hickory-proto = { workspace = true, optional = true, features = ["tokio"] }
|
||||
rustls = { workspace = true, optional = true }
|
||||
rustls-pki-types = { workspace = true, optional = true }
|
||||
flume = { workspace = true, optional = true, features = ["async"] }
|
||||
kanal = { workspace = true, optional = true, features = ["async"] }
|
||||
async-recursion = { workspace = true, optional = true }
|
||||
yaml-rust = { workspace = true, optional = true }
|
||||
g3-types = { workspace = true, optional = true }
|
||||
|
|
@ -35,5 +35,5 @@ default = []
|
|||
yaml = ["dep:yaml-rust", "dep:g3-yaml"]
|
||||
c-ares = ["dep:c-ares", "dep:c-ares-resolver", "dep:c-ares-sys"]
|
||||
vendored-c-ares = ["c-ares", "c-ares-resolver/vendored", "c-ares/vendored"]
|
||||
hickory = ["dep:hickory-client", "dep:hickory-proto", "dep:flume", "dep:rustls", "dep:rustls-pki-types", "dep:async-recursion", "dep:g3-hickory-client", "g3-types/rustls", "dep:g3-socket"]
|
||||
hickory = ["dep:hickory-client", "dep:hickory-proto", "dep:kanal", "dep:rustls", "dep:rustls-pki-types", "dep:async-recursion", "dep:g3-hickory-client", "g3-types/rustls", "dep:g3-socket"]
|
||||
quic = ["g3-types?/quic", "g3-hickory-client?/quic"]
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ impl HickoryClient {
|
|||
|
||||
pub(super) async fn run(
|
||||
mut self,
|
||||
req_receiver: flume::Receiver<(DnsRequest, mpsc::Sender<ResolvedRecord>)>,
|
||||
req_receiver: kanal::AsyncReceiver<(DnsRequest, mpsc::Sender<ResolvedRecord>)>,
|
||||
) {
|
||||
let (client_sender, mut client_receiver) = mpsc::channel(1);
|
||||
let mut check_interval = tokio::time::interval(Duration::from_secs(60));
|
||||
|
|
@ -86,7 +86,7 @@ impl HickoryClient {
|
|||
tokio::select! {
|
||||
biased;
|
||||
|
||||
r = req_receiver.recv_async() => {
|
||||
r = req_receiver.recv() => {
|
||||
let Ok((req, rsp_sender)) = r else {
|
||||
break;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -146,7 +146,7 @@ impl HickoryDriverConfig {
|
|||
tcp_misc_opts: self.tcp_misc_opts.clone(),
|
||||
udp_misc_opts: self.udp_misc_opts,
|
||||
};
|
||||
let (req_sender, req_receiver) = flume::unbounded();
|
||||
let (req_sender, req_receiver) = kanal::unbounded_async();
|
||||
driver.push_client(req_sender);
|
||||
tokio::spawn(async move {
|
||||
let client = HickoryClient::new(client_config).await.unwrap(); // TODO
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ pub struct HickoryResolver {
|
|||
each_timeout: Duration,
|
||||
retry_interval: Duration,
|
||||
negative_min_ttl: u32,
|
||||
clients: Vec<flume::Sender<(DnsRequest, mpsc::Sender<ResolvedRecord>)>>,
|
||||
clients: Vec<kanal::AsyncSender<(DnsRequest, mpsc::Sender<ResolvedRecord>)>>,
|
||||
}
|
||||
|
||||
impl ResolveDriver for HickoryResolver {
|
||||
|
|
@ -85,7 +85,7 @@ impl HickoryResolver {
|
|||
|
||||
pub(super) fn push_client(
|
||||
&mut self,
|
||||
req_sender: flume::Sender<(DnsRequest, mpsc::Sender<ResolvedRecord>)>,
|
||||
req_sender: kanal::AsyncSender<(DnsRequest, mpsc::Sender<ResolvedRecord>)>,
|
||||
) {
|
||||
self.clients.push(req_sender);
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@ impl HickoryResolver {
|
|||
);
|
||||
};
|
||||
if client
|
||||
.send_async((request.clone(), rsp_sender.clone()))
|
||||
.send((request.clone(), rsp_sender.clone()))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ rust-version.workspace = true
|
|||
[dependencies]
|
||||
slog.workspace = true
|
||||
chrono = { workspace = true, features = ["clock"] }
|
||||
flume.workspace = true
|
||||
kanal.workspace = true
|
||||
anstyle = "1.0"
|
||||
itoa.workspace = true
|
||||
ryu.workspace = true
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ use std::io::{self, IsTerminal, Write};
|
|||
use std::sync::Arc;
|
||||
|
||||
use chrono::Local;
|
||||
use flume::Receiver;
|
||||
use slog::Level;
|
||||
|
||||
use g3_types::log::{AsyncLogConfig, AsyncLogger, LogStats};
|
||||
|
|
@ -40,7 +39,7 @@ pub fn new_async_logger(
|
|||
append_code_position: bool,
|
||||
use_stdout: bool,
|
||||
) -> AsyncLogger<StdLogValue, StdLogFormatter> {
|
||||
let (sender, receiver) = flume::bounded::<StdLogValue>(async_conf.channel_capacity);
|
||||
let (sender, receiver) = kanal::bounded::<StdLogValue>(async_conf.channel_capacity);
|
||||
|
||||
let stats = Arc::new(LogStats::default());
|
||||
|
||||
|
|
@ -63,7 +62,7 @@ pub fn new_async_logger(
|
|||
}
|
||||
|
||||
struct AsyncIoThread {
|
||||
receiver: Receiver<StdLogValue>,
|
||||
receiver: kanal::Receiver<StdLogValue>,
|
||||
stats: Arc<LogStats>,
|
||||
}
|
||||
|
||||
|
|
@ -100,7 +99,7 @@ impl AsyncIoThread {
|
|||
let _ = self.write_plain(&mut buf, v);
|
||||
self.write_buf(&mut io, &buf);
|
||||
|
||||
while let Ok(v) = self.receiver.try_recv() {
|
||||
while let Ok(Some(v)) = self.receiver.try_recv() {
|
||||
buf.clear();
|
||||
let _ = self.write_plain(&mut buf, v);
|
||||
self.write_buf(&mut io, &buf);
|
||||
|
|
@ -132,7 +131,7 @@ impl AsyncIoThread {
|
|||
let _ = self.write_console(&mut buf, v);
|
||||
self.write_buf(&mut io, &buf);
|
||||
|
||||
while let Ok(v) = self.receiver.try_recv() {
|
||||
while let Ok(Some(v)) = self.receiver.try_recv() {
|
||||
buf.clear();
|
||||
let _ = self.write_console(&mut buf, v);
|
||||
self.write_buf(&mut io, &buf);
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ slog.workspace = true
|
|||
chrono = { workspace = true, features = ["clock"] }
|
||||
itoa.workspace = true
|
||||
ryu.workspace = true
|
||||
flume.workspace = true
|
||||
kanal= { workspace = true, features = ["std-mutex"] }
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
log.workspace = true
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ use std::cell::RefCell;
|
|||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use flume::{Receiver, Sender, TrySendError};
|
||||
use log::warn;
|
||||
use slog::{Drain, OwnedKVList, Record};
|
||||
|
||||
|
|
@ -23,7 +22,7 @@ thread_local! {
|
|||
|
||||
pub struct AsyncSyslogStreamer {
|
||||
header: SyslogHeader,
|
||||
sender: Sender<String>,
|
||||
sender: kanal::Sender<String>,
|
||||
formatter: BoxSyslogFormatter,
|
||||
stats: Arc<LogStats>,
|
||||
}
|
||||
|
|
@ -35,7 +34,7 @@ impl AsyncSyslogStreamer {
|
|||
formatter: BoxSyslogFormatter,
|
||||
backend_builder: &SyslogBackendBuilder,
|
||||
) -> Self {
|
||||
let (sender, receiver) = flume::bounded::<String>(config.channel_capacity);
|
||||
let (sender, receiver) = kanal::bounded::<String>(config.channel_capacity);
|
||||
|
||||
let stats = Arc::new(LogStats::default());
|
||||
|
||||
|
|
@ -85,9 +84,9 @@ impl Drain for AsyncSyslogStreamer {
|
|||
Ok(_) => {
|
||||
let s = unsafe { String::from_utf8_unchecked(buf.clone()) };
|
||||
match self.sender.try_send(s) {
|
||||
Ok(_) => {}
|
||||
Err(TrySendError::Full(_)) => self.stats.drop.add_channel_overflow(),
|
||||
Err(TrySendError::Disconnected(_)) => self.stats.drop.add_channel_closed(),
|
||||
Ok(true) => {}
|
||||
Ok(false) => self.stats.drop.add_channel_overflow(),
|
||||
Err(_) => self.stats.drop.add_channel_closed(),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
@ -102,7 +101,7 @@ impl Drain for AsyncSyslogStreamer {
|
|||
}
|
||||
|
||||
struct AsyncIoThread {
|
||||
receiver: Receiver<String>,
|
||||
receiver: kanal::Receiver<String>,
|
||||
backend_builder: SyslogBackendBuilder,
|
||||
stats: Arc<LogStats>,
|
||||
recv_buf: Vec<String>,
|
||||
|
|
@ -120,7 +119,7 @@ impl AsyncIoThread {
|
|||
|
||||
fn recv_send_all(&mut self) {
|
||||
while self.recv_buf.len() < MAX_BATCH_SIZE {
|
||||
let Ok(s) = self.receiver.try_recv() else {
|
||||
let Ok(Some(s)) = self.receiver.try_recv() else {
|
||||
break;
|
||||
};
|
||||
self.recv_buf.push(s);
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ lru = { workspace = true, optional = true }
|
|||
bytes = { workspace = true, optional = true }
|
||||
http = { workspace = true, optional = true }
|
||||
base64 = { workspace = true, optional = true }
|
||||
flume = { workspace = true, features = ["eventual-fairness"], optional = true }
|
||||
kanal = { workspace = true, optional = true, features = ["std-mutex"] }
|
||||
slog = { workspace = true, optional = true }
|
||||
indexmap = { workspace = true, optional = true }
|
||||
brotli = { version = "8.0", optional = true, default-features = false, features = ["std"] }
|
||||
|
|
@ -66,4 +66,4 @@ openssl = ["dep:openssl", "dep:openssl-sys", "dep:lru", "dep:bytes", "dep:ahash"
|
|||
acl-rule = ["resolve", "dep:ahash", "dep:ip_network", "dep:ip_network_table", "dep:regex", "dep:radix_trie"]
|
||||
http = ["dep:http", "dep:bytes", "dep:base64"]
|
||||
route = ["resolve", "dep:ahash", "dep:radix_trie", "dep:indexmap"]
|
||||
async-log = ["dep:flume", "dep:slog"]
|
||||
async-log = ["dep:kanal", "dep:slog"]
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
|
||||
use std::sync::Arc;
|
||||
|
||||
use flume::{Sender, TrySendError};
|
||||
use slog::{Drain, OwnedKVList, Record};
|
||||
|
||||
use super::LogStats;
|
||||
|
|
@ -41,7 +40,7 @@ pub struct AsyncLogger<T, F>
|
|||
where
|
||||
F: AsyncLogFormatter<T>,
|
||||
{
|
||||
sender: Sender<T>,
|
||||
sender: kanal::Sender<T>,
|
||||
formatter: F,
|
||||
stats: Arc<LogStats>,
|
||||
}
|
||||
|
|
@ -50,7 +49,7 @@ impl<T, F> AsyncLogger<T, F>
|
|||
where
|
||||
F: AsyncLogFormatter<T>,
|
||||
{
|
||||
pub fn new(sender: Sender<T>, formatter: F, stats: Arc<LogStats>) -> Self {
|
||||
pub fn new(sender: kanal::Sender<T>, formatter: F, stats: Arc<LogStats>) -> Self {
|
||||
AsyncLogger {
|
||||
sender,
|
||||
formatter,
|
||||
|
|
@ -76,9 +75,9 @@ where
|
|||
match self.formatter.format_slog(record, logger_values) {
|
||||
Ok(v) => {
|
||||
match self.sender.try_send(v) {
|
||||
Ok(_) => {}
|
||||
Err(TrySendError::Full(_)) => self.stats.drop.add_channel_overflow(),
|
||||
Err(TrySendError::Disconnected(_)) => self.stats.drop.add_channel_closed(),
|
||||
Ok(true) => {}
|
||||
Ok(false) => self.stats.drop.add_channel_overflow(),
|
||||
Err(_) => self.stats.drop.add_channel_closed(),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue