mirror of
https://github.com/EasyTier/EasyTier.git
synced 2024-11-16 11:42:27 +08:00
Compare commits
2 Commits
5cd3f1218b
...
73b6bb7037
Author | SHA1 | Date | |
---|---|---|---|
|
73b6bb7037 | ||
|
9167fc94de |
|
@ -120,6 +120,9 @@ core_clap:
|
|||
ipv6_listener:
|
||||
en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port"
|
||||
zh-CN: "IPv6 监听器的URL,例如:tcp://[::]:11010,如果未设置,将在随机UDP端口上监听"
|
||||
compression:
|
||||
en: "compression algorithm to use, support none, zstd. default is none"
|
||||
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
|
||||
|
||||
core_app:
|
||||
panic_backtrace_save:
|
||||
|
|
|
@ -3,9 +3,7 @@ use tokio::io::AsyncWriteExt;
|
|||
|
||||
use zerocopy::{AsBytes as _, FromBytes as _};
|
||||
|
||||
use crate::tunnel::packet_def::{
|
||||
CompressorAlgo, CompressorTail, PacketType, ZCPacket, COMPRESSOR_TAIL_SIZE,
|
||||
};
|
||||
use crate::tunnel::packet_def::{CompressorAlgo, CompressorTail, ZCPacket, COMPRESSOR_TAIL_SIZE};
|
||||
|
||||
type Error = anyhow::Error;
|
||||
|
||||
|
@ -75,8 +73,7 @@ impl Compressor for DefaultCompressor {
|
|||
}
|
||||
|
||||
let pm_header = zc_packet.peer_manager_header().unwrap();
|
||||
if pm_header.is_compressed() || pm_header.packet_type != PacketType::Data as u8 {
|
||||
// only compress data packets
|
||||
if pm_header.is_compressed() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
|
@ -171,4 +168,24 @@ pub mod tests {
|
|||
assert_eq!(packet.payload(), text);
|
||||
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_short_text_compress() {
|
||||
let text = b"1234";
|
||||
let mut packet = ZCPacket::new_with_payload(text);
|
||||
packet.fill_peer_manager_hdr(0, 0, 0);
|
||||
|
||||
let compressor = DefaultCompressor {};
|
||||
|
||||
// short text can't be compressed
|
||||
compressor
|
||||
.compress(&mut packet, CompressorAlgo::ZstdDefault)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
|
||||
|
||||
compressor.decompress(&mut packet).await.unwrap();
|
||||
assert_eq!(packet.payload(), text);
|
||||
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use std::{
|
|||
use anyhow::Context;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::tunnel::generate_digest_from_str;
|
||||
use crate::{proto::common::CompressionAlgoPb, tunnel::generate_digest_from_str};
|
||||
|
||||
pub type Flags = crate::proto::common::FlagsInConfig;
|
||||
|
||||
|
@ -28,6 +28,7 @@ pub fn gen_default_flags() -> Flags {
|
|||
disable_udp_hole_punching: false,
|
||||
ipv6_listener: "udp://[::]:0".to_string(),
|
||||
multi_thread: false,
|
||||
data_compress_algo: CompressionAlgoPb::None.into(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -22,7 +22,8 @@ use easytier::{
|
|||
global_ctx::{EventBusSubscriber, GlobalCtxEvent},
|
||||
scoped_task::ScopedTask,
|
||||
},
|
||||
launcher, proto,
|
||||
launcher,
|
||||
proto::{self, common::CompressionAlgoPb},
|
||||
tunnel::udp::UdpTunnelConnector,
|
||||
utils::{init_logger, setup_panic_handler},
|
||||
web_client,
|
||||
|
@ -289,6 +290,13 @@ struct Cli {
|
|||
help = t!("core_clap.ipv6_listener").to_string()
|
||||
)]
|
||||
ipv6_listener: Option<String>,
|
||||
|
||||
#[arg(
|
||||
long,
|
||||
help = t!("core_clap.compression").to_string(),
|
||||
default_value = "none",
|
||||
)]
|
||||
compression: String,
|
||||
}
|
||||
|
||||
rust_i18n::i18n!("locales", fallback = "en");
|
||||
|
@ -522,6 +530,15 @@ impl From<Cli> for TomlConfigLoader {
|
|||
.unwrap();
|
||||
}
|
||||
f.multi_thread = cli.multi_thread;
|
||||
f.data_compress_algo = match cli.compression.as_str() {
|
||||
"none" => CompressionAlgoPb::None,
|
||||
"zstd" => CompressionAlgoPb::Zstd,
|
||||
_ => panic!(
|
||||
"unknown compression algorithm: {}, supported: none, zstd",
|
||||
cli.compression
|
||||
),
|
||||
}
|
||||
.into();
|
||||
cfg.set_flags(f);
|
||||
|
||||
cfg.set_exit_nodes(cli.exit_nodes.clone());
|
||||
|
|
|
@ -79,7 +79,7 @@ impl PeerRpcManagerTransport for RpcTransport {
|
|||
|
||||
async fn send(&self, mut msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
|
||||
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
|
||||
if peers.need_relay_by_foreign_network(dst_peer_id).await? {
|
||||
if !peers.need_relay_by_foreign_network(dst_peer_id).await? {
|
||||
self.encryptor
|
||||
.encrypt(&mut msg)
|
||||
.with_context(|| "encrypt failed")?;
|
||||
|
@ -215,6 +215,12 @@ impl PeerManager {
|
|||
my_peer_id,
|
||||
));
|
||||
|
||||
let data_compress_algo = global_ctx
|
||||
.get_flags()
|
||||
.data_compress_algo()
|
||||
.try_into()
|
||||
.expect("invalid data compress algo, maybe some features not enabled");
|
||||
|
||||
let exit_nodes = global_ctx.config.get_exit_nodes();
|
||||
|
||||
PeerManager {
|
||||
|
@ -241,7 +247,7 @@ impl PeerManager {
|
|||
foreign_network_client,
|
||||
|
||||
encryptor,
|
||||
data_compress_algo: CompressorAlgo::None,
|
||||
data_compress_algo,
|
||||
|
||||
exit_nodes,
|
||||
}
|
||||
|
@ -915,7 +921,7 @@ mod tests {
|
|||
peer_rpc::tests::register_service,
|
||||
tests::{connect_peer_manager, wait_route_appear},
|
||||
},
|
||||
proto::common::NatType,
|
||||
proto::common::{CompressionAlgoPb, NatType},
|
||||
tunnel::{common::tests::wait_for_condition, TunnelConnector, TunnelListener},
|
||||
};
|
||||
|
||||
|
@ -1057,6 +1063,7 @@ mod tests {
|
|||
let mock_global_ctx = get_mock_global_ctx();
|
||||
mock_global_ctx.config.set_flags(Flags {
|
||||
enable_encryption,
|
||||
data_compress_algo: CompressionAlgoPb::Zstd.into(),
|
||||
..Default::default()
|
||||
});
|
||||
let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, mock_global_ctx, s));
|
||||
|
|
|
@ -20,6 +20,7 @@ message FlagsInConfig {
|
|||
bool disable_udp_hole_punching = 13;
|
||||
string ipv6_listener = 14;
|
||||
bool multi_thread = 15;
|
||||
CompressionAlgoPb data_compress_algo = 16;
|
||||
}
|
||||
|
||||
message RpcDescriptor {
|
||||
|
|
|
@ -51,14 +51,14 @@ struct InflightRequest {
|
|||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
struct PeerInfo {
|
||||
peer_id: PeerId,
|
||||
compression_info: RpcCompressionInfo,
|
||||
last_active: Option<std::time::Instant>,
|
||||
pub struct PeerInfo {
|
||||
pub peer_id: PeerId,
|
||||
pub compression_info: RpcCompressionInfo,
|
||||
pub last_active: Option<std::time::Instant>,
|
||||
}
|
||||
|
||||
type InflightRequestTable = Arc<DashMap<InflightRequestKey, InflightRequest>>;
|
||||
type PeerInfoTable = Arc<DashMap<PeerId, PeerInfo>>;
|
||||
pub type PeerInfoTable = Arc<DashMap<PeerId, PeerInfo>>;
|
||||
|
||||
pub struct Client {
|
||||
mpsc: Mutex<MpscTunnel<Box<dyn Tunnel>>>,
|
||||
|
@ -298,4 +298,8 @@ impl Client {
|
|||
pub fn inflight_count(&self) -> usize {
|
||||
self.inflight_requests.len()
|
||||
}
|
||||
|
||||
pub fn peer_info_table(&self) -> PeerInfoTable {
|
||||
self.peer_info.clone()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -12,10 +12,7 @@ use tokio_stream::StreamExt;
|
|||
use crate::{
|
||||
common::{join_joinset_background, PeerId},
|
||||
proto::{
|
||||
common::{
|
||||
self, CompressionAlgoPb, RpcCompressionInfo, RpcPacket, RpcRequest,
|
||||
RpcResponse,
|
||||
},
|
||||
common::{self, CompressionAlgoPb, RpcCompressionInfo, RpcPacket, RpcRequest, RpcResponse},
|
||||
rpc_types::error::Result,
|
||||
},
|
||||
tunnel::{
|
||||
|
|
|
@ -41,6 +41,7 @@ impl Greeting for GreetingService {
|
|||
}
|
||||
}
|
||||
|
||||
use crate::proto::common::{CompressionAlgoPb, RpcCompressionInfo};
|
||||
use crate::proto::rpc_impl::client::Client;
|
||||
use crate::proto::rpc_impl::server::Server;
|
||||
|
||||
|
@ -129,6 +130,15 @@ async fn rpc_basic_test() {
|
|||
let ret = out.say_hello(ctrl, input).await;
|
||||
assert_eq!(ret.unwrap().greeting, "Hello world!");
|
||||
|
||||
assert_eq!(1, ctx.client.peer_info_table().len());
|
||||
let first_peer_info = ctx.client.peer_info_table().iter().next().unwrap().clone();
|
||||
assert_eq!(
|
||||
first_peer_info.compression_info.accepted_algo(),
|
||||
CompressionAlgoPb::Zstd,
|
||||
);
|
||||
|
||||
println!("{:?}", ctx.client.peer_info_table());
|
||||
|
||||
let ctrl = RpcController::default();
|
||||
let input = SayGoodbyeRequest {
|
||||
name: "world".to_string(),
|
||||
|
@ -145,6 +155,15 @@ async fn rpc_basic_test() {
|
|||
|
||||
assert_eq!(0, ctx.client.inflight_count());
|
||||
assert_eq!(0, ctx.server.inflight_count());
|
||||
|
||||
let first_peer_info = ctx.client.peer_info_table().iter().next().unwrap().clone();
|
||||
assert_eq!(
|
||||
first_peer_info.compression_info,
|
||||
RpcCompressionInfo {
|
||||
algo: CompressionAlgoPb::Zstd.into(),
|
||||
accepted_algo: CompressionAlgoPb::Zstd.into(),
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
|
|
|
@ -14,8 +14,11 @@ use crate::{
|
|||
netns::{NetNS, ROOT_NETNS_NAME},
|
||||
},
|
||||
instance::instance::Instance,
|
||||
tunnel::common::tests::wait_for_condition,
|
||||
tunnel::{ring::RingTunnelConnector, tcp::TcpTunnelConnector, udp::UdpTunnelConnector},
|
||||
proto::common::CompressionAlgoPb,
|
||||
tunnel::{
|
||||
common::tests::wait_for_condition, ring::RingTunnelConnector, tcp::TcpTunnelConnector,
|
||||
udp::UdpTunnelConnector,
|
||||
},
|
||||
};
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
|
@ -400,6 +403,47 @@ pub async fn subnet_proxy_three_node_test(
|
|||
subnet_proxy_test_udp().await;
|
||||
}
|
||||
|
||||
#[rstest::rstest]
|
||||
#[tokio::test]
|
||||
#[serial_test::serial]
|
||||
pub async fn data_compress(
|
||||
#[values(true, false)] inst1_compress: bool,
|
||||
#[values(true, false)] inst2_compress: bool,
|
||||
) {
|
||||
let _insts = init_three_node_ex(
|
||||
"udp",
|
||||
|cfg| {
|
||||
if cfg.get_inst_name() == "inst1" && inst1_compress {
|
||||
let mut flags = cfg.get_flags();
|
||||
flags.data_compress_algo = CompressionAlgoPb::Zstd.into();
|
||||
cfg.set_flags(flags);
|
||||
}
|
||||
|
||||
if cfg.get_inst_name() == "inst3" && inst2_compress {
|
||||
let mut flags = cfg.get_flags();
|
||||
flags.data_compress_algo = CompressionAlgoPb::Zstd.into();
|
||||
cfg.set_flags(flags);
|
||||
}
|
||||
|
||||
cfg
|
||||
},
|
||||
false,
|
||||
)
|
||||
.await;
|
||||
|
||||
wait_for_condition(
|
||||
|| async { ping_test("net_a", "10.144.144.3", None).await },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
|
||||
wait_for_condition(
|
||||
|| async { ping_test("net_a", "10.144.144.3", Some(5 * 1024)).await },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[cfg(feature = "wireguard")]
|
||||
#[rstest::rstest]
|
||||
#[tokio::test]
|
||||
|
|
Loading…
Reference in New Issue
Block a user