From a46fef7d3bdcb81962c707307ce4c9da1bf0ccfa Mon Sep 17 00:00:00 2001 From: Agnish Ghosh Date: Mon, 10 Feb 2025 21:32:58 +0530 Subject: [PATCH] first draft of getBlobs --- beacon_chain/el/el_manager.nim | 65 +++++++++++++++++++++++++++++ beacon_chain/nimbus_beacon_node.nim | 63 ++++++++++++++++++++++++---- vendor/nim-web3 | 2 +- 3 files changed, 121 insertions(+), 9 deletions(-) diff --git a/beacon_chain/el/el_manager.nim b/beacon_chain/el/el_manager.nim index c5c2e3c93c..91c5901671 100644 --- a/beacon_chain/el/el_manager.nim +++ b/beacon_chain/el/el_manager.nim @@ -57,6 +57,7 @@ type GetPayloadV3Response | GetPayloadV4Response + contract(DepositContract): proc deposit(pubkey: PubKeyBytes, withdrawalCredentials: WithdrawalCredentialsBytes, @@ -108,6 +109,8 @@ const # https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2 GETPAYLOAD_TIMEOUT = 1.seconds + # https://github.com/ethereum/execution-apis/blob/ad9102b11212d51b736a0413c8655a8da93e55fc/src/engine/cancun.md#request-3 + GETBLOBS_TIMEOUT = 1.seconds connectionStateChangeHysteresisThreshold = 15 ## How many unsuccesful/successful requests we must see ## before declaring the connection as degraded/restored @@ -862,6 +865,13 @@ proc sendNewPayloadToSingleEL( payload, versioned_hashes, Hash32 parent_beacon_block_root, executionRequests) +proc sendGetBlobsToSingleEL( + connection: ELConnection, + versioned_hashes: seq[engine_api.VersionedHash] +): Future[GetBlobsV1Response] {.async: (raises: [CatchableError]).} = + let rpcClient = await connection.connectedRpcClient() + await rpcClient.engine_getBlobsV1(versioned_hashes) + type StatusRelation = enum newStatusIsPreferable @@ -990,6 +1000,61 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} = if len(pending) > 0: await noCancel allFutures(pending) +proc sendGetBlobs*( + m: ELManager, + blck: electra.SignedBeaconBlock | fulu.SignedBeaconBlock +): Future[Opt[seq[BlobAndProofV1]]] {.async: (raises: [CancelledError]).} = + if m.elConnections.len == 0: + return err() + let + timeout = GETBLOBS_TIMEOUT + deadline = sleepAsync(timeout) + + var bestResponse = Opt.none(int) + + while true: + let + requests = m.elConnections.mapIt( + sendGetBlobsToSingleEL(it, mapIt( + blck.message.body.blob_kzg_commitments, + engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it))))) + timeoutExceeded = + try: + await allFutures(requests).wait(deadline) + false + except AsyncTimeoutError: + true + except CancelledError as exc: + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + raise exc + + for idx, req in requests: + if not(req.finished()): + warn "Timeout while getting blob and proof", + url = m.elConnections[idx].engineUrl.url, + reason = req.error.msg + else: + if bestResponse.isNone: + bestResponse = Opt.some(idx) + + let pending = + requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait()) + await noCancel allFutures(pending) + + if bestResponse.isSome(): + return ok(requests[bestResponse.get()].value().blobsAndProofs) + + else: + # should not reach this case + discard + + if timeoutExceeded: + break + + err() + proc sendNewPayload*( m: ELManager, blck: SomeForkyBeaconBlock, diff --git a/beacon_chain/nimbus_beacon_node.nim b/beacon_chain/nimbus_beacon_node.nim index d5c274c6a0..843d67e984 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -8,9 +8,10 @@ {.push raises: [].} import - std/[os, random, terminal, times, exitprocs], + std/[os, random, terminal, times, exitprocs, sequtils], chronos, chronicles, metrics, metrics/chronos_httpserver, + ssz_serialization/types, stew/[byteutils, io2], eth/p2p/discoveryv5/[enr, random2], ./consensus_object_pools/[ @@ -459,21 +460,67 @@ proc initFullNode( maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when consensusFork >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Electra: + # Pull blobs and proofs from the EL blob pool + let blobsFromElOpt = await node.elManager.sendGetBlobs(forkyBlck) + if blobsFromElOpt.isSome(): + let blobsEl = blobsFromElOpt.get() + # check lengths of array[BlobAndProofV1] with blob + # kzg commitments of the signed block + if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len: + # create blob sidecars from EL instead + var + kzgBlbs: deneb.Blobs + kzgPrfs: deneb.KzgProofs + + for idx in 0..= ConsensusFork.Deneb and + consensusFork < ConsensusFork.Electra: if not blobQuarantine[].hasBlobs(forkyBlck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): - err(VerifierError.UnviableFork) + return err(VerifierError.UnviableFork) else: - err(VerifierError.MissingParent) + return err(VerifierError.MissingParent) else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.some(blobs), - maybeFinalized = maybeFinalized) + return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.some(blobs), + maybeFinalized = maybeFinalized) + else: - await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, Opt.none(BlobSidecars), maybeFinalized = maybeFinalized) rmanBlockLoader = proc( diff --git a/vendor/nim-web3 b/vendor/nim-web3 index a3bc5ad48e..f3c86a6674 160000 --- a/vendor/nim-web3 +++ b/vendor/nim-web3 @@ -1 +1 @@ -Subproject commit a3bc5ad48e2b05fa253ba68bbd5b84e4ea234f50 +Subproject commit f3c86a6674070d1f68d7fec6e0572aba94bae096