Compare commits

..

2 Commits

Author SHA1 Message Date
Sijie.Sun
73b6bb7037
Merge 9167fc94de into 4fc3ff8ce8 2024-11-15 17:18:49 +00:00
sijie.sun
9167fc94de support compress for rpc and tun data 2024-11-16 01:18:41 +08:00
10 changed files with 131 additions and 21 deletions

View File

@ -120,6 +120,9 @@ core_clap:
ipv6_listener:
en: "the url of the ipv6 listener, e.g.: tcp://[::]:11010, if not set, will listen on random udp port"
zh-CN: "IPv6 监听器的URL例如tcp://[::]:11010如果未设置将在随机UDP端口上监听"
compression:
en: "compression algorithm to use, support none, zstd. default is none"
zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none"
core_app:
panic_backtrace_save:

View File

@ -3,9 +3,7 @@ use tokio::io::AsyncWriteExt;
use zerocopy::{AsBytes as _, FromBytes as _};
use crate::tunnel::packet_def::{
CompressorAlgo, CompressorTail, PacketType, ZCPacket, COMPRESSOR_TAIL_SIZE,
};
use crate::tunnel::packet_def::{CompressorAlgo, CompressorTail, ZCPacket, COMPRESSOR_TAIL_SIZE};
type Error = anyhow::Error;
@ -75,8 +73,7 @@ impl Compressor for DefaultCompressor {
}
let pm_header = zc_packet.peer_manager_header().unwrap();
if pm_header.is_compressed() || pm_header.packet_type != PacketType::Data as u8 {
// only compress data packets
if pm_header.is_compressed() {
return Ok(());
}
@ -171,4 +168,24 @@ pub mod tests {
assert_eq!(packet.payload(), text);
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
}
#[tokio::test]
async fn test_short_text_compress() {
let text = b"1234";
let mut packet = ZCPacket::new_with_payload(text);
packet.fill_peer_manager_hdr(0, 0, 0);
let compressor = DefaultCompressor {};
// short text can't be compressed
compressor
.compress(&mut packet, CompressorAlgo::ZstdDefault)
.await
.unwrap();
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
compressor.decompress(&mut packet).await.unwrap();
assert_eq!(packet.payload(), text);
assert_eq!(packet.peer_manager_header().unwrap().is_compressed(), false);
}
}

View File

@ -7,7 +7,7 @@ use std::{
use anyhow::Context;
use serde::{Deserialize, Serialize};
use crate::tunnel::generate_digest_from_str;
use crate::{proto::common::CompressionAlgoPb, tunnel::generate_digest_from_str};
pub type Flags = crate::proto::common::FlagsInConfig;
@ -28,6 +28,7 @@ pub fn gen_default_flags() -> Flags {
disable_udp_hole_punching: false,
ipv6_listener: "udp://[::]:0".to_string(),
multi_thread: false,
data_compress_algo: CompressionAlgoPb::None.into(),
}
}

View File

@ -22,7 +22,8 @@ use easytier::{
global_ctx::{EventBusSubscriber, GlobalCtxEvent},
scoped_task::ScopedTask,
},
launcher, proto,
launcher,
proto::{self, common::CompressionAlgoPb},
tunnel::udp::UdpTunnelConnector,
utils::{init_logger, setup_panic_handler},
web_client,
@ -289,6 +290,13 @@ struct Cli {
help = t!("core_clap.ipv6_listener").to_string()
)]
ipv6_listener: Option<String>,
#[arg(
long,
help = t!("core_clap.compression").to_string(),
default_value = "none",
)]
compression: String,
}
rust_i18n::i18n!("locales", fallback = "en");
@ -522,6 +530,15 @@ impl From<Cli> for TomlConfigLoader {
.unwrap();
}
f.multi_thread = cli.multi_thread;
f.data_compress_algo = match cli.compression.as_str() {
"none" => CompressionAlgoPb::None,
"zstd" => CompressionAlgoPb::Zstd,
_ => panic!(
"unknown compression algorithm: {}, supported: none, zstd",
cli.compression
),
}
.into();
cfg.set_flags(f);
cfg.set_exit_nodes(cli.exit_nodes.clone());

View File

@ -79,7 +79,7 @@ impl PeerRpcManagerTransport for RpcTransport {
async fn send(&self, mut msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
if peers.need_relay_by_foreign_network(dst_peer_id).await? {
if !peers.need_relay_by_foreign_network(dst_peer_id).await? {
self.encryptor
.encrypt(&mut msg)
.with_context(|| "encrypt failed")?;
@ -215,6 +215,12 @@ impl PeerManager {
my_peer_id,
));
let data_compress_algo = global_ctx
.get_flags()
.data_compress_algo()
.try_into()
.expect("invalid data compress algo, maybe some features not enabled");
let exit_nodes = global_ctx.config.get_exit_nodes();
PeerManager {
@ -241,7 +247,7 @@ impl PeerManager {
foreign_network_client,
encryptor,
data_compress_algo: CompressorAlgo::None,
data_compress_algo,
exit_nodes,
}
@ -915,7 +921,7 @@ mod tests {
peer_rpc::tests::register_service,
tests::{connect_peer_manager, wait_route_appear},
},
proto::common::NatType,
proto::common::{CompressionAlgoPb, NatType},
tunnel::{common::tests::wait_for_condition, TunnelConnector, TunnelListener},
};
@ -1057,6 +1063,7 @@ mod tests {
let mock_global_ctx = get_mock_global_ctx();
mock_global_ctx.config.set_flags(Flags {
enable_encryption,
data_compress_algo: CompressionAlgoPb::Zstd.into(),
..Default::default()
});
let peer_mgr = Arc::new(PeerManager::new(RouteAlgoType::Ospf, mock_global_ctx, s));

View File

@ -20,6 +20,7 @@ message FlagsInConfig {
bool disable_udp_hole_punching = 13;
string ipv6_listener = 14;
bool multi_thread = 15;
CompressionAlgoPb data_compress_algo = 16;
}
message RpcDescriptor {

View File

@ -51,14 +51,14 @@ struct InflightRequest {
}
#[derive(Debug, Clone, Default)]
struct PeerInfo {
peer_id: PeerId,
compression_info: RpcCompressionInfo,
last_active: Option<std::time::Instant>,
pub struct PeerInfo {
pub peer_id: PeerId,
pub compression_info: RpcCompressionInfo,
pub last_active: Option<std::time::Instant>,
}
type InflightRequestTable = Arc<DashMap<InflightRequestKey, InflightRequest>>;
type PeerInfoTable = Arc<DashMap<PeerId, PeerInfo>>;
pub type PeerInfoTable = Arc<DashMap<PeerId, PeerInfo>>;
pub struct Client {
mpsc: Mutex<MpscTunnel<Box<dyn Tunnel>>>,
@ -298,4 +298,8 @@ impl Client {
pub fn inflight_count(&self) -> usize {
self.inflight_requests.len()
}
pub fn peer_info_table(&self) -> PeerInfoTable {
self.peer_info.clone()
}
}

View File

@ -12,10 +12,7 @@ use tokio_stream::StreamExt;
use crate::{
common::{join_joinset_background, PeerId},
proto::{
common::{
self, CompressionAlgoPb, RpcCompressionInfo, RpcPacket, RpcRequest,
RpcResponse,
},
common::{self, CompressionAlgoPb, RpcCompressionInfo, RpcPacket, RpcRequest, RpcResponse},
rpc_types::error::Result,
},
tunnel::{

View File

@ -41,6 +41,7 @@ impl Greeting for GreetingService {
}
}
use crate::proto::common::{CompressionAlgoPb, RpcCompressionInfo};
use crate::proto::rpc_impl::client::Client;
use crate::proto::rpc_impl::server::Server;
@ -129,6 +130,15 @@ async fn rpc_basic_test() {
let ret = out.say_hello(ctrl, input).await;
assert_eq!(ret.unwrap().greeting, "Hello world!");
assert_eq!(1, ctx.client.peer_info_table().len());
let first_peer_info = ctx.client.peer_info_table().iter().next().unwrap().clone();
assert_eq!(
first_peer_info.compression_info.accepted_algo(),
CompressionAlgoPb::Zstd,
);
println!("{:?}", ctx.client.peer_info_table());
let ctrl = RpcController::default();
let input = SayGoodbyeRequest {
name: "world".to_string(),
@ -145,6 +155,15 @@ async fn rpc_basic_test() {
assert_eq!(0, ctx.client.inflight_count());
assert_eq!(0, ctx.server.inflight_count());
let first_peer_info = ctx.client.peer_info_table().iter().next().unwrap().clone();
assert_eq!(
first_peer_info.compression_info,
RpcCompressionInfo {
algo: CompressionAlgoPb::Zstd.into(),
accepted_algo: CompressionAlgoPb::Zstd.into(),
}
);
}
#[tokio::test]

View File

@ -14,8 +14,11 @@ use crate::{
netns::{NetNS, ROOT_NETNS_NAME},
},
instance::instance::Instance,
tunnel::common::tests::wait_for_condition,
tunnel::{ring::RingTunnelConnector, tcp::TcpTunnelConnector, udp::UdpTunnelConnector},
proto::common::CompressionAlgoPb,
tunnel::{
common::tests::wait_for_condition, ring::RingTunnelConnector, tcp::TcpTunnelConnector,
udp::UdpTunnelConnector,
},
};
#[cfg(feature = "wireguard")]
@ -400,6 +403,47 @@ pub async fn subnet_proxy_three_node_test(
subnet_proxy_test_udp().await;
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn data_compress(
#[values(true, false)] inst1_compress: bool,
#[values(true, false)] inst2_compress: bool,
) {
let _insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" && inst1_compress {
let mut flags = cfg.get_flags();
flags.data_compress_algo = CompressionAlgoPb::Zstd.into();
cfg.set_flags(flags);
}
if cfg.get_inst_name() == "inst3" && inst2_compress {
let mut flags = cfg.get_flags();
flags.data_compress_algo = CompressionAlgoPb::Zstd.into();
cfg.set_flags(flags);
}
cfg
},
false,
)
.await;
wait_for_condition(
|| async { ping_test("net_a", "10.144.144.3", None).await },
Duration::from_secs(5),
)
.await;
wait_for_condition(
|| async { ping_test("net_a", "10.144.144.3", Some(5 * 1024)).await },
Duration::from_secs(5),
)
.await;
}
#[cfg(feature = "wireguard")]
#[rstest::rstest]
#[tokio::test]