diff --git a/.changeset/curly-ligers-crash.md b/.changeset/curly-ligers-crash.md
new file mode 100644
index 000000000..0c14e449a
--- /dev/null
+++ b/.changeset/curly-ligers-crash.md
@@ -0,0 +1,5 @@
+---
+"@nomicfoundation/edr": patch
+---
+
+Removed usage of batch calls to prevent 429 errors
diff --git a/.github/workflows/edr-benchmark.yml b/.github/workflows/edr-benchmark.yml
index f1fccac22..a6bd23d42 100644
--- a/.github/workflows/edr-benchmark.yml
+++ b/.github/workflows/edr-benchmark.yml
@@ -35,7 +35,9 @@ jobs:
     environment: github-action-benchmark
     runs-on: self-hosted
    # Only run for trusted collaborators since third-parties could run malicious code on the self-hosted benchmark runner.
-    if: github.ref == 'refs/heads/main' || (github.event.pull_request.author_association == 'OWNER' || github.event.pull_request.author_association == 'MEMBER' || github.event.pull_request.author_association == 'COLLABORATOR')
+    # Temporarily disabled after moving to our own repo; see https://github.com/NomicFoundation/edr/issues/402
+    # if: github.ref == 'refs/heads/main' || (github.event.pull_request.author_association == 'OWNER' || github.event.pull_request.author_association == 'MEMBER' || github.event.pull_request.author_association == 'COLLABORATOR')
+    if: false
 
     steps:
       - uses: actions/checkout@v3
diff --git a/.github/workflows/edr-ci.yml b/.github/workflows/edr-ci.yml
index 569bf622e..6ab1356fd 100644
--- a/.github/workflows/edr-ci.yml
+++ b/.github/workflows/edr-ci.yml
@@ -151,11 +151,7 @@ jobs:
           args: --all --check
 
       - name: Run cargo clippy
-        uses: auguwu/clippy-action@1.3.0
-        with:
-          token: ${{ secrets.GITHUB_TOKEN }}
-          all-features: true
-          check-args: --workspace --all-targets -- -Dwarnings
+        run: cargo clippy --workspace --all-targets --all-features -- -D warnings
 
   edr-docs:
     name: Build EDR Docs
diff --git a/crates/edr_eth/Cargo.toml b/crates/edr_eth/Cargo.toml
index 70ce4c0f1..e0887e895 100644
--- a/crates/edr_eth/Cargo.toml
+++ b/crates/edr_eth/Cargo.toml
@@ -25,7 +25,7 @@ serde = { version = "1.0.147", default-features = false, features = ["derive"],
 serde_json = { version = "1.0.89", optional = true }
 sha3 = { version = "0.10.8", default-features = false }
 thiserror = { version = "1.0.37", default-features = false }
-tokio = { version = "1.21.2", default-features = false, features = ["fs", "sync"] }
+tokio = { version = "1.21.2", default-features = false, features = ["fs", "macros", "sync"] }
 tracing = { version = "0.1.37", features = ["attributes", "std"], optional = true }
 triehash = { version = "0.8.4", default-features = false }
 uuid = { version = "1.4.1", default-features = false, features = ["v4"] }
diff --git a/crates/edr_eth/src/remote/client.rs b/crates/edr_eth/src/remote/client.rs
index c5bc37b52..dcbef5018 100644
--- a/crates/edr_eth/src/remote/client.rs
+++ b/crates/edr_eth/src/remote/client.rs
@@ -1,19 +1,16 @@
 mod reqwest_error;
 
 use std::{
-    collections::{HashMap, VecDeque},
     fmt::Debug,
     io,
     path::{Path, PathBuf},
     sync::atomic::{AtomicU64, Ordering},
-    thread::available_parallelism,
     time::{Duration, Instant},
 };
 
 use futures::stream::StreamExt;
 use hyper::header::HeaderValue;
 pub use hyper::{header, http::Error as HttpError, HeaderMap};
-use itertools::{izip, Itertools};
 use reqwest::Client as HttpClient;
 use reqwest_middleware::{ClientBuilder as HttpClientBuilder, ClientWithMiddleware};
 use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
@@ -103,17 +100,6 @@ pub enum RpcClientError {
     #[error(transparent)]
     InvalidUrl(#[from] url::ParseError),
 
-    /// A response is missing from a batch request.
-    #[error("Missing response for method: '{method:?}' for request id: '{id:?}' in batch request")]
-    MissingResponse {
-        /// The method invocation for which the response is missing.
-        method: Box<RequestMethod>,
-        /// The id of the request iwth the missing response
-        id: Id,
-        /// The response text
-        response: String,
-    },
-
     /// The JSON-RPC returned an error.
     #[error("{error}. Request: {request}")]
     JsonRpcError {
@@ -287,7 +273,7 @@ impl RpcClient {
         let path = self.make_cache_path(cache_key.as_ref()).await?;
         match tokio::fs::read_to_string(&path).await {
             Ok(contents) => match serde_json::from_str(&contents) {
-                Ok(value) => Ok(Some(ResponseValue::Cached { value, path })),
+                Ok(value) => Ok(Some(ResponseValue { value, path })),
                 Err(error) => {
                     log_cache_error(
                         cache_key.as_ref(),
@@ -498,13 +484,6 @@ impl RpcClient {
             .map_err(|err| RpcClientError::CorruptedResponse(err.into()))
     }
 
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
-    fn get_ids(&self, count: u64) -> Vec<Id> {
-        let start = self.next_id.fetch_add(count, Ordering::Relaxed);
-        let end = start + count;
-        (start..end).map(Id::Num).collect()
-    }
-
     fn serialize_request(
         &self,
         input: &RequestMethod,
@@ -595,101 +574,6 @@ impl RpcClient {
             .and_then(|response| Self::extract_result(request, response))
     }
 
-    /// Returns the results of the given method invocations.
-    async fn batch_call(
-        &self,
-        methods: &[RequestMethod],
-    ) -> Result<Vec<ResponseValue>, RpcClientError> {
-        self.batch_call_with_resolver(methods, |_| None).await
-    }
-
-    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip_all))]
-    async fn batch_call_with_resolver(
-        &self,
-        methods: &[RequestMethod],
-        resolve_block_number: impl Fn(&serde_json::Value) -> Option<u64>,
-    ) -> Result<Vec<ResponseValue>, RpcClientError> {
-        let ids = self.get_ids(methods.len() as u64);
-
-        let cache_keys = methods.iter().map(try_read_cache_key).collect::<Vec<_>>();
-
-        let mut results: Vec<Option<ResponseValue>> = Vec::with_capacity(cache_keys.len());
-
-        for cache_key in &cache_keys {
-            results.push(self.try_from_cache(cache_key.as_ref()).await?);
-        }
-
-        let mut requests: Vec<SerializedRequest> = Vec::new();
-        let mut id_to_index = HashMap::<&Id, usize>::new();
-        for (index, (id, method, cache_response)) in izip!(&ids, methods, &results).enumerate() {
-            if cache_response.is_none() {
-                #[cfg(feature = "tracing")]
-                tracing::trace!("Cache miss: {}", method.name());
-
-                let request = Self::serialize_request_with_id(method, id.clone())?;
-                requests.push(request);
-                id_to_index.insert(id, index);
-            } else {
-                #[cfg(feature = "tracing")]
-                tracing::trace!("Cache hit: {}", method.name());
-            }
-        }
-
-        // Don't send empty request
-        if requests.is_empty() {
-            Ok(results
-                .into_iter()
-                .enumerate()
-                .map(|(_index, result)| {
-                    result.expect("If there are no requests to send, there must be a cached response for each method invocation")
-                })
-                .collect())
-        } else {
-            let request_body = SerializedRequest(
-                serde_json::to_value(&requests).map_err(RpcClientError::InvalidJsonRequest)?,
-            );
-            let remote_response = self.send_request_body(&request_body).await?;
-            let remote_responses: Vec<Response<serde_json::Value>> =
-                Self::parse_response_str(&remote_response)?;
-
-            for response in remote_responses {
-                let index = id_to_index
-                    // Remove to make sure no duplicate ids in response
-                    .remove(&response.id)
-                    .ok_or_else(|| RpcClientError::InvalidId {
-                        response: remote_response.clone(),
-                        id: response.id,
-                    })?;
-
-                let result =
-                    response
-                        .data
-                        .into_result()
-                        .map_err(|error| RpcClientError::JsonRpcError {
-                            error,
-                            request: request_body.to_json_string(),
-                        })?;
-
-                self.try_write_response_to_cache(&methods[index], &result, &resolve_block_number)
-                    .await?;
-
-                results[index] = Some(ResponseValue::Remote(result));
-            }
-
-            results
-                .into_iter()
-                .enumerate()
-                .map(|(index, result)| {
-                    result.ok_or_else(|| RpcClientError::MissingResponse {
-                        method: Box::new(methods[index].clone()),
-                        id: ids[index].clone(),
-                        response: remote_response.clone(),
-                    })
-                })
-                .collect()
-        }
-    }
-
     /// Calls `eth_blockNumber` and returns the block number.
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn block_number(&self) -> Result<u64, RpcClientError> {
@@ -755,74 +639,24 @@ impl RpcClient {
             .await
     }
 
-    /// Fetch the latest block number, chain id and network id in a batch call.
+    /// Fetch the latest block number, chain id and network id concurrently.
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn fetch_fork_metadata(&self) -> Result<ForkMetadata, RpcClientError> {
-        let mut inputs = vec![RequestMethod::NetVersion(())];
-
-        let maybe_block_number = self.maybe_cached_block_number().await?;
-        if maybe_block_number.is_none() {
-            inputs.push(RequestMethod::BlockNumber(()));
-        }
-
-        // Only request the chain id if we don't have it yet.
-        let mut maybe_chain_id_from_url = None;
-        if !self.chain_id.initialized() {
-            maybe_chain_id_from_url = chain_id_from_url(&self.url);
-            if maybe_chain_id_from_url.is_none() {
-                inputs.push(RequestMethod::ChainId(()));
-            }
-        };
+        let network_id = self.network_id();
+        let block_number = self.block_number();
+        let chain_id = self.chain_id();
 
-        let mut results = self.batch_call(inputs.as_slice()).await?.into_iter();
-        let expect = "batch call returns results for all calls on success";
-
-        let network_id = results.next().expect(expect).parse::<U256>().await?;
-
-        let block_number = if let Some(block_number) = maybe_block_number {
-            block_number
-        } else {
-            let block_number = results
-                .next()
-                .expect(expect)
-                .parse::<U256>()
-                .await?
-                .as_limbs()[0];
-            {
-                let mut write_guard = self.cached_block_number.write().await;
-                *write_guard = Some(CachedBlockNumber::new(block_number));
-            }
-            block_number
-        };
-
-        let chain_id = *self
-            .chain_id
-            .get_or_try_init(|| async {
-                if let Some(chain_id) = maybe_chain_id_from_url {
-                    Ok(chain_id)
-                } else {
-                    // It's possible that the chain id was initialized in-between, but it's not
-                    // possible that the chain id was initialized prior to our
-                    // call, and it isn't initialized now, therefore we must've requested
-                    // the chain id as well.
-                    results
-                        .next()
-                        .expect(expect)
-                        .parse::<U256>()
-                        .await
-                        .map(|chain_id| chain_id.as_limbs()[0])
-                }
-            })
-            .await?;
+        let (network_id, block_number, chain_id) =
+            tokio::try_join!(network_id, block_number, chain_id)?;
 
         Ok(ForkMetadata {
             chain_id,
-            network_id: network_id.as_limbs()[0],
+            network_id,
             latest_block_number: block_number,
         })
     }
 
-    /// Submit a consolidated batch of RPC method invocations in order to obtain
+    /// Submits three concurrent RPC method invocations in order to obtain
     /// the set of data contained in [`AccountInfo`].
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn get_account_info(
         &self,
         address: &Address,
         block: Option<BlockSpec>,
     ) -> Result<AccountInfo, RpcClientError> {
-        let inputs = &[
-            RequestMethod::GetBalance(*address, block.clone()),
-            RequestMethod::GetTransactionCount(*address, block.clone()),
-            RequestMethod::GetCode(*address, block.clone()),
-        ];
-
-        let responses = self.batch_call(inputs).await?;
-        let (balance, nonce, code) = responses
-            .into_iter()
-            .collect_tuple()
-            .expect("batch call checks responses");
+        let balance = self.get_balance(address, block.clone());
+        let nonce = self.get_transaction_count(address, block.clone());
+        let code = self.get_code(address, block.clone());
+
+        let (balance, nonce, code) = tokio::try_join!(balance, nonce, code)?;
 
-        let balance = balance.parse::<U256>().await?;
-        let nonce: u64 = nonce.parse::<U256>().await?.to();
-        let code = code.parse::<Bytes>().await?;
         let code = if code.is_empty() {
             None
         } else {
@@ -855,11 +680,11 @@ impl RpcClient {
             balance,
             code_hash: code.as_ref().map_or(KECCAK_EMPTY, Bytecode::hash_slow),
             code,
-            nonce,
+            nonce: nonce.to(),
         })
     }
 
-    /// Fetch account infos for multiple addresses in a batch call.
+    /// Fetch account infos for multiple addresses using concurrent requests.
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn get_account_infos(
         &self,
         addresses: &[Address],
         block: Option<BlockSpec>,
     ) -> Result<Vec<AccountInfo>, RpcClientError> {
         futures::stream::iter(addresses.iter())
             .map(|address| self.get_account_info(address, block.clone()))
-            .buffered(MAX_PARALLEL_REQUESTS)
+            .buffered(MAX_PARALLEL_REQUESTS / 3 + 1)
             .collect::<Vec<Result<AccountInfo, RpcClientError>>>()
             .await
             .into_iter()
@@ -884,6 +709,16 @@ impl RpcClient {
         self.call(RequestMethod::GetBlockByHash(*hash, false)).await
     }
 
+    /// Calls `eth_getBalance`.
+    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
+    pub async fn get_balance(
+        &self,
+        address: &Address,
+        block: Option<BlockSpec>,
+    ) -> Result<U256, RpcClientError> {
+        self.call(RequestMethod::GetBalance(*address, block)).await
+    }
+
     /// Calls `eth_getBlockByHash` and returns the transaction's data.
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn get_block_by_hash_with_transaction_data(
@@ -919,6 +754,16 @@ impl RpcClient {
             .await
     }
 
+    /// Calls `eth_getCode`.
+    #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
+    pub async fn get_code(
+        &self,
+        address: &Address,
+        block: Option<BlockSpec>,
+    ) -> Result<Bytes, RpcClientError> {
+        self.call(RequestMethod::GetCode(*address, block)).await
+    }
+
     /// Calls `eth_getLogs` using a starting and ending block (inclusive).
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn get_logs_by_range(
@@ -969,23 +814,19 @@ impl RpcClient {
             .await
     }
 
-    /// Methods for retrieving multiple transaction receipts in one batch
+    /// Methods for retrieving multiple transaction receipts using concurrent
+    /// requests.
     #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace", skip(self)))]
     pub async fn get_transaction_receipts(
         &self,
         hashes: impl IntoIterator<Item = &B256> + Debug,
     ) -> Result<Vec<Option<BlockReceipt>>, RpcClientError> {
-        let requests: Vec<RequestMethod> = hashes
+        let requests = hashes
             .into_iter()
-            .map(|transaction_hash| RequestMethod::GetTransactionReceipt(*transaction_hash))
-            .collect();
+            .map(|transaction_hash| self.get_transaction_receipt(transaction_hash));
 
-        let responses = self.batch_call(&requests).await?;
-
-        futures::stream::iter(responses)
-            .map(ResponseValue::parse)
-            // Primarily CPU heavy work, only does i/o on error.
-            .buffered(available_parallelism().map(usize::from).unwrap_or(1))
+        futures::stream::iter(requests)
+            .buffered(MAX_PARALLEL_REQUESTS)
            .collect::<Vec<Result<Option<BlockReceipt>, RpcClientError>>>()
             .await
             .into_iter()
@@ -1028,39 +869,25 @@ async fn remove_from_cache(path: &Path) -> Result<(), RpcClientError> {
 }
 
 #[derive(Debug, Clone)]
-enum ResponseValue {
-    Remote(serde_json::Value),
-    Cached {
-        value: serde_json::Value,
-        path: PathBuf,
-    },
+struct ResponseValue {
+    value: serde_json::Value,
+    path: PathBuf,
 }
 
 impl ResponseValue {
     async fn parse<T: DeserializeOwned>(self) -> Result<T, RpcClientError> {
-        match self {
-            ResponseValue::Remote(value) => {
-                serde_json::from_value(value.clone()).map_err(|error| {
-                    RpcClientError::InvalidResponse {
-                        response: value.to_string(),
-                        expected_type: std::any::type_name::<T>(),
-                        error,
-                    }
+        match serde_json::from_value(self.value.clone()) {
+            Ok(result) => Ok(result),
+            Err(error) => {
+                // Remove the file from cache if the contents don't match the expected type.
+                // This can happen for example if a new field is added to a type.
+                remove_from_cache(&self.path).await?;
+                Err(RpcClientError::InvalidResponse {
+                    response: self.value.to_string(),
+                    expected_type: std::any::type_name::<T>(),
+                    error,
                 })
             }
-            ResponseValue::Cached { value, path } => match serde_json::from_value(value.clone()) {
-                Ok(result) => Ok(result),
-                Err(error) => {
-                    // Remove the file from cache if the contents don't match the expected type.
-                    // This can happen for example if a new field is added to a type.
-                    remove_from_cache(&path).await?;
-                    Err(RpcClientError::InvalidResponse {
-                        response: value.to_string(),
-                        expected_type: std::any::type_name::<T>(),
-                        error,
-                    })
-                }
-            },
         }
     }
 }
@@ -1165,21 +992,6 @@ mod tests {
         }
     }
 
-    #[test]
-    fn get_ids_zero() {
-        let client = RpcClient::new("http://localhost:8545", PathBuf::new(), None).expect("url ok");
-        let ids = client.get_ids(0);
-        assert!(ids.is_empty());
-    }
-
-    #[test]
-    fn get_ids_more() {
-        let client = RpcClient::new("http://localhost:8545", PathBuf::new(), None).expect("url ok");
-        let count = 11;
-        let ids = client.get_ids(count);
-        assert_eq!(ids.len(), 11);
-    }
-
     #[tokio::test]
     async fn send_request_body_400_status() {
         const STATUS_CODE: u16 = 400;
@@ -2001,41 +1813,5 @@ mod tests {
                 .await
                 .expect("should have succeeded");
         }
-
-        #[tokio::test]
-        async fn handles_invalid_type_in_cache_batch_call() {
-            let alchemy_url = get_alchemy_url();
-            let client = TestRpcClient::new(&alchemy_url);
-
-            let dai_address = Address::from_str("0x6b175474e89094c44da98b954eedeac495271d0f")
-                .expect("failed to parse address");
-            let block_spec = BlockSpec::Number(16220843);
-
-            // Make an initial call to populate the cache.
-            client
-                .get_account_info(&dai_address, Some(block_spec.clone()))
-                .await
-                .expect("initial call should succeed");
-            assert_eq!(client.files_in_cache().len(), 3);
-
-            // Write some valid JSON, but invalid U256
-            tokio::fs::write(&client.files_in_cache()[0], "\"not-hex\"")
-                .await
-                .unwrap();
-
-            // Call with invalid type in cache fails, but removes faulty cache item
-            client
-                .get_account_info(&dai_address, Some(block_spec.clone()))
-                .await
-                .expect_err("should fail due to invalid json in cache");
-            assert_eq!(client.files_in_cache().len(), 2);
-
-            // Subsequent call fetches removed cache item and succeeds.
-            client
-                .get_account_info(&dai_address, Some(block_spec.clone()))
-                .await
-                .expect("subsequent call should succeed");
-            assert_eq!(client.files_in_cache().len(), 3);
-        }
     }
 }
diff --git a/crates/edr_eth/src/remote/request_methods.rs b/crates/edr_eth/src/remote/request_methods.rs
index d2f9ed2f7..908848327 100644
--- a/crates/edr_eth/src/remote/request_methods.rs
+++ b/crates/edr_eth/src/remote/request_methods.rs
@@ -100,6 +100,7 @@ pub enum RequestMethod {
 }
 
 impl RequestMethod {
+    #[cfg(feature = "tracing")]
     pub fn name(&self) -> &'static str {
         match self {
             Self::BlockNumber(_) => "eth_blockNumber",
diff --git a/crates/edr_evm/src/state/trie.rs b/crates/edr_evm/src/state/trie.rs
index 5224bae2c..dd74c899c 100644
--- a/crates/edr_evm/src/state/trie.rs
+++ b/crates/edr_evm/src/state/trie.rs
@@ -264,16 +264,20 @@ mod tests {
         let code_2_hash = code_2.hash_slow();
 
         let address1 = Address::random();
-        let mut account1 = AccountInfo::default();
-        account1.code_hash = code_1_hash;
-        account1.code = Some(code_1);
+        let account1 = AccountInfo {
+            code_hash: code_1_hash,
+            code: Some(code_1),
+            ..Default::default()
+        };
         state1.insert_account(address1, account1)?;
         state1.set_account_storage_slot(address1, U256::from(100), U256::from(100))?;
 
         let address2 = Address::random();
-        let mut account2 = AccountInfo::default();
-        account2.code_hash = code_2_hash;
-        account2.code = Some(code_2);
+        let account2 = AccountInfo {
+            code_hash: code_2_hash,
+            code: Some(code_2),
+            ..Default::default()
+        };
         let mut state2 = state1.clone();
         state2.insert_account(address2, account2)?;
         state2.set_account_storage_slot(address2, U256::from(200), U256::from(200))?;
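
Note on the pattern used throughout this patch: each former JSON-RPC batch request is replaced by individual requests issued concurrently — a fixed trio joined with `tokio::try_join!`, or an arbitrary list drained with bounded parallelism via `futures::StreamExt::buffered` — so no single oversized batch can trigger a 429 response from the provider. The standalone sketch below illustrates the same pattern; the `fetch` helper, the `MAX_PARALLEL_REQUESTS` value, and the implied `tokio`/`futures` dependencies are illustrative assumptions, not code taken from the EDR crates.

    use futures::{stream, StreamExt};

    // Assumed bound on in-flight requests; the real client defines its own limit.
    const MAX_PARALLEL_REQUESTS: usize = 20;

    // Stand-in for a single JSON-RPC call: one HTTP request per invocation
    // instead of one batched array of requests.
    async fn fetch(id: u64) -> Result<u64, String> {
        Ok(id * 2)
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // A small, fixed set of independent requests started together and
        // awaited as a unit (the shape used by fetch_fork_metadata and
        // get_account_info in this patch).
        let (a, b, c) = tokio::try_join!(fetch(1), fetch(2), fetch(3))?;
        assert_eq!((a, b, c), (2, 4, 6));

        // Many requests issued with bounded concurrency (the shape used by
        // get_account_infos and get_transaction_receipts in this patch).
        let results = stream::iter((0u64..100).map(fetch))
            .buffered(MAX_PARALLEL_REQUESTS)
            .collect::<Vec<Result<u64, String>>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>, _>>()?;

        assert_eq!(results.len(), 100);
        Ok(())
    }

`tokio::try_join!` requires tokio's `macros` feature, which is why the patch adds "macros" to the `tokio` dependency in crates/edr_eth/Cargo.toml.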