use UnsafeCell instead of direct casting

This commit is contained in:
Zhang Jingqiang 2023-08-03 12:13:08 +08:00
parent 0796e4c349
commit 7f3d4ea202
27 changed files with 217 additions and 460 deletions

View file

@ -46,7 +46,7 @@ async fn process_socket(mut clt_stream: TcpStream) -> io::Result<()> {
let (clt_r, clt_w) = clt_stream.split();
let (ups_r, ups_w) = ups_stream.split();
let task_stats = Arc::new(TaskStats::new());
let task_stats = Arc::new(TaskStats::default());
let (clt_r_stats, clt_w_stats) = CltStats::new_pair(Arc::clone(&task_stats));
let mut clt_r = LimitedReader::new(clt_r, *SHIFT_MILLIS, *MAX_BYTES, clt_r_stats);

View file

@ -14,72 +14,45 @@
* limitations under the License.
*/
use std::cell::UnsafeCell;
use std::sync::Arc;
use g3_io_ext::{
ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats,
};
#[derive(Debug)]
#[derive(Debug, Default)]
struct HalfConnectionStats {
bytes: u64,
#[allow(unused)]
delay: u64,
bytes: UnsafeCell<u64>,
}
unsafe impl Sync for HalfConnectionStats {}
impl HalfConnectionStats {
fn new() -> Self {
HalfConnectionStats { bytes: 0, delay: 0 }
}
fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
let r = unsafe { &mut *self.bytes.get() };
*r += size;
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
struct ConnectionStats {
read: HalfConnectionStats,
write: HalfConnectionStats,
}
impl ConnectionStats {
fn new() -> Self {
ConnectionStats {
read: HalfConnectionStats::new(),
write: HalfConnectionStats::new(),
}
}
}
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct TaskStats {
clt: ConnectionStats,
ups: ConnectionStats,
}
impl TaskStats {
pub fn new() -> Self {
TaskStats {
clt: ConnectionStats::new(),
ups: ConnectionStats::new(),
}
}
fn print(&self) {
println!("{self:?}");
}
}
impl Default for TaskStats {
fn default() -> Self {
Self::new()
}
}
impl Drop for TaskStats {
fn drop(&mut self) {
self.print()

View file

@ -14,6 +14,7 @@
* limitations under the License.
*/
use std::cell::UnsafeCell;
use std::sync::Arc;
use anyhow::{anyhow, Context};
@ -30,25 +31,30 @@ use crate::target::BenchError;
#[derive(Default)]
struct LocalRequestPicker {
id: usize,
id: UnsafeCell<usize>,
}
unsafe impl Sync for LocalRequestPicker {}
impl LocalRequestPicker {
fn set_id(&self, v: usize) {
unsafe {
let p = &self.id as *const usize as *mut usize;
*p = v;
}
let p = unsafe { &mut *self.id.get() };
*p = v;
}
fn get_id(&self) -> usize {
let p = unsafe { &*self.id.get() };
*p
}
}
impl DnsRequestPickState for LocalRequestPicker {
fn pick_next(&self, max: usize) -> usize {
let next = self.id;
if self.id >= max {
let next = self.get_id();
if next >= max {
self.set_id(0);
} else {
self.set_id(self.id + 1);
self.set_id(next + 1);
}
next
}

View file

@ -20,4 +20,4 @@ mod task;
pub(super) use task::HttpProxyConnectTask;
mod stats;
use stats::{TcpConnectTaskCltWrapperStats, TcpConnectTaskStats};
use stats::TcpConnectTaskCltWrapperStats;

View file

@ -16,8 +16,6 @@
use super::HttpProxyServerStats;
mod task;
mod wrapper;
pub(super) use task::TcpConnectTaskStats;
pub(super) use wrapper::TcpConnectTaskCltWrapperStats;

View file

@ -1,83 +0,0 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;
use g3_daemon::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats};
pub(crate) struct TcpConnectHalfConnectionStats {
bytes: u64,
}
impl TcpConnectHalfConnectionStats {
fn new() -> Self {
TcpConnectHalfConnectionStats { bytes: 0 }
}
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
}
pub(crate) struct TcpConnectConnectionStats {
pub(crate) read: TcpConnectHalfConnectionStats,
pub(crate) write: TcpConnectHalfConnectionStats,
}
impl TcpConnectConnectionStats {
fn new() -> Self {
TcpConnectConnectionStats {
read: TcpConnectHalfConnectionStats::new(),
write: TcpConnectHalfConnectionStats::new(),
}
}
}
pub(crate) struct TcpConnectTaskStats {
pub(crate) clt: TcpConnectConnectionStats,
pub(crate) ups: TcpConnectConnectionStats,
}
impl TcpConnectTaskStats {
pub(crate) fn new() -> Self {
TcpConnectTaskStats {
clt: TcpConnectConnectionStats::new(),
ups: TcpConnectConnectionStats::new(),
}
}
#[inline]
pub(crate) fn for_escaper(self: &Arc<Self>) -> ArcTcpConnectionTaskRemoteStats {
Arc::clone(self) as ArcTcpConnectionTaskRemoteStats
}
}
impl TcpConnectionTaskRemoteStats for TcpConnectTaskStats {
fn add_read_bytes(&self, size: u64) {
self.ups.read.add_bytes(size);
}
fn add_write_bytes(&self, size: u64) {
self.ups.write.add_bytes(size);
}
}

View file

@ -16,11 +16,12 @@
use std::sync::Arc;
use g3_daemon::stat::task::TcpStreamTaskStats;
use g3_io_ext::{
ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats,
};
use super::{HttpProxyServerStats, TcpConnectTaskStats};
use super::HttpProxyServerStats;
use crate::auth::UserTrafficStats;
trait TcpConnectTaskCltStatsWrapper {
@ -43,12 +44,12 @@ impl TcpConnectTaskCltStatsWrapper for UserTrafficStats {
#[derive(Clone)]
pub(crate) struct TcpConnectTaskCltWrapperStats {
server: Arc<HttpProxyServerStats>,
task: Arc<TcpConnectTaskStats>,
task: Arc<TcpStreamTaskStats>,
others: Vec<ArcTcpConnectTaskCltStatsWrapper>,
}
impl TcpConnectTaskCltWrapperStats {
pub(crate) fn new(server: &Arc<HttpProxyServerStats>, task: &Arc<TcpConnectTaskStats>) -> Self {
pub(crate) fn new(server: &Arc<HttpProxyServerStats>, task: &Arc<TcpStreamTaskStats>) -> Self {
TcpConnectTaskCltWrapperStats {
server: Arc::clone(server),
task: Arc::clone(task),

View file

@ -20,12 +20,13 @@ use http::Version;
use log::debug;
use tokio::io::{AsyncRead, AsyncWrite};
use g3_daemon::stat::task::TcpStreamTaskStats;
use g3_io_ext::{LimitedReader, LimitedWriter};
use g3_types::acl::AclAction;
use g3_types::net::ProxyRequestType;
use super::protocol::{HttpClientWriter, HttpProxyRequest};
use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats, TcpConnectTaskStats};
use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats};
use crate::config::server::ServerConfig;
use crate::inspect::StreamInspectContext;
use crate::log::task::tcp_connect::TaskLogForTcpConnect;
@ -42,7 +43,7 @@ pub(crate) struct HttpProxyConnectTask {
back_to_http: bool,
task_notes: ServerTaskNotes,
tcp_notes: TcpConnectTaskNotes,
task_stats: Arc<TcpConnectTaskStats>,
task_stats: Arc<TcpStreamTaskStats>,
http_version: Version,
}
@ -58,7 +59,7 @@ impl HttpProxyConnectTask {
back_to_http: false,
task_notes,
tcp_notes: TcpConnectTaskNotes::new(req.upstream.clone()),
task_stats: Arc::new(TcpConnectTaskStats::new()),
task_stats: Arc::new(TcpStreamTaskStats::default()),
http_version: req.inner.version,
}
}

View file

@ -16,36 +16,14 @@
use std::sync::Arc;
use g3_daemon::stat::task::TcpStreamConnectionStats;
use crate::module::http_forward::{ArcHttpForwardTaskRemoteStats, HttpForwardTaskRemoteStats};
#[derive(Default)]
pub(crate) struct HttpForwardHalfConnectionStats {
bytes: u64,
}
impl HttpForwardHalfConnectionStats {
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
}
#[derive(Default)]
pub(crate) struct HttpForwardConnectionStats {
pub(crate) read: HttpForwardHalfConnectionStats,
pub(crate) write: HttpForwardHalfConnectionStats,
}
#[derive(Default)]
pub(crate) struct HttpForwardTaskStats {
pub(crate) clt: HttpForwardConnectionStats,
pub(crate) ups: HttpForwardConnectionStats,
pub(crate) clt: TcpStreamConnectionStats,
pub(crate) ups: TcpStreamConnectionStats,
}
impl HttpForwardTaskStats {

View file

@ -14,43 +14,21 @@
* limitations under the License.
*/
use g3_daemon::stat::task::{TcpStreamConnectionStats, TcpStreamHalfConnectionStats};
use crate::module::ftp_over_http::{FtpTaskRemoteControlStats, FtpTaskRemoteTransferStats};
#[derive(Default)]
pub(crate) struct TcpHalfConnectionStats {
bytes: u64,
}
impl TcpHalfConnectionStats {
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
}
#[derive(Default)]
pub(crate) struct FtpOverHttpClientStats {
pub(crate) read: TcpHalfConnectionStats,
pub(crate) write: TcpHalfConnectionStats,
}
#[derive(Default)]
pub(crate) struct FtpOverHttpServerStats {
pub(crate) control_read: TcpHalfConnectionStats,
pub(crate) control_write: TcpHalfConnectionStats,
pub(crate) transfer_read: TcpHalfConnectionStats,
pub(crate) transfer_write: TcpHalfConnectionStats,
pub(crate) control_read: TcpStreamHalfConnectionStats,
pub(crate) control_write: TcpStreamHalfConnectionStats,
pub(crate) transfer_read: TcpStreamHalfConnectionStats,
pub(crate) transfer_write: TcpStreamHalfConnectionStats,
}
#[derive(Default)]
pub(crate) struct FtpOverHttpTaskStats {
pub(crate) http_client: FtpOverHttpClientStats,
pub(crate) http_client: TcpStreamConnectionStats,
pub(crate) ftp_server: FtpOverHttpServerStats,
}

View file

@ -16,36 +16,14 @@
use std::sync::Arc;
use g3_daemon::stat::task::TcpStreamConnectionStats;
use crate::module::http_forward::{ArcHttpForwardTaskRemoteStats, HttpForwardTaskRemoteStats};
#[derive(Default)]
pub(crate) struct HttpForwardHalfConnectionStats {
bytes: u64,
}
impl HttpForwardHalfConnectionStats {
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
}
#[derive(Default)]
pub(crate) struct HttpForwardConnectionStats {
pub(crate) read: HttpForwardHalfConnectionStats,
pub(crate) write: HttpForwardHalfConnectionStats,
}
#[derive(Default)]
pub(crate) struct HttpForwardTaskStats {
pub(crate) clt: HttpForwardConnectionStats,
pub(crate) ups: HttpForwardConnectionStats,
pub(crate) clt: TcpStreamConnectionStats,
pub(crate) ups: TcpStreamConnectionStats,
}
impl HttpForwardTaskStats {

View file

@ -131,7 +131,7 @@ impl ClientHelloAcceptTask {
protocol,
final_upstream,
self.time_accepted.elapsed(),
*self.pre_handshake_stats,
self.pre_handshake_stats.as_ref().clone(),
)
.into_running(clt_r, clt_r_buf, clt_w)
.await;
@ -148,7 +148,7 @@ impl ClientHelloAcceptTask {
protocol,
upstream,
self.time_accepted.elapsed(),
*self.pre_handshake_stats,
self.pre_handshake_stats.as_ref().clone(),
)
.into_running(clt_r, clt_r_buf, clt_w)
.await;

View file

@ -20,4 +20,4 @@ mod task;
pub(super) use task::SocksProxyTcpConnectTask;
mod stats;
use stats::{TcpConnectTaskCltWrapperStats, TcpConnectTaskStats};
use stats::TcpConnectTaskCltWrapperStats;

View file

@ -16,8 +16,6 @@
use super::SocksProxyServerStats;
mod task;
mod wrapper;
pub(super) use task::TcpConnectTaskStats;
pub(super) use wrapper::TcpConnectTaskCltWrapperStats;

View file

@ -1,83 +0,0 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::sync::Arc;
use g3_daemon::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats};
pub(crate) struct TcpConnectHalfConnectionStats {
bytes: u64,
}
impl TcpConnectHalfConnectionStats {
fn new() -> Self {
TcpConnectHalfConnectionStats { bytes: 0 }
}
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
}
pub(crate) struct TcpConnectConnectionStats {
pub(crate) read: TcpConnectHalfConnectionStats,
pub(crate) write: TcpConnectHalfConnectionStats,
}
impl TcpConnectConnectionStats {
fn new() -> Self {
TcpConnectConnectionStats {
read: TcpConnectHalfConnectionStats::new(),
write: TcpConnectHalfConnectionStats::new(),
}
}
}
pub(crate) struct TcpConnectTaskStats {
pub(crate) clt: TcpConnectConnectionStats,
pub(crate) ups: TcpConnectConnectionStats,
}
impl TcpConnectTaskStats {
pub(crate) fn new() -> Self {
TcpConnectTaskStats {
clt: TcpConnectConnectionStats::new(),
ups: TcpConnectConnectionStats::new(),
}
}
#[inline]
pub(crate) fn for_escaper(self: &Arc<Self>) -> ArcTcpConnectionTaskRemoteStats {
Arc::clone(self) as ArcTcpConnectionTaskRemoteStats
}
}
impl TcpConnectionTaskRemoteStats for TcpConnectTaskStats {
fn add_read_bytes(&self, size: u64) {
self.ups.read.add_bytes(size);
}
fn add_write_bytes(&self, size: u64) {
self.ups.write.add_bytes(size);
}
}

View file

@ -16,11 +16,12 @@
use std::sync::Arc;
use g3_daemon::stat::task::TcpStreamTaskStats;
use g3_io_ext::{
ArcLimitedReaderStats, ArcLimitedWriterStats, LimitedReaderStats, LimitedWriterStats,
};
use super::{SocksProxyServerStats, TcpConnectTaskStats};
use super::SocksProxyServerStats;
use crate::auth::UserTrafficStats;
trait TcpConnectTaskCltStatsWrapper {
@ -43,15 +44,12 @@ impl TcpConnectTaskCltStatsWrapper for UserTrafficStats {
#[derive(Clone)]
pub(crate) struct TcpConnectTaskCltWrapperStats {
server: Arc<SocksProxyServerStats>,
task: Arc<TcpConnectTaskStats>,
task: Arc<TcpStreamTaskStats>,
others: Vec<ArcTcpConnectTaskCltStatsWrapper>,
}
impl TcpConnectTaskCltWrapperStats {
pub(crate) fn new(
server: &Arc<SocksProxyServerStats>,
task: &Arc<TcpConnectTaskStats>,
) -> Self {
pub(crate) fn new(server: &Arc<SocksProxyServerStats>, task: &Arc<TcpStreamTaskStats>) -> Self {
TcpConnectTaskCltWrapperStats {
server: Arc::clone(server),
task: Arc::clone(task),

View file

@ -20,12 +20,13 @@ use std::sync::Arc;
use log::debug;
use tokio::io::{AsyncRead, AsyncWrite};
use g3_daemon::stat::task::TcpStreamTaskStats;
use g3_io_ext::{LimitedReader, LimitedWriter};
use g3_socks::{v4a, v5, SocksVersion};
use g3_types::acl::AclAction;
use g3_types::net::{ProxyRequestType, UpstreamAddr};
use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats, TcpConnectTaskStats};
use super::{CommonTaskContext, TcpConnectTaskCltWrapperStats};
use crate::config::server::ServerConfig;
use crate::inspect::StreamInspectContext;
use crate::log::task::tcp_connect::TaskLogForTcpConnect;
@ -40,7 +41,7 @@ pub(crate) struct SocksProxyTcpConnectTask {
ctx: CommonTaskContext,
task_notes: ServerTaskNotes,
tcp_notes: TcpConnectTaskNotes,
task_stats: Arc<TcpConnectTaskStats>,
task_stats: Arc<TcpStreamTaskStats>,
}
impl SocksProxyTcpConnectTask {
@ -62,7 +63,7 @@ impl SocksProxyTcpConnectTask {
ctx,
task_notes,
tcp_notes: TcpConnectTaskNotes::new(upstream),
task_stats: Arc::new(TcpConnectTaskStats::new()),
task_stats: Arc::new(TcpStreamTaskStats::default()),
}
}

View file

@ -17,42 +17,14 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use g3_daemon::stat::task::UdpConnectHalfConnectionStats;
use crate::module::udp_relay::{ArcUdpRelayTaskRemoteStats, UdpRelayTaskRemoteStats};
#[derive(Default)]
pub(crate) struct UdpAssociateClientSideHalfStats {
bytes: u64,
packets: u64,
}
impl UdpAssociateClientSideHalfStats {
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn get_packets(&self) -> u64 {
self.packets
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
pub(crate) fn add_packet(&self) {
unsafe {
let r = &self.packets as *const u64 as *mut u64;
*r += 1;
}
}
}
#[derive(Default)]
pub(crate) struct UdpAssociateClientSideStats {
pub(crate) recv: UdpAssociateClientSideHalfStats,
pub(crate) send: UdpAssociateClientSideHalfStats,
pub(crate) recv: UdpConnectHalfConnectionStats,
pub(crate) send: UdpConnectHalfConnectionStats,
}
#[derive(Default)]

View file

@ -16,54 +16,14 @@
use std::sync::Arc;
use g3_daemon::stat::task::UdpConnectConnectionStats;
use crate::module::udp_connect::{ArcUdpConnectTaskRemoteStats, UdpConnectTaskRemoteStats};
#[derive(Default)]
pub(crate) struct UdpConnectHalfStats {
bytes: u64,
packets: u64,
}
impl UdpConnectHalfStats {
pub(crate) fn get_bytes(&self) -> u64 {
self.bytes
}
pub(crate) fn get_packets(&self) -> u64 {
self.packets
}
pub(crate) fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
}
}
pub(crate) fn add_packet(&self) {
unsafe {
let r = &self.packets as *const u64 as *mut u64;
*r += 1;
}
}
}
#[derive(Default)]
pub(crate) struct UdpConnectClientSideStats {
pub(crate) recv: UdpConnectHalfStats,
pub(crate) send: UdpConnectHalfStats,
}
#[derive(Default)]
pub(crate) struct UdpConnectRemoteSideStats {
pub(crate) recv: UdpConnectHalfStats,
pub(crate) send: UdpConnectHalfStats,
}
#[derive(Default)]
pub(crate) struct UdpConnectTaskStats {
pub(crate) clt: UdpConnectClientSideStats,
pub(crate) ups: UdpConnectRemoteSideStats,
pub(crate) clt: UdpConnectConnectionStats,
pub(crate) ups: UdpConnectConnectionStats,
}
impl UdpConnectTaskStats {

View file

@ -54,7 +54,9 @@ impl OpensslRelayTask {
host,
service,
task_notes,
task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(*pre_handshake_stats)),
task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(
pre_handshake_stats.as_ref().clone(),
)),
}
}

View file

@ -54,7 +54,9 @@ impl RustlsRelayTask {
host,
service,
task_notes,
task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(*pre_handshake_stats)),
task_stats: Arc::new(TcpStreamTaskStats::with_clt_stats(
pre_handshake_stats.as_ref().clone(),
)),
}
}

View file

@ -15,4 +15,7 @@
*/
mod tcp_stream;
pub use tcp_stream::{TcpStreamConnectionStats, TcpStreamTaskStats};
pub use tcp_stream::{TcpStreamConnectionStats, TcpStreamHalfConnectionStats, TcpStreamTaskStats};
mod udp_connect;
pub use udp_connect::{UdpConnectConnectionStats, UdpConnectHalfConnectionStats};

View file

@ -14,29 +14,39 @@
* limitations under the License.
*/
use std::cell::UnsafeCell;
use std::sync::Arc;
use crate::stat::remote::{ArcTcpConnectionTaskRemoteStats, TcpConnectionTaskRemoteStats};
#[derive(Copy, Clone, Default)]
#[derive(Default)]
pub struct TcpStreamHalfConnectionStats {
bytes: u64,
bytes: UnsafeCell<u64>,
}
impl TcpStreamHalfConnectionStats {
pub fn get_bytes(&self) -> u64 {
self.bytes
}
unsafe impl Sync for TcpStreamHalfConnectionStats {}
pub fn add_bytes(&self, size: u64) {
unsafe {
let r = &self.bytes as *const u64 as *mut u64;
*r += size;
impl Clone for TcpStreamHalfConnectionStats {
fn clone(&self) -> Self {
TcpStreamHalfConnectionStats {
bytes: UnsafeCell::new(self.get_bytes()),
}
}
}
#[derive(Copy, Clone, Default)]
impl TcpStreamHalfConnectionStats {
pub fn get_bytes(&self) -> u64 {
let r = unsafe { &*self.bytes.get() };
*r
}
pub fn add_bytes(&self, size: u64) {
let r = unsafe { &mut *self.bytes.get() };
*r += size;
}
}
#[derive(Clone, Default)]
pub struct TcpStreamConnectionStats {
pub read: TcpStreamHalfConnectionStats,
pub write: TcpStreamHalfConnectionStats,

View file

@ -0,0 +1,53 @@
/*
* Copyright 2023 ByteDance and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::cell::UnsafeCell;
#[derive(Default)]
pub struct UdpConnectHalfConnectionStats {
bytes: UnsafeCell<u64>,
packets: UnsafeCell<u64>,
}
unsafe impl Sync for UdpConnectHalfConnectionStats {}
impl UdpConnectHalfConnectionStats {
pub fn get_bytes(&self) -> u64 {
let r = unsafe { &*self.bytes.get() };
*r
}
pub fn get_packets(&self) -> u64 {
let r = unsafe { &*self.packets.get() };
*r
}
pub fn add_bytes(&self, size: u64) {
let r = unsafe { &mut *self.bytes.get() };
*r += size;
}
pub fn add_packet(&self) {
let r = unsafe { &mut *self.packets.get() };
*r += 1;
}
}
#[derive(Default)]
pub struct UdpConnectConnectionStats {
pub recv: UdpConnectHalfConnectionStats,
pub send: UdpConnectHalfConnectionStats,
}

View file

@ -14,6 +14,33 @@
* limitations under the License.
*/
macro_rules! impl_per_thread_unsafe_add_size {
($method:ident, $field:ident) => {
fn $method(&self, size: u64) {
let r = unsafe { &mut *self.$field.get() };
*r += size;
}
};
}
macro_rules! impl_per_thread_unsafe_add_packet {
($method:ident, $field:ident) => {
fn $method(&self) {
let r = unsafe { &mut *self.$field.get() };
*r += 1;
}
};
}
macro_rules! impl_per_thread_unsafe_get {
($method:ident, $field:ident, $r:ty) => {
fn $method(&self) -> $r {
let r = unsafe { &*self.$field.get() };
*r
}
};
}
mod id;
pub use id::StatId;

View file

@ -14,6 +14,7 @@
* limitations under the License.
*/
use std::cell::UnsafeCell;
use std::ops;
use std::sync::atomic::{AtomicU64, Ordering};
@ -61,31 +62,23 @@ impl TcpIoStats {
}
}
#[derive(Clone, Copy, Default)]
#[derive(Default)]
struct PerThreadTcpIoStats {
in_bytes: u64,
out_bytes: u64,
in_bytes: UnsafeCell<u64>,
out_bytes: UnsafeCell<u64>,
}
impl PerThreadTcpIoStats {
fn add_in_bytes(&self, size: u64) {
unsafe {
let r = &self.in_bytes as *const u64 as *mut u64;
*r += size;
}
}
impl_per_thread_unsafe_add_size!(add_in_bytes, in_bytes);
impl_per_thread_unsafe_add_size!(add_out_bytes, out_bytes);
fn add_out_bytes(&self, size: u64) {
unsafe {
let r = &self.out_bytes as *const u64 as *mut u64;
*r += size;
}
}
impl_per_thread_unsafe_get!(get_in_bytes, in_bytes, u64);
impl_per_thread_unsafe_get!(get_out_bytes, out_bytes, u64);
fn snapshot(&self) -> TcpIoSnapshot {
TcpIoSnapshot {
in_bytes: self.in_bytes,
out_bytes: self.out_bytes,
in_bytes: self.get_in_bytes(),
out_bytes: self.get_out_bytes(),
}
}
}
@ -97,9 +90,13 @@ pub struct ThreadedTcpIoStats {
impl ThreadedTcpIoStats {
pub fn new(thread_count: usize) -> Self {
let mut p = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
p.push(PerThreadTcpIoStats::default());
}
ThreadedTcpIoStats {
a: TcpIoStats::default(),
p: vec![PerThreadTcpIoStats::default(); thread_count],
p,
}
}
@ -116,7 +113,7 @@ impl ThreadedTcpIoStats {
pub fn get_in_bytes(&self) -> u64 {
self.p
.iter()
.map(|x| x.in_bytes)
.map(|x| x.get_in_bytes())
.fold(self.a.get_in_bytes(), |acc, x| acc + x)
}

View file

@ -14,6 +14,7 @@
* limitations under the License.
*/
use std::cell::UnsafeCell;
use std::ops;
use std::sync::atomic::{AtomicU64, Ordering};
@ -73,49 +74,31 @@ impl UdpIoStats {
}
}
#[derive(Default, Clone, Copy)]
#[derive(Default)]
struct PerThreadUdpIoStats {
in_packets: u64,
in_bytes: u64,
out_packets: u64,
out_bytes: u64,
in_packets: UnsafeCell<u64>,
in_bytes: UnsafeCell<u64>,
out_packets: UnsafeCell<u64>,
out_bytes: UnsafeCell<u64>,
}
impl PerThreadUdpIoStats {
fn add_in_packet(&self) {
unsafe {
let r = &self.in_packets as *const u64 as *mut u64;
*r += 1;
}
}
impl_per_thread_unsafe_add_size!(add_in_bytes, in_bytes);
impl_per_thread_unsafe_add_packet!(add_in_packet, in_packets);
impl_per_thread_unsafe_add_size!(add_out_bytes, out_bytes);
impl_per_thread_unsafe_add_packet!(add_out_packet, out_packets);
fn add_in_bytes(&self, size: u64) {
unsafe {
let r = &self.in_bytes as *const u64 as *mut u64;
*r += size;
}
}
fn add_out_packet(&self) {
unsafe {
let r = &self.out_packets as *const u64 as *mut u64;
*r += 1;
}
}
fn add_out_bytes(&self, size: u64) {
unsafe {
let r = &self.out_bytes as *const u64 as *mut u64;
*r += size;
}
}
impl_per_thread_unsafe_get!(get_in_bytes, in_bytes, u64);
impl_per_thread_unsafe_get!(get_in_packets, in_packets, u64);
impl_per_thread_unsafe_get!(get_out_bytes, out_bytes, u64);
impl_per_thread_unsafe_get!(get_out_packets, out_packets, u64);
fn snapshot(&self) -> UdpIoSnapshot {
UdpIoSnapshot {
in_packets: self.in_packets,
in_bytes: self.in_bytes,
out_packets: self.out_packets,
out_bytes: self.out_bytes,
in_packets: self.get_in_packets(),
in_bytes: self.get_in_bytes(),
out_packets: self.get_out_packets(),
out_bytes: self.get_out_bytes(),
}
}
}
@ -127,9 +110,13 @@ pub struct ThreadedUdpIoStats {
impl ThreadedUdpIoStats {
pub fn new(thread_count: usize) -> Self {
let mut p = Vec::with_capacity(thread_count);
for _ in 0..thread_count {
p.push(PerThreadUdpIoStats::default());
}
ThreadedUdpIoStats {
a: UdpIoStats::default(),
p: vec![PerThreadUdpIoStats::default(); thread_count],
p,
}
}