Skip to content

Commit

Permalink
first draft of getBlobs
Browse files Browse the repository at this point in the history
  • Loading branch information
agnxsh committed Feb 10, 2025
1 parent 6e7c2f3 commit a46fef7
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 9 deletions.
65 changes: 65 additions & 0 deletions beacon_chain/el/el_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type
GetPayloadV3Response |
GetPayloadV4Response


contract(DepositContract):
proc deposit(pubkey: PubKeyBytes,
withdrawalCredentials: WithdrawalCredentialsBytes,
Expand Down Expand Up @@ -108,6 +109,8 @@ const
# https://github.com/ethereum/execution-apis/blob/v1.0.0-beta.3/src/engine/shanghai.md#request-2
GETPAYLOAD_TIMEOUT = 1.seconds

# https://github.com/ethereum/execution-apis/blob/ad9102b11212d51b736a0413c8655a8da93e55fc/src/engine/cancun.md#request-3
GETBLOBS_TIMEOUT = 1.seconds
connectionStateChangeHysteresisThreshold = 15
## How many unsuccesful/successful requests we must see
## before declaring the connection as degraded/restored
Expand Down Expand Up @@ -862,6 +865,13 @@ proc sendNewPayloadToSingleEL(
payload, versioned_hashes, Hash32 parent_beacon_block_root,
executionRequests)

proc sendGetBlobsToSingleEL(
connection: ELConnection,
versioned_hashes: seq[engine_api.VersionedHash]
): Future[GetBlobsV1Response] {.async: (raises: [CatchableError]).} =
let rpcClient = await connection.connectedRpcClient()
await rpcClient.engine_getBlobsV1(versioned_hashes)

type
StatusRelation = enum
newStatusIsPreferable
Expand Down Expand Up @@ -990,6 +1000,61 @@ proc lazyWait(futures: seq[FutureBase]) {.async: (raises: []).} =
if len(pending) > 0:
await noCancel allFutures(pending)

proc sendGetBlobs*(
m: ELManager,
blck: electra.SignedBeaconBlock | fulu.SignedBeaconBlock
): Future[Opt[seq[BlobAndProofV1]]] {.async: (raises: [CancelledError]).} =
if m.elConnections.len == 0:
return err()
let
timeout = GETBLOBS_TIMEOUT
deadline = sleepAsync(timeout)

var bestResponse = Opt.none(int)

while true:
let
requests = m.elConnections.mapIt(
sendGetBlobsToSingleEL(it, mapIt(
blck.message.body.blob_kzg_commitments,
engine_api.VersionedHash(kzg_commitment_to_versioned_hash(it)))))
timeoutExceeded =
try:
await allFutures(requests).wait(deadline)
false
except AsyncTimeoutError:
true
except CancelledError as exc:
let pending =
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)
raise exc

for idx, req in requests:
if not(req.finished()):
warn "Timeout while getting blob and proof",
url = m.elConnections[idx].engineUrl.url,
reason = req.error.msg
else:
if bestResponse.isNone:
bestResponse = Opt.some(idx)

let pending =
requests.filterIt(not(it.finished())).mapIt(it.cancelAndWait())
await noCancel allFutures(pending)

if bestResponse.isSome():
return ok(requests[bestResponse.get()].value().blobsAndProofs)

else:
# should not reach this case
discard

if timeoutExceeded:
break

err()

proc sendNewPayload*(
m: ELManager,
blck: SomeForkyBeaconBlock,
Expand Down
63 changes: 55 additions & 8 deletions beacon_chain/nimbus_beacon_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
{.push raises: [].}

import
std/[os, random, terminal, times, exitprocs],
std/[os, random, terminal, times, exitprocs, sequtils],
chronos, chronicles,
metrics, metrics/chronos_httpserver,
ssz_serialization/types,
stew/[byteutils, io2],
eth/p2p/discoveryv5/[enr, random2],
./consensus_object_pools/[
Expand Down Expand Up @@ -459,21 +460,67 @@ proc initFullNode(
maybeFinalized: bool):
Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} =
withBlck(signedBlock):
when consensusFork >= ConsensusFork.Deneb:
when consensusFork >= ConsensusFork.Electra:
# Pull blobs and proofs from the EL blob pool
let blobsFromElOpt = await node.elManager.sendGetBlobs(forkyBlck)
if blobsFromElOpt.isSome():
let blobsEl = blobsFromElOpt.get()
# check lengths of array[BlobAndProofV1] with blob
# kzg commitments of the signed block
if blobsEl.len == forkyBlck.message.body.blob_kzg_commitments.len:
# create blob sidecars from EL instead
var
kzgBlbs: deneb.Blobs
kzgPrfs: deneb.KzgProofs

for idx in 0..<blobsEl.len:
kzgBlbs[idx] = blobsEl[idx].blob.data
kzgPrfs[idx].bytes = blobsEl[idx].proof.data
let blob_sidecars_el =
create_blob_sidecars(forkyBlck, kzgPrfs, kzgBlbs)

# populate blob quarantine to tackle blob loop
for blb_el in blob_sidecars_el:
blobQuarantine[].put(newClone blb_el)

# now pop blobQuarantine and make block available for attestation
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)

# in case EL does not support `engine_getBlobsV1`
else:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
return err(VerifierError.UnviableFork)
else:
return err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)

elif consensusFork >= ConsensusFork.Deneb and
consensusFork < ConsensusFork.Electra:
if not blobQuarantine[].hasBlobs(forkyBlck):
# We don't have all the blobs for this block, so we have
# to put it in blobless quarantine.
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck):
err(VerifierError.UnviableFork)
return err(VerifierError.UnviableFork)
else:
err(VerifierError.MissingParent)
return err(VerifierError.MissingParent)
else:
let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck)
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.some(blobs),
maybeFinalized = maybeFinalized)

else:
await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock,
Opt.none(BlobSidecars),
maybeFinalized = maybeFinalized)
rmanBlockLoader = proc(
Expand Down
2 changes: 1 addition & 1 deletion vendor/nim-web3

0 comments on commit a46fef7

Please sign in to comment.