Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pindexer: dex_ex: calculate USDC volume continuously #4987

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ mod summary {
end: asset::Id,
price: f64,
liquidity: f64,
start_price_indexing_denom: f64,
}

impl Context {
Expand All @@ -385,9 +386,9 @@ mod summary {
start: asset::Id,
end: asset::Id,
) -> anyhow::Result<Self> {
let row: Option<(f64, f64)> = sqlx::query_as(
let row: Option<(f64, f64, f64)> = sqlx::query_as(
"
SELECT price, liquidity
SELECT price, liquidity, start_price_indexing_denom
FROM dex_ex_pairs_block_snapshot
WHERE asset_start = $1
AND asset_end = $2
Expand All @@ -399,12 +400,13 @@ mod summary {
.bind(end.to_bytes())
.fetch_optional(dbtx.as_mut())
.await?;
let (price, liquidity) = row.unwrap_or_default();
let (price, liquidity, start_price_indexing_denom) = row.unwrap_or_default();
Ok(Self {
start,
end,
price,
liquidity,
start_price_indexing_denom,
})
}

Expand All @@ -414,16 +416,20 @@ mod summary {
now: DateTime,
candle: Option<Candle>,
metrics: PairMetrics,
start_price_indexing_denom: Option<f64>,
) -> anyhow::Result<()> {
if let Some(candle) = candle {
self.price = candle.close;
}
if let Some(price) = start_price_indexing_denom {
self.start_price_indexing_denom = price;
}
self.liquidity += metrics.liquidity_change;

sqlx::query(
"
INSERT INTO dex_ex_pairs_block_snapshot VALUES (
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8
DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9
)
",
)
Expand All @@ -434,6 +440,7 @@ mod summary {
.bind(self.liquidity)
.bind(candle.map(|x| x.direct_volume).unwrap_or_default())
.bind(candle.map(|x| x.swap_volume).unwrap_or_default())
.bind(self.start_price_indexing_denom)
.bind(metrics.trades)
.execute(dbtx.as_mut())
.await?;
Expand Down Expand Up @@ -481,6 +488,8 @@ mod summary {
SELECT
COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window,
COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * direct_volume), 0.0) as direct_volume_indexing_denom_over_window,
COALESCE(SUM(COALESCE(start_price_indexing_denom, 0.0) * swap_volume), 0.0) as swap_volume_indexing_denom_over_window,
COALESCE(SUM(trades), 0.0) AS trades_over_window,
COALESCE(MIN(price), 0.0) AS low,
COALESCE(MAX(price), 0.0) AS high
Expand All @@ -496,6 +505,8 @@ mod summary {
liquidity, liquidity_then,
direct_volume_over_window,
swap_volume_over_window,
direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window,
trades_over_window
FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE
ON CONFLICT (asset_start, asset_end, the_window)
Expand All @@ -506,6 +517,8 @@ mod summary {
liquidity_then = EXCLUDED.liquidity_then,
direct_volume_over_window = EXCLUDED.direct_volume_over_window,
swap_volume_over_window = EXCLUDED.swap_volume_over_window,
direct_volume_indexing_denom_over_window = EXCLUDED.direct_volume_indexing_denom_over_window,
swap_volume_indexing_denom_over_window = EXCLUDED.swap_volume_indexing_denom_over_window,
trades_over_window = EXCLUDED.trades_over_window
",
)
Expand All @@ -532,16 +545,16 @@ mod summary {
eligible_denoms AS (
SELECT asset_start as asset, price
FROM dex_ex_pairs_summary
WHERE asset_end = $1 AND liquidity >= $2
WHERE asset_end = $1
UNION VALUES ($1, 1.0)
),
converted_pairs_summary AS (
SELECT
asset_start, asset_end,
(dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change,
liquidity * ed_end.price AS liquidity,
direct_volume_over_window * ed_start.price AS dv,
swap_volume_over_window * ed_start.price AS sv,
direct_volume_indexing_denom_over_window AS dv,
swap_volume_indexing_denom_over_window AS sv,
trades_over_window as trades
FROM dex_ex_pairs_summary
JOIN eligible_denoms AS ed_end
Expand Down Expand Up @@ -884,6 +897,13 @@ impl Events {

Ok(())
}

/// Attempt to find the price, relative to a given indexing denom, for a particular asset, in this block.
pub fn price_for(&self, indexing_denom: asset::Id, asset: asset::Id) -> Option<f64> {
self.candles
.get(&DirectedTradingPair::new(asset, indexing_denom))
.map(|x| x.close)
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -1244,6 +1264,7 @@ impl AppView for Component {
time,
events.candles.get(&pair).copied(),
events.metrics.get(&pair).copied().unwrap_or_default(),
events.price_for(self.denom, pair.start),
)
.await?;
}
Expand Down
4 changes: 4 additions & 0 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot (
liquidity FLOAT8 NOT NULL,
direct_volume FLOAT8 NOT NULL,
swap_volume FLOAT8 NOT NULL,
-- The most recent price of the start asset, in terms of the indexing denom.
start_price_indexing_denom FLOAT8 NOT NULL,
trades FLOAT8 NOT NULL
);

Expand All @@ -55,6 +57,8 @@ CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary (
liquidity_then FLOAT8 NOT NULL,
direct_volume_over_window FLOAT8 NOT NULL,
swap_volume_over_window FLOAT8 NOT NULL,
direct_volume_indexing_denom_over_window FLOAT8 NOT NULL,
swap_volume_indexing_denom_over_window FLOAT8 NOT NULL,
trades_over_window FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end, the_window)
);
Expand Down
Loading