diff --git a/easytier/proto/cli.proto b/easytier/proto/cli.proto index 971002b..03bdba2 100644 --- a/easytier/proto/cli.proto +++ b/easytier/proto/cli.proto @@ -2,174 +2,171 @@ syntax = "proto3"; package cli; message Status { - int32 code = 1; - string message = 2; + int32 code = 1; + string message = 2; } message PeerConnStats { - uint64 rx_bytes = 1; - uint64 tx_bytes = 2; + uint64 rx_bytes = 1; + uint64 tx_bytes = 2; - uint64 rx_packets = 3; - uint64 tx_packets = 4; + uint64 rx_packets = 3; + uint64 tx_packets = 4; - uint64 latency_us = 5; + uint64 latency_us = 5; } message TunnelInfo { - string tunnel_type = 1; - string local_addr = 2; - string remote_addr = 3; + string tunnel_type = 1; + string local_addr = 2; + string remote_addr = 3; } message PeerConnInfo { - string conn_id = 1; - uint32 my_peer_id = 2; - uint32 peer_id = 3; - repeated string features = 4; - TunnelInfo tunnel = 5; - PeerConnStats stats = 6; - float loss_rate = 7; + string conn_id = 1; + uint32 my_peer_id = 2; + uint32 peer_id = 3; + repeated string features = 4; + TunnelInfo tunnel = 5; + PeerConnStats stats = 6; + float loss_rate = 7; } message PeerInfo { - uint32 peer_id = 1; - repeated PeerConnInfo conns = 2; + uint32 peer_id = 1; + repeated PeerConnInfo conns = 2; } message ListPeerRequest {} -message ListPeerResponse { - repeated PeerInfo peer_infos = 1; -} +message ListPeerResponse { repeated PeerInfo peer_infos = 1; } enum NatType { - // has NAT; but own a single public IP, port is not changed - Unknown = 0; - OpenInternet = 1; - NoPAT = 2; - FullCone = 3; - Restricted = 4; - PortRestricted = 5; - Symmetric = 6; - SymUdpFirewall = 7; + // has NAT; but own a single public IP, port is not changed + Unknown = 0; + OpenInternet = 1; + NoPAT = 2; + FullCone = 3; + Restricted = 4; + PortRestricted = 5; + Symmetric = 6; + SymUdpFirewall = 7; } message StunInfo { - NatType udp_nat_type = 1; - NatType tcp_nat_type = 2; - int64 last_update_time = 3; - repeated string public_ip = 4; - uint32 min_port = 5; - uint32 max_port = 6; + NatType udp_nat_type = 1; + NatType tcp_nat_type = 2; + int64 last_update_time = 3; + repeated string public_ip = 4; + uint32 min_port = 5; + uint32 max_port = 6; } message Route { - uint32 peer_id = 1; - string ipv4_addr = 2; - uint32 next_hop_peer_id = 3; - int32 cost = 4; - repeated string proxy_cidrs = 5; - string hostname = 6; - StunInfo stun_info = 7; - string inst_id = 8; + uint32 peer_id = 1; + string ipv4_addr = 2; + uint32 next_hop_peer_id = 3; + int32 cost = 4; + repeated string proxy_cidrs = 5; + string hostname = 6; + StunInfo stun_info = 7; + string inst_id = 8; } message ListRouteRequest {} -message ListRouteResponse { - repeated Route routes = 1; -} +message ListRouteResponse { repeated Route routes = 1; } + +message DumpRouteRequest {} + +message DumpRouteResponse { string result = 1; } service PeerManageRpc { - rpc ListPeer (ListPeerRequest) returns (ListPeerResponse); - rpc ListRoute (ListRouteRequest) returns (ListRouteResponse); + rpc ListPeer(ListPeerRequest) returns (ListPeerResponse); + rpc ListRoute(ListRouteRequest) returns (ListRouteResponse); + rpc DumpRoute(DumpRouteRequest) returns (DumpRouteResponse); } enum ConnectorStatus { - CONNECTED = 0; - DISCONNECTED = 1; - CONNECTING = 2; + CONNECTED = 0; + DISCONNECTED = 1; + CONNECTING = 2; } message Connector { - string url = 1; - ConnectorStatus status = 2; + string url = 1; + ConnectorStatus status = 2; } message ListConnectorRequest {} -message ListConnectorResponse { - repeated Connector connectors = 1; -} +message ListConnectorResponse { repeated Connector connectors = 1; } enum ConnectorManageAction { - ADD = 0; - REMOVE = 1; + ADD = 0; + REMOVE = 1; } message ManageConnectorRequest { - ConnectorManageAction action = 1; - string url = 2; + ConnectorManageAction action = 1; + string url = 2; } -message ManageConnectorResponse { } +message ManageConnectorResponse {} service ConnectorManageRpc { - rpc ListConnector (ListConnectorRequest) returns (ListConnectorResponse); - rpc ManageConnector (ManageConnectorRequest) returns (ManageConnectorResponse); + rpc ListConnector(ListConnectorRequest) returns (ListConnectorResponse); + rpc ManageConnector(ManageConnectorRequest) returns (ManageConnectorResponse); } -message DirectConnectedPeerInfo { - int32 latency_ms = 1; -} +message DirectConnectedPeerInfo { int32 latency_ms = 1; } message PeerInfoForGlobalMap { - map direct_peers = 1; + map direct_peers = 1; } message GetGlobalPeerMapRequest {} message GetGlobalPeerMapResponse { - map global_peer_map = 1; + map global_peer_map = 1; } service PeerCenterRpc { - rpc GetGlobalPeerMap (GetGlobalPeerMapRequest) returns (GetGlobalPeerMapResponse); + rpc GetGlobalPeerMap(GetGlobalPeerMapRequest) + returns (GetGlobalPeerMapResponse); } message VpnPortalInfo { - string vpn_type = 1; - string client_config = 2; - repeated string connected_clients = 3; + string vpn_type = 1; + string client_config = 2; + repeated string connected_clients = 3; } message GetVpnPortalInfoRequest {} -message GetVpnPortalInfoResponse { - VpnPortalInfo vpn_portal_info = 1; -} +message GetVpnPortalInfoResponse { VpnPortalInfo vpn_portal_info = 1; } service VpnPortalRpc { - rpc GetVpnPortalInfo (GetVpnPortalInfoRequest) returns (GetVpnPortalInfoResponse); + rpc GetVpnPortalInfo(GetVpnPortalInfoRequest) + returns (GetVpnPortalInfoResponse); } message HandshakeRequest { - uint32 magic = 1; - uint32 my_peer_id = 2; - uint32 version = 3; - repeated string features = 4; - string network_name = 5; - bytes network_secret_digrest = 6; + uint32 magic = 1; + uint32 my_peer_id = 2; + uint32 version = 3; + repeated string features = 4; + string network_name = 5; + bytes network_secret_digrest = 6; } message TaRpcPacket { - uint32 from_peer = 1; - uint32 to_peer = 2; - uint32 service_id = 3; - uint32 transact_id = 4; - bool is_req = 5; - bytes content = 6; + uint32 from_peer = 1; + uint32 to_peer = 2; + uint32 service_id = 3; + uint32 transact_id = 4; + bool is_req = 5; + bytes content = 6; - uint32 total_pieces = 7; - uint32 piece_idx = 8; + uint32 total_pieces = 7; + uint32 piece_idx = 8; } diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index f9f8365..bfcf96c 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -42,7 +42,7 @@ enum SubCommand { Peer(PeerArgs), Connector(ConnectorArgs), Stun, - Route, + Route(RouteArgs), PeerCenter, VpnPortal, } @@ -72,6 +72,18 @@ enum PeerSubCommand { List(PeerListArgs), } +#[derive(Args, Debug)] +struct RouteArgs { + #[command(subcommand)] + sub_command: Option, +} + +#[derive(Subcommand, Debug)] +enum RouteSubCommand { + List, + Dump, +} + #[derive(Args, Debug)] struct ConnectorArgs { #[arg(short, long)] @@ -204,6 +216,14 @@ impl CommandHandler { Ok(()) } + async fn handle_route_dump(&self) -> Result<(), Error> { + let mut client = self.get_peer_manager_client().await?; + let request = tonic::Request::new(DumpRouteRequest::default()); + let response = client.dump_route(request).await?; + println!("response: {}", response.into_inner().result); + Ok(()) + } + async fn handle_route_list(&self) -> Result<(), Error> { #[derive(tabled::Tabled)] struct RouteTableItem { @@ -307,9 +327,10 @@ async fn main() -> Result<(), Error> { handler.handle_connector_list().await?; } }, - SubCommand::Route => { - handler.handle_route_list().await?; - } + SubCommand::Route(route_args) => match route_args.sub_command { + Some(RouteSubCommand::List) | None => handler.handle_route_list().await?, + Some(RouteSubCommand::Dump) => handler.handle_route_dump().await?, + }, SubCommand::Stun => { timeout(Duration::from_secs(5), async move { let collector = StunInfoCollector::new_with_default_servers(); diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 1a198b4..f10fba6 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -535,6 +535,10 @@ impl PeerManager { self.get_route().list_routes().await } + pub async fn dump_route(&self) -> String { + self.get_route().dump().await + } + async fn run_nic_packet_process_pipeline(&self, data: &mut ZCPacket) { for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() { pipeline.try_process_packet_from_nic(data).await; diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 7bde102..4222caf 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -700,9 +700,13 @@ impl Debug for PeerRouteServiceImpl { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PeerRouteServiceImpl") .field("my_peer_id", &self.my_peer_id) + .field("sessions", &self.sessions) + .field("route_table", &self.route_table) + .field("route_table_with_cost", &self.route_table_with_cost) + .field("synced_route_info", &self.synced_route_info) .field( - "sessions", - &self.sessions.iter().map(|x| *x.key()).collect::>(), + "cached_local_conn_map", + &self.cached_local_conn_map.lock().unwrap(), ) .finish() } @@ -842,9 +846,7 @@ impl PeerRouteServiceImpl { let all_peer_ids = &conn_bitmap.peer_ids; for (peer_idx, (peer_id, _)) in all_peer_ids.iter().enumerate() { - let Some(connected) = self.synced_route_info.conn_map.get(peer_id) else { - continue; - }; + let connected = self.synced_route_info.conn_map.get(peer_id).unwrap(); for (idx, (other_peer_id, _)) in all_peer_ids.iter().enumerate() { if connected.0.contains(other_peer_id) { @@ -942,7 +944,7 @@ impl PeerRouteServiceImpl { let my_peer_id = self.my_peer_id; let (peer_infos, conn_bitmap) = self.build_sync_request(&session); - tracing::trace!("my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", + tracing::info!("my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session); if peer_infos.is_none() @@ -952,6 +954,10 @@ impl PeerRouteServiceImpl { return true; } + session + .need_sync_initiator_info + .store(false, Ordering::Relaxed); + let ret = peer_rpc .do_client_rpc_scoped(SERVICE_ID, dst_peer_id, |c| async { let client = RouteServiceClient::new(tarpc::client::Config::default(), c).spawn(); @@ -978,10 +984,6 @@ impl PeerRouteServiceImpl { .dst_is_initiator .store(ret.is_initiator, Ordering::Relaxed); - session - .need_sync_initiator_info - .store(false, Ordering::Relaxed); - session.update_dst_session_id(ret.session_id); if let Some(peer_infos) = &peer_infos { @@ -1014,6 +1016,22 @@ struct RouteSessionManager { sync_now_broadcast: tokio::sync::broadcast::Sender<()>, } +impl Debug for RouteSessionManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RouteSessionManager") + .field( + "session_tasks", + &self + .session_tasks + .iter() + .map(|x| *x.key()) + .collect::>(), + ) + .field("dump_sessions", &self.dump_sessions()) + .finish() + } +} + #[tarpc::server] impl RouteService for RouteSessionManager { async fn sync_route_info( @@ -1052,7 +1070,7 @@ impl RouteService for RouteSessionManager { service_impl.update_route_table_and_cached_local_conn_bitmap(); - tracing::debug!( + tracing::info!( "sync_route_info: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}", from_peer_id, is_initiator, peer_infos, conn_bitmap, service_impl.synced_route_info, session, service_impl.route_table); @@ -1297,6 +1315,16 @@ pub struct PeerRoute { tasks: std::sync::Mutex>, } +impl Debug for PeerRoute { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PeerRoute") + .field("my_peer_id", &self.my_peer_id) + .field("service_impl", &self.service_impl) + .field("session_mgr", &self.session_mgr) + .finish() + } +} + impl PeerRoute { pub fn new( my_peer_id: PeerId, @@ -1322,6 +1350,8 @@ impl PeerRoute { loop { tokio::time::sleep(Duration::from_secs(60)).await; service_impl.clear_expired_peer(); + // TODO: use debug log level for this. + tracing::info!(?service_impl, "clear_expired_peer"); } } @@ -1449,6 +1479,10 @@ impl Route for PeerRoute { *self.service_impl.cost_calculator.lock().unwrap() = Some(_cost_fn); self.service_impl.update_route_table(); } + + async fn dump(&self) -> String { + format!("{:#?}", self) + } } impl PeerPacketFilter for Arc {} diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index ad64df3..aec6cad 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -78,6 +78,10 @@ pub trait Route { } async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {} + + async fn dump(&self) -> String { + "this route implementation does not support dump".to_string() + } } pub type ArcRoute = Arc>; diff --git a/easytier/src/peers/rpc_service.rs b/easytier/src/peers/rpc_service.rs index 2f76384..2323e34 100644 --- a/easytier/src/peers/rpc_service.rs +++ b/easytier/src/peers/rpc_service.rs @@ -1,9 +1,8 @@ use std::sync::Arc; use crate::rpc::{ - cli::PeerInfo, - peer_manage_rpc_server::PeerManageRpc, - {ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse}, + cli::PeerInfo, peer_manage_rpc_server::PeerManageRpc, DumpRouteRequest, DumpRouteResponse, + ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse, }; use tonic::{Request, Response, Status}; @@ -60,4 +59,13 @@ impl PeerManageRpc for PeerManagerRpcService { reply.routes = self.peer_manager.list_routes().await; Ok(Response::new(reply)) } + + async fn dump_route( + &self, + _request: Request, // Accept request of type HelloRequest + ) -> Result, Status> { + let mut reply = DumpRouteResponse::default(); + reply.result = self.peer_manager.dump_route().await; + Ok(Response::new(reply)) + } }