From d2291628e0841af8e98892656725c213269e4f6a Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Fri, 11 Oct 2024 00:12:14 +0800 Subject: [PATCH] mpsc tunnel may be stuck by slow tcp stream, should not panic for this (#406) * mpsc tunnel may be stuck by slow tcp stream, should not panic for this * disallow node connect to self --- easytier/src/peers/peer_conn.rs | 32 +++++++++++++++++++++++++-- easytier/src/peers/peer_conn_ping.rs | 2 ++ easytier/src/tunnel/mpsc.rs | 33 ++++++++++++++++++++-------- 3 files changed, 56 insertions(+), 11 deletions(-) diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 0389c91..b1c1962 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -224,7 +224,12 @@ impl PeerConn { self.info = Some(rsp); self.is_client = Some(false); self.send_handshake().await?; - Ok(()) + + if self.get_peer_id() == self.my_peer_id { + Err(Error::WaitRespError("peer id conflict".to_owned())) + } else { + Ok(()) + } } #[tracing::instrument] @@ -235,7 +240,12 @@ impl PeerConn { tracing::info!("handshake response: {:?}", rsp); self.info = Some(rsp); self.is_client = Some(true); - Ok(()) + + if self.get_peer_id() == self.my_peer_id { + Err(Error::WaitRespError("peer id conflict".to_owned())) + } else { + Ok(()) + } } pub fn handshake_done(&self) -> bool { @@ -396,6 +406,24 @@ mod tests { use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::ring::create_ring_tunnel_pair; + #[tokio::test] + async fn peer_conn_handshake_same_id() { + let (c, s) = create_ring_tunnel_pair(); + let c_peer_id = new_peer_id(); + let s_peer_id = c_peer_id; + + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + + assert!(c_ret.is_err()); + assert!(s_ret.is_err()); + } + #[tokio::test] async fn peer_conn_handshake() { let (c, s) = create_ring_tunnel_pair(); diff --git a/easytier/src/peers/peer_conn_ping.rs b/easytier/src/peers/peer_conn_ping.rs index fd6c056..5192549 100644 --- a/easytier/src/peers/peer_conn_ping.rs +++ b/easytier/src/peers/peer_conn_ping.rs @@ -294,6 +294,8 @@ impl PeerConnPinger { let need_close = if last_rx_packets != current_rx_packets { // if we receive some packet from peers, we should relax the condition counter > 50 && loss_rate_1 > 0.5 + + // TODO: wait more time to see if the loss rate is still high after no rx } else { true }; diff --git a/easytier/src/tunnel/mpsc.rs b/easytier/src/tunnel/mpsc.rs index 37f3498..927688c 100644 --- a/easytier/src/tunnel/mpsc.rs +++ b/easytier/src/tunnel/mpsc.rs @@ -70,17 +70,32 @@ impl MpscTunnel { sink: &mut Pin>, ) -> Result<(), TunnelError> { let item = rx.recv().await.with_context(|| "recv error")?; - sink.feed(item).await?; - while let Ok(item) = rx.try_recv() { - if let Err(e) = timeout(Duration::from_secs(5), sink.feed(item)) - .await - .unwrap() - { - tracing::error!(?e, "feed error"); - break; + + match timeout(Duration::from_secs(10), async move { + sink.feed(item).await?; + while let Ok(item) = rx.try_recv() { + match sink.feed(item).await { + Err(e) => { + tracing::error!(?e, "feed error"); + return Err(e); + } + Ok(_) => {} + } + } + sink.flush().await + }) + .await + { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => { + tracing::error!(?e, "forward error"); + Err(e) + } + Err(e) => { + tracing::error!(?e, "forward timeout"); + Err(e.into()) } } - sink.flush().await } pub fn get_stream(&mut self) -> Pin> {