From 7f8935a9d5847d687e206c449bdefd5bc33f61a6 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Tue, 6 Feb 2024 13:29:12 +0800 Subject: [PATCH] introduce peer center (#13) peer_center is used to collect peer info into one peer node. the center node is selected with the following rules: 1. has smallest peer id 2. TODO: has allow_to_be_center peer feature peer center is not guaranteed to be stable and can be changed when peer enter or leave. it's used to reduce the cost to exchange infos between peers. --- easytier-core/Cargo.toml | 24 +-- easytier-core/build.rs | 6 +- easytier-core/src/main.rs | 1 + easytier-core/src/peer_center/instance.rs | 252 ++++++++++++++++++++++ easytier-core/src/peer_center/mod.rs | 20 ++ easytier-core/src/peer_center/server.rs | 114 ++++++++++ easytier-core/src/peer_center/service.rs | 92 ++++++++ easytier-core/src/peers/peer_manager.rs | 10 +- easytier-core/src/peers/peer_map.rs | 25 ++- easytier-core/src/peers/rpc_service.rs | 31 ++- easytier-core/src/tests/mod.rs | 3 +- easytier-core/src/tunnels/stats.rs | 2 +- 12 files changed, 542 insertions(+), 38 deletions(-) create mode 100644 easytier-core/src/peer_center/instance.rs create mode 100644 easytier-core/src/peer_center/mod.rs create mode 100644 easytier-core/src/peer_center/server.rs create mode 100644 easytier-core/src/peer_center/service.rs diff --git a/easytier-core/Cargo.toml b/easytier-core/Cargo.toml index 5408d00..8827ebf 100644 --- a/easytier-core/Cargo.toml +++ b/easytier-core/Cargo.toml @@ -13,7 +13,11 @@ path = "src/rpc/lib.rs" [dependencies] tracing = { version = "0.1", features = ["log"] } -tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "time"] } +tracing-subscriber = { version = "0.3", features = [ + "env-filter", + "local-time", + "time", +] } tracing-appender = "0.2.3" log = "0.4" thiserror = "1.0" @@ -77,21 +81,11 @@ stun-format = { git = "https://github.com/KKRainbow/stun-format.git", features = ] } rand = "0.8.5" -[dependencies.serde] -version = "1.0" -features = ["derive"] +serde = { version = "1.0", features = ["derive"] } +pnet = { version = "0.34.0", features = ["serde"] } +public-ip = { version = "0.2", features = ["default"] } -[dependencies.pnet] -version = "0.34.0" -features = ["serde"] - -[dependencies.clap] -version = "4.4" -features = ["derive"] - -[dependencies.public-ip] -version = "0.2" -features = ["default"] +clap = { version = "4.4", features = ["derive"] } [build-dependencies] tonic-build = "0.10" diff --git a/easytier-core/build.rs b/easytier-core/build.rs index e69e706..dd8a4fb 100644 --- a/easytier-core/build.rs +++ b/easytier-core/build.rs @@ -90,6 +90,10 @@ fn main() -> Result<(), Box> { #[cfg(target_os = "windows")] WindowsBuild::check_for_win(); - tonic_build::compile_protos("proto/cli.proto")?; + tonic_build::configure() + .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]") + .compile(&["proto/cli.proto"], &["proto/"]) + .unwrap(); + // tonic_build::compile_protos("proto/cli.proto")?; Ok(()) } diff --git a/easytier-core/src/main.rs b/easytier-core/src/main.rs index 1132952..928674b 100644 --- a/easytier-core/src/main.rs +++ b/easytier-core/src/main.rs @@ -9,6 +9,7 @@ mod common; mod connector; mod gateway; mod instance; +mod peer_center; mod peers; mod tunnels; diff --git a/easytier-core/src/peer_center/instance.rs b/easytier-core/src/peer_center/instance.rs new file mode 100644 index 0000000..ee54cd6 --- /dev/null +++ b/easytier-core/src/peer_center/instance.rs @@ -0,0 +1,252 @@ +use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::{Duration, SystemTime}, +}; + +use futures::Future; +use tokio::{sync::Mutex, task::JoinSet}; +use tracing::Instrument; + +use crate::peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId}; + +use super::{ + server::PeerCenterServer, + service::{PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap}, + Digest, Error, +}; + +pub struct PeerCenterClient { + peer_mgr: Arc, + tasks: Arc>>, +} + +static SERVICE_ID: u32 = 5; + +struct PeridicJobCtx { + peer_mgr: Arc, + job_ctx: T, +} + +impl PeerCenterClient { + pub async fn init(&self) -> Result<(), Error> { + self.peer_mgr.get_peer_rpc_mgr().run_service( + SERVICE_ID, + PeerCenterServer::new(self.peer_mgr.my_node_id()).serve(), + ); + + Ok(()) + } + + async fn select_center_peer(peer_mgr: &Arc) -> Option { + let peers = peer_mgr.list_routes().await; + if peers.is_empty() { + return None; + } + // find peer with alphabetical smallest id. + let mut min_peer = peer_mgr.my_node_id().to_string(); + for peer in peers.iter() { + if peer.peer_id < min_peer { + min_peer = peer.peer_id.clone(); + } + } + Some(min_peer.parse().unwrap()) + } + + async fn init_periodic_job< + T: Send + Sync + 'static + Clone, + Fut: Future> + Send + 'static, + >( + &self, + job_ctx: T, + job_fn: (impl Fn(PeerCenterServiceClient, Arc>) -> Fut + Send + Sync + 'static), + ) -> () { + let my_node_id = self.peer_mgr.my_node_id(); + let peer_mgr = self.peer_mgr.clone(); + self.tasks.lock().await.spawn( + async move { + let ctx = Arc::new(PeridicJobCtx { + peer_mgr: peer_mgr.clone(), + job_ctx, + }); + tracing::warn!(?my_node_id, "before periodic job loop"); + loop { + let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else { + tracing::warn!("no center peer found, sleep 1 second"); + tokio::time::sleep(Duration::from_secs(1)).await; + continue; + }; + tracing::warn!(?center_peer, "run periodic job"); + let rpc_mgr = peer_mgr.get_peer_rpc_mgr(); + let ret = rpc_mgr + .do_client_rpc_scoped(SERVICE_ID, center_peer, |c| async { + let client = + PeerCenterServiceClient::new(tarpc::client::Config::default(), c) + .spawn(); + job_fn(client, ctx.clone()).await + }) + .await; + + let Ok(sleep_time_ms) = ret else { + tracing::error!("periodic job to center server rpc failed: {:?}", ret); + tokio::time::sleep(Duration::from_secs(3)).await; + continue; + }; + + if sleep_time_ms > 0 { + tokio::time::sleep(Duration::from_millis(sleep_time_ms as u64)).await; + } + } + } + .instrument(tracing::info_span!("periodic_job", ?my_node_id)), + ); + } + + pub async fn new(peer_mgr: Arc) -> Self { + PeerCenterClient { + peer_mgr, + tasks: Arc::new(Mutex::new(JoinSet::new())), + } + } +} + +struct PeerCenterInstance { + peer_mgr: Arc, + client: Arc, +} + +impl PeerCenterInstance { + pub async fn new(peer_mgr: Arc) -> Self { + let client = Arc::new(PeerCenterClient::new(peer_mgr.clone()).await); + client.init().await.unwrap(); + + PeerCenterInstance { peer_mgr, client } + } + + async fn init_get_global_info_job(&self) { + self.client + .init_periodic_job({}, |client, _ctx| async move { + let ret = client + .get_global_peer_map(tarpc::context::current(), 0) + .await?; + + let Ok(global_peer_map) = ret else { + tracing::error!( + "get global info from center server got error result: {:?}", + ret + ); + return Ok(1000); + }; + + tracing::warn!("get global info from center server: {:?}", global_peer_map); + + Ok(5000) + }) + .await; + } + + async fn init_report_peers_job(&self) { + struct Ctx { + service: PeerManagerRpcService, + need_send_peers: AtomicBool, + } + let ctx = Arc::new(Ctx { + service: PeerManagerRpcService::new(self.peer_mgr.clone()), + need_send_peers: AtomicBool::new(true), + }); + + self.client + .init_periodic_job(ctx, |client, ctx| async move { + let my_node_id = ctx.peer_mgr.my_node_id(); + let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into(); + let mut hasher = DefaultHasher::new(); + peers.hash(&mut hasher); + + let peers = if ctx.job_ctx.need_send_peers.load(Ordering::Relaxed) { + Some(peers) + } else { + None + }; + let mut rpc_ctx = tarpc::context::current(); + rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3); + + let ret = client + .report_peers( + rpc_ctx, + my_node_id.clone(), + peers, + hasher.finish() as Digest, + ) + .await?; + + if matches!(ret.as_ref().err(), Some(Error::DigestMismatch)) { + ctx.job_ctx.need_send_peers.store(true, Ordering::Relaxed); + return Ok(0); + } else if ret.is_err() { + tracing::error!("report peers to center server got error result: {:?}", ret); + return Ok(500); + } + + ctx.job_ctx.need_send_peers.store(false, Ordering::Relaxed); + Ok(1000) + }) + .await; + } +} + +#[cfg(test)] +mod tests { + use crate::{ + peer_center::server::get_global_data, + peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, + }; + + use super::*; + + #[tokio::test] + async fn test_peer_center_instance() { + let peer_mgr_a = create_mock_peer_manager().await; + let peer_mgr_b = create_mock_peer_manager().await; + let peer_mgr_c = create_mock_peer_manager().await; + + let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone()).await; + let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone()).await; + let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone()).await; + + peer_center_a.init_report_peers_job().await; + peer_center_b.init_report_peers_job().await; + peer_center_c.init_report_peers_job().await; + + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; + + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + .await + .unwrap(); + + let center_peer = PeerCenterClient::select_center_peer(&peer_mgr_a) + .await + .unwrap(); + let center_data = get_global_data(center_peer); + + // wait center_data has 3 records for 10 seconds + let now = std::time::Instant::now(); + loop { + if center_data.read().await.global_peer_map.map.len() == 3 { + println!( + "center data ready, {:#?}", + center_data.read().await.global_peer_map + ); + break; + } + if now.elapsed().as_secs() > 60 { + panic!("center data not ready"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} diff --git a/easytier-core/src/peer_center/mod.rs b/easytier-core/src/peer_center/mod.rs new file mode 100644 index 0000000..32dbd37 --- /dev/null +++ b/easytier-core/src/peer_center/mod.rs @@ -0,0 +1,20 @@ +// peer_center is used to collect peer info into one peer node. +// the center node is selected with the following rules: +// 1. has smallest peer id +// 2. TODO: has allow_to_be_center peer feature +// peer center is not guaranteed to be stable and can be changed when peer enter or leave. +// it's used to reduce the cost to exchange infos between peers. + +mod instance; +mod server; +mod service; + +#[derive(thiserror::Error, Debug, serde::Deserialize, serde::Serialize)] +pub enum Error { + #[error("Digest not match, need provide full peer info to center server.")] + DigestMismatch, + #[error("Not center server")] + NotCenterServer, +} + +pub type Digest = u64; diff --git a/easytier-core/src/peer_center/server.rs b/easytier-core/src/peer_center/server.rs new file mode 100644 index 0000000..efd5c4e --- /dev/null +++ b/easytier-core/src/peer_center/server.rs @@ -0,0 +1,114 @@ +use std::{ + hash::{Hash, Hasher}, + sync::Arc, +}; + +use dashmap::DashMap; +use once_cell::sync::Lazy; +use tokio::sync::RwLock; + +use crate::peers::PeerId; + +use super::{ + service::{GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap}, + Digest, Error, +}; + +pub(crate) struct PeerCenterServerGlobalData { + pub global_peer_map: GlobalPeerMap, + pub digest: Digest, + pub update_time: std::time::Instant, +} + +impl PeerCenterServerGlobalData { + fn new() -> Self { + PeerCenterServerGlobalData { + global_peer_map: GlobalPeerMap::new(), + digest: Digest::default(), + update_time: std::time::Instant::now(), + } + } +} + +// a global unique instance for PeerCenterServer +pub(crate) static GLOBAL_DATA: Lazy>>> = + Lazy::new(DashMap::new); + +pub(crate) fn get_global_data(node_id: PeerId) -> Arc> { + GLOBAL_DATA + .entry(node_id) + .or_insert_with(|| Arc::new(RwLock::new(PeerCenterServerGlobalData::new()))) + .value() + .clone() +} + +#[derive(Clone, Debug)] +pub struct PeerCenterServer { + // every peer has its own server, so use per-struct dash map is ok. + my_node_id: PeerId, + digest_map: DashMap, +} + +impl PeerCenterServer { + pub fn new(my_node_id: PeerId) -> Self { + PeerCenterServer { + my_node_id, + digest_map: DashMap::new(), + } + } +} + +#[tarpc::server] +impl PeerCenterService for PeerCenterServer { + #[tracing::instrument()] + async fn report_peers( + self, + _: tarpc::context::Context, + my_peer_id: PeerId, + peers: Option, + digest: Digest, + ) -> Result<(), Error> { + tracing::warn!("receive report_peers"); + + let old_digest = self.digest_map.get(&my_peer_id); + // if digest match, no need to update + if let Some(old_digest) = old_digest { + if *old_digest == digest { + return Ok(()); + } + } + + if peers.is_none() { + return Err(Error::DigestMismatch); + } + + self.digest_map.insert(my_peer_id, digest); + let data = get_global_data(self.my_node_id); + let mut locked_data = data.write().await; + locked_data + .global_peer_map + .map + .insert(my_peer_id, peers.unwrap()); + + let mut hasher = std::collections::hash_map::DefaultHasher::new(); + locked_data.global_peer_map.map.hash(&mut hasher); + locked_data.digest = hasher.finish() as Digest; + + Ok(()) + } + + async fn get_global_peer_map( + self, + _: tarpc::context::Context, + digest: Digest, + ) -> Result, Error> { + let data = get_global_data(self.my_node_id); + if digest == data.read().await.digest { + return Ok(None); + } + + let data = get_global_data(self.my_node_id); + let locked_data = data.read().await; + Ok(Some(locked_data.global_peer_map.clone())) + } +} diff --git a/easytier-core/src/peer_center/service.rs b/easytier-core/src/peer_center/service.rs new file mode 100644 index 0000000..3c0ddf6 --- /dev/null +++ b/easytier-core/src/peer_center/service.rs @@ -0,0 +1,92 @@ +use std::collections::BTreeMap; + +use crate::peers::PeerId; + +use super::{Digest, Error}; +use easytier_rpc::PeerInfo; + +#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] +pub enum LatencyLevel { + VeryLow, + Low, + Normal, + High, + VeryHigh, +} + +impl LatencyLevel { + pub const fn from_latency_ms(lat_ms: u32) -> Self { + if lat_ms < 10 { + LatencyLevel::VeryLow + } else if lat_ms < 50 { + LatencyLevel::Low + } else if lat_ms < 100 { + LatencyLevel::Normal + } else if lat_ms < 200 { + LatencyLevel::High + } else { + LatencyLevel::VeryHigh + } + } +} + +#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] +pub struct PeerConnInfoForGlobalMap { + to_peer_id: PeerId, + latency_level: LatencyLevel, +} + +#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)] +pub struct PeerInfoForGlobalMap { + pub direct_peers: BTreeMap>, +} + +impl From> for PeerInfoForGlobalMap { + fn from(peers: Vec) -> Self { + let mut peer_map = BTreeMap::new(); + for peer in peers { + let mut conn_info = Vec::new(); + for conn in peer.conns { + conn_info.push(PeerConnInfoForGlobalMap { + to_peer_id: conn.peer_id.parse().unwrap(), + latency_level: LatencyLevel::from_latency_ms( + conn.stats.unwrap().latency_us as u32 / 1000, + ), + }); + } + // sort conn info so hash result is stable + conn_info.sort_by(|a, b| a.to_peer_id.cmp(&b.to_peer_id)); + peer_map.insert(peer.peer_id.parse().unwrap(), conn_info); + } + PeerInfoForGlobalMap { + direct_peers: peer_map, + } + } +} + +// a global peer topology map, peers can use it to find optimal path to other peers +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct GlobalPeerMap { + pub map: BTreeMap, +} + +impl GlobalPeerMap { + pub fn new() -> Self { + GlobalPeerMap { + map: BTreeMap::new(), + } + } +} + +#[tarpc::service] +pub trait PeerCenterService { + // report center server which peer is directly connected to me + // digest is a hash of current peer map, if digest not match, we need to transfer the whole map + async fn report_peers( + my_peer_id: PeerId, + peers: Option, + digest: Digest, + ) -> Result<(), Error>; + + async fn get_global_peer_map(digest: Digest) -> Result, Error>; +} diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index b81aac1..39ad32b 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -128,7 +128,7 @@ impl Debug for PeerManager { impl PeerManager { pub fn new(global_ctx: ArcGlobalCtx, nic_channel: mpsc::Sender) -> Self { let (packet_send, packet_recv) = mpsc::channel(100); - let peers = Arc::new(PeerMap::new(packet_send.clone())); + let peers = Arc::new(PeerMap::new(packet_send.clone(), global_ctx.clone())); // TODO: remove these because we have impl pipeline processor. let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); @@ -166,9 +166,7 @@ impl PeerManager { peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); let peer_id = peer.get_peer_id(); - self.peers - .add_new_peer_conn(peer, self.global_ctx.clone()) - .await; + self.peers.add_new_peer_conn(peer).await; Ok((peer_id, conn_id)) } @@ -189,9 +187,7 @@ impl PeerManager { tracing::info!("add tunnel as server start"); let mut peer = PeerConn::new(self.my_node_id, self.global_ctx.clone(), tunnel); peer.do_handshake_as_server().await?; - self.peers - .add_new_peer_conn(peer, self.global_ctx.clone()) - .await; + self.peers.add_new_peer_conn(peer).await; tracing::info!("add tunnel as server done"); Ok(()) } diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index 7ee5c3b..c9fa745 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use anyhow::Context; use dashmap::DashMap; use easytier_rpc::PeerConnInfo; use tokio::sync::mpsc; @@ -13,13 +14,15 @@ use crate::{ use super::{peer::Peer, peer_conn::PeerConn, route_trait::ArcRoute, PeerId}; pub struct PeerMap { + global_ctx: ArcGlobalCtx, peer_map: DashMap>, packet_send: mpsc::Sender, } impl PeerMap { - pub fn new(packet_send: mpsc::Sender) -> Self { + pub fn new(packet_send: mpsc::Sender, global_ctx: ArcGlobalCtx) -> Self { PeerMap { + global_ctx, peer_map: DashMap::new(), packet_send, } @@ -29,11 +32,11 @@ impl PeerMap { self.peer_map.insert(peer.peer_node_id, Arc::new(peer)); } - pub async fn add_new_peer_conn(&self, peer_conn: PeerConn, global_ctx: ArcGlobalCtx) { + pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) { let peer_id = peer_conn.get_peer_id(); let no_entry = self.peer_map.get(&peer_id).is_none(); if no_entry { - let new_peer = Peer::new(peer_id, self.packet_send.clone(), global_ctx); + let new_peer = Peer::new(peer_id, self.packet_send.clone(), self.global_ctx.clone()); new_peer.add_peer_conn(peer_conn).await; self.add_new_peer(new_peer).await; } else { @@ -51,6 +54,14 @@ impl PeerMap { msg: Bytes, dst_peer_id: &uuid::Uuid, ) -> Result<(), Error> { + if *dst_peer_id == self.global_ctx.get_id() { + return Ok(self + .packet_send + .send(msg) + .await + .with_context(|| "send msg to self failed")?); + } + match self.get_peer_by_id(dst_peer_id) { Some(peer) => { peer.send_msg(msg).await?; @@ -70,6 +81,14 @@ impl PeerMap { dst_peer_id: &uuid::Uuid, route: ArcRoute, ) -> Result<(), Error> { + if *dst_peer_id == self.global_ctx.get_id() { + return Ok(self + .packet_send + .send(msg) + .await + .with_context(|| "send msg to self failed")?); + } + // get route info let gateway_peer_id = route.get_next_hop(dst_peer_id).await; diff --git a/easytier-core/src/peers/rpc_service.rs b/easytier-core/src/peers/rpc_service.rs index 5814f34..2bf4d17 100644 --- a/easytier-core/src/peers/rpc_service.rs +++ b/easytier-core/src/peers/rpc_service.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use easytier_rpc::cli::PeerInfo; use easytier_rpc::peer_manage_rpc_server::PeerManageRpc; use easytier_rpc::{ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse}; use tonic::{Request, Response, Status}; @@ -14,17 +15,10 @@ impl PeerManagerRpcService { pub fn new(peer_manager: Arc) -> Self { PeerManagerRpcService { peer_manager } } -} - -#[tonic::async_trait] -impl PeerManageRpc for PeerManagerRpcService { - async fn list_peer( - &self, - _request: Request, // Accept request of type HelloRequest - ) -> Result, Status> { - let mut reply = ListPeerResponse::default(); + pub async fn list_peers(&self) -> Vec { let peers = self.peer_manager.get_peer_map().list_peers().await; + let mut peer_infos = Vec::new(); for peer in peers { let mut peer_info = easytier_rpc::PeerInfo::default(); peer_info.peer_id = peer.to_string(); @@ -38,7 +32,24 @@ impl PeerManageRpc for PeerManagerRpcService { peer_info.conns = conns; } - reply.peer_infos.push(peer_info); + peer_infos.push(peer_info); + } + + peer_infos + } +} + +#[tonic::async_trait] +impl PeerManageRpc for PeerManagerRpcService { + async fn list_peer( + &self, + _request: Request, // Accept request of type HelloRequest + ) -> Result, Status> { + let mut reply = ListPeerResponse::default(); + + let peers = self.list_peers().await; + for peer in peers { + reply.peer_infos.push(peer); } Ok(Response::new(reply)) diff --git a/easytier-core/src/tests/mod.rs b/easytier-core/src/tests/mod.rs index 83353f6..ed01a5e 100644 --- a/easytier-core/src/tests/mod.rs +++ b/easytier-core/src/tests/mod.rs @@ -116,7 +116,8 @@ pub fn enable_log() { let filter = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) .from_env() - .unwrap(); + .unwrap() + .add_directive("tarpc=error".parse().unwrap()); tracing_subscriber::fmt::fmt() .pretty() .with_env_filter(filter) diff --git a/easytier-core/src/tunnels/stats.rs b/easytier-core/src/tunnels/stats.rs index 9765570..8e8d7a4 100644 --- a/easytier-core/src/tunnels/stats.rs +++ b/easytier-core/src/tunnels/stats.rs @@ -23,7 +23,7 @@ impl WindowLatency { pub fn record_latency(&self, latency_us: u32) { let index = self.latency_us_window_index.fetch_add(1, Relaxed); - if index < self.latency_us_window_size { + if self.count.load(Relaxed) < self.latency_us_window_size { self.count.fetch_add(1, Relaxed); }