diff --git a/crates/bin/pindexer/src/block.rs b/crates/bin/pindexer/src/block.rs index 8ca1a97128..9308bd0595 100644 --- a/crates/bin/pindexer/src/block.rs +++ b/crates/bin/pindexer/src/block.rs @@ -1,12 +1,16 @@ -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent}; -use sqlx::{types::chrono::DateTime, PgPool}; +use sqlx::types::chrono::DateTime; #[derive(Debug)] pub struct Block {} #[async_trait] impl AppView for Block { + fn name(&self) -> String { + "block".to_string() + } + async fn init_chain( &self, dbtx: &mut PgTransaction, @@ -27,34 +31,33 @@ CREATE TABLE IF NOT EXISTS block_details ( Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.sct.v1.EventBlockRoot" - } - - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - let pe = pb::EventBlockRoot::from_event(event.as_ref())?; - let timestamp = pe.timestamp.unwrap_or_default(); + for event in batch.events() { + let pe = match pb::EventBlockRoot::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; + let timestamp = pe.timestamp.unwrap_or_default(); - sqlx::query( - " + sqlx::query( + " INSERT INTO block_details (height, timestamp, root) VALUES ($1, $2, $3) ", - ) - .bind(i64::try_from(pe.height)?) - .bind(DateTime::from_timestamp( - timestamp.seconds, - u32::try_from(timestamp.nanos)?, - )) - .bind(pe.root.unwrap().inner) - .execute(dbtx.as_mut()) - .await?; - + ) + .bind(i64::try_from(pe.height)?) + .bind(DateTime::from_timestamp( + timestamp.seconds, + u32::try_from(timestamp.nanos)?, + )) + .bind(pe.root.unwrap().inner) + .execute(dbtx.as_mut()) + .await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index ec3882d0c9..9574c90326 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -2,13 +2,11 @@ use std::fmt::Display; use anyhow::{anyhow, Context}; use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc}; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_asset::asset; use penumbra_dex::{event::EventCandlestickData, CandlestickData}; use penumbra_proto::{event::EventDomainType, DomainType}; use penumbra_sct::event::EventBlockRoot; -use prost::Name as _; -use sqlx::PgPool; type DateTime = sqlx::types::chrono::DateTime<Utc>; @@ -453,35 +451,11 @@ impl Component { pub fn new() -> Self { Self {} } -} - -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - for statement in include_str!("schema.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - [ - <EventCandlestickData as DomainType>::Proto::full_name(), - <EventBlockRoot as DomainType>::Proto::full_name(), - ] - .into_iter() - .any(|x| type_str == x) - } async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let height = event.block_height; @@ -504,7
+478,35 @@ impl AppView for Component { } summary::update_all(dbtx, time).await?; } - tracing::debug!(?event, "unrecognized event"); + Ok(()) + } +} + +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + Ok(()) + } + + fn name(&self) -> String { + "dex_ex".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/governance.rs b/crates/bin/pindexer/src/governance.rs index ec49b0a74c..4d16090e58 100644 --- a/crates/bin/pindexer/src/governance.rs +++ b/crates/bin/pindexer/src/governance.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, Context, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_governance::{ proposal::ProposalPayloadToml, proposal_state, DelegatorVote, Proposal, ProposalDepositClaim, ProposalWithdraw, ValidatorVote, @@ -10,7 +12,6 @@ use penumbra_proto::{ event::ProtoEvent, }; use penumbra_stake::IdentityKey; -use sqlx::PgPool; #[derive(Debug)] pub struct GovernanceProposals {} @@ -24,16 +25,119 @@ const EVENT_PROPOSAL_FAILED: &str = "penumbra.core.component.governance.v1.Event const EVENT_PROPOSAL_SLASHED: &str = "penumbra.core.component.governance.v1.EventProposalSlashed"; const EVENT_PROPOSAL_DEPOSIT_CLAIM: &str = "penumbra.core.component.governance.v1.EventProposalDepositClaim"; -const ALL_RELEVANT_EVENTS: &[&str] = &[ - EVENT_PROPOSAL_SUBMIT, - EVENT_DELEGATOR_VOTE, - EVENT_VALIDATOR_VOTE, - EVENT_PROPOSAL_WITHDRAW, - EVENT_PROPOSAL_PASSED, - EVENT_PROPOSAL_FAILED, - EVENT_PROPOSAL_SLASHED, - EVENT_PROPOSAL_DEPOSIT_CLAIM, -]; + +impl GovernanceProposals { + async fn index_event( + &self, + dbtx: &mut PgTransaction<'_>, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + match event.event.kind.as_str() { + EVENT_PROPOSAL_SUBMIT => { + let pe = pb::EventProposalSubmit::from_event(event.as_ref())?; + let start_block_height = pe.start_height; + let end_block_height = pe.end_height; + let submit = pe + .submit + .ok_or_else(|| anyhow!("missing submit in event"))?; + let deposit_amount = submit + .deposit_amount + .ok_or_else(|| anyhow!("missing deposit amount in event"))? + .try_into() + .context("error converting deposit amount")?; + let proposal = submit + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_submit( + dbtx, + proposal, + deposit_amount, + start_block_height, + end_block_height, + event.block_height, + ) + .await?; + } + EVENT_DELEGATOR_VOTE => { + let pe = pb::EventDelegatorVote::from_event(event.as_ref())?; + let vote = pe + .vote + .ok_or_else(|| anyhow!("missing vote in event"))? + .try_into() + .context("error converting delegator vote")?; + let validator_identity_key = pe + .validator_identity_key + .ok_or_else(|| anyhow!("missing validator identity key in event"))? 
+ .try_into() + .context("error converting validator identity key")?; + handle_delegator_vote(dbtx, vote, validator_identity_key, event.block_height) + .await?; + } + EVENT_VALIDATOR_VOTE => { + let pe = pb::EventValidatorVote::from_event(event.as_ref())?; + let voting_power = pe.voting_power; + let vote = pe + .vote + .ok_or_else(|| anyhow!("missing vote in event"))? + .try_into() + .context("error converting vote")?; + handle_validator_vote(dbtx, vote, voting_power, event.block_height).await?; + } + EVENT_PROPOSAL_WITHDRAW => { + let pe = pb::EventProposalWithdraw::from_event(event.as_ref())?; + let proposal_withdraw: ProposalWithdraw = pe + .withdraw + .ok_or_else(|| anyhow!("missing withdraw in event"))? + .try_into() + .context("error converting proposal withdraw")?; + let proposal = proposal_withdraw.proposal; + let reason = proposal_withdraw.reason; + handle_proposal_withdraw(dbtx, proposal, reason).await?; + } + EVENT_PROPOSAL_PASSED => { + let pe = pb::EventProposalPassed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_passed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_FAILED => { + let pe = pb::EventProposalFailed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_failed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_SLASHED => { + let pe = pb::EventProposalSlashed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_slashed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_DEPOSIT_CLAIM => { + let pe = pb::EventProposalDepositClaim::from_event(event.as_ref())?; + let deposit_claim = pe + .deposit_claim + .ok_or_else(|| anyhow!("missing deposit claim in event"))? + .try_into() + .context("error converting deposit claim")?; + handle_proposal_deposit_claim(dbtx, deposit_claim).await?; + } + _ => {} + } + + Ok(()) + } +} #[async_trait] impl AppView for GovernanceProposals { @@ -213,119 +317,18 @@ impl AppView for GovernanceProposals { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - ALL_RELEVANT_EVENTS.contains(&type_str) + fn name(&self) -> String { + "governance".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - match event.event.kind.as_str() { - EVENT_PROPOSAL_SUBMIT => { - let pe = pb::EventProposalSubmit::from_event(event.as_ref())?; - let start_block_height = pe.start_height; - let end_block_height = pe.end_height; - let submit = pe - .submit - .ok_or_else(|| anyhow!("missing submit in event"))?; - let deposit_amount = submit - .deposit_amount - .ok_or_else(|| anyhow!("missing deposit amount in event"))? - .try_into() - .context("error converting deposit amount")?; - let proposal = submit - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? 
- .try_into() - .context("error converting proposal")?; - handle_proposal_submit( - dbtx, - proposal, - deposit_amount, - start_block_height, - end_block_height, - event.block_height, - ) - .await?; - } - EVENT_DELEGATOR_VOTE => { - let pe = pb::EventDelegatorVote::from_event(event.as_ref())?; - let vote = pe - .vote - .ok_or_else(|| anyhow!("missing vote in event"))? - .try_into() - .context("error converting delegator vote")?; - let validator_identity_key = pe - .validator_identity_key - .ok_or_else(|| anyhow!("missing validator identity key in event"))? - .try_into() - .context("error converting validator identity key")?; - handle_delegator_vote(dbtx, vote, validator_identity_key, event.block_height) - .await?; - } - EVENT_VALIDATOR_VOTE => { - let pe = pb::EventValidatorVote::from_event(event.as_ref())?; - let voting_power = pe.voting_power; - let vote = pe - .vote - .ok_or_else(|| anyhow!("missing vote in event"))? - .try_into() - .context("error converting vote")?; - handle_validator_vote(dbtx, vote, voting_power, event.block_height).await?; - } - EVENT_PROPOSAL_WITHDRAW => { - let pe = pb::EventProposalWithdraw::from_event(event.as_ref())?; - let proposal_withdraw: ProposalWithdraw = pe - .withdraw - .ok_or_else(|| anyhow!("missing withdraw in event"))? - .try_into() - .context("error converting proposal withdraw")?; - let proposal = proposal_withdraw.proposal; - let reason = proposal_withdraw.reason; - handle_proposal_withdraw(dbtx, proposal, reason).await?; - } - EVENT_PROPOSAL_PASSED => { - let pe = pb::EventProposalPassed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_passed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_FAILED => { - let pe = pb::EventProposalFailed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_failed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_SLASHED => { - let pe = pb::EventProposalSlashed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_slashed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_DEPOSIT_CLAIM => { - let pe = pb::EventProposalDepositClaim::from_event(event.as_ref())?; - let deposit_claim = pe - .deposit_claim - .ok_or_else(|| anyhow!("missing deposit claim in event"))? - .try_into() - .context("error converting deposit claim")?; - handle_proposal_deposit_claim(dbtx, deposit_claim).await?; - } - _ => {} + for event in batch.events() { + self.index_event(dbtx, event).await?; } - Ok(()) } } diff --git a/crates/bin/pindexer/src/ibc/mod.rs b/crates/bin/pindexer/src/ibc/mod.rs index 9bc6d795c4..417fd5ca19 100644 --- a/crates/bin/pindexer/src/ibc/mod.rs +++ b/crates/bin/pindexer/src/ibc/mod.rs @@ -1,5 +1,5 @@ use anyhow::anyhow; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_asset::Value; use penumbra_keys::Address; use penumbra_proto::{ @@ -8,7 +8,6 @@ use penumbra_proto::{ }, event::ProtoEvent as _, }; -use sqlx::PgPool; /// The kind of event we might care about. 
#[derive(Clone, Copy, Debug)] @@ -198,6 +197,10 @@ impl Component { #[async_trait] impl AppView for Component { + fn name(&self) -> String { + "ibc".to_string() + } + async fn init_chain( &self, dbtx: &mut PgTransaction, @@ -206,18 +209,19 @@ impl AppView for Component { init_db(dbtx).await } - fn is_relevant(&self, type_str: &str) -> bool { - EventKind::try_from(type_str).is_ok() - } - - #[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))] - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> anyhow::Result<()> { - let transfer = Event::try_from(event)?.db_transfer(); - create_transfer(dbtx, event.block_height, transfer).await + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + let parsed = match Event::try_from(event) { + Ok(p) => p, + Err(_) => continue, + }; + let transfer = parsed.db_transfer(); + create_transfer(dbtx, event.block_height, transfer).await?; + } + Ok(()) } } diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index ddaa667635..c8dd76a31e 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -6,21 +6,21 @@ pub trait IndexerExt: Sized { impl IndexerExt for cometindex::Indexer { fn with_default_penumbra_app_views(self) -> Self { - self.with_index(crate::block::Block {}) - .with_index(crate::stake::ValidatorSet {}) - .with_index(crate::stake::Slashings {}) - .with_index(crate::stake::DelegationTxs {}) - .with_index(crate::stake::UndelegationTxs {}) - .with_index(crate::governance::GovernanceProposals {}) - .with_index(crate::dex_ex::Component::new()) - .with_index(crate::supply::Component::new()) - .with_index(crate::ibc::Component::new()) - .with_index(crate::insights::Component::new( + self.with_index(Box::new(crate::block::Block {})) + .with_index(Box::new(crate::stake::ValidatorSet {})) + .with_index(Box::new(crate::stake::Slashings {})) + .with_index(Box::new(crate::stake::DelegationTxs {})) + .with_index(Box::new(crate::stake::UndelegationTxs {})) + .with_index(Box::new(crate::governance::GovernanceProposals {})) + .with_index(Box::new(crate::dex_ex::Component::new())) + .with_index(Box::new(crate::supply::Component::new())) + .with_index(Box::new(crate::ibc::Component::new())) + .with_index(Box::new(crate::insights::Component::new( penumbra_asset::asset::Id::from_str( // USDC "passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", ) .ok(), - )) + ))) } } diff --git a/crates/bin/pindexer/src/insights/mod.rs b/crates/bin/pindexer/src/insights/mod.rs index 221c64eb25..26887380da 100644 --- a/crates/bin/pindexer/src/insights/mod.rs +++ b/crates/bin/pindexer/src/insights/mod.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, iter}; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_app::genesis::Content; use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; use penumbra_dex::{ @@ -10,7 +10,7 @@ use penumbra_dex::{ use penumbra_fee::event::EventBlockFees; use penumbra_funding::event::EventFundingStreamReward; use penumbra_num::Amount; -use penumbra_proto::{event::EventDomainType, DomainType, Name}; +use penumbra_proto::event::EventDomainType; use penumbra_shielded_pool::event::{ EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund, 
EventOutboundFungibleTokenTransfer, @@ -20,7 +20,6 @@ use penumbra_stake::{ validator::Validator, IdentityKey, }; -use sqlx::PgPool; use crate::parsing::parse_content; @@ -280,47 +279,12 @@ async fn add_genesis_native_token_allocation_supply<'a>( Ok(()) } -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - for statement in include_str!("schema.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - - // decode the initial supply from the genesis - // initial app state is not recomputed from events, because events are not emitted in init_chain. - // instead, the indexer directly parses the genesis. - add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) - .await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - [ - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ] - .into_iter() - .any(|x| type_str == x) - } +impl Component { async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let height = event.block_height; if let Ok(e) = EventUndelegate::try_from_event(&event.event) { @@ -504,3 +468,38 @@ impl AppView for Component { Ok(()) } } + +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + + // decode the initial supply from the genesis + // initial app state is not recomputed from events, because events are not emitted in init_chain. + // instead, the indexer directly parses the genesis. + add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) 
+ .await?; + Ok(()) + } + + fn name(&self) -> String { + "insights".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 71989105ff..fee7ab0f90 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,6 +1,5 @@ use anyhow::Result; use clap::Parser as _; -use pindexer::block::Block; use pindexer::{Indexer, IndexerExt as _, Options}; #[tokio::main] @@ -8,7 +7,6 @@ async fn main() -> Result<()> { Indexer::new(Options::parse()) .with_default_tracing() .with_default_penumbra_app_views() - .with_index(Block {}) .run() .await?; diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs index c25dc63a4d..bacbd5276e 100644 --- a/crates/bin/pindexer/src/stake/delegation_txs.rs +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -47,29 +47,28 @@ impl AppView for DelegationTxs { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventDelegate" + fn name(&self) -> String { + "stake/delegation_txs".to_string() } - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<()> { - let pe = pb::EventDelegate::from_event(event.as_ref())?; + async fn index_batch(&self, dbtx: &mut PgTransaction, batch: EventBatch) -> Result<()> { + for event in batch.events() { + let pe = match pb::EventDelegate::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let amount = Amount::try_from( - pe.amount - .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, - )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; - sqlx::query( + sqlx::query( "INSERT INTO stake_delegation_txs (ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" ) .bind(ik.to_string()) @@ -78,6 +77,7 @@ impl AppView for DelegationTxs { .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/missed_blocks.rs b/crates/bin/pindexer/src/stake/missed_blocks.rs index 4516d619ac..a289ca0b4a 100644 --- a/crates/bin/pindexer/src/stake/missed_blocks.rs +++ b/crates/bin/pindexer/src/stake/missed_blocks.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -45,29 +45,33 @@ impl AppView for MissedBlocks { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventValidatorMissedBlock" + fn name(&self) -> String { + "stake/missed_blocks".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + for event in batch.events() { + let pe = match pb::EventValidatorMissedBlock::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let height = event.block_height; + let height = event.block_height; - sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)") - .bind(height as i64) - .bind(ik.to_string()) - .execute(dbtx.as_mut()) - .await?; + sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)") + .bind(height as i64) + .bind(ik.to_string()) + .execute(dbtx.as_mut()) + .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs index bbc472171f..68134c6690 100644 --- a/crates/bin/pindexer/src/stake/slashings.rs +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -7,6 +9,44 @@ use penumbra_stake::IdentityKey; #[derive(Debug)] pub struct Slashings {} +impl Slashings { + async fn index_event( + &self, + dbtx: &mut PgTransaction<'_>, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + let pe = match pb::EventSlashingPenaltyApplied::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => return Ok(()), + }; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + + let height = event.block_height; + let epoch_index = pe.epoch_index; + + let penalty_json = serde_json::to_string( + &pe.new_penalty + .ok_or_else(|| anyhow!("missing new_penalty"))?, + )?; + + sqlx::query( + "INSERT INTO stake_slashings (height, ik, epoch_index, penalty) + VALUES ($1, $2, $3, $4)", + ) + .bind(height as i64) + .bind(ik.to_string()) + .bind(epoch_index as i64) + .bind(penalty_json) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} + #[async_trait] impl AppView for Slashings { async fn init_chain( @@ -41,44 +81,18 @@ impl AppView for Slashings 
{ Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - match type_str { - "penumbra.core.component.stake.v1.EventSlashingPenaltyApplied" => true, - _ => false, - } + fn name(&self) -> String { + "stake/slashings".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?; - let ik = IdentityKey::try_from( - pe.identity_key - .ok_or_else(|| anyhow!("missing ik in event"))?, - )?; - - let height = event.block_height; - let epoch_index = pe.epoch_index; - - let penalty_json = serde_json::to_string( - &pe.new_penalty - .ok_or_else(|| anyhow!("missing new_penalty"))?, - )?; - - sqlx::query( - "INSERT INTO stake_slashings (height, ik, epoch_index, penalty) - VALUES ($1, $2, $3, $4)", - ) - .bind(height as i64) - .bind(ik.to_string()) - .bind(epoch_index as i64) - .bind(penalty_json) - .execute(dbtx.as_mut()) - .await?; - + for event in batch.events() { + self.index_event(dbtx, event).await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs index aeb9d90511..e9a0a31d48 100644 --- a/crates/bin/pindexer/src/stake/undelegation_txs.rs +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -47,29 +47,28 @@ impl AppView for UndelegationTxs { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventUndelegate" + fn name(&self) -> String { + "stake/undelegation_txs".to_string() } - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<()> { - let pe = pb::EventUndelegate::from_event(event.as_ref())?; + async fn index_batch(&self, dbtx: &mut PgTransaction, batch: EventBatch) -> Result<()> { + for event in batch.events() { + let pe = match pb::EventUndelegate::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let amount = Amount::try_from( - pe.amount - .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, - )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; - sqlx::query( + sqlx::query( "INSERT INTO stake_undelegation_txs (ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" ) .bind(ik.to_string()) @@ -78,6 +77,7 @@ impl AppView for UndelegationTxs { .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs index bc4dc59a18..039b35c03c 100644 --- a/crates/bin/pindexer/src/stake/validator_set.rs +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_app::genesis::Content; use penumbra_asset::asset; @@ -17,57 +19,11 @@ use crate::parsing::parse_content; #[derive(Debug)] pub struct ValidatorSet {} -#[async_trait] -impl AppView for ValidatorSet { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - sqlx::query( - // table name is module path + struct name - // note: protobuf data is encoded as protojson for ease of consumers - // hence TEXT fields - "CREATE TABLE stake_validator_set ( - id SERIAL PRIMARY KEY, - ik TEXT NOT NULL, - name TEXT NOT NULL, - definition TEXT NOT NULL, - voting_power BIGINT NOT NULL, - queued_delegations BIGINT NOT NULL, - queued_undelegations BIGINT NOT NULL, - validator_state TEXT NOT NULL, - bonding_state TEXT NOT NULL - );", - ) - .execute(dbtx.as_mut()) - .await?; - - sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);") - .execute(dbtx.as_mut()) - .await?; - - add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - match type_str { - "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => true, - "penumbra.core.component.stake.v1.EventDelegate" => true, - "penumbra.core.component.stake.v1.EventUndelegate" => true, - "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => true, - "penumbra.core.component.stake.v1.EventValidatorStateChange" => true, - "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => true, - _ => false, - } - } - +impl ValidatorSet { async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { @@ -146,6 +102,56 @@ impl AppView for ValidatorSet { } } +#[async_trait] +impl AppView for ValidatorSet { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + sqlx::query( + // table name is module path + struct name + // note: protobuf data is encoded as protojson for ease of consumers + // hence TEXT fields + "CREATE TABLE stake_validator_set ( + id SERIAL PRIMARY KEY, + ik TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL, + voting_power BIGINT NOT NULL, + queued_delegations BIGINT NOT NULL, + queued_undelegations BIGINT NOT NULL, + validator_state TEXT NOT NULL, + bonding_state TEXT NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);") + .execute(dbtx.as_mut()) + .await?; + + add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?; + Ok(()) + } + + fn name(&self) -> String { + "stake/validator_set".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut 
PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } + Ok(()) + } +} + async fn add_genesis_validators<'a>(dbtx: &mut PgTransaction<'a>, content: &Content) -> Result<()> { // Given a genesis validator, we need to figure out its delegations at // genesis by getting its delegation token then summing up all the allocations. diff --git a/crates/bin/pindexer/src/supply.rs b/crates/bin/pindexer/src/supply.rs index b1ea44ffbe..cc5b9f9bae 100644 --- a/crates/bin/pindexer/src/supply.rs +++ b/crates/bin/pindexer/src/supply.rs @@ -1,7 +1,9 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_app::genesis::Content; use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; use penumbra_num::Amount; @@ -13,7 +15,7 @@ use penumbra_proto::{ }, }; use penumbra_stake::{rate::RateData, validator::Validator, IdentityKey}; -use sqlx::{PgPool, Postgres, Transaction}; +use sqlx::{Postgres, Transaction}; use std::iter; use crate::parsing::parse_content; @@ -900,19 +902,20 @@ async fn add_genesis_native_token_allocation_supply<'a>( } #[derive(Debug)] -pub struct Component { - event_strings: HashSet<&'static str>, -} +pub struct Component {} impl Component { pub fn new() -> Self { - let event_strings = Event::NAMES.into_iter().collect(); - Self { event_strings } + Self {} } } #[async_trait] impl AppView for Component { + fn name(&self) -> String { + "supply".to_string() + } + async fn init_chain( &self, dbtx: &mut PgTransaction, @@ -930,16 +933,18 @@ impl AppView for Component { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - self.event_strings.contains(type_str) - } - - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - Event::try_from(event)?.index(dbtx).await + for event in batch.events() { + let e = match Event::try_from(event) { + Ok(e) => e, + Err(_) => continue, + }; + e.index(dbtx).await?; + } + Ok(()) } } diff --git a/crates/util/cometindex/examples/fmd_clues.rs b/crates/util/cometindex/examples/fmd_clues.rs deleted file mode 100644 index fbd5fb3d15..0000000000 --- a/crates/util/cometindex/examples/fmd_clues.rs +++ /dev/null @@ -1,86 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction}; -use sqlx::PgPool; - -// This example is silly because it doesn't do any "compilation" of the raw -// events, so it's only useful as an example of exercising the harness and the -// intended usage: the _downstream_ crate depends on cometindex (generic over -// any event) and has its own app specific logic. 
But it doesn't have to -reimplement the binary handling / arg parsing / etc - -#[derive(Debug)] -struct FmdCluesExample {} - -#[async_trait] -impl AppView for FmdCluesExample { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - sqlx::query( - " -CREATE TABLE IF NOT EXISTS fmd_clues_example ( - id SERIAL PRIMARY KEY, - tx_hash BYTEA NOT NULL, - fmd_clue VARCHAR NOT NULL -); -", - ) - .execute(dbtx.as_mut()) - .await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue" - } - - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<(), anyhow::Error> { - // this is just an example in the integration tests, so we don't want to do any - // - queries against existing table state - // - parsing of the event data into structured data - // - computations of derived data - // but these should all be possible - let clue = event - .event - .attributes - .iter() - .find(|attr| attr.key == "clue") - .expect("fmd_clue attribute not found") - .value - .clone(); - let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec(); - - sqlx::query( - " - INSERT INTO fmd_clues (tx_hash, fmd_clue) - VALUES ($1, $2) - ", - ) - .bind(&tx_hash) - .bind(&clue) - .execute(dbtx.as_mut()) - .await?; - - Ok(()) - } -} - -#[tokio::main] -async fn main() -> Result<()> { - Indexer::new(Options::parse()) - .with_default_tracing() - // add as many indexers as you want - .with_index(FmdCluesExample {}) - .run() - .await?; - - Ok(()) -} diff --git a/crates/util/cometindex/src/contextualized.rs b/crates/util/cometindex/src/contextualized.rs index 2b34f34254..1805a3df41 100644 --- a/crates/util/cometindex/src/contextualized.rs +++ b/crates/util/cometindex/src/contextualized.rs @@ -1,6 +1,6 @@ use tendermint::abci::Event; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ContextualizedEvent { pub event: Event, pub block_height: u64, diff --git a/crates/util/cometindex/src/engine.rs b/crates/util/cometindex/src/engine.rs deleted file mode 100644 index 8b13789179..0000000000 --- a/crates/util/cometindex/src/engine.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs index f90d666b42..59a8018cd1 100644 --- a/crates/util/cometindex/src/index.rs +++ b/crates/util/cometindex/src/index.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use async_trait::async_trait; pub use sqlx::PgPool; use sqlx::{Postgres, Transaction}; @@ -6,21 +8,52 @@ use crate::ContextualizedEvent; pub type PgTransaction<'a> = Transaction<'a, Postgres>; +/// Represents all of the events in a given block +#[derive(Clone, Debug)] +pub struct BlockEvents { + /// The height of this block. + pub height: u64, + /// The events contained in this block, in order. + pub events: Vec<ContextualizedEvent>, +} + +#[derive(Clone, Debug)] +pub struct EventBatch { + pub first_height: u64, + pub last_height: u64, + /// The batch of events, ordered by increasing height. + /// + /// The heights are guaranteed to be increasing, and to be contiguous. + pub by_height: Arc<Vec<BlockEvents>>, +} + +impl EventBatch { + pub fn events(&self) -> impl Iterator<Item = &ContextualizedEvent> { + self.by_height.iter().flat_map(|x| x.events.iter()) + } +} + /// Represents a specific index of raw event data. #[async_trait] -pub trait AppView: std::fmt::Debug { +pub trait AppView: Send + Sync { + /// Return the name of this index.
+ /// + /// This should be unique across all of the indices. + fn name(&self) -> String; + + /// This will be called once when processing the genesis before the first block. async fn init_chain( &self, dbtx: &mut PgTransaction, app_state: &serde_json::Value, ) -> Result<(), anyhow::Error>; - fn is_relevant(&self, type_str: &str) -> bool; - - async fn index_event( + /// This allows processing a batch of events, over many blocks. + /// + /// By using a batch, we can potentially avoid a costly commit for every single event. + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error>; } diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs index 962b845dfb..04e1d3b915 100644 --- a/crates/util/cometindex/src/indexer.rs +++ b/crates/util/cometindex/src/indexer.rs @@ -1,29 +1,123 @@ -use std::pin::Pin; +mod indexing_state; +use crate::{index::EventBatch, opt::Options, AppView}; use anyhow::{Context as _, Result}; -use futures::{Stream, StreamExt, TryStreamExt}; -use sqlx::{postgres::PgPoolOptions, PgPool}; -use tap::{Tap, TapFallible, TapOptional}; -use tendermint::abci; -use tracing::{debug, info}; +use indexing_state::{Height, IndexingState}; +use std::sync::Arc; +use tokio::{sync::mpsc, task::JoinSet}; + +#[tracing::instrument(skip_all)] +async fn catchup( + state: &IndexingState, + indices: &[Arc<dyn AppView>], + genesis: Arc<serde_json::Value>, +) -> anyhow::Result<()> { + if indices.is_empty() { + tracing::info!(why = "no indices", "catchup completed"); + return Ok(()); + } + + let (src_height, index_heights) = tokio::try_join!(state.src_height(), state.index_heights())?; + tracing::info!(?src_height, ?index_heights, "catchup status"); + let lowest_index_height = index_heights.values().copied().min().unwrap_or_default(); + if lowest_index_height >= src_height { + tracing::info!(why = "already caught up", "catchup completed"); + return Ok(()); + } + + // Constants that influence performance.
+ const DEFAULT_BATCH_SIZE: u64 = 1000; + const BATCH_LOOKAHEAD: usize = 2; + + let mut tasks = JoinSet::<anyhow::Result<()>>::new(); + + let mut txs = Vec::with_capacity(indices.len()); + for index in indices.iter().cloned() { + let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD); + txs.push(tx); + let name = index.name(); + let index_height = index_heights.get(&name).copied().unwrap_or_default(); + let state_cp = state.clone(); + let genesis_cp = genesis.clone(); + tasks.spawn(async move { + if index_height == Height::default() { + tracing::info!(?name, "initializing index"); + let mut dbtx = state_cp.begin_transaction().await?; + index.init_chain(&mut dbtx, &genesis_cp).await?; + tracing::info!(?name, "finished initialization"); + IndexingState::update_index_height(&mut dbtx, &name, Height::post_genesis()) + .await?; + dbtx.commit().await?; + } else { + tracing::info!(?name, "already initialized"); + } + while let Some(events) = rx.recv().await { + let mut dbtx = state_cp.begin_transaction().await?; + let last_height = events.last_height; + if index_height >= Height::from(last_height) { + tracing::info!( + first = events.first_height, + last = events.last_height, + index_name = &name, + "skipping batch" + ); + continue; + } + tracing::info!( + first = events.first_height, + last = events.last_height, + index_name = &name, + "indexing batch" + ); + index.index_batch(&mut dbtx, events).await?; + tracing::debug!(index_name = &name, "committing batch"); + IndexingState::update_index_height(&mut dbtx, &name, Height::from(last_height)) + .await?; -use crate::{opt::Options, AppView, ContextualizedEvent, PgTransaction}; + dbtx.commit().await?; + } + Ok(()) + }); + } + + let state_cp = state.clone(); + tasks.spawn(async move { + let mut height = lowest_index_height.next(); + while height <= src_height { + let first = height; + let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height); + height = next_height; + tracing::debug!(?first, ?last, "fetching batch"); + let events = state_cp.event_batch(first, last).await?; + tracing::info!(?first, ?last, "sending batch"); + for tx in &txs { + tx.send(events.clone()).await?; + } + } + Ok(()) + }); + + while let Some(res) = tasks.join_next().await { + res??; + } + Ok(()) +} pub struct Indexer { opts: Options, - indexes: Vec<Box<dyn AppView>>, + indices: Vec<Arc<dyn AppView>>, } impl Indexer { pub fn new(opts: Options) -> Self { Self { opts, - indexes: Vec::new(), + indices: Vec::new(), } } - pub fn with_index(mut self, index: impl AppView + 'static) -> Self { - self.indexes.push(Box::new(index)); + pub fn with_index(mut self, index: Box<dyn AppView>) -> Self { + self.indices.push(Arc::from(index)); self } @@ -32,19 +126,6 @@ impl Indexer { self } - async fn create_dst_tables( - pool: &PgPool, - indexes: &[Box<dyn AppView>], - app_state: &serde_json::Value, - ) -> Result<()> { - let mut dbtx = pool.begin().await?; - for index in indexes { - index.init_chain(&mut dbtx, app_state).await?; - } - dbtx.commit().await?; - Ok(()) - } - pub async fn run(self) -> Result<(), anyhow::Error> { tracing::info!(?self.opts); let Self { @@ -56,262 +137,24 @@ impl Indexer { poll_ms, genesis_json, }, - indexes, + indices: indexes, } = self; - // Create a source db, with, for sanity, some read only settings. - // These will be overrideable by a consumer who knows what they're doing, - // but prevents basic mistakes. - // c.f.
https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 - let src_db = PgPoolOptions::new() - .after_connect(|conn, _| { - Box::pin(async move { - sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") - .execute(conn) - .await?; - Ok(()) - }) - }) - .connect(&src_database_url) - .await?; - - let dst_db = PgPool::connect(&dst_database_url).await?; - - // Check if the destination db is initialized - let dst_db_initialized: bool = sqlx::query_scalar( - "SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'index_watermark' - )", + let state = IndexingState::init(&src_database_url, &dst_database_url).await?; + let genesis: serde_json::Value = serde_json::from_str( + &std::fs::read_to_string(genesis_json) + .context("error reading provided genesis.json file")?, ) - .fetch_one(&dst_db) - .await?; - - if !dst_db_initialized { - tracing::info!("no watermark found, initializing with genesis data"); - - // Create the table if it doesn't exist - sqlx::query("CREATE TABLE index_watermark (events_rowid BIGINT NOT NULL)") - .execute(&dst_db) - .await?; - - // Load the genesis JSON to be used populating initial tables - let genesis_content: serde_json::Value = serde_json::from_str( - &std::fs::read_to_string(genesis_json) - .context("error reading provided genesis.json file")?, - ) - .context("error parsing provided genesis.json file")?; - let app_state = genesis_content + .context("error parsing provided genesis.json file")?; + let app_state = Arc::new( + genesis .get("app_state") - .ok_or_else(|| anyhow::anyhow!("no app_state key in genesis.json"))?; - - Self::create_dst_tables(&dst_db, &indexes, app_state).await?; - } else { - tracing::info!("skipping genesis initialization"); - } - + .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))? + .clone(), + ); loop { - Self::tick(&src_db, &dst_db, &indexes).await?; + catchup(&state, indexes.as_slice(), app_state.clone()).await?; tokio::time::sleep(poll_ms).await; } } - - async fn tick( - src_db: &PgPool, - dst_db: &PgPool, - indexes: &[Box<dyn AppView>], - ) -> Result<(), anyhow::Error> { - // Fetch the highest rowid processed so far (the watermark) - let current_watermark: Option<i64> = - sqlx::query_as("SELECT events_rowid FROM index_watermark") - .fetch_optional(dst_db) - .await? - .map(|(w,)| w) - .tap_some(|row_id| debug!(%row_id, "fetched index watermark")) - .tap_none(|| debug!("no index watermark was present")); - - // Insert initial watermark if not present, so we can use a SET query later - if current_watermark.is_none() { - sqlx::query("INSERT INTO index_watermark (events_rowid) VALUES (0)") - .execute(dst_db) - .await? - .tap(|_| debug!("set index watermark to 0")); - } - - let watermark = current_watermark.unwrap_or(0); - - // Calculate new events count since the last watermark - sqlx::query_as::<_, (i64,)>("SELECT MAX(rowid) - $1 FROM events") - .bind(watermark) - .fetch_one(src_db) - .await - .map(|(count,)| count)? - .tap(|count| info!(%count, %watermark, "new events since last watermark")); - - let mut scanned_events = 0usize; - let mut relevant_events = 0usize; - - let mut es = read_events(&src_db, watermark); - let mut dbtx = dst_db.begin().await?; - while let Some(event) = es.next().await.transpose()?
{ - if scanned_events % 1000 == 0 { - tracing::info!(scanned_events, relevant_events); - } else { - tracing::debug!( - block_height = %event.block_height, - kind = %event.event.kind, - scanned_events, - relevant_events, - "processing event" - ); - } - - scanned_events += 1; - - // if not relevant then skip making a db tx for the dst db - if !indexes - .iter() - .any(|index| index.is_relevant(&event.as_ref().kind)) - { - tracing::trace!(kind = %event.as_ref().kind, "event is not relevant to any views"); - continue; - } - - relevant_events += 1; - - for index in indexes { - if index.is_relevant(&event.as_ref().kind) { - tracing::debug!(?event, ?index, "relevant to index"); - index.index_event(&mut dbtx, &event, &src_db).await?; - } - } - // Mark that we got to at least this event - update_watermark(&mut dbtx, event.local_rowid).await?; - // Only commit in batches of <= 1000 events, for about a 5x performance increase when - // catching up. - if relevant_events % 1000 == 0 { - dbtx.commit().await?; - dbtx = dst_db.begin().await?; - } - } - // Flush out the remaining changes. - dbtx.commit().await?; - - Ok(()) - } -} - -async fn update_watermark(dbtx: &mut PgTransaction<'_>, watermark: i64) -> Result<()> { - sqlx::query("UPDATE index_watermark SET events_rowid = $1") - .bind(watermark) - .execute(dbtx.as_mut()) // lol, see note on Executor trait about Transaction impl - .await - .tap_ok(|affected| { - debug!(%watermark, "updated index watermark"); - debug_assert_eq!( - affected.rows_affected(), - 1, - "only one row should be affected when updating the index watermark" - ); - }) - .map(|_| ()) - .map_err(anyhow::Error::from) -} - -fn read_events( - src_db: &PgPool, - watermark: i64, -) -> Pin<Box<dyn Stream<Item = Result<ContextualizedEvent>> + Send + '_>> { - let event_stream = sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>( - // This query does some shenanigans to ensure good performance. - // The main trick is that we know that each event has 1 block and <= 1 transaction associated - // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and - // then a sort, and instead work from the events in a linear fashion. - // Basically, this query ends up doing: - // - // for event in events >= id: - // attach attributes - // attach block - // attach transaction?
- r#" -SELECT - events.rowid, - events.type, - blocks.height AS block_height, - tx_results.tx_hash, - events.attrs -FROM ( - SELECT - rowid, - type, - block_id, - tx_id, - jsonb_object_agg(attributes.key, attributes.value) AS attrs - FROM - events - LEFT JOIN - attributes ON rowid = attributes.event_id - WHERE - rowid > $1 - GROUP BY - rowid, - type, - block_id, - tx_id -) events -LEFT JOIN LATERAL ( - SELECT * FROM blocks WHERE blocks.rowid = events.block_id LIMIT 1 -) blocks -ON TRUE -LEFT JOIN LATERAL ( - SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1 -) tx_results -ON TRUE -ORDER BY - events.rowid ASC - "#, - ) - .bind(watermark) - .fetch(src_db) - .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { - tracing::debug!(?local_rowid, type_str, height, ?tx_hash); - let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { - hex::decode(s) - .expect("invalid tx_hash") - .try_into() - .expect("expected 32 bytes") - }); - let block_height = height as u64; - - let serde_json::Value::Object(attrs) = attrs else { - // saves an allocation below bc we can take ownership - panic!("expected JSON object"); - }; - - let event = abci::Event { - kind: type_str, - attributes: attrs - .into_iter() - .filter_map(|(k, v)| match v { - serde_json::Value::String(s) => Some((k, s)), - // we never hit this becasue of how we constructed the query - _ => None, - }) - .map(Into::into) - .collect(), - }; - - let ce = ContextualizedEvent { - event, - block_height, - tx_hash, - local_rowid, - }; - //tracing::info!(?ce); - - ce - }) - .map_err(|e| anyhow::Error::from(e).context("error reading from database")); - - event_stream.boxed() -} diff --git a/crates/util/cometindex/src/indexer/indexing_state.rs b/crates/util/cometindex/src/indexer/indexing_state.rs new file mode 100644 index 0000000000..23f75a2c8c --- /dev/null +++ b/crates/util/cometindex/src/indexer/indexing_state.rs @@ -0,0 +1,298 @@ +use std::{collections::HashMap, sync::Arc}; + +use futures::TryStreamExt; +use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction}; +use tendermint::abci; + +use crate::{ + index::{BlockEvents, EventBatch}, + ContextualizedEvent, +}; + +/// Create a Database, with, for sanity, some read only settings. +/// +/// These will be overrideable by a consumer who knows what they're doing, +/// but prevents basic mistakes. +/// c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 +async fn read_only_db(url: &str) -> anyhow::Result<PgPool> { + PgPoolOptions::new() + .after_connect(|conn, _| { + Box::pin(async move { + sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") + .execute(conn) + .await?; + Ok(()) + }) + }) + .connect(url) + .await + .map_err(Into::into) +} + +async fn read_write_db(url: &str) -> anyhow::Result<PgPool> { + PgPoolOptions::new().connect(url).await.map_err(Into::into) +} + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Height(u64); + +impl Height { + pub fn post_genesis() -> Self { + Height(1) + } + /// Return the last height in the batch, and then the first height in the next batch.
+ pub fn advance(self, batch_size: u64, max_height: Height) -> (Height, Height) { + let last = Height::from(self.0 + batch_size - 1).min(max_height); + let next_first = Height::from(last.0 + 1); + (last, next_first) + } + + pub fn next(self) -> Height { + Self(self.0 + 1) + } +} + +impl From<u64> for Height { + fn from(value: u64) -> Self { + Self(value) + } +} + +impl TryFrom<i64> for Height { + type Error = anyhow::Error; + + fn try_from(value: i64) -> Result<Self, Self::Error> { + Ok(Self(u64::try_from(value)?)) + } +} + +impl<'r> sqlx::Decode<'r, Postgres> for Height { + fn decode( + value: <Postgres as sqlx::database::HasValueRef<'r>>::ValueRef, + ) -> Result<Self, sqlx::error::BoxDynError> { + Ok(Height::try_from( + <i64 as sqlx::Decode<'r, Postgres>>::decode(value)?, + )?) + } +} + +impl sqlx::Type<Postgres> for Height { + fn type_info() -> <Postgres as sqlx::Database>::TypeInfo { + <i64 as sqlx::Type<Postgres>>::type_info() + } +} + +impl<'q> sqlx::Encode<'q, Postgres> for Height { + fn encode_by_ref( + &self, + buf: &mut <Postgres as sqlx::database::HasArguments<'q>>::ArgumentBuffer, + ) -> sqlx::encode::IsNull { + <i64 as sqlx::Encode<'q, Postgres>>::encode( + i64::try_from(self.0).expect("height should never exceed i64::MAX"), + buf, + ) + } +} + +#[derive(Debug, Clone)] +pub struct IndexingState { + src: PgPool, + dst: PgPool, +} + +impl IndexingState { + async fn create_watermark_table(&self) -> anyhow::Result<()> { + sqlx::query( + " + CREATE TABLE IF NOT EXISTS index_watermarks ( + index_name TEXT PRIMARY KEY, + height BIGINT NOT NULL + ) + ", + ) + .execute(&self.dst) + .await?; + Ok(()) + } + + /// The largest height for which we know we have all events. + pub async fn src_height(&self) -> anyhow::Result<Height> { + // We may be currently indexing events for this block. + let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks") + .fetch_optional(&self.src) + .await?; + Ok(res.unwrap_or_default()) + } + + pub async fn index_heights(&self) -> anyhow::Result<HashMap<String, Height>> { + let rows: Vec<(String, Height)> = + sqlx::query_as("SELECT index_name, height FROM index_watermarks") + .fetch_all(&self.dst) + .await?; + Ok(rows.into_iter().collect()) + } + + pub async fn update_index_height( + dbtx: &mut sqlx::Transaction<'_, Postgres>, + name: &str, + height: Height, + ) -> anyhow::Result<()> { + sqlx::query( + " + INSERT INTO index_watermarks + VALUES ($1, $2) + ON CONFLICT (index_name) + DO UPDATE SET height = excluded.height + ", + ) + .bind(name) + .bind(height) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } + + pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> { + // The amount of events we expect a block to have. + const WORKING_CAPACITY: usize = 32; + + let mut by_height = Vec::with_capacity((last.0 - first.0 + 1) as usize); + let mut event_stream = + sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>( + // This query does some shenanigans to ensure good performance. + // The main trick is that we know that each event has 1 block and <= 1 transaction associated + // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and + // then a sort, and instead work from the events in a linear fashion. + // Basically, this query ends up doing: + // + // for event in events >= id: + // attach attributes + // attach block + // attach transaction?
+ r#" +SELECT + events.rowid, + events.type, + events.height, + tx_results.tx_hash, + events.attrs +FROM ( + SELECT + (SELECT height FROM blocks WHERE blocks.rowid = block_id) as height, + rowid, + type, + block_id, + tx_id, + jsonb_object_agg(attributes.key, attributes.value) AS attrs + FROM + events + LEFT JOIN + attributes ON rowid = attributes.event_id + WHERE + block_id >= (SELECT rowid FROM blocks where height = $1) + AND + block_id <= (SELECT rowid FROM blocks where height = $2) + GROUP BY + rowid, + type, + block_id, + tx_id + ORDER BY + rowid ASC +) events +LEFT JOIN LATERAL ( + SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1 +) tx_results +ON TRUE +ORDER BY + events.rowid ASC + "#, + ) + .bind(first) + .bind(last) + .fetch(&self.src) + .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { + tracing::debug!(?local_rowid, type_str, height, ?tx_hash); + let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { + hex::decode(s) + .expect("invalid tx_hash") + .try_into() + .expect("expected 32 bytes") + }); + let block_height = height as u64; + + let serde_json::Value::Object(attrs) = attrs else { + // saves an allocation below bc we can take ownership + panic!("expected JSON object"); + }; + + let event = abci::Event { + kind: type_str, + attributes: attrs + .into_iter() + .filter_map(|(k, v)| match v { + serde_json::Value::String(s) => Some((k, s)), + // we never hit this because of how we constructed the query + _ => None, + }) + .map(Into::into) + .collect(), + }; + + let ce = ContextualizedEvent { + event, + block_height, + tx_hash, + local_rowid, + }; + + ce + }) + .map_err(|e| anyhow::Error::from(e).context("error reading from database")); + + let mut height = first.0; + let mut current_batch = BlockEvents { + height: first.0, + events: Vec::with_capacity(WORKING_CAPACITY), + }; + while let Some(e) = event_stream.try_next().await? { + assert!(e.block_height >= height); + // Flush the current block, creating empty blocks for any gap in heights. + while e.block_height > height { + by_height.push(current_batch); + height += 1; + current_batch = BlockEvents { + height, + events: Vec::with_capacity(WORKING_CAPACITY), + }; + } + current_batch.events.push(e); + } + // Flush the final block, and create empty ones for the remaining heights. + while height <= last.0 { + by_height.push(current_batch); + height += 1; + current_batch = BlockEvents { + height, + events: Vec::new(), + }; + } + Ok(EventBatch { + first_height: first.0, + last_height: last.0, + by_height: Arc::new(by_height), + }) + } + + pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result<Self> { + tracing::info!(url = src_url, "connecting to raw database"); + tracing::info!(url = dst_url, "connecting to derived database"); + let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?; + let out = Self { src, dst }; + out.create_watermark_table().await?; + Ok(out) + } + + pub async fn begin_transaction(&self) -> anyhow::Result<Transaction<'_, Postgres>> { + Ok(self.dst.begin().await?)
+ } +} diff --git a/crates/util/cometindex/src/lib.rs b/crates/util/cometindex/src/lib.rs index 907dd6cced..82858651fc 100644 --- a/crates/util/cometindex/src/lib.rs +++ b/crates/util/cometindex/src/lib.rs @@ -1,5 +1,4 @@ mod contextualized; -pub mod engine; pub mod index; pub mod indexer; pub mod opt; diff --git a/crates/util/cometindex/vendor/schema.sql b/crates/util/cometindex/vendor/schema.sql index fd78e677e0..b39563ebe0 100644 --- a/crates/util/cometindex/vendor/schema.sql +++ b/crates/util/cometindex/vendor/schema.sql @@ -55,6 +55,8 @@ CREATE TABLE events ( type VARCHAR NOT NULL ); +CREATE INDEX ON events(block_id); + -- The attributes table records event attributes. CREATE TABLE attributes ( event_id BIGINT NOT NULL REFERENCES events(rowid),
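Note for downstream indexers: with is_relevant/index_event removed, an AppView now implements name, init_chain, and index_batch, doing its own relevance filtering as it walks the batch, and is registered as a boxed trait object. A minimal sketch of the new shape, standing in for the deleted fmd_clues example (the ExampleView type, example_heights table, and the event filter here are illustrative only, not part of this diff):

use anyhow::Result;
use clap::Parser as _;
use cometindex::{
    async_trait, index::EventBatch, opt::Options, sqlx, AppView, Indexer, PgTransaction,
};

#[derive(Debug)]
struct ExampleView {}

#[async_trait]
impl AppView for ExampleView {
    fn name(&self) -> String {
        // Must be unique across indices: it keys this view's row in index_watermarks.
        "example".to_string()
    }

    async fn init_chain(
        &self,
        dbtx: &mut PgTransaction,
        _app_state: &serde_json::Value,
    ) -> Result<(), anyhow::Error> {
        sqlx::query("CREATE TABLE IF NOT EXISTS example_heights (height BIGINT NOT NULL)")
            .execute(dbtx.as_mut())
            .await?;
        Ok(())
    }

    async fn index_batch(
        &self,
        dbtx: &mut PgTransaction,
        batch: EventBatch,
    ) -> Result<(), anyhow::Error> {
        // Relevance filtering now happens inline, per event.
        for event in batch.events() {
            if event.event.kind != "penumbra.core.component.sct.v1.EventBlockRoot" {
                continue;
            }
            sqlx::query("INSERT INTO example_heights (height) VALUES ($1)")
                .bind(event.block_height as i64)
                .execute(dbtx.as_mut())
                .await?;
        }
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    Indexer::new(Options::parse())
        .with_default_tracing()
        // Indices are now passed as boxed trait objects.
        .with_index(Box::new(ExampleView {}))
        .run()
        .await?;
    Ok(())
}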