diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 2e0ba1e409..bdcd0cde26 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,7 +5,11 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_asset::asset; -use penumbra_dex::lp::position::{Id as PositionId, Position}; +use penumbra_dex::{ + event::EventBatchSwap, + lp::position::{Id as PositionId, Position}, + SwapExecution, +}; use penumbra_dex::{ event::{ EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen, @@ -681,6 +685,7 @@ struct Events { position_executions: Vec, position_closes: Vec, position_withdrawals: Vec, + batch_swaps: Vec, // Track transaction hashes by position ID position_open_txs: BTreeMap, position_close_txs: BTreeMap, @@ -699,6 +704,7 @@ impl Events { position_executions: Vec::new(), position_closes: Vec::new(), position_withdrawals: Vec::new(), + batch_swaps: Vec::new(), position_open_txs: BTreeMap::new(), position_close_txs: BTreeMap::new(), position_withdrawal_txs: BTreeMap::new(), @@ -855,6 +861,8 @@ impl Events { if let Some(tx_hash) = event.tx_hash { out.position_close_txs.insert(e.position_id, tx_hash); } + } else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) { + out.batch_swaps.push(e); } } Ok(out) @@ -1002,6 +1010,107 @@ impl Component { Ok(()) } + async fn record_swap_execution_traces( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + swap_execution: &SwapExecution, + ) -> anyhow::Result<()> { + let SwapExecution { + traces, + input: se_input, + output: se_output, + } = swap_execution; + + let asset_start = se_input.asset_id; + let asset_end = se_output.asset_id; + let batch_input = se_input.amount; + let batch_output = se_output.amount; + + for trace in traces.iter() { + let Some(input_value) = trace.first() else { + continue; + }; + let Some(output_value) = trace.last() else { + continue; + }; + + let input = input_value.amount; + let output = output_value.amount; + + let price_float = (output.value() as f64) / (input.value() as f64); + let amount_hops = trace + .iter() + .map(|x| BigDecimal::from(x.amount.value())) + .collect::>(); + let position_id_hops: Vec<[u8; 32]> = vec![]; + let asset_hops = trace + .iter() + .map(|x| x.asset_id.to_bytes()) + .collect::>(); + + sqlx::query( + "INSERT INTO dex_ex_batch_swap_traces ( + height, + time, + input, + output, + batch_input, + batch_output, + price_float, + asset_start, + asset_end, + asset_hops, + amount_hops, + position_id_hops + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)", + ) + .bind(height) + .bind(time) + .bind(BigDecimal::from(input.value())) + .bind(BigDecimal::from(output.value())) + .bind(BigDecimal::from(batch_input.value())) + .bind(BigDecimal::from(batch_output.value())) + .bind(price_float) + .bind(asset_start.to_bytes()) + .bind(asset_end.to_bytes()) + .bind(asset_hops) + .bind(amount_hops) + .bind(position_id_hops) + .execute(dbtx.as_mut()) + .await?; + } + + Ok(()) + } + + async fn record_batch_swap_traces( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + event: &EventBatchSwap, + ) -> anyhow::Result<()> { + let EventBatchSwap { + batch_swap_output_data: _, + swap_execution_1_for_2, + swap_execution_2_for_1, + } = event; + + if let Some(batch_swap_1_2) = swap_execution_1_for_2 { + self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2) + .await?; + } + + if let Some(batch_swap_2_1) = swap_execution_2_for_1 { + self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1) + .await?; + } + + Ok(()) + } + async fn record_position_execution( &self, dbtx: &mut PgTransaction<'_>, @@ -1199,6 +1308,12 @@ impl AppView for Component { // Load any missing positions before processing events events.load_positions(dbtx).await?; + // Record batch swap execution traces. + for event in &events.batch_swaps { + self.record_batch_swap_traces(dbtx, time, block.height as i32, event) + .await?; + } + // Record position opens for event in &events.position_opens { let tx_hash = events.position_open_txs.get(&event.position_id).copied(); diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 3ec69f328c..15122ab946 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -169,6 +169,46 @@ CREATE TABLE IF NOT EXISTS dex_ex_position_withdrawals ( CREATE INDEX ON dex_ex_position_withdrawals (height); CREATE INDEX ON dex_ex_position_withdrawals (position_id, height); +-- This table tracks individual execution traces for a directed batch swap. +CREATE TABLE IF NOT EXISTS dex_ex_batch_swap_traces ( + -- Primary key + rowid SERIAL PRIMARY KEY, + + -- The height of the block the batch swap was included in. + height INTEGER NOT NULL, + -- The time the batch swap was included in a block. + time TIMESTAMPTZ NOT NULL, + + -- The amount of asset 1 consumed by the micro execution in raw denom. + input NUMERIC(39) NOT NULL, + -- The amount of asset 2 produced by the micro execution in raw denom. + output NUMERIC(39) NOT NULL, + + -- The amount of asset 1 consumed by the macro execution. + batch_input NUMERIC(39) NOT NULL, + -- The amount of asset 2 produced by the macro execution. + batch_output NUMERIC(39) NOT NULL, + -- The price (output/input) as a float + price_float DOUBLE PRECISION NOT NULL, + + + -- The directed start asset of the batch swap. + asset_start BYTEA NOT NULL, + -- The directed end asset of the batch swap. + asset_end BYTEA NOT NULL, + + -- Each hop in the list contains an asset id. + asset_hops BYTEA[] NOT NULL, + -- Each hop in the list contains an amount. + amount_hops NUMERIC(39)[] NOT NULL, + -- Each hop in the list contains a position ID. + position_id_hops BYTEA[] NOT NULL +); + +CREATE INDEX ON dex_ex_batch_swap_traces (time, height); +CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end); +-- TODO(erwan): We can add a GIN index on the position id later. + ALTER TABLE dex_ex_position_executions ADD CONSTRAINT fk_position_executions FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id);