From 7f3d4ea20211ea647f720e08ed0dbc524342e207 Mon Sep 17 00:00:00 2001 From: Zhang Jingqiang Date: Thu, 3 Aug 2023 12:13:08 +0800 Subject: [PATCH] use UnsafeCell instead of direct casting --- demo/test-tcp-relay/src/main.rs | 2 +- demo/test-tcp-relay/src/stats.rs | 45 ++-------- g3bench/src/target/dns/task.rs | 22 +++-- .../src/serve/http_proxy/task/connect/mod.rs | 2 +- .../http_proxy/task/connect/stats/mod.rs | 2 - .../http_proxy/task/connect/stats/task.rs | 83 ------------------- .../http_proxy/task/connect/stats/wrapper.rs | 7 +- .../src/serve/http_proxy/task/connect/task.rs | 7 +- .../http_proxy/task/forward/stats/task.rs | 30 +------ .../serve/http_proxy/task/ftp/stats/task.rs | 36 ++------ .../http_rproxy/task/forward/stats/task.rs | 30 +------ .../src/serve/sni_proxy/task/accept/task.rs | 4 +- .../serve/socks_proxy/task/tcp_connect/mod.rs | 2 +- .../socks_proxy/task/tcp_connect/stats/mod.rs | 2 - .../task/tcp_connect/stats/task.rs | 83 ------------------- .../task/tcp_connect/stats/wrapper.rs | 10 +-- .../socks_proxy/task/tcp_connect/task.rs | 7 +- .../task/udp_associate/stats/task.rs | 36 +------- .../task/udp_connect/stats/task.rs | 48 +---------- .../serve/openssl_proxy/task/relay/task.rs | 4 +- .../src/serve/rustls_proxy/task/relay/task.rs | 4 +- lib/g3-daemon/src/stat/task/mod.rs | 5 +- lib/g3-daemon/src/stat/task/tcp_stream.rs | 32 ++++--- lib/g3-daemon/src/stat/task/udp_connect.rs | 53 ++++++++++++ lib/g3-types/src/stats/mod.rs | 27 ++++++ lib/g3-types/src/stats/tcp.rs | 35 ++++---- lib/g3-types/src/stats/udp.rs | 59 +++++-------- 27 files changed, 217 insertions(+), 460 deletions(-) delete mode 100644 g3proxy/src/serve/http_proxy/task/connect/stats/task.rs delete mode 100644 g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/task.rs create mode 100644 lib/g3-daemon/src/stat/task/udp_connect.rs diff --git a/demo/test-tcp-relay/src/main.rs b/demo/test-tcp-relay/src/main.rs index 1de1f527..bbf0908a 100644 --- a/demo/test-tcp-relay/src/main.rs +++ b/demo/test-tcp-relay/src/main.rs @@ -46,7 +46,7 @@ async fn process_socket(mut clt_stream: TcpStream) -> io::Result<()> { let (clt_r, clt_w) = clt_stream.split(); let (ups_r, ups_w) = ups_stream.split(); - let task_stats = Arc::new(TaskStats::new()); + let task_stats = Arc::new(TaskStats::default()); let (clt_r_stats, clt_w_stats) = CltStats::new_pair(Arc::clone(&task_stats)); let mut clt_r = LimitedReader::new(clt_r, *SHIFT_MILLIS, *MAX_BYTES, clt_r_stats); diff --git a/demo/test-tcp-relay/src/stats.rs b/demo/test-tcp-relay/src/stats.rs index afdd3be7..68bae958 100644 --- a/demo/test-tcp-relay/src/stats.rs +++ b/demo/test-tcp-relay/src/stats.rs @@ -14,72 +14,45 @@ * limitations under the License. */ +use std::cell::UnsafeCell; use std::sync::Arc; use g3_io_ext::{ ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats, }; -#[derive(Debug)] +#[derive(Debug, Default)] struct HalfConnectionStats { - bytes: u64, - #[allow(unused)] - delay: u64, + bytes: UnsafeCell, } +unsafe impl Sync for HalfConnectionStats {} + impl HalfConnectionStats { - fn new() -> Self { - HalfConnectionStats { bytes: 0, delay: 0 } - } - fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } + let r = unsafe { &mut *self.bytes.get() }; + *r += size; } } -#[derive(Debug)] +#[derive(Debug, Default)] struct ConnectionStats { read: HalfConnectionStats, write: HalfConnectionStats, } -impl ConnectionStats { - fn new() -> Self { - ConnectionStats { - read: HalfConnectionStats::new(), - write: HalfConnectionStats::new(), - } - } -} - -#[derive(Debug)] +#[derive(Debug, Default)] pub struct TaskStats { clt: ConnectionStats, ups: ConnectionStats, } impl TaskStats { - pub fn new() -> Self { - TaskStats { - clt: ConnectionStats::new(), - ups: ConnectionStats::new(), - } - } - fn print(&self) { println!("{self:?}"); } } -impl Default for TaskStats { - fn default() -> Self { - Self::new() - } -} - impl Drop for TaskStats { fn drop(&mut self) { self.print() diff --git a/g3bench/src/target/dns/task.rs b/g3bench/src/target/dns/task.rs index 1851dab8..028b7c0a 100644 --- a/g3bench/src/target/dns/task.rs +++ b/g3bench/src/target/dns/task.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::cell::UnsafeCell; use std::sync::Arc; use anyhow::{anyhow, Context}; @@ -30,25 +31,30 @@ use crate::target::BenchError; #[derive(Default)] struct LocalRequestPicker { - id: usize, + id: UnsafeCell, } +unsafe impl Sync for LocalRequestPicker {} + impl LocalRequestPicker { fn set_id(&self, v: usize) { - unsafe { - let p = &self.id as *const usize as *mut usize; - *p = v; - } + let p = unsafe { &mut *self.id.get() }; + *p = v; + } + + fn get_id(&self) -> usize { + let p = unsafe { &*self.id.get() }; + *p } } impl DnsRequestPickState for LocalRequestPicker { fn pick_next(&self, max: usize) -> usize { - let next = self.id; - if self.id >= max { + let next = self.get_id(); + if next >= max { self.set_id(0); } else { - self.set_id(self.id + 1); + self.set_id(next + 1); } next } diff --git a/g3proxy/src/serve/http_proxy/task/connect/mod.rs b/g3proxy/src/serve/http_proxy/task/connect/mod.rs index c6651546..47cf9380 100644 --- a/g3proxy/src/serve/http_proxy/task/connect/mod.rs +++ b/g3proxy/src/serve/http_proxy/task/connect/mod.rs @@ -20,4 +20,4 @@ mod task; pub(super) use task::HttpProxyConnectTask; mod stats; -use stats::{TcpConnectTaskCltWrapperStats, TcpConnectTaskStats}; +use stats::TcpConnectTaskCltWrapperStats; diff --git a/g3proxy/src/serve/http_proxy/task/connect/stats/mod.rs b/g3proxy/src/serve/http_proxy/task/connect/stats/mod.rs index 5f0435a8..0ec71779 100644 --- a/g3proxy/src/serve/http_proxy/task/connect/stats/mod.rs +++ b/g3proxy/src/serve/http_proxy/task/connect/stats/mod.rs @@ -16,8 +16,6 @@ use super::HttpProxyServerStats; -mod task; mod wrapper; -pub(super) use task::TcpConnectTaskStats; pub(super) use wrapper::TcpConnectTaskCltWrapperStats; diff --git a/g3proxy/src/serve/http_proxy/task/connect/stats/task.rs b/g3proxy/src/serve/http_proxy/task/connect/stats/task.rs deleted file mode 100644 index a4c42358..00000000 --- a/g3proxy/src/serve/http_proxy/task/connect/stats/task.rs +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::sync::Arc; - -use g3_daemon::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats}; - -pub(crate) struct TcpConnectHalfConnectionStats { - bytes: u64, -} - -impl TcpConnectHalfConnectionStats { - fn new() -> Self { - TcpConnectHalfConnectionStats { bytes: 0 } - } - - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } -} - -pub(crate) struct TcpConnectConnectionStats { - pub(crate) read: TcpConnectHalfConnectionStats, - pub(crate) write: TcpConnectHalfConnectionStats, -} - -impl TcpConnectConnectionStats { - fn new() -> Self { - TcpConnectConnectionStats { - read: TcpConnectHalfConnectionStats::new(), - write: TcpConnectHalfConnectionStats::new(), - } - } -} - -pub(crate) struct TcpConnectTaskStats { - pub(crate) clt: TcpConnectConnectionStats, - pub(crate) ups: TcpConnectConnectionStats, -} - -impl TcpConnectTaskStats { - pub(crate) fn new() -> Self { - TcpConnectTaskStats { - clt: TcpConnectConnectionStats::new(), - ups: TcpConnectConnectionStats::new(), - } - } - - #[inline] - pub(crate) fn for_escaper(self: &Arc) -> ArcTcpConnectionTaskRemoteStats { - Arc::clone(self) as ArcTcpConnectionTaskRemoteStats - } -} - -impl TcpConnectionTaskRemoteStats for TcpConnectTaskStats { - fn add_read_bytes(&self, size: u64) { - self.ups.read.add_bytes(size); - } - - fn add_write_bytes(&self, size: u64) { - self.ups.write.add_bytes(size); - } -} diff --git a/g3proxy/src/serve/http_proxy/task/connect/stats/wrapper.rs b/g3proxy/src/serve/http_proxy/task/connect/stats/wrapper.rs index 75705751..4746080b 100644 --- a/g3proxy/src/serve/http_proxy/task/connect/stats/wrapper.rs +++ b/g3proxy/src/serve/http_proxy/task/connect/stats/wrapper.rs @@ -16,11 +16,12 @@ use std::sync::Arc; +use g3_daemon::stat::task::TcpStreamTaskStats; use g3_io_ext::{ ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats, }; -use super::{HttpProxyServerStats, TcpConnectTaskStats}; +use super::HttpProxyServerStats; use crate::auth::UserTrafficStats; trait TcpConnectTaskCltStatsWrapper { @@ -43,12 +44,12 @@ impl TcpConnectTaskCltStatsWrapper for UserTrafficStats { #[derive(Clone)] pub(crate) struct TcpConnectTaskCltWrapperStats { server: Arc, - task: Arc, + task: Arc, others: Vec, } impl TcpConnectTaskCltWrapperStats { - pub(crate) fn new(server: &Arc, task: &Arc) -> Self { + pub(crate) fn new(server: &Arc, task: &Arc) -> Self { TcpConnectTaskCltWrapperStats { server: Arc::clone(server), task: Arc::clone(task), diff --git a/g3proxy/src/serve/http_proxy/task/connect/task.rs b/g3proxy/src/serve/http_proxy/task/connect/task.rs index 7bbd353b..e5960508 100644 --- a/g3proxy/src/serve/http_proxy/task/connect/task.rs +++ b/g3proxy/src/serve/http_proxy/task/connect/task.rs @@ -20,12 +20,13 @@ use http::Version; use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::stat::task::TcpStreamTaskStats; use g3_io_ext::{LimitedReader, LimitedWriter}; use g3_types::acl::AclAction; use g3_types::net::ProxyRequestType; use super::protocol::{HttpClientWriter, HttpProxyRequest}; -use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats, TcpConnectTaskStats}; +use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats}; use crate::config::server::ServerConfig; use crate::inspect::StreamInspectContext; use crate::log::task::tcp_connect::TaskLogForTcpConnect; @@ -42,7 +43,7 @@ pub(crate) struct HttpProxyConnectTask { back_to_http: bool, task_notes: ServerTaskNotes, tcp_notes: TcpConnectTaskNotes, - task_stats: Arc, + task_stats: Arc, http_version: Version, } @@ -58,7 +59,7 @@ impl HttpProxyConnectTask { back_to_http: false, task_notes, tcp_notes: TcpConnectTaskNotes::new(req.upstream.clone()), - task_stats: Arc::new(TcpConnectTaskStats::new()), + task_stats: Arc::new(TcpStreamTaskStats::default()), http_version: req.inner.version, } } diff --git a/g3proxy/src/serve/http_proxy/task/forward/stats/task.rs b/g3proxy/src/serve/http_proxy/task/forward/stats/task.rs index 97d4439b..c1e42035 100644 --- a/g3proxy/src/serve/http_proxy/task/forward/stats/task.rs +++ b/g3proxy/src/serve/http_proxy/task/forward/stats/task.rs @@ -16,36 +16,14 @@ use std::sync::Arc; +use g3_daemon::stat::task::TcpStreamConnectionStats; + use crate::module::http_forward::{ArcHttpForwardTaskRemoteStats, HttpForwardTaskRemoteStats}; -#[derive(Default)] -pub(crate) struct HttpForwardHalfConnectionStats { - bytes: u64, -} - -impl HttpForwardHalfConnectionStats { - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } -} - -#[derive(Default)] -pub(crate) struct HttpForwardConnectionStats { - pub(crate) read: HttpForwardHalfConnectionStats, - pub(crate) write: HttpForwardHalfConnectionStats, -} - #[derive(Default)] pub(crate) struct HttpForwardTaskStats { - pub(crate) clt: HttpForwardConnectionStats, - pub(crate) ups: HttpForwardConnectionStats, + pub(crate) clt: TcpStreamConnectionStats, + pub(crate) ups: TcpStreamConnectionStats, } impl HttpForwardTaskStats { diff --git a/g3proxy/src/serve/http_proxy/task/ftp/stats/task.rs b/g3proxy/src/serve/http_proxy/task/ftp/stats/task.rs index bfb4d707..0e89c2ed 100644 --- a/g3proxy/src/serve/http_proxy/task/ftp/stats/task.rs +++ b/g3proxy/src/serve/http_proxy/task/ftp/stats/task.rs @@ -14,43 +14,21 @@ * limitations under the License. */ +use g3_daemon::stat::task::{TcpStreamConnectionStats, TcpStreamHalfConnectionStats}; + use crate::module::ftp_over_http::{FtpTaskRemoteControlStats, FtpTaskRemoteTransferStats}; -#[derive(Default)] -pub(crate) struct TcpHalfConnectionStats { - bytes: u64, -} - -impl TcpHalfConnectionStats { - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } -} - -#[derive(Default)] -pub(crate) struct FtpOverHttpClientStats { - pub(crate) read: TcpHalfConnectionStats, - pub(crate) write: TcpHalfConnectionStats, -} - #[derive(Default)] pub(crate) struct FtpOverHttpServerStats { - pub(crate) control_read: TcpHalfConnectionStats, - pub(crate) control_write: TcpHalfConnectionStats, - pub(crate) transfer_read: TcpHalfConnectionStats, - pub(crate) transfer_write: TcpHalfConnectionStats, + pub(crate) control_read: TcpStreamHalfConnectionStats, + pub(crate) control_write: TcpStreamHalfConnectionStats, + pub(crate) transfer_read: TcpStreamHalfConnectionStats, + pub(crate) transfer_write: TcpStreamHalfConnectionStats, } #[derive(Default)] pub(crate) struct FtpOverHttpTaskStats { - pub(crate) http_client: FtpOverHttpClientStats, + pub(crate) http_client: TcpStreamConnectionStats, pub(crate) ftp_server: FtpOverHttpServerStats, } diff --git a/g3proxy/src/serve/http_rproxy/task/forward/stats/task.rs b/g3proxy/src/serve/http_rproxy/task/forward/stats/task.rs index 97d4439b..c1e42035 100644 --- a/g3proxy/src/serve/http_rproxy/task/forward/stats/task.rs +++ b/g3proxy/src/serve/http_rproxy/task/forward/stats/task.rs @@ -16,36 +16,14 @@ use std::sync::Arc; +use g3_daemon::stat::task::TcpStreamConnectionStats; + use crate::module::http_forward::{ArcHttpForwardTaskRemoteStats, HttpForwardTaskRemoteStats}; -#[derive(Default)] -pub(crate) struct HttpForwardHalfConnectionStats { - bytes: u64, -} - -impl HttpForwardHalfConnectionStats { - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } -} - -#[derive(Default)] -pub(crate) struct HttpForwardConnectionStats { - pub(crate) read: HttpForwardHalfConnectionStats, - pub(crate) write: HttpForwardHalfConnectionStats, -} - #[derive(Default)] pub(crate) struct HttpForwardTaskStats { - pub(crate) clt: HttpForwardConnectionStats, - pub(crate) ups: HttpForwardConnectionStats, + pub(crate) clt: TcpStreamConnectionStats, + pub(crate) ups: TcpStreamConnectionStats, } impl HttpForwardTaskStats { diff --git a/g3proxy/src/serve/sni_proxy/task/accept/task.rs b/g3proxy/src/serve/sni_proxy/task/accept/task.rs index cab7209f..d7cad039 100644 --- a/g3proxy/src/serve/sni_proxy/task/accept/task.rs +++ b/g3proxy/src/serve/sni_proxy/task/accept/task.rs @@ -131,7 +131,7 @@ impl ClientHelloAcceptTask { protocol, final_upstream, self.time_accepted.elapsed(), - *self.pre_handshake_stats, + self.pre_handshake_stats.as_ref().clone(), ) .into_running(clt_r, clt_r_buf, clt_w) .await; @@ -148,7 +148,7 @@ impl ClientHelloAcceptTask { protocol, upstream, self.time_accepted.elapsed(), - *self.pre_handshake_stats, + self.pre_handshake_stats.as_ref().clone(), ) .into_running(clt_r, clt_r_buf, clt_w) .await; diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/mod.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/mod.rs index 64dcf3c5..3607cb0a 100644 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/mod.rs +++ b/g3proxy/src/serve/socks_proxy/task/tcp_connect/mod.rs @@ -20,4 +20,4 @@ mod task; pub(super) use task::SocksProxyTcpConnectTask; mod stats; -use stats::{TcpConnectTaskCltWrapperStats, TcpConnectTaskStats}; +use stats::TcpConnectTaskCltWrapperStats; diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/mod.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/mod.rs index 64ef2026..283bdaa3 100644 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/mod.rs +++ b/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/mod.rs @@ -16,8 +16,6 @@ use super::SocksProxyServerStats; -mod task; mod wrapper; -pub(super) use task::TcpConnectTaskStats; pub(super) use wrapper::TcpConnectTaskCltWrapperStats; diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/task.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/task.rs deleted file mode 100644 index a4c42358..00000000 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/task.rs +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2023 ByteDance and/or its affiliates. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -use std::sync::Arc; - -use g3_daemon::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats}; - -pub(crate) struct TcpConnectHalfConnectionStats { - bytes: u64, -} - -impl TcpConnectHalfConnectionStats { - fn new() -> Self { - TcpConnectHalfConnectionStats { bytes: 0 } - } - - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } -} - -pub(crate) struct TcpConnectConnectionStats { - pub(crate) read: TcpConnectHalfConnectionStats, - pub(crate) write: TcpConnectHalfConnectionStats, -} - -impl TcpConnectConnectionStats { - fn new() -> Self { - TcpConnectConnectionStats { - read: TcpConnectHalfConnectionStats::new(), - write: TcpConnectHalfConnectionStats::new(), - } - } -} - -pub(crate) struct TcpConnectTaskStats { - pub(crate) clt: TcpConnectConnectionStats, - pub(crate) ups: TcpConnectConnectionStats, -} - -impl TcpConnectTaskStats { - pub(crate) fn new() -> Self { - TcpConnectTaskStats { - clt: TcpConnectConnectionStats::new(), - ups: TcpConnectConnectionStats::new(), - } - } - - #[inline] - pub(crate) fn for_escaper(self: &Arc) -> ArcTcpConnectionTaskRemoteStats { - Arc::clone(self) as ArcTcpConnectionTaskRemoteStats - } -} - -impl TcpConnectionTaskRemoteStats for TcpConnectTaskStats { - fn add_read_bytes(&self, size: u64) { - self.ups.read.add_bytes(size); - } - - fn add_write_bytes(&self, size: u64) { - self.ups.write.add_bytes(size); - } -} diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/wrapper.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/wrapper.rs index 9f021935..4aa2563d 100644 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/wrapper.rs +++ b/g3proxy/src/serve/socks_proxy/task/tcp_connect/stats/wrapper.rs @@ -16,11 +16,12 @@ use std::sync::Arc; +use g3_daemon::stat::task::TcpStreamTaskStats; use g3_io_ext::{ ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats, }; -use super::{SocksProxyServerStats, TcpConnectTaskStats}; +use super::SocksProxyServerStats; use crate::auth::UserTrafficStats; trait TcpConnectTaskCltStatsWrapper { @@ -43,15 +44,12 @@ impl TcpConnectTaskCltStatsWrapper for UserTrafficStats { #[derive(Clone)] pub(crate) struct TcpConnectTaskCltWrapperStats { server: Arc, - task: Arc, + task: Arc, others: Vec, } impl TcpConnectTaskCltWrapperStats { - pub(crate) fn new( - server: &Arc, - task: &Arc, - ) -> Self { + pub(crate) fn new(server: &Arc, task: &Arc) -> Self { TcpConnectTaskCltWrapperStats { server: Arc::clone(server), task: Arc::clone(task), diff --git a/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs b/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs index 3bf5f45a..b6ed6385 100644 --- a/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/tcp_connect/task.rs @@ -20,12 +20,13 @@ use std::sync::Arc; use log::debug; use tokio::io::{AsyncRead, AsyncWrite}; +use g3_daemon::stat::task::TcpStreamTaskStats; use g3_io_ext::{LimitedReader, LimitedWriter}; use g3_socks::{v4a, v5, SocksVersion}; use g3_types::acl::AclAction; use g3_types::net::{ProxyRequestType, UpstreamAddr}; -use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats, TcpConnectTaskStats}; +use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats}; use crate::config::server::ServerConfig; use crate::inspect::StreamInspectContext; use crate::log::task::tcp_connect::TaskLogForTcpConnect; @@ -40,7 +41,7 @@ pub(crate) struct SocksProxyTcpConnectTask { ctx: CommonTaskContext, task_notes: ServerTaskNotes, tcp_notes: TcpConnectTaskNotes, - task_stats: Arc, + task_stats: Arc, } impl SocksProxyTcpConnectTask { @@ -62,7 +63,7 @@ impl SocksProxyTcpConnectTask { ctx, task_notes, tcp_notes: TcpConnectTaskNotes::new(upstream), - task_stats: Arc::new(TcpConnectTaskStats::new()), + task_stats: Arc::new(TcpStreamTaskStats::default()), } } diff --git a/g3proxy/src/serve/socks_proxy/task/udp_associate/stats/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_associate/stats/task.rs index 32ac8aaa..c4f719a4 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_associate/stats/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_associate/stats/task.rs @@ -17,42 +17,14 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use g3_daemon::stat::task::UdpConnectHalfConnectionStats; + use crate::module::udp_relay::{ArcUdpRelayTaskRemoteStats, UdpRelayTaskRemoteStats}; -#[derive(Default)] -pub(crate) struct UdpAssociateClientSideHalfStats { - bytes: u64, - packets: u64, -} - -impl UdpAssociateClientSideHalfStats { - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn get_packets(&self) -> u64 { - self.packets - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } - - pub(crate) fn add_packet(&self) { - unsafe { - let r = &self.packets as *const u64 as *mut u64; - *r += 1; - } - } -} - #[derive(Default)] pub(crate) struct UdpAssociateClientSideStats { - pub(crate) recv: UdpAssociateClientSideHalfStats, - pub(crate) send: UdpAssociateClientSideHalfStats, + pub(crate) recv: UdpConnectHalfConnectionStats, + pub(crate) send: UdpConnectHalfConnectionStats, } #[derive(Default)] diff --git a/g3proxy/src/serve/socks_proxy/task/udp_connect/stats/task.rs b/g3proxy/src/serve/socks_proxy/task/udp_connect/stats/task.rs index 437e1685..418ea2ef 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_connect/stats/task.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_connect/stats/task.rs @@ -16,54 +16,14 @@ use std::sync::Arc; +use g3_daemon::stat::task::UdpConnectConnectionStats; + use crate::module::udp_connect::{ArcUdpConnectTaskRemoteStats, UdpConnectTaskRemoteStats}; -#[derive(Default)] -pub(crate) struct UdpConnectHalfStats { - bytes: u64, - packets: u64, -} - -impl UdpConnectHalfStats { - pub(crate) fn get_bytes(&self) -> u64 { - self.bytes - } - - pub(crate) fn get_packets(&self) -> u64 { - self.packets - } - - pub(crate) fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; - } - } - - pub(crate) fn add_packet(&self) { - unsafe { - let r = &self.packets as *const u64 as *mut u64; - *r += 1; - } - } -} - -#[derive(Default)] -pub(crate) struct UdpConnectClientSideStats { - pub(crate) recv: UdpConnectHalfStats, - pub(crate) send: UdpConnectHalfStats, -} - -#[derive(Default)] -pub(crate) struct UdpConnectRemoteSideStats { - pub(crate) recv: UdpConnectHalfStats, - pub(crate) send: UdpConnectHalfStats, -} - #[derive(Default)] pub(crate) struct UdpConnectTaskStats { - pub(crate) clt: UdpConnectClientSideStats, - pub(crate) ups: UdpConnectRemoteSideStats, + pub(crate) clt: UdpConnectConnectionStats, + pub(crate) ups: UdpConnectConnectionStats, } impl UdpConnectTaskStats { diff --git a/g3tiles/src/serve/openssl_proxy/task/relay/task.rs b/g3tiles/src/serve/openssl_proxy/task/relay/task.rs index 90e3347a..ae0dec09 100644 --- a/g3tiles/src/serve/openssl_proxy/task/relay/task.rs +++ b/g3tiles/src/serve/openssl_proxy/task/relay/task.rs @@ -54,7 +54,9 @@ impl OpensslRelayTask { host, service, task_notes, - task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(*pre_handshake_stats)), + task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats( + pre_handshake_stats.as_ref().clone(), + )), } } diff --git a/g3tiles/src/serve/rustls_proxy/task/relay/task.rs b/g3tiles/src/serve/rustls_proxy/task/relay/task.rs index 86ff9d04..e9a3298f 100644 --- a/g3tiles/src/serve/rustls_proxy/task/relay/task.rs +++ b/g3tiles/src/serve/rustls_proxy/task/relay/task.rs @@ -54,7 +54,9 @@ impl RustlsRelayTask { host, service, task_notes, - task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(*pre_handshake_stats)), + task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats( + pre_handshake_stats.as_ref().clone(), + )), } } diff --git a/lib/g3-daemon/src/stat/task/mod.rs b/lib/g3-daemon/src/stat/task/mod.rs index dab1f722..19cd21f9 100644 --- a/lib/g3-daemon/src/stat/task/mod.rs +++ b/lib/g3-daemon/src/stat/task/mod.rs @@ -15,4 +15,7 @@ */ mod tcp_stream; -pub use tcp_stream::{TcpStreamConnectionStats, TcpStreamTaskStats}; +pub use tcp_stream::{TcpStreamConnectionStats, TcpStreamHalfConnectionStats, TcpStreamTaskStats}; + +mod udp_connect; +pub use udp_connect::{UdpConnectConnectionStats, UdpConnectHalfConnectionStats}; diff --git a/lib/g3-daemon/src/stat/task/tcp_stream.rs b/lib/g3-daemon/src/stat/task/tcp_stream.rs index 49e84eca..3f298489 100644 --- a/lib/g3-daemon/src/stat/task/tcp_stream.rs +++ b/lib/g3-daemon/src/stat/task/tcp_stream.rs @@ -14,29 +14,39 @@ * limitations under the License. */ +use std::cell::UnsafeCell; use std::sync::Arc; use crate::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats}; -#[derive(Copy, Clone, Default)] +#[derive(Default)] pub struct TcpStreamHalfConnectionStats { - bytes: u64, + bytes: UnsafeCell, } -impl TcpStreamHalfConnectionStats { - pub fn get_bytes(&self) -> u64 { - self.bytes - } +unsafe impl Sync for TcpStreamHalfConnectionStats {} - pub fn add_bytes(&self, size: u64) { - unsafe { - let r = &self.bytes as *const u64 as *mut u64; - *r += size; +impl Clone for TcpStreamHalfConnectionStats { + fn clone(&self) -> Self { + TcpStreamHalfConnectionStats { + bytes: UnsafeCell::new(self.get_bytes()), } } } -#[derive(Copy, Clone, Default)] +impl TcpStreamHalfConnectionStats { + pub fn get_bytes(&self) -> u64 { + let r = unsafe { &*self.bytes.get() }; + *r + } + + pub fn add_bytes(&self, size: u64) { + let r = unsafe { &mut *self.bytes.get() }; + *r += size; + } +} + +#[derive(Clone, Default)] pub struct TcpStreamConnectionStats { pub read: TcpStreamHalfConnectionStats, pub write: TcpStreamHalfConnectionStats, diff --git a/lib/g3-daemon/src/stat/task/udp_connect.rs b/lib/g3-daemon/src/stat/task/udp_connect.rs new file mode 100644 index 00000000..95158344 --- /dev/null +++ b/lib/g3-daemon/src/stat/task/udp_connect.rs @@ -0,0 +1,53 @@ +/* + * Copyright 2023 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::cell::UnsafeCell; + +#[derive(Default)] +pub struct UdpConnectHalfConnectionStats { + bytes: UnsafeCell, + packets: UnsafeCell, +} + +unsafe impl Sync for UdpConnectHalfConnectionStats {} + +impl UdpConnectHalfConnectionStats { + pub fn get_bytes(&self) -> u64 { + let r = unsafe { &*self.bytes.get() }; + *r + } + + pub fn get_packets(&self) -> u64 { + let r = unsafe { &*self.packets.get() }; + *r + } + + pub fn add_bytes(&self, size: u64) { + let r = unsafe { &mut *self.bytes.get() }; + *r += size; + } + + pub fn add_packet(&self) { + let r = unsafe { &mut *self.packets.get() }; + *r += 1; + } +} + +#[derive(Default)] +pub struct UdpConnectConnectionStats { + pub recv: UdpConnectHalfConnectionStats, + pub send: UdpConnectHalfConnectionStats, +} diff --git a/lib/g3-types/src/stats/mod.rs b/lib/g3-types/src/stats/mod.rs index b3980048..390910e5 100644 --- a/lib/g3-types/src/stats/mod.rs +++ b/lib/g3-types/src/stats/mod.rs @@ -14,6 +14,33 @@ * limitations under the License. */ +macro_rules! impl_per_thread_unsafe_add_size { + ($method:ident, $field:ident) => { + fn $method(&self, size: u64) { + let r = unsafe { &mut *self.$field.get() }; + *r += size; + } + }; +} + +macro_rules! impl_per_thread_unsafe_add_packet { + ($method:ident, $field:ident) => { + fn $method(&self) { + let r = unsafe { &mut *self.$field.get() }; + *r += 1; + } + }; +} + +macro_rules! impl_per_thread_unsafe_get { + ($method:ident, $field:ident, $r:ty) => { + fn $method(&self) -> $r { + let r = unsafe { &*self.$field.get() }; + *r + } + }; +} + mod id; pub use id::StatId; diff --git a/lib/g3-types/src/stats/tcp.rs b/lib/g3-types/src/stats/tcp.rs index 09a83239..1a2e43e3 100644 --- a/lib/g3-types/src/stats/tcp.rs +++ b/lib/g3-types/src/stats/tcp.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::cell::UnsafeCell; use std::ops; use std::sync::atomic::{AtomicU64, Ordering}; @@ -61,31 +62,23 @@ impl TcpIoStats { } } -#[derive(Clone, Copy, Default)] +#[derive(Default)] struct PerThreadTcpIoStats { - in_bytes: u64, - out_bytes: u64, + in_bytes: UnsafeCell, + out_bytes: UnsafeCell, } impl PerThreadTcpIoStats { - fn add_in_bytes(&self, size: u64) { - unsafe { - let r = &self.in_bytes as *const u64 as *mut u64; - *r += size; - } - } + impl_per_thread_unsafe_add_size!(add_in_bytes, in_bytes); + impl_per_thread_unsafe_add_size!(add_out_bytes, out_bytes); - fn add_out_bytes(&self, size: u64) { - unsafe { - let r = &self.out_bytes as *const u64 as *mut u64; - *r += size; - } - } + impl_per_thread_unsafe_get!(get_in_bytes, in_bytes, u64); + impl_per_thread_unsafe_get!(get_out_bytes, out_bytes, u64); fn snapshot(&self) -> TcpIoSnapshot { TcpIoSnapshot { - in_bytes: self.in_bytes, - out_bytes: self.out_bytes, + in_bytes: self.get_in_bytes(), + out_bytes: self.get_out_bytes(), } } } @@ -97,9 +90,13 @@ pub struct ThreadedTcpIoStats { impl ThreadedTcpIoStats { pub fn new(thread_count: usize) -> Self { + let mut p = Vec::with_capacity(thread_count); + for _ in 0..thread_count { + p.push(PerThreadTcpIoStats::default()); + } ThreadedTcpIoStats { a: TcpIoStats::default(), - p: vec![PerThreadTcpIoStats::default(); thread_count], + p, } } @@ -116,7 +113,7 @@ impl ThreadedTcpIoStats { pub fn get_in_bytes(&self) -> u64 { self.p .iter() - .map(|x| x.in_bytes) + .map(|x| x.get_in_bytes()) .fold(self.a.get_in_bytes(), |acc, x| acc + x) } diff --git a/lib/g3-types/src/stats/udp.rs b/lib/g3-types/src/stats/udp.rs index 9d611c3c..9e4db5d9 100644 --- a/lib/g3-types/src/stats/udp.rs +++ b/lib/g3-types/src/stats/udp.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +use std::cell::UnsafeCell; use std::ops; use std::sync::atomic::{AtomicU64, Ordering}; @@ -73,49 +74,31 @@ impl UdpIoStats { } } -#[derive(Default, Clone, Copy)] +#[derive(Default)] struct PerThreadUdpIoStats { - in_packets: u64, - in_bytes: u64, - out_packets: u64, - out_bytes: u64, + in_packets: UnsafeCell, + in_bytes: UnsafeCell, + out_packets: UnsafeCell, + out_bytes: UnsafeCell, } impl PerThreadUdpIoStats { - fn add_in_packet(&self) { - unsafe { - let r = &self.in_packets as *const u64 as *mut u64; - *r += 1; - } - } + impl_per_thread_unsafe_add_size!(add_in_bytes, in_bytes); + impl_per_thread_unsafe_add_packet!(add_in_packet, in_packets); + impl_per_thread_unsafe_add_size!(add_out_bytes, out_bytes); + impl_per_thread_unsafe_add_packet!(add_out_packet, out_packets); - fn add_in_bytes(&self, size: u64) { - unsafe { - let r = &self.in_bytes as *const u64 as *mut u64; - *r += size; - } - } - - fn add_out_packet(&self) { - unsafe { - let r = &self.out_packets as *const u64 as *mut u64; - *r += 1; - } - } - - fn add_out_bytes(&self, size: u64) { - unsafe { - let r = &self.out_bytes as *const u64 as *mut u64; - *r += size; - } - } + impl_per_thread_unsafe_get!(get_in_bytes, in_bytes, u64); + impl_per_thread_unsafe_get!(get_in_packets, in_packets, u64); + impl_per_thread_unsafe_get!(get_out_bytes, out_bytes, u64); + impl_per_thread_unsafe_get!(get_out_packets, out_packets, u64); fn snapshot(&self) -> UdpIoSnapshot { UdpIoSnapshot { - in_packets: self.in_packets, - in_bytes: self.in_bytes, - out_packets: self.out_packets, - out_bytes: self.out_bytes, + in_packets: self.get_in_packets(), + in_bytes: self.get_in_bytes(), + out_packets: self.get_out_packets(), + out_bytes: self.get_out_bytes(), } } } @@ -127,9 +110,13 @@ pub struct ThreadedUdpIoStats { impl ThreadedUdpIoStats { pub fn new(thread_count: usize) -> Self { + let mut p = Vec::with_capacity(thread_count); + for _ in 0..thread_count { + p.push(PerThreadUdpIoStats::default()); + } ThreadedUdpIoStats { a: UdpIoStats::default(), - p: vec![PerThreadUdpIoStats::default(); thread_count], + p, } }