Skip to content

Commit

Permalink
pindexer: dex_ex: calculate USDC volume continuously
Browse files Browse the repository at this point in the history
This adds a field to the low level snapshot used to derive other
metrics, which captures the current price of a given asset relative to
USDC (configurable to an arbitrary indexing denom).

This then allows a pair summary to have a more accurate view of the
volume in USDC terms, and for the aggregate summary to thus have a more
accurate view.

As a side-effect, this also gets rid of liquidity minimums for
considering denoms in the aggregate summary.
  • Loading branch information
cronokirby committed Jan 15, 2025
1 parent 64c32ef commit 9f04a07
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
35 changes: 28 additions & 7 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ mod summary {
end: asset::Id,
price: f64,
liquidity: f64,
start_price_indexing_denom: f64,
}

impl Context {
Expand All @@ -385,9 +386,9 @@ mod summary {
start: asset::Id,
end: asset::Id,
) -> anyhow::Result<Self> {
let row: Option<(f64, f64)> = sqlx::query_as(
let row: Option<(f64, f64, f64)> = sqlx::query_as(
"
SELECT price, liquidity
SELECT price, liquidity, start_price_indexing_denom
FROM dex_ex_pairs_block_snapshot
WHERE asset_start = $1
AND asset_end = $2
Expand All @@ -399,12 +400,13 @@ mod summary {
.bind(end.to_bytes())
.fetch_optional(dbtx.as_mut())
.await?;
let (price, liquidity) = row.unwrap_or_default();
let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
Ok(Self {
start,
end,
price,
liquidity,
start_price_indexing_denom,
})
}

Expand All @@ -414,16 +416,20 @@ mod summary {
now: DateTime,
candle: Option<Candle>,
metrics: PairMetrics,
start_price_indexing_denom: Option<f64>,
) -> anyhow::Result<()> {
if let Some(candle) = candle {
self.price = candle.close;
}
if let Some(price) = start_price_indexing_denom {
self.start_price_indexing_denom = price;
}
self.liquidity += metrics.liquidity_change;

sqlx::query(
"
INSERT INTO dex_ex_pairs_block_snapshot VALUES (
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
)
",
)
Expand All @@ -434,6 +440,7 @@ mod summary {
.bind(self.liquidity)
.bind(candle.map(|x| x.direct_volume).unwrap_or_default())
.bind(candle.map(|x| x.swap_volume).unwrap_or_default())
.bind(self.start_price_indexing_denom)
.bind(metrics.trades)
.execute(dbtx.as_mut())
.await?;
Expand Down Expand Up @@ -481,6 +488,8 @@ mod summary {
SELECT
COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
COALESCE(SUM(trades), 0.0) AS trades_over_window,
COALESCE(MIN(price), 0.0) AS low,
COALESCE(MAX(price), 0.0) AS high
Expand All @@ -496,6 +505,8 @@ mod summary {
liquidity, liquidity_then,
direct_volume_over_window,
swap_volume_over_window,
direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window,
trades_over_window
FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
ON CONFLICT (asset_start, asset_end, the_window)
Expand All @@ -506,6 +517,8 @@ mod summary {
liquidity_then = EXCLUDED.liquidity_then,
direct_volume_over_window = EXCLUDED.direct_volume_over_window,
swap_volume_over_window = EXCLUDED.swap_volume_over_window,
direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
trades_over_window = EXCLUDED.trades_over_window
",
)
Expand All @@ -532,16 +545,16 @@ mod summary {
eligible_denoms AS (
SELECT asset_start as asset, price
FROM dex_ex_pairs_summary
WHERE asset_end = $1 AND liquidity >= $2
WHERE asset_end = $1
UNION VALUES ($1, 1.0)
),
converted_pairs_summary AS (
SELECT
asset_start, asset_end,
(dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
liquidity * ed_end.price AS liquidity,
direct_volume_over_window * ed_start.price AS dv,
swap_volume_over_window * ed_start.price AS sv,
direct_volume_indexing_denom_over_window AS dv,
swap_volume_indexing_denom_over_window AS sv,
trades_over_window as trades
FROM dex_ex_pairs_summary
JOIN eligible_denoms AS ed_end
Expand Down Expand Up @@ -884,6 +897,13 @@ impl Events {

Ok(())
}

/// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
self.candles
.get(&DirectedTradingPair::new(asset, indexing_denom))
.map(|x| x.close)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1244,6 +1264,7 @@ impl AppView for Component {
time,
events.candles.get(&pair).copied(),
events.metrics.get(&pair).copied().unwrap_or_default(),
events.price_for(self.denom, pair.start),
)
.await?;
}
Expand Down
4 changes: 4 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot (
liquidity FLOAT8 NOT NULL,
direct_volume FLOAT8 NOT NULL,
swap_volume FLOAT8 NOT NULL,
-- The most recent price of the start asset, in terms of the indexing denom.
start_price_indexing_denom FLOAT8 NOT NULL,
trades FLOAT8 NOT NULL
);

Expand All @@ -55,6 +57,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary (
liquidity_then FLOAT8 NOT NULL,
direct_volume_over_window FLOAT8 NOT NULL,
swap_volume_over_window FLOAT8 NOT NULL,
direct_volume_indexing_denom_over_window FLOAT8 NOT NULL,
swap_volume_indexing_denom_over_window FLOAT8 NOT NULL,
trades_over_window FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end, the_window)
);
Expand Down

0 comments on commit 9f04a07

Please sign in to comment.