From 89e792361a451fbd35f148e8a2c76164adc63e80 Mon Sep 17 00:00:00 2001 From: gaoxin Date: Tue, 22 Oct 2024 21:14:33 +0800 Subject: [PATCH] Fix: check the read set value to reuse previous result --- src/hint.rs | 37 +++++---- src/lib.rs | 5 +- src/partition.rs | 65 +++------------ src/scheduler.rs | 14 ++-- src/storage.rs | 191 +++++++++++++++++++++++++++++++++++-------- src/tx_dependency.rs | 2 +- tests/mainnet.rs | 17 +++- 7 files changed, 214 insertions(+), 117 deletions(-) diff --git a/src/hint.rs b/src/hint.rs index 9ca6bc9..cd741a7 100644 --- a/src/hint.rs +++ b/src/hint.rs @@ -60,13 +60,13 @@ impl TxState { fn insert_location(&mut self, location: LocationAndType, rw_type: RWType) { match rw_type { RWType::ReadOnly => { - self.read_set.insert(location); + self.read_set.insert(location, None); } RWType::WriteOnly => { self.write_set.insert(location); } RWType::ReadWrite => { - self.read_set.insert(location.clone()); + self.read_set.insert(location.clone(), None); self.write_set.insert(location); } } @@ -111,13 +111,18 @@ impl ParallelExecutionHints { .insert_location(LocationAndType::Basic(to_address), RWType::ReadOnly); rw_set.insert_location(LocationAndType::Code(to_address), RWType::ReadOnly); // Update hints with contract data based on the transaction details - ParallelExecutionHints::update_hints_with_contract_data( + if !ParallelExecutionHints::update_hints_with_contract_data( tx_env.caller, to_address, None, &tx_env.data, rw_set, - ); + ) { + rw_set.insert_location( + LocationAndType::Basic(to_address), + RWType::WriteOnly, + ); + } } else if to_address != tx_env.caller { rw_set .insert_location(LocationAndType::Basic(to_address), RWType::ReadWrite); @@ -184,20 +189,21 @@ impl ParallelExecutionHints { code: Option, data: &Bytes, tx_rw_set: &mut TxState, - ) { + ) -> bool { if code.is_none() && data.is_empty() { - panic!("Unreachable error") + return false; } if data.len() < 4 || (data.len() - 4) % 32 != 0 { - tx_rw_set.insert_location(LocationAndType::Basic(contract_address), RWType::WriteOnly); // Invalid tx, or tx that triggers fallback CALL - return; + return false; } let (func_id, parameters) = Self::decode_contract_parameters(data); match Self::get_contract_type(contract_address) { ContractType::ERC20 => match ERC20Function::from(func_id) { ERC20Function::Transfer => { - assert_eq!(parameters.len(), 2); + if parameters.len() != 2 { + return false; + } let to_address: [u8; 20] = parameters[0].as_slice()[12..].try_into().expect("try into failed"); let to_slot = Self::slot_from_address(0, vec![Address::new(to_address)]); @@ -214,7 +220,9 @@ impl ParallelExecutionHints { } } ERC20Function::TransferFrom => { - assert_eq!(parameters.len(), 3); + if parameters.len() != 3 { + return false; + } let from_address: [u8; 20] = parameters[0].as_slice()[12..].try_into().expect("try into failed"); let from_address = Address::new(from_address); @@ -239,17 +247,14 @@ impl ParallelExecutionHints { ); } _ => { - tx_rw_set.insert_location( - LocationAndType::Basic(contract_address), - RWType::WriteOnly, - ); + return false; } }, ContractType::UNKNOWN => { - tx_rw_set - .insert_location(LocationAndType::Basic(contract_address), RWType::WriteOnly); + return false; } } + true } fn get_contract_type(_contract_address: Address) -> ContractType { diff --git a/src/lib.rs b/src/lib.rs index c039146..664ddbd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -39,6 +39,7 @@ lazy_static! { .unwrap(); } +use crate::storage::LazyUpdateValue; pub use scheduler::*; /// The maximum number of rounds for transaction execution. @@ -119,7 +120,7 @@ impl Display for GrevmError { /// This struct encapsulates the outcome of executing a transaction, including the execution /// result, state transitions, and any rewards to the miner. #[derive(Debug, Clone, Default)] -pub struct ResultAndTransition { +pub(crate) struct ResultAndTransition { /// Status of execution. pub result: Option, @@ -127,7 +128,7 @@ pub struct ResultAndTransition { pub transition: Vec<(Address, TransitionAccount)>, /// Rewards to miner. - pub rewards: u128, + pub miner_update: LazyUpdateValue, } /// Utility function for parallel execution using fork-join pattern. diff --git a/src/partition.rs b/src/partition.rs index 568aff3..974f29e 100644 --- a/src/partition.rs +++ b/src/partition.rs @@ -85,29 +85,11 @@ where } } - /// Check if there are unconfirmed transactions in the current partition - /// If there are unconfirmed transactions, - /// the update_write_set will be used to determine whether the transaction needs to be rerun - fn has_unconfirmed_tx(tx_states: &Vec, assigned_txs: &Vec) -> bool { - // If the first transaction is not executed, it means that the partition has not been executed - if tx_states[0].tx_status == TransactionStatus::Initial { - return false; - } - for txid in assigned_txs { - if tx_states[*txid].tx_status == TransactionStatus::Unconfirmed { - return true; - } - } - false - } - /// Execute transactions in the partition /// The transactions are executed optimistically, and their read and write sets are recorded after execution. /// The final state of a transaction is determined by the scheduler's validation process. pub(crate) fn execute(&mut self) { let start = Instant::now(); - // the update_write_set is used to determine whether the transaction needs to be rerun - let mut update_write_set: HashSet = HashSet::new(); let mut evm = EvmBuilder::default() .with_db(&mut self.partition_db) .with_spec_id(self.spec_id) @@ -118,37 +100,19 @@ where let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - let has_unconfirmed_tx = Self::has_unconfirmed_tx(tx_states, &self.assigned_txs); for txid in &self.assigned_txs { let txid = *txid; - // Miner is handled separately for each transaction - // However, if the miner is involved in the transaction - // we have to fully track the updates of this transaction to make sure the miner's account is updated correctly - let mut miner_involved = false; if let Some(tx) = self.txs.get(txid) { *evm.tx_mut() = tx.clone(); - if self.coinbase == tx.caller { - miner_involved = true; - } - if let TxKind::Call(to) = tx.transact_to { - if self.coinbase == to { - miner_involved = true; - } - } } else { panic!("Wrong transactions ID"); } // If the transaction is unconfirmed, it may not require repeated execution let mut should_execute = true; if tx_states[txid].tx_status == TransactionStatus::Unconfirmed { - // Unconfirmed transactions from the previous round might not need to be re-executed. - // The verification process is as follows: - // 1. Create an update_write_set to store the write sets of previous conflicting transactions. - // 2. If an unconfirmed transaction is re-executed, add its write set to update_write_set. - // 3. When processing an unconfirmed transaction, join its previous round's read set with update_write_set. - // 4. If there is no intersection, the unconfirmed transaction does not need to be re-executed; otherwise, it does. - if tx_states[txid].read_set.is_disjoint(&update_write_set) { + if evm.db_mut().check_read_set(&tx_states[txid].read_set) { + // Unconfirmed transactions from the previous round might not need to be re-executed. let transition = &tx_states[txid].execute_result.transition; evm.db_mut().temporary_commit_transition(transition); should_execute = false; @@ -157,32 +121,25 @@ where } } if should_execute { - evm.db_mut().miner_involved = miner_involved; let result = evm.transact(); match result { Ok(result_and_state) => { let read_set = evm.db_mut().take_read_set(); - let (write_set, rewards) = + let (write_set, miner_update) = evm.db().generate_write_set(&result_and_state.state); - if has_unconfirmed_tx { - update_write_set.extend(write_set.clone().into_iter()); - } // Check if the transaction can be skipped // skip_validation=true does not necessarily mean the transaction can skip validation. // Only transactions with consecutive minimum TxID can skip validation. // This is because if a transaction with a smaller TxID conflicts, // the states of subsequent transactions are invalid. - let mut skip_validation = true; - if !read_set.is_subset(&tx_states[txid].read_set) { - skip_validation = false; - } - if skip_validation && !write_set.is_subset(&tx_states[txid].write_set) { - skip_validation = false; - } + let mut skip_validation = + read_set.iter().all(|l| tx_states[txid].read_set.contains_key(l.0)); + skip_validation &= + write_set.iter().all(|l| tx_states[txid].write_set.contains(l)); let ResultAndState { result, mut state } = result_and_state; - if rewards.is_some() { + if miner_update.is_some() { // remove miner's state if we handle rewards separately state.remove(&self.coinbase); } @@ -199,7 +156,7 @@ where execute_result: ResultAndTransition { result: Some(result), transition, - rewards: rewards.unwrap_or(0), + miner_update: miner_update.unwrap_or_default(), }, }; } @@ -214,10 +171,10 @@ where let mut read_set = evm.db_mut().take_read_set(); // update write set with the caller and transact_to let mut write_set = HashSet::new(); - read_set.insert(LocationAndType::Basic(evm.tx().caller)); + read_set.insert(LocationAndType::Basic(evm.tx().caller), None); write_set.insert(LocationAndType::Basic(evm.tx().caller)); if let TxKind::Call(to) = evm.tx().transact_to { - read_set.insert(LocationAndType::Basic(to)); + read_set.insert(LocationAndType::Basic(to), None); write_set.insert(LocationAndType::Basic(to)); } diff --git a/src/scheduler.rs b/src/scheduler.rs index 4285fad..770c351 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -5,7 +5,7 @@ use std::time::{Duration, Instant}; use crate::hint::ParallelExecutionHints; use crate::partition::PartitionExecutor; -use crate::storage::SchedulerDB; +use crate::storage::{LazyUpdateValue, SchedulerDB}; use crate::tx_dependency::{DependentTxsVec, TxDependency}; use crate::{ fork_join_util, GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId, @@ -110,7 +110,7 @@ pub(crate) type LocationSet = HashSet; #[derive(Clone)] pub(crate) struct TxState { pub tx_status: TransactionStatus, - pub read_set: LocationSet, + pub read_set: HashMap>, pub write_set: LocationSet, pub execute_result: ResultAndTransition, } @@ -119,7 +119,7 @@ impl TxState { pub(crate) fn new() -> Self { Self { tx_status: TransactionStatus::Initial, - read_set: HashSet::new(), + read_set: HashMap::new(), write_set: HashSet::new(), execute_result: ResultAndTransition::default(), } @@ -386,7 +386,7 @@ where let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict; let mut updated_dependencies = BTreeSet::new(); if txid >= end_skip_id { - for location in tx_states[txid].read_set.iter() { + for (location, _) in tx_states[txid].read_set.iter() { if let Some(written_txs) = merged_write_set.get(location) { if let Some(previous_txid) = written_txs.range(..txid).next_back() { // update dependencies: previous_txid <- txid @@ -490,10 +490,10 @@ where #[allow(invalid_reference_casting)] let tx_states = unsafe { &mut *(&(*self.tx_states) as *const Vec as *mut Vec) }; - let mut rewards: u128 = 0; + let mut miner_updates = Vec::with_capacity(finality_tx_cnt); let start_txid = self.num_finality_txs - finality_tx_cnt; for txid in start_txid..self.num_finality_txs { - rewards += tx_states[txid].execute_result.rewards; + miner_updates.push(tx_states[txid].execute_result.miner_update.clone()); database .commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition)); self.results.push(tx_states[txid].execute_result.result.clone().unwrap()); @@ -504,7 +504,7 @@ where // and track the rewards for the miner for each transaction separately. // The miner’s account is only updated after validation by SchedulerDB.increment_balances database - .increment_balances(vec![(self.coinbase, rewards)]) + .update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))]) .map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?; self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64); Ok(()) diff --git a/src/storage.rs b/src/storage.rs index f9f0761..7ea06ca 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -8,6 +8,64 @@ use revm::{CacheState, Database, DatabaseRef, TransitionAccount, TransitionState use std::collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet}; use std::sync::Arc; +/// LazyUpdateValue is used to update the balance of the miner's account. +/// The miner's reward is calculated by subtracting the previous balance from the current balance. +#[derive(Debug, Clone)] +pub(crate) enum LazyUpdateValue { + Increase(u128, u64), + Decrease(u128, u64), +} + +impl Default for LazyUpdateValue { + fn default() -> Self { + Self::Increase(0, 0) + } +} + +/// Merge multiple LazyUpdateValue into one. +impl LazyUpdateValue { + pub(crate) fn merge(values: Vec) -> Self { + let mut value: u128 = 0; + let mut positive: bool = true; + let mut nonce: u64 = 0; + for lazy_value in values { + match lazy_value { + Self::Increase(inc, add_nonce) => { + nonce += add_nonce; + if positive { + value += inc; + } else { + if value > inc { + value -= inc + } else { + value = inc - value; + positive = true; + } + } + } + Self::Decrease(dec, add_nonce) => { + nonce += add_nonce; + if positive { + if value > dec { + value -= dec; + } else { + value = dec - value; + positive = false; + } + } else { + value += dec; + } + } + } + } + if positive { + Self::Increase(value, nonce) + } else { + Self::Decrease(value, nonce) + } + } +} + /// SchedulerDB is a database wrapper that manages state transitions and caching for the EVM. /// It maintains a cache of committed data, a transition state for ongoing transactions, and a bundle state /// for finalizing block state changes. It also tracks block hashes for quick access. @@ -45,7 +103,7 @@ pub(crate) struct SchedulerDB { impl SchedulerDB { pub(crate) fn new(database: DB) -> Self { Self { - cache: CacheState::default(), + cache: CacheState::new(false), database, transition_state: Some(TransitionState::default()), bundle_state: BundleState::default(), @@ -106,22 +164,28 @@ where /// The miner's reward is calculated by subtracting the previous balance from the current balance. /// and should add to the miner's account after each round of execution for finality transactions. - pub(crate) fn increment_balances( + pub(crate) fn update_balances( &mut self, - balances: impl IntoIterator, + balances: impl IntoIterator, ) -> Result<(), DB::Error> { // make transition and update cache state let mut transitions = Vec::new(); - for (address, balance) in balances { - if balance == 0 { - continue; - } - + for (address, update) in balances { let cache_account = self.load_cache_account(address)?; - transitions.push(( - address, - cache_account.increment_balance(balance).expect("Balance is not zero"), - )) + let mut info = cache_account.account_info().unwrap_or_default(); + let (new_balance, add_nonce) = match update { + LazyUpdateValue::Increase(value, nonce) => { + (info.balance.saturating_add(U256::from(value)), nonce) + } + LazyUpdateValue::Decrease(value, nonce) => { + (info.balance.saturating_sub(U256::from(value)), nonce) + } + }; + if info.balance != new_balance || add_nonce != 0 { + info.balance = new_balance; + info.nonce += add_nonce; + transitions.push((address, cache_account.change(info, Default::default()))); + } } // append transition if let Some(s) = self.transition_state.as_mut() { @@ -280,34 +344,34 @@ pub(crate) struct PartitionDB { pub scheduler_db: Arc>, pub block_hashes: BTreeMap, - /// Does the miner participate in the transaction - pub miner_involved: bool, /// Record the read set of current tx, will be consumed after the execution of each tx - tx_read_set: LocationSet, + tx_read_set: HashMap>, } impl PartitionDB { pub(crate) fn new(coinbase: Address, scheduler_db: Arc>) -> Self { Self { coinbase, - cache: CacheState::default(), + cache: CacheState::new(false), scheduler_db, block_hashes: BTreeMap::new(), - miner_involved: false, - tx_read_set: HashSet::new(), + tx_read_set: HashMap::new(), } } /// consume the read set after evm.transact() for each tx - pub(crate) fn take_read_set(&mut self) -> LocationSet { + pub(crate) fn take_read_set(&mut self) -> HashMap> { core::mem::take(&mut self.tx_read_set) } /// Generate the write set after evm.transact() for each tx /// The write set includes the locations of the basic account, code, and storage slots that have been modified. /// Returns the write set(exclude miner) and the miner's rewards. - pub(crate) fn generate_write_set(&self, changes: &EvmState) -> (LocationSet, Option) { - let mut rewards: Option = None; + pub(crate) fn generate_write_set( + &self, + changes: &EvmState, + ) -> (LocationSet, Option) { + let mut miner_update: Option = None; let mut write_set = HashSet::new(); for (address, account) in changes { if account.is_selfdestructed() { @@ -321,19 +385,31 @@ impl PartitionDB { continue; } + // Lazy update miner's balance let mut miner_updated = false; - // When fully tracking the updates to the miner’s account, - // we should set rewards = 0 - if self.coinbase == *address && !self.miner_involved { + if self.coinbase == *address { match self.cache.accounts.get(address) { Some(miner) => match miner.account.as_ref() { Some(miner) => { - rewards = Some((account.info.balance - miner.info.balance).to()); + if account.info.balance >= miner.info.balance { + miner_update = Some(LazyUpdateValue::Increase( + (account.info.balance - miner.info.balance).to(), + account.info.nonce - miner.info.nonce, + )); + } else { + miner_update = Some(LazyUpdateValue::Decrease( + (miner.info.balance - account.info.balance).to(), + account.info.nonce - miner.info.nonce, + )); + } miner_updated = true; } // LoadedNotExisting None => { - rewards = Some(account.info.balance.to()); + miner_update = Some(LazyUpdateValue::Increase( + account.info.balance.to(), + account.info.nonce, + )); miner_updated = true; } }, @@ -377,7 +453,7 @@ impl PartitionDB { write_set.insert(LocationAndType::Storage(*address, *slot)); } } - (write_set, rewards) + (write_set, miner_update) } /// Temporary commit the state change after evm.transact() for each tx @@ -397,6 +473,47 @@ impl PartitionDB { } } +impl PartitionDB +where + DB: DatabaseRef, +{ + /// If the read set is consistent with the read set of the previous round of execution, + /// We can reuse the results of the previous round of execution, and no need to re-execute the transaction. + pub(crate) fn check_read_set( + &mut self, + read_set: &HashMap>, + ) -> bool { + let mut visit_account = HashSet::new(); + for (location, _) in read_set { + match location { + LocationAndType::Basic(address) => { + if !visit_account.contains(address) { + let _ = self.basic(address.clone()); + visit_account.insert(address.clone()); + } + } + LocationAndType::Storage(address, index) => { + // If the account is not loaded, we need to load it from the database. + if !visit_account.contains(address) { + let _ = self.basic(address.clone()); + visit_account.insert(address.clone()); + } + let _ = self.storage(address.clone(), index.clone()); + } + _ => {} + } + } + let new_read_set = self.take_read_set(); + if new_read_set.len() != read_set.len() { + false + } else { + new_read_set + .iter() + .all(|(key, value)| read_set.get(key).map_or(false, |v| *value == *v)) + } + } +} + /// Used to build evm, and hook the read operations impl Database for PartitionDB where @@ -405,10 +522,6 @@ where type Error = DB::Error; fn basic(&mut self, address: Address) -> Result, Self::Error> { - if address != self.coinbase || self.miner_involved { - self.tx_read_set.insert(LocationAndType::Basic(address)); - } - // 1. read from internal cache let result = match self.cache.accounts.entry(address) { hash_map::Entry::Vacant(entry) => { @@ -423,13 +536,18 @@ where } hash_map::Entry::Occupied(entry) => Ok(entry.get().account_info()), }; + let mut balance = None; if let Ok(account) = &result { if let Some(info) = account { if !info.is_empty_code_hash() { - self.tx_read_set.insert(LocationAndType::Code(address)); + self.tx_read_set.insert(LocationAndType::Code(address), None); } + balance = Some(info.balance); } } + if address != self.coinbase { + self.tx_read_set.insert(LocationAndType::Basic(address), balance); + } result } @@ -455,9 +573,14 @@ where } fn storage(&mut self, address: Address, index: U256) -> Result { - self.tx_read_set.insert(LocationAndType::Storage(address, index)); + let result = load_storage(&mut self.cache, &self.scheduler_db.database, address, index); + let mut slot_value = None; + if let Ok(value) = &result { + slot_value = Some(value.clone()); + } + self.tx_read_set.insert(LocationAndType::Storage(address, index), slot_value); - load_storage(&mut self.cache, &self.scheduler_db.database, address, index) + result } fn block_hash(&mut self, number: u64) -> Result { diff --git a/src/tx_dependency.rs b/src/tx_dependency.rs index 8b5f472..3171f76 100644 --- a/src/tx_dependency.rs +++ b/src/tx_dependency.rs @@ -43,7 +43,7 @@ impl TxDependency { let mut last_write_tx: HashMap = HashMap::new(); for (txid, rw_set) in tx_states.iter().enumerate() { let dependencies = &mut self.tx_dependency[txid]; - for location in rw_set.read_set.iter() { + for (location, _) in rw_set.read_set.iter() { if let Some(previous) = last_write_tx.get(location) { dependencies.push(*previous); } diff --git a/tests/mainnet.rs b/tests/mainnet.rs index 9c17a34..b7d89d1 100644 --- a/tests/mainnet.rs +++ b/tests/mainnet.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use alloy_chains::NamedChain; use alloy_rpc_types::{Block, BlockTransactions}; use common::{compat, storage::InMemoryDB}; -use grevm::GrevmScheduler; +use grevm::{GrevmError, GrevmScheduler}; +use metrics_util::debugging::DebuggingRecorder; use revm::primitives::{Env, TxEnv}; fn test_execute_alloy(block: Block, db: InMemoryDB) { @@ -24,8 +25,18 @@ fn test_execute_alloy(block: Block, db: InMemoryDB) { let reth_result = common::execute_revm_sequential(db.clone(), spec_id, env.clone(), txs.clone()); - let executor = GrevmScheduler::new(spec_id, env, db, txs); - let parallel_result = executor.parallel_execute(); + // create registry for metrics + let recorder = DebuggingRecorder::new(); + let mut parallel_result = Err(GrevmError::UnreachableError(String::from("Init"))); + metrics::with_local_recorder(&recorder, || { + let executor = GrevmScheduler::new(spec_id, env, db, txs); + parallel_result = executor.parallel_execute(); + + let snapshot = recorder.snapshotter().snapshot(); + for (key, unit, desc, value) in snapshot.into_vec() { + println!("metrics: {} => value: {:?}", key.key().name(), value); + } + }); common::compare_execution_result( &reth_result.as_ref().unwrap().results,