From a78b7597416f1a12c27eb94d9f1f4f6b92425e59 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 26 Oct 2024 00:04:22 +0800 Subject: [PATCH 1/3] feat/web (Patchset 2) (#444) This patch implement a restful server without any auth. usage: ```bash # run easytier-web, which acts as an gateway and registry for all easytier-core $> easytier-web # run easytier-core and connect to easytier-web with a token $> easytier-core --config-server udp://127.0.0.1:22020/fdsafdsa # use restful api to list session $> curl -H "Content-Type: application/json" -X GET 127.0.0.1:11211/api/v1/sessions [{"token":"fdsafdsa","client_url":"udp://127.0.0.1:48915","machine_id":"de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f"}]% # use restful api to run a network instance $> curl -H "Content-Type: application/json" -X POST 127.0.0.1:11211/api/v1/network/de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f -d '{"config": "listeners = [\"udp://0.0.0.0:12344\"]"}' # use restful api to get network instance info $> curl -H "Content-Type: application/json" -X GET 127.0.0.1:11211/api/v1/network/de3f5b8f-0f2f-d9d0-fb30-a2ac8951d92f/65437e50-b286-4098-a624-74429f2cb839 ``` --- .github/workflows/core.yml | 5 +- Cargo.lock | 128 ++++++++- Cargo.toml | 2 +- easytier-gui/src/types/network.ts | 2 +- easytier-web/Cargo.toml | 23 +- easytier-web/src/client_manager/mod.rs | 134 ++++++++++ easytier-web/src/client_manager/session.rs | 144 ++++++++++ easytier-web/src/client_manager/storage.rs | 72 +++++ easytier-web/src/main.rs | 23 +- easytier-web/src/restful/mod.rs | 246 ++++++++++++++++++ easytier/Cargo.toml | 4 + easytier/build.rs | 2 + easytier/src/common/config.rs | 8 +- easytier/src/common/mod.rs | 30 +++ .../connector/udp_hole_punch/sym_to_cone.rs | 2 +- easytier/src/easytier-cli.rs | 109 ++++---- easytier/src/easytier-core.rs | 50 +++- easytier/src/launcher.rs | 55 ++-- easytier/src/lib.rs | 1 + easytier/src/peers/foreign_network_manager.rs | 9 +- easytier/src/proto/cli.proto | 5 + easytier/src/proto/cli.rs | 112 ++++++++ easytier/src/proto/error.proto | 2 +- easytier/src/proto/error.rs | 34 +-- easytier/src/proto/mod.rs | 1 + easytier/src/proto/rpc_impl/bidirect.rs | 19 +- easytier/src/proto/tests.rs | 6 + easytier/src/proto/web.proto | 100 +++++++ easytier/src/proto/web.rs | 1 + easytier/src/utils.rs | 128 +-------- easytier/src/web_client/controller.rs | 171 ++++++++++++ easytier/src/web_client/mod.rs | 48 ++++ easytier/src/web_client/session.rs | 126 +++++++++ 33 files changed, 1539 insertions(+), 263 deletions(-) create mode 100644 easytier-web/src/client_manager/mod.rs create mode 100644 easytier-web/src/client_manager/session.rs create mode 100644 easytier-web/src/client_manager/storage.rs create mode 100644 easytier-web/src/restful/mod.rs create mode 100644 easytier/src/proto/web.proto create mode 100644 easytier/src/proto/web.rs create mode 100644 easytier/src/web_client/controller.rs create mode 100644 easytier/src/web_client/mod.rs create mode 100644 easytier/src/web_client/session.rs diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index caeb123..846cc1d 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -111,7 +111,7 @@ jobs: run: | bash ./.github/workflows/install_rust.sh if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then - cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips + cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips --package=easytier else cargo build --release --verbose --target $TARGET fi @@ -182,6 +182,9 @@ jobs: mv ./target/$TARGET/release/easytier-core"$SUFFIX" ./artifacts/objects/ mv ./target/$TARGET/release/easytier-cli"$SUFFIX" ./artifacts/objects/ + if [[ ! $TARGET =~ ^mips.*$ ]]; then + mv ./target/$TARGET/release/easytier-web"$SUFFIX" ./artifacts/objects/ + fi mv ./artifacts/objects/* ./artifacts/ rm -rf ./artifacts/objects/ diff --git a/Cargo.lock b/Cargo.lock index 9ac867e..9cfb919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -428,6 +428,73 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "axum" +version = "0.7.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504e3947307ac8326a5437504c517c4b56716c9d98fac0028c2acc7ca47d70ae" +dependencies = [ + "async-trait", + "axum-core", + "axum-macros", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", + "hyper-util", + "itoa 1.0.11", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower 0.5.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 1.0.1", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.74", +] + [[package]] name = "backtrace" version = "0.3.73" @@ -1312,9 +1379,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.0.1" +version = "6.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" dependencies = [ "cfg-if", "crossbeam-utils", @@ -1574,6 +1641,7 @@ dependencies = [ "http 1.1.0", "humansize", "indexmap 1.9.3", + "machine-uid", "mimalloc-rust", "network-interface", "nix 0.27.1", @@ -1678,7 +1746,18 @@ dependencies = [ name = "easytier-web" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", + "axum", + "clap", + "dashmap", "easytier", + "serde", + "thiserror", + "tokio", + "tracing", + "url", + "uuid", ] [[package]] @@ -2733,6 +2812,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "httparse", + "httpdate", "itoa 1.0.11", "pin-project-lite", "smallvec", @@ -2768,7 +2848,7 @@ dependencies = [ "pin-project-lite", "socket2", "tokio", - "tower", + "tower 0.4.13", "tower-service", "tracing", ] @@ -3258,6 +3338,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "machine-uid" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4506fa0abb0a2ea93f5862f55973da0a662d2ad0e98f337a1c5aac657f0892" +dependencies = [ + "libc", + "winreg 0.52.0", +] + [[package]] name = "malloc_buf" version = "0.0.6" @@ -3302,6 +3392,12 @@ version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5" +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md5" version = "0.7.0" @@ -5326,6 +5422,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af99884400da37c88f5e9146b7f1fd0fbcae8f6eec4e9da38b67d05486f814a6" +dependencies = [ + "itoa 1.0.11", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.19" @@ -6583,6 +6689,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2873938d487c3cfb9aed7546dc9f2711d867c9f90c46b889989a2cb84eba6b4f" +dependencies = [ + "futures-core", + "futures-util", + "pin-project-lite", + "sync_wrapper 0.1.2", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/Cargo.toml b/Cargo.toml index 61fe0f0..33d740e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] resolver = "2" members = ["easytier", "easytier-gui/src-tauri", "easytier-rpc-build", "easytier-web"] -default-members = ["easytier"] +default-members = ["easytier", "easytier-web"] [profile.dev] panic = "unwind" diff --git a/easytier-gui/src/types/network.ts b/easytier-gui/src/types/network.ts index 0c7f6e1..91fe982 100644 --- a/easytier-gui/src/types/network.ts +++ b/easytier-gui/src/types/network.ts @@ -11,7 +11,7 @@ export interface NetworkConfig { dhcp: boolean virtual_ipv4: string - network_length: number, + network_length: number hostname?: string network_name: string network_secret: string diff --git a/easytier-web/Cargo.toml b/easytier-web/Cargo.toml index 57fd26b..c876d9d 100644 --- a/easytier-web/Cargo.toml +++ b/easytier-web/Cargo.toml @@ -4,4 +4,25 @@ version = "0.1.0" edition = "2021" [dependencies] -easytier = { path = "../easytier" } \ No newline at end of file +easytier = { path = "../easytier" } +tracing = { version = "0.1", features = ["log"] } +anyhow = { version = "1.0" } +thiserror = "1.0" +tokio = { version = "1", features = ["full"] } +dashmap = "6.1" +url = "2.2" +async-trait = "0.1" +axum = { version = "0.7", features = ["macros"] } +clap = { version = "4.4.8", features = [ + "string", + "unicode", + "derive", + "wrap_help", +] } +serde = { version = "1.0", features = ["derive"] } +uuid = { version = "1.5.0", features = [ + "v4", + "fast-rng", + "macro-diagnostics", + "serde", +] } diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs new file mode 100644 index 0000000..7d4a81f --- /dev/null +++ b/easytier-web/src/client_manager/mod.rs @@ -0,0 +1,134 @@ +pub mod session; +pub mod storage; + +use std::sync::Arc; + +use dashmap::DashMap; +use easytier::{common::scoped_task::ScopedTask, tunnel::TunnelListener}; +use session::Session; +use storage::{Storage, StorageToken}; + +#[derive(Debug)] +pub struct ClientManager { + accept_task: Option>, + clear_task: Option>, + + client_sessions: Arc>>, + storage: Storage, +} + +impl ClientManager { + pub fn new() -> Self { + ClientManager { + accept_task: None, + clear_task: None, + + client_sessions: Arc::new(DashMap::new()), + storage: Storage::new(), + } + } + + pub async fn serve( + &mut self, + mut listener: L, + ) -> Result<(), anyhow::Error> { + listener.listen().await?; + + let sessions = self.client_sessions.clone(); + let storage = self.storage.weak_ref(); + let task = tokio::spawn(async move { + while let Ok(tunnel) = listener.accept().await { + let info = tunnel.info().unwrap(); + let client_url: url::Url = info.remote_addr.unwrap().into(); + println!("New session from {:?}", tunnel.info()); + let session = Session::new(tunnel, storage.clone(), client_url.clone()); + sessions.insert(client_url, Arc::new(session)); + } + }); + + self.accept_task = Some(ScopedTask::from(task)); + + let sessions = self.client_sessions.clone(); + let task = tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(15)).await; + sessions.retain(|_, session| session.is_running()); + } + }); + self.clear_task = Some(ScopedTask::from(task)); + + Ok(()) + } + + pub fn is_running(&self) -> bool { + self.accept_task.is_some() && self.clear_task.is_some() + } + + pub async fn list_sessions(&self) -> Vec { + let sessions = self + .client_sessions + .iter() + .map(|item| item.value().clone()) + .collect::>(); + + let mut ret: Vec = vec![]; + for s in sessions { + if let Some(t) = s.get_token().await { + ret.push(t); + } + } + + ret + } + + pub fn get_session_by_machine_id(&self, machine_id: &uuid::Uuid) -> Option> { + let c_url = self.storage.get_client_url_by_machine_id(machine_id)?; + self.client_sessions + .get(&c_url) + .map(|item| item.value().clone()) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use easytier::{ + tunnel::{ + common::tests::wait_for_condition, + udp::{UdpTunnelConnector, UdpTunnelListener}, + }, + web_client::WebClient, + }; + + use crate::client_manager::ClientManager; + + #[tokio::test] + async fn test_client() { + let listener = UdpTunnelListener::new("udp://0.0.0.0:54333".parse().unwrap()); + let mut mgr = ClientManager::new(); + mgr.serve(Box::new(listener)).await.unwrap(); + + let connector = UdpTunnelConnector::new("udp://127.0.0.1:54333".parse().unwrap()); + let _c = WebClient::new(connector, "test"); + + wait_for_condition( + || async { mgr.client_sessions.len() == 1 }, + Duration::from_secs(6), + ) + .await; + + let mut a = mgr + .client_sessions + .iter() + .next() + .unwrap() + .data() + .read() + .await + .heartbeat_waiter(); + let req = a.recv().await.unwrap(); + println!("{:?}", req); + println!("{:?}", mgr); + } +} diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs new file mode 100644 index 0000000..506974d --- /dev/null +++ b/easytier-web/src/client_manager/session.rs @@ -0,0 +1,144 @@ +use std::{fmt::Debug, sync::Arc}; + +use easytier::{ + proto::{ + rpc_impl::bidirect::BidirectRpcManager, + rpc_types::{self, controller::BaseController}, + web::{ + HeartbeatRequest, HeartbeatResponse, WebClientService, WebClientServiceClientFactory, + WebServerService, WebServerServiceServer, + }, + }, + tunnel::Tunnel, +}; +use tokio::sync::{broadcast, RwLock}; + +use super::storage::{Storage, StorageToken, WeakRefStorage}; + +#[derive(Debug)] +pub struct SessionData { + storage: WeakRefStorage, + client_url: url::Url, + + storage_token: Option, + notifier: broadcast::Sender, + req: Option, +} + +impl SessionData { + fn new(storage: WeakRefStorage, client_url: url::Url) -> Self { + let (tx, _rx1) = broadcast::channel(2); + + SessionData { + storage, + client_url, + storage_token: None, + notifier: tx, + req: None, + } + } + + pub fn req(&self) -> Option { + self.req.clone() + } + + pub fn heartbeat_waiter(&self) -> broadcast::Receiver { + self.notifier.subscribe() + } +} + +impl Drop for SessionData { + fn drop(&mut self) { + if let Ok(storage) = Storage::try_from(self.storage.clone()) { + if let Some(token) = self.storage_token.as_ref() { + storage.remove_client(token); + } + } + } +} + +pub type SharedSessionData = Arc>; + +#[derive(Clone)] +struct SessionRpcService { + data: SharedSessionData, +} + +#[async_trait::async_trait] +impl WebServerService for SessionRpcService { + type Controller = BaseController; + + async fn heartbeat( + &self, + _: BaseController, + req: HeartbeatRequest, + ) -> rpc_types::error::Result { + let mut data = self.data.write().await; + if data.req.replace(req.clone()).is_none() { + assert!(data.storage_token.is_none()); + data.storage_token = Some(StorageToken { + token: req.user_token.clone().into(), + client_url: data.client_url.clone(), + machine_id: req + .machine_id + .clone() + .map(Into::into) + .unwrap_or(uuid::Uuid::new_v4()), + }); + if let Ok(storage) = Storage::try_from(data.storage.clone()) { + storage.add_client(data.storage_token.as_ref().unwrap().clone()); + } + } + let _ = data.notifier.send(req); + Ok(HeartbeatResponse {}) + } +} + +pub struct Session { + rpc_mgr: BidirectRpcManager, + + data: SharedSessionData, +} + +impl Debug for Session { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Session").field("data", &self.data).finish() + } +} + +impl Session { + pub fn new(tunnel: Box, storage: WeakRefStorage, client_url: url::Url) -> Self { + let rpc_mgr = + BidirectRpcManager::new().set_rx_timeout(Some(std::time::Duration::from_secs(30))); + rpc_mgr.run_with_tunnel(tunnel); + + let data = Arc::new(RwLock::new(SessionData::new(storage, client_url))); + + rpc_mgr.rpc_server().registry().register( + WebServerServiceServer::new(SessionRpcService { data: data.clone() }), + "", + ); + + Session { rpc_mgr, data } + } + + pub fn is_running(&self) -> bool { + self.rpc_mgr.is_running() + } + + pub fn data(&self) -> SharedSessionData { + self.data.clone() + } + + pub fn scoped_rpc_client( + &self, + ) -> Box + Send> { + self.rpc_mgr + .rpc_client() + .scoped_client::>(1, 1, "".to_string()) + } + + pub async fn get_token(&self) -> Option { + self.data.read().await.storage_token.clone() + } +} diff --git a/easytier-web/src/client_manager/storage.rs b/easytier-web/src/client_manager/storage.rs new file mode 100644 index 0000000..79dac9a --- /dev/null +++ b/easytier-web/src/client_manager/storage.rs @@ -0,0 +1,72 @@ +use std::sync::{Arc, Weak}; + +use dashmap::{DashMap, DashSet}; + +// use this to maintain Storage +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct StorageToken { + pub token: String, + pub client_url: url::Url, + pub machine_id: uuid::Uuid, +} + +#[derive(Debug)] +pub struct StorageInner { + // some map for indexing + pub token_clients_map: DashMap>, + pub machine_client_url_map: DashMap, +} + +#[derive(Debug, Clone)] +pub struct Storage(Arc); +pub type WeakRefStorage = Weak; + +impl TryFrom for Storage { + type Error = (); + + fn try_from(weak: Weak) -> Result { + weak.upgrade().map(|inner| Storage(inner)).ok_or(()) + } +} + +impl Storage { + pub fn new() -> Self { + Storage(Arc::new(StorageInner { + token_clients_map: DashMap::new(), + machine_client_url_map: DashMap::new(), + })) + } + + pub fn add_client(&self, stoken: StorageToken) { + let inner = self + .0 + .token_clients_map + .entry(stoken.token) + .or_insert_with(DashSet::new); + inner.insert(stoken.client_url.clone()); + + self.0 + .machine_client_url_map + .insert(stoken.machine_id, stoken.client_url.clone()); + } + + pub fn remove_client(&self, stoken: &StorageToken) { + self.0.token_clients_map.remove_if(&stoken.token, |_, set| { + set.remove(&stoken.client_url); + set.is_empty() + }); + + self.0.machine_client_url_map.remove(&stoken.machine_id); + } + + pub fn weak_ref(&self) -> WeakRefStorage { + Arc::downgrade(&self.0) + } + + pub fn get_client_url_by_machine_id(&self, machine_id: &uuid::Uuid) -> Option { + self.0 + .machine_client_url_map + .get(&machine_id) + .map(|url| url.clone()) + } +} diff --git a/easytier-web/src/main.rs b/easytier-web/src/main.rs index e7a11a9..2df5702 100644 --- a/easytier-web/src/main.rs +++ b/easytier-web/src/main.rs @@ -1,3 +1,22 @@ -fn main() { - println!("Hello, world!"); +#![allow(dead_code)] + +use std::sync::Arc; + +use easytier::tunnel::udp::UdpTunnelListener; + +mod client_manager; +mod restful; + +#[tokio::main] +async fn main() { + let listener = UdpTunnelListener::new("udp://0.0.0.0:22020".parse().unwrap()); + let mut mgr = client_manager::ClientManager::new(); + mgr.serve(listener).await.unwrap(); + let mgr = Arc::new(mgr); + + let mut restful_server = + restful::RestfulServer::new("0.0.0.0:11211".parse().unwrap(), mgr.clone()); + restful_server.start().await.unwrap(); + + tokio::signal::ctrl_c().await.unwrap(); } diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs new file mode 100644 index 0000000..fe5c06e --- /dev/null +++ b/easytier-web/src/restful/mod.rs @@ -0,0 +1,246 @@ +use std::vec; +use std::{net::SocketAddr, sync::Arc}; + +use axum::extract::{Path, Query}; +use axum::http::StatusCode; +use axum::routing::post; +use axum::{extract::State, routing::get, Json, Router}; +use easytier::proto::{self, rpc_types, web::*}; +use easytier::{common::scoped_task::ScopedTask, proto::rpc_types::controller::BaseController}; +use tokio::net::TcpListener; + +use crate::client_manager::session::Session; +use crate::client_manager::storage::StorageToken; +use crate::client_manager::ClientManager; + +pub struct RestfulServer { + bind_addr: SocketAddr, + client_mgr: Arc, + + serve_task: Option>, +} + +type AppStateInner = Arc; +type AppState = State; + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ListSessionJsonResp(Vec); + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ValidateConfigJsonReq { + config: String, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct RunNetworkJsonReq { + config: String, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ColletNetworkInfoJsonReq { + inst_ids: Option>, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct RemoveNetworkJsonReq { + inst_ids: Vec, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct ListNetworkInstanceIdsJsonResp(Vec); + +type Error = proto::error::Error; +type ErrorKind = proto::error::error::ErrorKind; +type RpcError = rpc_types::error::Error; +type HttpHandleError = (StatusCode, Json); + +fn convert_rpc_error(e: RpcError) -> (StatusCode, Json) { + let status_code = match &e { + RpcError::ExecutionError(_) => StatusCode::BAD_REQUEST, + RpcError::Timeout(_) => StatusCode::GATEWAY_TIMEOUT, + _ => StatusCode::BAD_GATEWAY, + }; + let error = Error::from(&e); + (status_code, Json(error)) +} + +impl RestfulServer { + pub fn new(bind_addr: SocketAddr, client_mgr: Arc) -> Self { + assert!(client_mgr.is_running()); + RestfulServer { + bind_addr, + client_mgr, + serve_task: None, + } + } + + async fn get_session_by_machine_id( + client_mgr: &ClientManager, + machine_id: &uuid::Uuid, + ) -> Result, HttpHandleError> { + let Some(result) = client_mgr.get_session_by_machine_id(machine_id) else { + return Err(( + StatusCode::NOT_FOUND, + Error { + error_kind: Some(ErrorKind::OtherError(proto::error::OtherError { + error_message: "No such session".to_string(), + })), + } + .into(), + )); + }; + + Ok(result) + } + + async fn handle_list_all_sessions( + State(client_mgr): AppState, + ) -> Result, HttpHandleError> { + let ret = client_mgr.list_sessions().await; + Ok(ListSessionJsonResp(ret).into()) + } + + async fn handle_validate_config( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result<(), HttpHandleError> { + let config = payload.config; + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.validate_config(BaseController::default(), ValidateConfigRequest { config }) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + async fn handle_run_network_instance( + State(client_mgr): AppState, + Path(machine_id): Path, + Json(payload): Json, + ) -> Result<(), HttpHandleError> { + let config = payload.config; + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.run_network_instance( + BaseController::default(), + RunNetworkInstanceRequest { config }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + async fn handle_collect_one_network_info( + State(client_mgr): AppState, + Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .collect_network_info( + BaseController::default(), + CollectNetworkInfoRequest { + inst_ids: vec![inst_id.into()], + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(ret.into()) + } + + async fn handle_collect_network_info( + State(client_mgr): AppState, + Path(machine_id): Path, + Query(payload): Query, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .collect_network_info( + BaseController::default(), + CollectNetworkInfoRequest { + inst_ids: payload + .inst_ids + .unwrap_or_default() + .into_iter() + .map(Into::into) + .collect(), + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(ret.into()) + } + + async fn handle_list_network_instance_ids( + State(client_mgr): AppState, + Path(machine_id): Path, + ) -> Result, HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + let ret = c + .list_network_instance(BaseController::default(), ListNetworkInstanceRequest {}) + .await + .map_err(convert_rpc_error)?; + Ok( + ListNetworkInstanceIdsJsonResp(ret.inst_ids.into_iter().map(Into::into).collect()) + .into(), + ) + } + + async fn handle_remove_network_instance( + State(client_mgr): AppState, + Path((machine_id, inst_id)): Path<(uuid::Uuid, uuid::Uuid)>, + ) -> Result<(), HttpHandleError> { + let result = Self::get_session_by_machine_id(&client_mgr, &machine_id).await?; + + let c = result.scoped_rpc_client(); + c.delete_network_instance( + BaseController::default(), + DeleteNetworkInstanceRequest { + inst_ids: vec![inst_id.into()], + }, + ) + .await + .map_err(convert_rpc_error)?; + Ok(()) + } + + pub async fn start(&mut self) -> Result<(), anyhow::Error> { + let listener = TcpListener::bind(self.bind_addr).await.unwrap(); + + let app = Router::new() + .route("/api/v1/sessions", get(Self::handle_list_all_sessions)) + .route( + "/api/v1/network/:machine-id/validate-config", + post(Self::handle_validate_config), + ) + .route( + "/api/v1/network/:machine-id", + post(Self::handle_run_network_instance).get(Self::handle_list_network_instance_ids), + ) + .route( + "/api/v1/network/:machine-id/info", + get(Self::handle_collect_network_info), + ) + .route( + "/api/v1/network/:machine-id/:inst-id", + get(Self::handle_collect_one_network_info) + .delete(Self::handle_remove_network_instance), + ) + .with_state(self.client_mgr.clone()); + + let task = tokio::spawn(async move { + axum::serve(listener, app).await.unwrap(); + }); + self.serve_task = Some(task.into()); + + Ok(()) + } +} diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 516a898..9f3bec5 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -17,6 +17,7 @@ readme = "README.md" [[bin]] name = "easytier-core" path = "src/easytier-core.rs" +test = false [[bin]] name = "easytier-cli" @@ -180,6 +181,9 @@ sys-locale = "0.3" ringbuf = "0.4.5" async-ringbuf = "0.3.1" +[target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies] +machine-uid = "0.5.3" + [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.52", features = [ "Win32_Networking_WinSock", diff --git a/easytier/build.rs b/easytier/build.rs index 499346d..44430fa 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -127,6 +127,7 @@ fn main() -> Result<(), Box> { "src/proto/error.proto", "src/proto/tests.proto", "src/proto/cli.proto", + "src/proto/web.proto", ]; for proto_file in &proto_files { @@ -138,6 +139,7 @@ fn main() -> Result<(), Box> { .type_attribute(".common", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".cli", "#[derive(serde::Serialize, serde::Deserialize)]") + .type_attribute(".web", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute( "peer_rpc.GetIpListResponse", "#[derive(serde::Serialize, serde::Deserialize)]", diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index ce61f61..6cda91a 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -231,12 +231,8 @@ impl Default for TomlConfigLoader { impl TomlConfigLoader { pub fn new_from_str(config_str: &str) -> Result { - let mut config = toml::de::from_str::(config_str).with_context(|| { - format!( - "failed to parse config file: {}\n{}", - config_str, config_str - ) - })?; + let mut config = toml::de::from_str::(config_str) + .with_context(|| format!("failed to parse config file: {}", config_str))?; config.flags_struct = Some(Self::gen_flags(config.flags.clone().unwrap_or_default())); diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index 8f3bd45..ecd68ec 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -80,6 +80,36 @@ pub fn join_joinset_background( ); } +pub fn get_machine_id() -> uuid::Uuid { + // TODO: load from local file + + #[cfg(any( + target_os = "linux", + target_os = "macos", + target_os = "windows", + target_os = "freebsd" + ))] + let gen_mid = machine_uid::get() + .map(|x| { + let mut b = [0u8; 16]; + crate::tunnel::generate_digest_from_str("", x.as_str(), &mut b); + uuid::Uuid::from_bytes(b) + }) + .unwrap_or(uuid::Uuid::new_v4()); + + #[cfg(not(any( + target_os = "linux", + target_os = "macos", + target_os = "windows", + target_os = "freebsd" + )))] + let gen_mid = uuid::Uuid::new_v4(); + + // TODO: save to local file + + gen_mid +} + #[cfg(test)] mod tests { use super::*; diff --git a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs index e935181..3a563b1 100644 --- a/easytier/src/connector/udp_hole_punch/sym_to_cone.rs +++ b/easytier/src/connector/udp_hole_punch/sym_to_cone.rs @@ -318,7 +318,7 @@ impl PunchSymToConeHoleClient { let punch_random = self.punch_randomly.load(Ordering::Relaxed); let punch_predicable = self.punch_predicablely.load(Ordering::Relaxed); let scoped_punch_task: ScopedTask> = tokio::spawn(async move { - if punch_predicable { + if punch_predicable && base_port_for_easy_sym.is_some() { if let Some(inc) = my_nat_info.get_inc_of_easy_sym() { let req = SendPunchPacketEasySymRequest { listener_mapped_addr: remote_mapped_addr.clone().into(), diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 7cfce53..a2d7879 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -7,14 +7,18 @@ use tabled::settings::Style; use tokio::time::timeout; use easytier::{ - common::{constants::EASYTIER_VERSION, stun::StunInfoCollector, stun::StunInfoCollectorTrait}, + common::{ + constants::EASYTIER_VERSION, + stun::{StunInfoCollector, StunInfoCollectorTrait}, + }, proto::{ cli::{ - ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest, - GetVpnPortalInfoRequest, ListConnectorRequest, ListForeignNetworkRequest, - ListGlobalForeignNetworkRequest, ListPeerRequest, ListPeerResponse, ListRouteRequest, - ListRouteResponse, NodeInfo, PeerManageRpc, PeerManageRpcClientFactory, - ShowNodeInfoRequest, VpnPortalRpc, VpnPortalRpcClientFactory, + list_peer_route_pair, ConnectorManageRpc, ConnectorManageRpcClientFactory, + DumpRouteRequest, GetVpnPortalInfoRequest, ListConnectorRequest, + ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListPeerRequest, + ListPeerResponse, ListRouteRequest, ListRouteResponse, NodeInfo, PeerManageRpc, + PeerManageRpcClientFactory, ShowNodeInfoRequest, VpnPortalRpc, + VpnPortalRpcClientFactory, }, common::NatType, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, @@ -22,7 +26,7 @@ use easytier::{ rpc_types::controller::BaseController, }, tunnel::tcp::TcpTunnelConnector, - utils::{cost_to_str, float_to_str, list_peer_route_pair, PeerRoutePair}, + utils::{cost_to_str, float_to_str, PeerRoutePair}, }; #[derive(Parser, Debug)] @@ -222,25 +226,26 @@ impl CommandHandler { impl From for PeerTableItem { fn from(p: PeerRoutePair) -> Self { + let route = p.route.clone().unwrap_or_default(); PeerTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - cost: cost_to_str(p.route.cost), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + cost: cost_to_str(route.cost), lat_ms: float_to_str(p.get_latency_ms().unwrap_or(0.0), 3), loss_rate: float_to_str(p.get_loss_rate().unwrap_or(0.0), 3), rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL), tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL), - tunnel_proto: p.get_conn_protos().unwrap_or_default().join(",").to_string(), + tunnel_proto: p + .get_conn_protos() + .unwrap_or_default() + .join(",") + .to_string(), nat_type: p.get_udp_nat_type(), - id: p.route.peer_id.to_string(), - version: if p.route.version.is_empty() { + id: route.peer_id.to_string(), + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, } } @@ -287,10 +292,7 @@ impl CommandHandler { items.push(p.into()); } - println!( - "{}", - tabled::Table::new(items).with(Style::modern()) - ); + println!("{}", tabled::Table::new(items).with(Style::modern())); Ok(()) } @@ -404,62 +406,59 @@ impl CommandHandler { }); let peer_routes = self.list_peer_route_pair().await?; for p in peer_routes.iter() { - let Some(next_hop_pair) = peer_routes - .iter() - .find(|pair| pair.route.peer_id == p.route.next_hop_peer_id) - else { + let Some(next_hop_pair) = peer_routes.iter().find(|pair| { + pair.route.clone().unwrap_or_default().peer_id + == p.route.clone().unwrap_or_default().next_hop_peer_id + }) else { continue; }; - if p.route.cost == 1 { + let route = p.route.clone().unwrap_or_default(); + if route.cost == 1 { items.push(RouteTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - proxy_cidrs: p.route.proxy_cidrs.clone().join(",").to_string(), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), next_hop_ipv4: "DIRECT".to_string(), next_hop_hostname: "".to_string(), next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - cost: p.route.cost, - version: if p.route.version.is_empty() { + cost: route.cost, + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, }); } else { items.push(RouteTableItem { - ipv4: p - .route - .ipv4_addr - .map(|ip| ip.to_string()) - .unwrap_or_default(), - hostname: p.route.hostname.clone(), - proxy_cidrs: p.route.proxy_cidrs.clone().join(",").to_string(), + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), next_hop_ipv4: next_hop_pair .route + .clone() + .unwrap_or_default() .ipv4_addr .map(|ip| ip.to_string()) .unwrap_or_default(), - next_hop_hostname: next_hop_pair.route.hostname.clone(), + next_hop_hostname: next_hop_pair + .route + .clone() + .unwrap_or_default() + .hostname + .clone(), next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - cost: p.route.cost, - version: if p.route.version.is_empty() { + cost: route.cost, + version: if route.version.is_empty() { "unknown".to_string() } else { - p.route.version.to_string() + route.version.to_string() }, }); } } - println!( - "{}", - tabled::Table::new(items).with(Style::modern()) - ); + println!("{}", tabled::Table::new(items).with(Style::modern())); Ok(()) } @@ -576,11 +575,7 @@ async fn main() -> Result<(), Error> { }); } - println!( - "{}", - tabled::Table::new(table_rows) - .with(Style::modern()) - ); + println!("{}", tabled::Table::new(table_rows).with(Style::modern())); } SubCommand::VpnPortal => { let vpn_portal_client = handler.get_vpn_portal_client().await?; diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 7b7103e..4c11ede 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + #[macro_use] extern crate rust_i18n; @@ -21,7 +23,9 @@ use easytier::{ scoped_task::ScopedTask, }, launcher, proto, + tunnel::udp::UdpTunnelConnector, utils::{init_logger, setup_panic_handler}, + web_client, }; #[cfg(feature = "mimalloc")] @@ -34,6 +38,13 @@ static GLOBAL_MIMALLOC: GlobalMiMalloc = GlobalMiMalloc; #[derive(Parser, Debug)] #[command(name = "easytier-core", author, version = EASYTIER_VERSION , about, long_about = None)] struct Cli { + #[arg( + short = 'w', + long, + help = t!("core_clap.config_server").to_string() + )] + config_server: Option, + #[arg( short, long, @@ -640,12 +651,47 @@ pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<( #[tokio::main] async fn main() { - setup_panic_handler(); - let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); let cli = Cli::parse(); + + setup_panic_handler(); + + if cli.config_server.is_some() { + let config_server_url_s = cli.config_server.clone().unwrap(); + let config_server_url = match url::Url::parse(&config_server_url_s) { + Ok(u) => u, + Err(_) => format!( + "udp://config-server.easytier.top:22020/{}", + config_server_url_s + ) + .parse() + .unwrap(), + }; + + let mut c_url = config_server_url.clone(); + c_url.set_path(""); + let token = config_server_url + .path_segments() + .and_then(|mut x| x.next()) + .map(|x| x.to_string()) + .unwrap_or_default(); + + println!( + "Entering config client mode...\n server: {}\n token: {}", + c_url, token, + ); + + if token.is_empty() { + panic!("empty token"); + } + + let _wc = web_client::WebClient::new(UdpTunnelConnector::new(c_url), token.to_string()); + tokio::signal::ctrl_c().await.unwrap(); + return; + } + let cfg = TomlConfigLoader::from(cli); init_logger(&cfg, false).unwrap(); diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 4eccd08..0e56c09 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -12,27 +12,12 @@ use crate::{ }, instance::instance::Instance, peers::rpc_service::PeerManagerRpcService, - proto::{ - cli::{PeerInfo, Route}, - common::StunInfo, - peer_rpc::GetIpListResponse, - }, - utils::{list_peer_route_pair, PeerRoutePair}, + proto::cli::{list_peer_route_pair, PeerInfo, Route}, }; use chrono::{DateTime, Local}; -use serde::{Deserialize, Serialize}; use tokio::{sync::broadcast, task::JoinSet}; -#[derive(Default, Clone, Debug, Serialize, Deserialize)] -pub struct MyNodeInfo { - pub virtual_ipv4: String, - pub hostname: String, - pub version: String, - pub ips: GetIpListResponse, - pub stun_info: StunInfo, - pub listeners: Vec, - pub vpn_portal_cfg: Option, -} +pub type MyNodeInfo = crate::proto::web::MyNodeInfo; struct EasyTierData { events: RwLock, GlobalCtxEvent)>>, @@ -164,18 +149,15 @@ impl EasyTierLauncher { global_ctx_c.get_flags().dev_name.clone(); let node_info = MyNodeInfo { - virtual_ipv4: global_ctx_c - .get_ipv4() - .map(|x| x.to_string()) - .unwrap_or_default(), + virtual_ipv4: global_ctx_c.get_ipv4().map(|x| x.address().into()), hostname: global_ctx_c.get_hostname(), version: EASYTIER_VERSION.to_string(), - ips: global_ctx_c.get_ip_collector().collect_ip_addrs().await, - stun_info: global_ctx_c.get_stun_info_collector().get_stun_info(), + ips: Some(global_ctx_c.get_ip_collector().collect_ip_addrs().await), + stun_info: Some(global_ctx_c.get_stun_info_collector().get_stun_info()), listeners: global_ctx_c .get_running_listeners() - .iter() - .map(|x| x.to_string()) + .into_iter() + .map(Into::into) .collect(), vpn_portal_cfg: Some( vpn_portal @@ -311,18 +293,7 @@ impl Drop for EasyTierLauncher { } } -#[derive(Deserialize, Serialize, Debug)] -pub struct NetworkInstanceRunningInfo { - pub dev_name: String, - pub my_node_info: MyNodeInfo, - pub events: Vec<(DateTime, GlobalCtxEvent)>, - pub node_info: MyNodeInfo, - pub routes: Vec, - pub peers: Vec, - pub peer_route_pairs: Vec, - pub running: bool, - pub error_msg: Option, -} +pub type NetworkInstanceRunningInfo = crate::proto::web::NetworkInstanceRunningInfo; pub struct NetworkInstance { config: TomlConfigLoader, @@ -362,9 +333,13 @@ impl NetworkInstance { Some(NetworkInstanceRunningInfo { dev_name: launcher.get_dev_name(), - my_node_info: launcher.get_node_info(), - events: launcher.get_events(), - node_info: launcher.get_node_info(), + my_node_info: Some(launcher.get_node_info()), + events: launcher + .get_events() + .iter() + .map(|(t, e)| (t.to_string(), format!("{:?}", e))) + .collect(), + node_info: Some(launcher.get_node_info()), routes, peers, peer_route_pairs, diff --git a/easytier/src/lib.rs b/easytier/src/lib.rs index f719d88..367e8e7 100644 --- a/easytier/src/lib.rs +++ b/easytier/src/lib.rs @@ -13,6 +13,7 @@ pub mod peers; pub mod proto; pub mod tunnel; pub mod utils; +pub mod web_client; #[cfg(test)] mod tests; diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 1ff813d..08944cd 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -353,12 +353,13 @@ impl ForeignNetworkManagerData { } async fn clear_no_conn_peer(&self, network_name: &String) { - let peer_map = self + let Some(peer_map) = self .network_peer_maps .get(network_name) - .unwrap() - .peer_map - .clone(); + .and_then(|v| Some(v.peer_map.clone())) + else { + return; + }; peer_map.clean_peer_without_conn().await; } diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 74321ca..4221635 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -56,6 +56,11 @@ message Route { common.PeerFeatureFlag feature_flag = 10; } +message PeerRoutePair { + Route route = 1; + PeerInfo peer = 2; +} + message NodeInfo { uint32 peer_id = 1; string ipv4_addr = 2; diff --git a/easytier/src/proto/cli.rs b/easytier/src/proto/cli.rs index 31bbf4d..236df89 100644 --- a/easytier/src/proto/cli.rs +++ b/easytier/src/proto/cli.rs @@ -1 +1,113 @@ include!(concat!(env!("OUT_DIR"), "/cli.rs")); + +impl PeerRoutePair { + pub fn get_latency_ms(&self) -> Option { + let mut ret = u64::MAX; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret = ret.min(stats.latency_us); + } + + if ret == u64::MAX { + None + } else { + Some(f64::from(ret as u32) / 1000.0) + } + } + + pub fn get_rx_bytes(&self) -> Option { + let mut ret = 0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret += stats.rx_bytes; + } + + if ret == 0 { + None + } else { + Some(ret) + } + } + + pub fn get_tx_bytes(&self) -> Option { + let mut ret = 0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(stats) = &conn.stats else { + continue; + }; + ret += stats.tx_bytes; + } + + if ret == 0 { + None + } else { + Some(ret) + } + } + + pub fn get_loss_rate(&self) -> Option { + let mut ret = 0.0; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + ret += conn.loss_rate; + } + + if ret == 0.0 { + None + } else { + Some(ret as f64) + } + } + + pub fn get_conn_protos(&self) -> Option> { + let mut ret = vec![]; + let p = self.peer.as_ref()?; + for conn in p.conns.iter() { + let Some(tunnel_info) = &conn.tunnel else { + continue; + }; + // insert if not exists + if !ret.contains(&tunnel_info.tunnel_type) { + ret.push(tunnel_info.tunnel_type.clone()); + } + } + + if ret.is_empty() { + None + } else { + Some(ret) + } + } + + pub fn get_udp_nat_type(self: &Self) -> String { + use crate::proto::common::NatType; + let mut ret = NatType::Unknown; + if let Some(r) = &self.route.clone().unwrap_or_default().stun_info { + ret = NatType::try_from(r.udp_nat_type).unwrap(); + } + format!("{:?}", ret) + } +} + +pub fn list_peer_route_pair(peers: Vec, routes: Vec) -> Vec { + let mut pairs: Vec = vec![]; + + for route in routes.iter() { + let peer = peers.iter().find(|peer| peer.peer_id == route.peer_id); + let pair = PeerRoutePair { + route: Some(route.clone()), + peer: peer.cloned(), + }; + + pairs.push(pair); + } + + pairs +} diff --git a/easytier/src/proto/error.proto b/easytier/src/proto/error.proto index 80d12f4..5b5f537 100644 --- a/easytier/src/proto/error.proto +++ b/easytier/src/proto/error.proto @@ -21,7 +21,7 @@ message MalformatRpcPacket { string error_message = 1; } message Timeout { string error_message = 1; } message Error { - oneof error { + oneof error_kind { OtherError other_error = 1; InvalidMethodIndex invalid_method_index = 2; InvalidService invalid_service = 3; diff --git a/easytier/src/proto/error.rs b/easytier/src/proto/error.rs index c3ab2bd..b2cb2d3 100644 --- a/easytier/src/proto/error.rs +++ b/easytier/src/proto/error.rs @@ -6,44 +6,44 @@ include!(concat!(env!("OUT_DIR"), "/error.rs")); impl From<&rpc_types::error::Error> for Error { fn from(e: &rpc_types::error::Error) -> Self { - use super::error::error::Error as ProtoError; + use super::error::error::ErrorKind as ProtoError; match e { rpc_types::error::Error::ExecutionError(e) => Self { - error: Some(ProtoError::ExecuteError(ExecuteError { - error_message: e.to_string(), + error_kind: Some(ProtoError::ExecuteError(ExecuteError { + error_message: format!("{:?}", e), })), }, rpc_types::error::Error::DecodeError(_) => Self { - error: Some(ProtoError::ProstDecodeError(ProstDecodeError {})), + error_kind: Some(ProtoError::ProstDecodeError(ProstDecodeError {})), }, rpc_types::error::Error::EncodeError(_) => Self { - error: Some(ProtoError::ProstEncodeError(ProstEncodeError {})), + error_kind: Some(ProtoError::ProstEncodeError(ProstEncodeError {})), }, rpc_types::error::Error::InvalidMethodIndex(m, s) => Self { - error: Some(ProtoError::InvalidMethodIndex(InvalidMethodIndex { + error_kind: Some(ProtoError::InvalidMethodIndex(InvalidMethodIndex { method_index: *m as u32, - service_name: s.to_string(), + service_name: format!("{:?}", s), })), }, rpc_types::error::Error::InvalidServiceKey(s, _) => Self { - error: Some(ProtoError::InvalidService(InvalidService { - service_name: s.to_string(), + error_kind: Some(ProtoError::InvalidService(InvalidService { + service_name: format!("{:?}", s), })), }, rpc_types::error::Error::MalformatRpcPacket(e) => Self { - error: Some(ProtoError::MalformatRpcPacket(MalformatRpcPacket { - error_message: e.to_string(), + error_kind: Some(ProtoError::MalformatRpcPacket(MalformatRpcPacket { + error_message: format!("{:?}", e), })), }, rpc_types::error::Error::Timeout(e) => Self { - error: Some(ProtoError::Timeout(Timeout { - error_message: e.to_string(), + error_kind: Some(ProtoError::Timeout(Timeout { + error_message: format!("{:?}", e), })), }, #[allow(unreachable_patterns)] e => Self { - error: Some(ProtoError::OtherError(OtherError { - error_message: e.to_string(), + error_kind: Some(ProtoError::OtherError(OtherError { + error_message: format!("{:?}", e), })), }, } @@ -52,8 +52,8 @@ impl From<&rpc_types::error::Error> for Error { impl From<&Error> for rpc_types::error::Error { fn from(e: &Error) -> Self { - use super::error::error::Error as ProtoError; - match &e.error { + use super::error::error::ErrorKind as ProtoError; + match &e.error_kind { Some(ProtoError::ExecuteError(e)) => { Self::ExecutionError(anyhow::anyhow!(e.error_message.clone())) } diff --git a/easytier/src/proto/mod.rs b/easytier/src/proto/mod.rs index 4610ba6..0db7766 100644 --- a/easytier/src/proto/mod.rs +++ b/easytier/src/proto/mod.rs @@ -5,6 +5,7 @@ pub mod cli; pub mod common; pub mod error; pub mod peer_rpc; +pub mod web; #[cfg(test)] pub mod tests; diff --git a/easytier/src/proto/rpc_impl/bidirect.rs b/easytier/src/proto/rpc_impl/bidirect.rs index 28d94b1..a673949 100644 --- a/easytier/src/proto/rpc_impl/bidirect.rs +++ b/easytier/src/proto/rpc_impl/bidirect.rs @@ -1,9 +1,10 @@ -use std::sync::{Arc, Mutex}; +use std::sync::{atomic::AtomicBool, Arc, Mutex}; use futures::{SinkExt as _, StreamExt}; use tokio::{task::JoinSet, time::timeout}; use crate::{ + defer, proto::rpc_types::error::Error, tunnel::{packet_def::PacketType, ring::create_ring_tunnel_pair, Tunnel}, }; @@ -17,6 +18,7 @@ pub struct BidirectRpcManager { rx_timeout: Option, error: Arc>>, tunnel: Mutex>>, + running: Arc, tasks: Mutex>>, } @@ -30,6 +32,7 @@ impl BidirectRpcManager { rx_timeout: None, error: Arc::new(Mutex::new(None)), tunnel: Mutex::new(None), + running: Arc::new(AtomicBool::new(false)), tasks: Mutex::new(None), } @@ -50,6 +53,8 @@ impl BidirectRpcManager { let mut tasks = JoinSet::new(); self.rpc_client.run(); self.rpc_server.run(); + self.running + .store(true, std::sync::atomic::Ordering::Relaxed); let (server_tx, mut server_rx) = ( self.rpc_server.get_transport_sink(), @@ -64,7 +69,11 @@ impl BidirectRpcManager { self.tunnel.lock().unwrap().replace(inner); let e_clone = self.error.clone(); + let r_clone = self.running.clone(); tasks.spawn(async move { + defer! { + r_clone.store(false, std::sync::atomic::Ordering::Relaxed); + } loop { let packet = tokio::select! { Some(Ok(packet)) = server_rx.next() => { @@ -90,7 +99,11 @@ impl BidirectRpcManager { let recv_timeout = self.rx_timeout; let e_clone = self.error.clone(); + let r_clone = self.running.clone(); tasks.spawn(async move { + defer! { + r_clone.store(false, std::sync::atomic::Ordering::Relaxed); + } loop { let ret = if let Some(recv_timeout) = recv_timeout { match timeout(recv_timeout, inner_rx.next()).await { @@ -161,4 +174,8 @@ impl BidirectRpcManager { tasks.abort_all(); } } + + pub fn is_running(&self) -> bool { + self.running.load(std::sync::atomic::Ordering::Relaxed) + } } diff --git a/easytier/src/proto/tests.rs b/easytier/src/proto/tests.rs index 546f1c4..93c872f 100644 --- a/easytier/src/proto/tests.rs +++ b/easytier/src/proto/tests.rs @@ -307,6 +307,7 @@ async fn test_bidirect_rpc_manager() { use crate::proto::rpc_impl::bidirect::BidirectRpcManager; use crate::tunnel::tcp::{TcpTunnelConnector, TcpTunnelListener}; use crate::tunnel::{TunnelConnector, TunnelListener}; + use tokio::sync::Notify; let c = BidirectRpcManager::new(); let s = BidirectRpcManager::new(); @@ -323,6 +324,8 @@ async fn test_bidirect_rpc_manager() { }); s.rpc_server().registry().register(service, "test"); + let server_test_done = Arc::new(Notify::new()); + let server_test_done_clone = server_test_done.clone(); let mut tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:55443".parse().unwrap()); let s_task: ScopedTask<()> = tokio::spawn(async move { tcp_listener.listen().await.unwrap(); @@ -344,6 +347,8 @@ async fn test_bidirect_rpc_manager() { assert_eq!(ret.greeting, "Hello Client world!"); println!("server done, {:?}", ret); + server_test_done_clone.notify_one(); + s.wait().await; }) .into(); @@ -369,6 +374,7 @@ async fn test_bidirect_rpc_manager() { assert_eq!(ret.greeting, "Hello Server world!"); println!("client done, {:?}", ret); + server_test_done.notified().await; drop(c); s_task.await.unwrap(); } diff --git a/easytier/src/proto/web.proto b/easytier/src/proto/web.proto new file mode 100644 index 0000000..9976c3f --- /dev/null +++ b/easytier/src/proto/web.proto @@ -0,0 +1,100 @@ +syntax = "proto3"; + +import "common.proto"; +import "peer_rpc.proto"; +import "cli.proto"; + +package web; + +message MyNodeInfo { + common.Ipv4Addr virtual_ipv4 = 1; + string hostname = 2; + string version = 3; + peer_rpc.GetIpListResponse ips = 4; + common.StunInfo stun_info = 5; + repeated common.Url listeners = 6; + optional string vpn_portal_cfg = 7; +} + +message NetworkInstanceRunningInfo { + string dev_name = 1; + MyNodeInfo my_node_info = 2; + map events = 3; + MyNodeInfo node_info = 4; + repeated cli.Route routes = 5; + repeated cli.PeerInfo peers = 6; + repeated cli.PeerRoutePair peer_route_pairs = 7; + bool running = 8; + optional string error_msg = 9; +} + +message NetworkInstanceRunningInfoMap { + map map = 1; +} + +message HeartbeatRequest { + common.UUID machine_id = 1; + common.UUID inst_id = 2; + string user_token = 3; +} + +message HeartbeatResponse { +} + +service WebServerService { + rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse) {} +} + +message ValidateConfigRequest { + string config = 1; +} + +message ValidateConfigResponse { +} + +message RunNetworkInstanceRequest { + string config = 1; +} + +message RunNetworkInstanceResponse { +} + +message RetainNetworkInstanceRequest { + repeated common.UUID inst_ids = 1; +} + +message RetainNetworkInstanceResponse { + repeated common.UUID remain_inst_ids = 1; +} + +message CollectNetworkInfoRequest { + repeated common.UUID inst_ids = 1; +} + +message CollectNetworkInfoResponse { + NetworkInstanceRunningInfoMap info = 1; +} + +message ListNetworkInstanceRequest { +} + +message ListNetworkInstanceResponse { + repeated common.UUID inst_ids = 1; +} + +message DeleteNetworkInstanceRequest { + repeated common.UUID inst_ids = 1; +} + +message DeleteNetworkInstanceResponse { + repeated common.UUID remain_inst_ids = 1; +} + +service WebClientService { + rpc ValidateConfig(ValidateConfigRequest) returns (ValidateConfigResponse) {} + rpc RunNetworkInstance(RunNetworkInstanceRequest) returns (RunNetworkInstanceResponse) {} + rpc RetainNetworkInstance(RetainNetworkInstanceRequest) returns (RetainNetworkInstanceResponse) {} + rpc CollectNetworkInfo(CollectNetworkInfoRequest) returns (CollectNetworkInfoResponse) {} + rpc ListNetworkInstance(ListNetworkInstanceRequest) returns (ListNetworkInstanceResponse) {} + rpc DeleteNetworkInstance(DeleteNetworkInstanceRequest) returns (DeleteNetworkInstanceResponse) {} +} diff --git a/easytier/src/proto/web.rs b/easytier/src/proto/web.rs new file mode 100644 index 0000000..a3254ec --- /dev/null +++ b/easytier/src/proto/web.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/web.rs")); diff --git a/easytier/src/utils.rs b/easytier/src/utils.rs index 31e3a56..36583c5 100644 --- a/easytier/src/utils.rs +++ b/easytier/src/utils.rs @@ -1,132 +1,10 @@ use anyhow::Context; -use serde::{Deserialize, Serialize}; use tracing::level_filters::LevelFilter; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; -use crate::{ - common::{config::ConfigLoader, get_logger_timer_rfc3339}, - proto::{ - cli::{PeerInfo, Route}, - common::NatType, - }, -}; +use crate::common::{config::ConfigLoader, get_logger_timer_rfc3339}; -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct PeerRoutePair { - pub route: Route, - pub peer: Option, -} - -impl PeerRoutePair { - pub fn get_latency_ms(&self) -> Option { - let mut ret = u64::MAX; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret = ret.min(stats.latency_us); - } - - if ret == u64::MAX { - None - } else { - Some(f64::from(ret as u32) / 1000.0) - } - } - - pub fn get_rx_bytes(&self) -> Option { - let mut ret = 0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret += stats.rx_bytes; - } - - if ret == 0 { - None - } else { - Some(ret) - } - } - - pub fn get_tx_bytes(&self) -> Option { - let mut ret = 0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(stats) = &conn.stats else { - continue; - }; - ret += stats.tx_bytes; - } - - if ret == 0 { - None - } else { - Some(ret) - } - } - - pub fn get_loss_rate(&self) -> Option { - let mut ret = 0.0; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - ret += conn.loss_rate; - } - - if ret == 0.0 { - None - } else { - Some(ret as f64) - } - } - - pub fn get_conn_protos(&self) -> Option> { - let mut ret = vec![]; - let p = self.peer.as_ref()?; - for conn in p.conns.iter() { - let Some(tunnel_info) = &conn.tunnel else { - continue; - }; - // insert if not exists - if !ret.contains(&tunnel_info.tunnel_type) { - ret.push(tunnel_info.tunnel_type.clone()); - } - } - - if ret.is_empty() { - None - } else { - Some(ret) - } - } - - pub fn get_udp_nat_type(self: &Self) -> String { - let mut ret = NatType::Unknown; - if let Some(r) = &self.route.stun_info { - ret = NatType::try_from(r.udp_nat_type).unwrap(); - } - format!("{:?}", ret) - } -} - -pub fn list_peer_route_pair(peers: Vec, routes: Vec) -> Vec { - let mut pairs: Vec = vec![]; - - for route in routes.iter() { - let peer = peers.iter().find(|peer| peer.peer_id == route.peer_id); - let pair = PeerRoutePair { - route: route.clone(), - peer: peer.cloned(), - }; - - pairs.push(pair); - } - - pairs -} +pub type PeerRoutePair = crate::proto::cli::PeerRoutePair; pub fn cost_to_str(cost: i32) -> String { if cost == 1 { @@ -250,7 +128,7 @@ pub fn setup_panic_handler() { use std::io::Write; std::panic::set_hook(Box::new(|info| { let backtrace = backtrace::Backtrace::force_capture(); - println!("panic occurred: {:?}", info); + println!("panic occurred: {:?}, backtrace: {:#?}", info, backtrace); let _ = std::fs::File::create("easytier-panic.log") .and_then(|mut f| f.write_all(format!("{:?}\n{:#?}", info, backtrace).as_bytes())); std::process::exit(1); diff --git a/easytier/src/web_client/controller.rs b/easytier/src/web_client/controller.rs new file mode 100644 index 0000000..fa1c0d1 --- /dev/null +++ b/easytier/src/web_client/controller.rs @@ -0,0 +1,171 @@ +use std::collections::BTreeMap; + +use dashmap::DashMap; + +use crate::{ + common::config::{ConfigLoader, TomlConfigLoader}, + launcher::NetworkInstance, + proto::{ + rpc_types::{self, controller::BaseController}, + web::{ + CollectNetworkInfoRequest, CollectNetworkInfoResponse, DeleteNetworkInstanceRequest, + DeleteNetworkInstanceResponse, ListNetworkInstanceRequest, ListNetworkInstanceResponse, + NetworkInstanceRunningInfoMap, RetainNetworkInstanceRequest, + RetainNetworkInstanceResponse, RunNetworkInstanceRequest, RunNetworkInstanceResponse, + ValidateConfigRequest, ValidateConfigResponse, WebClientService, + }, + }, +}; + +pub struct Controller { + token: String, + instance_map: DashMap, +} + +impl Controller { + pub fn new(token: String) -> Self { + Controller { + token, + instance_map: DashMap::new(), + } + } + + pub fn run_network_instance(&self, cfg: TomlConfigLoader) -> Result<(), anyhow::Error> { + let instance_id = cfg.get_id(); + if self.instance_map.contains_key(&instance_id) { + anyhow::bail!("instance {} already exists", instance_id); + } + + let mut instance = NetworkInstance::new(cfg); + instance.start()?; + + println!("instance {} started", instance_id); + self.instance_map.insert(instance_id, instance); + Ok(()) + } + + pub fn retain_network_instance( + &self, + instance_ids: Vec, + ) -> Result { + self.instance_map.retain(|k, _| instance_ids.contains(k)); + let remain = self + .instance_map + .iter() + .map(|item| item.key().clone().into()) + .collect::>(); + println!("instance {:?} retained", remain); + Ok(RetainNetworkInstanceResponse { + remain_inst_ids: remain, + }) + } + + pub fn collect_network_infos(&self) -> Result { + let mut map = BTreeMap::new(); + for instance in self.instance_map.iter() { + if let Some(info) = instance.get_running_info() { + map.insert(instance.key().to_string(), info); + } + } + Ok(NetworkInstanceRunningInfoMap { map }) + } + + pub fn list_network_instance_ids(&self) -> Vec { + self.instance_map + .iter() + .map(|item| item.key().clone()) + .collect() + } + + pub fn token(&self) -> String { + self.token.clone() + } +} + +#[async_trait::async_trait] +impl WebClientService for Controller { + type Controller = BaseController; + + async fn validate_config( + &self, + _: BaseController, + req: ValidateConfigRequest, + ) -> Result { + let _ = TomlConfigLoader::new_from_str(&req.config)?; + Ok(ValidateConfigResponse {}) + } + + async fn run_network_instance( + &self, + _: BaseController, + req: RunNetworkInstanceRequest, + ) -> Result { + let cfg = TomlConfigLoader::new_from_str(&req.config)?; + self.run_network_instance(cfg)?; + Ok(RunNetworkInstanceResponse {}) + } + + async fn retain_network_instance( + &self, + _: BaseController, + req: RetainNetworkInstanceRequest, + ) -> Result { + Ok(self.retain_network_instance(req.inst_ids.into_iter().map(Into::into).collect())?) + } + + async fn collect_network_info( + &self, + _: BaseController, + req: CollectNetworkInfoRequest, + ) -> Result { + let mut ret = self.collect_network_infos()?; + let include_inst_ids = req + .inst_ids + .iter() + .cloned() + .map(|id| id.to_string()) + .collect::>(); + if !include_inst_ids.is_empty() { + let mut to_remove = Vec::new(); + for (k, _) in ret.map.iter() { + if !include_inst_ids.contains(&k) { + to_remove.push(k.clone()); + } + } + + for k in to_remove { + ret.map.remove(&k); + } + } + Ok(CollectNetworkInfoResponse { info: Some(ret) }) + } + + // rpc ListNetworkInstance(ListNetworkInstanceRequest) returns (ListNetworkInstanceResponse) {} + async fn list_network_instance( + &self, + _: BaseController, + _: ListNetworkInstanceRequest, + ) -> Result { + Ok(ListNetworkInstanceResponse { + inst_ids: self + .list_network_instance_ids() + .into_iter() + .map(Into::into) + .collect(), + }) + } + + // rpc DeleteNetworkInstance(DeleteNetworkInstanceRequest) returns (DeleteNetworkInstanceResponse) {} + async fn delete_network_instance( + &self, + _: BaseController, + req: DeleteNetworkInstanceRequest, + ) -> Result { + let mut inst_ids = self.list_network_instance_ids(); + inst_ids.retain(|id| !req.inst_ids.contains(&(id.clone().into()))); + self.retain_network_instance(inst_ids.clone())?; + Ok(DeleteNetworkInstanceResponse { + remain_inst_ids: inst_ids.into_iter().map(Into::into).collect(), + }) + } +} diff --git a/easytier/src/web_client/mod.rs b/easytier/src/web_client/mod.rs new file mode 100644 index 0000000..524afc1 --- /dev/null +++ b/easytier/src/web_client/mod.rs @@ -0,0 +1,48 @@ +use std::sync::Arc; + +use crate::{common::scoped_task::ScopedTask, tunnel::TunnelConnector}; + +pub mod controller; +pub mod session; + +pub struct WebClient { + controller: Arc, + tasks: ScopedTask<()>, +} + +impl WebClient { + pub fn new(connector: T, token: S) -> Self { + let controller = Arc::new(controller::Controller::new(token.to_string())); + + let controller_clone = controller.clone(); + let tasks = ScopedTask::from(tokio::spawn(async move { + Self::routine(controller_clone, Box::new(connector)).await; + })); + + WebClient { controller, tasks } + } + + async fn routine( + controller: Arc, + mut connector: Box, + ) { + loop { + let conn = match connector.connect().await { + Ok(conn) => conn, + Err(e) => { + println!( + "Failed to connect to the server ({}), retrying in 5 seconds...", + e + ); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + continue; + } + }; + + println!("Successfully connected to {:?}", conn.info()); + + let mut session = session::Session::new(conn, controller.clone()); + session.wait().await; + } + } +} diff --git a/easytier/src/web_client/session.rs b/easytier/src/web_client/session.rs new file mode 100644 index 0000000..5706f18 --- /dev/null +++ b/easytier/src/web_client/session.rs @@ -0,0 +1,126 @@ +use std::sync::Arc; + +use tokio::{ + sync::{broadcast, Mutex}, + task::JoinSet, + time::interval, +}; + +use crate::{ + common::get_machine_id, + proto::{ + rpc_impl::bidirect::BidirectRpcManager, + rpc_types::controller::BaseController, + web::{ + HeartbeatRequest, HeartbeatResponse, WebClientServiceServer, + WebServerServiceClientFactory, + }, + }, + tunnel::Tunnel, +}; + +use super::controller::Controller; + +#[derive(Debug, Clone)] +struct HeartbeatCtx { + notifier: Arc>, + resp: Arc>>, +} + +pub struct Session { + rpc_mgr: BidirectRpcManager, + controller: Arc, + + heartbeat_ctx: HeartbeatCtx, + + tasks: Mutex>, +} + +impl Session { + pub fn new(tunnel: Box, controller: Arc) -> Self { + let rpc_mgr = BidirectRpcManager::new(); + rpc_mgr.run_with_tunnel(tunnel); + + rpc_mgr + .rpc_server() + .registry() + .register(WebClientServiceServer::new(controller.clone()), ""); + + let mut tasks: JoinSet<()> = JoinSet::new(); + let heartbeat_ctx = Self::heartbeat_routine(&rpc_mgr, controller.token(), &mut tasks); + + Session { + rpc_mgr, + controller, + heartbeat_ctx, + tasks: Mutex::new(tasks), + } + } + + fn heartbeat_routine( + rpc_mgr: &BidirectRpcManager, + token: String, + tasks: &mut JoinSet<()>, + ) -> HeartbeatCtx { + let (tx, _rx1) = broadcast::channel(2); + + let ctx = HeartbeatCtx { + notifier: Arc::new(tx), + resp: Arc::new(Mutex::new(None)), + }; + + let mid = get_machine_id(); + let inst_id = uuid::Uuid::new_v4(); + let token = token; + + let ctx_clone = ctx.clone(); + let mut tick = interval(std::time::Duration::from_secs(1)); + let client = rpc_mgr + .rpc_client() + .scoped_client::>(1, 1, "".to_string()); + tasks.spawn(async move { + let req = HeartbeatRequest { + machine_id: Some(mid.into()), + inst_id: Some(inst_id.into()), + user_token: token.to_string(), + }; + loop { + tick.tick().await; + match client + .heartbeat(BaseController::default(), req.clone()) + .await + { + Err(e) => { + tracing::error!("heartbeat failed: {:?}", e); + break; + } + Ok(resp) => { + tracing::debug!("heartbeat response: {:?}", resp); + let _ = ctx_clone.notifier.send(resp.clone()); + ctx_clone.resp.lock().await.replace(resp); + } + } + } + }); + + ctx + } + + async fn wait_routines(&self) { + self.tasks.lock().await.join_next().await; + // if any task failed, we should abort all tasks + self.tasks.lock().await.abort_all(); + } + + pub async fn wait(&mut self) { + tokio::select! { + _ = self.rpc_mgr.wait() => {} + _ = self.wait_routines() => {} + } + } + + pub async fn wait_next_heartbeat(&self) -> Option { + let mut rx = self.heartbeat_ctx.notifier.subscribe(); + rx.recv().await.ok() + } +} From 1ac2e1c8e3907285353c1479e8e812b634a58f8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=92=B8=E9=B1=BC=E8=80=8C=E5=B7=B2?= <64669899+hvvvvvvv@users.noreply.github.com> Date: Sat, 26 Oct 2024 01:32:25 +0800 Subject: [PATCH 2/3] Support running as a Windows service. (#445) When the program is started by the Service Control Manager (SCM), it needs to call StartServiceCtrlDispatcher (which corresponds to service_dispatcher::start in the windows-service crate) to inform the SCM of the service's entry function. The SCM then calls the service entry function passed to StartServiceCtrlDispatcher. The process calling StartServiceCtrlDispatcher will block until the service's status is set to Stopped. If the current program is not run through the SCM, StartServiceCtrlDispatcher will return the error ERROR_FAILED_SERVICE_CONTROLLER_CONNECT, and the program will run according to its original mechanism. For more details about SCM, please refer to Microsoft's documentation. --- Cargo.lock | 18 ++++++ easytier/Cargo.toml | 1 + easytier/src/easytier-core.rs | 110 +++++++++++++++++++++++++++++++++- 3 files changed, 128 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 9cfb919..e885bd2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1691,6 +1691,7 @@ dependencies = [ "url", "uuid", "wildmatch", + "windows-service", "windows-sys 0.52.0", "winreg 0.52.0", "zerocopy", @@ -7345,6 +7346,12 @@ dependencies = [ "rustix", ] +[[package]] +name = "widestring" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7219d36b6eac893fa81e84ebe06485e7dcbb616177469b142df14f1f4deb1311" + [[package]] name = "wildmatch" version = "2.3.4" @@ -7466,6 +7473,17 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-service" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d24d6bcc7f734a4091ecf8d7a64c5f7d7066f45585c1861eba06449909609c8a" +dependencies = [ + "bitflags 2.6.0", + "widestring", + "windows-sys 0.52.0", +] + [[package]] name = "windows-sys" version = "0.45.0" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 9f3bec5..57e6146 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -193,6 +193,7 @@ windows-sys = { version = "0.52", features = [ ] } encoding = "0.2" winreg = "0.52" +windows-service = "0.7.0" [build-dependencies] tonic-build = "0.12" diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 4c11ede..0c22229 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -4,7 +4,7 @@ extern crate rust_i18n; use std::{ - net::{Ipv4Addr, SocketAddr}, + net::{Ipv4Addr, SocketAddr}, path::PathBuf, }; @@ -28,6 +28,9 @@ use easytier::{ web_client, }; +#[cfg(target_os = "windows")] +windows_service::define_windows_service!(ffi_service_main, win_service_main); + #[cfg(feature = "mimalloc")] use mimalloc_rust::GlobalMiMalloc; @@ -649,11 +652,116 @@ pub fn handle_event(mut events: EventBusSubscriber) -> tokio::task::JoinHandle<( }) } +#[cfg(target_os = "windows")] +fn win_service_event_loop( + stop_notify: std::sync::Arc, + inst: launcher::NetworkInstance, + status_handle: windows_service::service_control_handler::ServiceStatusHandle, +) { + use tokio::runtime::Runtime; + use std::time::Duration; + use windows_service::service::*; + + std::thread::spawn(move || { + let rt = Runtime::new().unwrap(); + rt.block_on(async move { + tokio::select! { + res = inst.wait() => { + if let Some(e) = res { + status_handle.set_service_status(ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Stopped, + controls_accepted: ServiceControlAccept::empty(), + checkpoint: 0, + wait_hint: Duration::default(), + exit_code: ServiceExitCode::ServiceSpecific(1u32), + process_id: None + }).unwrap(); + panic!("launcher error: {:?}", e); + } + }, + _ = stop_notify.notified() => { + status_handle.set_service_status(ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Stopped, + controls_accepted: ServiceControlAccept::empty(), + checkpoint: 0, + wait_hint: Duration::default(), + exit_code: ServiceExitCode::Win32(0), + process_id: None + }).unwrap(); + std::process::exit(0); + } + } + }); + }); +} + +#[cfg(target_os = "windows")] +fn win_service_main(_: Vec) { + use std::time::Duration; + use windows_service::service_control_handler::*; + use windows_service::service::*; + use std::sync::Arc; + use tokio::sync::Notify; + + let cli = Cli::parse(); + let cfg = TomlConfigLoader::from(cli); + + init_logger(&cfg, false).unwrap(); + + let stop_notify_send = Arc::new(Notify::new()); + let stop_notify_recv = Arc::clone(&stop_notify_send); + let event_handler = move |control_event| -> ServiceControlHandlerResult { + match control_event { + ServiceControl::Interrogate => { + ServiceControlHandlerResult::NoError + } + ServiceControl::Stop => + { + stop_notify_send.notify_one(); + ServiceControlHandlerResult::NoError + } + _ => ServiceControlHandlerResult::NotImplemented, + } + }; + let status_handle = register(String::new(), event_handler).expect("register service fail"); + let next_status = ServiceStatus { + service_type: ServiceType::OWN_PROCESS, + current_state: ServiceState::Running, + controls_accepted: ServiceControlAccept::STOP, + exit_code: ServiceExitCode::Win32(0), + checkpoint: 0, + wait_hint: Duration::default(), + process_id: None, + }; + let mut inst = launcher::NetworkInstance::new(cfg).set_fetch_node_info(false); + + inst.start().unwrap(); + status_handle.set_service_status(next_status).expect("set service status fail"); + win_service_event_loop(stop_notify_recv, inst, status_handle); +} + #[tokio::main] async fn main() { let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); + #[cfg(target_os = "windows")] + match windows_service::service_dispatcher::start(String::new(), ffi_service_main) { + Ok(_) => std::thread::park(), + Err(e) => + { + let should_panic = if let windows_service::Error::Winapi(ref io_error) = e { + io_error.raw_os_error() != Some(0x427) // ERROR_FAILED_SERVICE_CONTROLLER_CONNECT + } else { true }; + + if should_panic { + panic!("SCM start an error: {}", e); + } + } + }; + let cli = Cli::parse(); setup_panic_handler(); From 18da94bf33096ccc51572e80f8d845105e1cf95a Mon Sep 17 00:00:00 2001 From: Jiangqiu Shen Date: Fri, 25 Oct 2024 13:33:04 -0400 Subject: [PATCH 3/3] make the panic message more useful (#437) when panic happend, previouse panic info: panic occurred: PanicHookInfo { payload: Any { .. }, location: Location { file: "easytier/src/easytier-core.rs", line: 680, col: 9 }, can_unwind: true, force_no_backtrace: false } the new panic info: panic occurred: payload:launcher error: "anyhow error: failed to add listener tcp://0.0.0.0:11010", location: Some(Location { file: "easytier/src/easytier-core.rs", line: 680, col: 9 }) backtrace saved to easytier-panic.log --- easytier/locales/app.yml | 5 +++++ easytier/src/easytier-cli.rs | 2 ++ easytier/src/lib.rs | 1 + easytier/src/utils.rs | 20 +++++++++++++++++++- 4 files changed, 27 insertions(+), 1 deletion(-) diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index 7e0475c..7519036 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -120,3 +120,8 @@ core_clap: ipv6_listener: en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port" zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听" + +core_app: + panic_backtrace_save: + en: "backtrace saved to easytier-panic.log" + zh-CN: "回溯信息已保存到easytier-panic.log" diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index a2d7879..943ed67 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -29,6 +29,8 @@ use easytier::{ utils::{cost_to_str, float_to_str, PeerRoutePair}, }; +rust_i18n::i18n!("locales", fallback = "en"); + #[derive(Parser, Debug)] #[command(name = "easytier-cli", author, version = EASYTIER_VERSION, about, long_about = None)] struct Cli { diff --git a/easytier/src/lib.rs b/easytier/src/lib.rs index 367e8e7..769d7cf 100644 --- a/easytier/src/lib.rs +++ b/easytier/src/lib.rs @@ -19,3 +19,4 @@ pub mod web_client; mod tests; pub const VERSION: &str = common::constants::EASYTIER_VERSION; +rust_i18n::i18n!("locales", fallback = "en"); diff --git a/easytier/src/utils.rs b/easytier/src/utils.rs index 36583c5..5160f04 100644 --- a/easytier/src/utils.rs +++ b/easytier/src/utils.rs @@ -128,7 +128,25 @@ pub fn setup_panic_handler() { use std::io::Write; std::panic::set_hook(Box::new(|info| { let backtrace = backtrace::Backtrace::force_capture(); - println!("panic occurred: {:?}, backtrace: {:#?}", info, backtrace); + let payload = info.payload(); + let payload_str: Option<&str> = if let Some(s) = payload.downcast_ref::<&str>() { + Some(s) + } else if let Some(s) = payload.downcast_ref::() { + Some(s) + } else { + None + }; + + if let Some(payload_str) = payload_str { + println!( + "panic occurred: payload:{}, location: {:?}", + payload_str, + info.location() + ); + } else { + println!("panic occurred: location: {:?}", info.location()); + } + println!("{}", rust_i18n::t!("core_app.panic_backtrace_save")); let _ = std::fs::File::create("easytier-panic.log") .and_then(|mut f| f.write_all(format!("{:?}\n{:#?}", info, backtrace).as_bytes())); std::process::exit(1);