diff --git a/mycelium/src/peer.rs b/mycelium/src/peer.rs index 7eb1c99..001235d 100644 --- a/mycelium/src/peer.rs +++ b/mycelium/src/peer.rs @@ -1,4 +1,5 @@ use futures::{SinkExt, StreamExt}; +use std::time::Duration; use std::{ error::Error, io, @@ -7,6 +8,7 @@ use std::{ Arc, RwLock, Weak, }, }; +use tokio::time::timeout; use tokio::{ select, sync::{mpsc, Notify}, @@ -90,6 +92,10 @@ impl Peer { }), }; + // timeout to flush buffered packets. + // we need this to avoid waiting indefinitely for the peer to flush packets. + const FLUSH_TIMEOUT: Duration = Duration::from_secs(180); + // Framed for peer // Used to send and receive packets from a TCP stream let mut framed = Framed::new(connection, packet::Codec::new()); @@ -156,7 +162,13 @@ impl Peer { } } - if let Err(e) = framed.flush().await { + if let Err(e) = match timeout(FLUSH_TIMEOUT, framed.flush()).await { + Ok(result) => result, + Err(e) => { + error!("Timeout while flushing buffered peer connection data packets. elapsed time:{}",e); + break; + } + } { error!("Failed to flush buffered peer connection data packets: {e}"); break } @@ -182,8 +194,14 @@ impl Peer { } } - if let Err(e) = framed.flush().await { - error!("Failed to flush buffered peer connection control packets: {e}"); + if let Err(e) = match timeout(FLUSH_TIMEOUT, framed.flush()).await { + Ok(result) => result, + Err(e) => { + error!("Timeout while flushing buffered peer connection data packets.elapsed time:{}",e); + break; + } + } { + error!("Failed to flush buffered peer connection data packets: {e}"); break } }