Skip to content

Commit

Permalink
add state indexer configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
kobayurii committed Nov 7, 2023
1 parent af25642 commit fec5ca8
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
.env
.env.*
*.log
config.toml
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ edition = "2021"
[dependencies]
anyhow = "1.0.70"
toml = "0.8.4"
project-root = "0.2.2"
serde = "1.0.145"
serde_derive = "1.0.145"

near-indexer-primitives = "0.17.0"
9 changes: 9 additions & 0 deletions configuration/example.config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[state_indexer]
accounts = ["test.near"] # Empty list means all accounts
changes = ["state"] # ["state", "access_key", "contract_code", "account"] Empty list means all changes

[tx_indexer]
# Not yet implemented

[rpc_server]
# Not yet implemented
85 changes: 68 additions & 17 deletions configuration/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,89 @@
use near_indexer_primitives::views::StateChangeValueView;
use serde_derive::Deserialize;
use std::path::Path;

#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone, Default)]
pub struct Config {
pub state_indexer: StateIndexerConfig,
}

#[derive(Deserialize)]
#[derive(Deserialize, Debug, Clone, Default)]
pub struct StateIndexerConfig {
pub accounts: Vec<String>,
pub changes: Vec<String>,
}

async fn read_toml_file() -> anyhow::Result<Config> {
let filename = "config.toml";
impl StateIndexerConfig {
fn contains_account(&self, account_id: &str) -> bool {
if self.accounts.is_empty() {
true
} else {
self.accounts.contains(&account_id.to_string())
}
}

let contents = match std::fs::read_to_string(filename) {
Ok(content) => content,
Err(err) => {
anyhow::bail!("Could not read file: {}.\n Error: {}", filename, err);
fn contains_change(&self, change_type: &str) -> bool {
if self.changes.is_empty() {
true
} else {
self.changes.contains(&change_type.to_string())
}
}
pub fn should_be_indexed(&self, state_change_value: &StateChangeValueView) -> bool {
match state_change_value {
StateChangeValueView::DataUpdate { account_id, .. }
| StateChangeValueView::DataDeletion { account_id, .. } => {
self.contains_account(account_id) && self.contains_change("state")
}
StateChangeValueView::AccessKeyUpdate { account_id, .. }
| StateChangeValueView::AccessKeyDeletion { account_id, .. } => {
self.contains_account(account_id) && self.contains_change("access_key")
}
StateChangeValueView::ContractCodeUpdate { account_id, .. }
| StateChangeValueView::ContractCodeDeletion { account_id, .. } => {
self.contains_account(account_id) && self.contains_change("contract_code")
}
StateChangeValueView::AccountUpdate { account_id, .. }
| StateChangeValueView::AccountDeletion { account_id, .. } => {
self.contains_account(account_id) && self.contains_change("account")
}
}
};
}
}

let config: Config = match toml::from_str::<Config>(&contents) {
Ok(config) => config,
async fn read_toml_file(path_file: &Path) -> anyhow::Result<Config> {
match std::fs::read_to_string(path_file) {
Ok(content) => match toml::from_str::<Config>(&content) {
Ok(config) => Ok(config),
Err(err) => {
anyhow::bail!(
"Unable to load data from: {:?}.\n Error: {}",
path_file.to_str(),
err
);
}
},
Err(err) => {
anyhow::bail!("Unable to load data from: {}.\n Error: {}", filename, err);
anyhow::bail!(
"Could not read file: {:?}.\n Error: {}",
path_file.to_str(),
err
);
}
};
}
}

Ok(config)
pub async fn read_configuration_from_file(path_file: &str) -> anyhow::Result<Config> {
let path_file = Path::new(path_file);
read_toml_file(path_file).await
}

pub async fn read_configuration() -> anyhow::Result<Config> {
let config = read_toml_file().await?;

Ok(config)
let mut path_root = project_root::get_project_root()?;
path_root.push("config.toml");
if path_root.exists() {
read_toml_file(path_root.as_path()).await
} else {
Ok(Config::default())
}
}
1 change: 1 addition & 0 deletions state-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tracing-opentelemetry = { version = "0.19" }
tracing-stackdriver = "0.7.2" # GCP logs

database = { path = "../database" }
configuration = { path = "../configuration" }

near-primitives-core = "0.17.0"
near-indexer-primitives = "0.17.0"
Expand Down
17 changes: 15 additions & 2 deletions state-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ async fn handle_streamer_message(
streamer_message: near_indexer_primitives::StreamerMessage,
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
indexer_id: &str,
indexer_config: configuration::StateIndexerConfig,
stats: std::sync::Arc<tokio::sync::RwLock<metrics::Stats>>,
) -> anyhow::Result<()> {
let block_height = streamer_message.block.header.height;
Expand All @@ -43,7 +44,8 @@ async fn handle_streamer_message(
.collect(),
db_manager,
);
let handle_state_change_future = handle_state_changes(streamer_message, db_manager, block_height, block_hash);
let handle_state_change_future =
handle_state_changes(streamer_message, db_manager, block_height, block_hash, indexer_config);

let update_meta_future = db_manager.update_meta(indexer_id, block_height);

Expand Down Expand Up @@ -91,6 +93,7 @@ async fn handle_state_changes(
db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static),
block_height: u64,
block_hash: CryptoHash,
indexer_config: configuration::StateIndexerConfig,
) -> anyhow::Result<Vec<()>> {
let mut state_changes_to_store =
std::collections::HashMap::<String, near_indexer_primitives::views::StateChangeWithCauseView>::new();
Expand All @@ -103,6 +106,9 @@ async fn handle_state_changes(
// Collecting a unique list of StateChangeWithCauseView for account_id + change kind + suffix
// by overwriting the records in the HashMap
for state_change in initial_state_changes {
if !indexer_config.should_be_indexed(&state_change.value) {
continue;
};
let key = match &state_change.value {
StateChangeValueView::DataUpdate { account_id, key, .. }
| StateChangeValueView::DataDeletion { account_id, key } => {
Expand Down Expand Up @@ -254,10 +260,17 @@ async fn main() -> anyhow::Result<()> {

let stats = std::sync::Arc::new(tokio::sync::RwLock::new(metrics::Stats::new()));
tokio::spawn(metrics::state_logger(std::sync::Arc::clone(&stats), opts.rpc_url().to_string()));
let indexer_config = configuration::read_configuration().await?.state_indexer;

let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| {
handle_streamer_message(streamer_message, &db_manager, &opts.indexer_id, std::sync::Arc::clone(&stats))
handle_streamer_message(
streamer_message,
&db_manager,
&opts.indexer_id,
indexer_config.clone(),
std::sync::Arc::clone(&stats),
)
})
.buffer_unordered(opts.concurrency);

Expand Down

0 comments on commit fec5ca8

Please sign in to comment.