Skip to content

Commit

Permalink
add new andpoint view_state_paginated
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Dec 15, 2023
1 parent 05f88e0 commit 3a4665b
Show file tree
Hide file tree
Showing 10 changed files with 125 additions and 74 deletions.
7 changes: 2 additions & 5 deletions database/src/base/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ pub trait ReaderDbManager {
&self,
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
);
keys_data: Vec<readnode_primitives::StateKey>,
) -> std::collections::HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue>;

/// Returns the near_primitives::account::Account at the given block height
async fn get_account(
Expand Down
11 changes: 5 additions & 6 deletions database/src/postgres/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,17 @@ impl StateChangesData {
mut conn: crate::postgres::PgAsyncConn,
account_id: &str,
block_height: u64,
data_key: String,
) -> anyhow::Result<Option<Vec<u8>>> {
data_keys: Vec<String>,
) -> anyhow::Result<Vec<Self>> {
let response = state_changes_data::table
.filter(state_changes_data::account_id.eq(account_id))
.filter(state_changes_data::block_height.le(bigdecimal::BigDecimal::from(block_height)))
.filter(state_changes_data::data_key.eq(data_key))
.filter(state_changes_data::data_key.eq_any(data_keys))
.order(state_changes_data::block_height.desc())
.select(Self::as_select())
.first(&mut conn)
.load(&mut conn)
.await?;

Ok(response.data_value)
Ok(response)
}
}

Expand Down
29 changes: 19 additions & 10 deletions database/src/postgres/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,29 +100,38 @@ impl crate::ReaderDbManager for PostgresDBManager {
&self,
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
) {
keys_data: Vec<readnode_primitives::StateKey>,
) -> std::collections::HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> {
let connection = if let Ok(pg_connection) = Self::get_connection(&self.pg_pool).await {
pg_connection
} else {
return (key_data, readnode_primitives::StateValue::default());
return std::collections::HashMap::new();
};

let result = if let Ok(result) = crate::models::StateChangesData::get_state_key_value(
connection,
account_id,
block_height,
hex::encode(key_data.clone()),
keys_data.iter().map(hex::encode).collect::<Vec<String>>(),
)
.await
{
result.unwrap_or_default()
let mut data = std::collections::HashMap::new();
for state_data in result.into_iter() {
if let Some(data_value) = state_data.data_value {
let key = hex::decode(state_data.data_key).unwrap_or_default();
let value = readnode_primitives::StateValue::try_from(data_value)
.unwrap_or_default();
if !key.is_empty() && !value.is_empty() {
data.insert(key, value);
}
}
}
data
} else {
readnode_primitives::StateValue::default()
std::collections::HashMap::new()
};
(key_data, result)
result
}

async fn get_account(
Expand Down
41 changes: 21 additions & 20 deletions database/src/scylladb/rpc_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl ScyllaStorageManager for ScyllaDBManager {

get_state_key_value: Self::prepare_read_query(
&scylla_db_session,
"SELECT data_value FROM state_indexer.state_changes_data WHERE account_id = ? AND block_height <= ? AND data_key = ? LIMIT 1",
"SELECT data_key, data_value FROM state_indexer.state_changes_data WHERE account_id = ? AND block_height <= ? AND data_key IN ?",
).await?,

get_account: Self::prepare_read_query(
Expand Down Expand Up @@ -187,12 +187,12 @@ impl crate::ReaderDbManager for ScyllaDBManager {
.execute_iter(paged_query, (account_id.to_string(),))
.await?
.into_typed::<(String,)>();
let mut stata_keys = vec![];
let mut state_keys = vec![];
while let Some(next_row_res) = rows_stream.next().await {
let (value,): (String,) = next_row_res?;
stata_keys.push(hex::decode(value)?);
state_keys.push(hex::decode(value)?);
}
Ok(stata_keys)
Ok(state_keys)
}

/// Returns state keys for the given account id filtered by the given prefix
Expand Down Expand Up @@ -222,32 +222,33 @@ impl crate::ReaderDbManager for ScyllaDBManager {
&self,
account_id: &near_primitives::types::AccountId,
block_height: near_primitives::types::BlockHeight,
key_data: readnode_primitives::StateKey,
) -> (
readnode_primitives::StateKey,
readnode_primitives::StateValue,
) {
let value = match Self::execute_prepared_query(
keys_data: Vec<readnode_primitives::StateKey>,
) -> std::collections::HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> {
if let Ok(result) = Self::execute_prepared_query(
&self.scylla_session,
&self.get_state_key_value,
(
account_id.to_string(),
num_bigint::BigInt::from(block_height),
hex::encode(&key_data).to_string(),
keys_data.iter().map(hex::encode).collect::<Vec<String>>(),
),
)
.await
{
Ok(result) => {
let (value,) = result
.single_row_typed::<(readnode_primitives::StateValue,)>()
.unwrap_or_default();
value
if let Ok(rows) = result.rows() {
let mut values = std::collections::HashMap::new();
for row in rows {
if let Ok((key, value)) = row.into_typed::<(String, Vec<u8>)>() {
values.insert(hex::decode(key).unwrap(), value);
}
}
values
} else {
std::collections::HashMap::new()
}
Err(_) => readnode_primitives::StateValue::default(),
};

(key_data, value)
} else {
std::collections::HashMap::new()
}
}

/// Returns the near_primitives::account::Account at the given block height
Expand Down
4 changes: 4 additions & 0 deletions rpc-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,10 @@ async fn main() -> anyhow::Result<()> {
let rpc = Server::new()
.with_data(Data::new(state))
.with_method("query", modules::queries::methods::query)
.with_method(
"view_state_paginated",
modules::state::methods::view_state_paginated,
)
.with_method("block", modules::blocks::methods::block)
.with_method(
"EXPERIMENTAL_changes",
Expand Down
1 change: 1 addition & 0 deletions rpc-server/src/modules/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ pub mod gas;
pub mod network;
pub mod queries;
pub mod receipts;
pub mod state;
pub mod transactions;
34 changes: 23 additions & 11 deletions rpc-server/src/modules/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,18 @@ impl near_vm_runner::logic::External for CodeStorage {
key: &[u8],
_mode: near_vm_runner::logic::StorageGetMode,
) -> Result<Option<Box<dyn near_vm_runner::logic::ValuePtr>>> {
let get_db_data =
self.db_manager
.get_state_key_value(&self.account_id, self.block_height, key.to_vec());
let (_, data) = block_on(get_db_data);
Ok(if !data.is_empty() {
Some(Box::new(StorageValuePtr { value: data }) as Box<_>)
let get_db_data = self.db_manager.get_state_key_value(
&self.account_id,
self.block_height,
vec![key.to_vec()],
);
let data = block_on(get_db_data);
Ok(if let Some(value) = data.get(key).cloned() {
if value.is_empty() {
None
} else {
Some(Box::new(StorageValuePtr { value }) as Box<_>)
}
} else {
None
})
Expand Down Expand Up @@ -103,11 +109,17 @@ impl near_vm_runner::logic::External for CodeStorage {
key: &[u8],
_mode: near_vm_runner::logic::StorageGetMode,
) -> Result<bool> {
let get_db_state_keys =
self.db_manager
.get_state_key_value(&self.account_id, self.block_height, key.to_vec());
let (_, data) = block_on(get_db_state_keys);
Ok(!data.is_empty())
let get_db_state_keys = self.db_manager.get_state_key_value(
&self.account_id,
self.block_height,
vec![key.to_vec()],
);
let data = block_on(get_db_state_keys);
if let Some(value) = data.get(key).cloned() {
Ok(!value.is_empty())
} else {
Ok(false)
}
}

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip(self)))]
Expand Down
44 changes: 22 additions & 22 deletions rpc-server/src/modules/queries/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,22 @@ pub async fn get_state_keys_from_db(
};
match result {
Ok(state_keys) => {
for state_keys_chunk in state_keys.chunks(1000) {
// TODO: 1000 is hardcoded value. Make it configurable.
let mut tasks_futures = vec![];
for state_key in state_keys_chunk {
let state_value_result_future =
db_manager.get_state_key_value(account_id, block_height, state_key.clone());
tasks_futures.push(state_value_result_future);
}
let results = futures::future::join_all(tasks_futures).await;
for (state_key, state_value) in results.into_iter() {
if !state_value.is_empty() {
data.insert(state_key, state_value);
}
let tasks_futures = state_keys.chunks(100).map(|state_keys_chunk| {
db_manager.get_state_key_value(account_id, block_height, state_keys_chunk.to_vec())
});

let results: HashMap<readnode_primitives::StateKey, readnode_primitives::StateValue> =
futures::future::join_all(tasks_futures)
.await
.into_iter()
.flat_map(|result| result)
.collect();

results.iter().for_each(|(state_key, state_value)| {
if !state_value.is_empty() {
data.insert(state_key.clone(), state_value.clone());
}
}
});
data
}
Err(_) => data,
Expand Down Expand Up @@ -124,14 +125,13 @@ pub async fn fetch_state_from_db(
if state_from_db.is_empty() {
anyhow::bail!("Data not found in db")
} else {
let mut values = Vec::new();
for (key, value) in state_from_db.iter() {
let state_item = near_primitives::views::StateItem {
key: key.to_vec().into(),
value: value.to_vec().into(),
};
values.push(state_item)
}
let values = state_from_db
.into_iter()
.map(|(key, value)| near_primitives::views::StateItem {
key: key.into(),
value: value.into(),
})
.collect();
Ok(near_primitives::views::ViewStateResult {
values,
proof: vec![], // TODO: this is hardcoded empty value since we don't support proofs yet
Expand Down
27 changes: 27 additions & 0 deletions rpc-server/src/modules/state/methods.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::config::ServerContext;
use crate::errors::RPCError;
use jsonrpc_v2::{Data, Params};

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RpcViewStatePaginatedRequest {
#[serde(flatten)]
pub block_reference: near_primitives::types::BlockReference,
pub next_page: Option<String>,
}

#[derive(serde::Serialize, serde::Deserialize, Debug)]
pub struct RpcQueryResponse {
#[serde(flatten)]
pub kind: near_jsonrpc_primitives::types::query::QueryResponseKind,
pub block_height: near_primitives::types::BlockHeight,
pub block_hash: near_primitives::hash::CryptoHash,
pub next_page: Option<String>,
}

pub async fn view_state_paginated(
data: Data<ServerContext>,
Params(params): Params<RpcViewStatePaginatedRequest>,
) -> Result<RpcQueryResponse, RPCError> {
println!("{:?}", params);
todo!()
}
1 change: 1 addition & 0 deletions rpc-server/src/modules/state/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod methods;

0 comments on commit 3a4665b

Please sign in to comment.