g3proxy: allow to pass openssl client stream

This commit is contained in:
Zhang Jingqiang 2023-11-03 14:49:25 +08:00
parent 044957ae03
commit cfd4e3229d
12 changed files with 147 additions and 32 deletions

View file

@ -20,6 +20,7 @@ use anyhow::anyhow;
use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -168,11 +169,19 @@ impl Server for DummyCloseServer {
) {
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -22,8 +22,10 @@ use anyhow::{anyhow, Context};
use async_trait::async_trait;
use log::debug;
use slog::Logger;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc};
use tokio_openssl::SslStream;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use g3_daemon::listen::ListenStats;
@ -190,12 +192,14 @@ impl HttpProxyServer {
false
}
async fn spawn_tls_task(
async fn spawn_stream_task<T>(
&self,
stream: TlsStream<TcpStream>,
stream: T,
cc_info: ClientConnectionInfo,
run_ctx: ServerRunContext,
) {
) where
T: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
let ctx = self.get_common_task_context(
cc_info,
run_ctx.escaper,
@ -369,7 +373,7 @@ impl Server for HttpProxyServer {
if let Some(tls_acceptor) = &self.tls_acceptor {
match tokio::time::timeout(self.tls_accept_timeout, tls_acceptor.accept(stream)).await {
Ok(Ok(tls_stream)) => self.spawn_tls_task(tls_stream, cc_info, ctx).await,
Ok(Ok(tls_stream)) => self.spawn_stream_task(tls_stream, cc_info, ctx).await,
Ok(Err(e)) => {
self.listen_stats.add_failed();
debug!(
@ -394,7 +398,7 @@ impl Server for HttpProxyServer {
}
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
stream: TlsStream<TcpStream>,
cc_info: ClientConnectionInfo,
@ -406,6 +410,21 @@ impl Server for HttpProxyServer {
return;
}
self.spawn_tls_task(stream, cc_info, ctx).await;
self.spawn_stream_task(stream, cc_info, ctx).await;
}
async fn run_openssl_task(
&self,
stream: SslStream<TcpStream>,
cc_info: ClientConnectionInfo,
ctx: ServerRunContext,
) {
let client_addr = cc_info.client_addr();
self.server_stats.add_conn(client_addr);
if self.drop_early(client_addr) {
return;
}
self.spawn_stream_task(stream, cc_info, ctx).await;
}
}

View file

@ -22,8 +22,10 @@ use anyhow::{anyhow, Context};
use async_trait::async_trait;
use log::debug;
use slog::Logger;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::sync::{broadcast, mpsc};
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use tokio_rustls::LazyConfigAcceptor;
@ -182,12 +184,14 @@ impl HttpRProxyServer {
false
}
async fn spawn_tls_task(
async fn spawn_stream_task<T>(
&self,
stream: TlsStream<TcpStream>,
stream: T,
cc_info: ClientConnectionInfo,
run_ctx: ServerRunContext,
) {
) where
T: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
let ctx = self.get_common_task_context(cc_info, run_ctx.escaper, run_ctx.worker_id);
let pipeline_stats = Arc::new(HttpRProxyPipelineStats::default());
let (task_sender, task_receiver) = mpsc::channel(ctx.server_config.pipeline_size);
@ -373,7 +377,9 @@ impl Server for HttpRProxyServer {
)
.await
{
Ok(Ok(stream)) => self.spawn_tls_task(stream, cc_info, ctx).await,
Ok(Ok(stream)) => {
self.spawn_stream_task(stream, cc_info, ctx).await
}
Ok(Err(e)) => {
self.listen_stats.add_failed();
debug!(
@ -429,7 +435,7 @@ impl Server for HttpRProxyServer {
}
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
stream: TlsStream<TcpStream>,
cc_info: ClientConnectionInfo,
@ -441,6 +447,21 @@ impl Server for HttpRProxyServer {
return;
}
self.spawn_tls_task(stream, cc_info, ctx).await;
self.spawn_stream_task(stream, cc_info, ctx).await;
}
async fn run_openssl_task(
&self,
stream: SslStream<TcpStream>,
cc_info: ClientConnectionInfo,
ctx: ServerRunContext,
) {
let client_addr = cc_info.client_addr();
self.server_stats.add_conn(client_addr);
if self.drop_early(client_addr) {
return;
}
self.spawn_stream_task(stream, cc_info, ctx).await;
}
}

View file

@ -21,6 +21,7 @@ use arc_swap::ArcSwap;
use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::sync::{broadcast, watch};
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -200,11 +201,19 @@ impl Server for IntelliProxy {
) {
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -20,6 +20,7 @@ use async_trait::async_trait;
use log::warn;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -192,12 +193,19 @@ pub(crate) trait Server: ServerInternal {
ctx: ServerRunContext,
);
async fn run_tls_task(
async fn run_rustls_task(
&self,
stream: TlsStream<TcpStream>,
cc_info: ClientConnectionInfo,
ctx: ServerRunContext,
);
async fn run_openssl_task(
&self,
stream: SslStream<TcpStream>,
cc_info: ClientConnectionInfo,
ctx: ServerRunContext,
);
}
pub(crate) type ArcServer = Arc<dyn Server + Send + Sync>;

View file

@ -22,6 +22,7 @@ use async_trait::async_trait;
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, watch};
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -286,11 +287,19 @@ impl Server for PlainTcpPort {
) {
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -23,6 +23,7 @@ use log::debug;
use tokio::net::TcpStream;
use tokio::runtime::Handle;
use tokio::sync::{broadcast, watch};
use tokio_openssl::SslStream;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use g3_daemon::listen::ListenStats;
@ -108,7 +109,7 @@ impl AuxiliaryServerConfig for PlainTlsPortAuxConfig {
}
match tokio::time::timeout(tls_accept_timeout, tls_acceptor.accept(stream)).await {
Ok(Ok(tls_stream)) => next_server.run_tls_task(tls_stream, cc_info, ctx).await,
Ok(Ok(tls_stream)) => next_server.run_rustls_task(tls_stream, cc_info, ctx).await,
Ok(Err(e)) => {
listen_stats.add_failed();
debug!(
@ -332,11 +333,19 @@ impl Server for PlainTlsPort {
) {
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -22,6 +22,7 @@ use async_trait::async_trait;
use slog::Logger;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -274,11 +275,19 @@ impl Server for SniProxyServer {
self.run_task(stream, cc_info, ctx).await
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -22,6 +22,7 @@ use async_trait::async_trait;
use slog::Logger;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -281,7 +282,7 @@ impl Server for SocksProxyServer {
self.run_task(stream, cc_info, ctx).await
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
cc_info: ClientConnectionInfo,
@ -290,4 +291,14 @@ impl Server for SocksProxyServer {
self.server_stats.add_conn(cc_info.client_addr());
self.listen_stats.add_dropped();
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
self.server_stats.add_conn(cc_info.client_addr());
self.listen_stats.add_dropped();
}
}

View file

@ -22,6 +22,7 @@ use async_trait::async_trait;
use slog::Logger;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::server::TlsStream;
use g3_daemon::listen::ListenStats;
@ -313,11 +314,19 @@ impl Server for TcpStreamServer {
self.run_task(stream, cc_info, ctx).await
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}

View file

@ -206,7 +206,7 @@ impl TcpStreamTask {
LimitedReader<impl AsyncRead>,
LimitedWriter<impl AsyncWrite>,
) {
let (clt_r, clt_w) = tokio::io::split(clt_stream);
let (clt_r, clt_w) = clt_stream.into_split();
let (clt_r_stats, clt_w_stats) =
TcpStreamTaskCltWrapperStats::new_pair(&self.ctx.server_stats, &self.task_stats);

View file

@ -24,6 +24,7 @@ use log::debug;
use slog::Logger;
use tokio::net::TcpStream;
use tokio::sync::broadcast;
use tokio_openssl::SslStream;
use tokio_rustls::{server::TlsStream, TlsAcceptor};
use g3_daemon::listen::ListenStats;
@ -346,18 +347,19 @@ impl Server for TlsStreamServer {
}
}
async fn run_tls_task(
async fn run_rustls_task(
&self,
stream: TlsStream<TcpStream>,
cc_info: ClientConnectionInfo,
ctx: ServerRunContext,
_stream: TlsStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
let client_addr = cc_info.client_addr();
self.server_stats.add_conn(client_addr);
if self.drop_early(client_addr) {
return;
}
}
self.run_task(stream, cc_info, ctx).await
async fn run_openssl_task(
&self,
_stream: SslStream<TcpStream>,
_cc_info: ClientConnectionInfo,
_ctx: ServerRunContext,
) {
}
}