From 7533cc64930f96fb574afed5113efc85811bc3ed Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?=
Date: Wed, 6 Nov 2024 17:07:04 -0800
Subject: [PATCH] pindexer: implement batch processing API (#4913)

This restructures the AppView interface so that events are processed in
batches. Previously, app views had to index one event at a time. With this PR,
each app view receives a batch spanning several blocks' worth of events, with
the guarantee that every event in each of those blocks is present in the
batch.

## Making App Views Easier to Write

Having access to all of the events in a block makes app views more ergonomic
to write. For example, the dex explorer app view wants to know the time of the
candlestick events it processes, but that timestamp is only carried by the
block root event, which arrives later in the same block. Without access to
that context, we previously had to implement a manual queuing system in the
database, which was both cumbersome and a performance hit.

## Making App Views More Performant

Processing an entire block, or even multiple blocks, at once also makes app
views more performant, since:

- we don't need to write an update to the database more than once per block,
- we may be able to write updates even less frequently, depending on the app
  view (e.g. when only the current value is needed),
- we can keep transient state in memory instead of in the database, reducing
  writes and reads in all cases.

## Additional Performance Improvements

The app views now also run in parallel, which provides an additional speedup
when syncing up.
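To make the new interface concrete, here is a minimal sketch of an app view
written against it (the trait methods and imports match the changes to
`cometindex::index` in the diff below; the view name, table, and event filter
are illustrative placeholders, not part of this PR):

```rust
use anyhow::Result;
use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction};

#[derive(Debug)]
struct ExampleView {}

#[async_trait]
impl AppView for ExampleView {
    // A unique name, used to track a per-index height watermark.
    fn name(&self) -> String {
        "example".to_string()
    }

    // Called once, before the first block, to set up this view's schema.
    async fn init_chain(
        &self,
        dbtx: &mut PgTransaction,
        _app_state: &serde_json::Value,
    ) -> Result<()> {
        sqlx::query("CREATE TABLE IF NOT EXISTS example (height BIGINT NOT NULL)")
            .execute(dbtx.as_mut())
            .await?;
        Ok(())
    }

    // Receives several blocks' worth of events at once, with every event in
    // each of those blocks guaranteed to be present.
    async fn index_batch(&self, dbtx: &mut PgTransaction, batch: EventBatch) -> Result<()> {
        for event in batch.events() {
            // Filtering now happens inline, replacing the old `is_relevant` hook.
            if event.event.kind != "penumbra.core.component.sct.v1.EventBlockRoot" {
                continue;
            }
            sqlx::query("INSERT INTO example (height) VALUES ($1)")
                .bind(i64::try_from(event.block_height)?)
                .execute(dbtx.as_mut())
                .await?;
        }
        Ok(())
    }
}
```

Such a view is then registered as `.with_index(Box::new(ExampleView {}))`,
matching the updated registration in `indexer_ext.rs`.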
## Testing

Pindexer should work as usual, after wiping the database.

## Checklist

- [x] I have added guiding text to explain how a reviewer should test these
  changes.
- [x] If this code contains consensus-breaking changes, I have added the
  "consensus-breaking" label. Otherwise, I declare my belief that there are
  not consensus-breaking changes, for the following reason:

  > pindexer only
---
 crates/bin/pindexer/src/block.rs              |  49 +--
 crates/bin/pindexer/src/dex_ex/mod.rs         |  60 +--
 crates/bin/pindexer/src/governance.rs         | 241 +++++------
 crates/bin/pindexer/src/ibc/mod.rs            |  30 +-
 crates/bin/pindexer/src/indexer_ext.rs        |  22 +-
 crates/bin/pindexer/src/insights/mod.rs       |  79 ++--
 crates/bin/pindexer/src/main.rs               |   2 -
 .../bin/pindexer/src/stake/delegation_txs.rs  |  38 +-
 .../bin/pindexer/src/stake/missed_blocks.rs   |  38 +-
 crates/bin/pindexer/src/stake/slashings.rs    |  82 ++--
 .../pindexer/src/stake/undelegation_txs.rs    |  38 +-
 .../bin/pindexer/src/stake/validator_set.rs   | 104 ++---
 crates/bin/pindexer/src/supply.rs             |  37 +-
 crates/util/cometindex/examples/fmd_clues.rs  |  86 ----
 crates/util/cometindex/src/contextualized.rs  |   2 +-
 crates/util/cometindex/src/engine.rs          |   1 -
 crates/util/cometindex/src/index.rs           |  45 +-
 crates/util/cometindex/src/indexer.rs         | 391 ++++++------
 .../cometindex/src/indexer/indexing_state.rs  | 298 +++++++++++++
 crates/util/cometindex/src/lib.rs             |   1 -
 crates/util/cometindex/vendor/schema.sql      |   2 +
 21 files changed, 886 insertions(+), 760 deletions(-)
 delete mode 100644 crates/util/cometindex/examples/fmd_clues.rs
 delete mode 100644 crates/util/cometindex/src/engine.rs
 create mode 100644 crates/util/cometindex/src/indexer/indexing_state.rs

diff --git a/crates/bin/pindexer/src/block.rs b/crates/bin/pindexer/src/block.rs
index 8ca1a97128..9308bd0595 100644
--- a/crates/bin/pindexer/src/block.rs
+++ b/crates/bin/pindexer/src/block.rs
@@ -1,12 +1,16 @@
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction};
 use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
-use sqlx::{types::chrono::DateTime, PgPool};
+use sqlx::types::chrono::DateTime;
 
 #[derive(Debug)]
 pub struct Block {}
 
 #[async_trait]
 impl AppView for Block {
+    fn name(&self) -> String {
+        "block".to_string()
+    }
+
     async fn init_chain(
         &self,
         dbtx: &mut PgTransaction,
@@ -27,34 +31,33 @@ CREATE TABLE IF NOT EXISTS block_details (
         Ok(())
     }
 
-    fn is_relevant(&self, type_str: &str) -> bool {
-        type_str == "penumbra.core.component.sct.v1.EventBlockRoot"
-    }
-
-    async fn index_event(
+    async fn index_batch(
         &self,
         dbtx: &mut PgTransaction,
-        event: &ContextualizedEvent,
-        _src_db: &PgPool,
+        batch: EventBatch,
     ) -> Result<(), anyhow::Error> {
-        let pe = pb::EventBlockRoot::from_event(event.as_ref())?;
-        let timestamp = pe.timestamp.unwrap_or_default();
+        for event in batch.events() {
+            let pe = match pb::EventBlockRoot::from_event(event.as_ref()) {
+                Ok(pe) => pe,
+                Err(_) => continue,
+            };
+            let timestamp = pe.timestamp.unwrap_or_default();
 
-        sqlx::query(
-            "
+            sqlx::query(
+                "
 INSERT INTO block_details (height, timestamp, root)
 VALUES ($1, $2, $3)
 ",
-        )
-        .bind(i64::try_from(pe.height)?)
-        .bind(DateTime::from_timestamp(
-            timestamp.seconds,
-            u32::try_from(timestamp.nanos)?,
-        ))
-        .bind(pe.root.unwrap().inner)
-        .execute(dbtx.as_mut())
-        .await?;
-
+            )
+            .bind(i64::try_from(pe.height)?)
+ .bind(DateTime::from_timestamp( + timestamp.seconds, + u32::try_from(timestamp.nanos)?, + )) + .bind(pe.root.unwrap().inner) + .execute(dbtx.as_mut()) + .await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index ec3882d0c9..9574c90326 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -2,13 +2,11 @@ use std::fmt::Display; use anyhow::{anyhow, Context}; use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc}; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_asset::asset; use penumbra_dex::{event::EventCandlestickData, CandlestickData}; use penumbra_proto::{event::EventDomainType, DomainType}; use penumbra_sct::event::EventBlockRoot; -use prost::Name as _; -use sqlx::PgPool; type DateTime = sqlx::types::chrono::DateTime; @@ -453,35 +451,11 @@ impl Component { pub fn new() -> Self { Self {} } -} - -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - _: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - for statement in include_str!("schema.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - [ - ::Proto::full_name(), - ::Proto::full_name(), - ] - .into_iter() - .any(|x| type_str == x) - } async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let height = event.block_height; @@ -504,7 +478,35 @@ impl AppView for Component { } summary::update_all(dbtx, time).await?; } - tracing::debug!(?event, "unrecognized event"); + Ok(()) + } +} + +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + _: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + Ok(()) + } + + fn name(&self) -> String { + "dex_ex".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/governance.rs b/crates/bin/pindexer/src/governance.rs index ec49b0a74c..4d16090e58 100644 --- a/crates/bin/pindexer/src/governance.rs +++ b/crates/bin/pindexer/src/governance.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, Context, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_governance::{ proposal::ProposalPayloadToml, proposal_state, DelegatorVote, Proposal, ProposalDepositClaim, ProposalWithdraw, ValidatorVote, @@ -10,7 +12,6 @@ use penumbra_proto::{ event::ProtoEvent, }; use penumbra_stake::IdentityKey; -use sqlx::PgPool; #[derive(Debug)] pub struct GovernanceProposals {} @@ -24,16 +25,119 @@ const EVENT_PROPOSAL_FAILED: &str = "penumbra.core.component.governance.v1.Event const EVENT_PROPOSAL_SLASHED: &str = "penumbra.core.component.governance.v1.EventProposalSlashed"; const EVENT_PROPOSAL_DEPOSIT_CLAIM: &str = 
"penumbra.core.component.governance.v1.EventProposalDepositClaim"; -const ALL_RELEVANT_EVENTS: &[&str] = &[ - EVENT_PROPOSAL_SUBMIT, - EVENT_DELEGATOR_VOTE, - EVENT_VALIDATOR_VOTE, - EVENT_PROPOSAL_WITHDRAW, - EVENT_PROPOSAL_PASSED, - EVENT_PROPOSAL_FAILED, - EVENT_PROPOSAL_SLASHED, - EVENT_PROPOSAL_DEPOSIT_CLAIM, -]; + +impl GovernanceProposals { + async fn index_event( + &self, + dbtx: &mut PgTransaction<'_>, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + match event.event.kind.as_str() { + EVENT_PROPOSAL_SUBMIT => { + let pe = pb::EventProposalSubmit::from_event(event.as_ref())?; + let start_block_height = pe.start_height; + let end_block_height = pe.end_height; + let submit = pe + .submit + .ok_or_else(|| anyhow!("missing submit in event"))?; + let deposit_amount = submit + .deposit_amount + .ok_or_else(|| anyhow!("missing deposit amount in event"))? + .try_into() + .context("error converting deposit amount")?; + let proposal = submit + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_submit( + dbtx, + proposal, + deposit_amount, + start_block_height, + end_block_height, + event.block_height, + ) + .await?; + } + EVENT_DELEGATOR_VOTE => { + let pe = pb::EventDelegatorVote::from_event(event.as_ref())?; + let vote = pe + .vote + .ok_or_else(|| anyhow!("missing vote in event"))? + .try_into() + .context("error converting delegator vote")?; + let validator_identity_key = pe + .validator_identity_key + .ok_or_else(|| anyhow!("missing validator identity key in event"))? + .try_into() + .context("error converting validator identity key")?; + handle_delegator_vote(dbtx, vote, validator_identity_key, event.block_height) + .await?; + } + EVENT_VALIDATOR_VOTE => { + let pe = pb::EventValidatorVote::from_event(event.as_ref())?; + let voting_power = pe.voting_power; + let vote = pe + .vote + .ok_or_else(|| anyhow!("missing vote in event"))? + .try_into() + .context("error converting vote")?; + handle_validator_vote(dbtx, vote, voting_power, event.block_height).await?; + } + EVENT_PROPOSAL_WITHDRAW => { + let pe = pb::EventProposalWithdraw::from_event(event.as_ref())?; + let proposal_withdraw: ProposalWithdraw = pe + .withdraw + .ok_or_else(|| anyhow!("missing withdraw in event"))? + .try_into() + .context("error converting proposal withdraw")?; + let proposal = proposal_withdraw.proposal; + let reason = proposal_withdraw.reason; + handle_proposal_withdraw(dbtx, proposal, reason).await?; + } + EVENT_PROPOSAL_PASSED => { + let pe = pb::EventProposalPassed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_passed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_FAILED => { + let pe = pb::EventProposalFailed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? + .try_into() + .context("error converting proposal")?; + handle_proposal_failed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_SLASHED => { + let pe = pb::EventProposalSlashed::from_event(event.as_ref())?; + let proposal = pe + .proposal + .ok_or_else(|| anyhow!("missing proposal in event"))? 
+ .try_into() + .context("error converting proposal")?; + handle_proposal_slashed(dbtx, proposal).await?; + } + EVENT_PROPOSAL_DEPOSIT_CLAIM => { + let pe = pb::EventProposalDepositClaim::from_event(event.as_ref())?; + let deposit_claim = pe + .deposit_claim + .ok_or_else(|| anyhow!("missing deposit claim in event"))? + .try_into() + .context("error converting deposit claim")?; + handle_proposal_deposit_claim(dbtx, deposit_claim).await?; + } + _ => {} + } + + Ok(()) + } +} #[async_trait] impl AppView for GovernanceProposals { @@ -213,119 +317,18 @@ impl AppView for GovernanceProposals { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - ALL_RELEVANT_EVENTS.contains(&type_str) + fn name(&self) -> String { + "governance".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - match event.event.kind.as_str() { - EVENT_PROPOSAL_SUBMIT => { - let pe = pb::EventProposalSubmit::from_event(event.as_ref())?; - let start_block_height = pe.start_height; - let end_block_height = pe.end_height; - let submit = pe - .submit - .ok_or_else(|| anyhow!("missing submit in event"))?; - let deposit_amount = submit - .deposit_amount - .ok_or_else(|| anyhow!("missing deposit amount in event"))? - .try_into() - .context("error converting deposit amount")?; - let proposal = submit - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_submit( - dbtx, - proposal, - deposit_amount, - start_block_height, - end_block_height, - event.block_height, - ) - .await?; - } - EVENT_DELEGATOR_VOTE => { - let pe = pb::EventDelegatorVote::from_event(event.as_ref())?; - let vote = pe - .vote - .ok_or_else(|| anyhow!("missing vote in event"))? - .try_into() - .context("error converting delegator vote")?; - let validator_identity_key = pe - .validator_identity_key - .ok_or_else(|| anyhow!("missing validator identity key in event"))? - .try_into() - .context("error converting validator identity key")?; - handle_delegator_vote(dbtx, vote, validator_identity_key, event.block_height) - .await?; - } - EVENT_VALIDATOR_VOTE => { - let pe = pb::EventValidatorVote::from_event(event.as_ref())?; - let voting_power = pe.voting_power; - let vote = pe - .vote - .ok_or_else(|| anyhow!("missing vote in event"))? - .try_into() - .context("error converting vote")?; - handle_validator_vote(dbtx, vote, voting_power, event.block_height).await?; - } - EVENT_PROPOSAL_WITHDRAW => { - let pe = pb::EventProposalWithdraw::from_event(event.as_ref())?; - let proposal_withdraw: ProposalWithdraw = pe - .withdraw - .ok_or_else(|| anyhow!("missing withdraw in event"))? - .try_into() - .context("error converting proposal withdraw")?; - let proposal = proposal_withdraw.proposal; - let reason = proposal_withdraw.reason; - handle_proposal_withdraw(dbtx, proposal, reason).await?; - } - EVENT_PROPOSAL_PASSED => { - let pe = pb::EventProposalPassed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_passed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_FAILED => { - let pe = pb::EventProposalFailed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? 
- .try_into() - .context("error converting proposal")?; - handle_proposal_failed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_SLASHED => { - let pe = pb::EventProposalSlashed::from_event(event.as_ref())?; - let proposal = pe - .proposal - .ok_or_else(|| anyhow!("missing proposal in event"))? - .try_into() - .context("error converting proposal")?; - handle_proposal_slashed(dbtx, proposal).await?; - } - EVENT_PROPOSAL_DEPOSIT_CLAIM => { - let pe = pb::EventProposalDepositClaim::from_event(event.as_ref())?; - let deposit_claim = pe - .deposit_claim - .ok_or_else(|| anyhow!("missing deposit claim in event"))? - .try_into() - .context("error converting deposit claim")?; - handle_proposal_deposit_claim(dbtx, deposit_claim).await?; - } - _ => {} + for event in batch.events() { + self.index_event(dbtx, event).await?; } - Ok(()) } } diff --git a/crates/bin/pindexer/src/ibc/mod.rs b/crates/bin/pindexer/src/ibc/mod.rs index 9bc6d795c4..417fd5ca19 100644 --- a/crates/bin/pindexer/src/ibc/mod.rs +++ b/crates/bin/pindexer/src/ibc/mod.rs @@ -1,5 +1,5 @@ use anyhow::anyhow; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_asset::Value; use penumbra_keys::Address; use penumbra_proto::{ @@ -8,7 +8,6 @@ use penumbra_proto::{ }, event::ProtoEvent as _, }; -use sqlx::PgPool; /// The kind of event we might care about. #[derive(Clone, Copy, Debug)] @@ -198,6 +197,10 @@ impl Component { #[async_trait] impl AppView for Component { + fn name(&self) -> String { + "ibc".to_string() + } + async fn init_chain( &self, dbtx: &mut PgTransaction, @@ -206,18 +209,19 @@ impl AppView for Component { init_db(dbtx).await } - fn is_relevant(&self, type_str: &str) -> bool { - EventKind::try_from(type_str).is_ok() - } - - #[tracing::instrument(skip_all, fields(height = event.block_height, name = event.event.kind.as_str()))] - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> anyhow::Result<()> { - let transfer = Event::try_from(event)?.db_transfer(); - create_transfer(dbtx, event.block_height, transfer).await + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + let parsed = match Event::try_from(event) { + Ok(p) => p, + Err(_) => continue, + }; + let transfer = parsed.db_transfer(); + create_transfer(dbtx, event.block_height, transfer).await?; + } + Ok(()) } } diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index ddaa667635..c8dd76a31e 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -6,21 +6,21 @@ pub trait IndexerExt: Sized { impl IndexerExt for cometindex::Indexer { fn with_default_penumbra_app_views(self) -> Self { - self.with_index(crate::block::Block {}) - .with_index(crate::stake::ValidatorSet {}) - .with_index(crate::stake::Slashings {}) - .with_index(crate::stake::DelegationTxs {}) - .with_index(crate::stake::UndelegationTxs {}) - .with_index(crate::governance::GovernanceProposals {}) - .with_index(crate::dex_ex::Component::new()) - .with_index(crate::supply::Component::new()) - .with_index(crate::ibc::Component::new()) - .with_index(crate::insights::Component::new( + self.with_index(Box::new(crate::block::Block {})) + .with_index(Box::new(crate::stake::ValidatorSet {})) + .with_index(Box::new(crate::stake::Slashings {})) + .with_index(Box::new(crate::stake::DelegationTxs 
{})) + .with_index(Box::new(crate::stake::UndelegationTxs {})) + .with_index(Box::new(crate::governance::GovernanceProposals {})) + .with_index(Box::new(crate::dex_ex::Component::new())) + .with_index(Box::new(crate::supply::Component::new())) + .with_index(Box::new(crate::ibc::Component::new())) + .with_index(Box::new(crate::insights::Component::new( penumbra_asset::asset::Id::from_str( // USDC "passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", ) .ok(), - )) + ))) } } diff --git a/crates/bin/pindexer/src/insights/mod.rs b/crates/bin/pindexer/src/insights/mod.rs index 221c64eb25..26887380da 100644 --- a/crates/bin/pindexer/src/insights/mod.rs +++ b/crates/bin/pindexer/src/insights/mod.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeMap, iter}; -use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; use penumbra_app::genesis::Content; use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; use penumbra_dex::{ @@ -10,7 +10,7 @@ use penumbra_dex::{ use penumbra_fee::event::EventBlockFees; use penumbra_funding::event::EventFundingStreamReward; use penumbra_num::Amount; -use penumbra_proto::{event::EventDomainType, DomainType, Name}; +use penumbra_proto::event::EventDomainType; use penumbra_shielded_pool::event::{ EventInboundFungibleTokenTransfer, EventOutboundFungibleTokenRefund, EventOutboundFungibleTokenTransfer, @@ -20,7 +20,6 @@ use penumbra_stake::{ validator::Validator, IdentityKey, }; -use sqlx::PgPool; use crate::parsing::parse_content; @@ -280,47 +279,12 @@ async fn add_genesis_native_token_allocation_supply<'a>( Ok(()) } -#[async_trait] -impl AppView for Component { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - for statement in include_str!("schema.sql").split(";") { - sqlx::query(statement).execute(dbtx.as_mut()).await?; - } - - // decode the initial supply from the genesis - // initial app state is not recomputed from events, because events are not emitted in init_chain. - // instead, the indexer directly parses the genesis. - add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) - .await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - [ - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ::Proto::full_name(), - ] - .into_iter() - .any(|x| type_str == x) - } +impl Component { async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { let height = event.block_height; if let Ok(e) = EventUndelegate::try_from_event(&event.event) { @@ -504,3 +468,38 @@ impl AppView for Component { Ok(()) } } + +#[async_trait] +impl AppView for Component { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + for statement in include_str!("schema.sql").split(";") { + sqlx::query(statement).execute(dbtx.as_mut()).await?; + } + + // decode the initial supply from the genesis + // initial app state is not recomputed from events, because events are not emitted in init_chain. + // instead, the indexer directly parses the genesis. 
+ add_genesis_native_token_allocation_supply(dbtx, &parse_content(app_state.clone())?) + .await?; + Ok(()) + } + + fn name(&self) -> String { + "insights".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } + Ok(()) + } +} diff --git a/crates/bin/pindexer/src/main.rs b/crates/bin/pindexer/src/main.rs index 71989105ff..fee7ab0f90 100644 --- a/crates/bin/pindexer/src/main.rs +++ b/crates/bin/pindexer/src/main.rs @@ -1,6 +1,5 @@ use anyhow::Result; use clap::Parser as _; -use pindexer::block::Block; use pindexer::{Indexer, IndexerExt as _, Options}; #[tokio::main] @@ -8,7 +7,6 @@ async fn main() -> Result<()> { Indexer::new(Options::parse()) .with_default_tracing() .with_default_penumbra_app_views() - .with_index(Block {}) .run() .await?; diff --git a/crates/bin/pindexer/src/stake/delegation_txs.rs b/crates/bin/pindexer/src/stake/delegation_txs.rs index c25dc63a4d..bacbd5276e 100644 --- a/crates/bin/pindexer/src/stake/delegation_txs.rs +++ b/crates/bin/pindexer/src/stake/delegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -47,29 +47,28 @@ impl AppView for DelegationTxs { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventDelegate" + fn name(&self) -> String { + "stake/delegation_txs".to_string() } - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<()> { - let pe = pb::EventDelegate::from_event(event.as_ref())?; + async fn index_batch(&self, dbtx: &mut PgTransaction, batch: EventBatch) -> Result<()> { + for event in batch.events() { + let pe = match pb::EventDelegate::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let amount = Amount::try_from( - pe.amount - .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, - )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; - sqlx::query( + sqlx::query( "INSERT INTO stake_delegation_txs (ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" ) .bind(ik.to_string()) @@ -78,6 +77,7 @@ impl AppView for DelegationTxs { .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/missed_blocks.rs b/crates/bin/pindexer/src/stake/missed_blocks.rs index 4516d619ac..a289ca0b4a 100644 --- a/crates/bin/pindexer/src/stake/missed_blocks.rs +++ b/crates/bin/pindexer/src/stake/missed_blocks.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -45,29 +45,33 @@ impl AppView for MissedBlocks { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventValidatorMissedBlock" + fn name(&self) -> String { + "stake/missed_blocks".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - let pe = pb::EventValidatorMissedBlock::from_event(event.as_ref())?; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + for event in batch.events() { + let pe = match pb::EventValidatorMissedBlock::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let height = event.block_height; + let height = event.block_height; - sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)") - .bind(height as i64) - .bind(ik.to_string()) - .execute(dbtx.as_mut()) - .await?; + sqlx::query("INSERT INTO stake_missed_blocks (height, ik) VALUES ($1, $2)") + .bind(height as i64) + .bind(ik.to_string()) + .execute(dbtx.as_mut()) + .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/slashings.rs b/crates/bin/pindexer/src/stake/slashings.rs index bbc472171f..68134c6690 100644 --- a/crates/bin/pindexer/src/stake/slashings.rs +++ b/crates/bin/pindexer/src/stake/slashings.rs @@ -1,5 +1,7 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -7,6 +9,44 @@ use penumbra_stake::IdentityKey; #[derive(Debug)] pub struct Slashings {} +impl Slashings { + async fn index_event( + &self, + dbtx: &mut PgTransaction<'_>, + event: &ContextualizedEvent, + ) -> Result<(), anyhow::Error> { + let pe = match pb::EventSlashingPenaltyApplied::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => return Ok(()), + }; + let ik = IdentityKey::try_from( + pe.identity_key + .ok_or_else(|| anyhow!("missing ik in event"))?, + )?; + + let height = event.block_height; + let epoch_index = pe.epoch_index; + + let penalty_json = serde_json::to_string( + &pe.new_penalty + .ok_or_else(|| anyhow!("missing new_penalty"))?, + )?; + + sqlx::query( + "INSERT INTO stake_slashings (height, ik, epoch_index, penalty) + VALUES ($1, $2, $3, $4)", + ) + .bind(height as i64) + .bind(ik.to_string()) + .bind(epoch_index as i64) + .bind(penalty_json) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } +} + #[async_trait] impl AppView for Slashings { async fn init_chain( @@ -41,44 +81,18 @@ impl AppView for Slashings 
{ Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - match type_str { - "penumbra.core.component.stake.v1.EventSlashingPenaltyApplied" => true, - _ => false, - } + fn name(&self) -> String { + "stake/slashings".to_string() } - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - let pe = pb::EventSlashingPenaltyApplied::from_event(event.as_ref())?; - let ik = IdentityKey::try_from( - pe.identity_key - .ok_or_else(|| anyhow!("missing ik in event"))?, - )?; - - let height = event.block_height; - let epoch_index = pe.epoch_index; - - let penalty_json = serde_json::to_string( - &pe.new_penalty - .ok_or_else(|| anyhow!("missing new_penalty"))?, - )?; - - sqlx::query( - "INSERT INTO stake_slashings (height, ik, epoch_index, penalty) - VALUES ($1, $2, $3, $4)", - ) - .bind(height as i64) - .bind(ik.to_string()) - .bind(epoch_index as i64) - .bind(penalty_json) - .execute(dbtx.as_mut()) - .await?; - + for event in batch.events() { + self.index_event(dbtx, event).await?; + } Ok(()) } } diff --git a/crates/bin/pindexer/src/stake/undelegation_txs.rs b/crates/bin/pindexer/src/stake/undelegation_txs.rs index aeb9d90511..e9a0a31d48 100644 --- a/crates/bin/pindexer/src/stake/undelegation_txs.rs +++ b/crates/bin/pindexer/src/stake/undelegation_txs.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction}; use penumbra_num::Amount; use penumbra_proto::{core::component::stake::v1 as pb, event::ProtoEvent}; use penumbra_stake::IdentityKey; @@ -47,29 +47,28 @@ impl AppView for UndelegationTxs { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - type_str == "penumbra.core.component.stake.v1.EventUndelegate" + fn name(&self) -> String { + "stake/undelegation_txs".to_string() } - async fn index_event( - &self, - dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, - ) -> Result<()> { - let pe = pb::EventUndelegate::from_event(event.as_ref())?; + async fn index_batch(&self, dbtx: &mut PgTransaction, batch: EventBatch) -> Result<()> { + for event in batch.events() { + let pe = match pb::EventUndelegate::from_event(event.as_ref()) { + Ok(pe) => pe, + Err(_) => continue, + }; - let ik: IdentityKey = pe - .identity_key - .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? - .try_into()?; + let ik: IdentityKey = pe + .identity_key + .ok_or_else(|| anyhow::anyhow!("missing ik in event"))? + .try_into()?; - let amount = Amount::try_from( - pe.amount - .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, - )?; + let amount = Amount::try_from( + pe.amount + .ok_or_else(|| anyhow::anyhow!("missing amount in event"))?, + )?; - sqlx::query( + sqlx::query( "INSERT INTO stake_undelegation_txs (ik, amount, height, tx_hash) VALUES ($1, $2, $3, $4)" ) .bind(ik.to_string()) @@ -78,6 +77,7 @@ impl AppView for UndelegationTxs { .bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?) 
.execute(dbtx.as_mut()) .await?; + } Ok(()) } diff --git a/crates/bin/pindexer/src/stake/validator_set.rs b/crates/bin/pindexer/src/stake/validator_set.rs index bc4dc59a18..039b35c03c 100644 --- a/crates/bin/pindexer/src/stake/validator_set.rs +++ b/crates/bin/pindexer/src/stake/validator_set.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgPool, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_app::genesis::Content; use penumbra_asset::asset; @@ -17,57 +19,11 @@ use crate::parsing::parse_content; #[derive(Debug)] pub struct ValidatorSet {} -#[async_trait] -impl AppView for ValidatorSet { - async fn init_chain( - &self, - dbtx: &mut PgTransaction, - app_state: &serde_json::Value, - ) -> Result<(), anyhow::Error> { - sqlx::query( - // table name is module path + struct name - // note: protobuf data is encoded as protojson for ease of consumers - // hence TEXT fields - "CREATE TABLE stake_validator_set ( - id SERIAL PRIMARY KEY, - ik TEXT NOT NULL, - name TEXT NOT NULL, - definition TEXT NOT NULL, - voting_power BIGINT NOT NULL, - queued_delegations BIGINT NOT NULL, - queued_undelegations BIGINT NOT NULL, - validator_state TEXT NOT NULL, - bonding_state TEXT NOT NULL - );", - ) - .execute(dbtx.as_mut()) - .await?; - - sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);") - .execute(dbtx.as_mut()) - .await?; - - add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?; - Ok(()) - } - - fn is_relevant(&self, type_str: &str) -> bool { - match type_str { - "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => true, - "penumbra.core.component.stake.v1.EventDelegate" => true, - "penumbra.core.component.stake.v1.EventUndelegate" => true, - "penumbra.core.component.stake.v1.EventValidatorVotingPowerChange" => true, - "penumbra.core.component.stake.v1.EventValidatorStateChange" => true, - "penumbra.core.component.stake.v1.EventValidatorBondingStateChange" => true, - _ => false, - } - } - +impl ValidatorSet { async fn index_event( &self, - dbtx: &mut PgTransaction, + dbtx: &mut PgTransaction<'_>, event: &ContextualizedEvent, - _src_db: &PgPool, ) -> Result<(), anyhow::Error> { match event.event.kind.as_str() { "penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => { @@ -146,6 +102,56 @@ impl AppView for ValidatorSet { } } +#[async_trait] +impl AppView for ValidatorSet { + async fn init_chain( + &self, + dbtx: &mut PgTransaction, + app_state: &serde_json::Value, + ) -> Result<(), anyhow::Error> { + sqlx::query( + // table name is module path + struct name + // note: protobuf data is encoded as protojson for ease of consumers + // hence TEXT fields + "CREATE TABLE stake_validator_set ( + id SERIAL PRIMARY KEY, + ik TEXT NOT NULL, + name TEXT NOT NULL, + definition TEXT NOT NULL, + voting_power BIGINT NOT NULL, + queued_delegations BIGINT NOT NULL, + queued_undelegations BIGINT NOT NULL, + validator_state TEXT NOT NULL, + bonding_state TEXT NOT NULL + );", + ) + .execute(dbtx.as_mut()) + .await?; + + sqlx::query("CREATE UNIQUE INDEX idx_stake_validator_set_ik ON stake_validator_set(ik);") + .execute(dbtx.as_mut()) + .await?; + + add_genesis_validators(dbtx, &parse_content(app_state.clone())?).await?; + Ok(()) + } + + fn name(&self) -> String { + "stake/validator_set".to_string() + } + + async fn index_batch( + &self, + dbtx: &mut 
PgTransaction, + batch: EventBatch, + ) -> Result<(), anyhow::Error> { + for event in batch.events() { + self.index_event(dbtx, event).await?; + } + Ok(()) + } +} + async fn add_genesis_validators<'a>(dbtx: &mut PgTransaction<'a>, content: &Content) -> Result<()> { // Given a genesis validator, we need to figure out its delegations at // genesis by getting its delegation token then summing up all the allocations. diff --git a/crates/bin/pindexer/src/supply.rs b/crates/bin/pindexer/src/supply.rs index b1ea44ffbe..cc5b9f9bae 100644 --- a/crates/bin/pindexer/src/supply.rs +++ b/crates/bin/pindexer/src/supply.rs @@ -1,7 +1,9 @@ -use std::collections::{BTreeMap, HashSet}; +use std::collections::BTreeMap; use anyhow::{anyhow, Result}; -use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction}; +use cometindex::{ + async_trait, index::EventBatch, sqlx, AppView, ContextualizedEvent, PgTransaction, +}; use penumbra_app::genesis::Content; use penumbra_asset::{asset, STAKING_TOKEN_ASSET_ID}; use penumbra_num::Amount; @@ -13,7 +15,7 @@ use penumbra_proto::{ }, }; use penumbra_stake::{rate::RateData, validator::Validator, IdentityKey}; -use sqlx::{PgPool, Postgres, Transaction}; +use sqlx::{Postgres, Transaction}; use std::iter; use crate::parsing::parse_content; @@ -900,19 +902,20 @@ async fn add_genesis_native_token_allocation_supply<'a>( } #[derive(Debug)] -pub struct Component { - event_strings: HashSet<&'static str>, -} +pub struct Component {} impl Component { pub fn new() -> Self { - let event_strings = Event::NAMES.into_iter().collect(); - Self { event_strings } + Self {} } } #[async_trait] impl AppView for Component { + fn name(&self) -> String { + "supply".to_string() + } + async fn init_chain( &self, dbtx: &mut PgTransaction, @@ -930,16 +933,18 @@ impl AppView for Component { Ok(()) } - fn is_relevant(&self, type_str: &str) -> bool { - self.event_strings.contains(type_str) - } - - async fn index_event( + async fn index_batch( &self, dbtx: &mut PgTransaction, - event: &ContextualizedEvent, - _src_db: &PgPool, + batch: EventBatch, ) -> Result<(), anyhow::Error> { - Event::try_from(event)?.index(dbtx).await + for event in batch.events() { + let e = match Event::try_from(event) { + Ok(e) => e, + Err(_) => continue, + }; + e.index(dbtx).await?; + } + Ok(()) } } diff --git a/crates/util/cometindex/examples/fmd_clues.rs b/crates/util/cometindex/examples/fmd_clues.rs deleted file mode 100644 index fbd5fb3d15..0000000000 --- a/crates/util/cometindex/examples/fmd_clues.rs +++ /dev/null @@ -1,86 +0,0 @@ -use anyhow::Result; -use clap::Parser; -use cometindex::{async_trait, opt::Options, AppView, ContextualizedEvent, Indexer, PgTransaction}; -use sqlx::PgPool; - -// This example is silly because it doesn't do any "compilation" of the raw -// events, so it's only useful as an example of exercising the harness and the -// intended usage: the _downstream_ crate depends on cometindex (generic over -// any event) and has its own app specific logic. 
But it doesn't have to
-// reimplement the binary handling / arg parsing / etc
-
-#[derive(Debug)]
-struct FmdCluesExample {}
-
-#[async_trait]
-impl AppView for FmdCluesExample {
-    async fn init_chain(
-        &self,
-        dbtx: &mut PgTransaction,
-        _app_state: &serde_json::Value,
-    ) -> Result<(), anyhow::Error> {
-        sqlx::query(
-            "
-CREATE TABLE IF NOT EXISTS fmd_clues_example (
-    id SERIAL PRIMARY KEY,
-    tx_hash BYTEA NOT NULL,
-    fmd_clue VARCHAR NOT NULL
-);
-",
-        )
-        .execute(dbtx.as_mut())
-        .await?;
-        Ok(())
-    }
-
-    fn is_relevant(&self, type_str: &str) -> bool {
-        type_str == "penumbra.core.component.shielded_pool.v1.EventBroadcastClue"
-    }
-
-    async fn index_event(
-        &self,
-        dbtx: &mut PgTransaction,
-        event: &ContextualizedEvent,
-        _src_db: &PgPool,
-    ) -> Result<(), anyhow::Error> {
-        // this is just an example in the integration tests, so we don't want to do any
-        // - queries against existing table state
-        // - parsing of the event data into structured data
-        // - computations of derived data
-        // but these should all be possible
-        let clue = event
-            .event
-            .attributes
-            .iter()
-            .find(|attr| attr.key == "clue")
-            .expect("fmd_clue attribute not found")
-            .value
-            .clone();
-        let tx_hash = event.tx_hash.as_ref().expect("tx_hash not found").to_vec();
-
-        sqlx::query(
-            "
-            INSERT INTO fmd_clues (tx_hash, fmd_clue)
-            VALUES ($1, $2)
-            ",
-        )
-        .bind(&tx_hash)
-        .bind(&clue)
-        .execute(dbtx.as_mut())
-        .await?;
-
-        Ok(())
-    }
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
-    Indexer::new(Options::parse())
-        .with_default_tracing()
-        // add as many indexers as you want
-        .with_index(FmdCluesExample {})
-        .run()
-        .await?;
-
-    Ok(())
-}
diff --git a/crates/util/cometindex/src/contextualized.rs b/crates/util/cometindex/src/contextualized.rs
index 2b34f34254..1805a3df41 100644
--- a/crates/util/cometindex/src/contextualized.rs
+++ b/crates/util/cometindex/src/contextualized.rs
@@ -1,6 +1,6 @@
 use tendermint::abci::Event;
 
-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct ContextualizedEvent {
     pub event: Event,
     pub block_height: u64,
diff --git a/crates/util/cometindex/src/engine.rs b/crates/util/cometindex/src/engine.rs
deleted file mode 100644
index 8b13789179..0000000000
--- a/crates/util/cometindex/src/engine.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/crates/util/cometindex/src/index.rs b/crates/util/cometindex/src/index.rs
index f90d666b42..59a8018cd1 100644
--- a/crates/util/cometindex/src/index.rs
+++ b/crates/util/cometindex/src/index.rs
@@ -1,3 +1,5 @@
+use std::sync::Arc;
+
 use async_trait::async_trait;
 pub use sqlx::PgPool;
 use sqlx::{Postgres, Transaction};
@@ -6,21 +8,52 @@ use crate::ContextualizedEvent;
 
 pub type PgTransaction<'a> = Transaction<'a, Postgres>;
 
+/// Represents all of the events in a given block
+#[derive(Clone, Debug)]
+pub struct BlockEvents {
+    /// The height of this block.
+    pub height: u64,
+    /// The events contained in this block, in order.
+    pub events: Vec<ContextualizedEvent>,
+}
+
+#[derive(Clone, Debug)]
+pub struct EventBatch {
+    pub first_height: u64,
+    pub last_height: u64,
+    /// The batch of events, ordered by increasing height.
+    ///
+    /// The heights are guaranteed to be increasing, and to be contiguous.
+    pub by_height: Arc<Vec<BlockEvents>>,
+}
+
+impl EventBatch {
+    pub fn events(&self) -> impl Iterator<Item = &ContextualizedEvent> {
+        self.by_height.iter().flat_map(|x| x.events.iter())
+    }
+}
+
 /// Represents a specific index of raw event data.
 #[async_trait]
-pub trait AppView: std::fmt::Debug {
+pub trait AppView: Send + Sync {
+    /// Return the name of this index.
+    ///
+    /// This should be unique across all of the indices.
+    fn name(&self) -> String;
+
+    /// This will be called once when processing the genesis before the first block.
     async fn init_chain(
         &self,
         dbtx: &mut PgTransaction,
         app_state: &serde_json::Value,
     ) -> Result<(), anyhow::Error>;
 
-    fn is_relevant(&self, type_str: &str) -> bool;
-
-    async fn index_event(
+    /// This allows processing a batch of events spanning many blocks.
+    ///
+    /// By using a batch, we can potentially avoid a costly database write for each event.
+    async fn index_batch(
        &self,
        dbtx: &mut PgTransaction,
-        event: &ContextualizedEvent,
-        src_db: &PgPool,
+        batch: EventBatch,
     ) -> Result<(), anyhow::Error>;
 }
diff --git a/crates/util/cometindex/src/indexer.rs b/crates/util/cometindex/src/indexer.rs
index 962b845dfb..04e1d3b915 100644
--- a/crates/util/cometindex/src/indexer.rs
+++ b/crates/util/cometindex/src/indexer.rs
@@ -1,29 +1,123 @@
-use std::pin::Pin;
+mod indexing_state;
 
+use crate::{index::EventBatch, opt::Options, AppView};
 use anyhow::{Context as _, Result};
-use futures::{Stream, StreamExt, TryStreamExt};
-use sqlx::{postgres::PgPoolOptions, PgPool};
-use tap::{Tap, TapFallible, TapOptional};
-use tendermint::abci;
-use tracing::{debug, info};
+use indexing_state::{Height, IndexingState};
+use std::sync::Arc;
+use tokio::{sync::mpsc, task::JoinSet};
+
+#[tracing::instrument(skip_all)]
+async fn catchup(
+    state: &IndexingState,
+    indices: &[Arc<dyn AppView>],
+    genesis: Arc<serde_json::Value>,
+) -> anyhow::Result<()> {
+    if indices.is_empty() {
+        tracing::info!(why = "no indices", "catchup completed");
+        return Ok(());
+    }
+
+    let (src_height, index_heights) = tokio::try_join!(state.src_height(), state.index_heights())?;
+    tracing::info!(?src_height, ?index_heights, "catchup status");
+    let lowest_index_height = index_heights.values().copied().min().unwrap_or_default();
+    if lowest_index_height >= src_height {
+        tracing::info!(why = "already caught up", "catchup completed");
+        return Ok(());
+    }
+
+    // Constants that influence performance.
+    const DEFAULT_BATCH_SIZE: u64 = 1000;
+    const BATCH_LOOKAHEAD: usize = 2;
+
+    let mut tasks = JoinSet::<anyhow::Result<()>>::new();
+
+    let mut txs = Vec::with_capacity(indices.len());
+    for index in indices.iter().cloned() {
+        let (tx, mut rx) = mpsc::channel::<EventBatch>(BATCH_LOOKAHEAD);
+        txs.push(tx);
+        let name = index.name();
+        let index_height = index_heights.get(&name).copied().unwrap_or_default();
+        let state_cp = state.clone();
+        let genesis_cp = genesis.clone();
+        tasks.spawn(async move {
+            if index_height == Height::default() {
+                tracing::info!(?name, "initializing index");
+                let mut dbtx = state_cp.begin_transaction().await?;
+                index.init_chain(&mut dbtx, &genesis_cp).await?;
+                tracing::info!(?name, "finished initialization");
+                IndexingState::update_index_height(&mut dbtx, &name, Height::post_genesis())
+                    .await?;
+                dbtx.commit().await?;
+            } else {
+                tracing::info!(?name, "already initialized");
+            }
+            while let Some(events) = rx.recv().await {
+                let mut dbtx = state_cp.begin_transaction().await?;
+                let last_height = events.last_height;
+                if index_height >= Height::from(last_height) {
+                    tracing::info!(
+                        first = events.first_height,
+                        last = events.last_height,
+                        index_name = &name,
+                        "skipping batch"
+                    );
+                    continue;
+                }
+                tracing::info!(
+                    first = events.first_height,
+                    last = events.last_height,
+                    index_name = &name,
+                    "indexing batch"
+                );
+                index.index_batch(&mut dbtx, events).await?;
+                tracing::debug!(index_name = &name, "committing batch");
+                IndexingState::update_index_height(&mut dbtx, &name, Height::from(last_height))
+                    .await?;
-use crate::{opt::Options, AppView, ContextualizedEvent, PgTransaction};
+                dbtx.commit().await?;
+            }
+            Ok(())
+        });
+    }
+
+    let state_cp = state.clone();
+    tasks.spawn(async move {
+        let mut height = lowest_index_height.next();
+        while height <= src_height {
+            let first = height;
+            let (last, next_height) = first.advance(DEFAULT_BATCH_SIZE, src_height);
+            height = next_height;
+            tracing::debug!(?first, ?last, "fetching batch");
+            let events = state_cp.event_batch(first, last).await?;
+            tracing::info!(?first, ?last, "sending batch");
+            for tx in &txs {
+                tx.send(events.clone()).await?;
+            }
+        }
+        Ok(())
+    });
+
+    while let Some(res) = tasks.join_next().await {
+        res??;
+    }
+    Ok(())
+}
 
 pub struct Indexer {
     opts: Options,
-    indexes: Vec<Box<dyn AppView>>,
+    indices: Vec<Arc<dyn AppView>>,
 }
 
 impl Indexer {
     pub fn new(opts: Options) -> Self {
         Self {
             opts,
-            indexes: Vec::new(),
+            indices: Vec::new(),
         }
     }
 
-    pub fn with_index(mut self, index: impl AppView + 'static) -> Self {
-        self.indexes.push(Box::new(index));
+    pub fn with_index(mut self, index: Box<dyn AppView>) -> Self {
+        self.indices.push(Arc::from(index));
         self
     }
 
@@ -32,19 +126,6 @@ impl Indexer {
         self
     }
 
-    async fn create_dst_tables(
-        pool: &PgPool,
-        indexes: &[Box<dyn AppView>],
-        app_state: &serde_json::Value,
-    ) -> Result<()> {
-        let mut dbtx = pool.begin().await?;
-        for index in indexes {
-            index.init_chain(&mut dbtx, app_state).await?;
-        }
-        dbtx.commit().await?;
-        Ok(())
-    }
-
     pub async fn run(self) -> Result<(), anyhow::Error> {
         tracing::info!(?self.opts);
         let Self {
@@ -56,262 +137,24 @@ impl Indexer {
                     poll_ms,
                     genesis_json,
                 },
-            indexes,
+            indices: indexes,
         } = self;
 
-        // Create a source db, with, for sanity, some read only settings.
-        // These will be overrideable by a consumer who knows what they're doing,
-        // but prevents basic mistakes.
-        // c.f.
https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 - let src_db = PgPoolOptions::new() - .after_connect(|conn, _| { - Box::pin(async move { - sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") - .execute(conn) - .await?; - Ok(()) - }) - }) - .connect(&src_database_url) - .await?; - - let dst_db = PgPool::connect(&dst_database_url).await?; - - // Check if the destination db is initialized - let dst_db_initialized: bool = sqlx::query_scalar( - "SELECT EXISTS ( - SELECT FROM information_schema.tables - WHERE table_name = 'index_watermark' - )", + let state = IndexingState::init(&src_database_url, &dst_database_url).await?; + let genesis: serde_json::Value = serde_json::from_str( + &std::fs::read_to_string(genesis_json) + .context("error reading provided genesis.json file")?, ) - .fetch_one(&dst_db) - .await?; - - if !dst_db_initialized { - tracing::info!("no watermark found, initializing with genesis data"); - - // Create the table if it doesn't exist - sqlx::query("CREATE TABLE index_watermark (events_rowid BIGINT NOT NULL)") - .execute(&dst_db) - .await?; - - // Load the genesis JSON to be used populating initial tables - let genesis_content: serde_json::Value = serde_json::from_str( - &std::fs::read_to_string(genesis_json) - .context("error reading provided genesis.json file")?, - ) - .context("error parsing provided genesis.json file")?; - let app_state = genesis_content + .context("error parsing provided genesis.json file")?; + let app_state = Arc::new( + genesis .get("app_state") - .ok_or_else(|| anyhow::anyhow!("no app_state key in genesis.json"))?; - - Self::create_dst_tables(&dst_db, &indexes, app_state).await?; - } else { - tracing::info!("skipping genesis initialization"); - } - + .ok_or_else(|| anyhow::anyhow!("genesis missing app_state"))? + .clone(), + ); loop { - Self::tick(&src_db, &dst_db, &indexes).await?; + catchup(&state, indexes.as_slice(), app_state.clone()).await?; tokio::time::sleep(poll_ms).await; } } - - async fn tick( - src_db: &PgPool, - dst_db: &PgPool, - indexes: &[Box], - ) -> Result<(), anyhow::Error> { - // Fetch the highest rowid processed so far (the watermark) - let current_watermark: Option = - sqlx::query_as("SELECT events_rowid FROM index_watermark") - .fetch_optional(dst_db) - .await? - .map(|(w,)| w) - .tap_some(|row_id| debug!(%row_id, "fetched index watermark")) - .tap_none(|| debug!("no index watermark was present")); - - // Insert initial watermark if not present, so we can use a SET query later - if current_watermark.is_none() { - sqlx::query("INSERT INTO index_watermark (events_rowid) VALUES (0)") - .execute(dst_db) - .await? - .tap(|_| debug!("set index watermark to 0")); - } - - let watermark = current_watermark.unwrap_or(0); - - // Calculate new events count since the last watermark - sqlx::query_as::<_, (i64,)>("SELECT MAX(rowid) - $1 FROM events") - .bind(watermark) - .fetch_one(src_db) - .await - .map(|(count,)| count)? - .tap(|count| info!(%count, %watermark, "new events since last watermark")); - - let mut scanned_events = 0usize; - let mut relevant_events = 0usize; - - let mut es = read_events(&src_db, watermark); - let mut dbtx = dst_db.begin().await?; - while let Some(event) = es.next().await.transpose()? 
{ - if scanned_events % 1000 == 0 { - tracing::info!(scanned_events, relevant_events); - } else { - tracing::debug!( - block_height = %event.block_height, - kind = %event.event.kind, - scanned_events, - relevant_events, - "processing event" - ); - } - - scanned_events += 1; - - // if not relevant then skip making a db tx for the dst db - if !indexes - .iter() - .any(|index| index.is_relevant(&event.as_ref().kind)) - { - tracing::trace!(kind = %event.as_ref().kind, "event is not relevant to any views"); - continue; - } - - relevant_events += 1; - - for index in indexes { - if index.is_relevant(&event.as_ref().kind) { - tracing::debug!(?event, ?index, "relevant to index"); - index.index_event(&mut dbtx, &event, &src_db).await?; - } - } - // Mark that we got to at least this event - update_watermark(&mut dbtx, event.local_rowid).await?; - // Only commit in batches of <= 1000 events, for about a 5x performance increase when - // catching up. - if relevant_events % 1000 == 0 { - dbtx.commit().await?; - dbtx = dst_db.begin().await?; - } - } - // Flush out the remaining changes. - dbtx.commit().await?; - - Ok(()) - } -} - -async fn update_watermark(dbtx: &mut PgTransaction<'_>, watermark: i64) -> Result<()> { - sqlx::query("UPDATE index_watermark SET events_rowid = $1") - .bind(watermark) - .execute(dbtx.as_mut()) // lol, see note on Executor trait about Transaction impl - .await - .tap_ok(|affected| { - debug!(%watermark, "updated index watermark"); - debug_assert_eq!( - affected.rows_affected(), - 1, - "only one row should be affected when updating the index watermark" - ); - }) - .map(|_| ()) - .map_err(anyhow::Error::from) -} - -fn read_events( - src_db: &PgPool, - watermark: i64, -) -> Pin> + Send + '_>> { - let event_stream = sqlx::query_as::<_, (i64, String, i64, Option, serde_json::Value)>( - // This query does some shenanigans to ensure good performance. - // The main trick is that we know that each event has 1 block and <= 1 transaction associated - // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and - // then a sort, and instead work from the events in a linear fashion. - // Basically, this query ends up doing: - // - // for event in events >= id: - // attach attributes - // attach block - // attach transaction? 
- r#" -SELECT - events.rowid, - events.type, - blocks.height AS block_height, - tx_results.tx_hash, - events.attrs -FROM ( - SELECT - rowid, - type, - block_id, - tx_id, - jsonb_object_agg(attributes.key, attributes.value) AS attrs - FROM - events - LEFT JOIN - attributes ON rowid = attributes.event_id - WHERE - rowid > $1 - GROUP BY - rowid, - type, - block_id, - tx_id -) events -LEFT JOIN LATERAL ( - SELECT * FROM blocks WHERE blocks.rowid = events.block_id LIMIT 1 -) blocks -ON TRUE -LEFT JOIN LATERAL ( - SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1 -) tx_results -ON TRUE -ORDER BY - events.rowid ASC - "#, - ) - .bind(watermark) - .fetch(src_db) - .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { - tracing::debug!(?local_rowid, type_str, height, ?tx_hash); - let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { - hex::decode(s) - .expect("invalid tx_hash") - .try_into() - .expect("expected 32 bytes") - }); - let block_height = height as u64; - - let serde_json::Value::Object(attrs) = attrs else { - // saves an allocation below bc we can take ownership - panic!("expected JSON object"); - }; - - let event = abci::Event { - kind: type_str, - attributes: attrs - .into_iter() - .filter_map(|(k, v)| match v { - serde_json::Value::String(s) => Some((k, s)), - // we never hit this becasue of how we constructed the query - _ => None, - }) - .map(Into::into) - .collect(), - }; - - let ce = ContextualizedEvent { - event, - block_height, - tx_hash, - local_rowid, - }; - //tracing::info!(?ce); - - ce - }) - .map_err(|e| anyhow::Error::from(e).context("error reading from database")); - - event_stream.boxed() } diff --git a/crates/util/cometindex/src/indexer/indexing_state.rs b/crates/util/cometindex/src/indexer/indexing_state.rs new file mode 100644 index 0000000000..23f75a2c8c --- /dev/null +++ b/crates/util/cometindex/src/indexer/indexing_state.rs @@ -0,0 +1,298 @@ +use std::{collections::HashMap, sync::Arc}; + +use futures::TryStreamExt; +use sqlx::{postgres::PgPoolOptions, PgPool, Postgres, Transaction}; +use tendermint::abci; + +use crate::{ + index::{BlockEvents, EventBatch}, + ContextualizedEvent, +}; + +/// Create a Database, with, for sanity, some read only settings. +/// +/// These will be overrideable by a consumer who knows what they're doing, +/// but prevents basic mistakes. +/// c.f. https://github.com/launchbadge/sqlx/issues/481#issuecomment-727011811 +async fn read_only_db(url: &str) -> anyhow::Result { + PgPoolOptions::new() + .after_connect(|conn, _| { + Box::pin(async move { + sqlx::query("SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;") + .execute(conn) + .await?; + Ok(()) + }) + }) + .connect(url) + .await + .map_err(Into::into) +} + +async fn read_write_db(url: &str) -> anyhow::Result { + PgPoolOptions::new().connect(url).await.map_err(Into::into) +} + +#[derive(Default, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] +pub struct Height(u64); + +impl Height { + pub fn post_genesis() -> Self { + Height(1) + } + /// Return the last height in the batch, and then the first height in the next batch. 
+    pub fn advance(self, batch_size: u64, max_height: Height) -> (Height, Height) {
+        let last = Height::from(self.0 + batch_size - 1).min(max_height);
+        let next_first = Height::from(last.0 + 1);
+        (last, next_first)
+    }
+
+    pub fn next(self) -> Height {
+        Self(self.0 + 1)
+    }
+}
+
+impl From<u64> for Height {
+    fn from(value: u64) -> Self {
+        Self(value)
+    }
+}
+
+impl TryFrom<i64> for Height {
+    type Error = anyhow::Error;
+
+    fn try_from(value: i64) -> Result<Self, Self::Error> {
+        Ok(Self(u64::try_from(value)?))
+    }
+}
+
+impl<'r> sqlx::Decode<'r, Postgres> for Height {
+    fn decode(
+        value: <Postgres as sqlx::database::HasValueRef<'r>>::ValueRef,
+    ) -> Result<Self, sqlx::error::BoxDynError> {
+        Ok(Height::try_from(
+            <i64 as sqlx::Decode<'r, Postgres>>::decode(value)?,
+        )?)
+    }
+}
+
+impl sqlx::Type<Postgres> for Height {
+    fn type_info() -> <Postgres as sqlx::Database>::TypeInfo {
+        <i64 as sqlx::Type<Postgres>>::type_info()
+    }
+}
+
+impl<'q> sqlx::Encode<'q, Postgres> for Height {
+    fn encode_by_ref(
+        &self,
+        buf: &mut <Postgres as sqlx::database::HasArguments<'q>>::ArgumentBuffer,
+    ) -> sqlx::encode::IsNull {
+        <i64 as sqlx::Encode<'q, Postgres>>::encode(
+            i64::try_from(self.0).expect("height should never exceed i64::MAX"),
+            buf,
+        )
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct IndexingState {
+    src: PgPool,
+    dst: PgPool,
+}
+
+impl IndexingState {
+    async fn create_watermark_table(&self) -> anyhow::Result<()> {
+        sqlx::query(
+            "
+        CREATE TABLE IF NOT EXISTS index_watermarks (
+            index_name TEXT PRIMARY KEY,
+            height BIGINT NOT NULL
+        )
+        ",
+        )
+        .execute(&self.dst)
+        .await?;
+        Ok(())
+    }
+
+    /// The largest height for which we know we have all events.
+    pub async fn src_height(&self) -> anyhow::Result<Height> {
+        // We may be currently indexing events for this block.
+        let res: Option<Height> = sqlx::query_scalar("SELECT MAX(height) - 1 FROM blocks")
+            .fetch_optional(&self.src)
+            .await?;
+        Ok(res.unwrap_or_default())
+    }
+
+    pub async fn index_heights(&self) -> anyhow::Result<HashMap<String, Height>> {
+        let rows: Vec<(String, Height)> =
+            sqlx::query_as("SELECT index_name, height FROM index_watermarks")
+                .fetch_all(&self.dst)
+                .await?;
+        Ok(rows.into_iter().collect())
+    }
+
+    pub async fn update_index_height(
+        dbtx: &mut sqlx::Transaction<'_, Postgres>,
+        name: &str,
+        height: Height,
+    ) -> anyhow::Result<()> {
+        sqlx::query(
+            "
+        INSERT INTO index_watermarks
+        VALUES ($1, $2)
+        ON CONFLICT (index_name)
+        DO UPDATE SET height = excluded.height
+        ",
+        )
+        .bind(name)
+        .bind(height)
+        .execute(dbtx.as_mut())
+        .await?;
+        Ok(())
+    }
+
+    pub async fn event_batch(&self, first: Height, last: Height) -> anyhow::Result<EventBatch> {
+        // The number of events we expect a block to have.
+        const WORKING_CAPACITY: usize = 32;
+
+        let mut by_height = Vec::with_capacity((last.0 - first.0 + 1) as usize);
+        let mut event_stream =
+            sqlx::query_as::<_, (i64, String, i64, Option<String>, serde_json::Value)>(
+                // This query does some shenanigans to ensure good performance.
+                // The main trick is that we know that each event has 1 block and <= 1 transaction associated
+                // with it, so we can "encourage" (force) Postgres to avoid doing a hash join and
+                // then a sort, and instead work from the events in a linear fashion.
+                // Basically, this query ends up doing:
+                //
+                // for event in events >= id:
+                //   attach attributes
+                //   attach block
+                //   attach transaction?
+ r#" +SELECT + events.rowid, + events.type, + events.height, + tx_results.tx_hash, + events.attrs +FROM ( + SELECT + (SELECT height FROM blocks WHERE blocks.rowid = block_id) as height, + rowid, + type, + block_id, + tx_id, + jsonb_object_agg(attributes.key, attributes.value) AS attrs + FROM + events + LEFT JOIN + attributes ON rowid = attributes.event_id + WHERE + block_id >= (SELECT rowid FROM blocks where height = $1) + AND + block_id <= (SELECT rowid FROM blocks where height = $2) + GROUP BY + rowid, + type, + block_id, + tx_id + ORDER BY + rowid ASC +) events +LEFT JOIN LATERAL ( + SELECT * FROM tx_results WHERE tx_results.rowid = events.tx_id LIMIT 1 +) tx_results +ON TRUE +ORDER BY + events.rowid ASC + "#, + ) + .bind(first) + .bind(last) + .fetch(&self.src) + .map_ok(|(local_rowid, type_str, height, tx_hash, attrs)| { + tracing::debug!(?local_rowid, type_str, height, ?tx_hash); + let tx_hash: Option<[u8; 32]> = tx_hash.map(|s| { + hex::decode(s) + .expect("invalid tx_hash") + .try_into() + .expect("expected 32 bytes") + }); + let block_height = height as u64; + + let serde_json::Value::Object(attrs) = attrs else { + // saves an allocation below bc we can take ownership + panic!("expected JSON object"); + }; + + let event = abci::Event { + kind: type_str, + attributes: attrs + .into_iter() + .filter_map(|(k, v)| match v { + serde_json::Value::String(s) => Some((k, s)), + // we never hit this becasue of how we constructed the query + _ => None, + }) + .map(Into::into) + .collect(), + }; + + let ce = ContextualizedEvent { + event, + block_height, + tx_hash, + local_rowid, + }; + + ce + }) + .map_err(|e| anyhow::Error::from(e).context("error reading from database")); + + let mut height = first.0; + let mut current_batch = BlockEvents { + height: first.0, + events: Vec::with_capacity(WORKING_CAPACITY), + }; + while let Some(e) = event_stream.try_next().await? { + assert!(e.block_height >= height); + if e.block_height > height { + by_height.push(current_batch); + current_batch = BlockEvents { + height, + events: Vec::with_capacity(WORKING_CAPACITY), + }; + height = e.block_height; + } + current_batch.events.push(e); + } + // Flush the current block, and create empty ones for the remaining heights. + while height <= last.0 { + by_height.push(current_batch); + current_batch = BlockEvents { + height, + events: Vec::new(), + }; + height += 1; + } + Ok(EventBatch { + first_height: first.0, + last_height: last.0, + by_height: Arc::new(by_height), + }) + } + + pub async fn init(src_url: &str, dst_url: &str) -> anyhow::Result { + tracing::info!(url = src_url, "connecting to raw database"); + tracing::info!(url = dst_url, "connecting to derived database"); + let (src, dst) = tokio::try_join!(read_only_db(src_url), read_write_db(dst_url))?; + let out = Self { src, dst }; + out.create_watermark_table().await?; + Ok(out) + } + + pub async fn begin_transaction(&self) -> anyhow::Result> { + Ok(self.dst.begin().await?) 
+ } +} diff --git a/crates/util/cometindex/src/lib.rs b/crates/util/cometindex/src/lib.rs index 907dd6cced..82858651fc 100644 --- a/crates/util/cometindex/src/lib.rs +++ b/crates/util/cometindex/src/lib.rs @@ -1,5 +1,4 @@ mod contextualized; -pub mod engine; pub mod index; pub mod indexer; pub mod opt; diff --git a/crates/util/cometindex/vendor/schema.sql b/crates/util/cometindex/vendor/schema.sql index fd78e677e0..b39563ebe0 100644 --- a/crates/util/cometindex/vendor/schema.sql +++ b/crates/util/cometindex/vendor/schema.sql @@ -55,6 +55,8 @@ CREATE TABLE events ( type VARCHAR NOT NULL ); +CREATE INDEX ON events(block_id); + -- The attributes table records event attributes. CREATE TABLE attributes ( event_id BIGINT NOT NULL REFERENCES events(rowid),