Skip to content

Commit

Permalink
pindexer(dex_ex): index batch swap execution traces (#4990)
Browse files Browse the repository at this point in the history
## Describe your changes

This PR adds a `dex_ex_batch_swap_traces` table that tracks batch swap
execution traces.

We do not (yet) track individual position ids for each subtrace hops, so
this is left empty.

Notably, the schema contains a price float, but no input/output amount
floats for now. This is good to have, but we can add it later and it's
not immediately useful to the intended consumer of this data.

The schema:

```sql
-- This table tracks individual execution traces for a directed batch swap.
CREATE TABLE IF NOT EXISTS dex_ex_batch_swap_traces (
  -- Primary key
  rowid SERIAL PRIMARY KEY,

  -- The height of the block the batch swap was included in.
  height INTEGER NOT NULL,
  -- The time the batch swap was included in a block.
  time TIMESTAMPTZ NOT NULL,

  -- The amount of asset 1 consumed by the micro execution in raw denom.
  input NUMERIC(39) NOT NULL,
  -- The amount of asset 2 produced by the micro execution in raw denom.
  output NUMERIC(39) NOT NULL,

  -- The amount of asset 1 consumed by the macro execution.
  batch_input NUMERIC(39) NOT NULL,
  -- The amount of asset 2 produced by the macro execution.
  batch_output NUMERIC(39) NOT NULL,
  -- The price (output/input) as a float
  price_float DOUBLE PRECISION NOT NULL,


  -- The directed start asset of the batch swap.
  asset_start BYTEA NOT NULL,
  -- The directed end asset of the batch swap.
  asset_end BYTEA NOT NULL,

  -- Each hop in the list contains an asset id.
  asset_hops BYTEA[] NOT NULL,
  -- Each hop in the list contains an amount.
  amount_hops NUMERIC(39)[] NOT NULL,
  -- Each hop in the list contains a position ID.
  position_id_hops BYTEA[] NOT NULL
);

CREATE INDEX ON dex_ex_batch_swap_traces (time, height);
CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end);
```

## Checklist before requesting a review

- [x] I have added guiding text to explain how a reviewer should test
these changes.

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > pindexer changes
  • Loading branch information
erwanor authored Jan 17, 2025
1 parent 639c799 commit 1781508
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 1 deletion.
117 changes: 116 additions & 1 deletion crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ use cometindex::{
AppView, PgTransaction,
};
use penumbra_asset::asset;
use penumbra_dex::lp::position::{Id as PositionId, Position};
use penumbra_dex::{
event::EventBatchSwap,
lp::position::{Id as PositionId, Position},
SwapExecution,
};
use penumbra_dex::{
event::{
EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen,
Expand Down Expand Up @@ -681,6 +685,7 @@ struct Events {
position_executions: Vec<EventPositionExecution>,
position_closes: Vec<EventPositionClose>,
position_withdrawals: Vec<EventPositionWithdraw>,
batch_swaps: Vec<EventBatchSwap>,
// Track transaction hashes by position ID
position_open_txs: BTreeMap<PositionId, [u8; 32]>,
position_close_txs: BTreeMap<PositionId, [u8; 32]>,
Expand All @@ -699,6 +704,7 @@ impl Events {
position_executions: Vec::new(),
position_closes: Vec::new(),
position_withdrawals: Vec::new(),
batch_swaps: Vec::new(),
position_open_txs: BTreeMap::new(),
position_close_txs: BTreeMap::new(),
position_withdrawal_txs: BTreeMap::new(),
Expand Down Expand Up @@ -855,6 +861,8 @@ impl Events {
if let Some(tx_hash) = event.tx_hash {
out.position_close_txs.insert(e.position_id, tx_hash);
}
} else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
out.batch_swaps.push(e);
}
}
Ok(out)
Expand Down Expand Up @@ -1002,6 +1010,107 @@ impl Component {
Ok(())
}

async fn record_swap_execution_traces(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
swap_execution: &SwapExecution,
) -> anyhow::Result<()> {
let SwapExecution {
traces,
input: se_input,
output: se_output,
} = swap_execution;

let asset_start = se_input.asset_id;
let asset_end = se_output.asset_id;
let batch_input = se_input.amount;
let batch_output = se_output.amount;

for trace in traces.iter() {
let Some(input_value) = trace.first() else {
continue;
};
let Some(output_value) = trace.last() else {
continue;
};

let input = input_value.amount;
let output = output_value.amount;

let price_float = (output.value() as f64) / (input.value() as f64);
let amount_hops = trace
.iter()
.map(|x| BigDecimal::from(x.amount.value()))
.collect::<Vec<_>>();
let position_id_hops: Vec<[u8; 32]> = vec![];
let asset_hops = trace
.iter()
.map(|x| x.asset_id.to_bytes())
.collect::<Vec<_>>();

sqlx::query(
"INSERT INTO dex_ex_batch_swap_traces (
height,
time,
input,
output,
batch_input,
batch_output,
price_float,
asset_start,
asset_end,
asset_hops,
amount_hops,
position_id_hops
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
)
.bind(height)
.bind(time)
.bind(BigDecimal::from(input.value()))
.bind(BigDecimal::from(output.value()))
.bind(BigDecimal::from(batch_input.value()))
.bind(BigDecimal::from(batch_output.value()))
.bind(price_float)
.bind(asset_start.to_bytes())
.bind(asset_end.to_bytes())
.bind(asset_hops)
.bind(amount_hops)
.bind(position_id_hops)
.execute(dbtx.as_mut())
.await?;
}

Ok(())
}

async fn record_batch_swap_traces(
&self,
dbtx: &mut PgTransaction<'_>,
time: DateTime,
height: i32,
event: &EventBatchSwap,
) -> anyhow::Result<()> {
let EventBatchSwap {
batch_swap_output_data: _,
swap_execution_1_for_2,
swap_execution_2_for_1,
} = event;

if let Some(batch_swap_1_2) = swap_execution_1_for_2 {
self.record_swap_execution_traces(dbtx, time, height, batch_swap_1_2)
.await?;
}

if let Some(batch_swap_2_1) = swap_execution_2_for_1 {
self.record_swap_execution_traces(dbtx, time, height, batch_swap_2_1)
.await?;
}

Ok(())
}

async fn record_position_execution(
&self,
dbtx: &mut PgTransaction<'_>,
Expand Down Expand Up @@ -1199,6 +1308,12 @@ impl AppView for Component {
// Load any missing positions before processing events
events.load_positions(dbtx).await?;

// Record batch swap execution traces.
for event in &events.batch_swaps {
self.record_batch_swap_traces(dbtx, time, block.height as i32, event)
.await?;
}

// Record position opens
for event in &events.position_opens {
let tx_hash = events.position_open_txs.get(&event.position_id).copied();
Expand Down
40 changes: 40 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,46 @@ CREATE TABLE IF NOT EXISTS dex_ex_position_withdrawals (
CREATE INDEX ON dex_ex_position_withdrawals (height);
CREATE INDEX ON dex_ex_position_withdrawals (position_id, height);

-- This table tracks individual execution traces for a directed batch swap.
CREATE TABLE IF NOT EXISTS dex_ex_batch_swap_traces (
-- Primary key
rowid SERIAL PRIMARY KEY,

-- The height of the block the batch swap was included in.
height INTEGER NOT NULL,
-- The time the batch swap was included in a block.
time TIMESTAMPTZ NOT NULL,

-- The amount of asset 1 consumed by the micro execution in raw denom.
input NUMERIC(39) NOT NULL,
-- The amount of asset 2 produced by the micro execution in raw denom.
output NUMERIC(39) NOT NULL,

-- The amount of asset 1 consumed by the macro execution.
batch_input NUMERIC(39) NOT NULL,
-- The amount of asset 2 produced by the macro execution.
batch_output NUMERIC(39) NOT NULL,
-- The price (output/input) as a float
price_float DOUBLE PRECISION NOT NULL,


-- The directed start asset of the batch swap.
asset_start BYTEA NOT NULL,
-- The directed end asset of the batch swap.
asset_end BYTEA NOT NULL,

-- Each hop in the list contains an asset id.
asset_hops BYTEA[] NOT NULL,
-- Each hop in the list contains an amount.
amount_hops NUMERIC(39)[] NOT NULL,
-- Each hop in the list contains a position ID.
position_id_hops BYTEA[] NOT NULL
);

CREATE INDEX ON dex_ex_batch_swap_traces (time, height);
CREATE INDEX ON dex_ex_batch_swap_traces (asset_start, asset_end);
-- TODO(erwan): We can add a GIN index on the position id later.

ALTER TABLE dex_ex_position_executions
ADD CONSTRAINT fk_position_executions
FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id);
Expand Down

0 comments on commit 1781508

Please sign in to comment.