Skip to content

Commit

Permalink
add database query timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Jan 21, 2025
1 parent 15b7d23 commit fc4f7db
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 23 deletions.
78 changes: 62 additions & 16 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
} else {
crate::postgres::PageState::new(1000)
};
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
// Start a transaction
let mut tx = shard_id_pool.pool.begin().await?;
// Set a timeout only for this transaction
sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout
.execute(&mut *tx)
.await?;

let result = sqlx::query_as::<_, (String, Vec<u8>)>(
"
WITH latest_blocks AS (
SELECT
Expand Down Expand Up @@ -113,11 +120,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
.bind(bigdecimal::BigDecimal::from(block_height))
.bind(page_state.last_data_key.clone())
.bind(page_state.page_size)
.fetch(shard_id_pool.pool);
.fetch_all(&mut *tx)
.await?;

// Rollback or commit the transaction
tx.rollback().await?;

let mut items = std::collections::HashMap::new();
let mut last_data_key = String::new();
while let Some(row) = stream.next().await {
let (key, value): (String, Vec<u8>) = row?;
for (key, value) in result {
last_data_key.clone_from(&key);
items.insert(hex::decode(key)?, value);
}
Expand Down Expand Up @@ -151,7 +162,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(

// Start a transaction
let mut tx = shard_id_pool.pool.begin().await?;
// Set a timeout only for this transaction
sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout
.execute(&mut *tx)
.await?;

let result = sqlx::query_as::<_, (String, Vec<u8>)>(
"
WITH latest_blocks AS (
SELECT
Expand Down Expand Up @@ -183,9 +202,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
.bind(account_id.to_string())
.bind(format!("{}%", hex::encode(prefix)))
.bind(bigdecimal::BigDecimal::from(block_height))
.fetch(shard_id_pool.pool);
while let Some(row) = stream.next().await {
let (key, value): (String, Vec<u8>) = row?;
.fetch_all(&mut *tx)
.await?;

// Rollback or commit the transaction
tx.rollback().await?;

for (key, value) in result {
items.insert(hex::decode(key)?, value);
}
Ok(items)
Expand All @@ -208,7 +231,14 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
])
.inc();
let mut items = std::collections::HashMap::new();
let mut stream = sqlx::query_as::<_, (String, Vec<u8>)>(
// Start a transaction
let mut tx = shard_id_pool.pool.begin().await?;
// Set a timeout only for this transaction
sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout
.execute(&mut *tx)
.await?;

let result = sqlx::query_as::<_, (String, Vec<u8>)>(
"
WITH latest_blocks AS (
SELECT
Expand Down Expand Up @@ -238,9 +268,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
)
.bind(account_id.to_string())
.bind(bigdecimal::BigDecimal::from(block_height))
.fetch(shard_id_pool.pool);
while let Some(row) = stream.next().await {
let (key, value): (String, Vec<u8>) = row?;
.fetch_all(&mut *tx)
.await?;

// Rollback or commit the transaction
tx.rollback().await?;

for (key, value) in result {
items.insert(hex::decode(key)?, value);
}
Ok(items)
Expand Down Expand Up @@ -413,7 +447,15 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
])
.inc();
let mut access_keys = vec![];
let mut stream = sqlx::query_as::<_, (String, Vec<u8>, bigdecimal::BigDecimal)>(

// Start a transaction
let mut tx = shard_id_pool.pool.begin().await?;
// Set a timeout only for this transaction
sqlx::query("SET LOCAL statement_timeout = 5000") // 5 seconds timeout
.execute(&mut *tx)
.await?;

let result = sqlx::query_as::<_, (String, Vec<u8>, bigdecimal::BigDecimal)>(
"
WITH latest_blocks AS (
SELECT
Expand Down Expand Up @@ -446,9 +488,13 @@ impl crate::ReaderDbManager for crate::PostgresDBManager {
)
.bind(account_id.to_string())
.bind(bigdecimal::BigDecimal::from(block_height))
.fetch(shard_id_pool.pool);
while let Some(row) = stream.next().await {
let (public_key_hex, access_key, _): (String, Vec<u8>, _) = row?;
.fetch_all(&mut *tx)
.await?;

// Rollback or commit the transaction
tx.rollback().await?;

for (public_key_hex, access_key, _) in result {
let access_key_view = near_primitives::views::AccessKeyInfoView {
public_key: borsh::from_slice::<near_crypto::PublicKey>(&hex::decode(
public_key_hex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl CodeStorage {
"query_call_function",
)
.await
.unwrap_or_default()
} else {
HashMap::new()
};
Expand Down
7 changes: 4 additions & 3 deletions rpc-server/src/modules/queries/contract_runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn run_contract(
readnode_primitives::StateKey,
Option<readnode_primitives::StateValue>,
>,
prefetch_state_size_limit: u64,
_prefetch_state_size_limit: u64,
) -> Result<RunContractResponse, near_jsonrpc::primitives::types::query::RpcQueryError> {
let contract = db_manager
.get_account(account_id, block.header.height, "query_call_function")
Expand Down Expand Up @@ -156,7 +156,7 @@ pub async fn run_contract(
.map(|code| code.data.len())
.unwrap_or_default()
};
let state_size = contract
let _state_size = contract
.data
.storage_usage()
.saturating_sub(code_len as u64);
Expand All @@ -167,7 +167,8 @@ pub async fn run_contract(
block.header.height,
validators,
optimistic_data,
state_size <= prefetch_state_size_limit,
// state_size <= prefetch_state_size_limit,
false,
)
.await;

Expand Down
14 changes: 12 additions & 2 deletions rpc-server/src/modules/queries/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,12 @@ async fn optimistic_view_state(
prefix,
"query_view_state",
)
.await;
.await
.map_err(
|err| near_jsonrpc::primitives::types::query::RpcQueryError::InternalError {
error_message: err.to_string(),
},
)?;

let mut values: Vec<near_primitives::views::StateItem> = state_from_db
.into_iter()
Expand Down Expand Up @@ -551,7 +556,12 @@ async fn database_view_state(
prefix,
"query_view_state",
)
.await;
.await
.map_err(
|err| near_jsonrpc::primitives::types::query::RpcQueryError::InternalError {
error_message: err.to_string(),
},
)?;

let values: Vec<near_primitives::views::StateItem> = state_from_db
.into_iter()
Expand Down
3 changes: 1 addition & 2 deletions rpc-server/src/modules/queries/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub async fn get_state_from_db(
block_height: near_primitives::types::BlockHeight,
prefix: &[u8],
method_name: &str,
) -> HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> {
) -> anyhow::Result<HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue>> {
tracing::debug!(
"`get_state_from_db` call. AccountId {}, block {}, prefix {:?}",
account_id,
Expand All @@ -43,5 +43,4 @@ pub async fn get_state_from_db(
db_manager
.get_account_state(account_id, block_height, prefix, method_name)
.await
.unwrap_or_default()
}

0 comments on commit fc4f7db

Please sign in to comment.