Refactor metrics (#902)
Co-authored-by: Vladimir Bobrikov <[email protected]>
Co-authored-by: Lev Gorodetskiy <[email protected]>
3 people authored Nov 25, 2023
1 parent 2f56f25 commit 553176f
Showing 16 changed files with 63 additions and 216 deletions.
41 changes: 21 additions & 20 deletions docs/6.deployment/4.prometheus.md
@@ -5,10 +5,6 @@ description: "DipDup provides basic integration with the Prometheus monitoring s

# Prometheus integration

::banner{type="warning"}
This page is incomplete and/or outdated. Come back later.
::

DipDup provides basic integration with the Prometheus monitoring system by exposing some metrics.

When running DipDup in Docker, make sure that the Prometheus instance is in the same network.
@@ -17,19 +13,24 @@ When running DipDup in Docker make sure that the Prometheus instance is in the s

The following metrics are exposed under the `dipdup` namespace:

| metric name | description |
|-|-|
| `dipdup_indexes_total` | Number of indexes in operation by status |
| `dipdup_index_level_sync_duration_seconds` | Duration of indexing a single level |
| `dipdup_index_level_realtime_duration_seconds` | Duration of last index synchronization |
| `dipdup_index_total_sync_duration_seconds` | Duration of the last index synchronization |
| `dipdup_index_total_realtime_duration_seconds` | Duration of the last index realtime synchronization |
| `dipdup_index_levels_to_sync_total` | Number of levels to reach synced state |
| `dipdup_index_levels_to_realtime_total` | Number of levels to reach realtime state |
| `dipdup_index_handlers_matched_total` | Index total hits |
| `dipdup_datasource_head_updated_timestamp` | Timestamp of the last head update |
| `dipdup_datasource_rollbacks_total` | Number of rollbacks |
| `dipdup_http_errors_total` | Number of http errors |
| `dipdup_callback_duration_seconds` | Duration of callback execution |

You can also query [internal tables](../5.advanced/3.internal-tables.md) for monitoring purposes.
| metric name | description |
| ---------------------------------------------- | ---------------------------------------------- |
| `dipdup_indexes_total` | Number of indexes in operation by status |
| `dipdup_index_total_sync_duration_seconds` | Duration of the last synchronization |
| `dipdup_index_total_realtime_duration_seconds` | Duration of the last realtime queue processing |
| `dipdup_index_levels_to_sync_total` | Number of levels to reach synced state |
| `dipdup_index_levels_to_realtime_total` | Number of levels to reach realtime state |
| `dipdup_index_handlers_matched_total` | Index total hits |
| `dipdup_datasource_head_updated_timestamp` | Timestamp of the last head update |
| `dipdup_datasource_rollbacks_total` | Number of rollbacks |
| `dipdup_http_errors_total` | Number of HTTP errors |

Also, DipDup exposes the following metrics for compatibility with Subsquid Cloud:

| metric name | description |
| ------------------------------------------ | ---------------------------------------------------------- |
| `sqd_processor_last_block` | Level of the last processed block from Subsquid Archives |
| `sqd_processor_chain_height` | Current chain height as reported by Subsquid Archives |
| `sqd_processor_archive_http_errors_in_row` | Number of consecutive failed requests to Subsquid Archives |

If you need more complex metrics, consider querying [Internal tables](../5.advanced/3.internal-tables.md).
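
To verify the endpoint is up after deployment, a quick smoke test like the sketch below can fetch and parse the exposition text. The URL is an assumption: the actual host and port come from the `prometheus` section of your config.

```python
# Hedged sketch: the metrics URL is an assumption; adjust it to match
# the `prometheus` section of your DipDup config.
import urllib.request


def fetch_dipdup_metrics(url: str = 'http://127.0.0.1:8000/metrics') -> dict[str, float]:
    """Fetch the Prometheus exposition text and collect dipdup_*/sqd_* samples."""
    samples: dict[str, float] = {}
    with urllib.request.urlopen(url) as response:
        for line in response.read().decode().splitlines():
            # Skip HELP/TYPE comments and blank lines
            if line.startswith('#') or not line.strip():
                continue
            # Simplistic split: the value is the last space-separated field
            name, _, value = line.rpartition(' ')
            if name.startswith(('dipdup_', 'sqd_')):
                samples[name] = float(value)
    return samples


print(fetch_dipdup_metrics().get('dipdup_datasource_rollbacks_total'))
```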
7 changes: 3 additions & 4 deletions docs/7.references/2.config.md
@@ -35,7 +35,7 @@ description: "Config file reference"

<dl class="py class">
<dt class="sig sig-object py" id="AdvancedConfig">
<em class="property"><span class="pre">class</span><span class="w"> </span></em><span class="sig-prename descclassname"><span class="pre"></span></span><span class="sig-name descname"><span class="pre">AdvancedConfig</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">reindex=&lt;factory&gt;</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">scheduler=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">postpone_jobs=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">early_realtime=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">skip_version_check=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">rollback_depth=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">decimal_precision=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">unsafe_sqlite=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">metrics=MetricsLevel.basic</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">alt_operation_matcher=False</span></span></em><span class="sig-paren">)</span><a class="headerlink" href="#AdvancedConfig" title="Link to this definition">¶</a></dt>
<em class="property"><span class="pre">class</span><span class="w"> </span></em><span class="sig-prename descclassname"><span class="pre"></span></span><span class="sig-name descname"><span class="pre">AdvancedConfig</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">reindex=&lt;factory&gt;</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">scheduler=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">postpone_jobs=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">early_realtime=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">skip_version_check=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">rollback_depth=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">decimal_precision=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">unsafe_sqlite=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">alt_operation_matcher=False</span></span></em><span class="sig-paren">)</span><a class="headerlink" href="#AdvancedConfig" title="Link to this definition">¶</a></dt>
<dd><p>This section allows users to tune some system-wide options, either experimental or unsuitable for generic configurations.</p>
<dl class="field-list simple">
<dt class="field-odd">Parameters<span class="colon">:</span></dt>
@@ -48,7 +48,7 @@ description: "Config file reference"
<li><p><strong>rollback_depth</strong> (<em>int</em><em> | </em><em>None</em>) – A number of levels to keep for rollback</p></li>
<li><p><strong>decimal_precision</strong> (<em>int</em><em> | </em><em>None</em>) – Overwrite precision if it’s not guessed correctly based on project models.</p></li>
<li><p><strong>unsafe_sqlite</strong> (<em>bool</em>) – Disable journaling and data integrity checks. Use only for testing.</p></li>
<li><p><strong>metrics</strong> (<em>MetricsLevel</em>) – off/basic/advanced based on how much performance metrics you want to collect</p></li>
<li><p><strong>metrics</strong> – off/basic/advanced based on how much performance metrics you want to collect</p></li>
<li><p><strong>alt_operation_matcher</strong> (<em>bool</em>) – Use different algorithm to match operations (dev only)</p></li>
</ul>
</dd>
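
For readers following along, a minimal sketch of constructing this section programmatically — assuming `AdvancedConfig` is importable from `dipdup.config` as shown in the source diff below; note that after this commit the `metrics` field is gone:

```python
from dipdup.config import AdvancedConfig

# All fields have defaults, so only overrides need to be passed.
advanced = AdvancedConfig(
    early_realtime=True,  # establish realtime connections before sync finishes
    rollback_depth=2,     # keep two levels of data to handle shallow rollbacks
)
# AdvancedConfig(metrics=...) would now raise TypeError: the field was removed.
```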
@@ -328,7 +328,7 @@ description: "Config file reference"

<dl class="py class">
<dt class="sig sig-object py" id="SubsquidEventsIndexConfig">
<em class="property"><span class="pre">class</span><span class="w"> </span></em><span class="sig-prename descclassname"><span class="pre"></span></span><span class="sig-name descname"><span class="pre">SubsquidEventsIndexConfig</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">kind</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">datasource</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">handlers=&lt;factory&gt;</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">abi=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">node_only=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">expose_metrics=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">first_level=0</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">last_level=0</span></span></em><span class="sig-paren">)</span><a class="headerlink" href="#SubsquidEventsIndexConfig" title="Link to this definition">¶</a></dt>
<em class="property"><span class="pre">class</span><span class="w"> </span></em><span class="sig-prename descclassname"><span class="pre"></span></span><span class="sig-name descname"><span class="pre">SubsquidEventsIndexConfig</span></span><span class="sig-paren">(</span><em class="sig-param"><span class="n"><span class="pre">kind</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">datasource</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">handlers=&lt;factory&gt;</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">abi=None</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">node_only=False</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">first_level=0</span></span></em>, <em class="sig-param"><span class="n"><span class="pre">last_level=0</span></span></em><span class="sig-paren">)</span><a class="headerlink" href="#SubsquidEventsIndexConfig" title="Link to this definition">¶</a></dt>
<dd><p>Subsquid events index config</p>
<dl class="field-list simple">
<dt class="field-odd">Parameters<span class="colon">:</span></dt>
@@ -340,7 +340,6 @@ description: "Config file reference"
<li><p><strong>node_only</strong> (<em>bool</em>) – Don’t use Subsquid Archives API (dev only)</p></li>
<li><p><strong>first_level</strong> (<em>int</em>) – Level to start indexing from</p></li>
<li><p><strong>last_level</strong> (<em>int</em>) – Level to stop indexing and disable this index</p></li>
<li><p><strong>expose_metrics</strong> (<em>bool</em>) – </p></li>
</ul>
</dd>
</dl>
2 changes: 0 additions & 2 deletions src/dipdup/config/__init__.py
@@ -47,7 +47,6 @@
from dipdup.models import ReindexingAction
from dipdup.models import ReindexingReason
from dipdup.models import SkipHistory
from dipdup.performance import MetricsLevel
from dipdup.utils import pascal_to_snake
from dipdup.yaml import DipDupYAMLConfig

@@ -579,7 +578,6 @@ class AdvancedConfig:
rollback_depth: int | None = None
decimal_precision: int | None = None
unsafe_sqlite: bool = False
metrics: MetricsLevel = MetricsLevel.basic
alt_operation_matcher: bool = False

class Config:
1 change: 0 additions & 1 deletion src/dipdup/config/evm_subsquid_events.py
@@ -68,7 +68,6 @@ class SubsquidEventsIndexConfig(IndexConfig):
handlers: tuple[SubsquidEventsHandlerConfig, ...] = field(default_factory=tuple)
abi: AbiDatasourceConfig | tuple[AbiDatasourceConfig, ...] | None = None
node_only: bool = False
expose_metrics: bool = False

first_level: int = 0
last_level: int = 0
19 changes: 7 additions & 12 deletions src/dipdup/context.py
@@ -6,7 +6,6 @@
import sys
from collections import deque
from contextlib import AsyncExitStack
from contextlib import ExitStack
from contextlib import contextmanager
from contextlib import suppress
from pathlib import Path
@@ -68,7 +67,6 @@
from dipdup.performance import caches
from dipdup.performance import metrics
from dipdup.performance import queues
from dipdup.prometheus import Metrics
from dipdup.utils import FormattedLogger

if TYPE_CHECKING:
@@ -613,16 +611,13 @@ async def execute_sql_query(

@contextmanager
def _callback_wrapper(self, module: str) -> Iterator[None]:
with ExitStack() as stack:
try:
if Metrics.enabled:
stack.enter_context(Metrics.measure_callback_duration(module))
yield
# NOTE: Do not wrap known errors like ProjectImportError
except FrameworkException:
raise
except Exception as e:
raise CallbackError(module, e) from e
try:
yield
# NOTE: Do not wrap known errors like ProjectImportError
except FrameworkException:
raise
except Exception as e:
raise CallbackError(module, e) from e
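
The wrapper now does exception translation only; timing moved behind the always-on helpers in `dipdup.performance`. A minimal sketch of that style of unconditional, dependency-free timing (the `metrics` store here is a stand-in, not DipDup's actual object):

```python
import time
from collections import defaultdict
from collections.abc import Iterator
from contextlib import contextmanager

# Stand-in store; DipDup's real `dipdup.performance.metrics` differs.
metrics: defaultdict[str, float] = defaultdict(float)


@contextmanager
def measure(name: str) -> Iterator[None]:
    started = time.perf_counter()
    try:
        yield
    finally:
        # Always record; no `enabled` flag to check.
        metrics[name] += time.perf_counter() - started


with measure('callback_duration:on_transaction'):
    pass  # callback body would run here
```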

def _get_handler(self, name: str, index: str) -> HandlerConfig:
try:
19 changes: 3 additions & 16 deletions src/dipdup/dipdup.py
@@ -69,9 +69,7 @@
from dipdup.models.tezos_tzkt import TzktRollbackMessage
from dipdup.models.tezos_tzkt import TzktTokenTransferData
from dipdup.package import DipDupPackage
from dipdup.performance import MetricsLevel
from dipdup.performance import metrics
from dipdup.performance import with_pprofile
from dipdup.prometheus import Metrics
from dipdup.scheduler import SchedulerManager
from dipdup.transactions import TransactionManager
@@ -386,8 +384,7 @@ async def _on_tzkt_head(self, datasource: TzktDatasource, head: TzktHeadBlockDat
},
),
)
if Metrics.enabled:
Metrics.set_datasource_head_updated(datasource.name)
Metrics.set_datasource_head_updated(datasource.name)
for index in self._indexes.values():
if isinstance(index, TzktHeadIndex) and index.datasource == datasource:
index.push_head(head)
@@ -404,8 +401,7 @@ async def _on_evm_node_head(self, datasource: EvmNodeDatasource, head: EvmNodeHe
},
),
)
if Metrics.enabled:
Metrics.set_datasource_head_updated(datasource.name)
Metrics.set_datasource_head_updated(datasource.name)

async def _on_evm_node_logs(self, datasource: EvmNodeDatasource, logs: EvmNodeLogData) -> None:
for index in self._indexes.values():
@@ -465,8 +461,7 @@ async def _on_tzkt_rollback(

channel = f'{datasource.name}:{type_.value}'
_logger.info('Channel `%s` has rolled back: %s -> %s', channel, from_level, to_level)
if Metrics.enabled:
Metrics.set_datasource_rollback(datasource.name)
Metrics.set_datasource_rollback(datasource.name)

# NOTE: Choose action for each index
for index_name, index in self._indexes.items():
@@ -565,8 +560,6 @@ async def run(self) -> None:
self._ctx.package.verify()

async with AsyncExitStack() as stack:
await self._set_up_metrics(stack)

stack.enter_context(suppress(KeyboardInterrupt, CancelledError))
await self._set_up_database(stack)
await self._set_up_transactions(stack)
@@ -794,9 +787,3 @@ async def _set_up_scheduler(self, tasks: set[Task[None]]) -> Event:
tasks.add(run_task)

return event

async def _set_up_metrics(self, stack: AsyncExitStack) -> None:
level = self._config.advanced.metrics
metrics.set_level(level)
if level == MetricsLevel.full:
await stack.enter_async_context(with_pprofile(self._config.package))
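
For context on the deletion above: `with_pprofile` attached a whole-run profiler when `metrics` was set to `full`. A rough reconstruction of such a helper, assumed from the removed imports (pprofile's `Profile` does support the context-manager protocol, but the dump path here is illustrative):

```python
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

import pprofile  # deterministic, pure-Python profiler


@asynccontextmanager
async def with_pprofile(package: str) -> AsyncIterator[None]:
    profiler = pprofile.Profile()
    with profiler:
        yield
    # Illustrative output path; the real helper may have written elsewhere.
    profiler.dump_stats(f'{package}.pprofile.txt')
```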
12 changes: 4 additions & 8 deletions src/dipdup/http.py
@@ -147,8 +147,7 @@ async def _retry_request(
retry_count = 0 if env.TEST else self._config.retry_count
retry_count_str = 'inf' if retry_count is sys.maxsize else str(retry_count)

if Metrics.enabled:
Metrics.set_http_errors_in_row(self._url, 0)
Metrics.set_http_errors_in_row(self._url, 0)

while True:
self._logger.debug('HTTP request attempt %s/%s', attempt, retry_count_str)
@@ -165,8 +164,7 @@

ratelimit_sleep: float | None = None
if isinstance(e, aiohttp.ClientResponseError):
if Metrics.enabled:
Metrics.set_http_error(self._url, e.status)
Metrics.set_http_error(self._url, e.status)

if e.status == HTTPStatus.TOO_MANY_REQUESTS:
ratelimit_sleep = self._config.ratelimit_sleep
@@ -175,14 +173,12 @@
e.headers = cast(Mapping[str, Any], e.headers)
ratelimit_sleep = max(ratelimit_sleep, int(e.headers['Retry-After']))
else:
if Metrics.enabled:
Metrics.set_http_error(self._url, 0)
Metrics.set_http_error(self._url, 0)

self._logger.warning('HTTP request attempt %s/%s failed: %s', attempt, retry_count_str, e)
self._logger.info('Waiting %s seconds before retry', ratelimit_sleep or retry_sleep)

if Metrics.enabled:
Metrics.set_http_errors_in_row(self._url, attempt)
Metrics.set_http_errors_in_row(self._url, attempt)

await asyncio.sleep(ratelimit_sleep or retry_sleep)
attempt += 1
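
The `if Metrics.enabled` guards could be dropped because updating a Prometheus collector is just an in-memory bump, cheap enough to do unconditionally. A sketch of what labeled collectors behind calls like `set_http_error` might look like (shapes assumed, not DipDup's exact definitions):

```python
from prometheus_client import Counter, Gauge

http_errors_total = Counter(
    'dipdup_http_errors_total',
    'Number of HTTP errors',
    ['url', 'status'],
)
http_errors_in_row = Gauge(
    'dipdup_http_errors_in_row',
    'Consecutive failed requests to an endpoint',
    ['url'],
)


def set_http_error(url: str, status: int) -> None:
    http_errors_total.labels(url=url, status=str(status)).inc()


def set_http_errors_in_row(url: str, attempt: int) -> None:
    http_errors_in_row.labels(url=url).set(attempt)
```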
22 changes: 6 additions & 16 deletions src/dipdup/index.py
@@ -1,7 +1,6 @@
from abc import ABC
from abc import abstractmethod
from collections import deque
from contextlib import ExitStack
from typing import Any
from typing import Generic
from typing import TypeVar
@@ -53,8 +52,7 @@ def push_realtime_message(self, message: IndexQueueItemT) -> None:
"""Push message to the queue"""
self._queue.append(message)

if Metrics.enabled:
Metrics.set_levels_to_realtime(self._config.name, len(self._queue))
Metrics.set_levels_to_realtime(self._config.name, len(self._queue))

@abstractmethod
async def _synchronize(self, sync_level: int) -> None:
@@ -134,9 +132,7 @@ async def process(self) -> bool:

last_level = self._config.last_level
if last_level:
with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_total_sync_duration())
with Metrics.measure_total_sync_duration():
await self._synchronize(last_level)
await self._enter_disabled_state(last_level)
return True
@@ -148,16 +144,12 @@
self._logger.info('Index is behind the datasource level, syncing: %s -> %s', index_level, sync_level)
self._queue.clear()

with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_total_sync_duration())
with Metrics.measure_total_sync_duration():
await self._synchronize(sync_level)
return True

if self._queue:
with ExitStack() as stack:
if Metrics.enabled:
stack.enter_context(Metrics.measure_total_realtime_duration())
with Metrics.measure_total_realtime_duration():
await self._process_queue()
return True

@@ -177,14 +169,12 @@ async def _enter_sync_state(self, head_level: int) -> int | None:

async def _exit_sync_state(self, head_level: int) -> None:
self._logger.info('Index is synchronized to level %s', head_level)
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, 0)
Metrics.set_levels_to_sync(self._config.name, 0)
await self._update_state(status=IndexStatus.realtime, level=head_level)

async def _enter_disabled_state(self, last_level: int) -> None:
self._logger.info('Index is synchronized to level %s', last_level)
if Metrics.enabled:
Metrics.set_levels_to_sync(self._config.name, 0)
Metrics.set_levels_to_sync(self._config.name, 0)
await self._update_state(status=IndexStatus.disabled, level=last_level)

async def _update_state(
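
The `index.py` changes work because the measurement helpers are ordinary context managers, so plain `with` blocks replace the `ExitStack` dance. A sketch of an always-on timer in that shape, built on `prometheus_client` (the metric name is taken from the docs table above; the wiring is assumed):

```python
from prometheus_client import Histogram

total_sync_duration = Histogram(
    'dipdup_index_total_sync_duration_seconds',
    'Duration of the last synchronization',
)


async def synchronize(sync_level: int) -> None:
    ...  # index-specific sync logic would go here


async def process(sync_level: int) -> None:
    # Histogram.time() works as a context manager, so no enabled-check is needed.
    with total_sync_duration.time():
        await synchronize(sync_level)
```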
