Skip to content

Commit

Permalink
Merge pull request #23 from chainbase-labs/fix/fetch-logs
Browse files Browse the repository at this point in the history
Fix/fetch logs
  • Loading branch information
zzir authored Aug 6, 2024
2 parents 29e084f + 789ee79 commit 68f2f67
Show file tree
Hide file tree
Showing 15 changed files with 42 additions and 16 deletions.
13 changes: 11 additions & 2 deletions ethereumetl/executors/batch_work_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
# SOFTWARE.

import logging
import os
import time

from requests.exceptions import Timeout as RequestsTimeout, HTTPError, TooManyRedirects
from web3._utils.threads import Timeout as Web3Timeout

from ethereumetl.executors.bounded_executor import BoundedExecutor
from ethereumetl.executors.fail_safe_executor import FailSafeExecutor
from ethereumetl.executors.mode import BatchMode
from ethereumetl.misc.retriable_value_error import RetriableValueError
from ethereumetl.progress_logger import ProgressLogger
from ethereumetl.utils import dynamic_batch_iterator
Expand All @@ -37,10 +39,11 @@

BATCH_CHANGE_COOLDOWN_PERIOD_SECONDS = 2 * 60

mode = os.getenv("BATCH_MODE", "Batch")

# Executes the given work in batches, reducing the batch size exponentially in case of errors.
class BatchWorkExecutor:
def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5):
def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCEPTIONS, max_retries=5, progress_name="work"):
self.batch_size = starting_batch_size
self.max_batch_size = starting_batch_size
self.latest_batch_size_change_time = None
Expand All @@ -50,11 +53,17 @@ def __init__(self, starting_batch_size, max_workers, retry_exceptions=RETRY_EXCE
self.executor = FailSafeExecutor(BoundedExecutor(1, self.max_workers))
self.retry_exceptions = retry_exceptions
self.max_retries = max_retries
self.progress_logger = ProgressLogger()
self.progress_logger = ProgressLogger(progress_name)
self.logger = logging.getLogger('BatchWorkExecutor')

def execute(self, work_iterable, work_handler, total_items=None):
self.progress_logger.start(total_items=total_items)

if BatchMode[mode.capitalize()] == BatchMode.Concurrency:
for item in work_iterable:
self.executor.submit(self._fail_safe_execute, work_handler, [item])
return

for batch in dynamic_batch_iterator(work_iterable, lambda: self.batch_size):
self.executor.submit(self._fail_safe_execute, work_handler, batch)

Expand Down
7 changes: 7 additions & 0 deletions ethereumetl/executors/mode.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum, auto


class BatchMode(Enum):
Concurrency = auto()
Batch = auto()

2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_block_receipts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
self.batch_web3_provider = batch_web3_provider
self.blocks_iterable = blocks_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.export_receipts = export_receipts
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_blocks_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(

self.batch_web3_provider = batch_web3_provider

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter
self.chain = chain

Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_contracts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(
self.batch_web3_provider = batch_web3_provider
self.contract_addresses_iterable = contract_addresses_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.contract_service = EthContractService()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_geth_traces_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(

self.batch_web3_provider = batch_web3_provider

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.geth_trace_mapper = EthGethTraceMapper()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_origin_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(
self.marketplace_listing_exporter = marketplace_listing_exporter
self.shop_product_exporter = shop_product_exporter

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)

self.event_extractor = OriginEventExtractor(ipfs_client)

Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_receipts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
self.batch_web3_provider = batch_web3_provider
self.transaction_hashes_iterable = transaction_hashes_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter
self.chain = chain

Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_token_transfers_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def __init__(
self.tokens = tokens
self.item_exporter = item_exporter

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)

self.receipt_log_mapper = EthReceiptLogMapper()
self.token_transfer_mapper = EthTokenTransferMapper()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_tokens_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class ExportTokensJob(BaseJob):
def __init__(self, web3, item_exporter, token_addresses_iterable, max_workers):
self.item_exporter = item_exporter
self.token_addresses_iterable = token_addresses_iterable
self.batch_work_executor = BatchWorkExecutor(1, max_workers)
self.batch_work_executor = BatchWorkExecutor(1, max_workers, progress_name=self.__class__.__name__)

self.token_service = EthTokenService(web3, clean_user_provided_content)
self.token_mapper = EthTokenMapper()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/export_traces_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(
self.web3 = web3

# TODO: use batch_size when this issue is fixed https://github.com/paritytech/parity-ethereum/issues/9822
self.batch_work_executor = BatchWorkExecutor(1, max_workers)
self.batch_work_executor = BatchWorkExecutor(1, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.trace_mapper = EthTraceMapper()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/extract_contracts_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(
item_exporter):
self.traces_iterable = traces_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.contract_service = EthContractService()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/extract_geth_traces_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def __init__(
item_exporter):
self.traces_iterable = traces_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.trace_mapper = EthTraceMapper()
Expand Down
2 changes: 1 addition & 1 deletion ethereumetl/jobs/extract_token_transfers_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def __init__(
item_exporter):
self.logs_iterable = logs_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers)
self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.receipt_log_mapper = EthReceiptLogMapper()
Expand Down
14 changes: 12 additions & 2 deletions ethereumetl/streaming/eth_streamer_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,24 @@ def __init__(
self.item_timestamp_calculator = EthItemTimestampCalculator()
self.reorg_service = reorg_service

def get_client_version(self):
response = self.batch_web3_provider.make_request('web3_clientVersion', [])
return response["result"]

def open(self):
self.client_version = self.get_client_version()
self.item_exporter.open()

def get_current_block_number(self):
w3 = build_web3(self.batch_web3_provider)
return int(w3.eth.getBlock("latest").number)

def verify_clients(self, *client: str):
for item in client:
if self.client_version.lower().startswith(item):
return True
return False

def export_all(self, start_block, end_block):
# Export blocks and transactions
blocks, transactions = [], []
Expand All @@ -81,7 +92,7 @@ def export_all(self, start_block, end_block):
# Export receipts and logs
receipts, logs = [], []
if self._should_export(EntityType.RECEIPT) or self._should_export(EntityType.LOG):
if self.node_client != "erigon":
if not self.verify_clients("geth", "erigon"):
receipts, logs = self._export_receipts_and_logs(transactions)
else:
receipts, logs = self._export_receipts_and_logs_by_block(blocks)
Expand Down Expand Up @@ -301,7 +312,6 @@ def _should_export(self, entity_type):

def calculate_item_info(self, items):
for item in items:

item.update(
{
"item_id": self.item_id_calculator.calculate(item),
Expand Down

0 comments on commit 68f2f67

Please sign in to comment.