diff --git a/g3proxy/src/escape/direct_fixed/udp_connect/recv.rs b/g3proxy/src/escape/direct_fixed/udp_connect/recv.rs index 33343272..e6976be9 100644 --- a/g3proxy/src/escape/direct_fixed/udp_connect/recv.rs +++ b/g3proxy/src/escape/direct_fixed/udp_connect/recv.rs @@ -23,6 +23,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpCopyRemoteError, UdpCopyRemoteRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpCopyPacket, UdpCopyPacketMeta}; @@ -62,6 +63,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/escape/direct_fixed/udp_connect/send.rs b/g3proxy/src/escape/direct_fixed/udp_connect/send.rs index b18a3666..18351795 100644 --- a/g3proxy/src/escape/direct_fixed/udp_connect/send.rs +++ b/g3proxy/src/escape/direct_fixed/udp_connect/send.rs @@ -24,6 +24,7 @@ use g3_io_ext::{AsyncUdpSend, UdpCopyRemoteError, UdpCopyRemoteSend}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{SendMsgHdr, UdpCopyPacket}; @@ -90,4 +91,29 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_send_packets( + &mut self, + cx: &mut Context<'_>, + packets: &[UdpCopyPacket], + ) -> Poll> { + use std::io::IoSlice; + + let mut msgs: Vec> = packets + .iter() + .map(|p| SendMsgHdr::new([IoSlice::new(p.payload())], None)) + .collect(); + + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, &mut msgs)) + .map_err(UdpCopyRemoteError::SendFailed)?; + if count == 0 { + Poll::Ready(Err(UdpCopyRemoteError::SendFailed(io::Error::new( + io::ErrorKind::WriteZero, + "write zero packet into sender", + )))) + } else { + Poll::Ready(Ok(count)) + } + } } diff --git a/g3proxy/src/escape/direct_fixed/udp_relay/recv.rs b/g3proxy/src/escape/direct_fixed/udp_relay/recv.rs index bb715192..0d662fa4 100644 --- a/g3proxy/src/escape/direct_fixed/udp_relay/recv.rs +++ b/g3proxy/src/escape/direct_fixed/udp_relay/recv.rs @@ -24,6 +24,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpRelayRemoteError, UdpRelayRemoteRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpRelayPacket, UdpRelayPacketMeta}; use g3_types::net::UpstreamAddr; @@ -101,6 +102,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( inner: &mut T, @@ -157,6 +159,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/escape/proxy_socks5/udp_connect/recv.rs b/g3proxy/src/escape/proxy_socks5/udp_connect/recv.rs index af177ad2..ee52e04b 100644 --- a/g3proxy/src/escape/proxy_socks5/udp_connect/recv.rs +++ b/g3proxy/src/escape/proxy_socks5/udp_connect/recv.rs @@ -27,6 +27,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpCopyRemoteError, UdpCopyRemoteRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpCopyPacket, UdpCopyPacketMeta}; use g3_socks::v5::UdpInput; @@ -111,6 +112,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/escape/proxy_socks5/udp_connect/send.rs b/g3proxy/src/escape/proxy_socks5/udp_connect/send.rs index 1b109e03..35babca5 100644 --- a/g3proxy/src/escape/proxy_socks5/udp_connect/send.rs +++ b/g3proxy/src/escape/proxy_socks5/udp_connect/send.rs @@ -24,6 +24,7 @@ use g3_io_ext::{AsyncUdpSend, UdpCopyRemoteError, UdpCopyRemoteSend}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{SendMsgHdr, UdpCopyPacket}; use g3_socks::v5::UdpOutput; @@ -107,4 +108,32 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_send_packets( + &mut self, + cx: &mut Context<'_>, + packets: &[UdpCopyPacket], + ) -> Poll> { + let mut msgs: Vec> = packets + .iter() + .map(|p| { + SendMsgHdr::new( + [IoSlice::new(&self.socks5_header), IoSlice::new(p.payload())], + None, + ) + }) + .collect(); + + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, &mut msgs)) + .map_err(UdpCopyRemoteError::SendFailed)?; + if count == 0 { + Poll::Ready(Err(UdpCopyRemoteError::SendFailed(io::Error::new( + io::ErrorKind::WriteZero, + "write zero packet into sender", + )))) + } else { + Poll::Ready(Ok(count)) + } + } } diff --git a/g3proxy/src/escape/proxy_socks5/udp_relay/recv.rs b/g3proxy/src/escape/proxy_socks5/udp_relay/recv.rs index d2431f2d..2631d874 100644 --- a/g3proxy/src/escape/proxy_socks5/udp_relay/recv.rs +++ b/g3proxy/src/escape/proxy_socks5/udp_relay/recv.rs @@ -28,6 +28,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpRelayRemoteError, UdpRelayRemoteRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpRelayPacket, UdpRelayPacketMeta}; use g3_socks::v5::UdpInput; @@ -133,6 +134,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/escape/proxy_socks5/udp_relay/send.rs b/g3proxy/src/escape/proxy_socks5/udp_relay/send.rs index 5eb734b0..7125b074 100644 --- a/g3proxy/src/escape/proxy_socks5/udp_relay/send.rs +++ b/g3proxy/src/escape/proxy_socks5/udp_relay/send.rs @@ -25,6 +25,7 @@ use g3_io_ext::{AsyncUdpSend, UdpRelayRemoteError, UdpRelayRemoteSend}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{SendMsgHdr, UdpRelayPacket}; use g3_socks::v5::SocksUdpHeader; @@ -117,4 +118,37 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_send_packets( + &mut self, + cx: &mut Context<'_>, + packets: &[UdpRelayPacket], + ) -> Poll> { + if packets.len() > self.socks_headers.len() { + self.socks_headers.resize(packets.len(), Default::default()); + } + let mut msgs = Vec::with_capacity(packets.len()); + for (p, h) in packets.iter().zip(self.socks_headers.iter_mut()) { + msgs.push(SendMsgHdr::new( + [ + IoSlice::new(h.encode(p.upstream())), + IoSlice::new(p.payload()), + ], + None, + )); + } + + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, &mut msgs)) + .map_err(|e| UdpRelayRemoteError::SendFailed(self.local_addr, self.peer_addr, e))?; + if count == 0 { + Poll::Ready(Err(UdpRelayRemoteError::SendFailed( + self.local_addr, + self.peer_addr, + io::Error::new(io::ErrorKind::WriteZero, "write zero packet into sender"), + ))) + } else { + Poll::Ready(Ok(count)) + } + } } diff --git a/g3proxy/src/serve/socks_proxy/task/udp_associate/recv.rs b/g3proxy/src/serve/socks_proxy/task/udp_associate/recv.rs index 645cc677..e40d7792 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_associate/recv.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_associate/recv.rs @@ -26,6 +26,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpRelayClientError, UdpRelayClientRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpRelayPacket, UdpRelayPacketMeta}; use g3_socks::v5::UdpInput; @@ -240,6 +241,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/serve/socks_proxy/task/udp_associate/send.rs b/g3proxy/src/serve/socks_proxy/task/udp_associate/send.rs index 4490f26f..21191598 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_associate/send.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_associate/send.rs @@ -25,6 +25,7 @@ use g3_io_ext::{AsyncUdpSend, UdpRelayClientError, UdpRelayClientSend}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{SendMsgHdr, UdpRelayPacket}; use g3_socks::v5::SocksUdpHeader; @@ -113,4 +114,36 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_send_packets( + &mut self, + cx: &mut Context<'_>, + packets: &[UdpRelayPacket], + ) -> Poll> { + if packets.len() > self.socks_headers.len() { + self.socks_headers.resize(packets.len(), Default::default()); + } + let mut msgs = Vec::with_capacity(packets.len()); + for (p, h) in packets.iter().zip(self.socks_headers.iter_mut()) { + msgs.push(SendMsgHdr::new( + [ + IoSlice::new(h.encode(p.upstream())), + IoSlice::new(p.payload()), + ], + None, + )); + } + + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, &mut msgs)) + .map_err(UdpRelayClientError::SendFailed)?; + if count == 0 { + Poll::Ready(Err(UdpRelayClientError::SendFailed(io::Error::new( + io::ErrorKind::WriteZero, + "write zero packet into sender", + )))) + } else { + Poll::Ready(Ok(count)) + } + } } diff --git a/g3proxy/src/serve/socks_proxy/task/udp_connect/recv.rs b/g3proxy/src/serve/socks_proxy/task/udp_connect/recv.rs index 4ccafbe3..ad8ea0e1 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_connect/recv.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_connect/recv.rs @@ -26,6 +26,7 @@ use g3_io_ext::{AsyncUdpRecv, UdpCopyClientError, UdpCopyClientRecv}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{RecvMsgHdr, UdpCopyPacket, UdpCopyPacketMeta}; use g3_socks::v5::UdpInput; @@ -166,6 +167,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, diff --git a/g3proxy/src/serve/socks_proxy/task/udp_connect/send.rs b/g3proxy/src/serve/socks_proxy/task/udp_connect/send.rs index 32084d51..4e93dfc1 100644 --- a/g3proxy/src/serve/socks_proxy/task/udp_connect/send.rs +++ b/g3proxy/src/serve/socks_proxy/task/udp_connect/send.rs @@ -24,6 +24,7 @@ use g3_io_ext::{AsyncUdpSend, UdpCopyClientError, UdpCopyClientSend}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use g3_io_ext::{SendMsgHdr, UdpCopyPacket}; use g3_socks::v5::UdpOutput; @@ -107,4 +108,32 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_send_packets( + &mut self, + cx: &mut Context<'_>, + packets: &[UdpCopyPacket], + ) -> Poll> { + let mut msgs: Vec> = packets + .iter() + .map(|p| { + SendMsgHdr::new( + [IoSlice::new(&self.socks5_header), IoSlice::new(p.payload())], + None, + ) + }) + .collect(); + + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, &mut msgs)) + .map_err(UdpCopyClientError::SendFailed)?; + if count == 0 { + Poll::Ready(Err(UdpCopyClientError::SendFailed(io::Error::new( + io::ErrorKind::WriteZero, + "write zero packet into sender", + )))) + } else { + Poll::Ready(Ok(count)) + } + } } diff --git a/lib/g3-io-ext/src/udp/copy/client.rs b/lib/g3-io-ext/src/udp/copy/client.rs index 9f44ae67..c4b2738d 100644 --- a/lib/g3-io-ext/src/udp/copy/client.rs +++ b/lib/g3-io-ext/src/udp/copy/client.rs @@ -25,6 +25,7 @@ use thiserror::Error; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::UdpCopyPacket; @@ -61,6 +62,7 @@ pub trait UdpCopyClientRecv { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -83,6 +85,7 @@ pub trait UdpCopyClientSend { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, diff --git a/lib/g3-io-ext/src/udp/copy/mod.rs b/lib/g3-io-ext/src/udp/copy/mod.rs index 9aa974cf..258f4ddb 100644 --- a/lib/g3-io-ext/src/udp/copy/mod.rs +++ b/lib/g3-io-ext/src/udp/copy/mod.rs @@ -156,6 +156,7 @@ impl<'a, T: UdpCopyClientRecv + ?Sized> UdpCopyRecv for ClientRecv<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -191,6 +192,7 @@ impl<'a, T: UdpCopyRemoteRecv + ?Sized> UdpCopyRecv for RemoteRecv<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -252,6 +254,7 @@ impl<'a, T: UdpCopyClientSend + ?Sized> UdpCopySend for ClientSend<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, @@ -283,6 +286,7 @@ impl<'a, T: UdpCopyRemoteSend + ?Sized> UdpCopySend for RemoteSend<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, diff --git a/lib/g3-io-ext/src/udp/copy/remote.rs b/lib/g3-io-ext/src/udp/copy/remote.rs index 3c80fb51..5cf2bb78 100644 --- a/lib/g3-io-ext/src/udp/copy/remote.rs +++ b/lib/g3-io-ext/src/udp/copy/remote.rs @@ -25,6 +25,7 @@ use thiserror::Error; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::UdpCopyPacket; @@ -61,6 +62,7 @@ pub trait UdpCopyRemoteRecv { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -83,6 +85,7 @@ pub trait UdpCopyRemoteSend { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, diff --git a/lib/g3-io-ext/src/udp/ext/macos.rs b/lib/g3-io-ext/src/udp/ext/macos.rs new file mode 100644 index 00000000..efdac2dd --- /dev/null +++ b/lib/g3-io-ext/src/udp/ext/macos.rs @@ -0,0 +1,34 @@ +/* + * Copyright 2024 ByteDance and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use libc::{c_int, c_uint, c_void, iovec, size_t, socklen_t, ssize_t}; + +#[repr(C)] +pub(super) struct msghdr_x { + pub msg_name: *mut c_void, + pub msg_namelen: socklen_t, + pub msg_iov: *mut iovec, + pub msg_iovlen: c_int, + pub msg_control: *mut c_void, + pub msg_controllen: socklen_t, + pub msg_flags: c_int, + pub msg_datalen: size_t, +} + +extern "C" { + pub(super) fn sendmsg_x(s: c_int, msgp: *mut msghdr_x, cnt: c_uint, flags: c_int) -> ssize_t; + pub(super) fn recvmsg_x(s: c_int, msgp: *mut msghdr_x, cnt: c_uint, flags: c_int) -> ssize_t; +} diff --git a/lib/g3-io-ext/src/udp/ext/mod.rs b/lib/g3-io-ext/src/udp/ext/mod.rs index 92907a55..1a015fbd 100644 --- a/lib/g3-io-ext/src/udp/ext/mod.rs +++ b/lib/g3-io-ext/src/udp/ext/mod.rs @@ -20,6 +20,8 @@ use std::io::{self, IoSlice}; use std::net::SocketAddr; use std::task::{Context, Poll}; +#[cfg(target_os = "macos")] +mod macos; #[cfg(unix)] mod unix; #[cfg(unix)] @@ -58,12 +60,20 @@ pub trait UdpSocketExt { msgs: &mut [SendMsgHdr<'_, C>], ) -> Poll>; + #[cfg(target_os = "macos")] + fn poll_batch_sendmsg_x( + &self, + cx: &mut Context<'_>, + msgs: &mut [SendMsgHdr<'_, C>], + ) -> Poll>; + #[cfg(any( target_os = "linux", target_os = "android", target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_batch_recvmsg( &self, diff --git a/lib/g3-io-ext/src/udp/ext/unix.rs b/lib/g3-io-ext/src/udp/ext/unix.rs index baa4f0f9..a52d6f98 100644 --- a/lib/g3-io-ext/src/udp/ext/unix.rs +++ b/lib/g3-io-ext/src/udp/ext/unix.rs @@ -15,12 +15,11 @@ */ use std::cell::UnsafeCell; -use std::io::{self, IoSlice, IoSliceMut}; -use std::mem; +use std::io::{IoSlice, IoSliceMut}; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::os::fd::AsFd; -use std::ptr; use std::task::{ready, Context, Poll}; +use std::{io, mem, ptr}; use rustix::net::{ recvmsg, sendmsg, sendmsg_v4, sendmsg_v6, RecvAncillaryBuffer, RecvFlags, SendAncillaryBuffer, @@ -140,6 +139,17 @@ impl<'a, const C: usize> SendMsgHdr<'a, C> { h.msg_iovlen = C as _; h } + + /// # Safety + /// + /// `self` should not be dropped before the returned value + #[cfg(target_os = "macos")] + unsafe fn to_msghdr_x(&self) -> super::macos::msghdr_x { + let mut h = mem::zeroed::(); + h.msg_iov = self.iov.as_ptr() as _; + h.msg_iovlen = C as _; + h + } } impl<'a, const C: usize> AsRef<[IoSlice<'a>]> for SendMsgHdr<'a, C> { @@ -182,6 +192,22 @@ impl<'a, const C: usize> RecvMsgHdr<'a, C> { h.msg_iovlen = C as _; h } + + /// # Safety + /// + /// `self` should not be dropped before the returned value + #[cfg(target_os = "macos")] + unsafe fn to_msghdr_x(&self) -> super::macos::msghdr_x { + let c_addr = &mut *self.c_addr.get(); + let (c_addr, c_addr_len) = c_addr.get_ptr_and_size(); + + let mut h = mem::zeroed::(); + h.msg_name = c_addr as _; + h.msg_namelen = c_addr_len as _; + h.msg_iov = self.iov.as_ptr() as _; + h.msg_iovlen = C as _; + h + } } impl UdpSocketExt for UdpSocket { @@ -342,6 +368,53 @@ impl UdpSocketExt for UdpSocket { } } + #[cfg(target_os = "macos")] + fn poll_batch_sendmsg_x( + &self, + cx: &mut Context<'_>, + msgs: &mut [SendMsgHdr<'_, C>], + ) -> Poll> { + use smallvec::SmallVec; + use std::os::fd::AsRawFd; + + let mut msgvec: SmallVec<[_; 32]> = SmallVec::with_capacity(msgs.len()); + for m in msgs.iter_mut() { + msgvec.push(unsafe { m.to_msghdr_x() }); + } + + let raw_fd = self.as_raw_fd(); + let flags = libc::MSG_DONTWAIT; + let mut sendmsg_x = || { + let r = unsafe { + super::macos::sendmsg_x(raw_fd, msgvec.as_mut_ptr(), msgvec.len() as _, flags as _) + }; + if r < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(r as usize) + } + }; + + loop { + ready!(self.poll_send_ready(cx))?; + match self.try_io(Interest::WRITABLE, &mut sendmsg_x) { + Ok(count) => { + for m in msgs.iter_mut().take(count) { + m.n_send = m.iov.iter().map(|iov| iov.len()).sum(); + } + return Poll::Ready(Ok(count)); + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } else { + return Poll::Ready(Err(e)); + } + } + } + } + } + #[cfg(any( target_os = "linux", target_os = "android", @@ -402,6 +475,57 @@ impl UdpSocketExt for UdpSocket { } } } + + #[cfg(target_os = "macos")] + fn poll_batch_recvmsg( + &self, + cx: &mut Context<'_>, + hdr_v: &mut [RecvMsgHdr<'_, C>], + ) -> Poll> { + use smallvec::SmallVec; + use std::os::fd::AsRawFd; + + let mut msgvec: SmallVec<[_; 32]> = SmallVec::with_capacity(hdr_v.len()); + for m in hdr_v.iter_mut() { + msgvec.push(unsafe { m.to_msghdr_x() }); + } + + let raw_fd = self.as_raw_fd(); + let mut recvmsg_x = || { + let r = unsafe { + super::macos::recvmsg_x( + raw_fd, + msgvec.as_mut_ptr(), + msgvec.len() as _, + libc::MSG_DONTWAIT as _, + ) + }; + if r < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(r as usize) + } + }; + + loop { + ready!(self.poll_recv_ready(cx))?; + match self.try_io(Interest::READABLE, &mut recvmsg_x) { + Ok(count) => { + for (m, h) in hdr_v.iter_mut().take(count).zip(msgvec) { + m.n_recv = h.msg_datalen; + } + return Poll::Ready(Ok(count)); + } + Err(e) => { + if e.kind() == io::ErrorKind::WouldBlock { + continue; + } else { + return Poll::Ready(Err(e)); + } + } + } + } + } } #[cfg(test)] @@ -415,6 +539,7 @@ mod tests { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] #[tokio::test] async fn batch_msg_connect() { @@ -433,9 +558,14 @@ mod tests { SendMsgHdr::new([IoSlice::new(msg_2)], None), ]; + #[cfg(not(target_os = "macos"))] let count = poll_fn(|cx| c_sock.poll_batch_sendmsg(cx, &mut msgs)) .await .unwrap(); + #[cfg(target_os = "macos")] + let count = poll_fn(|cx| c_sock.poll_batch_sendmsg_x(cx, &mut msgs)) + .await + .unwrap(); assert_eq!(count, 2); assert_eq!(msgs[0].n_send, msg_1.len()); assert_eq!(msgs[1].n_send, msg_2.len()); diff --git a/lib/g3-io-ext/src/udp/recv.rs b/lib/g3-io-ext/src/udp/recv.rs index 4d58d04b..aa8e77e7 100644 --- a/lib/g3-io-ext/src/udp/recv.rs +++ b/lib/g3-io-ext/src/udp/recv.rs @@ -30,6 +30,7 @@ use tokio::time::{Instant, Sleep}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::RecvMsgHdr; use crate::limit::{DatagramLimitAction, DatagramLimiter}; @@ -50,6 +51,7 @@ pub trait AsyncUdpRecv { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_batch_recvmsg( &mut self, @@ -216,6 +218,7 @@ where target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_batch_recvmsg( &mut self, diff --git a/lib/g3-io-ext/src/udp/relay/client.rs b/lib/g3-io-ext/src/udp/relay/client.rs index c348fad0..48949ba2 100644 --- a/lib/g3-io-ext/src/udp/relay/client.rs +++ b/lib/g3-io-ext/src/udp/relay/client.rs @@ -27,6 +27,7 @@ use g3_types::net::UpstreamAddr; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::UdpRelayPacket; @@ -65,6 +66,7 @@ pub trait UdpRelayClientRecv { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -88,6 +90,7 @@ pub trait UdpRelayClientSend { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, diff --git a/lib/g3-io-ext/src/udp/relay/mod.rs b/lib/g3-io-ext/src/udp/relay/mod.rs index 9a4d5f69..92d5be89 100644 --- a/lib/g3-io-ext/src/udp/relay/mod.rs +++ b/lib/g3-io-ext/src/udp/relay/mod.rs @@ -174,6 +174,7 @@ impl<'a, T: UdpRelayClientRecv + ?Sized> UdpRelayRecv for ClientRecv<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -210,6 +211,7 @@ impl<'a, T: UdpRelayRemoteRecv + ?Sized> UdpRelayRecv for RemoteRecv<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -271,6 +273,7 @@ impl<'a, T: UdpRelayClientSend + ?Sized> UdpRelaySend for ClientSend<'a, T> { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_send_packets( &mut self, @@ -296,13 +299,6 @@ impl<'a, T: UdpRelayRemoteSend + ?Sized> UdpRelaySend for RemoteSend<'a, T> { .map_err(|e| UdpRelayError::RemoteError(Some(packet.ups.clone()), e)) } - #[cfg(any( - target_os = "linux", - target_os = "android", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - ))] fn poll_send_packets( &mut self, cx: &mut Context<'_>, diff --git a/lib/g3-io-ext/src/udp/relay/remote.rs b/lib/g3-io-ext/src/udp/relay/remote.rs index ff1bc1b9..24fe576c 100644 --- a/lib/g3-io-ext/src/udp/relay/remote.rs +++ b/lib/g3-io-ext/src/udp/relay/remote.rs @@ -24,13 +24,6 @@ use thiserror::Error; use g3_resolver::ResolveError; use g3_types::net::UpstreamAddr; -#[cfg(any( - target_os = "linux", - target_os = "android", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", -))] use super::UdpRelayPacket; #[derive(Error, Debug)] @@ -77,6 +70,7 @@ pub trait UdpRelayRemoteRecv { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv_packets( &mut self, @@ -94,16 +88,25 @@ pub trait UdpRelayRemoteSend { to: &UpstreamAddr, ) -> Poll>; - #[cfg(any( - target_os = "linux", - target_os = "android", - target_os = "freebsd", - target_os = "netbsd", - target_os = "openbsd", - ))] fn poll_send_packets( &mut self, cx: &mut Context<'_>, packets: &[UdpRelayPacket], - ) -> Poll>; + ) -> Poll> { + let mut count = 0; + for packet in packets { + match self.poll_send_packet(cx, packet.payload(), packet.upstream()) { + Poll::Pending => { + return if count > 0 { + Poll::Ready(Ok(count)) + } else { + Poll::Pending + }; + } + Poll::Ready(Ok(_)) => count += 1, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), + } + } + Poll::Ready(Ok(count)) + } } diff --git a/lib/g3-io-ext/src/udp/send.rs b/lib/g3-io-ext/src/udp/send.rs index 2b0ae0db..78759e28 100644 --- a/lib/g3-io-ext/src/udp/send.rs +++ b/lib/g3-io-ext/src/udp/send.rs @@ -30,6 +30,7 @@ use tokio::time::{Instant, Sleep}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::SendMsgHdr; use crate::limit::{DatagramLimitAction, DatagramLimiter}; @@ -64,6 +65,13 @@ pub trait AsyncUdpSend { cx: &mut Context<'_>, msgs: &mut [SendMsgHdr<'_, C>], ) -> Poll>; + + #[cfg(target_os = "macos")] + fn poll_batch_sendmsg_x( + &mut self, + cx: &mut Context<'_>, + msgs: &mut [SendMsgHdr<'_, C>], + ) -> Poll>; } pub struct LimitedUdpSend { @@ -345,4 +353,72 @@ where Poll::Ready(Ok(count)) } } + + #[cfg(target_os = "macos")] + fn poll_batch_sendmsg_x( + &mut self, + cx: &mut Context<'_>, + msgs: &mut [SendMsgHdr<'_, C>], + ) -> Poll> { + use smallvec::SmallVec; + + if self.limit.is_set() { + let dur_millis = self.started.elapsed().as_millis() as u64; + let mut total_size_v = SmallVec::<[usize; 32]>::with_capacity(msgs.len()); + let mut total_size = 0; + for msg in msgs.iter() { + total_size += msg.iov.iter().map(|v| v.len()).sum::(); + total_size_v.push(total_size); + } + match self.limit.check_packets(dur_millis, total_size_v.as_ref()) { + DatagramLimitAction::Advance(n) => { + match self.inner.poll_batch_sendmsg_x(cx, &mut msgs[0..n]) { + Poll::Ready(Ok(count)) => { + let len = msgs.iter().take(count).map(|v| v.n_send).sum(); + self.limit.set_advance(count, len); + self.stats.add_send_packets(count); + self.stats.add_send_bytes(len); + Poll::Ready(Ok(count)) + } + Poll::Ready(Err(e)) => { + self.limit.release_global(); + Poll::Ready(Err(e)) + } + Poll::Pending => { + self.limit.release_global(); + Poll::Pending + } + } + } + DatagramLimitAction::DelayUntil(t) => { + self.delay.as_mut().reset(t); + match self.delay.poll_unpin(cx) { + Poll::Ready(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } + DatagramLimitAction::DelayFor(ms) => { + self.delay + .as_mut() + .reset(self.started + Duration::from_millis(dur_millis + ms)); + match self.delay.poll_unpin(cx) { + Poll::Ready(_) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } + } + } else { + let count = ready!(self.inner.poll_batch_sendmsg_x(cx, msgs))?; + self.stats.add_send_packets(count); + self.stats + .add_send_bytes(msgs.iter().take(count).map(|h| h.n_send).sum()); + Poll::Ready(Ok(count)) + } + } } diff --git a/lib/g3-io-ext/src/udp/split.rs b/lib/g3-io-ext/src/udp/split.rs index 7e9adf70..c617b5bc 100644 --- a/lib/g3-io-ext/src/udp/split.rs +++ b/lib/g3-io-ext/src/udp/split.rs @@ -31,6 +31,7 @@ use super::{AsyncUdpRecv, AsyncUdpSend, UdpSocketExt}; target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] use super::{RecvMsgHdr, SendMsgHdr}; @@ -114,6 +115,15 @@ impl AsyncUdpSend for SendHalf { ) -> Poll> { self.0.poll_batch_sendmsg(cx, msgs) } + + #[cfg(target_os = "macos")] + fn poll_batch_sendmsg_x( + &mut self, + cx: &mut Context<'_>, + msgs: &mut [SendMsgHdr<'_, C>], + ) -> Poll> { + self.0.poll_batch_sendmsg_x(cx, msgs) + } } impl RecvHalf { @@ -149,6 +159,7 @@ impl AsyncUdpRecv for RecvHalf { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_batch_recvmsg( &mut self, diff --git a/lib/g3-socks/src/v5/quic.rs b/lib/g3-socks/src/v5/quic.rs index 3fdedf4b..72cf3fd0 100644 --- a/lib/g3-socks/src/v5/quic.rs +++ b/lib/g3-socks/src/v5/quic.rs @@ -201,6 +201,7 @@ impl AsyncUdpSocket for Socks5UdpSocket { target_os = "freebsd", target_os = "netbsd", target_os = "openbsd", + target_os = "macos", ))] fn poll_recv( &self, @@ -289,7 +290,7 @@ impl AsyncUdpSocket for Socks5UdpSocket { } } - #[cfg(target_os = "macos")] + #[cfg(target_os = "dragonfly")] fn poll_recv( &self, cx: &mut Context, diff --git a/lib/g3-udpdump/src/stream/sink.rs b/lib/g3-udpdump/src/stream/sink.rs index 5cee7296..25cbc3a9 100644 --- a/lib/g3-udpdump/src/stream/sink.rs +++ b/lib/g3-udpdump/src/stream/sink.rs @@ -70,7 +70,25 @@ impl Sinker { Ok(()) } - #[cfg(any(windows, target_os = "macos", target_os = "dragonfly"))] + #[cfg(target_os = "macos")] + async fn send_udp(&self, packets: &[Vec]) -> io::Result<()> { + use g3_io_ext::{SendMsgHdr, UdpSocketExt}; + use std::future::poll_fn; + use std::io::IoSlice; + + let mut msgs: Vec<_> = packets + .iter() + .map(|v| SendMsgHdr::new([IoSlice::new(v.as_slice())], None)) + .collect(); + let mut offset = 0; + while offset < msgs.len() { + offset += + poll_fn(|cx| self.socket.poll_batch_sendmsg_x(cx, &mut msgs[offset..])).await?; + } + Ok(()) + } + + #[cfg(any(windows, target_os = "dragonfly"))] async fn send_udp(&self, packets: &[Vec]) -> io::Result<()> { for pkt in packets { self.socket.send(pkt.as_slice()).await?;