From ac2c93349c28f87a7c7bb525b7c8fdbf17ebee14 Mon Sep 17 00:00:00 2001 From: Yurii Koba Date: Wed, 13 Dec 2023 15:54:56 +0200 Subject: [PATCH] improvement view_state and remove state limit --- database/src/base/rpc_server.rs | 5 ++- database/src/postgres/rpc_server.rs | 27 +++++++++----- database/src/scylladb/rpc_server.rs | 47 ++++++++++++++++--------- rpc-server/src/modules/queries/mod.rs | 22 +++++------- rpc-server/src/modules/queries/utils.rs | 21 +++++------ 5 files changed, 72 insertions(+), 50 deletions(-) diff --git a/database/src/base/rpc_server.rs b/database/src/base/rpc_server.rs index c7801e20..2b64f9dc 100644 --- a/database/src/base/rpc_server.rs +++ b/database/src/base/rpc_server.rs @@ -31,7 +31,10 @@ pub trait ReaderDbManager { account_id: &near_primitives::types::AccountId, block_height: near_primitives::types::BlockHeight, key_data: readnode_primitives::StateKey, - ) -> anyhow::Result; + ) -> ( + readnode_primitives::StateKey, + readnode_primitives::StateValue, + ); /// Returns the near_primitives::account::Account at the given block height async fn get_account( diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index 13396a49..e47d63c1 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -101,19 +101,28 @@ impl crate::ReaderDbManager for PostgresDBManager { account_id: &near_primitives::types::AccountId, block_height: near_primitives::types::BlockHeight, key_data: readnode_primitives::StateKey, - ) -> anyhow::Result { - let result = crate::models::StateChangesData::get_state_key_value( - Self::get_connection(&self.pg_pool).await?, + ) -> ( + readnode_primitives::StateKey, + readnode_primitives::StateValue, + ) { + let connection = if let Ok(pg_connection) = Self::get_connection(&self.pg_pool).await { + pg_connection + } else { + return (key_data, readnode_primitives::StateValue::default()); + }; + let result = if let Ok(result) = crate::models::StateChangesData::get_state_key_value( + connection, account_id, block_height, - hex::encode(key_data), + hex::encode(key_data.clone()), ) - .await?; - if let Some(value) = result { - Ok(value) + .await + { + result.unwrap_or_default() } else { - anyhow::bail!("State value not found") - } + readnode_primitives::StateValue::default() + }; + (key_data, result) } async fn get_account( diff --git a/database/src/scylladb/rpc_server.rs b/database/src/scylladb/rpc_server.rs index b10fd53b..d6139bc7 100644 --- a/database/src/scylladb/rpc_server.rs +++ b/database/src/scylladb/rpc_server.rs @@ -1,5 +1,6 @@ use crate::scylladb::ScyllaStorageManager; use borsh::{BorshDeserialize, BorshSerialize}; +use futures::StreamExt; use num_traits::ToPrimitive; use scylla::{prepared_statement::PreparedStatement, IntoTypedRows}; use std::convert::TryFrom; @@ -169,16 +170,19 @@ impl crate::ReaderDbManager for ScyllaDBManager { &self, account_id: &near_primitives::types::AccountId, ) -> anyhow::Result> { - let result = Self::execute_prepared_query( - &self.scylla_session, - &self.get_all_state_keys, - (account_id.to_string(),), - ) - .await? - .rows_typed::<(String,)>()? - .filter_map(|row| row.ok().and_then(|(value,)| hex::decode(value).ok())); - - Ok(result.collect()) + let mut paged_query = self.get_all_state_keys.clone(); + paged_query.set_page_size(25000); + let mut rows_stream = self + .scylla_session + .execute_iter(paged_query, (account_id.to_string(),)) + .await? + .into_typed::<(String,)>(); + let mut stata_keys = vec![]; + while let Some(next_row_res) = rows_stream.next().await { + let (value,): (String,) = next_row_res?; + stata_keys.push(hex::decode(value)?); + } + Ok(stata_keys) } /// Returns state keys for the given account id filtered by the given prefix @@ -209,8 +213,11 @@ impl crate::ReaderDbManager for ScyllaDBManager { account_id: &near_primitives::types::AccountId, block_height: near_primitives::types::BlockHeight, key_data: readnode_primitives::StateKey, - ) -> anyhow::Result { - let result = Self::execute_prepared_query( + ) -> ( + readnode_primitives::StateKey, + readnode_primitives::StateValue, + ) { + let value = match Self::execute_prepared_query( &self.scylla_session, &self.get_state_key_value, ( @@ -219,10 +226,18 @@ impl crate::ReaderDbManager for ScyllaDBManager { hex::encode(&key_data).to_string(), ), ) - .await? - .single_row_typed::<(readnode_primitives::StateValue,)>()?; - - Ok(result.0) + .await + { + Ok(result) => { + let (value,) = result + .single_row_typed::<(readnode_primitives::StateValue,)>() + .unwrap_or_default(); + value + } + Err(_) => readnode_primitives::StateValue::default(), + }; + + (key_data, value) } /// Returns the near_primitives::account::Account at the given block height diff --git a/rpc-server/src/modules/queries/mod.rs b/rpc-server/src/modules/queries/mod.rs index 73da3241..c22377d9 100644 --- a/rpc-server/src/modules/queries/mod.rs +++ b/rpc-server/src/modules/queries/mod.rs @@ -5,8 +5,6 @@ use std::collections::HashMap; pub mod methods; pub mod utils; -const MAX_LIMIT: u8 = 100; - pub type Result = ::std::result::Result; pub struct CodeStorage { @@ -69,14 +67,12 @@ impl near_vm_runner::logic::External for CodeStorage { let get_db_data = self.db_manager .get_state_key_value(&self.account_id, self.block_height, key.to_vec()); - match block_on(get_db_data) { - Ok(data) => Ok(if !data.is_empty() { - Some(Box::new(StorageValuePtr { value: data }) as Box<_>) - } else { - None - }), - Err(_) => Ok(None), - } + let (_, data) = block_on(get_db_data); + Ok(if !data.is_empty() { + Some(Box::new(StorageValuePtr { value: data }) as Box<_>) + } else { + None + }) } #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] @@ -109,10 +105,8 @@ impl near_vm_runner::logic::External for CodeStorage { let get_db_state_keys = self.db_manager .get_state_key_value(&self.account_id, self.block_height, key.to_vec()); - match block_on(get_db_state_keys) { - Ok(data) => Ok(!data.is_empty()), - Err(_) => Ok(false), - } + let (_, data) = block_on(get_db_state_keys); + Ok(!data.is_empty()) } #[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))] diff --git a/rpc-server/src/modules/queries/utils.rs b/rpc-server/src/modules/queries/utils.rs index d05d22f0..a8c7b229 100644 --- a/rpc-server/src/modules/queries/utils.rs +++ b/rpc-server/src/modules/queries/utils.rs @@ -10,7 +10,7 @@ use tokio::task; use crate::config::CompiledCodeCache; use crate::errors::FunctionCallError; -use crate::modules::queries::{CodeStorage, MAX_LIMIT}; +use crate::modules::queries::CodeStorage; pub struct RunContractResponse { pub result: Vec, @@ -48,18 +48,19 @@ pub async fn get_state_keys_from_db( }; match result { Ok(state_keys) => { - for state_key in state_keys { - let state_value_result = db_manager - .get_state_key_value(account_id, block_height, state_key.clone()) - .await; - if let Ok(state_value) = state_value_result { + for state_keys_chunk in state_keys.chunks(1000) { + // TODO: 1000 is hardcoded value. Make it configurable. + let mut tasks_futures = vec![]; + for state_key in state_keys_chunk { + let state_value_result_future = + db_manager.get_state_key_value(account_id, block_height, state_key.clone()); + tasks_futures.push(state_value_result_future); + } + let results = futures::future::join_all(tasks_futures).await; + for (state_key, state_value) in results.into_iter() { if !state_value.is_empty() { data.insert(state_key, state_value); } - }; - let keys_count = data.keys().len() as u8; - if keys_count > MAX_LIMIT { - return data; } } data