move reconn task join into select! (#88)

if join_next stuck, may miss global event and cause panic
This commit is contained in:
Sijie.Sun 2024-05-09 18:51:58 +08:00 committed by GitHub
parent 68c077820f
commit 7d3b8e42fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -167,7 +167,6 @@ impl ManualConnectorManager {
let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis( let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis(
use_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS), use_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS),
)); ));
let mut reconn_tasks = JoinSet::new();
let (reconn_result_send, mut reconn_result_recv) = mpsc::channel(100); let (reconn_result_send, mut reconn_result_recv) = mpsc::channel(100);
loop { loop {
@ -176,8 +175,8 @@ impl ManualConnectorManager {
if let Ok(event) = event { if let Ok(event) = event {
Self::handle_event(&event, data.clone()).await; Self::handle_event(&event, data.clone()).await;
} else { } else {
log::warn!("event_recv closed"); tracing::warn!(?event, "event_recv got error");
panic!("event_recv closed"); panic!("event_recv got error, err: {:?}", event);
} }
} }
@ -193,7 +192,7 @@ impl ManualConnectorManager {
let insert_succ = data.reconnecting.insert(dead_url.clone()); let insert_succ = data.reconnecting.insert(dead_url.clone());
assert!(insert_succ); assert!(insert_succ);
reconn_tasks.spawn(async move { tokio::spawn(async move {
let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), connector.clone()).await; let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), connector.clone()).await;
sender.send(reconn_ret).await.unwrap(); sender.send(reconn_ret).await.unwrap();
@ -205,8 +204,7 @@ impl ManualConnectorManager {
} }
ret = reconn_result_recv.recv() => { ret = reconn_result_recv.recv() => {
log::warn!("reconn_tasks done, out: {:?}", ret); log::warn!("reconn_tasks done, reconn result: {:?}", ret);
let _ = reconn_tasks.join_next().await.unwrap();
} }
} }
} }