From 269f1bf6bb8e1b2923ae4e34e507e5bbc05d7dfe Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Sat, 17 Aug 2024 20:21:34 +0800 Subject: [PATCH] g3-ip-locate: allow to use standalone thread --- g3proxy/src/main.rs | 4 ++ lib/g3-io-ext/src/cache/mod.rs | 2 +- lib/g3-io-ext/src/lib.rs | 2 +- lib/g3-ip-locate/src/config.rs | 20 +++++--- lib/g3-ip-locate/src/lib.rs | 5 +- lib/g3-ip-locate/src/runtime.rs | 89 +++++++++++++++++++++++++++++++++ 6 files changed, 113 insertions(+), 9 deletions(-) create mode 100644 lib/g3-ip-locate/src/runtime.rs diff --git a/g3proxy/src/main.rs b/g3proxy/src/main.rs index bad54acc..f3f3efed 100644 --- a/g3proxy/src/main.rs +++ b/g3proxy/src/main.rs @@ -138,6 +138,9 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> { if let Some(stats) = g3_cert_agent::spawn_cert_generate_runtime().await { g3_daemon::runtime::metrics::add_tokio_stats(stats, "cert-generate".to_string()); } + if let Some(stats) = g3_ip_locate::spawn_ip_locate_runtime().await { + g3_daemon::runtime::metrics::add_tokio_stats(stats, "ip-locate".to_string()); + } let _workers_guard = g3_daemon::runtime::worker::spawn_workers() .await @@ -154,6 +157,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> { g3_io_ext::close_limit_schedule_runtime(); g3_cert_agent::close_cert_generate_runtime(); + g3_ip_locate::close_ip_locate_runtime(); g3proxy::control::capnp::stop_working_thread(); let _ = ctl_thread_handler.join(); diff --git a/lib/g3-io-ext/src/cache/mod.rs b/lib/g3-io-ext/src/cache/mod.rs index 2108e90c..2e485eaf 100644 --- a/lib/g3-io-ext/src/cache/mod.rs +++ b/lib/g3-io-ext/src/cache/mod.rs @@ -72,7 +72,7 @@ pub struct CacheQueryRequest { notifier: oneshot::Sender>>, } -pub fn spawn_effective_cache( +pub fn create_effective_cache( request_batch_handle_count: usize, ) -> ( EffectiveCacheRuntime, diff --git a/lib/g3-io-ext/src/lib.rs b/lib/g3-io-ext/src/lib.rs index 08f952bc..798d66c3 100644 --- a/lib/g3-io-ext/src/lib.rs +++ b/lib/g3-io-ext/src/lib.rs @@ -21,7 +21,7 @@ mod listen; mod udp; pub use cache::{ - spawn_effective_cache, EffectiveCacheData, EffectiveCacheHandle, EffectiveCacheRuntime, + create_effective_cache, EffectiveCacheData, EffectiveCacheHandle, EffectiveCacheRuntime, EffectiveQueryHandle, }; pub use io::*; diff --git a/lib/g3-ip-locate/src/config.rs b/lib/g3-ip-locate/src/config.rs index 1840392f..74c91239 100644 --- a/lib/g3-ip-locate/src/config.rs +++ b/lib/g3-ip-locate/src/config.rs @@ -94,13 +94,21 @@ impl IpLocateServiceConfig { self.query_peer_addr ) })?; - let socket = UdpSocket::from_std(socket).context("failed to setup udp socket")?; - let (cache_runtime, cache_handle, query_handle) = super::spawn_ip_location_cache(self); - let query_runtime = IpLocationQueryRuntime::new(self, socket, query_handle); - - tokio::spawn(query_runtime); - tokio::spawn(cache_runtime); + let (cache_runtime, cache_handle, query_handle) = super::crate_ip_location_cache(self); + if let Some(rt) = crate::get_ip_locate_rt_handle() { + let config = self.clone(); + rt.spawn(async move { + let socket = UdpSocket::from_std(socket).expect("failed to setup udp socket"); + IpLocationQueryRuntime::new(&config, socket, query_handle).await + }); + rt.spawn(cache_runtime); + } else { + let socket = UdpSocket::from_std(socket).context("failed to setup udp socket")?; + let query_runtime = IpLocationQueryRuntime::new(self, socket, query_handle); + tokio::spawn(query_runtime); + tokio::spawn(cache_runtime); + } Ok(IpLocationServiceHandle::new( cache_handle, diff --git a/lib/g3-ip-locate/src/lib.rs b/lib/g3-ip-locate/src/lib.rs index d60c3bc5..90e4c910 100644 --- a/lib/g3-ip-locate/src/lib.rs +++ b/lib/g3-ip-locate/src/lib.rs @@ -45,6 +45,9 @@ pub use request::Request; mod response; pub use response::Response; +mod runtime; +pub use runtime::*; + struct CacheQueryRequest { ip: IpAddr, notifier: oneshot::Sender>, @@ -79,7 +82,7 @@ impl IpLocationCacheResponse { } } -fn spawn_ip_location_cache( +fn crate_ip_location_cache( config: &IpLocateServiceConfig, ) -> ( IpLocationCacheRuntime, diff --git a/lib/g3-ip-locate/src/runtime.rs b/lib/g3-ip-locate/src/runtime.rs new file mode 100644 index 00000000..72e28b24 --- /dev/null +++ b/lib/g3-ip-locate/src/runtime.rs @@ -0,0 +1,89 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::sync::Mutex; +use std::thread::JoinHandle; + +use tokio::runtime::{Handle, RuntimeMetrics}; +use tokio::sync::oneshot; + +static SCHEDULE_RUNTIME: Mutex> = Mutex::new(None); +static THREAD_QUIT_SENDER: Mutex>> = Mutex::new(None); +static THREAD_JOIN_HANDLE: Mutex>> = Mutex::new(None); + +pub async fn spawn_ip_locate_runtime() -> Option { + let (quit_sender, quit_receiver) = oneshot::channel(); + set_thread_quit_sender(quit_sender); + + let (rt_handle_sender, rt_handle_receiver) = oneshot::channel(); + let Ok(handle) = std::thread::Builder::new() + .name("ip-locate".to_string()) + .spawn(move || { + let Ok(rt) = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + else { + return; + }; + + if rt_handle_sender.send(rt.handle().clone()).is_ok() { + let _ = rt.block_on(quit_receiver); + } + }) + else { + return None; + }; + set_thread_join_handle(handle); + if let Ok(handle) = rt_handle_receiver.await { + set_ip_locate_rt_handle(handle.clone()); + Some(handle.metrics()) + } else { + None + } +} + +pub fn close_ip_locate_runtime() { + let mut lock = THREAD_QUIT_SENDER.lock().unwrap(); + if let Some(sender) = lock.take() { + let _ = sender.send(()); + } + drop(lock); + + let mut lock = THREAD_JOIN_HANDLE.lock().unwrap(); + if let Some(join_handle) = lock.take() { + let _ = join_handle.join(); + } +} + +fn set_thread_quit_sender(sender: oneshot::Sender<()>) { + let mut lock = THREAD_QUIT_SENDER.lock().unwrap(); + *lock = Some(sender); +} + +fn set_thread_join_handle(handle: JoinHandle<()>) { + let mut lock = THREAD_JOIN_HANDLE.lock().unwrap(); + *lock = Some(handle); +} + +fn set_ip_locate_rt_handle(handle: Handle) { + let mut lock = SCHEDULE_RUNTIME.lock().unwrap(); + *lock = Some(handle); +} + +pub fn get_ip_locate_rt_handle() -> Option { + let lock = SCHEDULE_RUNTIME.lock().unwrap(); + lock.as_ref().cloned() +}