Skip to content

Commit

Permalink
feat: support bor node eth_getTransactionReceiptsByBlock
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanViast committed Nov 13, 2024
1 parent 02b5da2 commit c8ab4a0
Show file tree
Hide file tree
Showing 7 changed files with 9,136 additions and 1 deletion.
82 changes: 82 additions & 0 deletions ethereumetl/jobs/export_bor_block_receipts_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import json

from blockchainetl.jobs.base_job import BaseJob
from ethereumetl.executors.batch_work_executor import BatchWorkExecutor
from ethereumetl.json_rpc_requests import generate_get_transaction_receipt_by_block_json_rpc
from ethereumetl.mappers.receipt_log_mapper import EthReceiptLogMapper
from ethereumetl.mappers.receipt_mapper import EthReceiptMapper
from ethereumetl.utils import rpc_response_batch_to_results


# Exports receipts and logs
class ExportBorBlockReceiptsJob(BaseJob):
def __init__(
self,
blocks_iterable,
batch_size,
batch_web3_provider,
max_workers,
item_exporter,
export_receipts=True,
export_logs=True):
self.batch_web3_provider = batch_web3_provider
self.blocks_iterable = blocks_iterable

self.batch_work_executor = BatchWorkExecutor(batch_size, max_workers, progress_name=self.__class__.__name__)
self.item_exporter = item_exporter

self.export_receipts = export_receipts
self.export_logs = export_logs
if not self.export_receipts and not self.export_logs:
raise ValueError('At least one of export_receipts or export_logs must be True')

self.receipt_mapper = EthReceiptMapper()
self.receipt_log_mapper = EthReceiptLogMapper()

def _start(self):
self.item_exporter.open()

def _export(self):
self.batch_work_executor.execute(self.blocks_iterable, self._export_receipts)

def _export_receipts(self, blocks_iterable):
receipts_rpc = list(generate_get_transaction_receipt_by_block_json_rpc(blocks_iterable))
response = self.batch_web3_provider.make_batch_request(json.dumps(receipts_rpc))
results = rpc_response_batch_to_results(response)
receipts = [self.receipt_mapper.json_dict_to_receipt(result) for receipts in results for result in receipts]
for receipt in receipts:
self._export_receipt(receipt)

def _export_receipt(self, receipt):
if self.export_receipts:
self.item_exporter.export_item(self.receipt_mapper.receipt_to_dict(receipt))
if self.export_logs:
for log in receipt.logs:
self.item_exporter.export_item(self.receipt_log_mapper.receipt_log_to_dict(log))

def _end(self):
self.batch_work_executor.shutdown()
self.item_exporter.close()
9 changes: 9 additions & 0 deletions ethereumetl/json_rpc_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ def generate_get_block_receipt_json_rpc(blocks):
)


def generate_get_transaction_receipt_by_block_json_rpc(blocks):
for idx, block_number in enumerate(blocks):
yield generate_json_rpc(
method='eth_getTransactionReceiptsByBlock',
params=[hex(block_number)],
request_id=idx
)


def generate_get_code_json_rpc(contract_addresses, block='latest'):
for idx, contract_address in enumerate(contract_addresses):
yield generate_json_rpc(
Expand Down
21 changes: 20 additions & 1 deletion ethereumetl/streaming/eth_streamer_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from ethereumetl.enumeration.entity_type import EntityType
from ethereumetl.jobs.export_block_receipts_job import ExportBlockReceiptsJob
from ethereumetl.jobs.export_blocks_job import ExportBlocksJob
from ethereumetl.jobs.export_bor_block_receipts_job import ExportBorBlockReceiptsJob
from ethereumetl.jobs.export_geth_traces_job import ExportGethTracesJob
from ethereumetl.jobs.export_receipts_job import ExportReceiptsJob
from ethereumetl.jobs.export_traces_job import ExportTracesJob
Expand Down Expand Up @@ -95,7 +96,9 @@ def export_all(self, start_block, end_block):
# Export receipts and logs
receipts, logs = [], []
if self._should_export(EntityType.RECEIPT) or self._should_export(EntityType.LOG):
if not self.verify_clients("geth", "erigon"):
if self.verify_clients("bor"):
receipts, logs = self._export_bor_receipts_and_logs_by_block(blocks)
elif not self.verify_clients("geth", "erigon"):
receipts, logs = self._export_receipts_and_logs(transactions)
else:
receipts, logs = self._export_receipts_and_logs_by_block(blocks)
Expand Down Expand Up @@ -214,6 +217,22 @@ def _export_receipts_and_logs_by_block(self, blocks):
logs = exporter.get_items("log")
return receipts, logs

def _export_bor_receipts_and_logs_by_block(self, blocks):
exporter = InMemoryItemExporter(item_types=["receipt", "log"])
job = ExportBorBlockReceiptsJob(
blocks_iterable=(block["number"] for block in blocks),
batch_size=self.batch_size,
batch_web3_provider=self.batch_web3_provider,
max_workers=self.max_workers,
item_exporter=exporter,
export_receipts=self._should_export(EntityType.RECEIPT),
export_logs=self._should_export(EntityType.LOG),
)
job.run()
receipts = exporter.get_items("receipt")
logs = exporter.get_items("log")
return receipts, logs

def _export_receipts_and_logs(self, transactions):
exporter = InMemoryItemExporter(item_types=["receipt", "log"])
job = ExportReceiptsJob(
Expand Down
72 changes: 72 additions & 0 deletions tests/ethereumetl/job/test_export_bor_block_receipts_job.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# MIT License
#
# Copyright (c) 2018 Evgeny Medvedev, [email protected]
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.


import pytest

import tests.resources
from ethereumetl.jobs.export_bor_block_receipts_job import ExportBorBlockReceiptsJob
from ethereumetl.jobs.exporters.receipts_and_logs_item_exporter import receipts_and_logs_item_exporter
from ethereumetl.thread_local_proxy import ThreadLocalProxy
from tests.ethereumetl.job.helpers import get_web3_provider
from tests.helpers import compare_lines_ignore_order, read_file

RESOURCE_GROUP = 'test_export_bor_block_receipts_job'


def read_resource(resource_group, file_name):
return tests.resources.read_resource([RESOURCE_GROUP, resource_group], file_name)


TARGET_BLOCK_NUMBER = 64225931


@pytest.mark.parametrize("batch_size,block_number,output_format,resource_group,web3_provider_type", [
(1, TARGET_BLOCK_NUMBER, 'json', 'bor_block_receipts_job', 'mock')
# skip_if_slow_tests_disabled((1, DEFAULT_TX_HASHES, 'csv', 'receipts_with_logs', 'infura'))
])
def test_export_bor_block_receipts_job(tmpdir, batch_size, block_number, output_format, resource_group, web3_provider_type):
receipts_output_file = str(tmpdir.join('actual_receipts.' + output_format))
logs_output_file = str(tmpdir.join('actual_logs.' + output_format))
print("receipts_output_file", receipts_output_file)
print("logs_output_file", logs_output_file)

job = ExportBorBlockReceiptsJob(
blocks_iterable=(block_number,),
batch_size=batch_size,
batch_web3_provider=ThreadLocalProxy(
lambda: get_web3_provider(web3_provider_type, lambda file: read_resource(resource_group, file), batch=True)
),
max_workers=5,
item_exporter=receipts_and_logs_item_exporter(receipts_output_file, logs_output_file),
export_receipts=receipts_output_file is not None,
export_logs=logs_output_file is not None
)
job.run()

compare_lines_ignore_order(
read_resource(resource_group, 'expected_receipts.' + output_format), read_file(receipts_output_file)
)

compare_lines_ignore_order(
read_resource(resource_group, 'expected_logs.' + output_format), read_file(logs_output_file)
)
Loading

0 comments on commit c8ab4a0

Please sign in to comment.