diff --git a/easytier-core/proto/cli.proto b/easytier-core/proto/cli.proto index a72f63d..f7e6f40 100644 --- a/easytier-core/proto/cli.proto +++ b/easytier-core/proto/cli.proto @@ -29,6 +29,7 @@ message PeerConnInfo { repeated string features = 4; TunnelInfo tunnel = 5; PeerConnStats stats = 6; + float loss_rate = 7; } message PeerInfo { diff --git a/easytier-core/src/peers/packet.rs b/easytier-core/src/peers/packet.rs index c1127c7..52a8113 100644 --- a/easytier-core/src/peers/packet.rs +++ b/easytier-core/src/peers/packet.rs @@ -71,8 +71,8 @@ pub struct RoutePacket { pub enum CtrlPacketBody { HandShake(HandShake), RoutePacket(RoutePacket), - Ping, - Pong, + Ping(u32), + Pong(u32), TaRpc(u32, bool, Vec), // u32: service_id, bool: is_req, Vec: rpc body } @@ -155,19 +155,19 @@ impl Packet { } } - pub fn new_ping_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid) -> Self { + pub fn new_ping_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self { Packet { from_peer: from_peer.into(), to_peer: Some(to_peer.into()), - body: PacketBody::Ctrl(CtrlPacketBody::Ping), + body: PacketBody::Ctrl(CtrlPacketBody::Ping(seq)), } } - pub fn new_pong_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid) -> Self { + pub fn new_pong_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self { Packet { from_peer: from_peer.into(), to_peer: Some(to_peer.into()), - body: PacketBody::Ctrl(CtrlPacketBody::Pong), + body: PacketBody::Ctrl(CtrlPacketBody::Pong(seq)), } } diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index fd1eb1a..ef981db 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -1,11 +1,18 @@ -use std::{pin::Pin, sync::Arc}; +use std::{ + fmt::Debug, + pin::Pin, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; use easytier_rpc::{PeerConnInfo, PeerConnStats}; use futures::{SinkExt, StreamExt}; use pnet::datalink::NetworkInterface; use tokio::{ - sync::{broadcast, mpsc}, + sync::{broadcast, mpsc, Mutex}, task::JoinSet, time::{timeout, Duration}, }; @@ -56,6 +63,19 @@ macro_rules! wait_response { }; } +fn build_ctrl_msg(msg: Bytes, is_req: bool) -> Bytes { + let prefix: &'static [u8] = if is_req { + CTRL_REQ_PACKET_PREFIX + } else { + CTRL_RESP_PACKET_PREFIX + }; + let mut new_msg = BytesMut::new(); + new_msg.reserve(prefix.len() + msg.len()); + new_msg.extend_from_slice(prefix); + new_msg.extend_from_slice(&msg); + new_msg.into() +} + pub struct PeerInfo { magic: u32, pub my_peer_id: uuid::Uuid, @@ -76,6 +96,202 @@ impl<'a> From<&ArchivedHandShake> for PeerInfo { } } +struct PeerConnPinger { + my_node_id: uuid::Uuid, + peer_id: uuid::Uuid, + sink: Arc>>>, + ctrl_sender: broadcast::Sender, + latency_stats: Arc, + loss_rate_stats: Arc, + tasks: JoinSet>, +} + +impl Debug for PeerConnPinger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerConnPinger") + .field("my_node_id", &self.my_node_id) + .field("peer_id", &self.peer_id) + .finish() + } +} + +impl PeerConnPinger { + pub fn new( + my_node_id: uuid::Uuid, + peer_id: uuid::Uuid, + sink: Pin>, + ctrl_sender: broadcast::Sender, + latency_stats: Arc, + loss_rate_stats: Arc, + ) -> Self { + Self { + my_node_id, + peer_id, + sink: Arc::new(Mutex::new(sink)), + tasks: JoinSet::new(), + latency_stats, + ctrl_sender, + loss_rate_stats, + } + } + + pub async fn ping(&self) -> Result<(), TunnelError> { + let mut sink = self.sink.lock().await; + let ping_packet = Packet::new_ping_packet(uuid::Uuid::new_v4(), uuid::Uuid::new_v4(), 0); + sink.send(ping_packet.into()).await?; + Ok(()) + } + + async fn do_pingpong_once( + my_node_id: uuid::Uuid, + peer_id: uuid::Uuid, + sink: Arc>>>, + receiver: &mut broadcast::Receiver, + seq: u32, + ) -> Result { + // should add seq here. so latency can be calculated more accurately + let req = build_ctrl_msg( + packet::Packet::new_ping_packet(my_node_id, peer_id, seq).into(), + true, + ); + tracing::trace!("send ping packet: {:?}", req); + sink.lock().await.send(req).await.map_err(|e| { + tracing::warn!("send ping packet error: {:?}", e); + TunnelError::CommonError("send ping packet error".to_owned()) + })?; + + let now = std::time::Instant::now(); + + // wait until we get a pong packet in ctrl_resp_receiver + let resp = timeout(Duration::from_secs(1), async { + loop { + match receiver.recv().await { + Ok(p) => { + if let packet::ArchivedPacketBody::Ctrl( + packet::ArchivedCtrlPacketBody::Pong(resp_seq), + ) = &Packet::decode(&p).body + { + if *resp_seq == seq { + break; + } + } + } + Err(e) => { + log::warn!("recv pong resp error: {:?}", e); + return Err(TunnelError::WaitRespError( + "recv pong resp error".to_owned(), + )); + } + } + } + Ok(()) + }) + .await; + + tracing::trace!(?resp, "wait ping response done"); + + if resp.is_err() { + return Err(TunnelError::WaitRespError( + "wait ping response timeout".to_owned(), + )); + } + + if resp.as_ref().unwrap().is_err() { + return Err(resp.unwrap().err().unwrap()); + } + + Ok(now.elapsed().as_micros()) + } + + async fn pingpong(&mut self) { + let sink = self.sink.clone(); + let my_node_id = self.my_node_id; + let peer_id = self.peer_id; + let latency_stats = self.latency_stats.clone(); + + let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100); + + // generate a pingpong task every 200ms + let mut pingpong_tasks = JoinSet::new(); + let ctrl_resp_sender = self.ctrl_sender.clone(); + self.tasks.spawn(async move { + let mut req_seq = 0; + loop { + let receiver = ctrl_resp_sender.subscribe(); + let ping_res_sender = ping_res_sender.clone(); + let sink = sink.clone(); + + while pingpong_tasks.len() > 5 { + pingpong_tasks.join_next().await; + } + + pingpong_tasks.spawn(async move { + let mut receiver = receiver.resubscribe(); + let pingpong_once_ret = Self::do_pingpong_once( + my_node_id, + peer_id, + sink.clone(), + &mut receiver, + req_seq, + ) + .await; + + let _ = ping_res_sender.send(pingpong_once_ret).await; + }); + + req_seq += 1; + tokio::time::sleep(Duration::from_millis(350)).await; + } + }); + + // one with 1% precision + let loss_rate_stats_1 = WindowLatency::new(100); + // one with 20% precision, so we can fast fail this conn. + let loss_rate_stats_20 = WindowLatency::new(5); + + let mut counter: u64 = 0; + + while let Some(ret) = ping_res_receiver.recv().await { + counter += 1; + + if let Ok(lat) = ret { + latency_stats.record_latency(lat as u32); + + loss_rate_stats_1.record_latency(0); + loss_rate_stats_20.record_latency(0); + } else { + loss_rate_stats_1.record_latency(1); + loss_rate_stats_20.record_latency(1); + } + + let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us(); + let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us(); + + tracing::warn!( + ?ret, + ?self, + ?loss_rate_1, + ?loss_rate_20, + "pingpong task recv pingpong_once result" + ); + + if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) { + log::warn!( + "pingpong loss rate too high, my_node_id: {}, peer_id: {}, loss_rate_20: {}, loss_rate_1: {}", + my_node_id, + peer_id, + loss_rate_20, + loss_rate_1, + ); + break; + } + + self.loss_rate_stats + .store((loss_rate_20 * 100.0) as u32, Ordering::Relaxed); + } + } +} + define_tunnel_filter_chain!(PeerConnTunnel, stats = StatsRecorderTunnelFilter); pub struct PeerConn { @@ -97,6 +313,7 @@ pub struct PeerConn { latency_stats: Arc, throughput: Arc, + loss_rate_stats: Arc, } enum PeerConnPacketType { @@ -132,6 +349,7 @@ impl PeerConn { latency_stats: Arc::new(WindowLatency::new(15)), throughput: peer_conn_tunnel.stats.get_throughput().clone(), + loss_rate_stats: Arc::new(AtomicU32::new(0)), } } @@ -177,104 +395,6 @@ impl PeerConn { self.info.is_some() } - async fn do_pingpong_once( - my_node_id: uuid::Uuid, - peer_id: uuid::Uuid, - sink: &mut Pin>, - receiver: &mut broadcast::Receiver, - ) -> Result { - // should add seq here. so latency can be calculated more accurately - let req = Self::build_ctrl_msg( - packet::Packet::new_ping_packet(my_node_id, peer_id).into(), - true, - ); - log::trace!("send ping packet: {:?}", req); - sink.send(req).await?; - - let now = std::time::Instant::now(); - - // wait until we get a pong packet in ctrl_resp_receiver - let resp = timeout(Duration::from_secs(4), async { - loop { - match receiver.recv().await { - Ok(p) => { - if let packet::ArchivedPacketBody::Ctrl( - packet::ArchivedCtrlPacketBody::Pong, - ) = &Packet::decode(&p).body - { - break; - } - } - Err(e) => { - log::warn!("recv pong resp error: {:?}", e); - return Err(TunnelError::WaitRespError( - "recv pong resp error".to_owned(), - )); - } - } - } - Ok(()) - }) - .await; - - if resp.is_err() { - return Err(TunnelError::WaitRespError( - "wait ping response timeout".to_owned(), - )); - } - - if resp.as_ref().unwrap().is_err() { - return Err(resp.unwrap().err().unwrap()); - } - - Ok(now.elapsed().as_micros()) - } - - fn start_pingpong(&mut self) { - let mut sink = self.tunnel.pin_sink(); - let my_node_id = self.my_node_id; - let peer_id = self.get_peer_id(); - let receiver = self.ctrl_resp_sender.subscribe(); - let close_event_sender = self.close_event_sender.clone().unwrap(); - let conn_id = self.conn_id; - let latency_stats = self.latency_stats.clone(); - - self.tasks.spawn(async move { - //sleep 1s - tokio::time::sleep(Duration::from_secs(1)).await; - loop { - let mut receiver = receiver.resubscribe(); - if let Ok(lat) = - Self::do_pingpong_once(my_node_id, peer_id, &mut sink, &mut receiver).await - { - log::trace!( - "pingpong latency: {}us, my_node_id: {}, peer_id: {}", - lat, - my_node_id, - peer_id - ); - latency_stats.record_latency(lat as u64); - - tokio::time::sleep(Duration::from_secs(1)).await; - } else { - break; - } - } - - log::warn!( - "pingpong task exit, my_node_id: {}, peer_id: {}", - my_node_id, - peer_id, - ); - - if let Err(e) = close_event_sender.send(conn_id).await { - log::warn!("close event sender error: {:?}", e); - } - - Ok(()) - }); - } - fn get_packet_type(mut bytes_item: Bytes) -> PeerConnPacketType { if bytes_item.starts_with(CTRL_REQ_PACKET_PREFIX) { PeerConnPacketType::CtrlReq(bytes_item.split_off(CTRL_REQ_PACKET_PREFIX.len())) @@ -291,12 +411,13 @@ impl PeerConn { ) -> Result { let packet = Packet::decode(&bytes_item); match packet.body { - packet::ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::Ping) => { + packet::ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::Ping(seq)) => { log::trace!("recv ping packet: {:?}", packet); - Ok(Self::build_ctrl_msg( + Ok(build_ctrl_msg( packet::Packet::new_pong_packet( conn_info.my_node_id.parse().unwrap(), conn_info.peer_id.parse().unwrap(), + seq.into(), ) .into(), false, @@ -309,6 +430,32 @@ impl PeerConn { } } + fn start_pingpong(&mut self) { + let mut pingpong = PeerConnPinger::new( + self.my_node_id, + self.get_peer_id(), + self.tunnel.pin_sink(), + self.ctrl_resp_sender.clone(), + self.latency_stats.clone(), + self.loss_rate_stats.clone(), + ); + + let close_event_sender = self.close_event_sender.clone().unwrap(); + let conn_id = self.conn_id; + + self.tasks.spawn(async move { + pingpong.pingpong().await; + + tracing::warn!(?pingpong, "pingpong task exit"); + + if let Err(e) = close_event_sender.send(conn_id).await { + log::warn!("close event sender error: {:?}", e); + } + + Ok(()) + }); + } + pub fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) { let mut stream = self.tunnel.pin_stream(); let mut sink = self.tunnel.pin_sink(); @@ -356,27 +503,12 @@ impl PeerConn { tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument), ), ); - - self.start_pingpong(); } pub async fn send_msg(&mut self, msg: Bytes) -> Result<(), TunnelError> { self.sink.send(msg).await } - fn build_ctrl_msg(msg: Bytes, is_req: bool) -> Bytes { - let prefix: &'static [u8] = if is_req { - CTRL_REQ_PACKET_PREFIX - } else { - CTRL_RESP_PACKET_PREFIX - }; - let mut new_msg = BytesMut::new(); - new_msg.reserve(prefix.len() + msg.len()); - new_msg.extend_from_slice(prefix); - new_msg.extend_from_slice(&msg); - new_msg.into() - } - pub fn get_peer_id(&self) -> uuid::Uuid { self.info.as_ref().unwrap().my_peer_id } @@ -405,6 +537,7 @@ impl PeerConn { features: self.info.as_ref().unwrap().features.clone(), tunnel: self.tunnel.info(), stats: Some(self.get_stats()), + loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32, } } } @@ -426,8 +559,10 @@ mod tests { use super::*; use crate::common::config_fs::ConfigFs; + use crate::common::global_ctx::tests::get_mock_global_ctx; use crate::common::global_ctx::GlobalCtx; use crate::common::netns::NetNS; + use crate::tunnels::tunnel_filter::tests::DropSendTunnelFilter; use crate::tunnels::tunnel_filter::{PacketRecorderTunnelFilter, TunnelWithFilter}; #[tokio::test] @@ -481,4 +616,50 @@ mod tests { assert_eq!(c_peer.get_peer_id(), s_uuid); assert_eq!(s_peer.get_peer_id(), c_uuid); } + + async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) { + use crate::tunnels::ring_tunnel::create_ring_tunnel_pair; + let (c, s) = create_ring_tunnel_pair(); + + // drop 1-3 packets should not affect pingpong + let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end)); + let c = TunnelWithFilter::new(c, c_recorder.clone()); + + let c_uuid = uuid::Uuid::new_v4(); + let s_uuid = uuid::Uuid::new_v4(); + + let mut c_peer = PeerConn::new(c_uuid, get_mock_global_ctx(), Box::new(c)); + let mut s_peer = PeerConn::new(s_uuid, get_mock_global_ctx(), Box::new(s)); + + let (c_ret, s_ret) = tokio::join!( + c_peer.do_handshake_as_client(), + s_peer.do_handshake_as_server() + ); + + s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0); + s_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0); + + assert!(c_ret.is_ok()); + assert!(s_ret.is_ok()); + + let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1); + c_peer.set_close_event_sender(close_send); + c_peer.start_pingpong(); + c_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0); + + // wait 5s, conn should not be disconnected + tokio::time::sleep(Duration::from_secs(5)).await; + + if conn_closed { + assert!(close_recv.try_recv().is_ok()); + } else { + assert!(close_recv.try_recv().is_err()); + } + } + + #[tokio::test] + async fn peer_conn_pingpong_timeout() { + peer_conn_pingpong_test_common(3, 5, false).await; + peer_conn_pingpong_test_common(5, 12, true).await; + } } diff --git a/easytier-core/src/tunnels/stats.rs b/easytier-core/src/tunnels/stats.rs index 8937f49..c0680bc 100644 --- a/easytier-core/src/tunnels/stats.rs +++ b/easytier-core/src/tunnels/stats.rs @@ -1,7 +1,7 @@ use std::sync::atomic::{AtomicU32, AtomicU64}; pub struct WindowLatency { - latency_us_window: Vec, + latency_us_window: Vec, latency_us_window_index: AtomicU32, latency_us_window_size: AtomicU32, } @@ -9,13 +9,13 @@ pub struct WindowLatency { impl WindowLatency { pub fn new(window_size: u32) -> Self { Self { - latency_us_window: (0..window_size).map(|_| AtomicU64::new(0)).collect(), + latency_us_window: (0..window_size).map(|_| AtomicU32::new(0)).collect(), latency_us_window_index: AtomicU32::new(0), latency_us_window_size: AtomicU32::new(window_size), } } - pub fn record_latency(&self, latency_us: u64) { + pub fn record_latency(&self, latency_us: u32) { let index = self .latency_us_window_index .fetch_add(1, std::sync::atomic::Ordering::Relaxed); @@ -27,25 +27,27 @@ impl WindowLatency { .store(latency_us, std::sync::atomic::Ordering::Relaxed); } - pub fn get_latency_us(&self) -> u64 { + pub fn get_latency_us + std::ops::Div>(&self) -> T { let window_size = self .latency_us_window_size .load(std::sync::atomic::Ordering::Relaxed); let mut sum = 0; let mut count = 0; for i in 0..window_size { - let latency_us = - self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed); - if latency_us > 0 { - sum += latency_us; - count += 1; + if i >= self + .latency_us_window_index + .load(std::sync::atomic::Ordering::Relaxed) + { + break; } + sum += self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed); + count += 1; } if count == 0 { - 0 + 0.into() } else { - sum / count + (T::from(sum)) / T::from(count) } } } diff --git a/easytier-core/src/tunnels/tunnel_filter.rs b/easytier-core/src/tunnels/tunnel_filter.rs index 05d57ce..e1901b7 100644 --- a/easytier-core/src/tunnels/tunnel_filter.rs +++ b/easytier-core/src/tunnels/tunnel_filter.rs @@ -12,8 +12,15 @@ use super::*; use crate::tunnels::{DatagramSink, DatagramStream, SinkError, SinkItem, StreamItem, Tunnel}; pub trait TunnelFilter { - fn before_send(&self, data: SinkItem) -> Result; - fn after_received(&self, data: StreamItem) -> Result; + fn before_send(&self, data: SinkItem) -> Option> { + Some(Ok(data)) + } + fn after_received(&self, data: StreamItem) -> Option> { + match data { + Ok(v) => Some(Ok(v)), + Err(e) => Some(Err(e)), + } + } } pub struct TunnelWithFilter { @@ -45,8 +52,10 @@ where } fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> { - let item = self.filter.before_send(item)?; - self.get_mut().sink.start_send_unpin(item) + let Some(item) = self.filter.before_send(item) else { + return Ok(()); + }; + self.get_mut().sink.start_send_unpin(item?) } fn poll_flush( @@ -83,12 +92,21 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let self_mut = self.get_mut(); - match self_mut.stream.poll_next_unpin(cx) { - Poll::Ready(Some(ret)) => { - Poll::Ready(Some(self_mut.filter.after_received(ret))) + loop { + match self_mut.stream.poll_next_unpin(cx) { + Poll::Ready(Some(ret)) => { + let Some(ret) = self_mut.filter.after_received(ret) else { + continue; + }; + return Poll::Ready(Some(ret)); + } + Poll::Ready(None) => { + return Poll::Ready(None); + } + Poll::Pending => { + return Poll::Pending; + } } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, } } } @@ -120,18 +138,18 @@ pub struct PacketRecorderTunnelFilter { } impl TunnelFilter for PacketRecorderTunnelFilter { - fn before_send(&self, data: SinkItem) -> Result { + fn before_send(&self, data: SinkItem) -> Option> { self.received.lock().unwrap().push(data.clone()); - Ok(data) + Some(Ok(data)) } - fn after_received(&self, data: StreamItem) -> Result { + fn after_received(&self, data: StreamItem) -> Option> { match data { Ok(v) => { self.sent.lock().unwrap().push(v.clone().into()); - Ok(v) + Some(Ok(v)) } - Err(e) => Err(e), + Err(e) => Some(Err(e)), } } } @@ -150,18 +168,18 @@ pub struct StatsRecorderTunnelFilter { } impl TunnelFilter for StatsRecorderTunnelFilter { - fn before_send(&self, data: SinkItem) -> Result { + fn before_send(&self, data: SinkItem) -> Option> { self.throughput.record_tx_bytes(data.len() as u64); - Ok(data) + Some(Ok(data)) } - fn after_received(&self, data: StreamItem) -> Result { + fn after_received(&self, data: StreamItem) -> Option> { match data { Ok(v) => { self.throughput.record_rx_bytes(v.len() as u64); - Ok(v) + Some(Ok(v)) } - Err(e) => Err(e), + Err(e) => Some(Err(e)), } } } @@ -203,9 +221,42 @@ macro_rules! define_tunnel_filter_chain { } #[cfg(test)] -mod tests { +pub mod tests { + use std::sync::atomic::{AtomicU32, Ordering}; + use super::*; use crate::tunnels::ring_tunnel::RingTunnel; + + pub struct DropSendTunnelFilter { + start: AtomicU32, + end: AtomicU32, + cur: AtomicU32, + } + + impl TunnelFilter for DropSendTunnelFilter { + fn before_send(&self, data: SinkItem) -> Option> { + self.cur.fetch_add(1, Ordering::SeqCst); + if self.cur.load(Ordering::SeqCst) >= self.start.load(Ordering::SeqCst) + && self.cur.load(std::sync::atomic::Ordering::SeqCst) + < self.end.load(Ordering::SeqCst) + { + tracing::trace!("drop packet: {:?}", data); + return None; + } + Some(Ok(data)) + } + } + + impl DropSendTunnelFilter { + pub fn new(start: u32, end: u32) -> Self { + Self { + start: AtomicU32::new(start), + end: AtomicU32::new(end), + cur: AtomicU32::new(0), + } + } + } + #[tokio::test] async fn test_nested_filter() { define_tunnel_filter_chain!(