diff --git a/CHANGELOG.md b/CHANGELOG.md index f5c03bf80..7daf76275 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,17 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog], and this project adheres to [Semantic Versioning]. +## [Unreleased] + +### Added + +- cli: Added `DIPDUP_CONFIG` and `DIPDUP_ENV_FILE` environment variables corresponding to `--config` and `--env-file` options. + +### Fixed + +- evm.node: Fixed crash on anonymous event logs during the last mile indexing. +- evm.node: Raise an exception when no realtime messages have been received in `http.connection_timeout` seconds. + ## [7.2.0] - 2023-11-30 ### Added diff --git a/src/dipdup/cli.py b/src/dipdup/cli.py index 0ce8e3a52..958f0bdba 100644 --- a/src/dipdup/cli.py +++ b/src/dipdup/cli.py @@ -191,6 +191,7 @@ def _skip_cli_group() -> bool: help='A path to DipDup project config.', default=[ROOT_CONFIG], metavar='PATH', + envvar='DIPDUP_CONFIG', ) @click.option( '--env-file', @@ -200,6 +201,7 @@ def _skip_cli_group() -> bool: help='A path to .env file containing `KEY=value` strings.', default=[], metavar='PATH', + envvar='DIPDUP_ENV_FILE', ) @click.pass_context @_cli_wrapper diff --git a/src/dipdup/datasources/evm_node.py b/src/dipdup/datasources/evm_node.py index 11d317630..2ab69c83f 100644 --- a/src/dipdup/datasources/evm_node.py +++ b/src/dipdup/datasources/evm_node.py @@ -35,6 +35,7 @@ from dipdup.pysignalr import WebsocketMessage from dipdup.pysignalr import WebsocketProtocol from dipdup.pysignalr import WebsocketTransport +from dipdup.utils import Watchdog WEB3_CACHE_SIZE = 256 @@ -66,6 +67,7 @@ def __init__(self, config: EvmNodeDatasourceConfig, merge_subscriptions: bool = self._subscription_ids: dict[str, EvmNodeSubscription] = {} self._logs_queue: Queue[dict[str, Any]] = Queue() self._heads: defaultdict[int, NodeHead] = defaultdict(NodeHead) + self._watchdog: Watchdog = Watchdog(self._http_config.connection_timeout) self._on_connected_callbacks: set[EmptyCallback] = set() self._on_disconnected_callbacks: set[EmptyCallback] = set() @@ -109,6 +111,7 @@ async def run(self) -> None: await asyncio.gather( self._ws_loop(), self._log_processor_loop(), + self._watchdog.run(), ) async def _log_processor_loop(self) -> None: @@ -116,7 +119,14 @@ async def _log_processor_loop(self) -> None: log_json = await self._logs_queue.get() level = int(log_json['blockNumber'], 16) - await self._heads[level].event.wait() + try: + await asyncio.wait_for( + self._heads[level].event.wait(), + timeout=self._http_config.connection_timeout, + ) + except asyncio.TimeoutError as e: + msg = f'Head for level {level} not received in {self._http_config.connection_timeout} seconds' + raise FrameworkException(msg) from e timestamp = self._heads[level].timestamp if timestamp is None: raise FrameworkException('Head received but timestamp is None') @@ -275,6 +285,7 @@ async def _on_message(self, message: Message) -> None: raise FrameworkException(f'Unknown message type: {type(message)}') data = message.data + self._watchdog.reset() if 'id' in data: request_id = data['id'] diff --git a/src/dipdup/indexes/evm_subsquid_events/matcher.py b/src/dipdup/indexes/evm_subsquid_events/matcher.py index 9a1b87dbf..b7f7c54f2 100644 --- a/src/dipdup/indexes/evm_subsquid_events/matcher.py +++ b/src/dipdup/indexes/evm_subsquid_events/matcher.py @@ -96,6 +96,9 @@ def match_events( matched_handlers: deque[MatchedEventsT] = deque() for event in events: + if not event.topics: + continue + for handler_config in handlers: typename = handler_config.contract.module_name name = handler_config.name diff --git a/src/dipdup/utils.py b/src/dipdup/utils.py index 2c9fe70b9..7db163276 100644 --- a/src/dipdup/utils.py +++ b/src/dipdup/utils.py @@ -1,3 +1,4 @@ +import asyncio import importlib import logging import pkgutil @@ -228,3 +229,25 @@ def json_dumps(obj: Any | str, option: int | None = orjson.OPT_INDENT_2) -> byte default=_default_for_decimals, option=option, ) + + +class Watchdog: + def __init__(self, timeout: int) -> None: + self._watchdog = asyncio.Event() + self._timeout = timeout + + def reset(self) -> None: + self._watchdog.set() + self._watchdog.clear() + + async def run(self) -> None: + while True: + await asyncio.sleep(self._timeout) + try: + await asyncio.wait_for( + self._watchdog.wait(), + timeout=self._timeout, + ) + except asyncio.TimeoutError as e: + msg = f'Watchdog timeout; no messages received in {self._timeout} seconds' + raise FrameworkException(msg) from e