optimize bandwidth usage (#24)

1. Stabilize the STUN test result (retry before accepting a NAT-type downgrade)
2. Stabilize the reported-peers result (only report once the peer list settles)
3. Do not send the same packet to a RIP route peer twice
This commit is contained in:
Sijie.Sun 2024-03-02 22:29:31 +08:00 committed by GitHub
parent 7918031d8b
commit 9261d0d32d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 51 additions and 13 deletions

View File

@ -449,7 +449,16 @@ impl StunInfoCollector {
self.tasks.spawn(async move {
loop {
let detector = UdpNatTypeDetector::new(stun_servers.read().await.clone());
let ret = detector.get_udp_nat_type(0).await;
let old_nat_type = udp_nat_type.load().0;
let mut ret = NatType::Unknown;
for _ in 1..5 {
// if nat type degrade, sleep and retry. so result can be relatively stable.
ret = detector.get_udp_nat_type(0).await;
if ret == NatType::Unknown || ret <= old_nat_type {
break;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
udp_nat_type.store((ret, std::time::Instant::now()));
let sleep_sec = match ret {

View File

@ -202,7 +202,7 @@ impl PeerCenterInstance {
};
let Some(resp) = resp else {
return Ok(1000);
return Ok(5000);
};
tracing::info!(
@ -214,7 +214,7 @@ impl PeerCenterInstance {
*ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map;
*ctx.job_ctx.global_peer_map_digest.write().await = resp.digest;
Ok(5000)
Ok(10000)
})
.await;
}
@ -223,16 +223,29 @@ impl PeerCenterInstance {
struct Ctx {
service: PeerManagerRpcService,
need_send_peers: AtomicBool,
last_report_peers: Mutex<PeerInfoForGlobalMap>,
}
let ctx = Arc::new(Ctx {
service: PeerManagerRpcService::new(self.peer_mgr.clone()),
need_send_peers: AtomicBool::new(true),
last_report_peers: Mutex::new(PeerInfoForGlobalMap::default()),
});
self.client
.init_periodic_job(ctx, |client, ctx| async move {
let my_node_id = ctx.peer_mgr.my_node_id();
let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into();
// if peers are not same in next 10 seconds, report peers to center server
let mut peers = PeerInfoForGlobalMap::default();
for _ in 1..10 {
peers = ctx.job_ctx.service.list_peers().await.into();
if peers == *ctx.job_ctx.last_report_peers.lock().await {
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
*ctx.job_ctx.last_report_peers.lock().await = peers.clone();
let mut hasher = DefaultHasher::new();
peers.hash(&mut hasher);
@ -262,7 +275,7 @@ impl PeerCenterInstance {
}
ctx.job_ctx.need_send_peers.store(false, Ordering::Relaxed);
Ok(1000)
Ok(3000)
})
.await;
}

View File

@ -77,7 +77,7 @@ impl PeerCenterServer {
let now = std::time::Instant::now();
let mut to_remove = Vec::new();
for kv in locked_data.peer_update_time.iter() {
if now.duration_since(*kv.value()).as_secs() > 10 {
if now.duration_since(*kv.value()).as_secs() > 20 {
to_remove.push(*kv.key());
}
}

View File

@ -32,6 +32,10 @@ use crate::{
use super::{packet::ArchivedPacketBody, peer_manager::PeerPacketFilter};
const SEND_ROUTE_PERIOD_SEC: u64 = 60;
const SEND_ROUTE_FAST_REPLY_SEC: u64 = 5;
const ROUTE_EXPIRED_SEC: u64 = 70;
type Version = u32;
#[derive(Archive, Deserialize, Serialize, Clone, Debug, PartialEq)]
@ -192,7 +196,7 @@ pub struct BasicRoute {
version: RouteVersion,
myself: Arc<RwLock<SyncPeerInfo>>,
last_send_time_map: Arc<DashMap<PeerId, Instant>>,
last_send_time_map: Arc<DashMap<PeerId, (Version, Option<Version>, Instant)>>,
}
impl BasicRoute {
@ -373,10 +377,10 @@ impl BasicRoute {
let last_send_time_map_new = DashMap::new();
let peers = interface.list_peers().await;
for peer in peers.iter() {
let last_send_time = last_send_time_map.get(peer).map(|v| *v).unwrap_or(Instant::now() - Duration::from_secs(3600));
let last_send_time = last_send_time_map.get(peer).map(|v| *v).unwrap_or((0, None, Instant::now() - Duration::from_secs(3600)));
let my_version_peer_saved = sync_peer_from_remote.get(&peer).and_then(|v| v.packet.peer_version);
let peer_have_latest_version = my_version_peer_saved == Some(version.get());
if peer_have_latest_version && last_send_time.elapsed().as_secs() < 60 {
if peer_have_latest_version && last_send_time.2.elapsed().as_secs() < SEND_ROUTE_PERIOD_SEC {
last_send_time_map_new.insert(*peer, last_send_time);
continue;
}
@ -386,11 +390,13 @@ impl BasicRoute {
dst_peer_id = ?peer,
version = version.get(),
?my_version_peer_saved,
last_send_elapse = ?last_send_time.elapsed().as_secs(),
last_send_version = ?last_send_time.0,
last_send_peer_version = ?last_send_time.1,
last_send_elapse = ?last_send_time.2.elapsed().as_secs(),
"need send route info"
);
last_send_time_map_new.insert(*peer, Instant::now());
let peer_version_we_saved = sync_peer_from_remote.get(&peer).and_then(|v| Some(v.packet.version));
last_send_time_map_new.insert(*peer, (version.get(), peer_version_we_saved, Instant::now()));
let ret = Self::send_sync_peer_request(
interface,
my_peer_id.clone(),
@ -457,7 +463,7 @@ impl BasicRoute {
let connected_peers = interface.lock().await.as_ref().unwrap().list_peers().await;
for item in sync_peer_from_remote.iter() {
let (k, v) = item.pair();
if now.duration_since(v.last_update).as_secs() > 70
if now.duration_since(v.last_update).as_secs() > ROUTE_EXPIRED_SEC
|| !connected_peers.contains(k)
{
need_update_route = true;
@ -541,7 +547,17 @@ impl BasicRoute {
if packet.need_reply {
self.last_send_time_map
.remove(&packet.myself.peer_id.to_uuid());
.entry(packet.myself.peer_id.to_uuid())
.and_modify(|v| {
const FAST_REPLY_DURATION: u64 =
SEND_ROUTE_PERIOD_SEC - SEND_ROUTE_FAST_REPLY_SEC;
if v.0 != self.version.get() || v.1 != Some(p.version) {
v.2 = Instant::now() - Duration::from_secs(3600);
} else if v.2.elapsed().as_secs() < FAST_REPLY_DURATION {
// do not send same version route info too frequently
v.2 = Instant::now() - Duration::from_secs(FAST_REPLY_DURATION);
}
});
}
if updated || packet.need_reply {