g3-ip-locate: allow to use standalone thread

This commit is contained in:
Zhang Jingqiang 2024-08-17 20:21:34 +08:00
parent 3772ae9214
commit 269f1bf6bb
6 changed files with 113 additions and 9 deletions

View file

@ -138,6 +138,9 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
if let Some(stats) = g3_cert_agent::spawn_cert_generate_runtime().await {
g3_daemon::runtime::metrics::add_tokio_stats(stats, "cert-generate".to_string());
}
if let Some(stats) = g3_ip_locate::spawn_ip_locate_runtime().await {
g3_daemon::runtime::metrics::add_tokio_stats(stats, "ip-locate".to_string());
}
let _workers_guard = g3_daemon::runtime::worker::spawn_workers()
.await
@ -154,6 +157,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
g3_io_ext::close_limit_schedule_runtime();
g3_cert_agent::close_cert_generate_runtime();
g3_ip_locate::close_ip_locate_runtime();
g3proxy::control::capnp::stop_working_thread();
let _ = ctl_thread_handler.join();

View file

@ -72,7 +72,7 @@ pub struct CacheQueryRequest<K, R> {
notifier: oneshot::Sender<Arc<EffectiveCacheData<R>>>,
}
pub fn spawn_effective_cache<K: Hash + Eq, R: Send + Sync>(
pub fn create_effective_cache<K: Hash + Eq, R: Send + Sync>(
request_batch_handle_count: usize,
) -> (
EffectiveCacheRuntime<K, R>,

View file

@ -21,7 +21,7 @@ mod listen;
mod udp;
pub use cache::{
spawn_effective_cache, EffectiveCacheData, EffectiveCacheHandle, EffectiveCacheRuntime,
create_effective_cache, EffectiveCacheData, EffectiveCacheHandle, EffectiveCacheRuntime,
EffectiveQueryHandle,
};
pub use io::*;

View file

@ -94,13 +94,21 @@ impl IpLocateServiceConfig {
self.query_peer_addr
)
})?;
let socket = UdpSocket::from_std(socket).context("failed to setup udp socket")?;
let (cache_runtime, cache_handle, query_handle) = super::spawn_ip_location_cache(self);
let query_runtime = IpLocationQueryRuntime::new(self, socket, query_handle);
tokio::spawn(query_runtime);
tokio::spawn(cache_runtime);
let (cache_runtime, cache_handle, query_handle) = super::crate_ip_location_cache(self);
if let Some(rt) = crate::get_ip_locate_rt_handle() {
let config = self.clone();
rt.spawn(async move {
let socket = UdpSocket::from_std(socket).expect("failed to setup udp socket");
IpLocationQueryRuntime::new(&config, socket, query_handle).await
});
rt.spawn(cache_runtime);
} else {
let socket = UdpSocket::from_std(socket).context("failed to setup udp socket")?;
let query_runtime = IpLocationQueryRuntime::new(self, socket, query_handle);
tokio::spawn(query_runtime);
tokio::spawn(cache_runtime);
}
Ok(IpLocationServiceHandle::new(
cache_handle,

View file

@ -45,6 +45,9 @@ pub use request::Request;
mod response;
pub use response::Response;
mod runtime;
pub use runtime::*;
struct CacheQueryRequest {
ip: IpAddr,
notifier: oneshot::Sender<Arc<IpLocation>>,
@ -79,7 +82,7 @@ impl IpLocationCacheResponse {
}
}
fn spawn_ip_location_cache(
fn crate_ip_location_cache(
config: &IpLocateServiceConfig,
) -> (
IpLocationCacheRuntime,

View file

@ -0,0 +1,89 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Mutex;
use std::thread::JoinHandle;
use tokio::runtime::{Handle, RuntimeMetrics};
use tokio::sync::oneshot;
static SCHEDULE_RUNTIME: Mutex<Option<Handle>> = Mutex::new(None);
static THREAD_QUIT_SENDER: Mutex<Option<oneshot::Sender<()>>> = Mutex::new(None);
static THREAD_JOIN_HANDLE: Mutex<Option<JoinHandle<()>>> = Mutex::new(None);
pub async fn spawn_ip_locate_runtime() -> Option<RuntimeMetrics> {
let (quit_sender, quit_receiver) = oneshot::channel();
set_thread_quit_sender(quit_sender);
let (rt_handle_sender, rt_handle_receiver) = oneshot::channel();
let Ok(handle) = std::thread::Builder::new()
.name("ip-locate".to_string())
.spawn(move || {
let Ok(rt) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
else {
return;
};
if rt_handle_sender.send(rt.handle().clone()).is_ok() {
let _ = rt.block_on(quit_receiver);
}
})
else {
return None;
};
set_thread_join_handle(handle);
if let Ok(handle) = rt_handle_receiver.await {
set_ip_locate_rt_handle(handle.clone());
Some(handle.metrics())
} else {
None
}
}
pub fn close_ip_locate_runtime() {
let mut lock = THREAD_QUIT_SENDER.lock().unwrap();
if let Some(sender) = lock.take() {
let _ = sender.send(());
}
drop(lock);
let mut lock = THREAD_JOIN_HANDLE.lock().unwrap();
if let Some(join_handle) = lock.take() {
let _ = join_handle.join();
}
}
fn set_thread_quit_sender(sender: oneshot::Sender<()>) {
let mut lock = THREAD_QUIT_SENDER.lock().unwrap();
*lock = Some(sender);
}
fn set_thread_join_handle(handle: JoinHandle<()>) {
let mut lock = THREAD_JOIN_HANDLE.lock().unwrap();
*lock = Some(handle);
}
fn set_ip_locate_rt_handle(handle: Handle) {
let mut lock = SCHEDULE_RUNTIME.lock().unwrap();
*lock = Some(handle);
}
pub fn get_ip_locate_rt_handle() -> Option<Handle> {
let lock = SCHEDULE_RUNTIME.lock().unwrap();
lock.as_ref().cloned()
}