diff --git a/g3proxy/src/serve/dummy_close/server.rs b/g3proxy/src/serve/dummy_close/server.rs index 080fb2f9..8f2503ff 100644 --- a/g3proxy/src/serve/dummy_close/server.rs +++ b/g3proxy/src/serve/dummy_close/server.rs @@ -20,6 +20,7 @@ use anyhow::anyhow; use async_trait::async_trait; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -168,11 +169,19 @@ impl Server for DummyCloseServer { ) { } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/http_proxy/server.rs b/g3proxy/src/serve/http_proxy/server.rs index e9439daa..b4c057c7 100644 --- a/g3proxy/src/serve/http_proxy/server.rs +++ b/g3proxy/src/serve/http_proxy/server.rs @@ -22,8 +22,10 @@ use anyhow::{anyhow, Context}; use async_trait::async_trait; use log::debug; use slog::Logger; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::sync::{broadcast, mpsc}; +use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; use g3_daemon::listen::ListenStats; @@ -190,12 +192,14 @@ impl HttpProxyServer { false } - async fn spawn_tls_task( + async fn spawn_stream_task( &self, - stream: TlsStream, + stream: T, cc_info: ClientConnectionInfo, run_ctx: ServerRunContext, - ) { + ) where + T: AsyncRead + AsyncWrite + Send + Sync + 'static, + { let ctx = self.get_common_task_context( cc_info, run_ctx.escaper, @@ -369,7 +373,7 @@ impl Server for HttpProxyServer { if let Some(tls_acceptor) = &self.tls_acceptor { match tokio::time::timeout(self.tls_accept_timeout, tls_acceptor.accept(stream)).await { - Ok(Ok(tls_stream)) => self.spawn_tls_task(tls_stream, cc_info, ctx).await, + Ok(Ok(tls_stream)) => self.spawn_stream_task(tls_stream, cc_info, ctx).await, Ok(Err(e)) => { self.listen_stats.add_failed(); debug!( @@ -394,7 +398,7 @@ impl Server for HttpProxyServer { } } - async fn run_tls_task( + async fn run_rustls_task( &self, stream: TlsStream, cc_info: ClientConnectionInfo, @@ -406,6 +410,21 @@ impl Server for HttpProxyServer { return; } - self.spawn_tls_task(stream, cc_info, ctx).await; + self.spawn_stream_task(stream, cc_info, ctx).await; + } + + async fn run_openssl_task( + &self, + stream: SslStream, + cc_info: ClientConnectionInfo, + ctx: ServerRunContext, + ) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.spawn_stream_task(stream, cc_info, ctx).await; } } diff --git a/g3proxy/src/serve/http_rproxy/server.rs b/g3proxy/src/serve/http_rproxy/server.rs index 04ca6f38..91ec7ad4 100644 --- a/g3proxy/src/serve/http_rproxy/server.rs +++ b/g3proxy/src/serve/http_rproxy/server.rs @@ -22,8 +22,10 @@ use anyhow::{anyhow, Context}; use async_trait::async_trait; use log::debug; use slog::Logger; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::sync::{broadcast, mpsc}; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use tokio_rustls::LazyConfigAcceptor; @@ -182,12 +184,14 @@ impl HttpRProxyServer { false } - async fn spawn_tls_task( + async fn spawn_stream_task( &self, - stream: TlsStream, + stream: T, cc_info: ClientConnectionInfo, run_ctx: ServerRunContext, - ) { + ) where + T: AsyncRead + AsyncWrite + Send + Sync + 'static, + { let ctx = self.get_common_task_context(cc_info, run_ctx.escaper, run_ctx.worker_id); let pipeline_stats = Arc::new(HttpRProxyPipelineStats::default()); let (task_sender, task_receiver) = mpsc::channel(ctx.server_config.pipeline_size); @@ -373,7 +377,9 @@ impl Server for HttpRProxyServer { ) .await { - Ok(Ok(stream)) => self.spawn_tls_task(stream, cc_info, ctx).await, + Ok(Ok(stream)) => { + self.spawn_stream_task(stream, cc_info, ctx).await + } Ok(Err(e)) => { self.listen_stats.add_failed(); debug!( @@ -429,7 +435,7 @@ impl Server for HttpRProxyServer { } } - async fn run_tls_task( + async fn run_rustls_task( &self, stream: TlsStream, cc_info: ClientConnectionInfo, @@ -441,6 +447,21 @@ impl Server for HttpRProxyServer { return; } - self.spawn_tls_task(stream, cc_info, ctx).await; + self.spawn_stream_task(stream, cc_info, ctx).await; + } + + async fn run_openssl_task( + &self, + stream: SslStream, + cc_info: ClientConnectionInfo, + ctx: ServerRunContext, + ) { + let client_addr = cc_info.client_addr(); + self.server_stats.add_conn(client_addr); + if self.drop_early(client_addr) { + return; + } + + self.spawn_stream_task(stream, cc_info, ctx).await; } } diff --git a/g3proxy/src/serve/intelli_proxy/server.rs b/g3proxy/src/serve/intelli_proxy/server.rs index 50c44983..549c12d4 100644 --- a/g3proxy/src/serve/intelli_proxy/server.rs +++ b/g3proxy/src/serve/intelli_proxy/server.rs @@ -21,6 +21,7 @@ use arc_swap::ArcSwap; use async_trait::async_trait; use tokio::net::TcpStream; use tokio::sync::{broadcast, watch}; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -200,11 +201,19 @@ impl Server for IntelliProxy { ) { } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/mod.rs b/g3proxy/src/serve/mod.rs index 7030e65d..873cb9ad 100644 --- a/g3proxy/src/serve/mod.rs +++ b/g3proxy/src/serve/mod.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use log::warn; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -192,12 +193,19 @@ pub(crate) trait Server: ServerInternal { ctx: ServerRunContext, ); - async fn run_tls_task( + async fn run_rustls_task( &self, stream: TlsStream, cc_info: ClientConnectionInfo, ctx: ServerRunContext, ); + + async fn run_openssl_task( + &self, + stream: SslStream, + cc_info: ClientConnectionInfo, + ctx: ServerRunContext, + ); } pub(crate) type ArcServer = Arc; diff --git a/g3proxy/src/serve/plain_tcp_port/mod.rs b/g3proxy/src/serve/plain_tcp_port/mod.rs index ec4df9cf..66bb0332 100644 --- a/g3proxy/src/serve/plain_tcp_port/mod.rs +++ b/g3proxy/src/serve/plain_tcp_port/mod.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use tokio::net::TcpStream; use tokio::runtime::Handle; use tokio::sync::{broadcast, watch}; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -286,11 +287,19 @@ impl Server for PlainTcpPort { ) { } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/plain_tls_port/mod.rs b/g3proxy/src/serve/plain_tls_port/mod.rs index fdcc8a8e..025d7c95 100644 --- a/g3proxy/src/serve/plain_tls_port/mod.rs +++ b/g3proxy/src/serve/plain_tls_port/mod.rs @@ -23,6 +23,7 @@ use log::debug; use tokio::net::TcpStream; use tokio::runtime::Handle; use tokio::sync::{broadcast, watch}; +use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; use g3_daemon::listen::ListenStats; @@ -108,7 +109,7 @@ impl AuxiliaryServerConfig for PlainTlsPortAuxConfig { } match tokio::time::timeout(tls_accept_timeout, tls_acceptor.accept(stream)).await { - Ok(Ok(tls_stream)) => next_server.run_tls_task(tls_stream, cc_info, ctx).await, + Ok(Ok(tls_stream)) => next_server.run_rustls_task(tls_stream, cc_info, ctx).await, Ok(Err(e)) => { listen_stats.add_failed(); debug!( @@ -332,11 +333,19 @@ impl Server for PlainTlsPort { ) { } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/sni_proxy/server.rs b/g3proxy/src/serve/sni_proxy/server.rs index 4864d79e..f7c9c116 100644 --- a/g3proxy/src/serve/sni_proxy/server.rs +++ b/g3proxy/src/serve/sni_proxy/server.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -274,11 +275,19 @@ impl Server for SniProxyServer { self.run_task(stream, cc_info, ctx).await } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/socks_proxy/server.rs b/g3proxy/src/serve/socks_proxy/server.rs index 0a6f8372..b66a85cf 100644 --- a/g3proxy/src/serve/socks_proxy/server.rs +++ b/g3proxy/src/serve/socks_proxy/server.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -281,7 +282,7 @@ impl Server for SocksProxyServer { self.run_task(stream, cc_info, ctx).await } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, cc_info: ClientConnectionInfo, @@ -290,4 +291,14 @@ impl Server for SocksProxyServer { self.server_stats.add_conn(cc_info.client_addr()); self.listen_stats.add_dropped(); } + + async fn run_openssl_task( + &self, + _stream: SslStream, + cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + self.server_stats.add_conn(cc_info.client_addr()); + self.listen_stats.add_dropped(); + } } diff --git a/g3proxy/src/serve/tcp_stream/server.rs b/g3proxy/src/serve/tcp_stream/server.rs index 4e68ee92..ab5c763e 100644 --- a/g3proxy/src/serve/tcp_stream/server.rs +++ b/g3proxy/src/serve/tcp_stream/server.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::server::TlsStream; use g3_daemon::listen::ListenStats; @@ -313,11 +314,19 @@ impl Server for TcpStreamServer { self.run_task(stream, cc_info, ctx).await } - async fn run_tls_task( + async fn run_rustls_task( &self, _stream: TlsStream, _cc_info: ClientConnectionInfo, _ctx: ServerRunContext, ) { } + + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { + } } diff --git a/g3proxy/src/serve/tcp_stream/task.rs b/g3proxy/src/serve/tcp_stream/task.rs index 4618ecbd..c0995a8f 100644 --- a/g3proxy/src/serve/tcp_stream/task.rs +++ b/g3proxy/src/serve/tcp_stream/task.rs @@ -206,7 +206,7 @@ impl TcpStreamTask { LimitedReader, LimitedWriter, ) { - let (clt_r, clt_w) = tokio::io::split(clt_stream); + let (clt_r, clt_w) = clt_stream.into_split(); let (clt_r_stats, clt_w_stats) = TcpStreamTaskCltWrapperStats::new_pair(&self.ctx.server_stats, &self.task_stats); diff --git a/g3proxy/src/serve/tls_stream/server.rs b/g3proxy/src/serve/tls_stream/server.rs index d88bb05f..5b83b46a 100644 --- a/g3proxy/src/serve/tls_stream/server.rs +++ b/g3proxy/src/serve/tls_stream/server.rs @@ -24,6 +24,7 @@ use log::debug; use slog::Logger; use tokio::net::TcpStream; use tokio::sync::broadcast; +use tokio_openssl::SslStream; use tokio_rustls::{server::TlsStream, TlsAcceptor}; use g3_daemon::listen::ListenStats; @@ -346,18 +347,19 @@ impl Server for TlsStreamServer { } } - async fn run_tls_task( + async fn run_rustls_task( &self, - stream: TlsStream, - cc_info: ClientConnectionInfo, - ctx: ServerRunContext, + _stream: TlsStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, ) { - let client_addr = cc_info.client_addr(); - self.server_stats.add_conn(client_addr); - if self.drop_early(client_addr) { - return; - } + } - self.run_task(stream, cc_info, ctx).await + async fn run_openssl_task( + &self, + _stream: SslStream, + _cc_info: ClientConnectionInfo, + _ctx: ServerRunContext, + ) { } }