diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..b25c7fd --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,101 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in library 'fossil_headers_db'", + "cargo": { + "args": [ + "test", + "--no-run", + "--lib", + "--package=fossil_headers_db" + ], + "filter": { + "name": "fossil_headers_db", + "kind": "lib" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'fossil_headers_db'", + "cargo": { + "args": [ + "build", + "--bin=fossil_headers_db", + "--package=fossil_headers_db" + ], + "filter": { + "name": "fossil_headers_db", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'fossil_headers_db'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=fossil_headers_db", + "--package=fossil_headers_db" + ], + "filter": { + "name": "fossil_headers_db", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug executable 'fossil_indexer'", + "cargo": { + "args": [ + "build", + "--bin=fossil_indexer", + "--package=fossil_headers_db" + ], + "filter": { + "name": "fossil_indexer", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + }, + { + "type": "lldb", + "request": "launch", + "name": "Debug unit tests in executable 'fossil_indexer'", + "cargo": { + "args": [ + "test", + "--no-run", + "--bin=fossil_indexer", + "--package=fossil_headers_db" + ], + "filter": { + "name": "fossil_indexer", + "kind": "bin" + } + }, + "args": [], + "cwd": "${workspaceFolder}" + } + ] +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index af722c5..943df0b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -425,7 +425,10 @@ checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825" dependencies = [ "android-tzdata", "iana-time-zone", + "js-sys", "num-traits", + "serde", + "wasm-bindgen", "windows-targets 0.52.6", ] @@ -740,6 +743,7 @@ version = "0.1.0" dependencies = [ "async-std", "axum", + "chrono", "clap", "ctrlc", "dotenvy", diff --git a/Cargo.toml b/Cargo.toml index 29850cd..894e7ce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ name = "fossil_headers_db" version = "0.1.0" edition = "2021" +default-run = "fossil_headers_db" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -22,9 +23,19 @@ sqlx = { version = "0.8.2", features = [ "tls-rustls", "macros", ] } +chrono = { version = "0.4.38", features = ["serde"] } tokio = { version = "1.38.0", features = ["rt", "rt-multi-thread", "macros"] } tracing = "0.1.40" eyre = "0.6.10" once_cell = "1.1.0" tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } serde_json = "1.0.105" + + +[[bin]] +name = "fossil_headers_db" +path = "src/main.rs" + +[[bin]] +name = "fossil_indexer" +path = "src/indexer/main.rs" diff --git a/migrations/20250107071623_index_metadata.down.sql b/migrations/20250107071623_index_metadata.down.sql new file mode 100644 index 0000000..62ff3ca --- /dev/null +++ b/migrations/20250107071623_index_metadata.down.sql @@ -0,0 
+1,2 @@ +-- Add down migration script here +DROP TABLE IF EXISTS index_metadata; diff --git a/migrations/20250107071623_index_metadata.up.sql b/migrations/20250107071623_index_metadata.up.sql new file mode 100644 index 0000000..46a3c8a --- /dev/null +++ b/migrations/20250107071623_index_metadata.up.sql @@ -0,0 +1,9 @@ +-- Add up migration script here +CREATE TABLE + IF NOT EXISTS index_metadata ( + id INT8 GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + current_latest_block_number BIGINT NOT NULL, + indexing_starting_block_number BIGINT NOT NULL, + is_backfilling BOOLEAN DEFAULT TRUE, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP + ) \ No newline at end of file diff --git a/migrations/20250108182341_index_metadata_new_field.down.sql b/migrations/20250108182341_index_metadata_new_field.down.sql new file mode 100644 index 0000000..733c7fb --- /dev/null +++ b/migrations/20250108182341_index_metadata_new_field.down.sql @@ -0,0 +1,2 @@ +-- Add down migration script here +ALTER TABLE index_metadata DROP COLUMN backfilling_block_number; \ No newline at end of file diff --git a/migrations/20250108182341_index_metadata_new_field.up.sql b/migrations/20250108182341_index_metadata_new_field.up.sql new file mode 100644 index 0000000..2b29acb --- /dev/null +++ b/migrations/20250108182341_index_metadata_new_field.up.sql @@ -0,0 +1,2 @@ +-- Add up migration script here +ALTER TABLE index_metadata ADD COLUMN backfilling_block_number BIGINT; \ No newline at end of file diff --git a/run-migrations.sh b/run-migrations.sh old mode 100644 new mode 100755 diff --git a/src/db/mod.rs b/src/db/mod.rs index 95f91f5..121832b 100644 --- a/src/db/mod.rs +++ b/src/db/mod.rs @@ -1,5 +1,5 @@ -use crate::types::type_utils::convert_hex_string_to_i64; -use crate::types::BlockHeaderWithFullTransaction; +use crate::rpc::BlockHeaderWithFullTransaction; +use crate::utils::convert_hex_string_to_i64; use eyre::{Context, Error, Result}; use futures::FutureExt; use sqlx::postgres::PgConnectOptions; @@ -13,12 +13,12 @@ use tokio::sync::OnceCell; use tokio::time::sleep; use tracing::{error, info, warn}; -#[cfg(test)] mod db_test; static DB_POOL: OnceCell>> = OnceCell::const_new(); pub const DB_MAX_CONNECTIONS: u32 = 50; +// TODO: Not use a oncecell but instead use some sort of DI for easier testing. pub async fn get_db_pool() -> Result>> { if let Some(pool) = DB_POOL.get() { Ok(pool.clone()) @@ -304,6 +304,7 @@ where } } +#[allow(clippy::all)] fn is_transient_error(e: &Error) -> bool { // Check for database connection errors if let Some(db_err) = e.downcast_ref::() { @@ -322,3 +323,34 @@ fn is_transient_error(e: &Error) -> bool { false } } + +#[derive(Debug)] +pub struct DbConnection { + pub pool: Pool, +} + +impl DbConnection { + // TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. + #[allow(dead_code)] + pub async fn new(db_conn_string: Option) -> Result> { + let mut conn_options: PgConnectOptions = match db_conn_string { + Some(conn_string) => conn_string.parse()?, + None => dotenvy::var("DB_CONNECTION_STRING") + .context("DB_CONNECTION_STRING must be set")? 
+ .parse()?, + }; + + conn_options = conn_options + .log_slow_statements(tracing::log::LevelFilter::Debug, Duration::new(120, 0)); + + let pool = PgPoolOptions::new() + .max_connections(DB_MAX_CONNECTIONS) + .connect_with(conn_options) + .await?; + + Ok(Arc::new(Self { pool })) + } +} + +#[cfg(test)] +mod tests {} diff --git a/src/indexer/batch_service.rs b/src/indexer/batch_service.rs new file mode 100644 index 0000000..50782de --- /dev/null +++ b/src/indexer/batch_service.rs @@ -0,0 +1,195 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use eyre::{anyhow, Result}; +use futures::future::try_join_all; +use tokio::task; +use tracing::{error, info, warn}; + +use crate::{ + db::DbConnection, + repositories::{ + block_header::insert_block_header_query, + index_metadata::{get_index_metadata, update_backfilling_block_number_query}, + }, + rpc, +}; + +#[derive(Debug)] +pub struct BatchIndexConfig { + // TODO: maybe reconsidering these variables? Since we share the same with quick for now + pub max_retries: u8, + pub poll_interval: u32, + pub rpc_timeout: u32, + pub index_batch_size: u32, +} + +impl Default for BatchIndexConfig { + fn default() -> Self { + Self { + max_retries: 10, + poll_interval: 10, + rpc_timeout: 300, + index_batch_size: 50, + } + } +} + +pub struct BatchIndexer { + config: BatchIndexConfig, + db: Arc, + should_terminate: Arc, +} + +impl BatchIndexer { + pub async fn new( + config: BatchIndexConfig, + db: Arc, + should_terminate: Arc, + ) -> BatchIndexer { + Self { + db, + config, + should_terminate, + } + } + + // TODO: Since this is similar to the quick indexer with the only exception being the block logic, + // maybe we could DRY this? + pub async fn index(&self) -> Result<()> { + // Batch indexer loop. does the following until terminated: + // 1. check if finished indexing (backfilling block is 0) + // 2. check current starting block and backfilling block + // 3. if not fully indexed, index the block in batches (20 seems to be good for quick, but should be adjustable) + // 4. if it is fully indexed, do nothing (maybe we could exit this too?) 
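+        //
+        // Worked example (hypothetical numbers): with the default index_batch_size of 50 and a
+        // stored backfilling_block_number of 1_000, one pass indexes blocks 950..=1_000 and then
+        // records 950 as the new backfilling cursor; the next pass covers 900..=950, and so on
+        // until the cursor reaches 0 and the loop exits.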
+ while !self.should_terminate.load(Ordering::Relaxed) { + let current_index_metadata = match get_index_metadata(self.db.clone()).await { + Ok(metadata) => { + if let Some(metadata) = metadata { + metadata + } else { + error!("[batch_index] Error getting index metadata"); + return Err(anyhow!("Error getting index metadata: metadata not found.")); + } + } + Err(e) => { + error!("[batch_index] Error getting index metadata: {}", e); + return Err(e); + } + }; + + let index_start_block_number = current_index_metadata.indexing_starting_block_number; + let current_backfilling_block_number = + if let Some(block_number) = current_index_metadata.backfilling_block_number { + block_number + } else { + index_start_block_number + }; + + if current_backfilling_block_number == 0 { + info!("[batch_index] Backfilling complete, terminating backfilling process."); + break; + } + + if current_backfilling_block_number > 0 { + let backfilling_target_block = + current_backfilling_block_number - i64::from(self.config.index_batch_size); + let starting_block_number: i64 = if backfilling_target_block < 0 { + 0 + } else { + backfilling_target_block + }; + + self.index_block_range( + starting_block_number, + current_backfilling_block_number, + &self.should_terminate, + ) + .await?; + } else { + info!("[batch_index] Backfilling complete, terminating backfilling process."); + break; + } + } + Ok(()) + } + + // Indexing a block range, inclusive. + pub async fn index_block_range( + &self, + starting_block: i64, + ending_block: i64, + should_terminate: &AtomicBool, + ) -> Result<()> { + let block_range: Vec = (starting_block..ending_block + 1).collect(); + + for i in 0..self.config.max_retries { + if should_terminate.load(Ordering::Relaxed) { + info!("[batch_index] Termination requested. Stopping quick indexing."); + break; + } + + let timeout = self.config.rpc_timeout; + + let rpc_block_headers_futures: Vec<_> = block_range + .iter() + .map(|block_number| { + task::spawn(rpc::get_full_block_by_number( + *block_number, + Some(timeout.into()), + )) + }) + .collect(); + + let rpc_block_headers_response = try_join_all(rpc_block_headers_futures).await?; + + let mut block_headers = Vec::with_capacity(rpc_block_headers_response.len()); + let mut has_err = false; + + for header in rpc_block_headers_response.into_iter() { + match header { + Ok(header) => { + block_headers.push(header); + } + Err(e) => { + has_err = true; + warn!( + "[batch_index] Error retrieving block in range from {} to {}. error: {}", + starting_block, ending_block, e + ) + } + } + } + + if !has_err { + let mut db_tx = self.db.pool.begin().await?; + + insert_block_header_query(&mut db_tx, block_headers).await?; + update_backfilling_block_number_query(&mut db_tx, starting_block).await?; + + // Commit at the end + db_tx.commit().await?; + + info!( + "[batch_index] Indexing block range from {} to {} complete.", + starting_block, ending_block + ); + return Ok(()); + } + + // If there's an error during rpc, retry. + error!("[batch_index] Error encountered during rpc, retry no. {}. Re-running from block: {}", i, starting_block); + + // Exponential backoff + let backoff = (i as u64).pow(2) * 5; + tokio::time::sleep(Duration::from_secs(backoff)).await; + } + + Err(anyhow!("Max retries reached. Stopping batch indexing.")) + } +} diff --git a/src/indexer/indexer_tests.rs b/src/indexer/indexer_tests.rs new file mode 100644 index 0000000..689cb91 --- /dev/null +++ b/src/indexer/indexer_tests.rs @@ -0,0 +1 @@ +/// Mainly integration tests should live here. 
\ No newline at end of file diff --git a/src/indexer/main.rs b/src/indexer/main.rs new file mode 100644 index 0000000..d04f0a6 --- /dev/null +++ b/src/indexer/main.rs @@ -0,0 +1,146 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + thread::{self, JoinHandle}, +}; + +use eyre::{anyhow, Context, Result}; +use fossil_headers_db::{ + db::DbConnection, + indexer::{ + batch_service::{BatchIndexConfig, BatchIndexer}, + quick_service::{QuickIndexConfig, QuickIndexer}, + }, + repositories::index_metadata::{ + get_index_metadata, set_initial_indexing_status, IndexMetadata, + }, + router, rpc, +}; +use tracing::{error, info}; +use tracing_subscriber::fmt; + +pub async fn get_base_index_metadata(db: Arc) -> Result { + if let Some(metadata) = get_index_metadata(db.clone()).await? { + return Ok(metadata); + } + + let latest_block_number = rpc::get_latest_finalized_blocknumber(None).await?; + + set_initial_indexing_status(db.clone(), latest_block_number, latest_block_number, true).await?; + + if let Some(metadata) = get_index_metadata(db).await? { + return Ok(metadata); + } + + Err(anyhow!("Failed to get indexer metadata")) +} + +#[tokio::main] +pub async fn main() -> Result<()> { + // TODO: this should be set to only be turned on if we're in dev mode + dotenvy::dotenv()?; + + // Initialize tracing subscriber + fmt().init(); + + // Setup database connection + info!("Connecting to DB"); + let db = DbConnection::new(None).await?; + + info!("Starting Indexer"); + + let should_terminate = Arc::new(AtomicBool::new(false)); + + setup_ctrlc_handler(Arc::clone(&should_terminate))?; + + // Start by checking and updating the current status in the db. + // let indexing_metadata = get_base_index_metadata(db.clone()).await?; + let router_terminator = Arc::clone(&should_terminate); + + // Setup the router which allows us to query health status and operations + let router_handle = thread::Builder::new() + .name("[router]".to_owned()) + .spawn(move || { + let rt = tokio::runtime::Runtime::new()?; + + info!("Starting router"); + if let Err(e) = rt.block_on(router::initialize_router(router_terminator.clone())) { + error!("[router] unexpected error {}", e); + } + + info!("[router] shutting down"); + Ok(()) + })?; + + // Start the quick indexer + let quick_index_config = QuickIndexConfig::default(); + let quick_indexer = + QuickIndexer::new(quick_index_config, db.clone(), should_terminate.clone()).await; + + let quick_indexer_handle = thread::Builder::new() + .name("[quick_index]".to_owned()) + .spawn(move || { + let rt = tokio::runtime::Runtime::new()?; + + info!("Starting quick indexer"); + if let Err(e) = rt.block_on(quick_indexer.index()) { + error!("[quick_index] unexpected error {}", e); + } + Ok(()) + })?; + + // Start the batch indexer + let batch_index_config = BatchIndexConfig::default(); + let batch_indexer = + BatchIndexer::new(batch_index_config, db.clone(), should_terminate.clone()).await; + + let batch_indexer_handle = thread::Builder::new() + .name("[batch_index]".to_owned()) + .spawn(move || { + let rt = tokio::runtime::Runtime::new()?; + + info!("Starting batch indexer"); + if let Err(e) = rt.block_on(batch_indexer.index()) { + error!("[batch_index] unexpected error {}", e); + } + Ok(()) + })?; + + // Wait for termination, which will join all the handles. 
+ wait_for_thread_completion(vec![ + router_handle, + quick_indexer_handle, + batch_indexer_handle, + ])?; + + Ok(()) +} + +fn wait_for_thread_completion(handles: Vec>>) -> Result<()> { + for handle in handles { + match handle.join() { + Ok(Ok(())) => { + info!("Thread completed successfully"); + } + Ok(Err(e)) => { + error!("Thread completed with an error: {:?}", e); + } + Err(e) => { + error!("Thread panicked: {:?}", e); + } + } + } + + Ok(()) +} + +fn setup_ctrlc_handler(should_terminate: Arc) -> Result<()> { + ctrlc::set_handler(move || { + info!("Received Ctrl+C"); + info!("Waiting for current processes to finish..."); + should_terminate.store(true, Ordering::SeqCst); + }) + .context("Failed to set Ctrl+C handler") +} diff --git a/src/indexer/mod.rs b/src/indexer/mod.rs new file mode 100644 index 0000000..0b69f5e --- /dev/null +++ b/src/indexer/mod.rs @@ -0,0 +1,16 @@ +pub mod batch_service; +pub mod quick_service; + +// TODO +// [x] 1. start a 'job' and add the job info to the job table, including statuses, etc +// [x] 2. spin up multiple of that job +// 3. record indexing information to a metadata table, including current latest indexed etc. +// 4. quick indexing should always be at the head, but need to use metadata to keep track which is the latest indexed and continue indexing from there +// 5. batch indexing can do by batch, but should be backfilling in reverse blocks to allow for latest blocks to work whilst the batch is indexing +// 6. batch indexing in addition need to handle after migration back filling, design the mechanism for triggering this. +// 7. batch indexing can also additionally have a repair mode to check if there's something missing, but that should ideally never happen. + +// Experiments +// 1. we can try a single service, spinning up jobs. that might work good enough and is easy to deploy. +// 2. we can also try multiple services, spinning up jobs +// 3. Need a good benchmarking toolset. One such way is check speed of indexing to 100k? diff --git a/src/indexer/quick_service.rs b/src/indexer/quick_service.rs new file mode 100644 index 0000000..b875c14 --- /dev/null +++ b/src/indexer/quick_service.rs @@ -0,0 +1,186 @@ +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; + +use eyre::{anyhow, Result}; +use futures::future::try_join_all; +use tokio::task; +use tracing::{error, info, warn}; + +use crate::{ + db::DbConnection, + repositories::{ + block_header::insert_block_header_query, + index_metadata::{get_index_metadata, update_latest_quick_index_block_number_query}, + }, + rpc::{self}, +}; + +#[derive(Debug)] +pub struct QuickIndexConfig { + pub max_retries: u8, + pub poll_interval: u32, + pub rpc_timeout: u32, + pub index_batch_size: u32, +} + +impl Default for QuickIndexConfig { + fn default() -> Self { + Self { + max_retries: 10, + poll_interval: 10, + rpc_timeout: 300, + index_batch_size: 20, + } + } +} + +pub struct QuickIndexer { + config: QuickIndexConfig, + db: Arc, + should_terminate: Arc, +} + +impl QuickIndexer { + pub async fn new( + config: QuickIndexConfig, + db: Arc, + should_terminate: Arc, + ) -> QuickIndexer { + Self { + db, + config, + should_terminate, + } + } + + pub async fn index(&self) -> Result<()> { + // Quick indexer loop, does the following until terminated: + // 1. check current latest block + // 2. check if the block is already indexed + // 3. if not, index the block + // 4. 
if yes, sleep for a period of time and do nothing + while !self.should_terminate.load(Ordering::Relaxed) { + let last_block_number = match get_index_metadata(self.db.clone()).await { + Ok(metadata) => { + if let Some(metadata) = metadata { + metadata.current_latest_block_number + } else { + error!("[quick_index] Error getting index metadata"); + return Err(anyhow!("Error getting index metadata: metadata not found.")); + } + } + Err(e) => { + error!("[quick_index] Error getting index metadata: {}", e); + return Err(e); + } + }; + + let new_latest_block = + rpc::get_latest_finalized_blocknumber(Some(self.config.rpc_timeout.into())).await?; + + if new_latest_block > last_block_number { + let ending_block_number: i64 = + if new_latest_block - last_block_number > self.config.index_batch_size.into() { + last_block_number + i64::from(self.config.index_batch_size) + } else { + new_latest_block + }; + + self.index_block_range( + last_block_number + 1, // index from recorded last block + 1 + ending_block_number, + &self.should_terminate, + ) + .await?; + } else { + info!( + "No new block finalized. Latest: {}. Sleeping for {}s...", + new_latest_block, self.config.poll_interval + ); + tokio::time::sleep(Duration::from_secs(self.config.poll_interval.into())).await; + } + } + + info!("[quick_index] Process terminating."); + Ok(()) + } + + // Indexing a block range, inclusive. + async fn index_block_range( + &self, + starting_block: i64, + ending_block: i64, + should_terminate: &AtomicBool, + ) -> Result<()> { + let block_range: Vec = (starting_block..ending_block + 1).collect(); + + for i in 0..self.config.max_retries { + if should_terminate.load(Ordering::Relaxed) { + info!("[quick_index] Termination requested. Stopping quick indexing."); + break; + } + + let timeout = self.config.rpc_timeout; + + let rpc_block_headers_futures: Vec<_> = block_range + .iter() + .map(|block_number| { + task::spawn(rpc::get_full_block_by_number( + *block_number, + Some(timeout.into()), + )) + }) + .collect(); + + let rpc_block_headers_response = try_join_all(rpc_block_headers_futures).await?; + + let mut block_headers = Vec::with_capacity(rpc_block_headers_response.len()); + let mut has_err = false; + + for header in rpc_block_headers_response.into_iter() { + match header { + Ok(header) => { + block_headers.push(header); + } + Err(e) => { + has_err = true; + warn!( + "[quick_index] Error retrieving block in range from {} to {}. error: {}", + starting_block, ending_block, e + ) + } + } + } + + if !has_err { + let mut db_tx = self.db.pool.begin().await?; + + insert_block_header_query(&mut db_tx, block_headers).await?; + update_latest_quick_index_block_number_query(&mut db_tx, ending_block).await?; + + // Commit at the end + db_tx.commit().await?; + + info!( + "[quick_index] Indexing block range from {} to {} complete.", + starting_block, ending_block + ); + return Ok(()); + } + + // If there's an error during rpc, retry. + error!("[quick_index] Error encountered during rpc, retry no. {}. Re-running from block: {}", i, starting_block); + + // Exponential backoff + let backoff = (i as u64).pow(2) * 5; + tokio::time::sleep(Duration::from_secs(backoff)).await; + } + + Err(anyhow!("Max retries reached. 
Stopping quick indexing.")) + } +} diff --git a/src/lib.rs b/src/lib.rs index 5f1d91d..1230a63 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ pub mod commands; pub mod db; +pub mod indexer; +pub mod repositories; pub mod router; pub mod rpc; -pub mod types; +pub mod utils; diff --git a/src/main.rs b/src/main.rs index 3249946..ef9f311 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,10 @@ use fossil_headers_db as _; mod commands; mod db; +mod repositories; mod router; mod rpc; -mod types; +mod utils; use clap::{Parser, ValueEnum}; use core::cmp::min; diff --git a/src/repositories/block_header.rs b/src/repositories/block_header.rs new file mode 100644 index 0000000..013cc3d --- /dev/null +++ b/src/repositories/block_header.rs @@ -0,0 +1,273 @@ +use eyre::{ContextCompat, Result}; +use sqlx::{query_builder::Separated, Postgres, QueryBuilder}; + +use crate::{ + rpc::{BlockHeaderWithFullTransaction, Transaction}, + utils::convert_hex_string_to_i64, +}; + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. +#[allow(dead_code)] +#[derive(Debug)] +pub struct TransactionFormatted { + pub hash: String, + pub block_number: i64, + pub transaction_index: i64, + pub value: String, + pub gas_price: String, + pub gas: String, + pub from: Option, + pub to: Option, + pub max_priority_fee_per_gas: String, + pub max_fee_per_gas: String, + pub chain_id: Option, +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. +#[allow(dead_code)] +#[derive(Debug)] +pub struct BlockHeaderFormatted { + pub gas_limit: i64, + pub gas_used: i64, + pub base_fee_per_gas: Option, + pub hash: String, + pub nonce: Option, + pub number: i64, + pub receipts_root: String, + pub state_root: String, + pub transactions_root: String, + pub parent_hash: Option, + pub miner: Option, + pub logs_bloom: Option, + pub difficulty: Option, + pub total_difficulty: Option, + pub sha3_uncles: Option, + pub timestamp: i64, + pub extra_data: Option, + pub mix_hash: Option, + pub withdrawals_root: Option, + pub blob_gas_used: Option, + pub excess_blob_gas: Option, + pub parent_beacon_block_root: Option, +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. 
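+// Converts the hex-encoded numeric fields of each RPC header to i64 and writes the whole batch
+// with a single multi-row INSERT ... ON CONFLICT (number) DO UPDATE, so re-indexing a range that
+// was already stored simply overwrites the existing rows. The headers' transactions are inserted
+// in the same database transaction via insert_block_txs_query below.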
+#[allow(dead_code)] +// Seems that using transaction with multi row inserts seem to be the fastest +pub async fn insert_block_header_query( + db_tx: &mut sqlx::Transaction<'_, Postgres>, + block_headers: Vec, +) -> Result<()> { + let mut formatted_block_headers: Vec = + Vec::with_capacity(block_headers.len()); + let mut flattened_transactions = Vec::new(); + + for header in block_headers.iter() { + // These fields should be converted successfully, and if its not converted successfully, + // it should be considered an unintended bug, + let block_number = convert_hex_string_to_i64(&header.number)?; + let gas_limit = convert_hex_string_to_i64(&header.gas_limit)?; + let gas_used = convert_hex_string_to_i64(&header.gas_used)?; + let block_timestamp = convert_hex_string_to_i64(&header.timestamp)?; + let receipts_root = header + .receipts_root + .clone() + .context("receipt root should not be empty")?; + let state_root = header + .state_root + .clone() + .context("state root should not be empty")?; + let transactions_root = header + .transactions_root + .clone() + .context("transactions root should not be empty")?; + + formatted_block_headers.push(BlockHeaderFormatted { + hash: header.hash.clone(), + number: block_number, + gas_limit, + gas_used, + base_fee_per_gas: header.base_fee_per_gas.clone(), + nonce: header.nonce.clone(), + + parent_hash: header.parent_hash.clone(), + miner: header.miner.clone(), + logs_bloom: header.logs_bloom.clone(), + difficulty: header.difficulty.clone(), + total_difficulty: header.total_difficulty.clone(), + sha3_uncles: header.sha3_uncles.clone(), + timestamp: block_timestamp, + extra_data: header.extra_data.clone(), + mix_hash: header.mix_hash.clone(), + withdrawals_root: header.withdrawals_root.clone(), + blob_gas_used: header.blob_gas_used.clone(), + excess_blob_gas: header.excess_blob_gas.clone(), + parent_beacon_block_root: header.parent_beacon_block_root.clone(), + receipts_root, + state_root, + transactions_root, + }); + + // Collect the transactions here and get the queries. 
+ flattened_transactions.extend(header.transactions.clone()); + } + + let mut query_builder: QueryBuilder = QueryBuilder::new( + "INSERT INTO blockheaders ( + block_hash, number, gas_limit, gas_used, base_fee_per_gas, + nonce, transaction_root, receipts_root, state_root, + parent_hash, miner, logs_bloom, difficulty, totalDifficulty, + sha3_uncles, timestamp, extra_data, mix_hash, withdrawals_root, + blob_gas_used, excess_blob_gas, parent_beacon_block_root + )", + ); + + query_builder.push_values( + formatted_block_headers.iter(), + |mut b: Separated<'_, '_, Postgres, &'static str>, block_header| { + // Convert values and unwrap_or_default() to handle errors + b.push_bind(&block_header.hash) + .push_bind(block_header.number) + .push_bind(block_header.gas_limit) + .push_bind(block_header.gas_used) + .push_bind(&block_header.base_fee_per_gas) + .push_bind(&block_header.nonce) + .push_bind(&block_header.transactions_root) + .push_bind(&block_header.receipts_root) + .push_bind(&block_header.state_root) + .push_bind(&block_header.parent_hash) + .push_bind(&block_header.miner) + .push_bind(&block_header.logs_bloom) + .push_bind(&block_header.difficulty) + .push_bind(&block_header.total_difficulty) + .push_bind(&block_header.sha3_uncles) + .push_bind(block_header.timestamp) + .push_bind(&block_header.extra_data) + .push_bind(&block_header.mix_hash) + .push_bind(&block_header.withdrawals_root) + .push_bind(&block_header.blob_gas_used) + .push_bind(&block_header.excess_blob_gas) + .push_bind(&block_header.parent_beacon_block_root); + }, + ); + + query_builder.push( + r#" + ON CONFLICT (number) + DO UPDATE SET + block_hash = EXCLUDED.block_hash, + gas_limit = EXCLUDED.gas_limit, + gas_used = EXCLUDED.gas_used, + base_fee_per_gas = EXCLUDED.base_fee_per_gas, + nonce = EXCLUDED.nonce, + transaction_root = EXCLUDED.transaction_root, + receipts_root = EXCLUDED.receipts_root, + state_root = EXCLUDED.state_root, + parent_hash = EXCLUDED.parent_hash, + miner = EXCLUDED.miner, + logs_bloom = EXCLUDED.logs_bloom, + difficulty = EXCLUDED.difficulty, + totalDifficulty = EXCLUDED.totalDifficulty, + sha3_uncles = EXCLUDED.sha3_uncles, + timestamp = EXCLUDED.timestamp, + extra_data = EXCLUDED.extra_data, + mix_hash = EXCLUDED.mix_hash, + withdrawals_root = EXCLUDED.withdrawals_root, + blob_gas_used = EXCLUDED.blob_gas_used, + excess_blob_gas = EXCLUDED.excess_blob_gas, + parent_beacon_block_root = EXCLUDED.parent_beacon_block_root;"#, + ); + + query_builder.build().execute(&mut **db_tx).await?; + + insert_block_txs_query(db_tx, flattened_transactions).await?; + + Ok(()) +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. 
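+// Same pattern as the header insert above: a single multi-row INSERT ... ON CONFLICT
+// (transaction_hash) DO UPDATE keyed on the transaction hash, so retried ranges stay idempotent.
+// Optional fee fields (gas_price, max_priority_fee_per_gas, max_fee_per_gas) fall back to "0"
+// when the node omits them.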
+#[allow(dead_code)] +pub async fn insert_block_txs_query( + db_tx: &mut sqlx::Transaction<'_, Postgres>, + transactions: Vec, +) -> Result<()> { + let mut query_builder: QueryBuilder = QueryBuilder::new( + "INSERT INTO transactions ( + block_number, transaction_hash, transaction_index, + from_addr, to_addr, value, gas_price, + max_priority_fee_per_gas, max_fee_per_gas, gas, chain_id + ) ", + ); + + // Pre-format and handle the potential error arising from the hex -> i64 conversion + + let mut formatted_transactions: Vec = + Vec::with_capacity(transactions.len()); + for transaction in transactions.iter() { + let tx_block_number = convert_hex_string_to_i64(&transaction.block_number)?; + let tx_index = convert_hex_string_to_i64(&transaction.transaction_index)?; + + formatted_transactions.push(TransactionFormatted { + block_number: tx_block_number, + hash: transaction.hash.clone(), + transaction_index: tx_index, + from: transaction.from.clone(), + to: transaction.to.clone(), + value: transaction.value.clone(), + // TODO: is this a good idea? To default to 0? Does this interfere with any calculations? + gas_price: transaction.gas_price.clone().unwrap_or("0".to_string()), + max_priority_fee_per_gas: transaction + .max_priority_fee_per_gas + .clone() + .unwrap_or("0".to_string()), + max_fee_per_gas: transaction + .max_fee_per_gas + .clone() + .unwrap_or("0".to_string()), + gas: transaction.gas.clone(), + chain_id: transaction.chain_id.clone(), + }); + } + + // Push the formatted values into the query. + query_builder.push_values( + formatted_transactions.iter(), + |mut b: Separated<'_, '_, Postgres, &'static str>, tx| { + // Convert values and unwrap_or_default() to handle errors + b.push_bind(tx.block_number) + .push_bind(&tx.hash) + .push_bind(tx.transaction_index) + .push_bind(&tx.from) + .push_bind(&tx.to) + .push_bind(&tx.value) + .push_bind(&tx.gas_price) + .push_bind(&tx.max_priority_fee_per_gas) + .push_bind(&tx.max_fee_per_gas) + .push_bind(&tx.gas) + .push_bind(&tx.chain_id); + }, + ); + query_builder.push( + r#" + ON CONFLICT (transaction_hash) + DO UPDATE SET + block_number = EXCLUDED.block_number, + transaction_index = EXCLUDED.transaction_index, + from_addr = EXCLUDED.from_addr, + to_addr = EXCLUDED.to_addr, + value = EXCLUDED.value, + gas_price = EXCLUDED.gas_price, + max_priority_fee_per_gas = EXCLUDED.max_priority_fee_per_gas, + max_fee_per_gas = EXCLUDED.max_fee_per_gas, + gas = EXCLUDED.gas, + chain_id = EXCLUDED.chain_id;"#, + ); + query_builder.build().execute(&mut **db_tx).await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + // TODO: add tests here with db +} diff --git a/src/repositories/index_metadata.rs b/src/repositories/index_metadata.rs new file mode 100644 index 0000000..bddade7 --- /dev/null +++ b/src/repositories/index_metadata.rs @@ -0,0 +1,193 @@ +use eyre::{anyhow, Report, Result}; +use serde::Deserialize; +use sqlx::Postgres; +use std::sync::Arc; +use tracing::error; + +use crate::db::DbConnection; + +#[derive(Debug, Deserialize, sqlx::FromRow)] +#[allow(dead_code)] +pub struct IndexMetadata { + pub id: i64, + pub current_latest_block_number: i64, + pub indexing_starting_block_number: i64, + pub is_backfilling: bool, + pub updated_at: chrono::DateTime, + pub backfilling_block_number: Option, +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. 
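+// Reads the single index_metadata row shared by the quick and batch indexers. A missing row is
+// reported as Ok(None) (sqlx::Error::RowNotFound is swallowed) so callers can bootstrap the row
+// themselves; any other database error is propagated.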
+#[allow(dead_code)] +pub async fn get_index_metadata(db: Arc) -> Result> { + let db = db.as_ref(); + let result: Result = sqlx::query_as( + r#" + SELECT + id, + current_latest_block_number, + indexing_starting_block_number, + is_backfilling, + updated_at, + backfilling_block_number + FROM index_metadata + "#, + ) + .fetch_one(&db.pool) + .await; + + let result: Option = match result { + Ok(result) => Some(result), + Err(err) => match err { + sqlx::Error::RowNotFound => None, + err => { + error!("Failed to get indexer metadata: {}", err); + return Err(Report::new(err)); + } + }, + }; + + Ok(result) +} + +#[allow(dead_code)] +pub async fn set_is_backfilling(db: Arc, is_backfilling: bool) -> Result<()> { + let db = db.as_ref(); + let result = sqlx::query( + r#" + UPDATE index_metadata + SET is_backfilling = $1, + updated_at = CURRENT_TIMESTAMP + "#, + ) + .bind(is_backfilling) + .execute(&db.pool) + .await?; + + if result.rows_affected() != 1 { + error!( + "Failed to set is_backfilling, affecting {} rows", + result.rows_affected() + ); + return Err(anyhow!("Failed to set is_backfilling")); + } + + Ok(()) +} + +#[allow(dead_code)] +pub async fn set_initial_indexing_status( + db: Arc, + current_latest_block_number: i64, + indexing_starting_block_number: i64, + is_backfilling: bool, +) -> Result<()> { + // Check if there's already an entry, if it does then we can skip and only update. + let result = sqlx::query( + r#" + SELECT id + FROM index_metadata + "#, + ) + .fetch_one(&db.pool) + .await; + + if result.is_ok() { + let result = sqlx::query( + r#" + UPDATE index_metadata + SET current_latest_block_number = $1, + indexing_starting_block_number = $2, + is_backfilling = $3, + updated_at = CURRENT_TIMESTAMP + "#, + ) + .bind(current_latest_block_number) + .bind(indexing_starting_block_number) + .bind(is_backfilling) + .execute(&db.pool) + .await?; + + if result.rows_affected() != 1 { + error!("Failed to update initial indexing status"); + return Err(anyhow!( + "Failed to update initial indexing status".to_owned(), + )); + } + + return Ok(()); + } + + let result = sqlx::query( + r#" + INSERT INTO index_metadata ( + current_latest_block_number, + indexing_starting_block_number, + is_backfilling + ) VALUES ( + $1, + $2, + $3 + ) + "#, + ) + .bind(current_latest_block_number) + .bind(indexing_starting_block_number) + .bind(is_backfilling) + .execute(&db.pool) + .await?; + + if result.rows_affected() != 1 { + error!("Failed to insert initial indexing status"); + return Err(anyhow!( + "Failed to insert initial indexing status".to_owned(), + )); + } + + Ok(()) +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. +#[allow(dead_code)] +pub async fn update_latest_quick_index_block_number_query( + db_tx: &mut sqlx::Transaction<'_, Postgres>, + block_number: i64, +) -> Result<()> { + sqlx::query( + r#" + UPDATE index_metadata + SET current_latest_block_number = $1, + updated_at = CURRENT_TIMESTAMP + "#, + ) + .bind(block_number) + .execute(&mut **db_tx) + .await?; + + Ok(()) +} + +// TODO: allow dead code for now. Adding tests in future PRs should allow us to remove this. 
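+// Moves the batch indexer's backfill cursor. It runs inside the same transaction that inserts a
+// completed block range, so the cursor only moves down once that range has been committed.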
+#[allow(dead_code)] +pub async fn update_backfilling_block_number_query( + db_tx: &mut sqlx::Transaction<'_, Postgres>, + block_number: i64, +) -> Result<()> { + sqlx::query( + r#" + UPDATE index_metadata + SET backfilling_block_number = $1, + updated_at = CURRENT_TIMESTAMP + "#, + ) + .bind(block_number) + .execute(&mut **db_tx) + .await?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + // TODO: add tests here with db +} diff --git a/src/repositories/mod.rs b/src/repositories/mod.rs new file mode 100644 index 0000000..614ca8d --- /dev/null +++ b/src/repositories/mod.rs @@ -0,0 +1,2 @@ +pub mod block_header; +pub mod index_metadata; diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index 74b8c85..7895e87 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1,7 +1,4 @@ -use crate::types::{ - type_utils::convert_hex_string_to_i64, BlockHeaderWithEmptyTransaction, - BlockHeaderWithFullTransaction, -}; +use crate::utils::convert_hex_string_to_i64; use eyre::{Context, Result}; use once_cell::sync::Lazy; use reqwest::Client; @@ -28,16 +25,128 @@ pub struct RpcResponse { #[derive(Serialize)] struct RpcRequest<'a, T> { jsonrpc: &'a str, - id: &'a str, + id: String, method: &'a str, params: T, } +#[derive(Debug, Deserialize, Clone)] +pub struct Transaction { + pub hash: String, + #[serde(rename(deserialize = "blockNumber"))] + pub block_number: String, + #[serde(rename(deserialize = "transactionIndex"))] + pub transaction_index: String, + pub value: String, + #[serde(rename(deserialize = "gasPrice"))] + pub gas_price: Option, + pub gas: String, + pub from: Option, + pub to: Option, + #[serde(rename(deserialize = "maxPriorityFeePerGas"))] + pub max_priority_fee_per_gas: Option, + #[serde(rename(deserialize = "maxFeePerGas"))] + pub max_fee_per_gas: Option, + #[serde(rename(deserialize = "chainId"))] + pub chain_id: Option, +} + +#[derive(Debug, Deserialize)] +#[allow(dead_code)] +pub struct BlockHeaderWithEmptyTransaction { + #[serde(rename(deserialize = "gasLimit"))] + pub gas_limit: String, + #[serde(rename(deserialize = "gasUsed"))] + pub gas_used: String, + #[serde(rename(deserialize = "baseFeePerGas"))] + pub base_fee_per_gas: Option, + pub hash: String, + pub nonce: Option, + pub number: String, + #[serde(rename(deserialize = "receiptsRoot"))] + pub receipts_root: String, + #[serde(rename(deserialize = "stateRoot"))] + pub state_root: String, + #[serde(rename(deserialize = "transactionsRoot"))] + pub transactions_root: String, + #[serde(rename(deserialize = "parentHash"))] + pub parent_hash: Option, + #[serde(rename(deserialize = "miner"))] + pub miner: Option, + #[serde(rename(deserialize = "logsBloom"))] + pub logs_bloom: Option, + #[serde(rename(deserialize = "difficulty"))] + pub difficulty: Option, + #[serde(rename(deserialize = "totalDifficulty"))] + pub total_difficulty: Option, + #[serde(rename(deserialize = "sha3Uncles"))] + pub sha3_uncles: Option, + #[serde(rename(deserialize = "timestamp"))] + pub timestamp: String, + #[serde(rename(deserialize = "extraData"))] + pub extra_data: Option, + #[serde(rename(deserialize = "mixHash"))] + pub mix_hash: Option, + #[serde(rename(deserialize = "withdrawalsRoot"))] + pub withdrawals_root: Option, + #[serde(rename(deserialize = "blobGasUsed"))] + pub blob_gas_used: Option, + #[serde(rename(deserialize = "excessBlobGas"))] + pub excess_blob_gas: Option, + #[serde(rename(deserialize = "parentBeaconBlockRoot"))] + pub parent_beacon_block_root: Option, +} + +#[derive(Debug, Deserialize)] +pub struct BlockHeaderWithFullTransaction { + 
#[serde(rename(deserialize = "gasLimit"))] + pub gas_limit: String, + #[serde(rename(deserialize = "gasUsed"))] + pub gas_used: String, + #[serde(rename(deserialize = "baseFeePerGas"))] + pub base_fee_per_gas: Option, + pub hash: String, + pub nonce: Option, + pub number: String, + #[serde(rename(deserialize = "receiptsRoot"))] + pub receipts_root: Option, + #[serde(rename(deserialize = "stateRoot"))] + pub state_root: Option, + #[serde(rename(deserialize = "transactionsRoot"))] + pub transactions_root: Option, + pub transactions: Vec, + #[serde(rename(deserialize = "parentHash"))] + pub parent_hash: Option, + pub miner: Option, + #[serde(rename(deserialize = "logsBloom"))] + pub logs_bloom: Option, + #[serde(rename(deserialize = "difficulty"))] + pub difficulty: Option, + #[serde(rename(deserialize = "totalDifficulty"))] + pub total_difficulty: Option, + #[serde(rename(deserialize = "sha3Uncles"))] + pub sha3_uncles: Option, + #[serde(rename(deserialize = "timestamp"))] + pub timestamp: String, + #[serde(rename(deserialize = "extraData"))] + pub extra_data: Option, + #[serde(rename(deserialize = "mixHash"))] + pub mix_hash: Option, + #[serde(rename(deserialize = "withdrawalsRoot"))] + pub withdrawals_root: Option, + #[serde(rename(deserialize = "blobGasUsed"))] + pub blob_gas_used: Option, + #[serde(rename(deserialize = "excessBlobGas"))] + pub excess_blob_gas: Option, + #[serde(rename(deserialize = "parentBeaconBlockRoot"))] + pub parent_beacon_block_root: Option, +} + pub async fn get_latest_finalized_blocknumber(timeout: Option) -> Result { // TODO: Id should be different on every request, this is how request are identified by us and by the node. let params = RpcRequest { jsonrpc: "2.0", - id: "0", + id: "0".to_string(), method: "eth_getBlockByNumber", params: ("finalized", false), }; @@ -61,7 +170,7 @@ pub async fn get_full_block_by_number( ) -> Result { let params = RpcRequest { jsonrpc: "2.0", - id: "0", + id: "0".to_string(), method: "eth_getBlockByNumber", params: (format!("0x{:x}", number), true), }; @@ -74,6 +183,25 @@ pub async fn get_full_block_by_number( .await } +// TODO: Make this work as expected +#[allow(dead_code)] +pub async fn batch_get_full_block_by_number( + numbers: Vec, + timeout: Option, +) -> Result> { + let mut params = Vec::new(); + for number in numbers { + let num_str = number.to_string(); + params.push(RpcRequest { + jsonrpc: "2.0", + id: num_str, + method: "eth_getBlockByNumber", + params: (format!("0x{:x}", number), true), + }); + } + make_rpc_call::<_, Vec>(¶ms, timeout).await +} + async fn make_rpc_call Deserialize<'de>>( params: &T, timeout: Option, diff --git a/src/types/mod.rs b/src/types/mod.rs deleted file mode 100644 index a2ec6a2..0000000 --- a/src/types/mod.rs +++ /dev/null @@ -1,115 +0,0 @@ -pub mod type_utils; - -use serde::Deserialize; - -#[derive(Debug, Deserialize)] -pub struct Transaction { - pub hash: String, - #[serde(rename(deserialize = "blockNumber"))] - pub block_number: String, - #[serde(rename(deserialize = "transactionIndex"))] - pub transaction_index: String, - pub value: String, - #[serde(rename(deserialize = "gasPrice"))] - pub gas_price: Option, - pub gas: String, - pub from: Option, - pub to: Option, - #[serde(rename(deserialize = "maxPriorityFeePerGas"))] - pub max_priority_fee_per_gas: Option, - #[serde(rename(deserialize = "maxFeePerGas"))] - pub max_fee_per_gas: Option, - #[serde(rename(deserialize = "chainId"))] - pub chain_id: Option, -} - -#[derive(Debug, Deserialize)] -#[allow(dead_code)] -pub struct 
BlockHeaderWithEmptyTransaction { - #[serde(rename(deserialize = "gasLimit"))] - pub gas_limit: String, - #[serde(rename(deserialize = "gasUsed"))] - pub gas_used: String, - #[serde(rename(deserialize = "baseFeePerGas"))] - pub base_fee_per_gas: Option, - pub hash: String, - pub nonce: Option, - pub number: String, - #[serde(rename(deserialize = "receiptsRoot"))] - pub receipts_root: String, - #[serde(rename(deserialize = "stateRoot"))] - pub state_root: String, - #[serde(rename(deserialize = "transactionsRoot"))] - pub transactions_root: String, - #[serde(rename(deserialize = "parentHash"))] - pub parent_hash: Option, - #[serde(rename(deserialize = "miner"))] - pub miner: Option, - #[serde(rename(deserialize = "logsBloom"))] - pub logs_bloom: Option, - #[serde(rename(deserialize = "difficulty"))] - pub difficulty: Option, - #[serde(rename(deserialize = "totalDifficulty"))] - pub total_difficulty: Option, - #[serde(rename(deserialize = "sha3Uncles"))] - pub sha3_uncles: Option, - #[serde(rename(deserialize = "timestamp"))] - pub timestamp: String, - #[serde(rename(deserialize = "extraData"))] - pub extra_data: Option, - #[serde(rename(deserialize = "mixHash"))] - pub mix_hash: Option, - #[serde(rename(deserialize = "withdrawalsRoot"))] - pub withdrawals_root: Option, - #[serde(rename(deserialize = "blobGasUsed"))] - pub blob_gas_used: Option, - #[serde(rename(deserialize = "excessBlobGas"))] - pub excess_blob_gas: Option, - #[serde(rename(deserialize = "parentBeaconBlockRoot"))] - pub parent_beacon_block_root: Option, -} - -#[derive(Debug, Deserialize)] -pub struct BlockHeaderWithFullTransaction { - #[serde(rename(deserialize = "gasLimit"))] - pub gas_limit: String, - #[serde(rename(deserialize = "gasUsed"))] - pub gas_used: String, - #[serde(rename(deserialize = "baseFeePerGas"))] - pub base_fee_per_gas: Option, - pub hash: String, - pub nonce: Option, - pub number: String, - #[serde(rename(deserialize = "receiptsRoot"))] - pub receipts_root: Option, - #[serde(rename(deserialize = "stateRoot"))] - pub state_root: Option, - #[serde(rename(deserialize = "transactionsRoot"))] - pub transactions_root: Option, - pub transactions: Vec, - #[serde(rename(deserialize = "parentHash"))] - pub parent_hash: Option, - pub miner: Option, - #[serde(rename(deserialize = "logsBloom"))] - pub logs_bloom: Option, - #[serde(rename(deserialize = "difficulty"))] - pub difficulty: Option, - #[serde(rename(deserialize = "totalDifficulty"))] - pub total_difficulty: Option, - #[serde(rename(deserialize = "sha3Uncles"))] - pub sha3_uncles: Option, - #[serde(rename(deserialize = "timestamp"))] - pub timestamp: String, - #[serde(rename(deserialize = "extraData"))] - pub extra_data: Option, - #[serde(rename(deserialize = "mixHash"))] - pub mix_hash: Option, - #[serde(rename(deserialize = "withdrawalsRoot"))] - pub withdrawals_root: Option, - #[serde(rename(deserialize = "blobGasUsed"))] - pub blob_gas_used: Option, - #[serde(rename(deserialize = "excessBlobGas"))] - pub excess_blob_gas: Option, - #[serde(rename(deserialize = "parentBeaconBlockRoot"))] - pub parent_beacon_block_root: Option, -} diff --git a/src/types/type_utils.rs b/src/utils.rs similarity index 100% rename from src/types/type_utils.rs rename to src/utils.rs
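The rename moves convert_hex_string_to_i64 from src/types/type_utils.rs to src/utils.rs unchanged (similarity index 100%), so its body does not appear in this diff. As a rough illustration of what such a helper typically does (an assumption, not the actual implementation), it would strip an optional "0x" prefix and parse the remainder as base-16:

    use eyre::{Context, Result};

    // Hypothetical sketch only; the real src/utils.rs implementation is not part of this diff.
    pub fn convert_hex_string_to_i64(hex: &str) -> Result<i64> {
        let trimmed = hex.trim_start_matches("0x");
        i64::from_str_radix(trimmed, 16).context("failed to parse hex string as i64")
    }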