rework signal handlers

This commit is contained in:
Zhang Jingqiang 2024-05-10 16:40:37 +08:00
parent 746649d1fc
commit 1dc578bb64
21 changed files with 182 additions and 262 deletions

19
Cargo.lock generated
View file

@ -1367,13 +1367,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "g3-signal"
version = "0.3.0"
dependencies = [
"tokio",
]
[[package]]
name = "g3-slog-types"
version = "0.1.0"
@ -1607,7 +1600,6 @@ dependencies = [
"g3-io-ext",
"g3-openssl",
"g3-runtime",
"g3-signal",
"g3-socket",
"g3-socks",
"g3-statsd-client",
@ -1712,7 +1704,6 @@ dependencies = [
"g3-histogram",
"g3-io-ext",
"g3-openssl",
"g3-signal",
"g3-slog-types",
"g3-socket",
"g3-statsd-client",
@ -1815,7 +1806,6 @@ dependencies = [
"g3-msgpack",
"g3-openssl",
"g3-resolver",
"g3-signal",
"g3-slog-types",
"g3-smtp-proto",
"g3-socket",
@ -1936,7 +1926,6 @@ dependencies = [
"g3-histogram",
"g3-io-ext",
"g3-openssl",
"g3-signal",
"g3-slog-types",
"g3-socket",
"g3-statsd-client",
@ -3349,14 +3338,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "test-int-signal"
version = "0.1.0"
dependencies = [
"g3-signal",
"tokio",
]
[[package]]
name = "test-resolver"
version = "0.1.0"

View file

@ -5,7 +5,6 @@ members = [
"lib/g3-daemon",
"lib/g3-ctl",
"lib/g3-socket",
"lib/g3-signal",
"lib/g3-compat",
"lib/g3-clap",
"lib/g3-yaml",
@ -52,7 +51,6 @@ members = [
"g3keymess",
"g3keymess/proto",
"g3keymess/utils/ctl",
"demo/test-int-signal",
"demo/test-tcp-relay",
"demo/test-resolver",
"demo/test-copy-yield",
@ -208,7 +206,6 @@ g3-msgpack = { version = "0.2", path = "lib/g3-msgpack" }
g3-hickory-client = { version = "0.1", path = "lib/g3-hickory-client" }
g3-resolver = { version = "0.5", path = "lib/g3-resolver" }
g3-runtime = { version = "0.3", path = "lib/g3-runtime" }
g3-signal = { version = "0.3", path = "lib/g3-signal" }
g3-socket = { version = "0.4", path = "lib/g3-socket" }
g3-socks = { version = "0.1", path = "lib/g3-socks" }
g3-openssl = { version = "0.1", path = "lib/g3-openssl" }

View file

@ -1,11 +0,0 @@
[package]
name = "test-int-signal"
version = "0.1.0"
license.workspace = true
edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
g3-signal.workspace = true

View file

@ -1,39 +0,0 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use tokio::signal::unix::SignalKind;
use g3_signal::{ActionSignal, SigResult};
fn do_at_quit(count: u32) -> SigResult {
match count {
1 => {
println!("press 'Ctrl-C' again to quit");
SigResult::Continue
}
_ => {
println!("quit");
SigResult::Break
}
}
}
#[tokio::main]
async fn main() {
let sig = ActionSignal::new(SignalKind::interrupt(), &do_at_quit).unwrap();
println!("SIGINT registered, press 'Ctrl-C' to quit");
sig.await;
}

View file

@ -14,7 +14,7 @@ anyhow.workspace = true
clap.workspace = true
clap_complete.workspace = true
indicatif = "0.17"
tokio = { workspace = true, features = ["rt", "net", "macros"] }
tokio = { workspace = true, features = ["rt", "net", "macros", "signal"] }
http.workspace = true
url.workspace = true
h2.workspace = true
@ -39,7 +39,6 @@ governor = { workspace = true, features = ["std", "jitter"] }
hickory-client.workspace = true
hickory-proto.workspace = true
g3-runtime.workspace = true
g3-signal.workspace = true
g3-types = { workspace = true, features = ["openssl", "rustls"] }
g3-clap.workspace = true
g3-socket.workspace = true

View file

@ -22,11 +22,9 @@ use std::time::Duration;
use anyhow::{anyhow, Context};
use governor::RateLimiter;
use hdrhistogram::Histogram;
use tokio::signal::unix::SignalKind;
use tokio::sync::{mpsc, Barrier, Semaphore};
use tokio::time::{Instant, MissedTickBehavior};
use g3_signal::{ActionSignal, SigResult};
use g3_statsd_client::StatsdClient;
use super::ProcArgs;
@ -169,9 +167,13 @@ where
fn notify_finish(&mut self) {}
}
fn quit_at_sigint(_count: u32) -> SigResult {
stats::mark_force_quit();
SigResult::Break
fn register_signal_handler() {
tokio::spawn(async move {
if let Err(e) = tokio::signal::ctrl_c().await {
eprintln!("error when waiting Ctrl-C: {e}");
}
stats::mark_force_quit();
});
}
async fn run<RS, H, C, T>(mut target: T, proc_args: &ProcArgs) -> anyhow::Result<()>
@ -188,10 +190,7 @@ where
let progress_counter = progress.as_ref().map(|p| p.counter());
stats::init_global_state(proc_args.requests, proc_args.log_error_count);
tokio::spawn(
ActionSignal::new(SignalKind::interrupt(), &quit_at_sigint)
.map_err(|e| anyhow!("failed to set handler for SIGINT: {e:?}"))?,
);
register_signal_handler();
let rate_limit = proc_args
.rate_limit

View file

@ -31,7 +31,6 @@ itoa.workspace = true
arc-swap.workspace = true
serde_json.workspace = true
g3-daemon = { workspace = true, features = ["register"] }
g3-signal.workspace = true
g3-yaml = { workspace = true, features = ["histogram"] }
g3-types = { workspace = true, features = [] }
g3-socket.workspace = true

View file

@ -120,7 +120,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
});
}
g3keymess::signal::setup_and_spawn().context("failed to setup signal handler")?;
g3keymess::signal::register().context("failed to setup signal handler")?;
g3keymess::store::load_all()
.await

View file

@ -15,31 +15,12 @@
*/
use log::{error, info, warn};
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use g3_signal::{ActionSignal, SigResult};
use g3_daemon::signal::AsyncSignalAction;
static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(());
fn do_quit(_: u32) -> SigResult {
info!("got quit signal");
tokio::spawn(crate::control::UniqueController::abort_immediately());
SigResult::Break
}
fn go_offline(_: u32) -> SigResult {
info!("got offline signal");
tokio::spawn(crate::control::DaemonController::abort());
SigResult::Break
}
fn call_reload(_: u32) -> SigResult {
info!("got reload signal");
tokio::spawn(do_reload());
SigResult::Continue
}
async fn do_reload() {
let _guard = RELOAD_MUTEX.lock().await;
info!("reloading config");
@ -59,10 +40,33 @@ async fn do_reload() {
info!("reload finished");
}
pub fn setup_and_spawn() -> anyhow::Result<()> {
tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?);
tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?);
Ok(())
#[derive(Clone, Copy)]
struct QuitAction {}
impl AsyncSignalAction for QuitAction {
async fn run(&self) {
crate::control::UniqueController::abort_immediately().await
}
}
#[derive(Clone, Copy)]
struct OfflineAction {}
impl AsyncSignalAction for OfflineAction {
async fn run(&self) {
crate::control::DaemonController::abort().await
}
}
#[derive(Clone, Copy)]
struct ReloadAction {}
impl AsyncSignalAction for ReloadAction {
async fn run(&self) {
do_reload().await
}
}
pub fn register() -> anyhow::Result<()> {
g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {})
}

View file

@ -63,7 +63,6 @@ pyo3 = { workspace = true, features = ["auto-initialize"], optional = true }
g3-types = { workspace = true, features = ["auth-crypt", "rustls", "openssl", "acl-rule", "http", "route", "async-log"] }
g3-socket.workspace = true
g3-daemon.workspace = true
g3-signal.workspace = true
g3-datetime.workspace = true
g3-statsd-client.workspace = true
g3-histogram.workspace = true

View file

@ -111,7 +111,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
});
}
g3proxy::signal::setup_and_spawn().context("failed to setup signal handler")?;
g3proxy::signal::register().context("failed to setup signal handler")?;
g3proxy::resolve::spawn_all()
.await
.context("failed to spawn all resolvers")?;

View file

@ -15,31 +15,12 @@
*/
use log::{error, info, warn};
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use g3_signal::{ActionSignal, SigResult};
use g3_daemon::signal::AsyncSignalAction;
static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(());
fn do_quit(_: u32) -> SigResult {
info!("got quit signal");
tokio::spawn(crate::control::UniqueController::abort_immediately());
SigResult::Break
}
fn go_offline(_: u32) -> SigResult {
info!("got offline signal");
tokio::spawn(crate::control::DaemonController::abort());
SigResult::Break
}
fn call_reload(_: u32) -> SigResult {
info!("got reload signal");
tokio::spawn(do_reload());
SigResult::Continue
}
async fn do_reload() {
let _guard = RELOAD_MUTEX.lock().await;
info!("reloading config");
@ -68,10 +49,33 @@ async fn do_reload() {
info!("reload finished");
}
pub fn setup_and_spawn() -> anyhow::Result<()> {
tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?);
tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?);
Ok(())
#[derive(Clone, Copy)]
struct QuitAction {}
impl AsyncSignalAction for QuitAction {
async fn run(&self) {
crate::control::UniqueController::abort_immediately().await
}
}
#[derive(Clone, Copy)]
struct OfflineAction {}
impl AsyncSignalAction for OfflineAction {
async fn run(&self) {
crate::control::DaemonController::abort().await
}
}
#[derive(Clone, Copy)]
struct ReloadAction {}
impl AsyncSignalAction for ReloadAction {
async fn run(&self) {
do_reload().await
}
}
pub fn register() -> anyhow::Result<()> {
g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {})
}

View file

@ -41,7 +41,6 @@ bitflags.workspace = true
flume.workspace = true
rustc-hash.workspace = true
g3-daemon.workspace = true
g3-signal.workspace = true
g3-yaml = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls", "histogram"] }
g3-types = { workspace = true, features = ["acl-rule", "route", "openssl", "rustls"] }
g3-socket.workspace = true

View file

@ -96,7 +96,7 @@ fn tokio_run(args: &ProcArgs) -> anyhow::Result<()> {
});
}
g3tiles::signal::setup_and_spawn().context("failed to setup signal handler")?;
g3tiles::signal::register().context("failed to setup signal handler")?;
g3tiles::discover::load_all()
.await

View file

@ -15,31 +15,12 @@
*/
use log::{error, info, warn};
use tokio::signal::unix::SignalKind;
use tokio::sync::Mutex;
use g3_signal::{ActionSignal, SigResult};
use g3_daemon::signal::AsyncSignalAction;
static RELOAD_MUTEX: Mutex<()> = Mutex::const_new(());
fn do_quit(_: u32) -> SigResult {
info!("got quit signal");
tokio::spawn(crate::control::UniqueController::abort_immediately());
SigResult::Break
}
fn go_offline(_: u32) -> SigResult {
info!("got offline signal");
tokio::spawn(crate::control::DaemonController::abort());
SigResult::Break
}
fn call_reload(_: u32) -> SigResult {
info!("got reload signal");
tokio::spawn(do_reload());
SigResult::Continue
}
async fn do_reload() {
let _guard = RELOAD_MUTEX.lock().await;
info!("reloading config");
@ -62,10 +43,33 @@ async fn do_reload() {
info!("reload finished");
}
pub fn setup_and_spawn() -> anyhow::Result<()> {
tokio::spawn(ActionSignal::new(SignalKind::quit(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::interrupt(), &do_quit)?);
tokio::spawn(ActionSignal::new(SignalKind::terminate(), &go_offline)?);
tokio::spawn(ActionSignal::new(SignalKind::hangup(), &call_reload)?);
Ok(())
#[derive(Clone, Copy)]
struct QuitAction {}
impl AsyncSignalAction for QuitAction {
async fn run(&self) {
crate::control::UniqueController::abort_immediately().await
}
}
#[derive(Clone, Copy)]
struct OfflineAction {}
impl AsyncSignalAction for OfflineAction {
async fn run(&self) {
crate::control::DaemonController::abort().await
}
}
#[derive(Clone, Copy)]
struct ReloadAction {}
impl AsyncSignalAction for ReloadAction {
async fn run(&self) {
do_reload().await
}
}
pub fn register() -> anyhow::Result<()> {
g3_daemon::signal::register(QuitAction {}, OfflineAction {}, ReloadAction {})
}

View file

@ -27,7 +27,7 @@ rand.workspace = true
fastrand.workspace = true
uuid = { workspace = true, features = ["v1"] }
chrono.workspace = true
tokio = { workspace = true, features = ["net", "io-util"] }
tokio = { workspace = true, features = ["net", "io-util", "signal"] }
tokio-util = { workspace = true, features = ["compat"] }
http = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }

View file

@ -22,6 +22,7 @@ pub mod metrics;
pub mod opts;
pub mod runtime;
pub mod server;
pub mod signal;
pub mod stat;
#[cfg(unix)]

View file

@ -0,0 +1,78 @@
/*
* Copyright 2024 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::future::{poll_fn, Future};
use anyhow::anyhow;
use log::info;
pub trait AsyncSignalAction: Copy {
fn run(&self) -> impl Future<Output = ()> + Send;
}
pub fn register<QUIT, OFFLINE, RELOAD>(
do_quit: QUIT,
go_offline: OFFLINE,
call_reload: RELOAD,
) -> anyhow::Result<()>
where
QUIT: AsyncSignalAction + Send + 'static,
OFFLINE: AsyncSignalAction + Send + 'static,
RELOAD: AsyncSignalAction + Send + 'static,
{
use tokio::signal::unix::{signal, SignalKind};
let mut quit_sig = signal(SignalKind::quit())
.map_err(|e| anyhow!("failed to create SIGQUIT listener: {e}"))?;
tokio::spawn(async move {
if poll_fn(|cx| quit_sig.poll_recv(cx)).await.is_some() {
info!("got offline signal");
do_quit.run().await;
}
});
let mut int_sig = signal(SignalKind::interrupt())
.map_err(|e| anyhow!("failed to create SIGINT listener: {e}"))?;
tokio::spawn(async move {
if poll_fn(|cx| int_sig.poll_recv(cx)).await.is_some() {
info!("got offline signal");
do_quit.run().await;
}
});
let mut term_sig = signal(SignalKind::terminate())
.map_err(|e| anyhow!("failed to create SIGTERM listener: {e}"))?;
tokio::spawn(async move {
if poll_fn(|cx| term_sig.poll_recv(cx)).await.is_some() {
info!("got offline signal");
go_offline.run().await;
}
});
let mut hup_sig = signal(SignalKind::hangup())
.map_err(|e| anyhow!("failed to create SIGHUP listener: {e}"))?;
tokio::spawn(async move {
loop {
if poll_fn(|cx| hup_sig.poll_recv(cx)).await.is_none() {
break;
}
info!("got reload signal");
call_reload.run().await;
}
});
Ok(())
}

View file

@ -1,10 +0,0 @@
[package]
name = "g3-signal"
version = "0.3.0"
license.workspace = true
edition.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
tokio = { workspace = true, features = ["signal"] }

View file

@ -1,19 +0,0 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod signal;
pub use signal::{ActionSignal, SigResult};

View file

@ -1,65 +0,0 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::signal::unix::{signal, Signal, SignalKind};
pub enum SigResult {
Continue,
Break,
}
type SigAction = dyn Fn(u32) -> SigResult + Sync;
pub struct ActionSignal<'a> {
signal: Signal,
count: u32,
action: &'a SigAction,
}
impl<'a> ActionSignal<'a> {
pub fn new(signo: SignalKind, action: &'a SigAction) -> io::Result<Self> {
Ok(ActionSignal {
signal: signal(signo)?,
count: 0,
action,
})
}
}
impl<'a> Future for ActionSignal<'a> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match Pin::new(&mut self.signal).poll_recv(cx) {
Poll::Ready(Some(_)) => {
self.count += 1;
match (self.action)(self.count) {
SigResult::Continue => continue,
SigResult::Break => return Poll::Ready(()),
}
}
Poll::Ready(None) => return Poll::Ready(()),
Poll::Pending => return Poll::Pending,
}
}
}
}