mirror of
https://github.com/EasyTier/EasyTier.git
synced 2024-11-16 11:42:27 +08:00
elastic ping pong to adapt env with packet loss (#4)
* elastic ping pong to adapt env with packet loss
This commit is contained in:
parent
6271f9a7a7
commit
118954c745
|
@ -29,6 +29,7 @@ message PeerConnInfo {
|
||||||
repeated string features = 4;
|
repeated string features = 4;
|
||||||
TunnelInfo tunnel = 5;
|
TunnelInfo tunnel = 5;
|
||||||
PeerConnStats stats = 6;
|
PeerConnStats stats = 6;
|
||||||
|
float loss_rate = 7;
|
||||||
}
|
}
|
||||||
|
|
||||||
message PeerInfo {
|
message PeerInfo {
|
||||||
|
|
|
@ -71,8 +71,8 @@ pub struct RoutePacket {
|
||||||
pub enum CtrlPacketBody {
|
pub enum CtrlPacketBody {
|
||||||
HandShake(HandShake),
|
HandShake(HandShake),
|
||||||
RoutePacket(RoutePacket),
|
RoutePacket(RoutePacket),
|
||||||
Ping,
|
Ping(u32),
|
||||||
Pong,
|
Pong(u32),
|
||||||
TaRpc(u32, bool, Vec<u8>), // u32: service_id, bool: is_req, Vec<u8>: rpc body
|
TaRpc(u32, bool, Vec<u8>), // u32: service_id, bool: is_req, Vec<u8>: rpc body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -155,19 +155,19 @@ impl Packet {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_ping_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid) -> Self {
|
pub fn new_ping_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self {
|
||||||
Packet {
|
Packet {
|
||||||
from_peer: from_peer.into(),
|
from_peer: from_peer.into(),
|
||||||
to_peer: Some(to_peer.into()),
|
to_peer: Some(to_peer.into()),
|
||||||
body: PacketBody::Ctrl(CtrlPacketBody::Ping),
|
body: PacketBody::Ctrl(CtrlPacketBody::Ping(seq)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn new_pong_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid) -> Self {
|
pub fn new_pong_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self {
|
||||||
Packet {
|
Packet {
|
||||||
from_peer: from_peer.into(),
|
from_peer: from_peer.into(),
|
||||||
to_peer: Some(to_peer.into()),
|
to_peer: Some(to_peer.into()),
|
||||||
body: PacketBody::Ctrl(CtrlPacketBody::Pong),
|
body: PacketBody::Ctrl(CtrlPacketBody::Pong(seq)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,18 @@
|
||||||
use std::{pin::Pin, sync::Arc};
|
use std::{
|
||||||
|
fmt::Debug,
|
||||||
|
pin::Pin,
|
||||||
|
sync::{
|
||||||
|
atomic::{AtomicU32, Ordering},
|
||||||
|
Arc,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
use easytier_rpc::{PeerConnInfo, PeerConnStats};
|
use easytier_rpc::{PeerConnInfo, PeerConnStats};
|
||||||
use futures::{SinkExt, StreamExt};
|
use futures::{SinkExt, StreamExt};
|
||||||
use pnet::datalink::NetworkInterface;
|
use pnet::datalink::NetworkInterface;
|
||||||
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{broadcast, mpsc},
|
sync::{broadcast, mpsc, Mutex},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
time::{timeout, Duration},
|
time::{timeout, Duration},
|
||||||
};
|
};
|
||||||
|
@ -56,6 +63,19 @@ macro_rules! wait_response {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn build_ctrl_msg(msg: Bytes, is_req: bool) -> Bytes {
|
||||||
|
let prefix: &'static [u8] = if is_req {
|
||||||
|
CTRL_REQ_PACKET_PREFIX
|
||||||
|
} else {
|
||||||
|
CTRL_RESP_PACKET_PREFIX
|
||||||
|
};
|
||||||
|
let mut new_msg = BytesMut::new();
|
||||||
|
new_msg.reserve(prefix.len() + msg.len());
|
||||||
|
new_msg.extend_from_slice(prefix);
|
||||||
|
new_msg.extend_from_slice(&msg);
|
||||||
|
new_msg.into()
|
||||||
|
}
|
||||||
|
|
||||||
pub struct PeerInfo {
|
pub struct PeerInfo {
|
||||||
magic: u32,
|
magic: u32,
|
||||||
pub my_peer_id: uuid::Uuid,
|
pub my_peer_id: uuid::Uuid,
|
||||||
|
@ -76,6 +96,202 @@ impl<'a> From<&ArchivedHandShake> for PeerInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct PeerConnPinger {
|
||||||
|
my_node_id: uuid::Uuid,
|
||||||
|
peer_id: uuid::Uuid,
|
||||||
|
sink: Arc<Mutex<Pin<Box<dyn DatagramSink>>>>,
|
||||||
|
ctrl_sender: broadcast::Sender<Bytes>,
|
||||||
|
latency_stats: Arc<WindowLatency>,
|
||||||
|
loss_rate_stats: Arc<AtomicU32>,
|
||||||
|
tasks: JoinSet<Result<(), TunnelError>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Debug for PeerConnPinger {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
f.debug_struct("PeerConnPinger")
|
||||||
|
.field("my_node_id", &self.my_node_id)
|
||||||
|
.field("peer_id", &self.peer_id)
|
||||||
|
.finish()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl PeerConnPinger {
|
||||||
|
pub fn new(
|
||||||
|
my_node_id: uuid::Uuid,
|
||||||
|
peer_id: uuid::Uuid,
|
||||||
|
sink: Pin<Box<dyn DatagramSink>>,
|
||||||
|
ctrl_sender: broadcast::Sender<Bytes>,
|
||||||
|
latency_stats: Arc<WindowLatency>,
|
||||||
|
loss_rate_stats: Arc<AtomicU32>,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
my_node_id,
|
||||||
|
peer_id,
|
||||||
|
sink: Arc::new(Mutex::new(sink)),
|
||||||
|
tasks: JoinSet::new(),
|
||||||
|
latency_stats,
|
||||||
|
ctrl_sender,
|
||||||
|
loss_rate_stats,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn ping(&self) -> Result<(), TunnelError> {
|
||||||
|
let mut sink = self.sink.lock().await;
|
||||||
|
let ping_packet = Packet::new_ping_packet(uuid::Uuid::new_v4(), uuid::Uuid::new_v4(), 0);
|
||||||
|
sink.send(ping_packet.into()).await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn do_pingpong_once(
|
||||||
|
my_node_id: uuid::Uuid,
|
||||||
|
peer_id: uuid::Uuid,
|
||||||
|
sink: Arc<Mutex<Pin<Box<dyn DatagramSink>>>>,
|
||||||
|
receiver: &mut broadcast::Receiver<Bytes>,
|
||||||
|
seq: u32,
|
||||||
|
) -> Result<u128, TunnelError> {
|
||||||
|
// should add seq here. so latency can be calculated more accurately
|
||||||
|
let req = build_ctrl_msg(
|
||||||
|
packet::Packet::new_ping_packet(my_node_id, peer_id, seq).into(),
|
||||||
|
true,
|
||||||
|
);
|
||||||
|
tracing::trace!("send ping packet: {:?}", req);
|
||||||
|
sink.lock().await.send(req).await.map_err(|e| {
|
||||||
|
tracing::warn!("send ping packet error: {:?}", e);
|
||||||
|
TunnelError::CommonError("send ping packet error".to_owned())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let now = std::time::Instant::now();
|
||||||
|
|
||||||
|
// wait until we get a pong packet in ctrl_resp_receiver
|
||||||
|
let resp = timeout(Duration::from_secs(1), async {
|
||||||
|
loop {
|
||||||
|
match receiver.recv().await {
|
||||||
|
Ok(p) => {
|
||||||
|
if let packet::ArchivedPacketBody::Ctrl(
|
||||||
|
packet::ArchivedCtrlPacketBody::Pong(resp_seq),
|
||||||
|
) = &Packet::decode(&p).body
|
||||||
|
{
|
||||||
|
if *resp_seq == seq {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
log::warn!("recv pong resp error: {:?}", e);
|
||||||
|
return Err(TunnelError::WaitRespError(
|
||||||
|
"recv pong resp error".to_owned(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
tracing::trace!(?resp, "wait ping response done");
|
||||||
|
|
||||||
|
if resp.is_err() {
|
||||||
|
return Err(TunnelError::WaitRespError(
|
||||||
|
"wait ping response timeout".to_owned(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.as_ref().unwrap().is_err() {
|
||||||
|
return Err(resp.unwrap().err().unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(now.elapsed().as_micros())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn pingpong(&mut self) {
|
||||||
|
let sink = self.sink.clone();
|
||||||
|
let my_node_id = self.my_node_id;
|
||||||
|
let peer_id = self.peer_id;
|
||||||
|
let latency_stats = self.latency_stats.clone();
|
||||||
|
|
||||||
|
let (ping_res_sender, mut ping_res_receiver) = tokio::sync::mpsc::channel(100);
|
||||||
|
|
||||||
|
// generate a pingpong task every 200ms
|
||||||
|
let mut pingpong_tasks = JoinSet::new();
|
||||||
|
let ctrl_resp_sender = self.ctrl_sender.clone();
|
||||||
|
self.tasks.spawn(async move {
|
||||||
|
let mut req_seq = 0;
|
||||||
|
loop {
|
||||||
|
let receiver = ctrl_resp_sender.subscribe();
|
||||||
|
let ping_res_sender = ping_res_sender.clone();
|
||||||
|
let sink = sink.clone();
|
||||||
|
|
||||||
|
while pingpong_tasks.len() > 5 {
|
||||||
|
pingpong_tasks.join_next().await;
|
||||||
|
}
|
||||||
|
|
||||||
|
pingpong_tasks.spawn(async move {
|
||||||
|
let mut receiver = receiver.resubscribe();
|
||||||
|
let pingpong_once_ret = Self::do_pingpong_once(
|
||||||
|
my_node_id,
|
||||||
|
peer_id,
|
||||||
|
sink.clone(),
|
||||||
|
&mut receiver,
|
||||||
|
req_seq,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let _ = ping_res_sender.send(pingpong_once_ret).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
req_seq += 1;
|
||||||
|
tokio::time::sleep(Duration::from_millis(350)).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// one with 1% precision
|
||||||
|
let loss_rate_stats_1 = WindowLatency::new(100);
|
||||||
|
// one with 20% precision, so we can fast fail this conn.
|
||||||
|
let loss_rate_stats_20 = WindowLatency::new(5);
|
||||||
|
|
||||||
|
let mut counter: u64 = 0;
|
||||||
|
|
||||||
|
while let Some(ret) = ping_res_receiver.recv().await {
|
||||||
|
counter += 1;
|
||||||
|
|
||||||
|
if let Ok(lat) = ret {
|
||||||
|
latency_stats.record_latency(lat as u32);
|
||||||
|
|
||||||
|
loss_rate_stats_1.record_latency(0);
|
||||||
|
loss_rate_stats_20.record_latency(0);
|
||||||
|
} else {
|
||||||
|
loss_rate_stats_1.record_latency(1);
|
||||||
|
loss_rate_stats_20.record_latency(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
let loss_rate_20: f64 = loss_rate_stats_20.get_latency_us();
|
||||||
|
let loss_rate_1: f64 = loss_rate_stats_1.get_latency_us();
|
||||||
|
|
||||||
|
tracing::warn!(
|
||||||
|
?ret,
|
||||||
|
?self,
|
||||||
|
?loss_rate_1,
|
||||||
|
?loss_rate_20,
|
||||||
|
"pingpong task recv pingpong_once result"
|
||||||
|
);
|
||||||
|
|
||||||
|
if (counter > 5 && loss_rate_20 > 0.74) || (counter > 150 && loss_rate_1 > 0.20) {
|
||||||
|
log::warn!(
|
||||||
|
"pingpong loss rate too high, my_node_id: {}, peer_id: {}, loss_rate_20: {}, loss_rate_1: {}",
|
||||||
|
my_node_id,
|
||||||
|
peer_id,
|
||||||
|
loss_rate_20,
|
||||||
|
loss_rate_1,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.loss_rate_stats
|
||||||
|
.store((loss_rate_20 * 100.0) as u32, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
define_tunnel_filter_chain!(PeerConnTunnel, stats = StatsRecorderTunnelFilter);
|
define_tunnel_filter_chain!(PeerConnTunnel, stats = StatsRecorderTunnelFilter);
|
||||||
|
|
||||||
pub struct PeerConn {
|
pub struct PeerConn {
|
||||||
|
@ -97,6 +313,7 @@ pub struct PeerConn {
|
||||||
|
|
||||||
latency_stats: Arc<WindowLatency>,
|
latency_stats: Arc<WindowLatency>,
|
||||||
throughput: Arc<Throughput>,
|
throughput: Arc<Throughput>,
|
||||||
|
loss_rate_stats: Arc<AtomicU32>,
|
||||||
}
|
}
|
||||||
|
|
||||||
enum PeerConnPacketType {
|
enum PeerConnPacketType {
|
||||||
|
@ -132,6 +349,7 @@ impl PeerConn {
|
||||||
|
|
||||||
latency_stats: Arc::new(WindowLatency::new(15)),
|
latency_stats: Arc::new(WindowLatency::new(15)),
|
||||||
throughput: peer_conn_tunnel.stats.get_throughput().clone(),
|
throughput: peer_conn_tunnel.stats.get_throughput().clone(),
|
||||||
|
loss_rate_stats: Arc::new(AtomicU32::new(0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -177,104 +395,6 @@ impl PeerConn {
|
||||||
self.info.is_some()
|
self.info.is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_pingpong_once(
|
|
||||||
my_node_id: uuid::Uuid,
|
|
||||||
peer_id: uuid::Uuid,
|
|
||||||
sink: &mut Pin<Box<dyn DatagramSink>>,
|
|
||||||
receiver: &mut broadcast::Receiver<Bytes>,
|
|
||||||
) -> Result<u128, TunnelError> {
|
|
||||||
// should add seq here. so latency can be calculated more accurately
|
|
||||||
let req = Self::build_ctrl_msg(
|
|
||||||
packet::Packet::new_ping_packet(my_node_id, peer_id).into(),
|
|
||||||
true,
|
|
||||||
);
|
|
||||||
log::trace!("send ping packet: {:?}", req);
|
|
||||||
sink.send(req).await?;
|
|
||||||
|
|
||||||
let now = std::time::Instant::now();
|
|
||||||
|
|
||||||
// wait until we get a pong packet in ctrl_resp_receiver
|
|
||||||
let resp = timeout(Duration::from_secs(4), async {
|
|
||||||
loop {
|
|
||||||
match receiver.recv().await {
|
|
||||||
Ok(p) => {
|
|
||||||
if let packet::ArchivedPacketBody::Ctrl(
|
|
||||||
packet::ArchivedCtrlPacketBody::Pong,
|
|
||||||
) = &Packet::decode(&p).body
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!("recv pong resp error: {:?}", e);
|
|
||||||
return Err(TunnelError::WaitRespError(
|
|
||||||
"recv pong resp error".to_owned(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if resp.is_err() {
|
|
||||||
return Err(TunnelError::WaitRespError(
|
|
||||||
"wait ping response timeout".to_owned(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.as_ref().unwrap().is_err() {
|
|
||||||
return Err(resp.unwrap().err().unwrap());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(now.elapsed().as_micros())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn start_pingpong(&mut self) {
|
|
||||||
let mut sink = self.tunnel.pin_sink();
|
|
||||||
let my_node_id = self.my_node_id;
|
|
||||||
let peer_id = self.get_peer_id();
|
|
||||||
let receiver = self.ctrl_resp_sender.subscribe();
|
|
||||||
let close_event_sender = self.close_event_sender.clone().unwrap();
|
|
||||||
let conn_id = self.conn_id;
|
|
||||||
let latency_stats = self.latency_stats.clone();
|
|
||||||
|
|
||||||
self.tasks.spawn(async move {
|
|
||||||
//sleep 1s
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
loop {
|
|
||||||
let mut receiver = receiver.resubscribe();
|
|
||||||
if let Ok(lat) =
|
|
||||||
Self::do_pingpong_once(my_node_id, peer_id, &mut sink, &mut receiver).await
|
|
||||||
{
|
|
||||||
log::trace!(
|
|
||||||
"pingpong latency: {}us, my_node_id: {}, peer_id: {}",
|
|
||||||
lat,
|
|
||||||
my_node_id,
|
|
||||||
peer_id
|
|
||||||
);
|
|
||||||
latency_stats.record_latency(lat as u64);
|
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
log::warn!(
|
|
||||||
"pingpong task exit, my_node_id: {}, peer_id: {}",
|
|
||||||
my_node_id,
|
|
||||||
peer_id,
|
|
||||||
);
|
|
||||||
|
|
||||||
if let Err(e) = close_event_sender.send(conn_id).await {
|
|
||||||
log::warn!("close event sender error: {:?}", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_packet_type(mut bytes_item: Bytes) -> PeerConnPacketType {
|
fn get_packet_type(mut bytes_item: Bytes) -> PeerConnPacketType {
|
||||||
if bytes_item.starts_with(CTRL_REQ_PACKET_PREFIX) {
|
if bytes_item.starts_with(CTRL_REQ_PACKET_PREFIX) {
|
||||||
PeerConnPacketType::CtrlReq(bytes_item.split_off(CTRL_REQ_PACKET_PREFIX.len()))
|
PeerConnPacketType::CtrlReq(bytes_item.split_off(CTRL_REQ_PACKET_PREFIX.len()))
|
||||||
|
@ -291,12 +411,13 @@ impl PeerConn {
|
||||||
) -> Result<Bytes, TunnelError> {
|
) -> Result<Bytes, TunnelError> {
|
||||||
let packet = Packet::decode(&bytes_item);
|
let packet = Packet::decode(&bytes_item);
|
||||||
match packet.body {
|
match packet.body {
|
||||||
packet::ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::Ping) => {
|
packet::ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::Ping(seq)) => {
|
||||||
log::trace!("recv ping packet: {:?}", packet);
|
log::trace!("recv ping packet: {:?}", packet);
|
||||||
Ok(Self::build_ctrl_msg(
|
Ok(build_ctrl_msg(
|
||||||
packet::Packet::new_pong_packet(
|
packet::Packet::new_pong_packet(
|
||||||
conn_info.my_node_id.parse().unwrap(),
|
conn_info.my_node_id.parse().unwrap(),
|
||||||
conn_info.peer_id.parse().unwrap(),
|
conn_info.peer_id.parse().unwrap(),
|
||||||
|
seq.into(),
|
||||||
)
|
)
|
||||||
.into(),
|
.into(),
|
||||||
false,
|
false,
|
||||||
|
@ -309,6 +430,32 @@ impl PeerConn {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn start_pingpong(&mut self) {
|
||||||
|
let mut pingpong = PeerConnPinger::new(
|
||||||
|
self.my_node_id,
|
||||||
|
self.get_peer_id(),
|
||||||
|
self.tunnel.pin_sink(),
|
||||||
|
self.ctrl_resp_sender.clone(),
|
||||||
|
self.latency_stats.clone(),
|
||||||
|
self.loss_rate_stats.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let close_event_sender = self.close_event_sender.clone().unwrap();
|
||||||
|
let conn_id = self.conn_id;
|
||||||
|
|
||||||
|
self.tasks.spawn(async move {
|
||||||
|
pingpong.pingpong().await;
|
||||||
|
|
||||||
|
tracing::warn!(?pingpong, "pingpong task exit");
|
||||||
|
|
||||||
|
if let Err(e) = close_event_sender.send(conn_id).await {
|
||||||
|
log::warn!("close event sender error: {:?}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
pub fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
|
pub fn start_recv_loop(&mut self, packet_recv_chan: PacketRecvChan) {
|
||||||
let mut stream = self.tunnel.pin_stream();
|
let mut stream = self.tunnel.pin_stream();
|
||||||
let mut sink = self.tunnel.pin_sink();
|
let mut sink = self.tunnel.pin_sink();
|
||||||
|
@ -356,27 +503,12 @@ impl PeerConn {
|
||||||
tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument),
|
tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument),
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
self.start_pingpong();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn send_msg(&mut self, msg: Bytes) -> Result<(), TunnelError> {
|
pub async fn send_msg(&mut self, msg: Bytes) -> Result<(), TunnelError> {
|
||||||
self.sink.send(msg).await
|
self.sink.send(msg).await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_ctrl_msg(msg: Bytes, is_req: bool) -> Bytes {
|
|
||||||
let prefix: &'static [u8] = if is_req {
|
|
||||||
CTRL_REQ_PACKET_PREFIX
|
|
||||||
} else {
|
|
||||||
CTRL_RESP_PACKET_PREFIX
|
|
||||||
};
|
|
||||||
let mut new_msg = BytesMut::new();
|
|
||||||
new_msg.reserve(prefix.len() + msg.len());
|
|
||||||
new_msg.extend_from_slice(prefix);
|
|
||||||
new_msg.extend_from_slice(&msg);
|
|
||||||
new_msg.into()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_peer_id(&self) -> uuid::Uuid {
|
pub fn get_peer_id(&self) -> uuid::Uuid {
|
||||||
self.info.as_ref().unwrap().my_peer_id
|
self.info.as_ref().unwrap().my_peer_id
|
||||||
}
|
}
|
||||||
|
@ -405,6 +537,7 @@ impl PeerConn {
|
||||||
features: self.info.as_ref().unwrap().features.clone(),
|
features: self.info.as_ref().unwrap().features.clone(),
|
||||||
tunnel: self.tunnel.info(),
|
tunnel: self.tunnel.info(),
|
||||||
stats: Some(self.get_stats()),
|
stats: Some(self.get_stats()),
|
||||||
|
loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -426,8 +559,10 @@ mod tests {
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::common::config_fs::ConfigFs;
|
use crate::common::config_fs::ConfigFs;
|
||||||
|
use crate::common::global_ctx::tests::get_mock_global_ctx;
|
||||||
use crate::common::global_ctx::GlobalCtx;
|
use crate::common::global_ctx::GlobalCtx;
|
||||||
use crate::common::netns::NetNS;
|
use crate::common::netns::NetNS;
|
||||||
|
use crate::tunnels::tunnel_filter::tests::DropSendTunnelFilter;
|
||||||
use crate::tunnels::tunnel_filter::{PacketRecorderTunnelFilter, TunnelWithFilter};
|
use crate::tunnels::tunnel_filter::{PacketRecorderTunnelFilter, TunnelWithFilter};
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
|
@ -481,4 +616,50 @@ mod tests {
|
||||||
assert_eq!(c_peer.get_peer_id(), s_uuid);
|
assert_eq!(c_peer.get_peer_id(), s_uuid);
|
||||||
assert_eq!(s_peer.get_peer_id(), c_uuid);
|
assert_eq!(s_peer.get_peer_id(), c_uuid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) {
|
||||||
|
use crate::tunnels::ring_tunnel::create_ring_tunnel_pair;
|
||||||
|
let (c, s) = create_ring_tunnel_pair();
|
||||||
|
|
||||||
|
// drop 1-3 packets should not affect pingpong
|
||||||
|
let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end));
|
||||||
|
let c = TunnelWithFilter::new(c, c_recorder.clone());
|
||||||
|
|
||||||
|
let c_uuid = uuid::Uuid::new_v4();
|
||||||
|
let s_uuid = uuid::Uuid::new_v4();
|
||||||
|
|
||||||
|
let mut c_peer = PeerConn::new(c_uuid, get_mock_global_ctx(), Box::new(c));
|
||||||
|
let mut s_peer = PeerConn::new(s_uuid, get_mock_global_ctx(), Box::new(s));
|
||||||
|
|
||||||
|
let (c_ret, s_ret) = tokio::join!(
|
||||||
|
c_peer.do_handshake_as_client(),
|
||||||
|
s_peer.do_handshake_as_server()
|
||||||
|
);
|
||||||
|
|
||||||
|
s_peer.set_close_event_sender(tokio::sync::mpsc::channel(1).0);
|
||||||
|
s_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0);
|
||||||
|
|
||||||
|
assert!(c_ret.is_ok());
|
||||||
|
assert!(s_ret.is_ok());
|
||||||
|
|
||||||
|
let (close_send, mut close_recv) = tokio::sync::mpsc::channel(1);
|
||||||
|
c_peer.set_close_event_sender(close_send);
|
||||||
|
c_peer.start_pingpong();
|
||||||
|
c_peer.start_recv_loop(tokio::sync::mpsc::channel(200).0);
|
||||||
|
|
||||||
|
// wait 5s, conn should not be disconnected
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
|
||||||
|
if conn_closed {
|
||||||
|
assert!(close_recv.try_recv().is_ok());
|
||||||
|
} else {
|
||||||
|
assert!(close_recv.try_recv().is_err());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_conn_pingpong_timeout() {
|
||||||
|
peer_conn_pingpong_test_common(3, 5, false).await;
|
||||||
|
peer_conn_pingpong_test_common(5, 12, true).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
use std::sync::atomic::{AtomicU32, AtomicU64};
|
use std::sync::atomic::{AtomicU32, AtomicU64};
|
||||||
|
|
||||||
pub struct WindowLatency {
|
pub struct WindowLatency {
|
||||||
latency_us_window: Vec<AtomicU64>,
|
latency_us_window: Vec<AtomicU32>,
|
||||||
latency_us_window_index: AtomicU32,
|
latency_us_window_index: AtomicU32,
|
||||||
latency_us_window_size: AtomicU32,
|
latency_us_window_size: AtomicU32,
|
||||||
}
|
}
|
||||||
|
@ -9,13 +9,13 @@ pub struct WindowLatency {
|
||||||
impl WindowLatency {
|
impl WindowLatency {
|
||||||
pub fn new(window_size: u32) -> Self {
|
pub fn new(window_size: u32) -> Self {
|
||||||
Self {
|
Self {
|
||||||
latency_us_window: (0..window_size).map(|_| AtomicU64::new(0)).collect(),
|
latency_us_window: (0..window_size).map(|_| AtomicU32::new(0)).collect(),
|
||||||
latency_us_window_index: AtomicU32::new(0),
|
latency_us_window_index: AtomicU32::new(0),
|
||||||
latency_us_window_size: AtomicU32::new(window_size),
|
latency_us_window_size: AtomicU32::new(window_size),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn record_latency(&self, latency_us: u64) {
|
pub fn record_latency(&self, latency_us: u32) {
|
||||||
let index = self
|
let index = self
|
||||||
.latency_us_window_index
|
.latency_us_window_index
|
||||||
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
@ -27,25 +27,27 @@ impl WindowLatency {
|
||||||
.store(latency_us, std::sync::atomic::Ordering::Relaxed);
|
.store(latency_us, std::sync::atomic::Ordering::Relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_latency_us(&self) -> u64 {
|
pub fn get_latency_us<T: From<u32> + std::ops::Div<Output = T>>(&self) -> T {
|
||||||
let window_size = self
|
let window_size = self
|
||||||
.latency_us_window_size
|
.latency_us_window_size
|
||||||
.load(std::sync::atomic::Ordering::Relaxed);
|
.load(std::sync::atomic::Ordering::Relaxed);
|
||||||
let mut sum = 0;
|
let mut sum = 0;
|
||||||
let mut count = 0;
|
let mut count = 0;
|
||||||
for i in 0..window_size {
|
for i in 0..window_size {
|
||||||
let latency_us =
|
if i >= self
|
||||||
self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed);
|
.latency_us_window_index
|
||||||
if latency_us > 0 {
|
.load(std::sync::atomic::Ordering::Relaxed)
|
||||||
sum += latency_us;
|
{
|
||||||
count += 1;
|
break;
|
||||||
}
|
}
|
||||||
|
sum += self.latency_us_window[i as usize].load(std::sync::atomic::Ordering::Relaxed);
|
||||||
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if count == 0 {
|
if count == 0 {
|
||||||
0
|
0.into()
|
||||||
} else {
|
} else {
|
||||||
sum / count
|
(T::from(sum)) / T::from(count)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,8 +12,15 @@ use super::*;
|
||||||
use crate::tunnels::{DatagramSink, DatagramStream, SinkError, SinkItem, StreamItem, Tunnel};
|
use crate::tunnels::{DatagramSink, DatagramStream, SinkError, SinkItem, StreamItem, Tunnel};
|
||||||
|
|
||||||
pub trait TunnelFilter {
|
pub trait TunnelFilter {
|
||||||
fn before_send(&self, data: SinkItem) -> Result<SinkItem, SinkError>;
|
fn before_send(&self, data: SinkItem) -> Option<Result<SinkItem, SinkError>> {
|
||||||
fn after_received(&self, data: StreamItem) -> Result<BytesMut, TunnelError>;
|
Some(Ok(data))
|
||||||
|
}
|
||||||
|
fn after_received(&self, data: StreamItem) -> Option<Result<BytesMut, TunnelError>> {
|
||||||
|
match data {
|
||||||
|
Ok(v) => Some(Ok(v)),
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TunnelWithFilter<T, F> {
|
pub struct TunnelWithFilter<T, F> {
|
||||||
|
@ -45,8 +52,10 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
|
fn start_send(self: Pin<&mut Self>, item: SinkItem) -> Result<(), Self::Error> {
|
||||||
let item = self.filter.before_send(item)?;
|
let Some(item) = self.filter.before_send(item) else {
|
||||||
self.get_mut().sink.start_send_unpin(item)
|
return Ok(());
|
||||||
|
};
|
||||||
|
self.get_mut().sink.start_send_unpin(item?)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn poll_flush(
|
fn poll_flush(
|
||||||
|
@ -83,12 +92,21 @@ where
|
||||||
|
|
||||||
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
|
||||||
let self_mut = self.get_mut();
|
let self_mut = self.get_mut();
|
||||||
match self_mut.stream.poll_next_unpin(cx) {
|
loop {
|
||||||
Poll::Ready(Some(ret)) => {
|
match self_mut.stream.poll_next_unpin(cx) {
|
||||||
Poll::Ready(Some(self_mut.filter.after_received(ret)))
|
Poll::Ready(Some(ret)) => {
|
||||||
|
let Some(ret) = self_mut.filter.after_received(ret) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
return Poll::Ready(Some(ret));
|
||||||
|
}
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
Poll::Pending => {
|
||||||
|
return Poll::Pending;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Poll::Ready(None) => Poll::Ready(None),
|
|
||||||
Poll::Pending => Poll::Pending,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -120,18 +138,18 @@ pub struct PacketRecorderTunnelFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TunnelFilter for PacketRecorderTunnelFilter {
|
impl TunnelFilter for PacketRecorderTunnelFilter {
|
||||||
fn before_send(&self, data: SinkItem) -> Result<SinkItem, SinkError> {
|
fn before_send(&self, data: SinkItem) -> Option<Result<SinkItem, SinkError>> {
|
||||||
self.received.lock().unwrap().push(data.clone());
|
self.received.lock().unwrap().push(data.clone());
|
||||||
Ok(data)
|
Some(Ok(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn after_received(&self, data: StreamItem) -> Result<BytesMut, TunnelError> {
|
fn after_received(&self, data: StreamItem) -> Option<Result<BytesMut, TunnelError>> {
|
||||||
match data {
|
match data {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
self.sent.lock().unwrap().push(v.clone().into());
|
self.sent.lock().unwrap().push(v.clone().into());
|
||||||
Ok(v)
|
Some(Ok(v))
|
||||||
}
|
}
|
||||||
Err(e) => Err(e),
|
Err(e) => Some(Err(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -150,18 +168,18 @@ pub struct StatsRecorderTunnelFilter {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TunnelFilter for StatsRecorderTunnelFilter {
|
impl TunnelFilter for StatsRecorderTunnelFilter {
|
||||||
fn before_send(&self, data: SinkItem) -> Result<SinkItem, SinkError> {
|
fn before_send(&self, data: SinkItem) -> Option<Result<SinkItem, SinkError>> {
|
||||||
self.throughput.record_tx_bytes(data.len() as u64);
|
self.throughput.record_tx_bytes(data.len() as u64);
|
||||||
Ok(data)
|
Some(Ok(data))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn after_received(&self, data: StreamItem) -> Result<BytesMut, TunnelError> {
|
fn after_received(&self, data: StreamItem) -> Option<Result<BytesMut, TunnelError>> {
|
||||||
match data {
|
match data {
|
||||||
Ok(v) => {
|
Ok(v) => {
|
||||||
self.throughput.record_rx_bytes(v.len() as u64);
|
self.throughput.record_rx_bytes(v.len() as u64);
|
||||||
Ok(v)
|
Some(Ok(v))
|
||||||
}
|
}
|
||||||
Err(e) => Err(e),
|
Err(e) => Some(Err(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -203,9 +221,42 @@ macro_rules! define_tunnel_filter_chain {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
pub mod tests {
|
||||||
|
use std::sync::atomic::{AtomicU32, Ordering};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::tunnels::ring_tunnel::RingTunnel;
|
use crate::tunnels::ring_tunnel::RingTunnel;
|
||||||
|
|
||||||
|
pub struct DropSendTunnelFilter {
|
||||||
|
start: AtomicU32,
|
||||||
|
end: AtomicU32,
|
||||||
|
cur: AtomicU32,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TunnelFilter for DropSendTunnelFilter {
|
||||||
|
fn before_send(&self, data: SinkItem) -> Option<Result<SinkItem, SinkError>> {
|
||||||
|
self.cur.fetch_add(1, Ordering::SeqCst);
|
||||||
|
if self.cur.load(Ordering::SeqCst) >= self.start.load(Ordering::SeqCst)
|
||||||
|
&& self.cur.load(std::sync::atomic::Ordering::SeqCst)
|
||||||
|
< self.end.load(Ordering::SeqCst)
|
||||||
|
{
|
||||||
|
tracing::trace!("drop packet: {:?}", data);
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
Some(Ok(data))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DropSendTunnelFilter {
|
||||||
|
pub fn new(start: u32, end: u32) -> Self {
|
||||||
|
Self {
|
||||||
|
start: AtomicU32::new(start),
|
||||||
|
end: AtomicU32::new(end),
|
||||||
|
cur: AtomicU32::new(0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_nested_filter() {
|
async fn test_nested_filter() {
|
||||||
define_tunnel_filter_chain!(
|
define_tunnel_filter_chain!(
|
||||||
|
|
Loading…
Reference in New Issue
Block a user