mirror of
https://github.com/bytedance/g3.git
synced 2026-05-09 19:42:52 +00:00
g3statsd: allow to listen to unix statsd socket
This commit is contained in:
parent
f65d8061a5
commit
53028622d3
18 changed files with 562 additions and 45 deletions
|
|
@ -1,8 +1,12 @@
|
|||
importer:
|
||||
- name: statsd
|
||||
type: statsd
|
||||
- name: statsd_udp
|
||||
type: statsd_udp
|
||||
collector: aggregate_4s
|
||||
listen: 127.0.0.1:8125
|
||||
- name: statsd_unix
|
||||
type: statsd_unix
|
||||
collector: aggregate_4s
|
||||
listen: /tmp/g3statsd.sock
|
||||
|
||||
collector:
|
||||
- name: aggregate_4s
|
||||
|
|
|
|||
|
|
@ -47,7 +47,9 @@ pub(crate) trait ImporterConfig {
|
|||
#[allow(clippy::large_enum_variant)]
|
||||
pub(crate) enum AnyImporterConfig {
|
||||
Dummy(dummy::DummyImporterConfig),
|
||||
StatsD(statsd::StatsdImporterConfig),
|
||||
StatsDUdp(statsd::StatsdUdpImporterConfig),
|
||||
#[cfg(unix)]
|
||||
StatsDUnix(statsd::StatsdUnixImporterConfig),
|
||||
}
|
||||
|
||||
pub(crate) fn load_all(v: &Yaml, conf_dir: &Path) -> anyhow::Result<()> {
|
||||
|
|
@ -88,10 +90,16 @@ fn load_importer(
|
|||
.context("failed to load this Dummy importer")?;
|
||||
Ok(AnyImporterConfig::Dummy(importer))
|
||||
}
|
||||
"statsd" => {
|
||||
let importer = statsd::StatsdImporterConfig::parse(map, position)
|
||||
.context("failed to load this StatsD importer")?;
|
||||
Ok(AnyImporterConfig::StatsD(importer))
|
||||
"statsd" | "statsd_udp" => {
|
||||
let importer = statsd::StatsdUdpImporterConfig::parse(map, position)
|
||||
.context("failed to load this StatsD_UDP importer")?;
|
||||
Ok(AnyImporterConfig::StatsDUdp(importer))
|
||||
}
|
||||
#[cfg(unix)]
|
||||
"statsd_unix" => {
|
||||
let importer = statsd::StatsdUnixImporterConfig::parse(map, position)
|
||||
.context("failed to load this StatsD_UNIX importer")?;
|
||||
Ok(AnyImporterConfig::StatsDUnix(importer))
|
||||
}
|
||||
_ => Err(anyhow!("unsupported importer type {}", importer_type)),
|
||||
}
|
||||
|
|
|
|||
15
g3statsd/src/config/importer/statsd/mod.rs
Normal file
15
g3statsd/src/config/importer/statsd/mod.rs
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction};
|
||||
use super::{CONFIG_KEY_IMPORTER_NAME, CONFIG_KEY_IMPORTER_TYPE};
|
||||
|
||||
mod udp;
|
||||
pub(crate) use udp::StatsdUdpImporterConfig;
|
||||
|
||||
#[cfg(unix)]
|
||||
mod unix;
|
||||
#[cfg(unix)]
|
||||
pub(crate) use unix::StatsdUnixImporterConfig;
|
||||
|
|
@ -13,10 +13,10 @@ use g3_yaml::YamlDocPosition;
|
|||
|
||||
use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction};
|
||||
|
||||
const IMPORTER_CONFIG_TYPE: &str = "StatsD";
|
||||
const IMPORTER_CONFIG_TYPE: &str = "StatsD_UDP";
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub(crate) struct StatsdImporterConfig {
|
||||
pub(crate) struct StatsdUdpImporterConfig {
|
||||
name: NodeName,
|
||||
position: Option<YamlDocPosition>,
|
||||
pub(crate) collector: NodeName,
|
||||
|
|
@ -25,9 +25,9 @@ pub(crate) struct StatsdImporterConfig {
|
|||
pub(crate) ingress_net_filter: Option<AclNetworkRuleBuilder>,
|
||||
}
|
||||
|
||||
impl StatsdImporterConfig {
|
||||
impl StatsdUdpImporterConfig {
|
||||
fn new(position: Option<YamlDocPosition>) -> Self {
|
||||
StatsdImporterConfig {
|
||||
StatsdUdpImporterConfig {
|
||||
name: NodeName::default(),
|
||||
position,
|
||||
collector: Default::default(),
|
||||
|
|
@ -41,7 +41,7 @@ impl StatsdImporterConfig {
|
|||
map: &yaml::Hash,
|
||||
position: Option<YamlDocPosition>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut importer = StatsdImporterConfig::new(position);
|
||||
let mut importer = StatsdUdpImporterConfig::new(position);
|
||||
|
||||
g3_yaml::foreach_kv(map, |k, v| importer.set(k, v))?;
|
||||
|
||||
|
|
@ -94,7 +94,7 @@ impl StatsdImporterConfig {
|
|||
}
|
||||
}
|
||||
|
||||
impl ImporterConfig for StatsdImporterConfig {
|
||||
impl ImporterConfig for StatsdUdpImporterConfig {
|
||||
fn name(&self) -> &NodeName {
|
||||
&self.name
|
||||
}
|
||||
|
|
@ -108,7 +108,7 @@ impl ImporterConfig for StatsdImporterConfig {
|
|||
}
|
||||
|
||||
fn diff_action(&self, new: &AnyImporterConfig) -> ImporterConfigDiffAction {
|
||||
let AnyImporterConfig::StatsD(new) = new else {
|
||||
let AnyImporterConfig::StatsDUdp(new) = new else {
|
||||
return ImporterConfigDiffAction::SpawnNew;
|
||||
};
|
||||
|
||||
115
g3statsd/src/config/importer/statsd/unix.rs
Normal file
115
g3statsd/src/config/importer/statsd/unix.rs
Normal file
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
use std::path::PathBuf;
|
||||
|
||||
use anyhow::{Context, anyhow};
|
||||
use yaml_rust::{Yaml, yaml};
|
||||
|
||||
use g3_types::metrics::NodeName;
|
||||
use g3_yaml::YamlDocPosition;
|
||||
|
||||
use super::{AnyImporterConfig, ImporterConfig, ImporterConfigDiffAction};
|
||||
|
||||
const IMPORTER_CONFIG_TYPE: &str = "StatsD_UNIX";
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
pub(crate) struct StatsdUnixImporterConfig {
|
||||
name: NodeName,
|
||||
position: Option<YamlDocPosition>,
|
||||
pub(crate) collector: NodeName,
|
||||
pub(crate) listen: PathBuf,
|
||||
}
|
||||
|
||||
impl StatsdUnixImporterConfig {
|
||||
fn new(position: Option<YamlDocPosition>) -> Self {
|
||||
StatsdUnixImporterConfig {
|
||||
name: NodeName::default(),
|
||||
position,
|
||||
collector: Default::default(),
|
||||
listen: PathBuf::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn parse(
|
||||
map: &yaml::Hash,
|
||||
position: Option<YamlDocPosition>,
|
||||
) -> anyhow::Result<Self> {
|
||||
let mut importer = StatsdUnixImporterConfig::new(position);
|
||||
|
||||
g3_yaml::foreach_kv(map, |k, v| importer.set(k, v))?;
|
||||
|
||||
importer.check()?;
|
||||
Ok(importer)
|
||||
}
|
||||
|
||||
fn set(&mut self, k: &str, v: &Yaml) -> anyhow::Result<()> {
|
||||
match g3_yaml::key::normalize(k).as_str() {
|
||||
super::CONFIG_KEY_IMPORTER_TYPE => Ok(()),
|
||||
super::CONFIG_KEY_IMPORTER_NAME => {
|
||||
self.name = g3_yaml::value::as_metric_node_name(v)?;
|
||||
Ok(())
|
||||
}
|
||||
"collector" => {
|
||||
self.collector = g3_yaml::value::as_metric_node_name(v)?;
|
||||
Ok(())
|
||||
}
|
||||
"listen" => {
|
||||
self.listen = g3_yaml::value::as_absolute_path(v)
|
||||
.context(format!("invalid unix listen path value for key {k}"))?;
|
||||
Ok(())
|
||||
}
|
||||
_ => Err(anyhow!("invalid key {k}")),
|
||||
}
|
||||
}
|
||||
|
||||
fn check(&mut self) -> anyhow::Result<()> {
|
||||
if self.name.is_empty() {
|
||||
return Err(anyhow!("name is not set"));
|
||||
}
|
||||
if self.collector.is_empty() {
|
||||
return Err(anyhow!("collector is not set"));
|
||||
}
|
||||
if self.listen.as_os_str().is_empty() {
|
||||
return Err(anyhow!("listen path is not set"));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl ImporterConfig for StatsdUnixImporterConfig {
|
||||
fn name(&self) -> &NodeName {
|
||||
&self.name
|
||||
}
|
||||
|
||||
fn position(&self) -> Option<YamlDocPosition> {
|
||||
self.position.clone()
|
||||
}
|
||||
|
||||
fn importer_type(&self) -> &'static str {
|
||||
IMPORTER_CONFIG_TYPE
|
||||
}
|
||||
|
||||
fn diff_action(&self, new: &AnyImporterConfig) -> ImporterConfigDiffAction {
|
||||
let AnyImporterConfig::StatsDUnix(new) = new else {
|
||||
return ImporterConfigDiffAction::SpawnNew;
|
||||
};
|
||||
|
||||
if self.eq(new) {
|
||||
return ImporterConfigDiffAction::NoAction;
|
||||
}
|
||||
|
||||
if self.listen != new.listen {
|
||||
return ImporterConfigDiffAction::ReloadAndRespawn;
|
||||
}
|
||||
|
||||
ImporterConfigDiffAction::ReloadNoRespawn
|
||||
}
|
||||
|
||||
fn collector(&self) -> &NodeName {
|
||||
&self.collector
|
||||
}
|
||||
}
|
||||
|
|
@ -7,9 +7,13 @@ use std::net::SocketAddr;
|
|||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
#[cfg(unix)]
|
||||
use tokio::net::unix::SocketAddr as UnixSocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use g3_daemon::listen::ReceiveUdpServer;
|
||||
#[cfg(unix)]
|
||||
use g3_daemon::listen::ReceiveUnixDatagramServer;
|
||||
use g3_daemon::server::{BaseServer, ServerReloadCommand};
|
||||
use g3_types::metrics::NodeName;
|
||||
|
||||
|
|
@ -113,7 +117,7 @@ impl BaseServer for DummyImporter {
|
|||
}
|
||||
|
||||
impl ReceiveUdpServer for DummyImporter {
|
||||
fn receive_packet(
|
||||
fn receive_udp_packet(
|
||||
&self,
|
||||
_packet: &[u8],
|
||||
_client_addr: SocketAddr,
|
||||
|
|
@ -123,6 +127,11 @@ impl ReceiveUdpServer for DummyImporter {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ReceiveUnixDatagramServer for DummyImporter {
|
||||
fn receive_unix_packet(&self, _packet: &[u8], _peer_addr: UnixSocketAddr) {}
|
||||
}
|
||||
|
||||
impl Importer for DummyImporter {
|
||||
fn collector(&self) -> &NodeName {
|
||||
Default::default()
|
||||
|
|
|
|||
|
|
@ -6,9 +6,13 @@
|
|||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[cfg(unix)]
|
||||
use tokio::net::unix::SocketAddr as UnixSocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use g3_daemon::listen::ReceiveUdpServer;
|
||||
#[cfg(unix)]
|
||||
use g3_daemon::listen::ReceiveUnixDatagramServer;
|
||||
use g3_daemon::server::{BaseServer, ReloadServer, ServerReloadCommand};
|
||||
use g3_types::metrics::NodeName;
|
||||
|
||||
|
|
@ -25,6 +29,13 @@ pub use ops::{spawn_all, stop_all};
|
|||
mod dummy;
|
||||
mod statsd;
|
||||
|
||||
#[cfg(unix)]
|
||||
pub(crate) trait Importer:
|
||||
ReceiveUdpServer + ReceiveUnixDatagramServer + BaseServer
|
||||
{
|
||||
fn collector(&self) -> &NodeName;
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
pub(crate) trait Importer: ReceiveUdpServer + BaseServer {
|
||||
fn collector(&self) -> &NodeName;
|
||||
}
|
||||
|
|
@ -77,7 +88,7 @@ impl ReloadServer for WrapArcImporter {
|
|||
}
|
||||
|
||||
impl ReceiveUdpServer for WrapArcImporter {
|
||||
fn receive_packet(
|
||||
fn receive_udp_packet(
|
||||
&self,
|
||||
packet: &[u8],
|
||||
client_addr: SocketAddr,
|
||||
|
|
@ -85,7 +96,14 @@ impl ReceiveUdpServer for WrapArcImporter {
|
|||
worker_id: Option<usize>,
|
||||
) {
|
||||
self.0
|
||||
.receive_packet(packet, client_addr, server_addr, worker_id)
|
||||
.receive_udp_packet(packet, client_addr, server_addr, worker_id)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
impl ReceiveUnixDatagramServer for WrapArcImporter {
|
||||
fn receive_unix_packet(&self, packet: &[u8], peer_addr: UnixSocketAddr) {
|
||||
self.0.receive_unix_packet(packet, peer_addr)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -157,8 +157,12 @@ fn delete_existed_unlocked(name: &NodeName) {
|
|||
fn spawn_new_unlocked(config: AnyImporterConfig) -> anyhow::Result<()> {
|
||||
let importer = match config {
|
||||
AnyImporterConfig::Dummy(config) => super::dummy::DummyImporter::prepare_initial(config)?,
|
||||
AnyImporterConfig::StatsD(config) => {
|
||||
super::statsd::StatsdImporter::prepare_initial(config)?
|
||||
AnyImporterConfig::StatsDUdp(config) => {
|
||||
super::statsd::StatsdUdpImporter::prepare_initial(config)?
|
||||
}
|
||||
#[cfg(unix)]
|
||||
AnyImporterConfig::StatsDUnix(config) => {
|
||||
super::statsd::StatsdUnixImporter::prepare_initial(config)?
|
||||
}
|
||||
};
|
||||
registry::add(importer)
|
||||
|
|
|
|||
|
|
@ -3,8 +3,13 @@
|
|||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
mod import;
|
||||
pub(super) use import::StatsdImporter;
|
||||
|
||||
mod parser;
|
||||
use parser::StatsdRecordVisitor;
|
||||
|
||||
mod udp;
|
||||
pub(super) use udp::StatsdUdpImporter;
|
||||
|
||||
#[cfg(unix)]
|
||||
mod unix;
|
||||
#[cfg(unix)]
|
||||
pub(super) use unix::StatsdUnixImporter;
|
||||
|
|
|
|||
|
|
@ -10,8 +10,12 @@ use anyhow::anyhow;
|
|||
use arc_swap::ArcSwap;
|
||||
use chrono::Utc;
|
||||
use log::debug;
|
||||
#[cfg(unix)]
|
||||
use tokio::net::unix::SocketAddr as UnixSocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
#[cfg(unix)]
|
||||
use g3_daemon::listen::ReceiveUnixDatagramServer;
|
||||
use g3_daemon::listen::{ReceiveUdpRuntime, ReceiveUdpServer};
|
||||
use g3_daemon::server::{BaseServer, ServerReloadCommand};
|
||||
use g3_types::acl::{AclAction, AclNetworkRule};
|
||||
|
|
@ -19,14 +23,14 @@ use g3_types::metrics::NodeName;
|
|||
|
||||
use super::StatsdRecordVisitor;
|
||||
use crate::collect::ArcCollector;
|
||||
use crate::config::importer::statsd::StatsdImporterConfig;
|
||||
use crate::config::importer::statsd::StatsdUdpImporterConfig;
|
||||
use crate::config::importer::{AnyImporterConfig, ImporterConfig};
|
||||
use crate::import::{
|
||||
ArcImporter, ArcImporterInternal, Importer, ImporterInternal, ImporterRegistry, WrapArcImporter,
|
||||
};
|
||||
|
||||
pub(crate) struct StatsdImporter {
|
||||
config: StatsdImporterConfig,
|
||||
pub(crate) struct StatsdUdpImporter {
|
||||
config: StatsdUdpImporterConfig,
|
||||
ingress_net_filter: Option<AclNetworkRule>,
|
||||
reload_sender: broadcast::Sender<ServerReloadCommand>,
|
||||
|
||||
|
|
@ -34,8 +38,8 @@ pub(crate) struct StatsdImporter {
|
|||
reload_version: usize,
|
||||
}
|
||||
|
||||
impl StatsdImporter {
|
||||
fn new(config: StatsdImporterConfig, reload_version: usize) -> Self {
|
||||
impl StatsdUdpImporter {
|
||||
fn new(config: StatsdUdpImporterConfig, reload_version: usize) -> Self {
|
||||
let reload_sender = crate::import::new_reload_notify_channel();
|
||||
|
||||
let ingress_net_filter = config
|
||||
|
|
@ -45,7 +49,7 @@ impl StatsdImporter {
|
|||
|
||||
let collector = Arc::new(crate::collect::get_or_insert_default(config.collector()));
|
||||
|
||||
StatsdImporter {
|
||||
StatsdUdpImporter {
|
||||
config,
|
||||
ingress_net_filter,
|
||||
reload_sender,
|
||||
|
|
@ -55,15 +59,15 @@ impl StatsdImporter {
|
|||
}
|
||||
|
||||
pub(crate) fn prepare_initial(
|
||||
config: StatsdImporterConfig,
|
||||
config: StatsdUdpImporterConfig,
|
||||
) -> anyhow::Result<ArcImporterInternal> {
|
||||
let server = StatsdImporter::new(config, 1);
|
||||
let server = StatsdUdpImporter::new(config, 1);
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result<StatsdImporter> {
|
||||
if let AnyImporterConfig::StatsD(config) = config {
|
||||
Ok(StatsdImporter::new(config, self.reload_version + 1))
|
||||
fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result<StatsdUdpImporter> {
|
||||
if let AnyImporterConfig::StatsDUdp(config) = config {
|
||||
Ok(StatsdUdpImporter::new(config, self.reload_version + 1))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"config type mismatch: expect {}, actual {}",
|
||||
|
|
@ -90,9 +94,9 @@ impl StatsdImporter {
|
|||
}
|
||||
}
|
||||
|
||||
impl ImporterInternal for StatsdImporter {
|
||||
impl ImporterInternal for StatsdUdpImporter {
|
||||
fn _clone_config(&self) -> AnyImporterConfig {
|
||||
AnyImporterConfig::StatsD(self.config.clone())
|
||||
AnyImporterConfig::StatsDUdp(self.config.clone())
|
||||
}
|
||||
|
||||
fn _reload_config_notify_runtime(&self) {
|
||||
|
|
@ -137,7 +141,7 @@ impl ImporterInternal for StatsdImporter {
|
|||
}
|
||||
}
|
||||
|
||||
impl BaseServer for StatsdImporter {
|
||||
impl BaseServer for StatsdUdpImporter {
|
||||
#[inline]
|
||||
fn name(&self) -> &NodeName {
|
||||
self.config.name()
|
||||
|
|
@ -154,8 +158,8 @@ impl BaseServer for StatsdImporter {
|
|||
}
|
||||
}
|
||||
|
||||
impl ReceiveUdpServer for StatsdImporter {
|
||||
fn receive_packet(
|
||||
impl ReceiveUdpServer for StatsdUdpImporter {
|
||||
fn receive_udp_packet(
|
||||
&self,
|
||||
packet: &[u8],
|
||||
client_addr: SocketAddr,
|
||||
|
|
@ -179,7 +183,12 @@ impl ReceiveUdpServer for StatsdImporter {
|
|||
}
|
||||
}
|
||||
|
||||
impl Importer for StatsdImporter {
|
||||
#[cfg(unix)]
|
||||
impl ReceiveUnixDatagramServer for StatsdUdpImporter {
|
||||
fn receive_unix_packet(&self, _packet: &[u8], _peer_addr: UnixSocketAddr) {}
|
||||
}
|
||||
|
||||
impl Importer for StatsdUdpImporter {
|
||||
fn collector(&self) -> &NodeName {
|
||||
self.config.collector()
|
||||
}
|
||||
163
g3statsd/src/import/statsd/unix.rs
Normal file
163
g3statsd/src/import/statsd/unix.rs
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use arc_swap::ArcSwap;
|
||||
use chrono::Utc;
|
||||
use log::debug;
|
||||
use tokio::net::unix::SocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use g3_daemon::listen::{ReceiveUdpServer, ReceiveUnixDatagramRuntime, ReceiveUnixDatagramServer};
|
||||
use g3_daemon::server::{BaseServer, ServerReloadCommand};
|
||||
use g3_types::metrics::NodeName;
|
||||
|
||||
use super::StatsdRecordVisitor;
|
||||
use crate::collect::ArcCollector;
|
||||
use crate::config::importer::statsd::StatsdUnixImporterConfig;
|
||||
use crate::config::importer::{AnyImporterConfig, ImporterConfig};
|
||||
use crate::import::{
|
||||
ArcImporter, ArcImporterInternal, Importer, ImporterInternal, ImporterRegistry, WrapArcImporter,
|
||||
};
|
||||
|
||||
pub(crate) struct StatsdUnixImporter {
|
||||
config: StatsdUnixImporterConfig,
|
||||
reload_sender: broadcast::Sender<ServerReloadCommand>,
|
||||
|
||||
collector: ArcSwap<ArcCollector>,
|
||||
reload_version: usize,
|
||||
}
|
||||
|
||||
impl StatsdUnixImporter {
|
||||
fn new(config: StatsdUnixImporterConfig, reload_version: usize) -> Self {
|
||||
let reload_sender = crate::import::new_reload_notify_channel();
|
||||
|
||||
let collector = Arc::new(crate::collect::get_or_insert_default(config.collector()));
|
||||
|
||||
StatsdUnixImporter {
|
||||
config,
|
||||
reload_sender,
|
||||
collector: ArcSwap::new(collector),
|
||||
reload_version,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn prepare_initial(
|
||||
config: StatsdUnixImporterConfig,
|
||||
) -> anyhow::Result<ArcImporterInternal> {
|
||||
let server = StatsdUnixImporter::new(config, 1);
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
fn prepare_reload(&self, config: AnyImporterConfig) -> anyhow::Result<StatsdUnixImporter> {
|
||||
if let AnyImporterConfig::StatsDUnix(config) = config {
|
||||
Ok(StatsdUnixImporter::new(config, self.reload_version + 1))
|
||||
} else {
|
||||
Err(anyhow!(
|
||||
"config type mismatch: expect {}, actual {}",
|
||||
self.config.importer_type(),
|
||||
config.importer_type()
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ImporterInternal for StatsdUnixImporter {
|
||||
fn _clone_config(&self) -> AnyImporterConfig {
|
||||
AnyImporterConfig::StatsDUnix(self.config.clone())
|
||||
}
|
||||
|
||||
fn _reload_config_notify_runtime(&self) {
|
||||
let cmd = ServerReloadCommand::ReloadVersion(self.reload_version);
|
||||
let _ = self.reload_sender.send(cmd);
|
||||
}
|
||||
|
||||
fn _update_collector_in_place(&self) {
|
||||
let collector = crate::collect::get_or_insert_default(self.config.collector());
|
||||
self.collector.store(Arc::new(collector));
|
||||
}
|
||||
|
||||
fn _reload_with_old_notifier(
|
||||
&self,
|
||||
config: AnyImporterConfig,
|
||||
_registry: &mut ImporterRegistry,
|
||||
) -> anyhow::Result<ArcImporterInternal> {
|
||||
let mut server = self.prepare_reload(config)?;
|
||||
server.reload_sender = self.reload_sender.clone();
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
fn _reload_with_new_notifier(
|
||||
&self,
|
||||
config: AnyImporterConfig,
|
||||
_registry: &mut ImporterRegistry,
|
||||
) -> anyhow::Result<ArcImporterInternal> {
|
||||
let server = self.prepare_reload(config)?;
|
||||
Ok(Arc::new(server))
|
||||
}
|
||||
|
||||
fn _start_runtime(&self, importer: ArcImporter) -> anyhow::Result<()> {
|
||||
let runtime = ReceiveUnixDatagramRuntime::new(
|
||||
WrapArcImporter(importer.clone()),
|
||||
self.config.listen.clone(),
|
||||
);
|
||||
runtime.spawn(&self.reload_sender)
|
||||
}
|
||||
|
||||
fn _abort_runtime(&self) {
|
||||
let _ = self.reload_sender.send(ServerReloadCommand::QuitRuntime);
|
||||
}
|
||||
}
|
||||
|
||||
impl BaseServer for StatsdUnixImporter {
|
||||
#[inline]
|
||||
fn name(&self) -> &NodeName {
|
||||
self.config.name()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn r#type(&self) -> &'static str {
|
||||
self.config.importer_type()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
fn version(&self) -> usize {
|
||||
self.reload_version
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceiveUdpServer for StatsdUnixImporter {
|
||||
fn receive_udp_packet(
|
||||
&self,
|
||||
_packet: &[u8],
|
||||
_client_addr: std::net::SocketAddr,
|
||||
_server_addr: std::net::SocketAddr,
|
||||
_worker_id: Option<usize>,
|
||||
) {
|
||||
}
|
||||
}
|
||||
|
||||
impl ReceiveUnixDatagramServer for StatsdUnixImporter {
|
||||
fn receive_unix_packet(&self, packet: &[u8], client_addr: SocketAddr) {
|
||||
let time = Utc::now();
|
||||
let iter = StatsdRecordVisitor::new(packet);
|
||||
for r in iter {
|
||||
match r {
|
||||
Ok(r) => self.collector.load().add_metric(time, r, None),
|
||||
Err(e) => {
|
||||
debug!("invalid StatsD record from {client_addr:?}: {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Importer for StatsdUnixImporter {
|
||||
fn collector(&self) -> &NodeName {
|
||||
self.config.collector()
|
||||
}
|
||||
}
|
||||
|
|
@ -16,3 +16,8 @@ pub use udp::{ReceiveUdpRuntime, ReceiveUdpServer};
|
|||
#[cfg_attr(not(feature = "quic"), path = "no_quic.rs")]
|
||||
mod quic;
|
||||
pub use quic::{AcceptQuicServer, ListenQuicConf, ListenQuicRuntime};
|
||||
|
||||
#[cfg(unix)]
|
||||
mod unix;
|
||||
#[cfg(unix)]
|
||||
pub use unix::*;
|
||||
|
|
|
|||
|
|
@ -18,7 +18,7 @@ use g3_types::net::UdpListenConfig;
|
|||
use crate::server::{BaseServer, ReloadServer, ServerReloadCommand};
|
||||
|
||||
pub trait ReceiveUdpServer: BaseServer {
|
||||
fn receive_packet(
|
||||
fn receive_udp_packet(
|
||||
&self,
|
||||
packet: &[u8],
|
||||
client_addr: SocketAddr,
|
||||
|
|
@ -128,7 +128,7 @@ where
|
|||
match r {
|
||||
Ok((len, peer_addr, local_addr)) => {
|
||||
// TODO add stats
|
||||
self.server.receive_packet(&buf[..len], peer_addr, local_addr, self.worker_id);
|
||||
self.server.receive_udp_packet(&buf[..len], peer_addr, local_addr, self.worker_id);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("SRT[{}_v{}#{}] error receiving data from socket, error: {e}",
|
||||
|
|
|
|||
151
lib/g3-daemon/src/listen/unix/datagram.rs
Normal file
151
lib/g3-daemon/src/listen/unix/datagram.rs
Normal file
|
|
@ -0,0 +1,151 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
use anyhow::anyhow;
|
||||
use log::{info, warn};
|
||||
use std::path::PathBuf;
|
||||
use tokio::net::UnixDatagram;
|
||||
use tokio::net::unix::SocketAddr as UnixSocketAddr;
|
||||
use tokio::sync::broadcast;
|
||||
|
||||
use crate::server::{BaseServer, ReloadServer, ServerReloadCommand};
|
||||
|
||||
pub trait ReceiveUnixDatagramServer: BaseServer {
|
||||
fn receive_unix_packet(&self, packet: &[u8], peer_addr: UnixSocketAddr);
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ReceiveUnixDatagramRuntime<S> {
|
||||
server: S,
|
||||
server_type: &'static str,
|
||||
server_version: usize,
|
||||
listen_path: PathBuf,
|
||||
//listen_stats: Arc<ListenStats>,
|
||||
}
|
||||
|
||||
impl<S> ReceiveUnixDatagramRuntime<S>
|
||||
where
|
||||
S: ReceiveUnixDatagramServer + ReloadServer + Clone + Send + Sync + 'static,
|
||||
{
|
||||
pub fn new(server: S, listen_path: PathBuf) -> Self {
|
||||
let server_type = server.r#type();
|
||||
let server_version = server.version();
|
||||
ReceiveUnixDatagramRuntime {
|
||||
server,
|
||||
server_type,
|
||||
server_version,
|
||||
listen_path,
|
||||
}
|
||||
}
|
||||
|
||||
fn pre_start(&self) {
|
||||
info!(
|
||||
"started {} SRT[{}_v{}]",
|
||||
self.server_type,
|
||||
self.server.name(),
|
||||
self.server_version,
|
||||
);
|
||||
//self.listen_stats.add_running_runtime();
|
||||
}
|
||||
|
||||
fn pre_stop(&self) {
|
||||
info!(
|
||||
"stopping {} SRT[{}_v{}]",
|
||||
self.server_type,
|
||||
self.server.name(),
|
||||
self.server_version,
|
||||
);
|
||||
}
|
||||
|
||||
fn post_stop(&self) {
|
||||
info!(
|
||||
"stopped {} SRT[{}_v{}]",
|
||||
self.server_type,
|
||||
self.server.name(),
|
||||
self.server_version,
|
||||
);
|
||||
//self.listen_stats.del_running_runtime();
|
||||
}
|
||||
|
||||
async fn run(
|
||||
mut self,
|
||||
socket: UnixDatagram,
|
||||
mut server_reload_channel: broadcast::Receiver<ServerReloadCommand>,
|
||||
) {
|
||||
use broadcast::error::RecvError;
|
||||
|
||||
let mut buf = [0u8; u16::MAX as usize];
|
||||
loop {
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
ev = server_reload_channel.recv() => {
|
||||
match ev {
|
||||
Ok(ServerReloadCommand::ReloadVersion(version)) => {
|
||||
info!("SRT[{}_v{}] received reload request from v{version}",
|
||||
self.server.name(), self.server_version);
|
||||
let new_server = self.server.reload();
|
||||
self.server_version = new_server.version();
|
||||
self.server = new_server;
|
||||
continue;
|
||||
}
|
||||
Ok(ServerReloadCommand::QuitRuntime) => {},
|
||||
Err(RecvError::Closed) => {},
|
||||
Err(RecvError::Lagged(dropped)) => {
|
||||
warn!("SRT[{}_v{}] server {} reload notify channel overflowed, {dropped} msg dropped",
|
||||
self.server.name(), self.server_version, self.server.name());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
info!("SRT[{}_v{}] will go offline",
|
||||
self.server.name(), self.server_version);
|
||||
self.pre_stop();
|
||||
break;
|
||||
}
|
||||
r = socket.recv_from(&mut buf) => {
|
||||
match r {
|
||||
Ok((len, peer_addr)) => {
|
||||
// TODO add stats
|
||||
self.server.receive_unix_packet(&buf[..len], peer_addr);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("SRT[{}_v{}] error receiving data from socket, error: {e}",
|
||||
self.server.name(), self.server_version);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.post_stop();
|
||||
}
|
||||
|
||||
pub fn spawn(
|
||||
self,
|
||||
server_reload_sender: &broadcast::Sender<ServerReloadCommand>,
|
||||
) -> anyhow::Result<()> {
|
||||
if self.listen_path.exists() {
|
||||
std::fs::remove_file(&self.listen_path).map_err(|e| {
|
||||
anyhow!(
|
||||
"failed to delete existed socket file {}: {e}",
|
||||
self.listen_path.display()
|
||||
)
|
||||
})?;
|
||||
}
|
||||
let socket = UnixDatagram::bind(&self.listen_path).map_err(|e| {
|
||||
anyhow!(
|
||||
"failed to create unix datagram socket on path {}: {e}",
|
||||
self.listen_path.display()
|
||||
)
|
||||
})?;
|
||||
let server_reload_channel = server_reload_sender.subscribe();
|
||||
tokio::spawn(async move {
|
||||
self.pre_start();
|
||||
self.run(socket, server_reload_channel).await;
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
7
lib/g3-daemon/src/listen/unix/mod.rs
Normal file
7
lib/g3-daemon/src/listen/unix/mod.rs
Normal file
|
|
@ -0,0 +1,7 @@
|
|||
/*
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
* Copyright 2025 ByteDance and/or its affiliates.
|
||||
*/
|
||||
|
||||
mod datagram;
|
||||
pub use datagram::{ReceiveUnixDatagramRuntime, ReceiveUnixDatagramServer};
|
||||
|
|
@ -4,7 +4,7 @@ log: journal
|
|||
|
||||
stat:
|
||||
target:
|
||||
udp: 127.0.0.1:8125
|
||||
unix: /tmp/g3statsd.sock
|
||||
|
||||
resolver:
|
||||
- name: cares1
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ log: stdout
|
|||
|
||||
stat:
|
||||
target:
|
||||
udp: 127.0.0.1:8125
|
||||
unix: /tmp/g3statsd.sock
|
||||
|
||||
resolver:
|
||||
- name: default
|
||||
|
|
|
|||
|
|
@ -5,11 +5,15 @@ worker:
|
|||
thread_number: 2
|
||||
|
||||
importer:
|
||||
- name: statsd
|
||||
type: statsd
|
||||
- name: statsd_udp
|
||||
type: statsd_udp
|
||||
collector: aggregate_1s
|
||||
listen: 127.0.0.1:8125
|
||||
listen_in_worker: true
|
||||
- name: statsd_unix
|
||||
type: statsd_unix
|
||||
collector: aggregate_1s
|
||||
listen: /tmp/g3statsd.sock
|
||||
|
||||
collector:
|
||||
- name: aggregate_1s
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue