diff --git a/g3statsd/examples/export_console/g3statsd.yaml b/g3statsd/examples/export_console/g3statsd.yaml index 7c116974..3c956988 100644 --- a/g3statsd/examples/export_console/g3statsd.yaml +++ b/g3statsd/examples/export_console/g3statsd.yaml @@ -1,8 +1,12 @@ importer: - - name: statsd - type: statsd + - name: statsd_udp + type: statsd_udp collector: aggregate_4s listen: 127.0.0.1:8125 + - name: statsd_unix + type: statsd_unix + collector: aggregate_4s + listen: /tmp/g3statsd.sock collector: - name: aggregate_4s diff --git a/g3statsd/src/config/importer/mod.rs b/g3statsd/src/config/importer/mod.rs index a1aa7d09..9a77fb95 100644 --- a/g3statsd/src/config/importer/mod.rs +++ b/g3statsd/src/config/importer/mod.rs @@ -47,7 +47,9 @@ pub(crate) trait ImporterConfig { #[allow(clippy::large_enum_variant)] pub(crate) enum AnyImporterConfig { Dummy(dummy::DummyImporterConfig), - StatsD(statsd::StatsdImporterConfig), + StatsDUdp(statsd::StatsdUdpImporterConfig), + #[cfg(unix)] + StatsDUnix(statsd::StatsdUnixImporterConfig), } pub(crate) fn load_all(v: &Yaml, conf_dir: &Path) -> anyhow::Result<()> { @@ -88,10 +90,16 @@ fn load_importer( .context("failed to load this Dummy importer")?; Ok(AnyImporterConfig::Dummy(importer)) } - "statsd" => { - let importer = statsd::StatsdImporterConfig::parse(map, position) - .context("failed to load this StatsD importer")?; - Ok(AnyImporterConfig::StatsD(importer)) + "statsd" | "statsd_udp" => { + let importer = statsd::StatsdUdpImporterConfig::parse(map, position) + .context("failed to load this StatsD_UDP importer")?; + Ok(AnyImporterConfig::StatsDUdp(importer)) + } + #[cfg(unix)] + "statsd_unix" => { + let importer = statsd::StatsdUnixImporterConfig::parse(map, position) + .context("failed to load this StatsD_UNIX importer")?; + Ok(AnyImporterConfig::StatsDUnix(importer)) } _ => Err(anyhow!("unsupported importer type {}", importer_type)), } diff --git a/g3statsd/src/config/importer/statsd/mod.rs b/g3statsd/src/config/importer/statsd/mod.rs new file mode 100644 index 00000000..121e3797 --- /dev/null +++ b/g3statsd/src/config/importer/statsd/mod.rs @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2025 ByteDance and/or its affiliates. + */ + +use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction}; +use super::{CONFIG_KEY_IMPORTER_NAME, CONFIG_KEY_IMPORTER_TYPE}; + +mod udp; +pub(crate) use udp::StatsdUdpImporterConfig; + +#[cfg(unix)] +mod unix; +#[cfg(unix)] +pub(crate) use unix::StatsdUnixImporterConfig; diff --git a/g3statsd/src/config/importer/statsd.rs b/g3statsd/src/config/importer/statsd/udp.rs similarity index 91% rename from g3statsd/src/config/importer/statsd.rs rename to g3statsd/src/config/importer/statsd/udp.rs index dbc2c9c9..245deba9 100644 --- a/g3statsd/src/config/importer/statsd.rs +++ b/g3statsd/src/config/importer/statsd/udp.rs @@ -13,10 +13,10 @@ use g3_yaml::YamlDocPosition; use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction}; -const IMPORTER_CONFIG_TYPE: &str = "StatsD"; +const IMPORTER_CONFIG_TYPE: &str = "StatsD_UDP"; #[derive(Clone, Debug, Eq, PartialEq)] -pub(crate) struct StatsdImporterConfig { +pub(crate) struct StatsdUdpImporterConfig { name: NodeName, position: Option, pub(crate) collector: NodeName, @@ -25,9 +25,9 @@ pub(crate) struct StatsdImporterConfig { pub(crate) ingress_net_filter: Option, } -impl StatsdImporterConfig { +impl StatsdUdpImporterConfig { fn new(position: Option) -> Self { - StatsdImporterConfig { + StatsdUdpImporterConfig { name: NodeName::default(), position, collector: Default::default(), @@ -41,7 +41,7 @@ impl StatsdImporterConfig { map: &yaml::Hash, position: Option, ) -> anyhow::Result { - let mut importer = StatsdImporterConfig::new(position); + let mut importer = StatsdUdpImporterConfig::new(position); g3_yaml::foreach_kv(map, |k, v| importer.set(k, v))?; @@ -94,7 +94,7 @@ impl StatsdImporterConfig { } } -impl ImporterConfig for StatsdImporterConfig { +impl ImporterConfig for StatsdUdpImporterConfig { fn name(&self) -> &NodeName { &self.name } @@ -108,7 +108,7 @@ impl ImporterConfig for StatsdImporterConfig { } fn diff_action(&self, new: &AnyImporterConfig) -> ImporterConfigDiffAction { - let AnyImporterConfig::StatsD(new) = new else { + let AnyImporterConfig::StatsDUdp(new) = new else { return ImporterConfigDiffAction::SpawnNew; }; diff --git a/g3statsd/src/config/importer/statsd/unix.rs b/g3statsd/src/config/importer/statsd/unix.rs new file mode 100644 index 00000000..fac5b516 --- /dev/null +++ b/g3statsd/src/config/importer/statsd/unix.rs @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2025 ByteDance and/or its affiliates. + */ + +use std::path::PathBuf; + +use anyhow::{Context, anyhow}; +use yaml_rust::{Yaml, yaml}; + +use g3_types::metrics::NodeName; +use g3_yaml::YamlDocPosition; + +use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction}; + +const IMPORTER_CONFIG_TYPE: &str = "StatsD_UNIX"; + +#[derive(Clone, Debug, Eq, PartialEq)] +pub(crate) struct StatsdUnixImporterConfig { + name: NodeName, + position: Option, + pub(crate) collector: NodeName, + pub(crate) listen: PathBuf, +} + +impl StatsdUnixImporterConfig { + fn new(position: Option) -> Self { + StatsdUnixImporterConfig { + name: NodeName::default(), + position, + collector: Default::default(), + listen: PathBuf::new(), + } + } + + pub(crate) fn parse( + map: &yaml::Hash, + position: Option, + ) -> anyhow::Result { + let mut importer = StatsdUnixImporterConfig::new(position); + + g3_yaml::foreach_kv(map, |k, v| importer.set(k, v))?; + + importer.check()?; + Ok(importer) + } + + fn set(&mut self, k: &str, v: &Yaml) -> anyhow::Result<()> { + match g3_yaml::key::normalize(k).as_str() { + super::CONFIG_KEY_IMPORTER_TYPE => Ok(()), + super::CONFIG_KEY_IMPORTER_NAME => { + self.name = g3_yaml::value::as_metric_node_name(v)?; + Ok(()) + } + "collector" => { + self.collector = g3_yaml::value::as_metric_node_name(v)?; + Ok(()) + } + "listen" => { + self.listen = g3_yaml::value::as_absolute_path(v) + .context(format!("invalid unix listen path value for key {k}"))?; + Ok(()) + } + _ => Err(anyhow!("invalid key {k}")), + } + } + + fn check(&mut self) -> anyhow::Result<()> { + if self.name.is_empty() { + return Err(anyhow!("name is not set")); + } + if self.collector.is_empty() { + return Err(anyhow!("collector is not set")); + } + if self.listen.as_os_str().is_empty() { + return Err(anyhow!("listen path is not set")); + } + + Ok(()) + } +} + +impl ImporterConfig for StatsdUnixImporterConfig { + fn name(&self) -> &NodeName { + &self.name + } + + fn position(&self) -> Option { + self.position.clone() + } + + fn importer_type(&self) -> &'static str { + IMPORTER_CONFIG_TYPE + } + + fn diff_action(&self, new: &AnyImporterConfig) -> ImporterConfigDiffAction { + let AnyImporterConfig::StatsDUnix(new) = new else { + return ImporterConfigDiffAction::SpawnNew; + }; + + if self.eq(new) { + return ImporterConfigDiffAction::NoAction; + } + + if self.listen != new.listen { + return ImporterConfigDiffAction::ReloadAndRespawn; + } + + ImporterConfigDiffAction::ReloadNoRespawn + } + + fn collector(&self) -> &NodeName { + &self.collector + } +} diff --git a/g3statsd/src/import/dummy/mod.rs b/g3statsd/src/import/dummy/mod.rs index f8d1de3d..621c36f9 100644 --- a/g3statsd/src/import/dummy/mod.rs +++ b/g3statsd/src/import/dummy/mod.rs @@ -7,9 +7,13 @@ use std::net::SocketAddr; use std::sync::Arc; use anyhow::anyhow; +#[cfg(unix)] +use tokio::net::unix::SocketAddr as UnixSocketAddr; use tokio::sync::broadcast; use g3_daemon::listen::ReceiveUdpServer; +#[cfg(unix)] +use g3_daemon::listen::ReceiveUnixDatagramServer; use g3_daemon::server::{BaseServer, ServerReloadCommand}; use g3_types::metrics::NodeName; @@ -113,7 +117,7 @@ impl BaseServer for DummyImporter { } impl ReceiveUdpServer for DummyImporter { - fn receive_packet( + fn receive_udp_packet( &self, _packet: &[u8], _client_addr: SocketAddr, @@ -123,6 +127,11 @@ impl ReceiveUdpServer for DummyImporter { } } +#[cfg(unix)] +impl ReceiveUnixDatagramServer for DummyImporter { + fn receive_unix_packet(&self, _packet: &[u8], _peer_addr: UnixSocketAddr) {} +} + impl Importer for DummyImporter { fn collector(&self) -> &NodeName { Default::default() diff --git a/g3statsd/src/import/mod.rs b/g3statsd/src/import/mod.rs index 740b7f77..3630e436 100644 --- a/g3statsd/src/import/mod.rs +++ b/g3statsd/src/import/mod.rs @@ -6,9 +6,13 @@ use std::net::SocketAddr; use std::sync::Arc; +#[cfg(unix)] +use tokio::net::unix::SocketAddr as UnixSocketAddr; use tokio::sync::broadcast; use g3_daemon::listen::ReceiveUdpServer; +#[cfg(unix)] +use g3_daemon::listen::ReceiveUnixDatagramServer; use g3_daemon::server::{BaseServer, ReloadServer, ServerReloadCommand}; use g3_types::metrics::NodeName; @@ -25,6 +29,13 @@ pub use ops::{spawn_all, stop_all}; mod dummy; mod statsd; +#[cfg(unix)] +pub(crate) trait Importer: + ReceiveUdpServer + ReceiveUnixDatagramServer + BaseServer +{ + fn collector(&self) -> &NodeName; +} +#[cfg(not(unix))] pub(crate) trait Importer: ReceiveUdpServer + BaseServer { fn collector(&self) -> &NodeName; } @@ -77,7 +88,7 @@ impl ReloadServer for WrapArcImporter { } impl ReceiveUdpServer for WrapArcImporter { - fn receive_packet( + fn receive_udp_packet( &self, packet: &[u8], client_addr: SocketAddr, @@ -85,7 +96,14 @@ impl ReceiveUdpServer for WrapArcImporter { worker_id: Option, ) { self.0 - .receive_packet(packet, client_addr, server_addr, worker_id) + .receive_udp_packet(packet, client_addr, server_addr, worker_id) + } +} + +#[cfg(unix)] +impl ReceiveUnixDatagramServer for WrapArcImporter { + fn receive_unix_packet(&self, packet: &[u8], peer_addr: UnixSocketAddr) { + self.0.receive_unix_packet(packet, peer_addr) } } diff --git a/g3statsd/src/import/ops.rs b/g3statsd/src/import/ops.rs index 7605d870..4511f390 100644 --- a/g3statsd/src/import/ops.rs +++ b/g3statsd/src/import/ops.rs @@ -157,8 +157,12 @@ fn delete_existed_unlocked(name: &NodeName) { fn spawn_new_unlocked(config: AnyImporterConfig) -> anyhow::Result<()> { let importer = match config { AnyImporterConfig::Dummy(config) => super::dummy::DummyImporter::prepare_initial(config)?, - AnyImporterConfig::StatsD(config) => { - super::statsd::StatsdImporter::prepare_initial(config)? + AnyImporterConfig::StatsDUdp(config) => { + super::statsd::StatsdUdpImporter::prepare_initial(config)? + } + #[cfg(unix)] + AnyImporterConfig::StatsDUnix(config) => { + super::statsd::StatsdUnixImporter::prepare_initial(config)? } }; registry::add(importer) diff --git a/g3statsd/src/import/statsd/mod.rs b/g3statsd/src/import/statsd/mod.rs index f4602adc..11fc2958 100644 --- a/g3statsd/src/import/statsd/mod.rs +++ b/g3statsd/src/import/statsd/mod.rs @@ -3,8 +3,13 @@ * Copyright 2025 ByteDance and/or its affiliates. */ -mod import; -pub(super) use import::StatsdImporter; - mod parser; use parser::StatsdRecordVisitor; + +mod udp; +pub(super) use udp::StatsdUdpImporter; + +#[cfg(unix)] +mod unix; +#[cfg(unix)] +pub(super) use unix::StatsdUnixImporter; diff --git a/g3statsd/src/import/statsd/import.rs b/g3statsd/src/import/statsd/udp.rs similarity index 81% rename from g3statsd/src/import/statsd/import.rs rename to g3statsd/src/import/statsd/udp.rs index 289bf2fd..2e3adee7 100644 --- a/g3statsd/src/import/statsd/import.rs +++ b/g3statsd/src/import/statsd/udp.rs @@ -10,8 +10,12 @@ use anyhow::anyhow; use arc_swap::ArcSwap; use chrono::Utc; use log::debug; +#[cfg(unix)] +use tokio::net::unix::SocketAddr as UnixSocketAddr; use tokio::sync::broadcast; +#[cfg(unix)] +use g3_daemon::listen::ReceiveUnixDatagramServer; use g3_daemon::listen::{ReceiveUdpRuntime, ReceiveUdpServer}; use g3_daemon::server::{BaseServer, ServerReloadCommand}; use g3_types::acl::{AclAction, AclNetworkRule}; @@ -19,14 +23,14 @@ use g3_types::metrics::NodeName; use super::StatsdRecordVisitor; use crate::collect::ArcCollector; -use crate::config::importer::statsd::StatsdImporterConfig; +use crate::config::importer::statsd::StatsdUdpImporterConfig; use crate::config::importer::{AnyImporterConfig, ImporterConfig}; use crate::import::{ ArcImporter, ArcImporterInternal, Importer, ImporterInternal, ImporterRegistry, WrapArcImporter, }; -pub(crate) struct StatsdImporter { - config: StatsdImporterConfig, +pub(crate) struct StatsdUdpImporter { + config: StatsdUdpImporterConfig, ingress_net_filter: Option, reload_sender: broadcast::Sender, @@ -34,8 +38,8 @@ pub(crate) struct StatsdImporter { reload_version: usize, } -impl StatsdImporter { - fn new(config: StatsdImporterConfig, reload_version: usize) -> Self { +impl StatsdUdpImporter { + fn new(config: StatsdUdpImporterConfig, reload_version: usize) -> Self { let reload_sender = crate::import::new_reload_notify_channel(); let ingress_net_filter = config @@ -45,7 +49,7 @@ impl StatsdImporter { let collector = Arc::new(crate::collect::get_or_insert_default(config.collector())); - StatsdImporter { + StatsdUdpImporter { config, ingress_net_filter, reload_sender, @@ -55,15 +59,15 @@ impl StatsdImporter { } pub(crate) fn prepare_initial( - config: StatsdImporterConfig, + config: StatsdUdpImporterConfig, ) -> anyhow::Result { - let server = StatsdImporter::new(config, 1); + let server = StatsdUdpImporter::new(config, 1); Ok(Arc::new(server)) } - fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result { - if let AnyImporterConfig::StatsD(config) = config { - Ok(StatsdImporter::new(config, self.reload_version + 1)) + fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result { + if let AnyImporterConfig::StatsDUdp(config) = config { + Ok(StatsdUdpImporter::new(config, self.reload_version + 1)) } else { Err(anyhow!( "config type mismatch: expect {}, actual {}", @@ -90,9 +94,9 @@ impl StatsdImporter { } } -impl ImporterInternal for StatsdImporter { +impl ImporterInternal for StatsdUdpImporter { fn _clone_config(&self) -> AnyImporterConfig { - AnyImporterConfig::StatsD(self.config.clone()) + AnyImporterConfig::StatsDUdp(self.config.clone()) } fn _reload_config_notify_runtime(&self) { @@ -137,7 +141,7 @@ impl ImporterInternal for StatsdImporter { } } -impl BaseServer for StatsdImporter { +impl BaseServer for StatsdUdpImporter { #[inline] fn name(&self) -> &NodeName { self.config.name() @@ -154,8 +158,8 @@ impl BaseServer for StatsdImporter { } } -impl ReceiveUdpServer for StatsdImporter { - fn receive_packet( +impl ReceiveUdpServer for StatsdUdpImporter { + fn receive_udp_packet( &self, packet: &[u8], client_addr: SocketAddr, @@ -179,7 +183,12 @@ impl ReceiveUdpServer for StatsdImporter { } } -impl Importer for StatsdImporter { +#[cfg(unix)] +impl ReceiveUnixDatagramServer for StatsdUdpImporter { + fn receive_unix_packet(&self, _packet: &[u8], _peer_addr: UnixSocketAddr) {} +} + +impl Importer for StatsdUdpImporter { fn collector(&self) -> &NodeName { self.config.collector() } diff --git a/g3statsd/src/import/statsd/unix.rs b/g3statsd/src/import/statsd/unix.rs new file mode 100644 index 00000000..2856992f --- /dev/null +++ b/g3statsd/src/import/statsd/unix.rs @@ -0,0 +1,163 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2025 ByteDance and/or its affiliates. + */ + +use std::sync::Arc; + +use anyhow::anyhow; +use arc_swap::ArcSwap; +use chrono::Utc; +use log::debug; +use tokio::net::unix::SocketAddr; +use tokio::sync::broadcast; + +use g3_daemon::listen::{ReceiveUdpServer, ReceiveUnixDatagramRuntime, ReceiveUnixDatagramServer}; +use g3_daemon::server::{BaseServer, ServerReloadCommand}; +use g3_types::metrics::NodeName; + +use super::StatsdRecordVisitor; +use crate::collect::ArcCollector; +use crate::config::importer::statsd::StatsdUnixImporterConfig; +use crate::config::importer::{AnyImporterConfig, ImporterConfig}; +use crate::import::{ + ArcImporter, ArcImporterInternal, Importer, ImporterInternal, ImporterRegistry, WrapArcImporter, +}; + +pub(crate) struct StatsdUnixImporter { + config: StatsdUnixImporterConfig, + reload_sender: broadcast::Sender, + + collector: ArcSwap, + reload_version: usize, +} + +impl StatsdUnixImporter { + fn new(config: StatsdUnixImporterConfig, reload_version: usize) -> Self { + let reload_sender = crate::import::new_reload_notify_channel(); + + let collector = Arc::new(crate::collect::get_or_insert_default(config.collector())); + + StatsdUnixImporter { + config, + reload_sender, + collector: ArcSwap::new(collector), + reload_version, + } + } + + pub(crate) fn prepare_initial( + config: StatsdUnixImporterConfig, + ) -> anyhow::Result { + let server = StatsdUnixImporter::new(config, 1); + Ok(Arc::new(server)) + } + + fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result { + if let AnyImporterConfig::StatsDUnix(config) = config { + Ok(StatsdUnixImporter::new(config, self.reload_version + 1)) + } else { + Err(anyhow!( + "config type mismatch: expect {}, actual {}", + self.config.importer_type(), + config.importer_type() + )) + } + } +} + +impl ImporterInternal for StatsdUnixImporter { + fn _clone_config(&self) -> AnyImporterConfig { + AnyImporterConfig::StatsDUnix(self.config.clone()) + } + + fn _reload_config_notify_runtime(&self) { + let cmd = ServerReloadCommand::ReloadVersion(self.reload_version); + let _ = self.reload_sender.send(cmd); + } + + fn _update_collector_in_place(&self) { + let collector = crate::collect::get_or_insert_default(self.config.collector()); + self.collector.store(Arc::new(collector)); + } + + fn _reload_with_old_notifier( + &self, + config: AnyImporterConfig, + _registry: &mut ImporterRegistry, + ) -> anyhow::Result { + let mut server = self.prepare_reload(config)?; + server.reload_sender = self.reload_sender.clone(); + Ok(Arc::new(server)) + } + + fn _reload_with_new_notifier( + &self, + config: AnyImporterConfig, + _registry: &mut ImporterRegistry, + ) -> anyhow::Result { + let server = self.prepare_reload(config)?; + Ok(Arc::new(server)) + } + + fn _start_runtime(&self, importer: ArcImporter) -> anyhow::Result<()> { + let runtime = ReceiveUnixDatagramRuntime::new( + WrapArcImporter(importer.clone()), + self.config.listen.clone(), + ); + runtime.spawn(&self.reload_sender) + } + + fn _abort_runtime(&self) { + let _ = self.reload_sender.send(ServerReloadCommand::QuitRuntime); + } +} + +impl BaseServer for StatsdUnixImporter { + #[inline] + fn name(&self) -> &NodeName { + self.config.name() + } + + #[inline] + fn r#type(&self) -> &'static str { + self.config.importer_type() + } + + #[inline] + fn version(&self) -> usize { + self.reload_version + } +} + +impl ReceiveUdpServer for StatsdUnixImporter { + fn receive_udp_packet( + &self, + _packet: &[u8], + _client_addr: std::net::SocketAddr, + _server_addr: std::net::SocketAddr, + _worker_id: Option, + ) { + } +} + +impl ReceiveUnixDatagramServer for StatsdUnixImporter { + fn receive_unix_packet(&self, packet: &[u8], client_addr: SocketAddr) { + let time = Utc::now(); + let iter = StatsdRecordVisitor::new(packet); + for r in iter { + match r { + Ok(r) => self.collector.load().add_metric(time, r, None), + Err(e) => { + debug!("invalid StatsD record from {client_addr:?}: {e}"); + } + } + } + } +} + +impl Importer for StatsdUnixImporter { + fn collector(&self) -> &NodeName { + self.config.collector() + } +} diff --git a/lib/g3-daemon/src/listen/mod.rs b/lib/g3-daemon/src/listen/mod.rs index cc44209c..972e8f7d 100644 --- a/lib/g3-daemon/src/listen/mod.rs +++ b/lib/g3-daemon/src/listen/mod.rs @@ -16,3 +16,8 @@ pub use udp::{ReceiveUdpRuntime, ReceiveUdpServer}; #[cfg_attr(not(feature = "quic"), path = "no_quic.rs")] mod quic; pub use quic::{AcceptQuicServer, ListenQuicConf, ListenQuicRuntime}; + +#[cfg(unix)] +mod unix; +#[cfg(unix)] +pub use unix::*; diff --git a/lib/g3-daemon/src/listen/udp.rs b/lib/g3-daemon/src/listen/udp.rs index 459a5025..d3ebb25f 100644 --- a/lib/g3-daemon/src/listen/udp.rs +++ b/lib/g3-daemon/src/listen/udp.rs @@ -18,7 +18,7 @@ use g3_types::net::UdpListenConfig; use crate::server::{BaseServer, ReloadServer, ServerReloadCommand}; pub trait ReceiveUdpServer: BaseServer { - fn receive_packet( + fn receive_udp_packet( &self, packet: &[u8], client_addr: SocketAddr, @@ -128,7 +128,7 @@ where match r { Ok((len, peer_addr, local_addr)) => { // TODO add stats - self.server.receive_packet(&buf[..len], peer_addr, local_addr, self.worker_id); + self.server.receive_udp_packet(&buf[..len], peer_addr, local_addr, self.worker_id); } Err(e) => { warn!("SRT[{}_v{}#{}] error receiving data from socket, error: {e}", diff --git a/lib/g3-daemon/src/listen/unix/datagram.rs b/lib/g3-daemon/src/listen/unix/datagram.rs new file mode 100644 index 00000000..40dfc8d3 --- /dev/null +++ b/lib/g3-daemon/src/listen/unix/datagram.rs @@ -0,0 +1,151 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2025 ByteDance and/or its affiliates. + */ + +use anyhow::anyhow; +use log::{info, warn}; +use std::path::PathBuf; +use tokio::net::UnixDatagram; +use tokio::net::unix::SocketAddr as UnixSocketAddr; +use tokio::sync::broadcast; + +use crate::server::{BaseServer, ReloadServer, ServerReloadCommand}; + +pub trait ReceiveUnixDatagramServer: BaseServer { + fn receive_unix_packet(&self, packet: &[u8], peer_addr: UnixSocketAddr); +} + +#[derive(Clone)] +pub struct ReceiveUnixDatagramRuntime { + server: S, + server_type: &'static str, + server_version: usize, + listen_path: PathBuf, + //listen_stats: Arc, +} + +impl ReceiveUnixDatagramRuntime +where + S: ReceiveUnixDatagramServer + ReloadServer + Clone + Send + Sync + 'static, +{ + pub fn new(server: S, listen_path: PathBuf) -> Self { + let server_type = server.r#type(); + let server_version = server.version(); + ReceiveUnixDatagramRuntime { + server, + server_type, + server_version, + listen_path, + } + } + + fn pre_start(&self) { + info!( + "started {} SRT[{}_v{}]", + self.server_type, + self.server.name(), + self.server_version, + ); + //self.listen_stats.add_running_runtime(); + } + + fn pre_stop(&self) { + info!( + "stopping {} SRT[{}_v{}]", + self.server_type, + self.server.name(), + self.server_version, + ); + } + + fn post_stop(&self) { + info!( + "stopped {} SRT[{}_v{}]", + self.server_type, + self.server.name(), + self.server_version, + ); + //self.listen_stats.del_running_runtime(); + } + + async fn run( + mut self, + socket: UnixDatagram, + mut server_reload_channel: broadcast::Receiver, + ) { + use broadcast::error::RecvError; + + let mut buf = [0u8; u16::MAX as usize]; + loop { + tokio::select! { + biased; + + ev = server_reload_channel.recv() => { + match ev { + Ok(ServerReloadCommand::ReloadVersion(version)) => { + info!("SRT[{}_v{}] received reload request from v{version}", + self.server.name(), self.server_version); + let new_server = self.server.reload(); + self.server_version = new_server.version(); + self.server = new_server; + continue; + } + Ok(ServerReloadCommand::QuitRuntime) => {}, + Err(RecvError::Closed) => {}, + Err(RecvError::Lagged(dropped)) => { + warn!("SRT[{}_v{}] server {} reload notify channel overflowed, {dropped} msg dropped", + self.server.name(), self.server_version, self.server.name()); + continue; + } + } + + info!("SRT[{}_v{}] will go offline", + self.server.name(), self.server_version); + self.pre_stop(); + break; + } + r = socket.recv_from(&mut buf) => { + match r { + Ok((len, peer_addr)) => { + // TODO add stats + self.server.receive_unix_packet(&buf[..len], peer_addr); + } + Err(e) => { + warn!("SRT[{}_v{}] error receiving data from socket, error: {e}", + self.server.name(), self.server_version); + } + } + } + } + } + + self.post_stop(); + } + + pub fn spawn( + self, + server_reload_sender: &broadcast::Sender, + ) -> anyhow::Result<()> { + if self.listen_path.exists() { + std::fs::remove_file(&self.listen_path).map_err(|e| { + anyhow!( + "failed to delete existed socket file {}: {e}", + self.listen_path.display() + ) + })?; + } + let socket = UnixDatagram::bind(&self.listen_path).map_err(|e| { + anyhow!( + "failed to create unix datagram socket on path {}: {e}", + self.listen_path.display() + ) + })?; + let server_reload_channel = server_reload_sender.subscribe(); + tokio::spawn(async move { + self.pre_start(); + self.run(socket, server_reload_channel).await; + }); + Ok(()) + } +} diff --git a/lib/g3-daemon/src/listen/unix/mod.rs b/lib/g3-daemon/src/listen/unix/mod.rs new file mode 100644 index 00000000..1fb57b15 --- /dev/null +++ b/lib/g3-daemon/src/listen/unix/mod.rs @@ -0,0 +1,7 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2025 ByteDance and/or its affiliates. + */ + +mod datagram; +pub use datagram::{ReceiveUnixDatagramRuntime, ReceiveUnixDatagramServer}; diff --git a/scripts/coverage/g3proxy/0006_chain_http_proxy/g3proxy.yaml b/scripts/coverage/g3proxy/0006_chain_http_proxy/g3proxy.yaml index bab3cd75..c491917d 100644 --- a/scripts/coverage/g3proxy/0006_chain_http_proxy/g3proxy.yaml +++ b/scripts/coverage/g3proxy/0006_chain_http_proxy/g3proxy.yaml @@ -4,7 +4,7 @@ log: journal stat: target: - udp: 127.0.0.1:8125 + unix: /tmp/g3statsd.sock resolver: - name: cares1 diff --git a/scripts/coverage/g3proxy/0022_audit_icap_no_preview/g3proxy.yaml b/scripts/coverage/g3proxy/0022_audit_icap_no_preview/g3proxy.yaml index 1101155a..ba1d3489 100644 --- a/scripts/coverage/g3proxy/0022_audit_icap_no_preview/g3proxy.yaml +++ b/scripts/coverage/g3proxy/0022_audit_icap_no_preview/g3proxy.yaml @@ -4,7 +4,7 @@ log: stdout stat: target: - udp: 127.0.0.1:8125 + unix: /tmp/g3statsd.sock resolver: - name: default diff --git a/scripts/coverage/g3proxy/g3statsd.yaml b/scripts/coverage/g3proxy/g3statsd.yaml index 129c1eda..4da7410a 100644 --- a/scripts/coverage/g3proxy/g3statsd.yaml +++ b/scripts/coverage/g3proxy/g3statsd.yaml @@ -5,11 +5,15 @@ worker: thread_number: 2 importer: - - name: statsd - type: statsd + - name: statsd_udp + type: statsd_udp collector: aggregate_1s listen: 127.0.0.1:8125 listen_in_worker: true + - name: statsd_unix + type: statsd_unix + collector: aggregate_1s + listen: /tmp/g3statsd.sock collector: - name: aggregate_1s