Skip to content

Commit

Permalink
Merge pull request #23 from faust-streaming/recovery_hang
Browse files Browse the repository at this point in the history
fixed recovery hang and undefined set_close method in aiokafka
  • Loading branch information
patkivikram authored Nov 11, 2020
2 parents f64099d + 107142c commit cbccf69
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 75 deletions.
105 changes: 56 additions & 49 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import asyncio
import statistics
import typing
from asyncio import Event
from collections import defaultdict, deque
from time import monotonic
from typing import (
Expand All @@ -17,9 +18,8 @@
)

import opentracing
from mode import Service
from mode import Service, get_logger
from mode.services import WaitArgT
from mode.utils.locks import Event
from mode.utils.times import humanize_seconds, humanize_seconds_ago
from mode.utils.typing import Counter, Deque

Expand Down Expand Up @@ -51,6 +51,7 @@ class _TableManager:
Most likely you have removed data from the topics without
removing the RocksDB database file for this partition.
"""
logger = get_logger(__name__)


class RecoveryStats(NamedTuple):
Expand Down Expand Up @@ -108,7 +109,6 @@ class Recovery(Service):

_signal_recovery_start: Optional[Event] = None
_signal_recovery_end: Optional[Event] = None
_signal_recovery_reset: Optional[Event] = None

completed: Event
in_recovery: bool = False
Expand Down Expand Up @@ -202,13 +202,6 @@ def signal_recovery_end(self) -> Event:
self._signal_recovery_end = Event(loop=self.loop)
return self._signal_recovery_end

@property
def signal_recovery_reset(self) -> Event:
"""Event used to signal that recovery is restarting."""
if self._signal_recovery_reset is None:
self._signal_recovery_reset = Event(loop=self.loop)
return self._signal_recovery_reset

async def on_stop(self) -> None:
"""Call when recovery service stops."""
# Flush buffers when stopping.
Expand Down Expand Up @@ -244,18 +237,18 @@ def on_partitions_revoked(self, revoked: Set[TP]) -> None:
"""Call when rebalancing and partitions are revoked."""
T = traced_from_parent_span()
T(self.flush_buffers)()
self.signal_recovery_reset.set()
self.signal_recovery_start.set()

async def on_rebalance(
self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]
) -> None:
"""Call when cluster is rebalancing."""
# removing all the sleeps so control does not go back to the loop
app = self.app
assigned_standbys = app.assignor.assigned_standbys()
assigned_actives = app.assignor.assigned_actives()

for tp in revoked:
await asyncio.sleep(0)
self.revoke(tp)

self.standby_tps.clear()
Expand All @@ -267,14 +260,10 @@ async def on_rebalance(
table = self.tables._changelogs.get(tp.topic)
if table is not None:
self.add_standby(table, tp)
await asyncio.sleep(0)
await asyncio.sleep(0)
for tp in assigned_actives:
table = self.tables._changelogs.get(tp.topic)
if table is not None:
self.add_active(table, tp)
await asyncio.sleep(0)
await asyncio.sleep(0)

active_offsets = {
tp: offset
Expand All @@ -284,16 +273,13 @@ async def on_rebalance(
self.active_offsets.clear()
self.active_offsets.update(active_offsets)

await asyncio.sleep(0)

rebalancing_span = cast(_App, self.app)._rebalancing_span
if app.tracer and rebalancing_span:
self._recovery_span = app.tracer.get_tracer("_faust").start_span(
"recovery",
child_of=rebalancing_span,
)
app._span_add_default_tags(self._recovery_span)
self.signal_recovery_reset.clear()
self.signal_recovery_start.set()

async def _resume_streams(self) -> None:
Expand All @@ -307,7 +293,9 @@ async def _resume_streams(self) -> None:
assignment = consumer.assignment()
if assignment:
self.log.info("Seek stream partitions to committed offsets.")
await self._wait(consumer.perform_seek())
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.log.dev("Resume stream partitions")
consumer.resume_partitions(assignment)
else:
Expand Down Expand Up @@ -335,8 +323,6 @@ async def _restart_recovery(self) -> None:

while not self.should_stop:
self.log.dev("WAITING FOR NEXT RECOVERY TO START")
self.signal_recovery_reset.clear()
self._set_recovery_ended()
if await self.wait_for_stopped(self.signal_recovery_start):
self.signal_recovery_start.clear()
break # service was stopped
Expand Down Expand Up @@ -371,24 +357,33 @@ async def _restart_recovery(self) -> None:
T(self.flush_buffers)()
producer = cast(_App, self.app)._producer
if producer is not None:
await self._wait(T(producer.flush)())
await self._wait(
T(producer.flush)(),
timeout=self.app.conf.broker_request_timeout,
)

self.log.dev("Build highwaters for active partitions")
await self._wait(
T(self._build_highwaters)(
consumer, assigned_active_tps, active_highwaters, "active"
)
),
timeout=self.app.conf.broker_request_timeout,
)

self.log.dev("Build offsets for active partitions")
await self._wait(
T(self._build_offsets)(
consumer, assigned_active_tps, active_offsets, "active"
)
),
timeout=self.app.conf.broker_request_timeout,
)

for tp in assigned_active_tps:
if active_offsets[tp] > active_highwaters[tp]:
if (
active_offsets[tp]
and active_highwaters[tp]
and active_offsets[tp] > active_highwaters[tp]
):
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
Expand All @@ -401,17 +396,24 @@ async def _restart_recovery(self) -> None:
await self._wait(
T(self._build_offsets)(
consumer, assigned_standby_tps, standby_offsets, "standby"
)
),
timeout=self.app.conf.broker_request_timeout,
)

self.log.dev("Seek offsets for active partitions")
await self._wait(
T(self._seek_offsets)(
consumer, assigned_active_tps, active_offsets, "active"
)
),
timeout=self.app.conf.broker_request_timeout,
)
if self.signal_recovery_start.is_set():
logger.info("Restarting Recovery")
continue

if self.need_recovery():
self._set_recovery_started()
self.standbys_pending = True
self.log.info("Restoring state from changelog topics...")
T(consumer.resume_partitions)(active_tps)
# Resume partitions and start fetching.
Expand All @@ -430,8 +432,7 @@ async def _restart_recovery(self) -> None:
)
self.app._span_add_default_tags(span)
try:
self.signal_recovery_end.clear()
await self._wait(self.signal_recovery_end)
await self._wait(self.signal_recovery_end.wait())
except Exception as exc:
finish_span(self._actives_span, error=exc)
else:
Expand All @@ -450,7 +451,6 @@ async def _restart_recovery(self) -> None:
self.log.info("Recovery complete")
if span:
span.set_tag("Recovery-Completed", True)
self._set_recovery_ended()

if standby_tps:
self.log.info("Starting standby partitions...")
Expand All @@ -459,7 +459,8 @@ async def _restart_recovery(self) -> None:
await self._wait(
T(self._seek_offsets)(
consumer, standby_tps, standby_offsets, "standby"
)
),
timeout=self.app.conf.broker_request_timeout,
)

self.log.dev("Build standby highwaters")
Expand All @@ -470,10 +471,15 @@ async def _restart_recovery(self) -> None:
standby_highwaters,
"standby",
),
timeout=self.app.conf.broker_request_timeout,
)

for tp in standby_tps:
if standby_offsets[tp] > standby_highwaters[tp]:
if (
standby_offsets[tp]
and standby_highwaters[tp]
and standby_offsets[tp] > standby_highwaters[tp]
):
raise ConsistencyError(
E_PERSISTED_OFFSET.format(
tp,
Expand All @@ -491,10 +497,11 @@ async def _restart_recovery(self) -> None:
self.app._span_add_default_tags(span)
self.log.dev("Resume standby partitions")
T(consumer.resume_partitions)(standby_tps)
T(consumer.resume_flow)()
T(self.app.flow_control.resume)()

# Pause all our topic partitions,
# to make sure we don't fetch any more records from them.
await self._wait(asyncio.sleep(0.1)) # still needed?
await self._wait(T(self.on_recovery_completed)())
except RebalanceAgain as exc:
self.log.dev("RAISED REBALANCE AGAIN")
Expand All @@ -514,7 +521,6 @@ async def _restart_recovery(self) -> None:
for _span in spans:
finish_span(_span)
# restart - wait for next rebalance.
self._set_recovery_ended()

def _set_recovery_started(self) -> None:
self.in_recovery = True
Expand Down Expand Up @@ -545,22 +551,17 @@ def _estimated_active_remaining_secs(self, remaining: float) -> Optional[float]:
else:
return None

async def _wait(self, coro: WaitArgT) -> None:
wait_result = await self.wait_first(
coro,
self.signal_recovery_reset,
self.signal_recovery_start,
)
async def _wait(self, coro: WaitArgT, timeout: int = None) -> None:
signal = self.signal_recovery_start.wait()
wait_result = await self.wait_first(coro, signal, timeout=timeout)
if wait_result.stopped:
# service was stopped.
raise ServiceStopped()
elif self.signal_recovery_start in wait_result.done:
# another rebalance started
raise RebalanceAgain()
elif self.signal_recovery_reset in wait_result.done:
raise RebalanceAgain()
else:
return None

return None

async def on_recovery_completed(self) -> None:
"""Call when active table recovery is completed."""
Expand All @@ -580,12 +581,16 @@ async def on_recovery_completed(self) -> None:
assignment = consumer.assignment()
if assignment:
self.log.info("Seek stream partitions to committed offsets.")
await consumer.perform_seek()
await self._wait(
consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
self.completed.set()
self.log.dev("Resume stream partitions")
consumer.resume_partitions(
{tp for tp in assignment if not self._is_changelog_tp(tp)}
)
consumer.resume_flow()
self.app.flow_control.resume()
# finally make sure the fetcher is running.
await cast(_App, self.app)._fetcher.maybe_start()
self.tables.on_actives_ready()
Expand Down Expand Up @@ -708,15 +713,17 @@ async def _slurp_changelogs(self) -> None:
processing_times = self._processing_times

def _maybe_signal_recovery_end() -> None:
if self.in_recovery and not self.active_remaining_total():
if not self.active_remaining_total():
# apply anything stuck in the buffers
self.flush_buffers()
self._set_recovery_ended()
if self._actives_span is not None:
self._actives_span.set_tag("Actives-Ready", True)
logger.debug("Setting recovery end")
self.signal_recovery_end.set()

while not self.should_stop:
self.signal_recovery_end.clear()
try:
event: EventT = await asyncio.wait_for(
changelog_queue.get(), timeout=5.0
Expand All @@ -726,7 +733,6 @@ def _maybe_signal_recovery_end() -> None:
return
_maybe_signal_recovery_end()
continue

now = monotonic()
message = event.message
tp = message.tp
Expand Down Expand Up @@ -774,7 +780,8 @@ def _maybe_signal_recovery_end() -> None:

_maybe_signal_recovery_end()

if self.standbys_pending and not self.standby_remaining_total():
if not self.standby_remaining_total():
logger.debug("Completed standby partition fetch")
if self._standbys_span:
finish_span(self._standbys_span)
self._standbys_span = None
Expand Down
8 changes: 2 additions & 6 deletions faust/transport/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import asyncio
import gc
import typing
from asyncio import Event
from collections import defaultdict
from time import monotonic
from typing import (
Expand All @@ -73,7 +74,6 @@
from mode import Service, ServiceT, flight_recorder, get_logger
from mode.threads import MethodQueue, QueueServiceThread
from mode.utils.futures import notify
from mode.utils.locks import Event
from mode.utils.text import pluralize
from mode.utils.times import Seconds

Expand Down Expand Up @@ -172,11 +172,7 @@ async def on_stop(self) -> None:
async def _fetcher(self) -> None:
try:
consumer = cast(Consumer, self.app.consumer)
self._drainer = asyncio.ensure_future(
consumer._drain_messages(self),
loop=self.loop,
)
await self._drainer
await consumer._drain_messages(self)
except asyncio.CancelledError:
pass
finally:
Expand Down
Loading

0 comments on commit cbccf69

Please sign in to comment.