diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 88f803105..c83c75b51 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -724,7 +724,7 @@ async def _maybe_signal_recovery_end(timeout=False, timeout_count=0) -> None: # the aiokafka consumer position and draining of the queue if timeout and self.app.in_transaction and timeout_count > 1: await detect_aborted_tx() - if not self.active_remaining_total(): + if not self.active_remaining_total() and self.in_recovery: # apply anything stuck in the buffers self.flush_buffers() self._set_recovery_ended()