diff --git a/.gitignore b/.gitignore index 11c5ccff..ee87c4a1 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ .env .env.* *.log +config.toml diff --git a/Cargo.lock b/Cargo.lock index b6ca6afc..11d04594 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1332,6 +1332,8 @@ name = "configuration" version = "0.1.0" dependencies = [ "anyhow", + "near-indexer-primitives", + "project-root", "serde", "serde_derive", "toml 0.8.8", @@ -4316,6 +4318,12 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "project-root" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8bccbff07d5ed689c4087d20d7307a52ab6141edeedf487c3876a55b86cf63df" + [[package]] name = "prometheus" version = "0.13.3" @@ -5393,6 +5401,7 @@ dependencies = [ "aws-types", "borsh 0.10.3", "clap 3.2.25", + "configuration", "database", "dotenv", "futures", diff --git a/configuration/Cargo.toml b/configuration/Cargo.toml index aee86ee9..09393a48 100644 --- a/configuration/Cargo.toml +++ b/configuration/Cargo.toml @@ -6,5 +6,8 @@ edition = "2021" [dependencies] anyhow = "1.0.70" toml = "0.8.4" +project-root = "0.2.2" serde = "1.0.145" serde_derive = "1.0.145" + +near-indexer-primitives = "0.17.0" diff --git a/configuration/example.config.toml b/configuration/example.config.toml new file mode 100644 index 00000000..4b23e209 --- /dev/null +++ b/configuration/example.config.toml @@ -0,0 +1,9 @@ +[state_indexer] +accounts = ["test.near"] # Empty list means all accounts +changes = ["state"] # ["state", "access_key", "contract_code", "account"] Empty list means all changes + +[tx_indexer] +# Not yet implemented + +[rpc_server] +# Not yet implemented diff --git a/configuration/src/lib.rs b/configuration/src/lib.rs index 2c4f4bb7..295ebf35 100644 --- a/configuration/src/lib.rs +++ b/configuration/src/lib.rs @@ -1,38 +1,89 @@ +use near_indexer_primitives::views::StateChangeValueView; use serde_derive::Deserialize; +use std::path::Path; -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone, Default)] pub struct Config { pub state_indexer: StateIndexerConfig, } -#[derive(Deserialize)] +#[derive(Deserialize, Debug, Clone, Default)] pub struct StateIndexerConfig { pub accounts: Vec, pub changes: Vec, } -async fn read_toml_file() -> anyhow::Result { - let filename = "config.toml"; +impl StateIndexerConfig { + fn contains_account(&self, account_id: &str) -> bool { + if self.accounts.is_empty() { + true + } else { + self.accounts.contains(&account_id.to_string()) + } + } - let contents = match std::fs::read_to_string(filename) { - Ok(content) => content, - Err(err) => { - anyhow::bail!("Could not read file: {}.\n Error: {}", filename, err); + fn contains_change(&self, change_type: &str) -> bool { + if self.changes.is_empty() { + true + } else { + self.changes.contains(&change_type.to_string()) + } + } + pub fn should_be_indexed(&self, state_change_value: &StateChangeValueView) -> bool { + match state_change_value { + StateChangeValueView::DataUpdate { account_id, .. } + | StateChangeValueView::DataDeletion { account_id, .. } => { + self.contains_account(account_id) && self.contains_change("state") + } + StateChangeValueView::AccessKeyUpdate { account_id, .. } + | StateChangeValueView::AccessKeyDeletion { account_id, .. } => { + self.contains_account(account_id) && self.contains_change("access_key") + } + StateChangeValueView::ContractCodeUpdate { account_id, .. } + | StateChangeValueView::ContractCodeDeletion { account_id, .. } => { + self.contains_account(account_id) && self.contains_change("contract_code") + } + StateChangeValueView::AccountUpdate { account_id, .. } + | StateChangeValueView::AccountDeletion { account_id, .. } => { + self.contains_account(account_id) && self.contains_change("account") + } } - }; + } +} - let config: Config = match toml::from_str::(&contents) { - Ok(config) => config, +async fn read_toml_file(path_file: &Path) -> anyhow::Result { + match std::fs::read_to_string(path_file) { + Ok(content) => match toml::from_str::(&content) { + Ok(config) => Ok(config), + Err(err) => { + anyhow::bail!( + "Unable to load data from: {:?}.\n Error: {}", + path_file.to_str(), + err + ); + } + }, Err(err) => { - anyhow::bail!("Unable to load data from: {}.\n Error: {}", filename, err); + anyhow::bail!( + "Could not read file: {:?}.\n Error: {}", + path_file.to_str(), + err + ); } - }; + } +} - Ok(config) +pub async fn read_configuration_from_file(path_file: &str) -> anyhow::Result { + let path_file = Path::new(path_file); + read_toml_file(path_file).await } pub async fn read_configuration() -> anyhow::Result { - let config = read_toml_file().await?; - - Ok(config) + let mut path_root = project_root::get_project_root()?; + path_root.push("config.toml"); + if path_root.exists() { + read_toml_file(path_root.as_path()).await + } else { + Ok(Config::default()) + } } diff --git a/state-indexer/Cargo.toml b/state-indexer/Cargo.toml index a5c81dbf..250cf1c9 100644 --- a/state-indexer/Cargo.toml +++ b/state-indexer/Cargo.toml @@ -42,6 +42,7 @@ tracing-opentelemetry = { version = "0.19" } tracing-stackdriver = "0.7.2" # GCP logs database = { path = "../database" } +configuration = { path = "../configuration" } near-primitives-core = "0.17.0" near-indexer-primitives = "0.17.0" diff --git a/state-indexer/src/main.rs b/state-indexer/src/main.rs index 1d716d42..c45a0d64 100644 --- a/state-indexer/src/main.rs +++ b/state-indexer/src/main.rs @@ -24,6 +24,7 @@ async fn handle_streamer_message( streamer_message: near_indexer_primitives::StreamerMessage, db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static), indexer_id: &str, + indexer_config: configuration::StateIndexerConfig, stats: std::sync::Arc>, ) -> anyhow::Result<()> { let block_height = streamer_message.block.header.height; @@ -43,7 +44,8 @@ async fn handle_streamer_message( .collect(), db_manager, ); - let handle_state_change_future = handle_state_changes(streamer_message, db_manager, block_height, block_hash); + let handle_state_change_future = + handle_state_changes(streamer_message, db_manager, block_height, block_hash, indexer_config); let update_meta_future = db_manager.update_meta(indexer_id, block_height); @@ -91,6 +93,7 @@ async fn handle_state_changes( db_manager: &(impl database::StateIndexerDbManager + Sync + Send + 'static), block_height: u64, block_hash: CryptoHash, + indexer_config: configuration::StateIndexerConfig, ) -> anyhow::Result> { let mut state_changes_to_store = std::collections::HashMap::::new(); @@ -103,6 +106,9 @@ async fn handle_state_changes( // Collecting a unique list of StateChangeWithCauseView for account_id + change kind + suffix // by overwriting the records in the HashMap for state_change in initial_state_changes { + if !indexer_config.should_be_indexed(&state_change.value) { + continue; + }; let key = match &state_change.value { StateChangeValueView::DataUpdate { account_id, key, .. } | StateChangeValueView::DataDeletion { account_id, key } => { @@ -264,10 +270,17 @@ async fn main() -> anyhow::Result<()> { let stats = std::sync::Arc::new(tokio::sync::RwLock::new(metrics::Stats::new())); tokio::spawn(metrics::state_logger(std::sync::Arc::clone(&stats), opts.rpc_url().to_string())); + let indexer_config = configuration::read_configuration().await?.state_indexer; let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream) .map(|streamer_message| { - handle_streamer_message(streamer_message, &db_manager, &opts.indexer_id, std::sync::Arc::clone(&stats)) + handle_streamer_message( + streamer_message, + &db_manager, + &opts.indexer_id, + indexer_config.clone(), + std::sync::Arc::clone(&stats), + ) }) .buffer_unordered(opts.concurrency);