Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lqt(dex): setup volume trackers #5016

Merged
merged 10 commits into from
Jan 30, 2025
67 changes: 67 additions & 0 deletions crates/core/component/dex/src/component/lqt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::lp::position;
use crate::state_key::lqt;
use anyhow::Result;
use async_trait::async_trait;
use cnidarium::StateRead;
use futures::StreamExt;
use penumbra_sdk_asset::asset;
use penumbra_sdk_num::Amount;
use penumbra_sdk_proto::StateReadProto;
use penumbra_sdk_sct::component::clock::EpochRead;
use std::pin::Pin;

/// Provides public read access to LQT data.
#[async_trait]
pub trait LqtRead: StateRead {
/// Returns the cumulative volume of staking token for a trading pair.
/// This is the sum of the outflows of the staking token from all positions in the pair.
///
/// Default to zero if no volume is found.
async fn get_volume_for_pair(&self, asset: asset::Id) -> Amount {
let epoch = self.get_current_epoch().await.expect("epoch is always set");
let key = lqt::v1::pair::lookup::volume_by_pair(epoch.index, asset);
let value = self.nonverifiable_get(&key).await.unwrap_or_default();
erwanor marked this conversation as resolved.
Show resolved Hide resolved
value.unwrap_or_default()
}

/// Returns the cumulative volume of staking token for a given position id.
/// This is the sum of the outflows of the staking token from the position.
///
/// Default to zero if no volume is found.
async fn get_volume_for_position(&self, position_id: &position::Id) -> Amount {
let epoch = self.get_current_epoch().await.expect("epoch is always set");
let key = lqt::v1::lp::lookup::volume_by_position(epoch.index, position_id);
let value = self.nonverifiable_get(&key).await.unwrap_or_default();
erwanor marked this conversation as resolved.
Show resolved Hide resolved
value.unwrap_or_default()
}

/// Returns a stream of position ids sorted by descending volume.
/// The volume is the sum of the outflows of the staking token from the position.
fn positions_by_volume_stream(
erwanor marked this conversation as resolved.
Show resolved Hide resolved
&self,
epoch_index: u64,
asset_id: asset::Id,
) -> Result<
Pin<
Box<
dyn futures::Stream<Item = Result<(asset::Id, position::Id, Amount)>>
+ Send
+ 'static,
>,
>,
> {
let key = lqt::v1::lp::by_volume::prefix_with_asset(epoch_index, &asset_id);
Ok(self
.nonverifiable_prefix_raw(&key)
.map(|res| {
res.map(|(raw_entry, _)| {
let (asset, volume, position_id) =
lqt::v1::lp::by_volume::parse_key(&raw_entry).expect("internal invariant failed: failed to parse state key for lqt::v1::lp::by_volume");
(asset, position_id, volume)
})
})
.boxed())
}
}

impl<T: StateRead + ?Sized> LqtRead for T {}
2 changes: 2 additions & 0 deletions crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ pub(crate) mod circuit_breaker;
mod dex;
mod eviction_manager;
mod flow;
mod lqt;
mod position_manager;
mod swap_manager;

pub use dex::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::PositionManager;

// Read data from the Dex component;
pub use lqt::LqtRead;
pub use position_manager::PositionRead;
pub use swap_manager::SwapDataRead;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod base_liquidity_index;
pub(crate) mod counter;
pub(crate) mod inventory_index;
pub(crate) mod price_index;
pub(crate) mod volume_tracker;

#[async_trait]
pub trait PositionRead: StateRead {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#![allow(unused_imports, unused_variables, dead_code)]
use anyhow::Result;
use cnidarium::StateWrite;
use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
use penumbra_sdk_num::Amount;
use position::State::*;
use tracing::instrument;

use crate::component::lqt::LqtRead;
use crate::lp::position::{self, Position};
use crate::state_key::{engine, lqt};
use crate::{trading_pair, DirectedTradingPair, TradingPair};
use async_trait::async_trait;
use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
use penumbra_sdk_sct::component::clock::EpochRead;

#[async_trait]
pub(crate) trait PositionVolumeTracker: StateWrite {
async fn increase_volume_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
) {
// We only index the volume for staking token pairs.
if !new_state.phi.matches_input(*STAKING_TOKEN_ASSET_ID) {
return;
}

// Or if the position has existed before.
if prev_state.is_none() {
tracing::debug!(?position_id, "newly opened position, skipping volume index");
return;
}

// Short-circuit if the position is transitioning to a non-open state.
// This might miss some volume updates, but is more conservative on state-flow.
if !matches!(new_state.state, position::State::Opened) {
tracing::debug!(
?position_id,
"new state is not `Opened`, skipping volume index"
);
return;
}

let trading_pair = new_state.phi.pair.clone();

// We want to track the **outflow** of staking tokens from the position.
// This means that we track the amount of staking tokens that have left the position.
// We do this by comparing the previous and new reserves of the staking token.
// We **DO NOT** want to track the volume of the other asset denominated in staking tokens.
let prev_state = prev_state.as_ref().expect("the previous state exists");
let prev_balance = prev_state
.reserves_for(*STAKING_TOKEN_ASSET_ID)
.expect("the staking token is in the pair");

let new_balance = new_state
.reserves_for(*STAKING_TOKEN_ASSET_ID)
.expect("the staking token is in the pair");

// We track the *outflow* of the staking token.
// "How much inventory has left the position?"
let staking_token_outflow = prev_balance.saturating_sub(&new_balance);

// We lookup the previous volume index entry.
let old_volume = self.get_volume_for_position(position_id).await;
let new_volume = old_volume.saturating_add(&staking_token_outflow);

// Grab the ambient epoch index.
let epoch_index = self
.get_current_epoch()
.await
.expect("epoch is always set")
.index;

// Find the trading pair asset that is not the staking token.
let other_asset = if trading_pair.asset_1() == *STAKING_TOKEN_ASSET_ID {
trading_pair.asset_2()
} else {
trading_pair.asset_1()
};

self.update_volume(
epoch_index,
&other_asset,
position_id,
old_volume,
new_volume,
)
}
}

impl<T: StateWrite + ?Sized> PositionVolumeTracker for T {}

trait Inner: StateWrite {
#[instrument(skip(self))]
fn update_volume(
erwanor marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
epoch_index: u64,
asset_id: &asset::Id,
position_id: &position::Id,
old_volume: Amount,
new_volume: Amount,
) {
// First, update the lookup index with the new volume.
let lookup_key = lqt::v1::lp::lookup::volume_by_position(epoch_index, position_id);
use penumbra_sdk_proto::StateWriteProto;
self.nonverifiable_put(lookup_key.to_vec(), new_volume);

// Then, update the sorted index:
let old_index_key =
lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, old_volume);
// Delete the old key:
self.nonverifiable_delete(old_index_key.to_vec());
// Store the new one:
let new_index_key =
lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, new_volume);
self.nonverifiable_put(new_index_key.to_vec(), new_volume);
}
}

impl<T: StateWrite + ?Sized> Inner for T {}
118 changes: 118 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,124 @@ pub fn aggregate_value() -> &'static str {
"dex/aggregate_value"
}

pub mod lqt {
pub mod v1 {
pub mod pair {
pub mod lookup {
use penumbra_sdk_asset::asset;

pub(crate) fn prefix(epoch_index: u64) -> String {
erwanor marked this conversation as resolved.
Show resolved Hide resolved
format!("dex/lqt/v1/pair/lookup/{epoch_index:020}/")
}

// A lookup index used to inspect aggregate outflows for a given pair.
/// It maps a trading pair (staking token, asset) to the cumulative volume of outbound liquidity.
///
/// # Key Encoding
/// The lookup key is encoded as `prefix || asset`
/// # Value Encoding
/// The value is encoded as `BE(Amount)`
pub(crate) fn volume_by_pair(epoch_index: u64, asset: asset::Id) -> [u8; 76] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 76];
key[0..44].copy_from_slice(prefix_bytes.as_bytes());
key[44..44 + 32].copy_from_slice(&asset.to_bytes());
key
}
}
}

pub mod lp {
pub mod lookup {
pub(crate) fn prefix(epoch_index: u64) -> String {
format!("dex/lqt/v1/lp/lookup/{epoch_index:020}/")
}

/// A lookup index used to update the `by_volume` index.
/// It maps a position id to the latest tally of outbound cumulative volume.
///
/// # Key Encoding
/// The lookup key is encoded as `prefix || position_id`
/// # Value Encoding
/// The value is encoded as `BE(Amount)`
pub(crate) fn volume_by_position(
epoch_index: u64,
position_id: &crate::lp::position::Id,
) -> [u8; 74] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 74];
key[0..42].copy_from_slice(prefix_bytes.as_bytes());
key[42..42 + 32].copy_from_slice(&position_id.0);
key
}
}

pub mod by_volume {
use anyhow::{ensure, Result};
use penumbra_sdk_asset::asset;
use penumbra_sdk_num::Amount;

use crate::lp::position;

pub fn prefix(epoch_index: u64) -> String {
format!("dex/lqt/v1/lp/by_volume/{epoch_index:020}/")
}

pub fn prefix_with_asset(epoch_index: u64, asset: &asset::Id) -> [u8; 77] {
let prefix = prefix(epoch_index);
let mut key = [0u8; 77];
key[0..45].copy_from_slice(prefix.as_bytes());
key[45..45 + 32].copy_from_slice(&asset.to_bytes());
key
}

/// Tracks the cumulative volume of outbound liquidity for a given pair.
/// The pair is always connected by the staking token, which is the implicit numeraire.
///
/// # Encoding
/// The full key is encoded as: `prefix || asset || BE(!volume) || position`
pub(crate) fn key(
epoch_index: u64,
asset: &asset::Id,
position: &position::Id,
volume: Amount,
) -> [u8; 125] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 125];
key[0..45].copy_from_slice(prefix_bytes.as_bytes());
key[45..45 + 32].copy_from_slice(&asset.to_bytes());
key[45 + 32..45 + 32 + 16].copy_from_slice(&(!volume).to_be_bytes());
key[45 + 32 + 16..45 + 32 + 16 + 32].copy_from_slice(&position.0);
key
}

/// Parse a raw key into its constituent parts.
///
/// # Errors
/// This function will return an error if the key is not 125 bytes. Or, if the
/// key contains an invalid asset or position identifier.
pub(crate) fn parse_key(key: &[u8]) -> Result<(asset::Id, Amount, position::Id)> {
ensure!(key.len() == 125, "key must be 125 bytes");

// Skip the first 45 bytes, which is the prefix.
let raw_asset: [u8; 32] = key[45..45 + 32].try_into()?;
let asset: asset::Id = raw_asset.try_into()?;

let raw_amount: [u8; 16] = key[45 + 32..45 + 32 + 16].try_into()?;
let amount_complement = Amount::from_be_bytes(raw_amount);
let amount = !amount_complement;

let raw_position_id: [u8; 32] =
key[45 + 32 + 16..45 + 32 + 16 + 32].try_into()?;
let position_id = position::Id(raw_position_id);

Ok((asset, amount, position_id))
}
}
}
}
}

pub(crate) mod engine {
use super::*;
use crate::lp::BareTradingFunction;
Expand Down
Loading