diff --git a/Cargo.lock b/Cargo.lock index d0bb12c..f3a23d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1271,7 +1271,6 @@ dependencies = [ "http 1.1.0", "humansize", "indexmap 1.9.3", - "log", "mimalloc-rust", "network-interface", "nix 0.27.1", @@ -4561,8 +4560,6 @@ dependencies = [ "cfg-if", "defmt", "heapless 0.8.0", - "libc", - "log", "managed", ] diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index e150278..dc824a6 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -36,7 +36,6 @@ tracing-subscriber = { version = "0.3", features = [ "time", ] } tracing-appender = "0.2.3" -log = "0.4" thiserror = "1.0" auto_impl = "1.1.0" crossbeam = "0.8.4" @@ -85,7 +84,9 @@ http = { version = "1", default-features = false, features = [ tokio-rustls = { version = "0.26", default-features = false, optional = true } # for tap device -tun = { package = "tun-easytier", version = "0.6.1", features = ["async"] } +tun = { package = "tun-easytier", version = "0.6.1", features = [ + "async", +], optional = true } # for net ns nix = { version = "0.27", features = ["sched", "socket", "ioctl"] } @@ -155,7 +156,14 @@ indexmap = { version = "~1.9.3", optional = false, features = ["std"] } atomic-shim = "0.2.0" -smoltcp = { version = "0.11.0", optional = true } +smoltcp = { version = "0.11.0", optional = true, default-features = false, features = [ + "std", + "medium-ip", + "proto-ipv4", + "proto-ipv6", + "socket-tcp", + "async", +] } parking_lot = { version = "0.12.0", optional = true } [target.'cfg(windows)'.dependencies] @@ -182,13 +190,23 @@ defguard_wireguard_rs = "0.4.2" [features] -default = ["wireguard", "mimalloc", "websocket", "smoltcp"] -full = ["quic", "websocket", "wireguard", "mimalloc", "aes-gcm", "smoltcp"] +default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun"] +full = [ + "quic", + "websocket", + "wireguard", + "mimalloc", + "aes-gcm", + "smoltcp", + "tun", +] mips = ["aes-gcm", "mimalloc", "wireguard"] +bsd = ["aes-gcm", "mimalloc", "smoltcp"] wireguard = ["dep:boringtun", "dep:ring"] quic = ["dep:quinn", "dep:rustls", "dep:rcgen"] mimalloc = ["dep:mimalloc-rust"] aes-gcm = ["dep:aes-gcm"] +tun = ["dep:tun"] websocket = [ "dep:tokio-websockets", "dep:http", diff --git a/easytier/src/common/error.rs b/easytier/src/common/error.rs index 6ebfa4e..57cb091 100644 --- a/easytier/src/common/error.rs +++ b/easytier/src/common/error.rs @@ -10,8 +10,11 @@ use super::PeerId; pub enum Error { #[error("io error")] IOError(#[from] io::Error), + + #[cfg(feature = "tun")] #[error("rust tun error {0}")] TunError(#[from] tun::Error), + #[error("tunnel error {0}")] TunnelError(#[from] tunnel::TunnelError), #[error("Peer has no conn, PeerId: {0}")] diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 9da3dcb..040caf6 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -128,7 +128,7 @@ impl GlobalCtx { if self.event_bus.receiver_count() != 0 { self.event_bus.send(event).unwrap(); } else { - log::warn!("No subscriber for event: {:?}", event); + tracing::warn!("No subscriber for event: {:?}", event); } } diff --git a/easytier/src/common/netns.rs b/easytier/src/common/netns.rs index 4433893..4f6ba3e 100644 --- a/easytier/src/common/netns.rs +++ b/easytier/src/common/netns.rs @@ -43,7 +43,7 @@ impl NetNSGuard { } let ns = std::fs::File::open(ns_path).unwrap(); - log::info!( + tracing::info!( "[INIT NS] switching to new ns_name: {:?}, ns_file: {:?}", name, ns @@ -59,7 +59,7 @@ impl Drop for NetNSGuard { if self.old_ns.is_none() { return; } - log::info!("[INIT NS] switching back to old ns, ns: {:?}", self.old_ns); + tracing::info!("[INIT NS] switching back to old ns, ns: {:?}", self.old_ns); setns( self.old_ns.as_ref().unwrap().as_fd(), CloneFlags::CLONE_NEWNET, diff --git a/easytier/src/common/network.rs b/easytier/src/common/network.rs index 25350ac..1f41db5 100644 --- a/easytier/src/common/network.rs +++ b/easytier/src/common/network.rs @@ -51,7 +51,7 @@ impl InterfaceFilter { } } -#[cfg(target_os = "macos")] +#[cfg(any(target_os = "macos", target_os = "freebsd"))] impl InterfaceFilter { async fn is_interface_physical(interface_name: &str) -> bool { let output = tokio::process::Command::new("networksetup") diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 7de6657..af2b8d8 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -220,7 +220,7 @@ impl DirectConnectorManager { } return Err(e); } else { - log::info!("try_connect_to_ip success, peer_id: {}", dst_peer_id); + tracing::info!("try_connect_to_ip success, peer_id: {}", dst_peer_id); return Ok(()); } } @@ -314,7 +314,7 @@ impl DirectConnectorManager { let mut has_succ = false; while let Some(ret) = tasks.join_next().await { if let Err(e) = ret { - log::error!("join direct connect task failed: {:?}", e); + tracing::error!("join direct connect task failed: {:?}", e); } else if let Ok(Ok(_)) = ret { has_succ = true; } @@ -345,7 +345,7 @@ impl DirectConnectorManager { } } - log::trace!("try direct connect to peer: {}", dst_peer_id); + tracing::trace!("try direct connect to peer: {}", dst_peer_id); let ip_list = peer_manager .get_peer_rpc_mgr() diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 0eab45e..6e756d3 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -89,7 +89,7 @@ impl ManualConnectorManager { where T: TunnelConnector + 'static, { - log::info!("add_connector: {}", connector.remote_url()); + tracing::info!("add_connector: {}", connector.remote_url()); self.data.connectors.insert( connector.remote_url().into(), Arc::new(Mutex::new(Box::new(connector))), @@ -102,7 +102,7 @@ impl ManualConnectorManager { } pub async fn remove_connector(&self, url: &str) -> Result<(), Error> { - log::info!("remove_connector: {}", url); + tracing::info!("remove_connector: {}", url); if !self.list_connectors().await.iter().any(|x| x.url == url) { return Err(Error::NotFound); } @@ -163,7 +163,7 @@ impl ManualConnectorManager { data: Arc, mut event_recv: Receiver, ) { - log::warn!("conn_mgr_routine started"); + tracing::warn!("conn_mgr_routine started"); let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis( use_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS), )); @@ -200,11 +200,11 @@ impl ManualConnectorManager { data_clone.connectors.insert(dead_url.clone(), connector); }); } - log::info!("reconn_interval tick, done"); + tracing::info!("reconn_interval tick, done"); } ret = reconn_result_recv.recv() => { - log::warn!("reconn_tasks done, reconn result: {:?}", ret); + tracing::warn!("reconn_tasks done, reconn result: {:?}", ret); } } } @@ -215,13 +215,13 @@ impl ManualConnectorManager { GlobalCtxEvent::PeerConnAdded(conn_info) => { let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone(); data.alive_conn_urls.lock().await.insert(addr); - log::warn!("peer conn added: {:?}", conn_info); + tracing::warn!("peer conn added: {:?}", conn_info); } GlobalCtxEvent::PeerConnRemoved(conn_info) => { let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone(); data.alive_conn_urls.lock().await.remove(&addr); - log::warn!("peer conn removed: {:?}", conn_info); + tracing::warn!("peer conn removed: {:?}", conn_info); } _ => {} @@ -233,14 +233,14 @@ impl ManualConnectorManager { for it in data.removed_conn_urls.iter() { let url = it.key(); if let Some(_) = data.connectors.remove(url) { - log::warn!("connector: {}, removed", url); + tracing::warn!("connector: {}, removed", url); continue; } else if data.reconnecting.contains(url) { - log::warn!("connector: {}, reconnecting, remove later.", url); + tracing::warn!("connector: {}, reconnecting, remove later.", url); remove_later.insert(url.clone()); continue; } else { - log::warn!("connector: {}, not found", url); + tracing::warn!("connector: {}, not found", url); } } data.removed_conn_urls.clear(); @@ -284,9 +284,9 @@ impl ManualConnectorManager { )); let _g = net_ns.guard(); - log::info!("reconnect try connect... conn: {:?}", connector); + tracing::info!("reconnect try connect... conn: {:?}", connector); let tunnel = connector.lock().await.connect().await?; - log::info!("reconnect get tunnel succ: {:?}", tunnel); + tracing::info!("reconnect get tunnel succ: {:?}", tunnel); assert_eq!( dead_url, tunnel.info().unwrap().remote_addr, @@ -294,7 +294,7 @@ impl ManualConnectorManager { tunnel.info() ); let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?; - log::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); + tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); Ok(ReconnResult { dead_url, peer_id, @@ -307,7 +307,7 @@ impl ManualConnectorManager { dead_url: String, connector: MutexConnector, ) -> Result { - log::info!("reconnect: {}", dead_url); + tracing::info!("reconnect: {}", dead_url); let mut ip_versions = vec![]; let u = url::Url::parse(&dead_url) @@ -347,7 +347,7 @@ impl ManualConnectorManager { ), ) .await; - log::info!("reconnect: {} done, ret: {:?}", dead_url, ret); + tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret); if ret.is_ok() && ret.as_ref().unwrap().is_ok() { reconn_ret = ret.unwrap(); diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 66fc4b0..d508a56 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -428,7 +428,7 @@ impl From for TomlConfigLoader { f.mtu = mtu; } f.enable_exit_node = cli.enable_exit_node; - f.no_tun = cli.no_tun; + f.no_tun = cli.no_tun || cfg!(not(feature = "tun")); f.use_smoltcp = cli.use_smoltcp; cfg.set_flags(f); diff --git a/easytier/src/gateway/tokio_smoltcp/mod.rs b/easytier/src/gateway/tokio_smoltcp/mod.rs index 2b38bb4..5de00ed 100644 --- a/easytier/src/gateway/tokio_smoltcp/mod.rs +++ b/easytier/src/gateway/tokio_smoltcp/mod.rs @@ -18,9 +18,9 @@ pub use smoltcp; use smoltcp::{ iface::{Config, Interface, Routes}, time::{Duration, Instant}, - wire::{HardwareAddress, IpAddress, IpCidr, IpProtocol, IpVersion}, + wire::{HardwareAddress, IpAddress, IpCidr}, }; -pub use socket::{RawSocket, TcpListener, TcpStream, UdpSocket}; +pub use socket::{TcpListener, TcpStream}; pub use socket_allocator::BufferSize; use tokio::sync::Notify; @@ -155,19 +155,6 @@ impl Net { ) .await } - /// This function will create a new UDP socket and attempt to bind it to the `addr` provided. - pub async fn udp_bind(&self, addr: SocketAddr) -> io::Result { - let addr = self.set_address(addr); - UdpSocket::new(self.reactor.clone(), addr.into()).await - } - /// Creates a new raw socket. - pub async fn raw_socket( - &self, - ip_version: IpVersion, - ip_protocol: IpProtocol, - ) -> io::Result { - RawSocket::new(self.reactor.clone(), ip_version, ip_protocol).await - } fn set_address(&self, mut addr: SocketAddr) -> SocketAddr { if addr.ip().is_unspecified() { addr.set_ip(match self.ip_addr.address() { diff --git a/easytier/src/gateway/tokio_smoltcp/reactor.rs b/easytier/src/gateway/tokio_smoltcp/reactor.rs index 65a4aeb..c6ed005 100644 --- a/easytier/src/gateway/tokio_smoltcp/reactor.rs +++ b/easytier/src/gateway/tokio_smoltcp/reactor.rs @@ -153,8 +153,6 @@ impl Drop for Reactor { for (_, socket) in self.socket_allocator.sockets().lock().iter_mut() { match socket { Socket::Tcp(tcp) => tcp.close(), - Socket::Raw(_) => {} - Socket::Udp(udp) => udp.close(), #[allow(unreachable_patterns)] _ => {} } diff --git a/easytier/src/gateway/tokio_smoltcp/socket.rs b/easytier/src/gateway/tokio_smoltcp/socket.rs index 0d91d14..ae4e579 100644 --- a/easytier/src/gateway/tokio_smoltcp/socket.rs +++ b/easytier/src/gateway/tokio_smoltcp/socket.rs @@ -1,8 +1,8 @@ use super::{reactor::Reactor, socket_allocator::SocketHandle}; use futures::future::{self, poll_fn}; use futures::{ready, Stream}; -pub use smoltcp::socket::{raw, tcp, udp}; -use smoltcp::wire::{IpAddress, IpEndpoint, IpProtocol, IpVersion}; +pub use smoltcp::socket::tcp; +use smoltcp::wire::{IpAddress, IpEndpoint}; use std::mem::replace; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::{ @@ -231,147 +231,3 @@ impl AsyncWrite for TcpStream { Poll::Pending } } - -/// A UDP socket. -pub struct UdpSocket { - handle: SocketHandle, - reactor: Arc, - local_addr: SocketAddr, -} - -impl UdpSocket { - pub(super) async fn new( - reactor: Arc, - local_endpoint: IpEndpoint, - ) -> io::Result { - let handle = reactor.socket_allocator().new_udp_socket(); - { - let mut socket = reactor.get_socket::(*handle); - socket.bind(local_endpoint).map_err(map_err)?; - } - - let local_addr = ep2sa(&local_endpoint); - - Ok(UdpSocket { - handle, - reactor, - local_addr, - }) - } - /// Note that on multiple calls to a poll_* method in the send direction, only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. - pub fn poll_send_to( - &self, - cx: &mut Context<'_>, - buf: &[u8], - target: SocketAddr, - ) -> Poll> { - let mut socket = self.reactor.get_socket::(*self.handle); - let target_ip: IpEndpoint = target.into(); - - match socket.send_slice(buf, target_ip) { - // the buffer is full - Err(udp::SendError::BufferFull) => {} - r => { - r.map_err(map_err)?; - self.reactor.notify(); - return Poll::Ready(Ok(buf.len())); - } - } - - socket.register_send_waker(cx.waker()); - Poll::Pending - } - /// See note on `poll_send_to` - pub async fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result { - poll_fn(|cx| self.poll_send_to(cx, buf, target)).await - } - /// Note that on multiple calls to a poll_* method in the recv direction, only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. - pub fn poll_recv_from( - &self, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let mut socket = self.reactor.get_socket::(*self.handle); - - match socket.recv_slice(buf) { - // the buffer is empty - Err(udp::RecvError::Exhausted) => {} - r => { - let (size, metadata) = r.map_err(map_err)?; - self.reactor.notify(); - return Poll::Ready(Ok((size, ep2sa(&metadata.endpoint)))); - } - } - - socket.register_recv_waker(cx.waker()); - Poll::Pending - } - /// See note on `poll_recv_from` - pub async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> { - poll_fn(|cx| self.poll_recv_from(cx, buf)).await - } - pub fn local_addr(&self) -> io::Result { - Ok(self.local_addr) - } -} - -/// A raw socket. -pub struct RawSocket { - handle: SocketHandle, - reactor: Arc, -} - -impl RawSocket { - pub(super) async fn new( - reactor: Arc, - ip_version: IpVersion, - ip_protocol: IpProtocol, - ) -> io::Result { - let handle = reactor - .socket_allocator() - .new_raw_socket(ip_version, ip_protocol); - - Ok(RawSocket { handle, reactor }) - } - /// Note that on multiple calls to a poll_* method in the send direction, only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. - pub fn poll_send(&self, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - let mut socket = self.reactor.get_socket::(*self.handle); - - match socket.send_slice(buf) { - // the buffer is full - Err(raw::SendError::BufferFull) => {} - r => { - r.map_err(map_err)?; - self.reactor.notify(); - return Poll::Ready(Ok(buf.len())); - } - } - - socket.register_send_waker(cx.waker()); - Poll::Pending - } - /// See note on `poll_send` - pub async fn send(&self, buf: &[u8]) -> io::Result { - poll_fn(|cx| self.poll_send(cx, buf)).await - } - /// Note that on multiple calls to a poll_* method in the recv direction, only the Waker from the Context passed to the most recent call will be scheduled to receive a wakeup. - pub fn poll_recv(&self, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { - let mut socket = self.reactor.get_socket::(*self.handle); - - match socket.recv_slice(buf) { - // the buffer is empty - Err(raw::RecvError::Exhausted) => {} - r => { - let size = r.map_err(map_err)?; - return Poll::Ready(Ok(size)); - } - } - - socket.register_recv_waker(cx.waker()); - Poll::Pending - } - /// See note on `poll_recv` - pub async fn recv(&self, buf: &mut [u8]) -> io::Result { - poll_fn(|cx| self.poll_recv(cx, buf)).await - } -} diff --git a/easytier/src/gateway/tokio_smoltcp/socket_allocator.rs b/easytier/src/gateway/tokio_smoltcp/socket_allocator.rs index 9b1a055..89c5fc9 100644 --- a/easytier/src/gateway/tokio_smoltcp/socket_allocator.rs +++ b/easytier/src/gateway/tokio_smoltcp/socket_allocator.rs @@ -1,8 +1,7 @@ use parking_lot::Mutex; use smoltcp::{ iface::{SocketHandle as InnerSocketHandle, SocketSet}, - socket::{raw, tcp, udp}, - wire::{IpProtocol, IpVersion}, + socket::tcp, }; use std::{ ops::{Deref, DerefMut}, @@ -14,14 +13,6 @@ use std::{ pub struct BufferSize { pub tcp_rx_size: usize, pub tcp_tx_size: usize, - pub udp_rx_size: usize, - pub udp_tx_size: usize, - pub udp_rx_meta_size: usize, - pub udp_tx_meta_size: usize, - pub raw_rx_size: usize, - pub raw_tx_size: usize, - pub raw_rx_meta_size: usize, - pub raw_tx_meta_size: usize, } impl Default for BufferSize { @@ -29,14 +20,6 @@ impl Default for BufferSize { BufferSize { tcp_rx_size: 8192, tcp_tx_size: 8192, - udp_rx_size: 8192, - udp_tx_size: 8192, - udp_rx_meta_size: 32, - udp_tx_meta_size: 32, - raw_rx_size: 8192, - raw_tx_size: 8192, - raw_rx_meta_size: 32, - raw_tx_meta_size: 32, } } } @@ -65,16 +48,6 @@ impl SocketAlloctor { let handle = set.add(self.alloc_tcp_socket()); SocketHandle::new(handle, self.sockets.clone()) } - pub fn new_udp_socket(&self) -> SocketHandle { - let mut set = self.sockets.lock(); - let handle = set.add(self.alloc_udp_socket()); - SocketHandle::new(handle, self.sockets.clone()) - } - pub fn new_raw_socket(&self, ip_version: IpVersion, ip_protocol: IpProtocol) -> SocketHandle { - let mut set = self.sockets.lock(); - let handle = set.add(self.alloc_raw_socket(ip_version, ip_protocol)); - SocketHandle::new(handle, self.sockets.clone()) - } fn alloc_tcp_socket(&self) -> tcp::Socket<'static> { let rx_buffer = tcp::SocketBuffer::new(vec![0; self.buffer_size.tcp_rx_size]); let tx_buffer = tcp::SocketBuffer::new(vec![0; self.buffer_size.tcp_tx_size]); @@ -83,36 +56,6 @@ impl SocketAlloctor { tcp } - fn alloc_udp_socket(&self) -> udp::Socket<'static> { - let rx_buffer = udp::PacketBuffer::new( - vec![udp::PacketMetadata::EMPTY; self.buffer_size.udp_rx_meta_size], - vec![0; self.buffer_size.udp_rx_size], - ); - let tx_buffer = udp::PacketBuffer::new( - vec![udp::PacketMetadata::EMPTY; self.buffer_size.udp_tx_meta_size], - vec![0; self.buffer_size.udp_tx_size], - ); - let udp = udp::Socket::new(rx_buffer, tx_buffer); - - udp - } - fn alloc_raw_socket( - &self, - ip_version: IpVersion, - ip_protocol: IpProtocol, - ) -> raw::Socket<'static> { - let rx_buffer = raw::PacketBuffer::new( - vec![raw::PacketMetadata::EMPTY; self.buffer_size.raw_rx_meta_size], - vec![0; self.buffer_size.raw_rx_size], - ); - let tx_buffer = raw::PacketBuffer::new( - vec![raw::PacketMetadata::EMPTY; self.buffer_size.raw_tx_meta_size], - vec![0; self.buffer_size.raw_tx_size], - ); - let raw = raw::Socket::new(ip_version, ip_protocol, rx_buffer, tx_buffer); - - raw - } } pub struct SocketHandle(InnerSocketHandle, SharedSocketSet); diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index e3e0982..17f239b 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -1,14 +1,10 @@ use std::collections::HashSet; use std::net::Ipv4Addr; -use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use anyhow::Context; use cidr::Ipv4Inet; -use futures::{SinkExt, StreamExt}; - -use pnet::packet::ipv4::Ipv4Packet; use tokio::{sync::Mutex, task::JoinSet}; use tonic::transport::server::TcpIncoming; @@ -31,15 +27,9 @@ use crate::peers::rpc_service::PeerManagerRpcService; use crate::peers::PacketRecvChanReceiver; use crate::rpc::vpn_portal_rpc_server::VpnPortalRpc; use crate::rpc::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo}; -use crate::tunnel::packet_def::ZCPacket; - -use crate::tunnel::{ZCPacketSink, ZCPacketStream}; use crate::vpn_portal::{self, VpnPortal}; use super::listeners::ListenerManager; -use super::virtual_nic; - -use crate::common::ifcfg::IfConfiguerTrait; #[derive(Clone)] struct IpProxy { @@ -82,200 +72,21 @@ impl IpProxy { } } -struct NicCtx { - global_ctx: ArcGlobalCtx, - peer_mgr: Weak, - peer_packet_receiver: Arc>, - - nic: Arc>, - tasks: JoinSet<()>, -} - +#[cfg(feature = "tun")] +type NicCtx = super::virtual_nic::NicCtx; +#[cfg(not(feature = "tun"))] +struct NicCtx; +#[cfg(not(feature = "tun"))] impl NicCtx { - fn new( - global_ctx: ArcGlobalCtx, - peer_manager: &Arc, - peer_packet_receiver: Arc>, + pub fn new( + _global_ctx: ArcGlobalCtx, + _peer_manager: &Arc, + _peer_packet_receiver: Arc>, ) -> Self { - NicCtx { - global_ctx: global_ctx.clone(), - peer_mgr: Arc::downgrade(&peer_manager), - peer_packet_receiver, - nic: Arc::new(Mutex::new(virtual_nic::VirtualNic::new(global_ctx))), - tasks: JoinSet::new(), - } + Self } - async fn assign_ipv4_to_tun_device(&self, ipv4_addr: Ipv4Addr) -> Result<(), Error> { - let nic = self.nic.lock().await; - nic.link_up().await?; - nic.remove_ip(None).await?; - nic.add_ip(ipv4_addr, 24).await?; - if cfg!(target_os = "macos") { - nic.add_route(ipv4_addr, 24).await?; - } - Ok(()) - } - - async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) { - if let Some(ipv4) = Ipv4Packet::new(ret.payload()) { - if ipv4.get_version() != 4 { - tracing::info!("[USER_PACKET] not ipv4 packet: {:?}", ipv4); - return; - } - let dst_ipv4 = ipv4.get_destination(); - tracing::trace!( - ?ret, - "[USER_PACKET] recv new packet from tun device and forward to peers." - ); - - // TODO: use zero-copy - let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await; - if send_ret.is_err() { - tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed") - } - } else { - tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet"); - } - } - - fn do_forward_nic_to_peers( - &mut self, - mut stream: Pin>, - ) -> Result<(), Error> { - // read from nic and write to corresponding tunnel - let Some(mgr) = self.peer_mgr.upgrade() else { - return Err(anyhow::anyhow!("peer manager not available").into()); - }; - self.tasks.spawn(async move { - while let Some(ret) = stream.next().await { - if ret.is_err() { - log::error!("read from nic failed: {:?}", ret); - break; - } - Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await; - } - }); - - Ok(()) - } - - fn do_forward_peers_to_nic(&mut self, mut sink: Pin>) { - let channel = self.peer_packet_receiver.clone(); - self.tasks.spawn(async move { - // unlock until coroutine finished - let mut channel = channel.lock().await; - while let Some(packet) = channel.recv().await { - tracing::trace!( - "[USER_PACKET] forward packet from peers to nic. packet: {:?}", - packet - ); - let ret = sink.send(packet).await; - if ret.is_err() { - tracing::error!(?ret, "do_forward_tunnel_to_nic sink error"); - } - } - }); - } - - async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> { - let Some(peer_mgr) = self.peer_mgr.upgrade() else { - return Err(anyhow::anyhow!("peer manager not available").into()); - }; - let global_ctx = self.global_ctx.clone(); - let net_ns = self.global_ctx.net_ns.clone(); - let nic = self.nic.lock().await; - let ifcfg = nic.get_ifcfg(); - let ifname = nic.ifname().to_owned(); - - self.tasks.spawn(async move { - let mut cur_proxy_cidrs = vec![]; - loop { - let mut proxy_cidrs = vec![]; - let routes = peer_mgr.list_routes().await; - for r in routes { - for cidr in r.proxy_cidrs { - let Ok(cidr) = cidr.parse::() else { - continue; - }; - proxy_cidrs.push(cidr); - } - } - // add vpn portal cidr to proxy_cidrs - if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() { - proxy_cidrs.push(vpn_cfg.client_cidr); - } - - // if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it. - for cidr in cur_proxy_cidrs.iter() { - if proxy_cidrs.contains(cidr) { - continue; - } - - let _g = net_ns.guard(); - let ret = ifcfg - .remove_ipv4_route( - ifname.as_str(), - cidr.first_address(), - cidr.network_length(), - ) - .await; - - if ret.is_err() { - tracing::trace!( - cidr = ?cidr, - err = ?ret, - "remove route failed.", - ); - } - } - - for cidr in proxy_cidrs.iter() { - if cur_proxy_cidrs.contains(cidr) { - continue; - } - let _g = net_ns.guard(); - let ret = ifcfg - .add_ipv4_route( - ifname.as_str(), - cidr.first_address(), - cidr.network_length(), - ) - .await; - - if ret.is_err() { - tracing::trace!( - cidr = ?cidr, - err = ?ret, - "add route failed.", - ); - } - } - - cur_proxy_cidrs = proxy_cidrs; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }); - - Ok(()) - } - - async fn run(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> { - let tunnel = { - let mut nic = self.nic.lock().await; - let ret = nic.create_dev().await?; - self.global_ctx - .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string())); - ret - }; - - let (stream, sink) = tunnel.split(); - - self.do_forward_nic_to_peers(stream)?; - self.do_forward_peers_to_nic(sink); - - self.assign_ipv4_to_tun_device(ipv4_addr).await?; - self.run_proxy_cidrs_route_updater().await?; + pub async fn run(&mut self, _ipv4_addr: Ipv4Addr) -> Result<(), Error> { Ok(()) } } @@ -311,7 +122,7 @@ impl Instance { pub fn new(config: impl ConfigLoader + Send + Sync + 'static) -> Self { let global_ctx = Arc::new(GlobalCtx::new(config)); - log::info!( + tracing::info!( "[INIT] instance creating. config: {}", global_ctx.config.dump() ); @@ -584,7 +395,7 @@ impl Instance { pub async fn wait(&mut self) { while let Some(ret) = self.tasks.join_next().await { - log::info!("task finished: {:?}", ret); + tracing::info!("task finished: {:?}", ret); ret.unwrap(); } } diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index c58ca02..7cb4e3a 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -171,7 +171,7 @@ impl ListenerManage for listener in &self.listeners { let _guard = self.net_ns.guard(); let addr = listener.inner.lock().await.local_url(); - log::warn!("run listener: {:?}", listener); + tracing::warn!("run listener: {:?}", listener); listener .inner .lock() diff --git a/easytier/src/instance/mod.rs b/easytier/src/instance/mod.rs index da4814b..e504964 100644 --- a/easytier/src/instance/mod.rs +++ b/easytier/src/instance/mod.rs @@ -1,4 +1,7 @@ pub mod instance; pub mod listeners; + +#[cfg(feature = "tun")] pub mod tun_codec; +#[cfg(feature = "tun")] pub mod virtual_nic; diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index 86ecec1..02dd9b7 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -2,27 +2,34 @@ use std::{ io, net::Ipv4Addr, pin::Pin, + sync::{Arc, Weak}, task::{Context, Poll}, }; use crate::{ common::{ error::Error, - global_ctx::ArcGlobalCtx, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, ifcfg::{IfConfiger, IfConfiguerTrait}, }, + peers::{peer_manager::PeerManager, PacketRecvChanReceiver}, tunnel::{ common::{reserve_buf, FramedWriter, TunnelWrapper, ZCPacketToBytes}, packet_def::{ZCPacket, ZCPacketType, TAIL_RESERVED_SIZE}, - StreamItem, Tunnel, TunnelError, + StreamItem, Tunnel, TunnelError, ZCPacketSink, ZCPacketStream, }, }; use byteorder::WriteBytesExt as _; use bytes::{BufMut, BytesMut}; -use futures::{lock::BiLock, ready, Stream}; +use futures::{lock::BiLock, ready, SinkExt, Stream, StreamExt}; use pin_project_lite::pin_project; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use pnet::packet::ipv4::Ipv4Packet; +use tokio::{ + io::{AsyncRead, AsyncWrite, ReadBuf}, + sync::Mutex, + task::JoinSet, +}; use tokio_util::bytes::Bytes; use tun::{create_as_async, AsyncDevice, Configuration, Device as _, Layer}; use zerocopy::{NativeEndian, NetworkEndian}; @@ -382,6 +389,206 @@ impl VirtualNic { IfConfiger {} } } + +pub struct NicCtx { + global_ctx: ArcGlobalCtx, + peer_mgr: Weak, + peer_packet_receiver: Arc>, + + nic: Arc>, + tasks: JoinSet<()>, +} + +impl NicCtx { + pub fn new( + global_ctx: ArcGlobalCtx, + peer_manager: &Arc, + peer_packet_receiver: Arc>, + ) -> Self { + NicCtx { + global_ctx: global_ctx.clone(), + peer_mgr: Arc::downgrade(&peer_manager), + peer_packet_receiver, + nic: Arc::new(Mutex::new(VirtualNic::new(global_ctx))), + tasks: JoinSet::new(), + } + } + + async fn assign_ipv4_to_tun_device(&self, ipv4_addr: Ipv4Addr) -> Result<(), Error> { + let nic = self.nic.lock().await; + nic.link_up().await?; + nic.remove_ip(None).await?; + nic.add_ip(ipv4_addr, 24).await?; + if cfg!(target_os = "macos") { + nic.add_route(ipv4_addr, 24).await?; + } + Ok(()) + } + + async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) { + if let Some(ipv4) = Ipv4Packet::new(ret.payload()) { + if ipv4.get_version() != 4 { + tracing::info!("[USER_PACKET] not ipv4 packet: {:?}", ipv4); + return; + } + let dst_ipv4 = ipv4.get_destination(); + tracing::trace!( + ?ret, + "[USER_PACKET] recv new packet from tun device and forward to peers." + ); + + // TODO: use zero-copy + let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await; + if send_ret.is_err() { + tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed") + } + } else { + tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet"); + } + } + + fn do_forward_nic_to_peers( + &mut self, + mut stream: Pin>, + ) -> Result<(), Error> { + // read from nic and write to corresponding tunnel + let Some(mgr) = self.peer_mgr.upgrade() else { + return Err(anyhow::anyhow!("peer manager not available").into()); + }; + self.tasks.spawn(async move { + while let Some(ret) = stream.next().await { + if ret.is_err() { + tracing::error!("read from nic failed: {:?}", ret); + break; + } + Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await; + } + }); + + Ok(()) + } + + fn do_forward_peers_to_nic(&mut self, mut sink: Pin>) { + let channel = self.peer_packet_receiver.clone(); + self.tasks.spawn(async move { + // unlock until coroutine finished + let mut channel = channel.lock().await; + while let Some(packet) = channel.recv().await { + tracing::trace!( + "[USER_PACKET] forward packet from peers to nic. packet: {:?}", + packet + ); + let ret = sink.send(packet).await; + if ret.is_err() { + tracing::error!(?ret, "do_forward_tunnel_to_nic sink error"); + } + } + }); + } + + async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> { + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + return Err(anyhow::anyhow!("peer manager not available").into()); + }; + let global_ctx = self.global_ctx.clone(); + let net_ns = self.global_ctx.net_ns.clone(); + let nic = self.nic.lock().await; + let ifcfg = nic.get_ifcfg(); + let ifname = nic.ifname().to_owned(); + + self.tasks.spawn(async move { + let mut cur_proxy_cidrs = vec![]; + loop { + let mut proxy_cidrs = vec![]; + let routes = peer_mgr.list_routes().await; + for r in routes { + for cidr in r.proxy_cidrs { + let Ok(cidr) = cidr.parse::() else { + continue; + }; + proxy_cidrs.push(cidr); + } + } + // add vpn portal cidr to proxy_cidrs + if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() { + proxy_cidrs.push(vpn_cfg.client_cidr); + } + + // if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it. + for cidr in cur_proxy_cidrs.iter() { + if proxy_cidrs.contains(cidr) { + continue; + } + + let _g = net_ns.guard(); + let ret = ifcfg + .remove_ipv4_route( + ifname.as_str(), + cidr.first_address(), + cidr.network_length(), + ) + .await; + + if ret.is_err() { + tracing::trace!( + cidr = ?cidr, + err = ?ret, + "remove route failed.", + ); + } + } + + for cidr in proxy_cidrs.iter() { + if cur_proxy_cidrs.contains(cidr) { + continue; + } + let _g = net_ns.guard(); + let ret = ifcfg + .add_ipv4_route( + ifname.as_str(), + cidr.first_address(), + cidr.network_length(), + ) + .await; + + if ret.is_err() { + tracing::trace!( + cidr = ?cidr, + err = ?ret, + "add route failed.", + ); + } + } + + cur_proxy_cidrs = proxy_cidrs; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }); + + Ok(()) + } + + pub async fn run(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> { + let tunnel = { + let mut nic = self.nic.lock().await; + let ret = nic.create_dev().await?; + self.global_ctx + .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string())); + ret + }; + + let (stream, sink) = tunnel.split(); + + self.do_forward_nic_to_peers(stream)?; + self.do_forward_peers_to_nic(sink); + + self.assign_ipv4_to_tun_device(ipv4_addr).await?; + self.run_proxy_cidrs_route_updater().await?; + + Ok(()) + } +} + #[cfg(test)] mod tests { use crate::common::{error::Error, global_ctx::tests::get_mock_global_ctx}; diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 4a6e586..b57876f 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -169,7 +169,7 @@ impl PeerConn { match self.wait_handshake(&mut need_retry).await { Ok(rsp) => return Ok(rsp), Err(e) => { - log::warn!("wait handshake error: {:?}", e); + tracing::warn!("wait handshake error: {:?}", e); if !need_retry { return Err(e); } @@ -313,7 +313,7 @@ impl PeerConn { tracing::warn!(?pingpong, "pingpong task exit"); if let Err(e) = close_event_sender.send(conn_id).await { - log::warn!("close event sender error: {:?}", e); + tracing::warn!("close event sender error: {:?}", e); } Ok(()) diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 83d5f57..1a198b4 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -333,7 +333,7 @@ impl PeerManager { let foreign_client = self.foreign_network_client.clone(); let encryptor = self.encryptor.clone(); self.tasks.lock().await.spawn(async move { - log::trace!("start_peer_recv"); + tracing::trace!("start_peer_recv"); while let Some(mut ret) = recv.next().await { let Some(hdr) = ret.mut_peer_manager_header() else { tracing::warn!(?ret, "invalid packet, skip"); @@ -572,7 +572,7 @@ impl PeerManager { } pub async fn send_msg_ipv4(&self, mut msg: ZCPacket, ipv4_addr: Ipv4Addr) -> Result<(), Error> { - log::trace!( + tracing::trace!( "do send_msg in peer manager, msg: {:?}, ipv4_addr: {}", msg, ipv4_addr diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 286b9d1..b7b71fc 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -83,7 +83,7 @@ impl PeerMap { peer.send_msg(msg).await?; } None => { - log::error!("no peer for dst_peer_id: {}", dst_peer_id); + tracing::error!("no peer for dst_peer_id: {}", dst_peer_id); return Err(Error::RouteError(Some(format!( "peer map sengmsg directly no connected dst_peer_id: {}", dst_peer_id diff --git a/easytier/src/peers/peer_rip_route.rs b/easytier/src/peers/peer_rip_route.rs index 2ce0a05..fb696d9 100644 --- a/easytier/src/peers/peer_rip_route.rs +++ b/easytier/src/peers/peer_rip_route.rs @@ -258,14 +258,14 @@ impl BasicRoute { .clone(); if ret.cost > 6 { - log::error!( + tracing::error!( "cost too large: {}, may lost connection, remove it", ret.cost ); route_table.route_info.remove(&node_id); } - log::trace!( + tracing::trace!( "update route info, to: {:?}, gateway: {:?}, cost: {}, peer: {:?}", node_id, peer_id, @@ -292,13 +292,13 @@ impl BasicRoute { continue; } update(neighbor.cost + 1, &neighbor); - log::trace!("route info: {:?}", neighbor); + tracing::trace!("route info: {:?}", neighbor); } // add the sender peer to route info update(1, &packet.myself); - log::trace!("my_id: {:?}, current route table: {:?}", my_id, route_table); + tracing::trace!("my_id: {:?}, current route table: {:?}", my_id, route_table); } async fn send_sync_peer_request( @@ -393,13 +393,13 @@ impl BasicRoute { match &ret { Ok(_) => { - log::trace!("send sync peer request to peer: {}", peer); + tracing::trace!("send sync peer request to peer: {}", peer); } Err(Error::PeerNoConnectionError(_)) => { - log::trace!("peer {} no connection", peer); + tracing::trace!("peer {} no connection", peer); } Err(e) => { - log::error!( + tracing::error!( "send sync peer request to peer: {} error: {:?}", peer, e @@ -416,10 +416,10 @@ impl BasicRoute { tokio::select! { _ = notifier.notified() => { - log::trace!("sync peer request triggered by notifier"); + tracing::trace!("sync peer request triggered by notifier"); } _ = tokio::time::sleep(Duration::from_secs(1)) => { - log::trace!("sync peer request triggered by timeout"); + tracing::trace!("sync peer request triggered by timeout"); } } } @@ -454,7 +454,7 @@ impl BasicRoute { } for k in need_remove.iter() { - log::warn!("remove expired sync peer: {:?}", k); + tracing::warn!("remove expired sync peer: {:?}", k); sync_peer_from_remote.remove(k); } @@ -565,7 +565,7 @@ impl Route for BasicRoute { return Some(info.peer_id.clone().into()); } None => { - log::error!("no route info for dst_peer_id: {}", dst_peer_id); + tracing::error!("no route info for dst_peer_id: {}", dst_peer_id); return None; } } @@ -612,7 +612,7 @@ impl Route for BasicRoute { return Some(peer_id); } - log::info!("no peer id for ipv4: {}", ipv4_addr); + tracing::info!("no peer id for ipv4: {}", ipv4_addr); return None; } } diff --git a/easytier/src/peers/peer_rpc.rs b/easytier/src/peers/peer_rpc.rs index 7d87459..f28738f 100644 --- a/easytier/src/peers/peer_rpc.rs +++ b/easytier/src/peers/peer_rpc.rs @@ -302,7 +302,7 @@ impl PeerRpcManager { ); } - log::info!( + tracing::info!( "[PEER RPC MGR] register service {} succeed, my_node_id {}", service_id, self.tspt.my_peer_id() @@ -374,7 +374,7 @@ impl PeerRpcManager { if info.is_req { if !service_registry.contains_key(&info.service_id) { - log::warn!( + tracing::warn!( "service {} not found, my_node_id: {}", info.service_id, tspt.my_peer_id() @@ -407,7 +407,7 @@ impl PeerRpcManager { tracing::error!(error = ?e, "send resp to client failed"); } } else { - log::warn!("client resp receiver not found, info: {:?}", info); + tracing::warn!("client resp receiver not found, info: {:?}", info); } } } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 7b7376d..e796239 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -67,7 +67,6 @@ pub async fn init_three_node_ex TomlConfigLoader>( proto: &str, cfg_cb: F, ) -> Vec { - log::set_max_level(log::LevelFilter::Info); prepare_linux_namespaces(); let mut inst1 = Instance::new(cfg_cb(get_inst_config( @@ -559,7 +558,7 @@ fn run_wireguard_client( // Peer secret key let mut peer = Peer::new(peer_public_key.clone()); - log::info!("endpoint"); + tracing::info!("endpoint"); // Peer endpoint and interval peer.endpoint = Some(endpoint); peer.persistent_keepalive_interval = Some(1); diff --git a/easytier/src/tunnel/common.rs b/easytier/src/tunnel/common.rs index a7c4c11..86a485a 100644 --- a/easytier/src/tunnel/common.rs +++ b/easytier/src/tunnel/common.rs @@ -343,7 +343,7 @@ pub(crate) fn get_interface_name_by_ip(local_ip: &IpAddr) -> Option { pub(crate) fn setup_sokcet2_ext( socket2_socket: &socket2::Socket, bind_addr: &SocketAddr, - bind_dev: Option, + #[allow(unused_variables)] bind_dev: Option, ) -> Result<(), TunnelError> { #[cfg(target_os = "windows")] { diff --git a/easytier/src/tunnel/ring.rs b/easytier/src/tunnel/ring.rs index 09e154f..ae10c06 100644 --- a/easytier/src/tunnel/ring.rs +++ b/easytier/src/tunnel/ring.rs @@ -288,7 +288,7 @@ impl RingTunnelListener { #[async_trait] impl TunnelListener for RingTunnelListener { async fn listen(&mut self) -> Result<(), TunnelError> { - log::info!("listen new conn of key: {}", self.listerner_addr); + tracing::info!("listen new conn of key: {}", self.listerner_addr); CONNECTION_MAP .lock() .await @@ -297,11 +297,11 @@ impl TunnelListener for RingTunnelListener { } async fn accept(&mut self) -> Result, TunnelError> { - log::info!("waiting accept new conn of key: {}", self.listerner_addr); + tracing::info!("waiting accept new conn of key: {}", self.listerner_addr); let my_addr = self.get_addr()?; if let Some(conn) = self.conn_receiver.recv().await { if conn.server.id == my_addr { - log::info!("accept new conn of key: {}", self.listerner_addr); + tracing::info!("accept new conn of key: {}", self.listerner_addr); return Ok(Box::new(get_tunnel_for_server(conn))); } else { tracing::error!(?conn.server.id, ?my_addr, "got new conn with wrong id"); @@ -341,7 +341,7 @@ impl TunnelConnector for RingTunnelConnector { .get(&remote_addr) .unwrap() .clone(); - log::info!("connecting"); + tracing::info!("connecting"); let conn = Arc::new(Connection { client: Arc::new(RingTunnel::new(RING_TUNNEL_CAP)), server: Arc::new(RingTunnel::new_with_id( diff --git a/easytier/src/tunnel/udp.rs b/easytier/src/tunnel/udp.rs index fbf4bed..d969ee1 100644 --- a/easytier/src/tunnel/udp.rs +++ b/easytier/src/tunnel/udp.rs @@ -429,7 +429,7 @@ impl TunnelListener for UdpTunnelListener { } async fn accept(&mut self) -> Result, super::TunnelError> { - log::info!("start udp accept: {:?}", self.addr); + tracing::info!("start udp accept: {:?}", self.addr); while let Some(conn) = self.conn_recv.recv().await { return Ok(conn); } @@ -619,7 +619,7 @@ impl UdpTunnelConnector { socket: Arc, addr: SocketAddr, ) -> Result, super::TunnelError> { - log::warn!("udp connect: {:?}", self.addr); + tracing::warn!("udp connect: {:?}", self.addr); #[cfg(target_os = "windows")] crate::arch::windows::disable_connection_reset(socket.as_ref())?;