Skip to content

Commit

Permalink
lqt(dex): setup volume trackers (#5016)
Browse files Browse the repository at this point in the history
## Describe your changes
This PR:
- expose a component level api `LqtRead`
- define two new DEX state key modules: `lqt::v1::lp` and
`lqt::v1::pair`
- implements a `position_manager::volume_tracker`
- stubs out the inner position manager entrypoint, deferring
implementation to later

## Volume definition

We track the **outflow** of staking tokens from the position. This means
that an attacker controlled asset must commit to a staking token
inventory for at least a full block execution.

## State key modeling

The lookup index maps an epoch index and a position id to a cumulative
volume tally.

The full sorted index orders position ids by cumulative volume (keyed to
the epoch).

## Issue ticket number and link
Part of #5015 

## Checklist before requesting a review

- [x] I have added guiding text to explain how a reviewer should test
these changes.
N/A

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > LQT branch

---------

Signed-off-by: Erwan Or <[email protected]>
Co-authored-by: Lúcás Meier <[email protected]>
  • Loading branch information
erwanor and cronokirby authored Jan 30, 2025
1 parent 3b45dc2 commit 943a9a8
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 0 deletions.
67 changes: 67 additions & 0 deletions crates/core/component/dex/src/component/lqt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use crate::lp::position;
use crate::state_key::lqt;
use anyhow::Result;
use async_trait::async_trait;
use cnidarium::StateRead;
use futures::StreamExt;
use penumbra_sdk_asset::asset;
use penumbra_sdk_num::Amount;
use penumbra_sdk_proto::StateReadProto;
use penumbra_sdk_sct::component::clock::EpochRead;
use std::pin::Pin;

/// Provides public read access to LQT data.
#[async_trait]
pub trait LqtRead: StateRead {
/// Returns the cumulative volume of staking token for a trading pair.
/// This is the sum of the outflows of the staking token from all positions in the pair.
///
/// Default to zero if no volume is found.
async fn get_volume_for_pair(&self, asset: asset::Id) -> Amount {
let epoch = self.get_current_epoch().await.expect("epoch is always set");
let key = lqt::v1::pair::lookup::volume_by_pair(epoch.index, asset);
let value = self.nonverifiable_get(&key).await.unwrap_or_default();
value.unwrap_or_default()
}

/// Returns the cumulative volume of staking token for a given position id.
/// This is the sum of the outflows of the staking token from the position.
///
/// Default to zero if no volume is found.
async fn get_volume_for_position(&self, position_id: &position::Id) -> Amount {
let epoch = self.get_current_epoch().await.expect("epoch is always set");
let key = lqt::v1::lp::lookup::volume_by_position(epoch.index, position_id);
let value = self.nonverifiable_get(&key).await.unwrap_or_default();
value.unwrap_or_default()
}

/// Returns a stream of position ids sorted by descending volume.
/// The volume is the sum of the outflows of the staking token from the position.
fn positions_by_volume_stream(
&self,
epoch_index: u64,
asset_id: asset::Id,
) -> Result<
Pin<
Box<
dyn futures::Stream<Item = Result<(asset::Id, position::Id, Amount)>>
+ Send
+ 'static,
>,
>,
> {
let key = lqt::v1::lp::by_volume::prefix_with_asset(epoch_index, &asset_id);
Ok(self
.nonverifiable_prefix_raw(&key)
.map(|res| {
res.map(|(raw_entry, _)| {
let (asset, volume, position_id) =
lqt::v1::lp::by_volume::parse_key(&raw_entry).expect("internal invariant failed: failed to parse state key for lqt::v1::lp::by_volume");
(asset, position_id, volume)
})
})
.boxed())
}
}

impl<T: StateRead + ?Sized> LqtRead for T {}
2 changes: 2 additions & 0 deletions crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ pub(crate) mod circuit_breaker;
mod dex;
mod eviction_manager;
mod flow;
mod lqt;
mod position_manager;
mod swap_manager;

pub use dex::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::PositionManager;

// Read data from the Dex component;
pub use lqt::LqtRead;
pub use position_manager::PositionRead;
pub use swap_manager::SwapDataRead;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ mod base_liquidity_index;
pub(crate) mod counter;
pub(crate) mod inventory_index;
pub(crate) mod price_index;
pub(crate) mod volume_tracker;

#[async_trait]
pub trait PositionRead: StateRead {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#![allow(unused_imports, unused_variables, dead_code)]
use anyhow::Result;
use cnidarium::StateWrite;
use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID};
use penumbra_sdk_num::Amount;
use position::State::*;
use tracing::instrument;

use crate::component::lqt::LqtRead;
use crate::lp::position::{self, Position};
use crate::state_key::{engine, lqt};
use crate::{trading_pair, DirectedTradingPair, TradingPair};
use async_trait::async_trait;
use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
use penumbra_sdk_sct::component::clock::EpochRead;

#[async_trait]
pub(crate) trait PositionVolumeTracker: StateWrite {
async fn increase_volume_index(
&mut self,
position_id: &position::Id,
prev_state: &Option<Position>,
new_state: &Position,
) {
// We only index the volume for staking token pairs.
if !new_state.phi.matches_input(*STAKING_TOKEN_ASSET_ID) {
return;
}

// Or if the position has existed before.
if prev_state.is_none() {
tracing::debug!(?position_id, "newly opened position, skipping volume index");
return;
}

// Short-circuit if the position is transitioning to a non-open state.
// This might miss some volume updates, but is more conservative on state-flow.
if !matches!(new_state.state, position::State::Opened) {
tracing::debug!(
?position_id,
"new state is not `Opened`, skipping volume index"
);
return;
}

let trading_pair = new_state.phi.pair.clone();

// We want to track the **outflow** of staking tokens from the position.
// This means that we track the amount of staking tokens that have left the position.
// We do this by comparing the previous and new reserves of the staking token.
// We **DO NOT** want to track the volume of the other asset denominated in staking tokens.
let prev_state = prev_state.as_ref().expect("the previous state exists");
let prev_balance = prev_state
.reserves_for(*STAKING_TOKEN_ASSET_ID)
.expect("the staking token is in the pair");

let new_balance = new_state
.reserves_for(*STAKING_TOKEN_ASSET_ID)
.expect("the staking token is in the pair");

// We track the *outflow* of the staking token.
// "How much inventory has left the position?"
let staking_token_outflow = prev_balance.saturating_sub(&new_balance);

// We lookup the previous volume index entry.
let old_volume = self.get_volume_for_position(position_id).await;
let new_volume = old_volume.saturating_add(&staking_token_outflow);

// Grab the ambient epoch index.
let epoch_index = self
.get_current_epoch()
.await
.expect("epoch is always set")
.index;

// Find the trading pair asset that is not the staking token.
let other_asset = if trading_pair.asset_1() == *STAKING_TOKEN_ASSET_ID {
trading_pair.asset_2()
} else {
trading_pair.asset_1()
};

self.update_volume(
epoch_index,
&other_asset,
position_id,
old_volume,
new_volume,
)
}
}

impl<T: StateWrite + ?Sized> PositionVolumeTracker for T {}

trait Inner: StateWrite {
#[instrument(skip(self))]
fn update_volume(
&mut self,
epoch_index: u64,
asset_id: &asset::Id,
position_id: &position::Id,
old_volume: Amount,
new_volume: Amount,
) {
// First, update the lookup index with the new volume.
let lookup_key = lqt::v1::lp::lookup::volume_by_position(epoch_index, position_id);
use penumbra_sdk_proto::StateWriteProto;
self.nonverifiable_put(lookup_key.to_vec(), new_volume);

// Then, update the sorted index:
let old_index_key =
lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, old_volume);
// Delete the old key:
self.nonverifiable_delete(old_index_key.to_vec());
// Store the new one:
let new_index_key =
lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, new_volume);
self.nonverifiable_put(new_index_key.to_vec(), new_volume);
}
}

impl<T: StateWrite + ?Sized> Inner for T {}
118 changes: 118 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,124 @@ pub fn aggregate_value() -> &'static str {
"dex/aggregate_value"
}

pub mod lqt {
pub mod v1 {
pub mod pair {
pub mod lookup {
use penumbra_sdk_asset::asset;

pub(crate) fn prefix(epoch_index: u64) -> String {
format!("dex/lqt/v1/pair/lookup/{epoch_index:020}/")
}

// A lookup index used to inspect aggregate outflows for a given pair.
/// It maps a trading pair (staking token, asset) to the cumulative volume of outbound liquidity.
///
/// # Key Encoding
/// The lookup key is encoded as `prefix || asset`
/// # Value Encoding
/// The value is encoded as `BE(Amount)`
pub(crate) fn volume_by_pair(epoch_index: u64, asset: asset::Id) -> [u8; 76] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 76];
key[0..44].copy_from_slice(prefix_bytes.as_bytes());
key[44..44 + 32].copy_from_slice(&asset.to_bytes());
key
}
}
}

pub mod lp {
pub mod lookup {
pub(crate) fn prefix(epoch_index: u64) -> String {
format!("dex/lqt/v1/lp/lookup/{epoch_index:020}/")
}

/// A lookup index used to update the `by_volume` index.
/// It maps a position id to the latest tally of outbound cumulative volume.
///
/// # Key Encoding
/// The lookup key is encoded as `prefix || position_id`
/// # Value Encoding
/// The value is encoded as `BE(Amount)`
pub(crate) fn volume_by_position(
epoch_index: u64,
position_id: &crate::lp::position::Id,
) -> [u8; 74] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 74];
key[0..42].copy_from_slice(prefix_bytes.as_bytes());
key[42..42 + 32].copy_from_slice(&position_id.0);
key
}
}

pub mod by_volume {
use anyhow::{ensure, Result};
use penumbra_sdk_asset::asset;
use penumbra_sdk_num::Amount;

use crate::lp::position;

pub fn prefix(epoch_index: u64) -> String {
format!("dex/lqt/v1/lp/by_volume/{epoch_index:020}/")
}

pub fn prefix_with_asset(epoch_index: u64, asset: &asset::Id) -> [u8; 77] {
let prefix = prefix(epoch_index);
let mut key = [0u8; 77];
key[0..45].copy_from_slice(prefix.as_bytes());
key[45..45 + 32].copy_from_slice(&asset.to_bytes());
key
}

/// Tracks the cumulative volume of outbound liquidity for a given pair.
/// The pair is always connected by the staking token, which is the implicit numeraire.
///
/// # Encoding
/// The full key is encoded as: `prefix || asset || BE(!volume) || position`
pub(crate) fn key(
epoch_index: u64,
asset: &asset::Id,
position: &position::Id,
volume: Amount,
) -> [u8; 125] {
let prefix_bytes = prefix(epoch_index);
let mut key = [0u8; 125];
key[0..45].copy_from_slice(prefix_bytes.as_bytes());
key[45..45 + 32].copy_from_slice(&asset.to_bytes());
key[45 + 32..45 + 32 + 16].copy_from_slice(&(!volume).to_be_bytes());
key[45 + 32 + 16..45 + 32 + 16 + 32].copy_from_slice(&position.0);
key
}

/// Parse a raw key into its constituent parts.
///
/// # Errors
/// This function will return an error if the key is not 125 bytes. Or, if the
/// key contains an invalid asset or position identifier.
pub(crate) fn parse_key(key: &[u8]) -> Result<(asset::Id, Amount, position::Id)> {
ensure!(key.len() == 125, "key must be 125 bytes");

// Skip the first 45 bytes, which is the prefix.
let raw_asset: [u8; 32] = key[45..45 + 32].try_into()?;
let asset: asset::Id = raw_asset.try_into()?;

let raw_amount: [u8; 16] = key[45 + 32..45 + 32 + 16].try_into()?;
let amount_complement = Amount::from_be_bytes(raw_amount);
let amount = !amount_complement;

let raw_position_id: [u8; 32] =
key[45 + 32 + 16..45 + 32 + 16 + 32].try_into()?;
let position_id = position::Id(raw_position_id);

Ok((asset, amount, position_id))
}
}
}
}
}

pub(crate) mod engine {
use super::*;
use crate::lp::BareTradingFunction;
Expand Down

0 comments on commit 943a9a8

Please sign in to comment.