From 639c7995c4d336b161059ee4f453bcbe4b783a20 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Thu, 16 Jan 2025 09:30:18 -0800 Subject: [PATCH] pindexer: dex_ex: calculate USDC volume continuously (#4987) This adds a field to the low level snapshot used to derive other metrics, which captures the current price of a given asset relative to USDC (configurable to an arbitrary indexing denom). This then allows a pair summary to have a more accurate view of the volume in USDC terms, and for the aggregate summary to thus have a more accurate view. As a side-effect, this also gets rid of liquidity minimums for considering denoms in the aggregate summary. To test, run pindexer again, compare the aggregate summary, and then check that the converted volume is about the same. ## Checklist before requesting a review - [x] I have added guiding text to explain how a reviewer should test these changes. - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexing only --- crates/bin/pindexer/src/dex_ex/mod.rs | 35 ++++++++++++++++++----- crates/bin/pindexer/src/dex_ex/schema.sql | 4 +++ 2 files changed, 32 insertions(+), 7 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 4bbe4c6233..2e0ba1e409 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -377,6 +377,7 @@ mod summary { end: asset::Id, price: f64, liquidity: f64, + start_price_indexing_denom: f64, } impl Context { @@ -385,9 +386,9 @@ mod summary { start: asset::Id, end: asset::Id, ) -> anyhow::Result { - let row: Option<(f64, f64)> = sqlx::query_as( + let row: Option<(f64, f64, f64)> = sqlx::query_as( " - SELECT price, liquidity + SELECT price, liquidity, start_price_indexing_denom FROM dex_ex_pairs_block_snapshot WHERE asset_start = $1 AND asset_end = $2 @@ -399,12 +400,13 @@ mod summary { .bind(end.to_bytes()) .fetch_optional(dbtx.as_mut()) .await?; - let (price, liquidity) = row.unwrap_or_default(); + let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default(); Ok(Self { start, end, price, liquidity, + start_price_indexing_denom, }) } @@ -414,16 +416,20 @@ mod summary { now: DateTime, candle: Option, metrics: PairMetrics, + start_price_indexing_denom: Option, ) -> anyhow::Result<()> { if let Some(candle) = candle { self.price = candle.close; } + if let Some(price) = start_price_indexing_denom { + self.start_price_indexing_denom = price; + } self.liquidity += metrics.liquidity_change; sqlx::query( " INSERT INTO dex_ex_pairs_block_snapshot VALUES ( - DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8 + DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9 ) ", ) @@ -434,6 +440,7 @@ mod summary { .bind(self.liquidity) .bind(candle.map(|x| x.direct_volume).unwrap_or_default()) .bind(candle.map(|x| x.swap_volume).unwrap_or_default()) + .bind(self.start_price_indexing_denom) .bind(metrics.trades) .execute(dbtx.as_mut()) .await?; @@ -481,6 +488,8 @@ mod summary { SELECT COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window, COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window, + COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window, + COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window, COALESCE(SUM(trades), 0.0) AS trades_over_window, COALESCE(MIN(price), 0.0) AS low, COALESCE(MAX(price), 0.0) AS high @@ -496,6 +505,8 @@ mod summary { liquidity, liquidity_then, direct_volume_over_window, swap_volume_over_window, + direct_volume_indexing_denom_over_window, + swap_volume_indexing_denom_over_window, trades_over_window FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE ON CONFLICT (asset_start, asset_end, the_window) @@ -506,6 +517,8 @@ mod summary { liquidity_then = EXCLUDED.liquidity_then, direct_volume_over_window = EXCLUDED.direct_volume_over_window, swap_volume_over_window = EXCLUDED.swap_volume_over_window, + direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window, + swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window, trades_over_window = EXCLUDED.trades_over_window ", ) @@ -532,7 +545,7 @@ mod summary { eligible_denoms AS ( SELECT asset_start as asset, price FROM dex_ex_pairs_summary - WHERE asset_end = $1 AND liquidity >= $2 + WHERE asset_end = $1 UNION VALUES ($1, 1.0) ), converted_pairs_summary AS ( @@ -540,8 +553,8 @@ mod summary { asset_start, asset_end, (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change, liquidity * ed_end.price AS liquidity, - direct_volume_over_window * ed_start.price AS dv, - swap_volume_over_window * ed_start.price AS sv, + direct_volume_indexing_denom_over_window AS dv, + swap_volume_indexing_denom_over_window AS sv, trades_over_window as trades FROM dex_ex_pairs_summary JOIN eligible_denoms AS ed_end @@ -884,6 +897,13 @@ impl Events { Ok(()) } + + /// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block. + pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option { + self.candles + .get(&DirectedTradingPair::new(asset, indexing_denom)) + .map(|x| x.close) + } } #[derive(Debug)] @@ -1244,6 +1264,7 @@ impl AppView for Component { time, events.candles.get(&pair).copied(), events.metrics.get(&pair).copied().unwrap_or_default(), + events.price_for(self.denom, pair.start), ) .await?; } diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index de284b5e98..3ec69f328c 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot ( liquidity FLOAT8 NOT NULL, direct_volume FLOAT8 NOT NULL, swap_volume FLOAT8 NOT NULL, + -- The most recent price of the start asset, in terms of the indexing denom. + start_price_indexing_denom FLOAT8 NOT NULL, trades FLOAT8 NOT NULL ); @@ -55,6 +57,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary ( liquidity_then FLOAT8 NOT NULL, direct_volume_over_window FLOAT8 NOT NULL, swap_volume_over_window FLOAT8 NOT NULL, + direct_volume_indexing_denom_over_window FLOAT8 NOT NULL, + swap_volume_indexing_denom_over_window FLOAT8 NOT NULL, trades_over_window FLOAT8 NOT NULL, PRIMARY KEY (asset_start, asset_end, the_window) );