Skip to content

Commit

Permalink
pindexer(dex_ex): record batch swap traces
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Jan 16, 2025
1 parent 854aa9c commit afc12db
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 2 deletions.
119 changes: 118 additions & 1 deletion crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use cometindex::{
AppView, PgTransaction,
};
use penumbra_asset::asset;
use penumbra_dex::lp::position::{Id as PositionId, Position};
use penumbra_dex::{
event::EventBatchSwap,
lp::position::{Id as PositionId, Position},
SwapExecution,
};
use penumbra_dex::{
event::{
EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen,
Expand Down Expand Up @@ -668,6 +672,7 @@ struct Events {
position_executions: Vec<EventPositionExecution>,
position_closes: Vec<EventPositionClose>,
position_withdrawals: Vec<EventPositionWithdraw>,
batch_swaps: Vec<EventBatchSwap>,
// Track transaction hashes by position ID
position_open_txs: BTreeMap<PositionId, [u8; 32]>,
position_close_txs: BTreeMap<PositionId, [u8; 32]>,
Expand All @@ -686,6 +691,7 @@ impl Events {
position_executions: Vec::new(),
position_closes: Vec::new(),
position_withdrawals: Vec::new(),
batch_swaps: Vec::new(),
position_open_txs: BTreeMap::new(),
position_close_txs: BTreeMap::new(),
position_withdrawal_txs: BTreeMap::new(),
Expand Down Expand Up @@ -842,6 +848,8 @@ impl Events {
if let Some(tx_hash) = event.tx_hash {
out.position_close_txs.insert(e.position_id, tx_hash);
}
} else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
out.batch_swaps.push(e);
}
}
Ok(out)
Expand Down Expand Up @@ -982,6 +990,109 @@ impl Component {
Ok(())
}

async fn record_swap_execution_traces(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
swap_execution: &SwapExecution,
) -> anyhow::Result<()> {
let SwapExecution {
traces,
input: se_input,
output: se_output,
} = swap_execution;

let asset_start = se_input.asset_id;
let asset_end = se_output.asset_id;
let batch_input = se_input.amount;
let batch_output = se_output.amount;

for trace in traces.iter() {
let Some(input_value) = trace.first() else {
continue;
};
let Some(output_value) = trace.last() else {
continue;
};

let input = input_value.amount;
let output = output_value.amount;

let price_float = (output.value() as f64) / (input.value() as f64);
let amount_hops = trace
.iter()
.map(|x| BigDecimal::from(x.amount.value()))
.collect::<Vec<_>>();
let position_id_hops: Vec<[u8; 32]> = vec![];
let asset_hops = trace
.iter()
.map(|x| x.asset_id.to_bytes())
.collect::<Vec<_>>();

sqlx::query(
"INSERT INTO dex_ex_batch_swap_traces (
height,
time,
input,
output,
batch_input,
batch_output,
asset_start,
asset_end,
price_float,
asset_start,
asset_end,
asset_hops,
amount_hops,
position_id_hops,
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
)
.bind(height)
.bind(time)
.bind(BigDecimal::from(input.value()))
.bind(BigDecimal::from(output.value()))
.bind(BigDecimal::from(batch_input.value()))
.bind(BigDecimal::from(batch_output.value()))
.bind(price_float)
.bind(asset_start.to_bytes())
.bind(asset_end.to_bytes())
.bind(asset_hops)
.bind(amount_hops)
.bind(position_id_hops)
.execute(dbtx.as_mut())
.await?;
}

Ok(())
}

async fn record_batch_swap_traces(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
event: &EventBatchSwap,
) -> anyhow::Result<()> {
let EventBatchSwap {
batch_swap_output_data: _,
swap_execution_1_for_2,
swap_execution_2_for_1,
} = event;

if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
.await?;
}

if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
.await?;
}

Ok(())
}

async fn record_position_execution(
&self,
dbtx: &mut PgTransaction<'_>,
Expand Down Expand Up @@ -1179,6 +1290,12 @@ impl AppView for Component {
// Load any missing positions before processing events
events.load_positions(dbtx).await?;

// Record batch swap execution traces.
for event in &events.batch_swaps {
self.record_batch_swap_traces(dbtx, time, block.height as i32, event)
.await?;
}

// Record position opens
for event in &events.position_opens {
let tx_hash = events.position_open_txs.get(&event.position_id).copied();
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ CREATE TABLE IF NOT EXISTS dex_ex_batch_swap_traces (
-- Each hop in the list contains an asset id.
asset_hops BYTEA[] NOT NULL,
-- Each hop in the list contains an amount.
amount_hops BYTEA[] NOT NULL,
amount_hops NUMERIC(39)[] NOT NULL,
-- Each hop in the list contains a position ID.
position_id_hops BYTEA[] NOT NULL
);
Expand Down

0 comments on commit afc12db

Please sign in to comment.