Skip to content
This repository has been archived by the owner on Jan 8, 2025. It is now read-only.

Commit

Permalink
feat: add a RetryHandler (#1248)
Browse files Browse the repository at this point in the history
add a `RetryHandler` to handle transactions retries
  • Loading branch information
greged93 authored Jul 2, 2024
1 parent 82a84a2 commit e83f669
Show file tree
Hide file tree
Showing 9 changed files with 253 additions and 191 deletions.
6 changes: 0 additions & 6 deletions src/eth_provider/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,6 @@ use std::str::FromStr;
lazy_static! {
pub static ref MAX_PRIORITY_FEE_PER_GAS: u64 = 0;

/// Maximum number of times a transaction can be retried
pub static ref TRANSACTION_MAX_RETRIES: u8 = u8::from_str(
&std::env::var("TRANSACTION_MAX_RETRIES")
.unwrap_or_else(|_| panic!("Missing environment variable TRANSACTION_MAX_RETRIES"))
).expect("failing to parse TRANSACTION_MAX_RETRIES");

/// Maximum number of logs that can be fetched in a single request
pub static ref MAX_LOGS: Option<u64> = std::env::var("MAX_LOGS")
.ok()
Expand Down
1 change: 0 additions & 1 deletion src/eth_provider/database/ethereum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ impl EthereumTransactionStore for Database {
.get_one::<StoredPendingTransaction>(filter, None)
.await?
.map(|tx| tx.retries + 1)
.inspect(|retries| tracing::info!("Retrying {} with {} retries", hash, retries))
.or_else(|| {
tracing::info!("New transaction {} in pending pool", hash);
None
Expand Down
1 change: 0 additions & 1 deletion src/eth_provider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ pub mod constant;
pub mod contracts;
pub mod database;
pub mod error;
pub mod pending_pool;
pub mod provider;
pub mod starknet;
pub mod utils;
43 changes: 0 additions & 43 deletions src/eth_provider/pending_pool.rs

This file was deleted.

52 changes: 1 addition & 51 deletions src/eth_provider/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use starknet::core::types::SyncStatusType;
use starknet::core::utils::get_storage_var_address;
use starknet_crypto::FieldElement;

use super::constant::{
BLOCK_NUMBER_HEX_STRING_LEN, CALL_REQUEST_GAS_LIMIT, HASH_HEX_STRING_LEN, MAX_LOGS, TRANSACTION_MAX_RETRIES,
};
use super::constant::{BLOCK_NUMBER_HEX_STRING_LEN, CALL_REQUEST_GAS_LIMIT, HASH_HEX_STRING_LEN, MAX_LOGS};
use super::database::ethereum::EthereumBlockStore;
use super::database::filter::EthDatabaseFilterBuilder;
use super::database::types::{
Expand Down Expand Up @@ -852,51 +850,3 @@ where
Ok(())
}
}

impl<SP> EthDataProvider<SP>
where
SP: starknet::providers::Provider + Send + Sync,
{
pub async fn retry_transactions(&self) -> EthProviderResult<Vec<B256>> {
// Initialize an empty vector to store the hashes of retried transactions
let mut transactions_retried = Vec::new();

// Iterate over pending transactions fetched from the database
for tx in self.database.get::<StoredPendingTransaction>(None, None).await? {
let hash = tx.tx.hash;
let filter = EthDatabaseFilterBuilder::<filter::Transaction>::default().with_tx_hash(&hash).build();
// Check if the number of retries exceeds the maximum allowed retries
// or if the transaction already exists in the database of finalized transactions
if tx.retries + 1 > *TRANSACTION_MAX_RETRIES || self.database.transaction(&hash).await?.is_some() {
tracing::info!("Pruning pending transaction: {hash}");

// Delete the pending transaction from the database
self.database.delete_one::<StoredPendingTransaction>(filter).await?;

// Continue to the next iteration of the loop
continue;
}

// Generate primitive transaction, handle error if any
let transaction = match TransactionSignedEcRecovered::try_from(tx.tx.clone()) {
Ok(transaction) => transaction,
Err(error) => {
tracing::info!("Pruning pending transaction: {hash}, conversion error: {error}");
// Delete the pending transaction from the database due conversion error
// Malformed transaction
self.database.delete_one::<StoredPendingTransaction>(filter).await?;
// Continue to the next iteration of the loop
continue;
}
};

tracing::info!("Retrying transaction: {hash}");

// Create a signed transaction and send it
transactions_retried.push(self.send_raw_transaction(transaction.into_signed().envelope_encoded()).await?);
}

// Return the hashes of retried transactions
Ok(transactions_retried)
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub mod eth_provider;
pub mod eth_rpc;
pub mod models;
pub mod prometheus_handler;
pub mod retry;
#[cfg(feature = "testing")]
pub mod test_utils;
pub mod tracing;
24 changes: 15 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ use std::sync::Arc;

use dotenvy::dotenv;
use eyre::Result;
use mongodb::options::{DatabaseOptions, ReadConcern, WriteConcern};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tracing_subscriber::{filter, util::SubscriberInitExt};

use kakarot_rpc::config::KakarotRpcConfig;
use kakarot_rpc::eth_provider::database::Database;
use kakarot_rpc::eth_provider::pending_pool::start_retry_service;
use kakarot_rpc::eth_provider::provider::EthDataProvider;
use kakarot_rpc::eth_rpc::config::RPCConfig;
use kakarot_rpc::eth_rpc::rpc::KakarotRpcModuleBuilder;
use kakarot_rpc::eth_rpc::run_server;
use mongodb::options::{DatabaseOptions, ReadConcern, WriteConcern};
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::JsonRpcClient;
use tracing_subscriber::{filter, util::SubscriberInitExt};
use kakarot_rpc::retry::RetryHandler;

#[tokio::main]
async fn main() -> Result<()> {
Expand Down Expand Up @@ -43,17 +44,22 @@ async fn main() -> Result<()> {
#[cfg(feature = "hive")]
setup_hive(&starknet_provider).await?;

// Setup the retry service
let eth_provider = EthDataProvider::new(db, Arc::new(starknet_provider)).await?;
tokio::spawn(start_retry_service(eth_provider.clone()));
// Setup the eth provider
let eth_provider = EthDataProvider::new(db.clone(), Arc::new(starknet_provider)).await?;

// Setup the retry handler
let retry_handler = RetryHandler::new(eth_provider.clone(), db);
retry_handler.start(&tokio::runtime::Handle::current());

// Setup the RPC module
let kakarot_rpc_module = KakarotRpcModuleBuilder::new(eth_provider).rpc_module()?;

// Start the RPC server
let (socket_addr, server_handle) = run_server(kakarot_rpc_module, rpc_config).await?;

let url = format!("http://{socket_addr}");

println!("RPC Server running on {url}...");
tracing::info!("RPC Server running on {url}...");

server_handle.stopped().await;

Expand Down
Loading

0 comments on commit e83f669

Please sign in to comment.