Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Skip anonymous events; node realtime watchdog #911

Merged
merged 3 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning].

## [Unreleased]

### Added

- cli: Added `DIPDUP_CONFIG` and `DIPDUP_ENV_FILE` environment variables corresponding to `--config` and `--env-file` options.

### Fixed

- evm.node: Fixed crash on anonymous event logs during the last mile indexing.
- evm.node: Raise an exception when no realtime messages have been received in `http.connection_timeout` seconds.

## [7.2.0] - 2023-11-30

### Added
Expand Down
2 changes: 2 additions & 0 deletions src/dipdup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ def _skip_cli_group() -> bool:
help='A path to DipDup project config.',
default=[ROOT_CONFIG],
metavar='PATH',
envvar='DIPDUP_CONFIG',
)
@click.option(
'--env-file',
Expand All @@ -200,6 +201,7 @@ def _skip_cli_group() -> bool:
help='A path to .env file containing `KEY=value` strings.',
default=[],
metavar='PATH',
envvar='DIPDUP_ENV_FILE',
)
@click.pass_context
@_cli_wrapper
Expand Down
13 changes: 12 additions & 1 deletion src/dipdup/datasources/evm_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dipdup.pysignalr import WebsocketMessage
from dipdup.pysignalr import WebsocketProtocol
from dipdup.pysignalr import WebsocketTransport
from dipdup.utils import Watchdog

WEB3_CACHE_SIZE = 256

Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(self, config: EvmNodeDatasourceConfig, merge_subscriptions: bool =
self._subscription_ids: dict[str, EvmNodeSubscription] = {}
self._logs_queue: Queue[dict[str, Any]] = Queue()
self._heads: defaultdict[int, NodeHead] = defaultdict(NodeHead)
self._watchdog: Watchdog = Watchdog(self._http_config.connection_timeout)

self._on_connected_callbacks: set[EmptyCallback] = set()
self._on_disconnected_callbacks: set[EmptyCallback] = set()
Expand Down Expand Up @@ -109,14 +111,22 @@ async def run(self) -> None:
await asyncio.gather(
self._ws_loop(),
self._log_processor_loop(),
self._watchdog.run(),
)

async def _log_processor_loop(self) -> None:
while True:
log_json = await self._logs_queue.get()
level = int(log_json['blockNumber'], 16)

await self._heads[level].event.wait()
try:
await asyncio.wait_for(
self._heads[level].event.wait(),
timeout=self._http_config.request_timeout,
)
except asyncio.TimeoutError as e:
msg = f'Head for level {level} not received in {self._http_config.request_timeout} seconds'
raise FrameworkException(msg) from e
timestamp = self._heads[level].timestamp
if timestamp is None:
raise FrameworkException('Head received but timestamp is None')
Expand Down Expand Up @@ -275,6 +285,7 @@ async def _on_message(self, message: Message) -> None:
raise FrameworkException(f'Unknown message type: {type(message)}')

data = message.data
self._watchdog.reset()

if 'id' in data:
request_id = data['id']
Expand Down
3 changes: 3 additions & 0 deletions src/dipdup/indexes/evm_subsquid_events/matcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def match_events(
matched_handlers: deque[MatchedEventsT] = deque()

for event in events:
if not event.topics:
continue

for handler_config in handlers:
typename = handler_config.contract.module_name
name = handler_config.name
Expand Down
23 changes: 23 additions & 0 deletions src/dipdup/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import importlib
import logging
import pkgutil
Expand Down Expand Up @@ -228,3 +229,25 @@ def json_dumps(obj: Any | str, option: int | None = orjson.OPT_INDENT_2) -> byte
default=_default_for_decimals,
option=option,
)


class Watchdog:
def __init__(self, timeout: int) -> None:
self._watchdog = asyncio.Event()
self._timeout = timeout

def reset(self) -> None:
self._watchdog.set()
self._watchdog.clear()

async def run(self) -> None:
while True:
await asyncio.sleep(self._timeout)
try:
await asyncio.wait_for(
self._watchdog.wait(),
timeout=self._timeout,
)
except asyncio.TimeoutError as e:
msg = f'Watchdog timeout; no messages received in {self._timeout} seconds'
raise FrameworkException(msg) from e