Fix: check the read set value to reuse previous result
AshinGau committed Oct 22, 2024
1 parent 314abf7 commit 89e7923
Showing 7 changed files with 214 additions and 117 deletions.
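The core of this commit: TxState::read_set becomes a HashMap<LocationAndType, Option<U256>> instead of a plain set, so each read can record the value it observed. An unconfirmed transaction from the previous round is then reused only if those recorded values still hold (the check_read_set call in src/partition.rs below), replacing the old update_write_set intersection test. A minimal, self-contained sketch of that idea with stand-in types (String/u64 instead of LocationAndType/U256; the real check lives in src/storage.rs, which is not shown on this page):

use std::collections::HashMap;

// Stand-ins for the crate's LocationAndType and U256.
type Location = String;
type Value = u64;

// New read-set shape: which locations were read, and (when available) the value seen.
type ReadSet = HashMap<Location, Option<Value>>;

// A previous result can be reused only if every recorded read still observes the
// same value in the current state; reads without a recorded value are treated
// conservatively and force re-execution.
fn can_reuse(read_set: &ReadSet, current: &HashMap<Location, Value>) -> bool {
    read_set.iter().all(|(loc, recorded)| match recorded {
        Some(v) => current.get(loc) == Some(v),
        None => false,
    })
}

fn main() {
    let mut state = HashMap::from([("0xA.balance".to_string(), 100u64)]);
    let reads: ReadSet = HashMap::from([("0xA.balance".to_string(), Some(100u64))]);
    assert!(can_reuse(&reads, &state));
    state.insert("0xA.balance".to_string(), 90);
    assert!(!can_reuse(&reads, &state)); // a read value changed, so re-execute
}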
37 changes: 21 additions & 16 deletions src/hint.rs
@@ -60,13 +60,13 @@ impl TxState {
fn insert_location(&mut self, location: LocationAndType, rw_type: RWType) {
match rw_type {
RWType::ReadOnly => {
self.read_set.insert(location);
self.read_set.insert(location, None);
}
RWType::WriteOnly => {
self.write_set.insert(location);
}
RWType::ReadWrite => {
self.read_set.insert(location.clone());
self.read_set.insert(location.clone(), None);
self.write_set.insert(location);
}
}
@@ -111,13 +111,18 @@ impl ParallelExecutionHints {
.insert_location(LocationAndType::Basic(to_address), RWType::ReadOnly);
rw_set.insert_location(LocationAndType::Code(to_address), RWType::ReadOnly);
// Update hints with contract data based on the transaction details
ParallelExecutionHints::update_hints_with_contract_data(
if !ParallelExecutionHints::update_hints_with_contract_data(
tx_env.caller,
to_address,
None,
&tx_env.data,
rw_set,
);
) {
rw_set.insert_location(
LocationAndType::Basic(to_address),
RWType::WriteOnly,
);
}
} else if to_address != tx_env.caller {
rw_set
.insert_location(LocationAndType::Basic(to_address), RWType::ReadWrite);
@@ -184,20 +189,21 @@ impl ParallelExecutionHints {
code: Option<Bytes>,
data: &Bytes,
tx_rw_set: &mut TxState,
) {
) -> bool {
if code.is_none() && data.is_empty() {
panic!("Unreachable error")
return false;
}
if data.len() < 4 || (data.len() - 4) % 32 != 0 {
tx_rw_set.insert_location(LocationAndType::Basic(contract_address), RWType::WriteOnly);
// Invalid tx, or tx that triggers fallback CALL
return;
return false;
}
let (func_id, parameters) = Self::decode_contract_parameters(data);
match Self::get_contract_type(contract_address) {
ContractType::ERC20 => match ERC20Function::from(func_id) {
ERC20Function::Transfer => {
assert_eq!(parameters.len(), 2);
if parameters.len() != 2 {
return false;
}
let to_address: [u8; 20] =
parameters[0].as_slice()[12..].try_into().expect("try into failed");
let to_slot = Self::slot_from_address(0, vec![Address::new(to_address)]);
@@ -214,7 +220,9 @@ impl ParallelExecutionHints {
}
}
ERC20Function::TransferFrom => {
assert_eq!(parameters.len(), 3);
if parameters.len() != 3 {
return false;
}
let from_address: [u8; 20] =
parameters[0].as_slice()[12..].try_into().expect("try into failed");
let from_address = Address::new(from_address);
@@ -239,17 +247,14 @@ impl ParallelExecutionHints {
);
}
_ => {
tx_rw_set.insert_location(
LocationAndType::Basic(contract_address),
RWType::WriteOnly,
);
return false;
}
},
ContractType::UNKNOWN => {
tx_rw_set
.insert_location(LocationAndType::Basic(contract_address), RWType::WriteOnly);
return false;
}
}
true
}

fn get_contract_type(_contract_address: Address) -> ContractType {
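In the hints above, ERC-20 transfers are mapped to the storage slots of the relevant balances via slot_from_address, and when the calldata cannot be decoded into a known function the caller now falls back to marking the contract's basic account as write-only. slot_from_address itself is outside this diff; under Solidity's standard storage layout, the balance slot of a mapping(address => uint256) held at slot index 0 would be derived roughly as sketched below (assuming the alloy-primitives crate, which the project already pulls in through revm):

use alloy_primitives::{keccak256, Address, U256};

// Solidity stores mapping(address => uint256) entries at
// keccak256(pad32(key) ++ pad32(slot_index)).
fn erc20_balance_slot(slot_index: u64, owner: Address) -> U256 {
    let mut buf = [0u8; 64];
    buf[12..32].copy_from_slice(owner.as_slice()); // left-pad the 20-byte address
    buf[32..64].copy_from_slice(&U256::from(slot_index).to_be_bytes::<32>());
    U256::from_be_bytes(keccak256(buf).0)
}

For a transfer, erc20_balance_slot(0, to) corresponds to what slot_from_address(0, vec![Address::new(to_address)]) in the hunk above presumably computes for the recipient's balance.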
5 changes: 3 additions & 2 deletions src/lib.rs
@@ -39,6 +39,7 @@ lazy_static! {
.unwrap();
}

use crate::storage::LazyUpdateValue;
pub use scheduler::*;

/// The maximum number of rounds for transaction execution.
@@ -119,15 +120,15 @@ impl<DBError: Display> Display for GrevmError<DBError> {
/// This struct encapsulates the outcome of executing a transaction, including the execution
/// result, state transitions, and any rewards to the miner.
#[derive(Debug, Clone, Default)]
pub struct ResultAndTransition {
pub(crate) struct ResultAndTransition {
/// Status of execution.
pub result: Option<ExecutionResult>,

/// State that got updated.
pub transition: Vec<(Address, TransitionAccount)>,

/// Rewards to miner.
pub rewards: u128,
pub miner_update: LazyUpdateValue,
}

/// Utility function for parallel execution using fork-join pattern.
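ResultAndTransition now records the miner change as a LazyUpdateValue instead of a plain rewards: u128. LazyUpdateValue is defined in src/storage.rs and is not shown on this page; the sketch below is only a hypothetical illustration of the idea: describe how the miner's balance changes without reading it, so the updates of many transactions can be merged and applied once after validation (covering both plain reward increments and the case where a transaction touches the miner account directly).

// Hypothetical stand-in for storage::LazyUpdateValue (not the real type).
#[derive(Debug, Clone, Copy, PartialEq, Default)]
enum LazyBalanceUpdate {
    #[default]
    Noop,            // the transaction did not touch the miner balance
    Increase(u128),  // add a reward on top of whatever the balance is
    Absolute(u128),  // the transaction overwrote the balance with a known value
}

impl LazyBalanceUpdate {
    // Fold a sequence of per-transaction updates into a single update.
    fn merge(updates: impl IntoIterator<Item = Self>) -> Self {
        updates.into_iter().fold(Self::Noop, |acc, next| match (acc, next) {
            (acc, Self::Noop) => acc,
            (Self::Increase(a), Self::Increase(b)) => Self::Increase(a + b),
            // An absolute write discards everything accumulated before it.
            (_, abs @ Self::Absolute(_)) => abs,
            (Self::Noop, inc) => inc,
            (Self::Absolute(base), Self::Increase(b)) => Self::Absolute(base + b),
        })
    }
}

fn main() {
    let merged = LazyBalanceUpdate::merge([
        LazyBalanceUpdate::Increase(3),
        LazyBalanceUpdate::Noop,
        LazyBalanceUpdate::Increase(4),
    ]);
    assert_eq!(merged, LazyBalanceUpdate::Increase(7));
}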
65 changes: 11 additions & 54 deletions src/partition.rs
@@ -85,29 +85,11 @@
}
}

/// Check if there are unconfirmed transactions in the current partition
/// If there are unconfirmed transactions,
/// the update_write_set will be used to determine whether the transaction needs to be rerun
fn has_unconfirmed_tx(tx_states: &Vec<TxState>, assigned_txs: &Vec<TxId>) -> bool {
// If the first transaction is not executed, it means that the partition has not been executed
if tx_states[0].tx_status == TransactionStatus::Initial {
return false;
}
for txid in assigned_txs {
if tx_states[*txid].tx_status == TransactionStatus::Unconfirmed {
return true;
}
}
false
}

/// Execute transactions in the partition
/// The transactions are executed optimistically, and their read and write sets are recorded after execution.
/// The final state of a transaction is determined by the scheduler's validation process.
pub(crate) fn execute(&mut self) {
let start = Instant::now();
// the update_write_set is used to determine whether the transaction needs to be rerun
let mut update_write_set: HashSet<LocationAndType> = HashSet::new();
let mut evm = EvmBuilder::default()
.with_db(&mut self.partition_db)
.with_spec_id(self.spec_id)
@@ -118,37 +100,19 @@
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };

let has_unconfirmed_tx = Self::has_unconfirmed_tx(tx_states, &self.assigned_txs);
for txid in &self.assigned_txs {
let txid = *txid;

// Miner is handled separately for each transaction
// However, if the miner is involved in the transaction
// we have to fully track the updates of this transaction to make sure the miner's account is updated correctly
let mut miner_involved = false;
if let Some(tx) = self.txs.get(txid) {
*evm.tx_mut() = tx.clone();
if self.coinbase == tx.caller {
miner_involved = true;
}
if let TxKind::Call(to) = tx.transact_to {
if self.coinbase == to {
miner_involved = true;
}
}
} else {
panic!("Wrong transactions ID");
}
// If the transaction is unconfirmed, it may not require repeated execution
let mut should_execute = true;
if tx_states[txid].tx_status == TransactionStatus::Unconfirmed {
// Unconfirmed transactions from the previous round might not need to be re-executed.
// The verification process is as follows:
// 1. Create an update_write_set to store the write sets of previous conflicting transactions.
// 2. If an unconfirmed transaction is re-executed, add its write set to update_write_set.
// 3. When processing an unconfirmed transaction, join its previous round's read set with update_write_set.
// 4. If there is no intersection, the unconfirmed transaction does not need to be re-executed; otherwise, it does.
if tx_states[txid].read_set.is_disjoint(&update_write_set) {
if evm.db_mut().check_read_set(&tx_states[txid].read_set) {
// Unconfirmed transactions from the previous round might not need to be re-executed.
let transition = &tx_states[txid].execute_result.transition;
evm.db_mut().temporary_commit_transition(transition);
should_execute = false;
@@ -157,32 +121,25 @@
}
}
if should_execute {
evm.db_mut().miner_involved = miner_involved;
let result = evm.transact();
match result {
Ok(result_and_state) => {
let read_set = evm.db_mut().take_read_set();
let (write_set, rewards) =
let (write_set, miner_update) =
evm.db().generate_write_set(&result_and_state.state);
if has_unconfirmed_tx {
update_write_set.extend(write_set.clone().into_iter());
}

// Check if the transaction can be skipped
// skip_validation=true does not necessarily mean the transaction can skip validation.
// Only transactions with consecutive minimum TxID can skip validation.
// This is because if a transaction with a smaller TxID conflicts,
// the states of subsequent transactions are invalid.
let mut skip_validation = true;
if !read_set.is_subset(&tx_states[txid].read_set) {
skip_validation = false;
}
if skip_validation && !write_set.is_subset(&tx_states[txid].write_set) {
skip_validation = false;
}
let mut skip_validation =
read_set.iter().all(|l| tx_states[txid].read_set.contains_key(l.0));
skip_validation &=
write_set.iter().all(|l| tx_states[txid].write_set.contains(l));

let ResultAndState { result, mut state } = result_and_state;
if rewards.is_some() {
if miner_update.is_some() {
// remove miner's state if we handle rewards separately
state.remove(&self.coinbase);
}
@@ -199,7 +156,7 @@
execute_result: ResultAndTransition {
result: Some(result),
transition,
rewards: rewards.unwrap_or(0),
miner_update: miner_update.unwrap_or_default(),
},
};
}
@@ -214,10 +171,10 @@
let mut read_set = evm.db_mut().take_read_set();
// update write set with the caller and transact_to
let mut write_set = HashSet::new();
read_set.insert(LocationAndType::Basic(evm.tx().caller));
read_set.insert(LocationAndType::Basic(evm.tx().caller), None);
write_set.insert(LocationAndType::Basic(evm.tx().caller));
if let TxKind::Call(to) = evm.tx().transact_to {
read_set.insert(LocationAndType::Basic(to));
read_set.insert(LocationAndType::Basic(to), None);
write_set.insert(LocationAndType::Basic(to));
}

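With value-carrying read sets, partition execution no longer needs the update_write_set bookkeeping or the per-transaction miner_involved flag: an unconfirmed transaction is reused when check_read_set confirms its recorded reads, and a re-executed transaction may skip validation only if its fresh read and write sets stay within what was recorded in the previous round. A simplified restatement of that skip_validation test with stand-in types:

use std::collections::{HashMap, HashSet};

type Location = String;
type Value = u64;

// Skipping validation is allowed only when the new execution read and wrote
// nothing outside the previously recorded sets (read values are not compared
// here; only the keys matter for this containment check).
fn may_skip_validation(
    new_reads: &HashMap<Location, Option<Value>>,
    new_writes: &HashSet<Location>,
    prev_reads: &HashMap<Location, Option<Value>>,
    prev_writes: &HashSet<Location>,
) -> bool {
    new_reads.keys().all(|loc| prev_reads.contains_key(loc))
        && new_writes.is_subset(prev_writes)
}

fn main() {
    let prev_reads = HashMap::from([("A".to_string(), Some(1u64))]);
    let prev_writes = HashSet::from(["B".to_string()]);
    let new_reads = HashMap::from([("A".to_string(), Some(2u64))]); // same key, value may differ
    let new_writes = HashSet::from(["B".to_string()]);
    assert!(may_skip_validation(&new_reads, &new_writes, &prev_reads, &prev_writes));
}

As the retained comment notes, even then only transactions with consecutive minimum TxID actually skip validation; that ordering constraint is enforced by the scheduler.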
14 changes: 7 additions & 7 deletions src/scheduler.rs
@@ -5,7 +5,7 @@ use std::time::{Duration, Instant};

use crate::hint::ParallelExecutionHints;
use crate::partition::PartitionExecutor;
use crate::storage::SchedulerDB;
use crate::storage::{LazyUpdateValue, SchedulerDB};
use crate::tx_dependency::{DependentTxsVec, TxDependency};
use crate::{
fork_join_util, GrevmError, LocationAndType, ResultAndTransition, TransactionStatus, TxId,
@@ -110,7 +110,7 @@ pub(crate) type LocationSet = HashSet<LocationAndType>;
#[derive(Clone)]
pub(crate) struct TxState {
pub tx_status: TransactionStatus,
pub read_set: LocationSet,
pub read_set: HashMap<LocationAndType, Option<U256>>,
pub write_set: LocationSet,
pub execute_result: ResultAndTransition,
}
@@ -119,7 +119,7 @@ impl TxState {
pub(crate) fn new() -> Self {
Self {
tx_status: TransactionStatus::Initial,
read_set: HashSet::new(),
read_set: HashMap::new(),
write_set: HashSet::new(),
execute_result: ResultAndTransition::default(),
}
@@ -386,7 +386,7 @@
let mut conflict = tx_states[txid].tx_status == TransactionStatus::Conflict;
let mut updated_dependencies = BTreeSet::new();
if txid >= end_skip_id {
for location in tx_states[txid].read_set.iter() {
for (location, _) in tx_states[txid].read_set.iter() {
if let Some(written_txs) = merged_write_set.get(location) {
if let Some(previous_txid) = written_txs.range(..txid).next_back() {
// update dependencies: previous_txid <- txid
@@ -490,10 +490,10 @@
#[allow(invalid_reference_casting)]
let tx_states =
unsafe { &mut *(&(*self.tx_states) as *const Vec<TxState> as *mut Vec<TxState>) };
let mut rewards: u128 = 0;
let mut miner_updates = Vec::with_capacity(finality_tx_cnt);
let start_txid = self.num_finality_txs - finality_tx_cnt;
for txid in start_txid..self.num_finality_txs {
rewards += tx_states[txid].execute_result.rewards;
miner_updates.push(tx_states[txid].execute_result.miner_update.clone());
database
.commit_transition(std::mem::take(&mut tx_states[txid].execute_result.transition));
self.results.push(tx_states[txid].execute_result.result.clone().unwrap());
@@ -504,7 +504,7 @@
// and track the rewards for the miner for each transaction separately.
// The miner’s account is only updated after validation by SchedulerDB.increment_balances
database
.increment_balances(vec![(self.coinbase, rewards)])
.update_balances(vec![(self.coinbase, LazyUpdateValue::merge(miner_updates))])
.map_err(|err| GrevmError::EvmError(EVMError::Database(err)))?;
self.metrics.commit_transition_time.increment(start.elapsed().as_nanos() as u64);
Ok(())
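On the scheduler side, validation still resolves dependencies by looking up each read location's nearest earlier writer in the merged write set (now iterating map entries rather than set members), and the miner handling changes from summing a u128 reward to merging the collected LazyUpdateValue entries and applying them once through update_balances. A simplified sketch of that dependency lookup, with stand-in types and the conflict marking omitted:

use std::collections::{BTreeSet, HashMap};

type TxId = usize;
type Location = String;

// For each location a transaction read, the nearest earlier transaction that
// wrote it becomes a dependency of this transaction.
fn deps_for(
    txid: TxId,
    read_locations: &[Location],
    merged_write_set: &HashMap<Location, BTreeSet<TxId>>,
) -> BTreeSet<TxId> {
    let mut deps = BTreeSet::new();
    for location in read_locations {
        if let Some(writers) = merged_write_set.get(location) {
            if let Some(&previous_txid) = writers.range(..txid).next_back() {
                deps.insert(previous_txid);
            }
        }
    }
    deps
}

fn main() {
    let merged = HashMap::from([("A".to_string(), BTreeSet::from([1usize, 3, 7]))]);
    // Transaction 5 read "A"; its nearest earlier writer is transaction 3.
    assert_eq!(deps_for(5, &["A".to_string()], &merged), BTreeSet::from([3usize]));
}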