diff --git a/src/replication/events.rs b/src/replication/events.rs index 967ec89..4d695ae 100644 --- a/src/replication/events.rs +++ b/src/replication/events.rs @@ -1,6 +1,6 @@ //! events related to replication use crate::{common::BitfieldUpdate, HypercoreError}; -use async_broadcast::{broadcast, Receiver, Sender}; +use async_broadcast::{broadcast, InactiveReceiver, Receiver, Sender}; static MAX_EVENT_QUEUE_CAPACITY: usize = 32; @@ -75,13 +75,19 @@ impl_from_for_enum_variant!(Event, Have); pub(crate) struct Events { /// Channel for core events pub(crate) channel: Sender<Event>, + /// Kept around so `Events::channel` stays open. + _receiver: InactiveReceiver<Event>, } impl Events { pub(crate) fn new() -> Self { - let mut channel = broadcast(MAX_EVENT_QUEUE_CAPACITY).0; + let (mut channel, receiver) = broadcast(MAX_EVENT_QUEUE_CAPACITY); channel.set_await_active(false); - Self { channel } + let mut _receiver = receiver.deactivate(); + // Message sending is best effort. Is msg queue fills up, remove old messages to make place + // for new ones. + _receiver.set_overflow(true); + Self { channel, _receiver } } /// The internal channel errors on send when no replicators are subscribed,