From 2058dbc470a55f41034b79fcf01dcaa883a1130d Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 31 Aug 2024 12:44:12 +0800 Subject: [PATCH] fix wg client hang after some time (#297) wg portal doesn't know client disconnect causing msg overstocked in queue, make entire peer packet process pipeline hang. --- easytier/src/gateway/fast_socks5/mod.rs | 4 --- easytier/src/tunnel/mpsc.rs | 7 +++++ easytier/src/vpn_portal/wireguard.rs | 41 ++++++++++++++++++++++--- 3 files changed, 43 insertions(+), 9 deletions(-) diff --git a/easytier/src/gateway/fast_socks5/mod.rs b/easytier/src/gateway/fast_socks5/mod.rs index f701046..133c94c 100644 --- a/easytier/src/gateway/fast_socks5/mod.rs +++ b/easytier/src/gateway/fast_socks5/mod.rs @@ -187,10 +187,6 @@ pub enum SocksError { #[error("Error with reply: {0}.")] ReplyError(#[from] ReplyError), - #[cfg(feature = "socks4")] - #[error("Error with reply: {0}.")] - ReplySocks4Error(#[from] socks4::ReplyError), - #[error("Argument input error: `{0}`.")] ArgumentInputError(&'static str), diff --git a/easytier/src/tunnel/mpsc.rs b/easytier/src/tunnel/mpsc.rs index b49e2ac..34cb1ff 100644 --- a/easytier/src/tunnel/mpsc.rs +++ b/easytier/src/tunnel/mpsc.rs @@ -19,6 +19,13 @@ impl MpscTunnelSender { self.0.send(item).await.with_context(|| "send error")?; Ok(()) } + + pub fn try_send(&self, item: ZCPacket) -> Result<(), TunnelError> { + self.0.try_send(item).map_err(|e| match e { + tachyonix::TrySendError::Full(_) => TunnelError::BufferFull, + tachyonix::TrySendError::Closed(_) => TunnelError::Shutdown, + }) + } } pub struct MpscTunnel { diff --git a/easytier/src/vpn_portal/wireguard.rs b/easytier/src/vpn_portal/wireguard.rs index 9ef3428..43e6205 100644 --- a/easytier/src/vpn_portal/wireguard.rs +++ b/easytier/src/vpn_portal/wireguard.rs @@ -1,6 +1,7 @@ use std::{ net::{Ipv4Addr, SocketAddr}, sync::Arc, + time::Duration, }; use anyhow::Context; @@ -9,7 +10,7 @@ use cidr::Ipv4Inet; use dashmap::DashMap; use futures::StreamExt; use pnet::packet::ipv4::Ipv4Packet; -use tokio::task::JoinSet; +use tokio::{task::JoinSet, time::timeout}; use tracing::Level; use crate::{ @@ -23,7 +24,7 @@ use crate::{ mpsc::{MpscTunnel, MpscTunnelSender}, packet_def::{PacketType, ZCPacket, ZCPacketType}, wireguard::{WgConfig, WgTunnelListener}, - Tunnel, TunnelListener, + Tunnel, TunnelError, TunnelListener, }, }; @@ -92,7 +93,25 @@ impl WireGuardImpl { info.remote_addr.clone(), )); - while let Some(Ok(msg)) = stream.next().await { + let mut map_key = None; + + loop { + let msg = match timeout(Duration::from_secs(120), stream.next()).await { + Ok(Some(Ok(msg))) => msg, + Ok(Some(Err(err))) => { + tracing::error!(?err, "Failed to receive from wg client"); + break; + } + Ok(None) => { + tracing::info!("Wireguard client disconnected"); + break; + } + Err(err) => { + tracing::error!(?err, "Timeout while receiving from wg client"); + break; + } + }; + assert_eq!(msg.packet_type(), ZCPacketType::WG); let inner = msg.inner(); let Some(i) = Ipv4Packet::new(&inner) else { @@ -104,6 +123,7 @@ impl WireGuardImpl { endpoint_addr: remote_addr.parse().ok(), sink: mpsc_tunnel.get_sink(), }); + map_key = Some(i.get_source()); wg_peer_ip_table.insert(i.get_source(), client_entry.clone()); ip_registered = true; } @@ -114,6 +134,11 @@ impl WireGuardImpl { .await; } + if map_key.is_some() { + tracing::info!(?map_key, "Removing wg client from table"); + wg_peer_ip_table.remove(&map_key.unwrap()); + } + peer_mgr .get_global_ctx() .issue_event(GlobalCtxEvent::VpnPortalClientDisconnected( @@ -157,9 +182,15 @@ impl WireGuardImpl { ZCPacketType::WG, ); - if let Err(ret) = entry.sink.send(packet).await { - tracing::debug!(?ret, "Failed to send packet to wg client"); + match entry.sink.try_send(packet) { + Ok(_) => { + tracing::trace!("Sent packet to wg client"); + } + Err(e) => { + tracing::debug!(?e, "Failed to send packet to wg client"); + } } + None } }