Skip to content

Commit

Permalink
fix disconnect handling for consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
Colin Stearns committed Jul 10, 2020
1 parent 57247b8 commit baa28f8
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ impl<T: DeserializeMessage> TopicConsumer<T> {
let mut interval = client.executor.interval(Duration::from_millis(500));
if client.executor.spawn(Box::pin(async move {
while interval.next().await.is_some() {
if let Err(e) = redelivery_tx.send(AckMessage::UnackedRedelivery).await {
error!("could not send redelivery ticker: {:?}", e);
if redelivery_tx.send(AckMessage::UnackedRedelivery).await.is_err() {
// Consumer shut down - stop ticker
break;
}
}
}))
Expand Down Expand Up @@ -421,9 +422,7 @@ impl<T: DeserializeMessage> Stream for TopicConsumer<T> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.messages.as_mut().poll_next(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(None) => {
Poll::Ready(Some(Err(Error::Connection(ConnectionError::Disconnected))))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Ready(Some(Ok((id, payload)))) => {
self.last_message_received = Some(Utc::now());
self.messages_received += 1;
Expand Down

0 comments on commit baa28f8

Please sign in to comment.