add configuration to tx-indexer
kobayurii committed Jan 16, 2024
1 parent b3bfe9c commit 87b4756
Showing 9 changed files with 50 additions and 163 deletions.
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions configuration/src/configs/database.rs
@@ -85,7 +85,7 @@ pub struct DatabaseTxIndexerConfig {
deserialize_with = "deserialize_data_or_env",
default = "DatabaseTxIndexerConfig::default_max_db_parallel_queries"
)]
pub max_db_parallel_queries: u16,
pub max_db_parallel_queries: i64,
}

impl DatabaseTxIndexerConfig {
@@ -97,7 +97,7 @@ impl DatabaseTxIndexerConfig {
true
}

pub fn default_max_db_parallel_queries() -> u16 {
pub fn default_max_db_parallel_queries() -> i64 {
144
}
}
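
For context: the field is deserialized through `deserialize_data_or_env`, whose implementation is not part of this diff. A minimal sketch of that pattern — assuming the helper accepts either a literal string value or an `${ENV_VAR}` placeholder resolved from the environment; the real helper may differ — might look like:

```rust
use serde::{Deserialize, Deserializer};

// Hypothetical sketch of the helper named in `deserialize_with` above:
// the configured value is either a literal or an "${ENV_VAR}" placeholder
// that is resolved from the environment at load time.
fn deserialize_data_or_env<'de, D, T>(deserializer: D) -> Result<T, D::Error>
where
    D: Deserializer<'de>,
    T: std::str::FromStr,
    T::Err: std::fmt::Display,
{
    let raw = String::deserialize(deserializer)?;
    let resolved = match raw.strip_prefix("${").and_then(|s| s.strip_suffix('}')) {
        Some(var) => std::env::var(var).map_err(serde::de::Error::custom)?,
        None => raw,
    };
    resolved.parse::<T>().map_err(serde::de::Error::custom)
}
```

With the `default = "DatabaseTxIndexerConfig::default_max_db_parallel_queries"` attribute, a missing key still falls back to 144; the `u16` → `i64` change only widens the type handed to the database layer.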
2 changes: 1 addition & 1 deletion configuration/src/configs/mod.rs
@@ -1,5 +1,5 @@
pub(crate) mod database;
mod general;
pub(crate) mod general;
mod lake;
mod rightsizing;

1 change: 1 addition & 0 deletions configuration/src/lib.rs
@@ -2,6 +2,7 @@ use std::path::Path;
mod configs;

pub use crate::configs::database::DatabaseConfig;
pub use crate::configs::general::ChainId;
pub use crate::configs::Config;

pub async fn read_configuration_from_file(path_file: &str) -> anyhow::Result<Config> {
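
A hypothetical consumer of the new re-export (the `Testnet` variant and the `general.chain_id` field path are assumed from their use in `tx-indexer` below):

```rust
// Hypothetical usage sketch; the field layout follows
// `indexer_config.general.chain_id` as read in tx-indexer/src/main.rs.
use configuration::ChainId;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = configuration::read_configuration_from_file("config.toml").await?;
    match config.general.chain_id {
        ChainId::Mainnet => println!("indexing mainnet"),
        ChainId::Testnet => println!("indexing testnet"),
    }
    Ok(())
}
```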
5 changes: 1 addition & 4 deletions tx-indexer/Cargo.toml
@@ -7,11 +7,7 @@ edition = "2021"
actix-web = "4.2.1"
anyhow = "1.0.70"
async-trait = "0.1.66"
aws-credential-types = "1.0.0"
aws-sdk-s3 = { version = "0.39.1", features = ["behavior-version-latest"] }
aws-types = "1.0.0"
clap = { version = "3.2.22", features = ["color", "derive", "env"] }
dotenv = "0.15.0"
futures = "0.3.5"
futures-locks = "0.7.1"
humantime = "2.1.0"
@@ -40,6 +36,7 @@ opentelemetry-jaeger = { version = "0.18", features = [
tracing-opentelemetry = { version = "0.19" }
tracing-stackdriver = "0.7.2" # GCP logs

configuration = { path = "../configuration" }
database = { path = "../database" }
readnode-primitives = { path = "../readnode-primitives" }

14 changes: 6 additions & 8 deletions tx-indexer/src/collector.rs
@@ -1,12 +1,10 @@
use crate::storage::base::TxCollectingStorage;
use futures::{
future::{join_all, try_join_all},
StreamExt,
};
use near_indexer_primitives::IndexerTransactionWithOutcome;

use crate::config;
use crate::storage::base::TxCollectingStorage;

/// Blocks #47317863 and #47317864 with restored receipts.
const PROBLEMATIC_BLOCKS: [near_indexer_primitives::CryptoHash; 2] = [
near_indexer_primitives::CryptoHash(
@@ -21,7 +19,7 @@ const PROBLEMATIC_BLOCKS: [near_indexer_primitives::CryptoHash; 2] = [

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
pub(crate) async fn index_transactions(
chain_id: config::ChainId,
chain_id: configuration::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
@@ -133,7 +131,7 @@ async fn new_transaction_details_to_collecting_pool(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn collect_receipts_and_outcomes(
chain_id: config::ChainId,
chain_id: configuration::ChainId,
streamer_message: &near_indexer_primitives::StreamerMessage,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
@@ -159,7 +157,7 @@ async fn collect_receipts_and_outcomes(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_shard(
chain_id: config::ChainId,
chain_id: configuration::ChainId,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
@@ -189,7 +187,7 @@ async fn process_shard(

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn process_receipt_execution_outcome(
chain_id: config::ChainId,
chain_id: configuration::ChainId,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<impl TxCollectingStorage>,
block_height: u64,
@@ -198,7 +196,7 @@ async fn process_receipt_execution_outcome(
receipt_execution_outcome: &near_indexer_primitives::IndexerExecutionOutcomeWithReceipt,
) -> anyhow::Result<()> {
if PROBLEMATIC_BLOCKS.contains(&block_hash) {
if let config::ChainId::Mainnet(_) = chain_id {
if let configuration::ChainId::Mainnet = chain_id {
tx_collecting_storage
.restore_transaction_by_receipt_id(
&receipt_execution_outcome.receipt.receipt_id.to_string(),
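The switch from `config::ChainId::Mainnet(_)` to the payload-free `configuration::ChainId::Mainnet` pattern reflects the enum moving out of the CLI: the old variant carried the `StartOptions` subcommand, while the relocated enum is plain data. Its assumed shape (only `Mainnet` is visible in this diff; `Testnet` and the derives are inferred):

```rust
// Assumed shape of configuration::ChainId — unit variants deserialized from
// configuration, replacing the old Mainnet(StartOptions)/Testnet(StartOptions)
// CLI subcommands.
#[derive(Debug, Clone, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ChainId {
    Mainnet,
    Testnet,
}
```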
132 changes: 10 additions & 122 deletions tx-indexer/src/config.rs
@@ -16,67 +16,8 @@ use tracing_subscriber::util::SubscriberInitExt;
setting(clap::AppSettings::NextLineHelp)
)]
pub(crate) struct Opts {
/// Indexer ID to handle metadata about the instance
#[clap(long, env)]
pub indexer_id: String,
/// Port for metrics server
#[clap(long, default_value = "8080", env)]
pub port: u16,
/// DB connection string
#[clap(long, default_value = "127.0.0.1:9042", env)]
pub database_url: String,
/// DB user(login)
#[clap(long, env)]
pub database_user: Option<String>,
/// DB password
#[clap(long, env)]
pub database_password: Option<String>,
/// Chain ID: testnet or mainnet
#[clap(subcommand)]
pub chain_id: ChainId,
/// To restore the cache from the DB we use a smart block range.
/// A regular transaction takes a few blocks to be finalized,
/// so we don't need to restore very old transactions for the indexer; we would probably never be able to reassemble them anyway.
/// We use a range of 1000 blocks for peace of mind, and leave the option to increase or decrease this range.
#[clap(long, default_value = "1000", env)]
pub cache_restore_blocks_range: u64,
/// Parallel queries = (nodes in cluster) ✕ (cores in node) ✕ 3
/// Currently we have 6 nodes with 8 CPUs each:
/// 6 ✕ 8 ✕ 3 = 144
#[clap(long, env, default_value = "144")]
pub max_db_parallel_queries: i64,

/// ScyllaDB preferred DataCenter
/// Accepts the DC name of the ScyllaDB cluster to filter the connection to that DC only (preferably).
/// If you connect to a multi-DC cluster, you might experience high latencies while working with the DB, because the ScyllaDB driver tries to connect to any node in the cluster regardless of its DC. This option restricts the connection to the DC you need. Example: "DC1", where DC1 is located in the same region as the application.
#[cfg(feature = "scylla_db")]
#[clap(long, env)]
pub preferred_dc: Option<String>,

/// Max retry count for ScyllaDB if `strict_mode` is `false`
#[cfg(feature = "scylla_db")]
#[clap(long, env, default_value_t = 5)]
pub max_retry: u8,

/// Attempts to store data in the database should be infinite to ensure no data is missing.
/// Disable it to perform a limited number of write attempts (`max_retry`)
/// before giving up and moving on to the next piece of data.
#[cfg(feature = "scylla_db")]
#[clap(long, env, default_value_t = true)]
pub strict_mode: bool,

/// Postgres database name
#[cfg(feature = "postgres_db")]
#[clap(long, env)]
pub database_name: Option<String>,
}

#[derive(Subcommand, Debug, Clone)]
pub enum ChainId {
#[clap(subcommand)]
Mainnet(StartOptions),
#[clap(subcommand)]
Testnet(StartOptions),
pub start_options: StartOptions,
}

#[allow(clippy::enum_variant_names)]
@@ -92,87 +33,34 @@ pub enum StartOptions {
FromLatest,
}

impl Opts {
/// Returns [StartOptions] for current [Opts]
pub fn start_options(&self) -> &StartOptions {
match &self.chain_id {
ChainId::Mainnet(start_options) | ChainId::Testnet(start_options) => start_options,
}
}

pub fn rpc_url(&self) -> &str {
match self.chain_id {
ChainId::Mainnet(_) => "https://rpc.mainnet.near.org",
ChainId::Testnet(_) => "https://rpc.testnet.near.org",
}
}
}

impl Opts {
pub async fn to_additional_database_options(&self) -> database::AdditionalDatabaseOptions {
database::AdditionalDatabaseOptions {
#[cfg(feature = "scylla_db")]
preferred_dc: self.preferred_dc.clone(),
#[cfg(feature = "scylla_db")]
keepalive_interval: None,
#[cfg(feature = "scylla_db")]
max_retry: self.max_retry,
#[cfg(feature = "scylla_db")]
strict_mode: self.strict_mode,
#[cfg(feature = "postgres_db")]
database_name: self.database_name.clone(),
}
}

pub async fn to_lake_config(
&self,
start_block_height: u64,
) -> anyhow::Result<near_lake_framework::LakeConfig> {
let config_builder = near_lake_framework::LakeConfigBuilder::default();

Ok(match &self.chain_id {
ChainId::Mainnet(_) => config_builder
.mainnet()
.start_block_height(start_block_height),
ChainId::Testnet(_) => config_builder
.testnet()
.start_block_height(start_block_height),
}
.build()
.expect("Failed to build LakeConfig"))
}
}

pub(crate) async fn get_start_block_height(
opts: &Opts,
rpc_client: &JsonRpcClient,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
start_options: &StartOptions,
indexer_id: &str,
) -> anyhow::Result<u64> {
match opts.start_options() {
match start_options {
StartOptions::FromBlock { height } => Ok(*height),
StartOptions::FromInterruption { height } => {
if let Ok(block_height) = db_manager
.get_last_processed_block_height(opts.indexer_id.as_str())
.await
{
if let Ok(block_height) = db_manager.get_last_processed_block_height(indexer_id).await {
Ok(block_height)
} else {
if let Some(height) = height {
return Ok(*height);
}
Ok(final_block_height(opts.rpc_url()).await?)
Ok(final_block_height(rpc_client).await?)
}
}
StartOptions::FromLatest => Ok(final_block_height(opts.rpc_url()).await?),
StartOptions::FromLatest => Ok(final_block_height(rpc_client).await?),
}
}
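
For reference, only `FromLatest` is visible in the hunk above, but the other `StartOptions` variants can be reconstructed from this match: `FromBlock { height }` returns the given height, and `FromInterruption { height }` carries an optional fallback. A sketch (doc comments and derives are assumed, mirroring the old `ChainId` enum):

```rust
// Reconstructed from the match arms in get_start_block_height; hypothetical
// doc comments and derives.
#[derive(clap::Subcommand, Debug, Clone)]
pub enum StartOptions {
    /// Start indexing from the given block height
    FromBlock { height: u64 },
    /// Resume from the last processed height, with an optional fallback height
    FromInterruption { height: Option<u64> },
    /// Start from the latest final block on the network
    FromLatest,
}
```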

pub async fn final_block_height(rpc_url: &str) -> anyhow::Result<u64> {
let client = JsonRpcClient::connect(rpc_url.to_string());
pub async fn final_block_height(rpc_client: &JsonRpcClient) -> anyhow::Result<u64> {
let request = methods::block::RpcBlockRequest {
block_reference: BlockReference::Finality(Finality::Final),
};

let latest_block = client.call(request).await?;
let latest_block = rpc_client.call(request).await?;

Ok(latest_block.header.height)
}
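
The new signature lets the caller construct the `JsonRpcClient` once and share it, instead of `final_block_height` opening a fresh connection on every call. A sketch of the new call pattern (the URL comes from the removed `rpc_url()` helper above; `example` is a hypothetical caller):

```rust
// Sketch: one shared client per process; final_block_height only borrows it.
async fn example() -> anyhow::Result<u64> {
    let rpc_client =
        near_jsonrpc_client::JsonRpcClient::connect("https://rpc.mainnet.near.org");
    final_block_height(&rpc_client).await
}
```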
42 changes: 22 additions & 20 deletions tx-indexer/src/main.rs
@@ -13,22 +13,19 @@ pub(crate) const INDEXER: &str = "tx_indexer";

#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();

init_tracing()?;

let opts: Opts = Opts::parse();

let indexer_config = configuration::read_configuration().await?;

tracing::info!(target: INDEXER, "Connecting to db...");
#[cfg(feature = "scylla_db")]
let db_manager: std::sync::Arc<
Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>,
> = std::sync::Arc::new(Box::new(
database::prepare_db_manager::<database::scylladb::tx_indexer::ScyllaDBManager>(
&opts.database_url,
opts.database_user.as_deref(),
opts.database_password.as_deref(),
opts.to_additional_database_options().await,
&indexer_config.database,
)
.await?,
));
@@ -37,50 +34,55 @@ async fn main() -> anyhow::Result<()> {
Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>,
> = std::sync::Arc::new(Box::new(
database::prepare_db_manager::<database::postgres::tx_indexer::PostgresDBManager>(
&opts.database_url,
opts.database_user.as_deref(),
opts.database_password.as_deref(),
opts.to_additional_database_options().await,
&indexer_config.database,
)
.await?,
));
let indexer_id = &indexer_config.general.tx_indexer.indexer_id;
let rpc_client =
near_jsonrpc_client::JsonRpcClient::connect(&indexer_config.general.near_rpc_url);
let start_block_height =
config::get_start_block_height(&rpc_client, &db_manager, &opts.start_options, indexer_id)
.await?;

let start_block_height = config::get_start_block_height(&opts, &db_manager).await?;
tracing::info!(target: INDEXER, "Generating LakeConfig...");
let config: near_lake_framework::LakeConfig = opts.to_lake_config(start_block_height).await?;
let lake_config = indexer_config.to_lake_config(start_block_height).await?;

tracing::info!(target: INDEXER, "Creating hash storage...");
let tx_collecting_storage = std::sync::Arc::new(
storage::database::HashStorageWithDB::init_with_restore(
db_manager.clone(),
start_block_height,
opts.cache_restore_blocks_range,
opts.max_db_parallel_queries,
indexer_config.general.tx_indexer.cache_restore_blocks_range,
indexer_config.database.tx_indexer.max_db_parallel_queries,
)
.await?,
);

tracing::info!(target: INDEXER, "Instantiating the stream...",);
let (sender, stream) = near_lake_framework::streamer(config);
let (sender, stream) = near_lake_framework::streamer(lake_config);

// Initiate the metrics HTTP server
tokio::spawn(metrics::init_server(opts.port).expect("Failed to start metrics server"));
tokio::spawn(
metrics::init_server(indexer_config.general.tx_indexer.metrics_server_port)
.expect("Failed to start metrics server"),
);

let stats = std::sync::Arc::new(tokio::sync::RwLock::new(metrics::Stats::new()));
tokio::spawn(metrics::state_logger(
std::sync::Arc::clone(&stats),
opts.rpc_url().to_string(),
rpc_client.clone(),
));

tracing::info!(target: INDEXER, "Starting tx indexer...",);
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
.map(|streamer_message| {
handle_streamer_message(
opts.chain_id.clone(),
indexer_config.general.chain_id.clone(),
streamer_message,
&db_manager,
&tx_collecting_storage,
&opts.indexer_id,
indexer_id,
std::sync::Arc::clone(&stats),
)
})
@@ -103,7 +105,7 @@

#[cfg_attr(feature = "tracing-instrumentation", tracing::instrument(skip_all))]
async fn handle_streamer_message(
chain_id: config::ChainId,
chain_id: configuration::ChainId,
streamer_message: near_indexer_primitives::StreamerMessage,
db_manager: &std::sync::Arc<Box<dyn database::TxIndexerDbManager + Sync + Send + 'static>>,
tx_collecting_storage: &std::sync::Arc<impl storage::base::TxCollectingStorage>,