diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index c0f424d..5ad1f5e 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -220,7 +220,7 @@ impl PeerCenterInstance { .load() .elapsed() .as_secs() - > 60 + > 120 { ctx.job_ctx.global_peer_map_digest.store(Digest::default()); } @@ -239,12 +239,12 @@ impl PeerCenterInstance { "get global info from center server got error result: {:?}", ret ); - return Ok(1000); + return Ok(10000); }; if resp == GetGlobalPeerMapResponse::default() { // digest match, no need to update - return Ok(5000); + return Ok(15000); } tracing::info!( @@ -263,7 +263,7 @@ impl PeerCenterInstance { .global_peer_map_update_time .store(Instant::now()); - Ok(5000) + Ok(15000) }) .await; } @@ -426,7 +426,7 @@ mod tests { false } }, - Duration::from_secs(10), + Duration::from_secs(20), ) .await; @@ -435,7 +435,7 @@ mod tests { let rpc_service = pc.get_rpc_service(); wait_for_condition( || async { rpc_service.global_peer_map.read().unwrap().map.len() == 3 }, - Duration::from_secs(10), + Duration::from_secs(20), ) .await; diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index f3ae4bd..0773809 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -309,6 +309,7 @@ impl PeerConn { self.ctrl_resp_sender.clone(), self.latency_stats.clone(), self.loss_rate_stats.clone(), + self.throughput.clone(), ); let close_event_sender = self.close_event_sender.clone().unwrap(); @@ -388,6 +389,7 @@ mod tests { use super::*; use crate::common::global_ctx::tests::get_mock_global_ctx; use crate::common::new_peer_id; + use crate::common::scoped_task::ScopedTask; use crate::tunnel::filter::tests::DropSendTunnelFilter; use crate::tunnel::filter::PacketRecorderTunnelFilter; use crate::tunnel::ring::create_ring_tunnel_pair; @@ -429,13 +431,25 @@ mod tests { assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default()); } - async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) { + async fn peer_conn_pingpong_test_common( + drop_start: u32, + drop_end: u32, + conn_closed: bool, + drop_both: bool, + ) { let (c, s) = create_ring_tunnel_pair(); // drop 1-3 packets should not affect pingpong let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end)); let c = TunnelWithFilter::new(c, c_recorder.clone()); + let s = if drop_both { + let s_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end)); + Box::new(TunnelWithFilter::new(s, s_recorder.clone())) + } else { + s + }; + let c_peer_id = new_peer_id(); let s_peer_id = new_peer_id(); @@ -462,7 +476,15 @@ mod tests { .start_recv_loop(tokio::sync::mpsc::channel(200).0) .await; - // wait 5s, conn should not be disconnected + let throughput = c_peer.throughput.clone(); + let _t = ScopedTask::from(tokio::spawn(async move { + // if not drop both, we mock some rx traffic for client peer to test pinger + while !drop_both { + tokio::time::sleep(Duration::from_millis(100)).await; + throughput.record_rx_bytes(3); + } + })); + tokio::time::sleep(Duration::from_secs(15)).await; if conn_closed { @@ -473,9 +495,18 @@ mod tests { } #[tokio::test] - async fn peer_conn_pingpong_timeout() { - peer_conn_pingpong_test_common(3, 5, false).await; - peer_conn_pingpong_test_common(5, 12, true).await; + async fn peer_conn_pingpong_timeout_not_close() { + peer_conn_pingpong_test_common(3, 5, false, false).await; + } + + #[tokio::test] + async fn peer_conn_pingpong_oneside_timeout() { + peer_conn_pingpong_test_common(4, 12, false, false).await; + } + + #[tokio::test] + async fn peer_conn_pingpong_bothside_timeout() { + peer_conn_pingpong_test_common(4, 12, true, true).await; } #[tokio::test] diff --git a/easytier/src/peers/peer_conn_ping.rs b/easytier/src/peers/peer_conn_ping.rs index dd450be..fd6c056 100644 --- a/easytier/src/peers/peer_conn_ping.rs +++ b/easytier/src/peers/peer_conn_ping.rs @@ -6,18 +6,98 @@ use std::{ time::Duration, }; -use tokio::{sync::broadcast, task::JoinSet, time::timeout}; +use rand::{thread_rng, Rng}; +use tokio::{ + sync::broadcast, + task::JoinSet, + time::{timeout, Interval}, +}; use crate::{ common::{error::Error, PeerId}, tunnel::{ mpsc::MpscTunnelSender, packet_def::{PacketType, ZCPacket}, - stats::WindowLatency, + stats::{Throughput, WindowLatency}, TunnelError, }, }; +struct PingIntervalController { + throughput: Arc, + loss_rate_20: Arc, + + interval: Interval, + + logic_time: u64, + last_send_logic_time: u64, + + backoff_idx: i32, + max_backoff_idx: i32, + + last_throughput: Throughput, +} + +impl PingIntervalController { + fn new(throughput: Arc, loss_rate_20: Arc) -> Self { + let last_throughput = *throughput; + + Self { + throughput, + loss_rate_20, + interval: tokio::time::interval(Duration::from_secs(1)), + logic_time: 0, + last_send_logic_time: 0, + + backoff_idx: 0, + max_backoff_idx: 5, + + last_throughput, + } + } + + async fn tick(&mut self) { + self.interval.tick().await; + self.logic_time += 1; + } + + fn tx_increase(&self) -> bool { + self.throughput.tx_packets() > self.last_throughput.tx_packets() + } + + fn rx_increase(&self) -> bool { + self.throughput.rx_packets() > self.last_throughput.rx_packets() + } + + fn should_send_ping(&mut self) -> bool { + if self.loss_rate_20.get_latency_us::() > 0.0 { + self.backoff_idx = 0; + } else if self.tx_increase() + && !self.rx_increase() + && self.logic_time - self.last_send_logic_time > 2 + { + // if tx increase but rx not increase, we should do pingpong more frequently + self.backoff_idx = 0; + } + + self.last_throughput = *self.throughput; + + if (self.logic_time - self.last_send_logic_time) < (1 << self.backoff_idx) { + return false; + } + + self.backoff_idx = std::cmp::min(self.backoff_idx + 1, self.max_backoff_idx); + + // use this makes two peers not pingpong at the same time + if self.backoff_idx > self.max_backoff_idx - 2 && thread_rng().gen_bool(0.2) { + self.backoff_idx -= 1; + } + + self.last_send_logic_time = self.logic_time; + return true; + } +} + pub struct PeerConnPinger { my_peer_id: PeerId, peer_id: PeerId, @@ -25,6 +105,7 @@ pub struct PeerConnPinger { ctrl_sender: broadcast::Sender, latency_stats: Arc, loss_rate_stats: Arc, + throughput_stats: Arc, tasks: JoinSet>, } @@ -45,6 +126,7 @@ impl PeerConnPinger { ctrl_sender: broadcast::Sender, latency_stats: Arc, loss_rate_stats: Arc, + throughput_stats: Arc, ) -> Self { Self { my_peer_id, @@ -54,6 +136,7 @@ impl PeerConnPinger { latency_stats, ctrl_sender, loss_rate_stats, + throughput_stats, } } @@ -125,17 +208,23 @@ impl PeerConnPinger { let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100); + // one with 1% precision + let loss_rate_stats_1 = WindowLatency::new(100); + // one with 20% precision, so we can fast fail this conn. + let loss_rate_stats_20 = Arc::new(WindowLatency::new(5)); + let stopped = Arc::new(AtomicU32::new(0)); // generate a pingpong task every 200ms let mut pingpong_tasks = JoinSet::new(); let ctrl_resp_sender = self.ctrl_sender.clone(); let stopped_clone = stopped.clone(); + let mut controller = + PingIntervalController::new(self.throughput_stats.clone(), loss_rate_stats_20.clone()); self.tasks.spawn(async move { let mut req_seq = 0; loop { - let receiver = ctrl_resp_sender.subscribe(); - let ping_res_sender = ping_res_sender.clone(); + controller.tick().await; if stopped_clone.load(Ordering::Relaxed) != 0 { return Ok(()); @@ -145,7 +234,13 @@ impl PeerConnPinger { pingpong_tasks.join_next().await; } + if !controller.should_send_ping() { + continue; + } + let mut sink = sink.clone(); + let receiver = ctrl_resp_sender.subscribe(); + let ping_res_sender = ping_res_sender.clone(); pingpong_tasks.spawn(async move { let mut receiver = receiver.resubscribe(); let pingpong_once_ret = Self::do_pingpong_once( @@ -163,16 +258,12 @@ impl PeerConnPinger { }); req_seq = req_seq.wrapping_add(1); - tokio::time::sleep(Duration::from_millis(1000)).await; } }); - // one with 1% precision - let loss_rate_stats_1 = WindowLatency::new(100); - // one with 20% precision, so we can fast fail this conn. - let loss_rate_stats_20 = WindowLatency::new(5); - let mut counter: u64 = 0; + let throughput = self.throughput_stats.clone(); + let mut last_rx_packets = throughput.rx_packets(); while let Some(ret) = ping_res_receiver.recv().await { counter += 1; @@ -199,16 +290,29 @@ impl PeerConnPinger { ); if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) { - tracing::warn!( - ?ret, - ?self, - ?loss_rate_1, - ?loss_rate_20, - "pingpong loss rate too high, closing" - ); - break; + let current_rx_packets = throughput.rx_packets(); + let need_close = if last_rx_packets != current_rx_packets { + // if we receive some packet from peers, we should relax the condition + counter > 50 && loss_rate_1 > 0.5 + } else { + true + }; + + if need_close { + tracing::warn!( + ?ret, + ?self, + ?loss_rate_1, + ?loss_rate_20, + ?last_rx_packets, + ?current_rx_packets, + "pingpong loss rate too high, closing" + ); + break; + } } + last_rx_packets = throughput.rx_packets(); self.loss_rate_stats .store((loss_rate_1 * 100.0) as u32, Ordering::Relaxed); } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index bf9c85b..1cf16e7 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -373,7 +373,10 @@ pub async fn subnet_proxy_three_node_test( #[tokio::test] #[serial_test::serial] pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str) { - use crate::tunnel::wireguard::{WgConfig, WgTunnelConnector}; + use crate::{ + common::scoped_task::ScopedTask, + tunnel::wireguard::{WgConfig, WgTunnelConnector}, + }; let insts = init_three_node(proto).await; let mut inst4 = Instance::new(get_inst_config("inst4", Some("net_d"), "10.144.144.4")); @@ -417,16 +420,25 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str ); set_link_status("net_d", false); - tokio::time::sleep(tokio::time::Duration::from_secs(8)).await; - let routes = insts[0].get_peer_manager().list_routes().await; - assert!( - routes - .iter() - .find(|r| r.peer_id == inst4.peer_id()) - .is_none(), - "inst4 should not be in inst1's route list, {:?}", - routes - ); + let _t = ScopedTask::from(tokio::spawn(async move { + // do some ping in net_a to trigger net_c pingpong + loop { + ping_test("net_a", "10.144.144.4", Some(1)).await; + } + })); + wait_for_condition( + || async { + insts[0] + .get_peer_manager() + .list_routes() + .await + .iter() + .find(|r| r.peer_id == inst4.peer_id()) + .is_none() + }, + Duration::from_secs(15), + ) + .await; set_link_status("net_d", true); } }); diff --git a/easytier/src/tunnel/stats.rs b/easytier/src/tunnel/stats.rs index be89327..7639d03 100644 --- a/easytier/src/tunnel/stats.rs +++ b/easytier/src/tunnel/stats.rs @@ -48,7 +48,7 @@ impl WindowLatency { } } -#[derive(Default)] +#[derive(Default, Copy, Clone)] pub struct Throughput { tx_bytes: u64, rx_bytes: u64,