diff --git a/database/src/postgres/rpc_server.rs b/database/src/postgres/rpc_server.rs index 65975b12..0f278069 100644 --- a/database/src/postgres/rpc_server.rs +++ b/database/src/postgres/rpc_server.rs @@ -74,7 +74,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { } else { crate::postgres::PageState::new(1000) }; - let mut stream = sqlx::query_as::<_, (String, Vec)>( + // Start a transaction + let mut tx = shard_id_pool.pool.begin().await?; + // Set a timeout only for this transaction + sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout + .execute(&mut *tx) + .await?; + + let result = sqlx::query_as::<_, (String, Vec)>( " WITH latest_blocks AS ( SELECT @@ -113,11 +120,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { .bind(bigdecimal::BigDecimal::from(block_height)) .bind(page_state.last_data_key.clone()) .bind(page_state.page_size) - .fetch(shard_id_pool.pool); + .fetch_all(&mut *tx) + .await?; + + // Rollback or commit the transaction + tx.rollback().await?; + let mut items = std::collections::HashMap::new(); let mut last_data_key = String::new(); - while let Some(row) = stream.next().await { - let (key, value): (String, Vec) = row?; + for (key, value) in result { last_data_key.clone_from(&key); items.insert(hex::decode(key)?, value); } @@ -151,7 +162,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { ]) .inc(); let mut items = std::collections::HashMap::new(); - let mut stream = sqlx::query_as::<_, (String, Vec)>( + + // Start a transaction + let mut tx = shard_id_pool.pool.begin().await?; + // Set a timeout only for this transaction + sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout + .execute(&mut *tx) + .await?; + + let result = sqlx::query_as::<_, (String, Vec)>( " WITH latest_blocks AS ( SELECT @@ -183,9 +202,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { .bind(account_id.to_string()) .bind(format!("{}%", hex::encode(prefix))) .bind(bigdecimal::BigDecimal::from(block_height)) - .fetch(shard_id_pool.pool); - while let Some(row) = stream.next().await { - let (key, value): (String, Vec) = row?; + .fetch_all(&mut *tx) + .await?; + + // Rollback or commit the transaction + tx.rollback().await?; + + for (key, value) in result { items.insert(hex::decode(key)?, value); } Ok(items) @@ -208,7 +231,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { ]) .inc(); let mut items = std::collections::HashMap::new(); - let mut stream = sqlx::query_as::<_, (String, Vec)>( + // Start a transaction + let mut tx = shard_id_pool.pool.begin().await?; + // Set a timeout only for this transaction + sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout + .execute(&mut *tx) + .await?; + + let result = sqlx::query_as::<_, (String, Vec)>( " WITH latest_blocks AS ( SELECT @@ -238,9 +268,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { ) .bind(account_id.to_string()) .bind(bigdecimal::BigDecimal::from(block_height)) - .fetch(shard_id_pool.pool); - while let Some(row) = stream.next().await { - let (key, value): (String, Vec) = row?; + .fetch_all(&mut *tx) + .await?; + + // Rollback or commit the transaction + tx.rollback().await?; + + for (key, value) in result { items.insert(hex::decode(key)?, value); } Ok(items) @@ -413,7 +447,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { ]) .inc(); let mut access_keys = vec![]; - let mut stream = sqlx::query_as::<_, (String, Vec, bigdecimal::BigDecimal)>( + + // Start a transaction + let mut tx = shard_id_pool.pool.begin().await?; + // Set a timeout only for this transaction + sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout + .execute(&mut *tx) + .await?; + + let result = sqlx::query_as::<_, (String, Vec, bigdecimal::BigDecimal)>( " WITH latest_blocks AS ( SELECT @@ -446,9 +488,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager { ) .bind(account_id.to_string()) .bind(bigdecimal::BigDecimal::from(block_height)) - .fetch(shard_id_pool.pool); - while let Some(row) = stream.next().await { - let (public_key_hex, access_key, _): (String, Vec, _) = row?; + .fetch_all(&mut *tx) + .await?; + + // Rollback or commit the transaction + tx.rollback().await?; + + for (public_key_hex, access_key, _) in result { let access_key_view = near_primitives::views::AccessKeyInfoView { public_key: borsh::from_slice::(&hex::decode( public_key_hex, diff --git a/rpc-server/src/modules/queries/contract_runner/code_storage.rs b/rpc-server/src/modules/queries/contract_runner/code_storage.rs index 4c424a2a..bc31ec0f 100644 --- a/rpc-server/src/modules/queries/contract_runner/code_storage.rs +++ b/rpc-server/src/modules/queries/contract_runner/code_storage.rs @@ -58,6 +58,7 @@ impl CodeStorage { "query_call_function", ) .await + .unwrap_or_default() } else { HashMap::new() }; diff --git a/rpc-server/src/modules/queries/contract_runner/mod.rs b/rpc-server/src/modules/queries/contract_runner/mod.rs index 4aa88cfb..177c2885 100644 --- a/rpc-server/src/modules/queries/contract_runner/mod.rs +++ b/rpc-server/src/modules/queries/contract_runner/mod.rs @@ -62,7 +62,7 @@ pub async fn run_contract( readnode_primitives::StateKey, Option, >, - prefetch_state_size_limit: u64, + _prefetch_state_size_limit: u64, ) -> Result { let contract = db_manager .get_account(account_id, block.header.height, "query_call_function") @@ -156,7 +156,7 @@ pub async fn run_contract( .map(|code| code.data.len()) .unwrap_or_default() }; - let state_size = contract + let _state_size = contract .data .storage_usage() .saturating_sub(code_len as u64); @@ -167,7 +167,8 @@ pub async fn run_contract( block.header.height, validators, optimistic_data, - state_size <= prefetch_state_size_limit, + // state_size <= prefetch_state_size_limit, + false, ) .await; diff --git a/rpc-server/src/modules/queries/methods.rs b/rpc-server/src/modules/queries/methods.rs index 4a7b61b3..6dbe9725 100644 --- a/rpc-server/src/modules/queries/methods.rs +++ b/rpc-server/src/modules/queries/methods.rs @@ -503,7 +503,12 @@ async fn optimistic_view_state( prefix, "query_view_state", ) - .await; + .await + .map_err( + |err| near_jsonrpc::primitives::types::query::RpcQueryError::InternalError { + error_message: err.to_string(), + }, + )?; let mut values: Vec = state_from_db .into_iter() @@ -551,7 +556,12 @@ async fn database_view_state( prefix, "query_view_state", ) - .await; + .await + .map_err( + |err| near_jsonrpc::primitives::types::query::RpcQueryError::InternalError { + error_message: err.to_string(), + }, + )?; let values: Vec = state_from_db .into_iter() diff --git a/rpc-server/src/modules/queries/utils.rs b/rpc-server/src/modules/queries/utils.rs index 59f9bb14..c19ef1ce 100644 --- a/rpc-server/src/modules/queries/utils.rs +++ b/rpc-server/src/modules/queries/utils.rs @@ -33,7 +33,7 @@ pub async fn get_state_from_db( block_height: near_primitives::types::BlockHeight, prefix: &[u8], method_name: &str, -) -> HashMap { +) -> anyhow::Result> { tracing::debug!( "`get_state_from_db` call. AccountId {}, block {}, prefix {:?}", account_id, @@ -43,5 +43,4 @@ pub async fn get_state_from_db( db_manager .get_account_state(account_id, block_height, prefix, method_name) .await - .unwrap_or_default() }