Skip to content
This repository has been archived by the owner on Jan 8, 2025. It is now read-only.

Commit

Permalink
Improve transaction_by_hash to handle pending tx (#957)
Browse files Browse the repository at this point in the history
* Improve transaction_by_hash using aggregate view to handle pending transactions

* refactor test

* add sorting by block number
  • Loading branch information
tcoratger authored Apr 12, 2024
1 parent a6b92e0 commit b6bcb3c
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/eth_provider/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ impl Database {
Ok(result)
}

/// Get a single document from aggregated collections
pub async fn get_one_aggregate<T>(&self, pipeline: impl IntoIterator<Item = Document>) -> DatabaseResult<Option<T>>
where
T: DeserializeOwned + CollectionName,
{
let collection = self.0.collection::<T>(T::collection_name());
let mut cursor = collection.aggregate(pipeline, None).await?;

Ok(cursor.try_next().await?.map(|doc| mongodb::bson::de::from_document(doc)).transpose()?)
}

/// Update a single document in a collection
pub async fn update_one<T>(&self, doc: T, filter: impl Into<Document>, upsert: bool) -> DatabaseResult<()>
where
Expand Down
6 changes: 6 additions & 0 deletions src/eth_provider/database/types/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ impl From<StoredTransaction> for Transaction {
}
}

impl From<Transaction> for StoredTransaction {
fn from(tx: Transaction) -> Self {
Self { tx }
}
}

#[cfg(any(test, feature = "arbitrary", feature = "testing"))]
impl<'a> arbitrary::Arbitrary<'a> for StoredTransaction {
fn arbitrary(u: &mut arbitrary::Unstructured<'a>) -> arbitrary::Result<Self> {
Expand Down
3 changes: 3 additions & 0 deletions src/eth_provider/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ pub enum KakarotError {
/// Error related to the database.
#[error(transparent)]
DatabaseError(#[from] mongodb::error::Error),
/// Error related to the database deserialization.
#[error(transparent)]
DatabaseDeserializationError(#[from] mongodb::bson::de::Error),
/// Error related to the evm execution.
#[error(transparent)]
ExecutionError(EvmError),
Expand Down
38 changes: 32 additions & 6 deletions src/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use super::database::types::{
header::StoredHeader, log::StoredLog, receipt::StoredTransactionReceipt, transaction::StoredPendingTransaction,
transaction::StoredTransaction, transaction::StoredTransactionHash,
};
use super::database::Database;
use super::database::{CollectionName, Database};
use super::error::{EthApiError, EthereumDataFormatError, EvmError, KakarotError, SignatureError, TransactionError};
use super::starknet::kakarot_core::core::{CallInput, Uint256};
use super::starknet::kakarot_core::{
Expand Down Expand Up @@ -235,11 +235,37 @@ where
}

async fn transaction_by_hash(&self, hash: B256) -> EthProviderResult<Option<reth_rpc_types::Transaction>> {
Ok(self
.database
.get_one::<StoredTransaction>(into_filter("tx.hash", &hash, HASH_PADDING), None)
.await?
.map(Into::into))
let pipeline = vec![
doc! {
// Union with pending transactions with only specified hash
"$unionWith": {
"coll": StoredPendingTransaction::collection_name(),
"pipeline": [
{
"$match": {
"tx.hash": format_hex(hash, HASH_PADDING)
}
}
]
},
},
// Only specified hash in the transactions collection
doc! {
"$match": {
"tx.hash": format_hex(hash, HASH_PADDING)
}
},
// Sort in descending order by block number as pending transactions have null block number
doc! {
"$sort": { "tx.blockNumber" : -1 }
},
// Only one document in the final result with priority to the final transactions collection if available
doc! {
"$limit": 1
},
];

Ok(self.database.get_one_aggregate::<StoredTransaction>(pipeline).await?.map(Into::into))
}

async fn transaction_by_block_hash_and_index(
Expand Down
2 changes: 1 addition & 1 deletion src/eth_provider/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub(crate) fn format_hex(value: impl LowerHex, width: usize) -> String {
}

/// Converts a key and value into a MongoDB filter.
pub(crate) fn into_filter<T>(key: &str, value: &T, width: usize) -> Document
pub fn into_filter<T>(key: &str, value: &T, width: usize) -> Document
where
T: LowerHex,
{
Expand Down
69 changes: 68 additions & 1 deletion tests/eth_provider.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#![cfg(feature = "testing")]
use std::str::FromStr;

use kakarot_rpc::eth_provider::database::types::transaction::StoredPendingTransaction;
use kakarot_rpc::eth_provider::constant::HASH_PADDING;
use kakarot_rpc::eth_provider::database::types::transaction::{StoredPendingTransaction, StoredTransaction};
use kakarot_rpc::eth_provider::provider::EthereumProvider;
use kakarot_rpc::eth_provider::utils::into_filter;
use kakarot_rpc::models::felt::Felt252Wrapper;
use kakarot_rpc::test_utils::eoa::Eoa as _;
use kakarot_rpc::test_utils::evm_contract::EvmContract;
Expand Down Expand Up @@ -503,3 +505,68 @@ async fn test_send_raw_transaction_wrong_signature(#[future] katana: Katana, _se
// Assert that no transaction is found
assert!(tx.is_none());
}

#[rstest]
#[awt]
#[tokio::test(flavor = "multi_thread")]
async fn test_transaction_by_hash(#[future] katana: Katana, _setup: ()) {
// Given
// Retrieve an instance of the Ethereum provider from the test environment
let eth_provider = katana.eth_provider();

// Retrieve the first transaction from the test environment
let first_transaction = katana.first_transaction().unwrap();

// Check if the first transaction is returned correctly by the `transaction_by_hash` method
assert_eq!(eth_provider.transaction_by_hash(first_transaction.hash).await.unwrap().unwrap(), first_transaction);

// Check if a non-existent transaction returns None
assert!(eth_provider.transaction_by_hash(B256::random()).await.unwrap().is_none());

// Generate a pending transaction to be stored in the pending transactions collection
// Create a sample transaction
let transaction = Transaction::Eip1559(TxEip1559 {
chain_id: 1,
nonce: 0,
gas_limit: 21000,
to: TransactionKind::Call(Address::random()),
value: U256::from(1000),
input: Bytes::default(),
max_fee_per_gas: 875000000,
max_priority_fee_per_gas: 0,
access_list: Default::default(),
});

// Sign the transaction
let signature = sign_message(katana.eoa().private_key(), transaction.signature_hash()).unwrap();
let transaction_signed = TransactionSigned::from_transaction_and_signature(transaction, signature);

// Send the transaction
let _ = eth_provider
.send_raw_transaction(transaction_signed.envelope_encoded())
.await
.expect("failed to send transaction");

// Retrieve the pending transaction from the database
let mut stored_transaction: StoredPendingTransaction =
eth_provider.database().get_one(None, None).await.expect("Failed to get transaction").unwrap();

let tx = stored_transaction.clone().tx;

// Check if the pending transaction is returned correctly by the `transaction_by_hash` method
assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap(), tx);

// Modify the block number of the pending transaction
stored_transaction.tx.block_number = Some(U256::from(1111));

// Insert the transaction into the final transaction collection
let filter = into_filter("tx.hash", &stored_transaction.tx.hash, HASH_PADDING);
eth_provider
.database()
.update_one::<StoredTransaction>(stored_transaction.tx.into(), filter, true)
.await
.expect("Failed to insert documents");

// Check if the final transaction is returned correctly by the `transaction_by_hash` method
assert_eq!(eth_provider.transaction_by_hash(tx.hash).await.unwrap().unwrap().block_number, Some(U256::from(1111)));
}

0 comments on commit b6bcb3c

Please sign in to comment.