Skip to content

Commit

Permalink
pindexer: dex_ex: calculate USDC volume continuously (#4987)
Browse files Browse the repository at this point in the history
This adds a field to the low level snapshot used to derive other
metrics, which captures the current price of a given asset relative to
USDC (configurable to an arbitrary indexing denom).

This then allows a pair summary to have a more accurate view of the
volume in USDC terms, and for the aggregate summary to thus have a more
accurate view.

As a side-effect, this also gets rid of liquidity minimums for
considering denoms in the aggregate summary.

To test, run pindexer again, compare the aggregate summary, and then
check that the converted volume is about the same.

## Checklist before requesting a review

- [x] I have added guiding text to explain how a reviewer should test
these changes.

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > indexing only
  • Loading branch information
cronokirby authored Jan 16, 2025
1 parent fd68dcd commit 639c799
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
35 changes: 28 additions & 7 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ mod summary {
end: asset::Id,
price: f64,
liquidity: f64,
start_price_indexing_denom: f64,
}

impl Context {
Expand All @@ -385,9 +386,9 @@ mod summary {
start: asset::Id,
end: asset::Id,
) -> anyhow::Result<Self> {
let row: Option<(f64, f64)> = sqlx::query_as(
let row: Option<(f64, f64, f64)> = sqlx::query_as(
"
SELECT price, liquidity
SELECT price, liquidity, start_price_indexing_denom
FROM dex_ex_pairs_block_snapshot
WHERE asset_start = $1
AND asset_end = $2
Expand All @@ -399,12 +400,13 @@ mod summary {
.bind(end.to_bytes())
.fetch_optional(dbtx.as_mut())
.await?;
let (price, liquidity) = row.unwrap_or_default();
let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
Ok(Self {
start,
end,
price,
liquidity,
start_price_indexing_denom,
})
}

Expand All @@ -414,16 +416,20 @@ mod summary {
now: DateTime,
candle: Option<Candle>,
metrics: PairMetrics,
start_price_indexing_denom: Option<f64>,
) -> anyhow::Result<()> {
if let Some(candle) = candle {
self.price = candle.close;
}
if let Some(price) = start_price_indexing_denom {
self.start_price_indexing_denom = price;
}
self.liquidity += metrics.liquidity_change;

sqlx::query(
"
INSERT INTO dex_ex_pairs_block_snapshot VALUES (
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
)
",
)
Expand All @@ -434,6 +440,7 @@ mod summary {
.bind(self.liquidity)
.bind(candle.map(|x| x.direct_volume).unwrap_or_default())
.bind(candle.map(|x| x.swap_volume).unwrap_or_default())
.bind(self.start_price_indexing_denom)
.bind(metrics.trades)
.execute(dbtx.as_mut())
.await?;
Expand Down Expand Up @@ -481,6 +488,8 @@ mod summary {
SELECT
COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
COALESCE(SUM(trades), 0.0) AS trades_over_window,
COALESCE(MIN(price), 0.0) AS low,
COALESCE(MAX(price), 0.0) AS high
Expand All @@ -496,6 +505,8 @@ mod summary {
liquidity, liquidity_then,
direct_volume_over_window,
swap_volume_over_window,
direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window,
trades_over_window
FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
ON CONFLICT (asset_start, asset_end, the_window)
Expand All @@ -506,6 +517,8 @@ mod summary {
liquidity_then = EXCLUDED.liquidity_then,
direct_volume_over_window = EXCLUDED.direct_volume_over_window,
swap_volume_over_window = EXCLUDED.swap_volume_over_window,
direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
trades_over_window = EXCLUDED.trades_over_window
",
)
Expand All @@ -532,16 +545,16 @@ mod summary {
eligible_denoms AS (
SELECT asset_start as asset, price
FROM dex_ex_pairs_summary
WHERE asset_end = $1 AND liquidity >= $2
WHERE asset_end = $1
UNION VALUES ($1, 1.0)
),
converted_pairs_summary AS (
SELECT
asset_start, asset_end,
(dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
liquidity * ed_end.price AS liquidity,
direct_volume_over_window * ed_start.price AS dv,
swap_volume_over_window * ed_start.price AS sv,
direct_volume_indexing_denom_over_window AS dv,
swap_volume_indexing_denom_over_window AS sv,
trades_over_window as trades
FROM dex_ex_pairs_summary
JOIN eligible_denoms AS ed_end
Expand Down Expand Up @@ -884,6 +897,13 @@ impl Events {

Ok(())
}

/// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
self.candles
.get(&DirectedTradingPair::new(asset, indexing_denom))
.map(|x| x.close)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1244,6 +1264,7 @@ impl AppView for Component {
time,
events.candles.get(&pair).copied(),
events.metrics.get(&pair).copied().unwrap_or_default(),
events.price_for(self.denom, pair.start),
)
.await?;
}
Expand Down
4 changes: 4 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot (
liquidity FLOAT8 NOT NULL,
direct_volume FLOAT8 NOT NULL,
swap_volume FLOAT8 NOT NULL,
-- The most recent price of the start asset, in terms of the indexing denom.
start_price_indexing_denom FLOAT8 NOT NULL,
trades FLOAT8 NOT NULL
);

Expand All @@ -55,6 +57,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary (
liquidity_then FLOAT8 NOT NULL,
direct_volume_over_window FLOAT8 NOT NULL,
swap_volume_over_window FLOAT8 NOT NULL,
direct_volume_indexing_denom_over_window FLOAT8 NOT NULL,
swap_volume_indexing_denom_over_window FLOAT8 NOT NULL,
trades_over_window FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end, the_window)
);
Expand Down

0 comments on commit 639c799

Please sign in to comment.