Skip to content

Commit

Permalink
feat(rollup): step function (#86)
Browse files Browse the repository at this point in the history
This PR introduces a first iteration of the pipeline `step`
functionality.

The pipeline will try to advance as long as there are new derived
payloads,
and then it will wait for a new L1 notification before trying again.

- closes #71 (first iteration, anyway)

Stack:

- #77 
  - #85 
    - 👉 #86
  • Loading branch information
merklefruit authored Sep 8, 2024
1 parent 2131bcc commit a23a422
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 27 deletions.
9 changes: 4 additions & 5 deletions crates/rollup/src/driver/cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ impl SyncCursor {
}
}

/// Get the current L2 tip and the corresponding L1 origin block info.
pub fn tip(&self) -> (L2BlockInfo, BlockInfo) {
if let Some((origin_number, l2_tip)) = self.l1_origin_to_l2_blocks.last_key_value() {
let origin_block = self.l1_origin_block_info[origin_number];
(*l2_tip, origin_block)
/// Get the current L2 tip
pub fn tip(&self) -> L2BlockInfo {
if let Some((_, l2_tip)) = self.l1_origin_to_l2_blocks.last_key_value() {
*l2_tip
} else {
unreachable!("cursor must be initialized with one block before advancing")
}
Expand Down
150 changes: 129 additions & 21 deletions crates/rollup/src/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,27 @@ use std::{fmt::Debug, sync::Arc};

use eyre::{bail, Result};
use kona_derive::{
errors::StageError,
online::{AlloyChainProvider, AlloyL2ChainProvider, OnlineBlobProviderBuilder},
traits::{BlobProvider, ChainProvider, L2ChainProvider},
};
use kona_primitives::BlockInfo;
use kona_primitives::{BlockInfo, L2BlockInfo};
use kona_providers::{
blob_provider::DurableBlobProvider, InMemoryChainProvider, LayeredBlobProvider, Pipeline,
StepResult,
};
use reth::rpc::types::engine::JwtSecret;
use reth_exex::ExExContext;
use reth_node_api::FullNodeComponents;
use superchain_registry::RollupConfig;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, trace, warn};

use crate::{new_rollup_pipeline, HeraArgsExt, RollupPipeline};
use crate::{
cli::ValidationMode,
new_rollup_pipeline,
validator::{EngineApiValidator, TrustedValidator},
AttributesValidator, HeraArgsExt, RollupPipeline,
};

mod context;
use context::{ChainNotification, DriverContext, StandaloneContext};
Expand All @@ -39,6 +47,8 @@ pub struct Driver<DC, CP, BP, L2CP> {
l2_chain_provider: L2CP,
/// Cursor to keep track of the L2 tip
cursor: SyncCursor,
/// The validator to verify newly derived L2 attributes
validator: Box<dyn AttributesValidator>,
}

impl<N> Driver<ExExContext<N>, InMemoryChainProvider, LayeredBlobProvider, AlloyL2ChainProvider>
Expand All @@ -48,26 +58,27 @@ where
/// Create a new Hera Execution Extension Driver
pub fn exex(ctx: ExExContext<N>, args: HeraArgsExt, cfg: Arc<RollupConfig>) -> Self {
let cp = InMemoryChainProvider::with_capacity(args.in_mem_chain_provider_capacity);
let bp = LayeredBlobProvider::new(args.l1_beacon_client_url, args.l1_blob_archiver_url);
let l2_cp = AlloyL2ChainProvider::new_http(args.l2_rpc_url, cfg.clone());
let cursor = SyncCursor::new(cfg.channel_timeout);
let l2_cp = AlloyL2ChainProvider::new_http(args.l2_rpc_url.clone(), cfg.clone());
let bp = LayeredBlobProvider::new(
args.l1_beacon_client_url.clone(),
args.l1_blob_archiver_url.clone(),
);

Self { cfg, ctx, chain_provider: cp, blob_provider: bp, l2_chain_provider: l2_cp, cursor }
Self::with_components(ctx, args, cfg, cp, bp, l2_cp)
}
}

impl Driver<StandaloneContext, AlloyChainProvider, DurableBlobProvider, AlloyL2ChainProvider> {
/// Create a new standalone Hera Driver
/// Create a new Standalone Hera Driver
pub fn standalone(ctx: StandaloneContext, args: HeraArgsExt, cfg: Arc<RollupConfig>) -> Self {
let cp = AlloyChainProvider::new_http(args.l1_rpc_url);
let cp = AlloyChainProvider::new_http(args.l1_rpc_url.clone());
let l2_cp = AlloyL2ChainProvider::new_http(args.l2_rpc_url.clone(), cfg.clone());
let bp = OnlineBlobProviderBuilder::new()
.with_primary(args.l1_beacon_client_url.to_string())
.with_fallback(args.l1_blob_archiver_url.map(|url| url.to_string()))
.with_fallback(args.l1_blob_archiver_url.clone().map(|url| url.to_string()))
.build();
let l2_cp = AlloyL2ChainProvider::new_http(args.l2_rpc_url, cfg.clone());
let cursor = SyncCursor::new(cfg.channel_timeout);

Self { cfg, ctx, chain_provider: cp, blob_provider: bp, l2_chain_provider: l2_cp, cursor }
Self::with_components(ctx, args, cfg, cp, bp, l2_cp)
}
}

Expand All @@ -78,6 +89,32 @@ where
BP: BlobProvider + Clone + Send + Sync + Debug + 'static,
L2CP: L2ChainProvider + Clone + Send + Sync + Debug + 'static,
{
/// Create a new Hera Driver with the provided components.
fn with_components(
ctx: DC,
args: HeraArgsExt,
cfg: Arc<RollupConfig>,
chain_provider: CP,
blob_provider: BP,
l2_chain_provider: L2CP,
) -> Self {
let cursor = SyncCursor::new(cfg.channel_timeout);
let validator: Box<dyn AttributesValidator> = match args.validation_mode {
ValidationMode::Trusted => {
Box::new(TrustedValidator::new_http(args.l2_rpc_url, cfg.canyon_time.unwrap_or(0)))
}
ValidationMode::EngineApi => Box::new(EngineApiValidator::new_http(
args.l2_engine_api_url.expect("Missing L2 engine API URL"),
match args.l2_engine_jwt_secret.as_ref() {
Some(fpath) => JwtSecret::from_file(fpath).expect("Invalid L2 JWT secret file"),
None => panic!("Missing L2 engine JWT secret"),
},
)),
};

Self { cfg, ctx, chain_provider, blob_provider, l2_chain_provider, cursor, validator }
}

/// Wait for the L2 genesis L1 block (aka "origin block") to be available in the L1 chain.
async fn wait_for_l2_genesis_l1_block(&mut self) -> Result<()> {
loop {
Expand Down Expand Up @@ -118,12 +155,80 @@ where
}

/// Advance the pipeline to the next L2 block.
async fn step(&mut self, pipeline: &mut RollupPipeline<CP, BP, L2CP>) -> Result<()> {
let (l2_tip, l1_origin) = self.cursor.tip();
let _ = pipeline.step(l2_tip).await;
self.cursor.advance(l1_origin, l2_tip);
///
/// Returns `true` if the pipeline can move forward again, `false` otherwise.
async fn step(&mut self, pipeline: &mut RollupPipeline<CP, BP, L2CP>) -> bool {
let l2_tip = self.cursor.tip();

match pipeline.step(l2_tip).await {
StepResult::PreparedAttributes => trace!("Perpared new attributes"),
StepResult::AdvancedOrigin => trace!("Advanced origin"),
StepResult::OriginAdvanceErr(err) => warn!("Could not advance origin: {:?}", err),
StepResult::StepFailed(err) => match err {
StageError::NotEnoughData => debug!("Not enough data to advance pipeline"),
_ => error!("Error stepping derivation pipeline: {:?}", err),
},
}

let derived_attributes = if let Some(attributes) = pipeline.peek() {
match self.validator.validate(attributes).await {
Ok(true) => {
trace!("Validated payload attributes");
pipeline.next().expect("Peeked attributes must be available")
}
Ok(false) => {
error!("Failed payload attributes validation");
// TODO: allow users to specify how they want to treat invalid payloads.
// In the default scenario we just log an error and continue.
return false;
}
Err(err) => {
error!("Error while validating payload attributes: {:?}", err);
return false;
}
}
} else {
debug!("No attributes available to validate");
return false;
};

unimplemented!()
let derived = derived_attributes.parent.block_info.number + 1;
let (new_l1_origin, new_l2_tip) = match self.fetch_new_tip(derived).await {
Ok(tip_info) => tip_info,
Err(err) => {
// TODO: add a retry mechanism?
error!("Failed to fetch new tip: {:?}", err);
return false;
}
};

// Perform a sanity check on the new tip
if new_l2_tip.block_info.number != derived {
error!("Expected L2 block number {} but got {}", derived, new_l2_tip.block_info.number);
return false;
}

// Advance the cursor to the new L2 block
self.cursor.advance(new_l1_origin, new_l2_tip);
info!("Advanced derivation pipeline to L2 block: {}", derived);
true
}

/// Fetch the new L2 tip and L1 origin block info for the given L2 block number.
async fn fetch_new_tip(&mut self, l2_tip: u64) -> Result<(BlockInfo, L2BlockInfo)> {
let l2_block = self
.l2_chain_provider
.l2_block_info_by_number(l2_tip)
.await
.map_err(|e| eyre::eyre!(e))?;

let l1_origin = self
.chain_provider
.block_info_by_number(l2_block.l1_origin.number)
.await
.map_err(|e| eyre::eyre!(e))?;

Ok((l1_origin, l2_block))
}

/// Handle a chain notification from the driver context.
Expand Down Expand Up @@ -176,11 +281,14 @@ where
// Step 2: Initialize the rollup pipeline
let mut pipeline = self.init_pipeline();

// Step 3: Start processing events
// Step 3: Start the processing loop
loop {
// TODO: handle pipeline step (stubbed)
let _ = self.step(&mut pipeline).await;
// Try to advance the pipeline until there's no more data to process
if self.step(&mut pipeline).await {
continue;
}

// Handle any incoming notifications from the context
if let Some(notification) = self.ctx.recv_notification().await {
self.handle_notification(notification, &mut pipeline).await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/rollup/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use url::Url;
///
/// A trait that defines the interface for validating newly derived L2 attributes.
#[async_trait]
pub trait AttributesValidator: Debug {
pub trait AttributesValidator: Debug + Send {
/// Validates the given [`L2AttributesWithParent`] and returns true
/// if the attributes are valid, false otherwise.
async fn validate(&self, attributes: &L2AttributesWithParent) -> Result<bool>;
Expand Down

0 comments on commit a23a422

Please sign in to comment.