From 943a9a8976d6b80ebc1ae04fe737b124dadae0b6 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 30 Jan 2025 00:23:29 -0500 Subject: [PATCH] lqt(dex): setup volume trackers (#5016) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Describe your changes This PR: - expose a component level api `LqtRead` - define two new DEX state key modules: `lqt::v1::lp` and `lqt::v1::pair` - implements a `position_manager::volume_tracker` - stubs out the inner position manager entrypoint, deferring implementation to later ## Volume definition We track the **outflow** of staking tokens from the position. This means that an attacker controlled asset must commit to a staking token inventory for at least a full block execution. ## State key modeling The lookup index maps an epoch index and a position id to a cumulative volume tally. The full sorted index orders position ids by cumulative volume (keyed to the epoch). ## Issue ticket number and link Part of #5015 ## Checklist before requesting a review - [x] I have added guiding text to explain how a reviewer should test these changes. N/A - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > LQT branch --------- Signed-off-by: Erwan Or Co-authored-by: Lúcás Meier --- .../core/component/dex/src/component/lqt.rs | 67 ++++++++++ .../core/component/dex/src/component/mod.rs | 2 + .../dex/src/component/position_manager.rs | 1 + .../position_manager/volume_tracker.rs | 122 ++++++++++++++++++ crates/core/component/dex/src/state_key.rs | 118 +++++++++++++++++ 5 files changed, 310 insertions(+) create mode 100644 crates/core/component/dex/src/component/lqt.rs create mode 100644 crates/core/component/dex/src/component/position_manager/volume_tracker.rs diff --git a/crates/core/component/dex/src/component/lqt.rs b/crates/core/component/dex/src/component/lqt.rs new file mode 100644 index 0000000000..1b26ef93d2 --- /dev/null +++ b/crates/core/component/dex/src/component/lqt.rs @@ -0,0 +1,67 @@ +use crate::lp::position; +use crate::state_key::lqt; +use anyhow::Result; +use async_trait::async_trait; +use cnidarium::StateRead; +use futures::StreamExt; +use penumbra_sdk_asset::asset; +use penumbra_sdk_num::Amount; +use penumbra_sdk_proto::StateReadProto; +use penumbra_sdk_sct::component::clock::EpochRead; +use std::pin::Pin; + +/// Provides public read access to LQT data. +#[async_trait] +pub trait LqtRead: StateRead { + /// Returns the cumulative volume of staking token for a trading pair. + /// This is the sum of the outflows of the staking token from all positions in the pair. + /// + /// Default to zero if no volume is found. + async fn get_volume_for_pair(&self, asset: asset::Id) -> Amount { + let epoch = self.get_current_epoch().await.expect("epoch is always set"); + let key = lqt::v1::pair::lookup::volume_by_pair(epoch.index, asset); + let value = self.nonverifiable_get(&key).await.unwrap_or_default(); + value.unwrap_or_default() + } + + /// Returns the cumulative volume of staking token for a given position id. + /// This is the sum of the outflows of the staking token from the position. + /// + /// Default to zero if no volume is found. + async fn get_volume_for_position(&self, position_id: &position::Id) -> Amount { + let epoch = self.get_current_epoch().await.expect("epoch is always set"); + let key = lqt::v1::lp::lookup::volume_by_position(epoch.index, position_id); + let value = self.nonverifiable_get(&key).await.unwrap_or_default(); + value.unwrap_or_default() + } + + /// Returns a stream of position ids sorted by descending volume. + /// The volume is the sum of the outflows of the staking token from the position. + fn positions_by_volume_stream( + &self, + epoch_index: u64, + asset_id: asset::Id, + ) -> Result< + Pin< + Box< + dyn futures::Stream> + + Send + + 'static, + >, + >, + > { + let key = lqt::v1::lp::by_volume::prefix_with_asset(epoch_index, &asset_id); + Ok(self + .nonverifiable_prefix_raw(&key) + .map(|res| { + res.map(|(raw_entry, _)| { + let (asset, volume, position_id) = + lqt::v1::lp::by_volume::parse_key(&raw_entry).expect("internal invariant failed: failed to parse state key for lqt::v1::lp::by_volume"); + (asset, position_id, volume) + }) + }) + .boxed()) + } +} + +impl LqtRead for T {} diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 454f1ca53d..7512a8dc23 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod circuit_breaker; mod dex; mod eviction_manager; mod flow; +mod lqt; mod position_manager; mod swap_manager; @@ -20,6 +21,7 @@ pub use dex::{Dex, StateReadExt, StateWriteExt}; pub use position_manager::PositionManager; // Read data from the Dex component; +pub use lqt::LqtRead; pub use position_manager::PositionRead; pub use swap_manager::SwapDataRead; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index bb042013d4..daa02bd31b 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -39,6 +39,7 @@ mod base_liquidity_index; pub(crate) mod counter; pub(crate) mod inventory_index; pub(crate) mod price_index; +pub(crate) mod volume_tracker; #[async_trait] pub trait PositionRead: StateRead { diff --git a/crates/core/component/dex/src/component/position_manager/volume_tracker.rs b/crates/core/component/dex/src/component/position_manager/volume_tracker.rs new file mode 100644 index 0000000000..2675a4e2d4 --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/volume_tracker.rs @@ -0,0 +1,122 @@ +#![allow(unused_imports, unused_variables, dead_code)] +use anyhow::Result; +use cnidarium::StateWrite; +use penumbra_sdk_asset::{asset, STAKING_TOKEN_ASSET_ID}; +use penumbra_sdk_num::Amount; +use position::State::*; +use tracing::instrument; + +use crate::component::lqt::LqtRead; +use crate::lp::position::{self, Position}; +use crate::state_key::{engine, lqt}; +use crate::{trading_pair, DirectedTradingPair, TradingPair}; +use async_trait::async_trait; +use penumbra_sdk_proto::{StateReadProto, StateWriteProto}; +use penumbra_sdk_sct::component::clock::EpochRead; + +#[async_trait] +pub(crate) trait PositionVolumeTracker: StateWrite { + async fn increase_volume_index( + &mut self, + position_id: &position::Id, + prev_state: &Option, + new_state: &Position, + ) { + // We only index the volume for staking token pairs. + if !new_state.phi.matches_input(*STAKING_TOKEN_ASSET_ID) { + return; + } + + // Or if the position has existed before. + if prev_state.is_none() { + tracing::debug!(?position_id, "newly opened position, skipping volume index"); + return; + } + + // Short-circuit if the position is transitioning to a non-open state. + // This might miss some volume updates, but is more conservative on state-flow. + if !matches!(new_state.state, position::State::Opened) { + tracing::debug!( + ?position_id, + "new state is not `Opened`, skipping volume index" + ); + return; + } + + let trading_pair = new_state.phi.pair.clone(); + + // We want to track the **outflow** of staking tokens from the position. + // This means that we track the amount of staking tokens that have left the position. + // We do this by comparing the previous and new reserves of the staking token. + // We **DO NOT** want to track the volume of the other asset denominated in staking tokens. + let prev_state = prev_state.as_ref().expect("the previous state exists"); + let prev_balance = prev_state + .reserves_for(*STAKING_TOKEN_ASSET_ID) + .expect("the staking token is in the pair"); + + let new_balance = new_state + .reserves_for(*STAKING_TOKEN_ASSET_ID) + .expect("the staking token is in the pair"); + + // We track the *outflow* of the staking token. + // "How much inventory has left the position?" + let staking_token_outflow = prev_balance.saturating_sub(&new_balance); + + // We lookup the previous volume index entry. + let old_volume = self.get_volume_for_position(position_id).await; + let new_volume = old_volume.saturating_add(&staking_token_outflow); + + // Grab the ambient epoch index. + let epoch_index = self + .get_current_epoch() + .await + .expect("epoch is always set") + .index; + + // Find the trading pair asset that is not the staking token. + let other_asset = if trading_pair.asset_1() == *STAKING_TOKEN_ASSET_ID { + trading_pair.asset_2() + } else { + trading_pair.asset_1() + }; + + self.update_volume( + epoch_index, + &other_asset, + position_id, + old_volume, + new_volume, + ) + } +} + +impl PositionVolumeTracker for T {} + +trait Inner: StateWrite { + #[instrument(skip(self))] + fn update_volume( + &mut self, + epoch_index: u64, + asset_id: &asset::Id, + position_id: &position::Id, + old_volume: Amount, + new_volume: Amount, + ) { + // First, update the lookup index with the new volume. + let lookup_key = lqt::v1::lp::lookup::volume_by_position(epoch_index, position_id); + use penumbra_sdk_proto::StateWriteProto; + self.nonverifiable_put(lookup_key.to_vec(), new_volume); + + // Then, update the sorted index: + let old_index_key = + lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, old_volume); + // Delete the old key: + self.nonverifiable_delete(old_index_key.to_vec()); + // Store the new one: + let new_index_key = + lqt::v1::lp::by_volume::key(epoch_index, asset_id, position_id, new_volume); + self.nonverifiable_put(new_index_key.to_vec(), new_volume); + } +} + +impl Inner for T {} diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 6ce4e5ccb6..3eae3ef44d 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -119,6 +119,124 @@ pub fn aggregate_value() -> &'static str { "dex/aggregate_value" } +pub mod lqt { + pub mod v1 { + pub mod pair { + pub mod lookup { + use penumbra_sdk_asset::asset; + + pub(crate) fn prefix(epoch_index: u64) -> String { + format!("dex/lqt/v1/pair/lookup/{epoch_index:020}/") + } + + // A lookup index used to inspect aggregate outflows for a given pair. + /// It maps a trading pair (staking token, asset) to the cumulative volume of outbound liquidity. + /// + /// # Key Encoding + /// The lookup key is encoded as `prefix || asset` + /// # Value Encoding + /// The value is encoded as `BE(Amount)` + pub(crate) fn volume_by_pair(epoch_index: u64, asset: asset::Id) -> [u8; 76] { + let prefix_bytes = prefix(epoch_index); + let mut key = [0u8; 76]; + key[0..44].copy_from_slice(prefix_bytes.as_bytes()); + key[44..44 + 32].copy_from_slice(&asset.to_bytes()); + key + } + } + } + + pub mod lp { + pub mod lookup { + pub(crate) fn prefix(epoch_index: u64) -> String { + format!("dex/lqt/v1/lp/lookup/{epoch_index:020}/") + } + + /// A lookup index used to update the `by_volume` index. + /// It maps a position id to the latest tally of outbound cumulative volume. + /// + /// # Key Encoding + /// The lookup key is encoded as `prefix || position_id` + /// # Value Encoding + /// The value is encoded as `BE(Amount)` + pub(crate) fn volume_by_position( + epoch_index: u64, + position_id: &crate::lp::position::Id, + ) -> [u8; 74] { + let prefix_bytes = prefix(epoch_index); + let mut key = [0u8; 74]; + key[0..42].copy_from_slice(prefix_bytes.as_bytes()); + key[42..42 + 32].copy_from_slice(&position_id.0); + key + } + } + + pub mod by_volume { + use anyhow::{ensure, Result}; + use penumbra_sdk_asset::asset; + use penumbra_sdk_num::Amount; + + use crate::lp::position; + + pub fn prefix(epoch_index: u64) -> String { + format!("dex/lqt/v1/lp/by_volume/{epoch_index:020}/") + } + + pub fn prefix_with_asset(epoch_index: u64, asset: &asset::Id) -> [u8; 77] { + let prefix = prefix(epoch_index); + let mut key = [0u8; 77]; + key[0..45].copy_from_slice(prefix.as_bytes()); + key[45..45 + 32].copy_from_slice(&asset.to_bytes()); + key + } + + /// Tracks the cumulative volume of outbound liquidity for a given pair. + /// The pair is always connected by the staking token, which is the implicit numeraire. + /// + /// # Encoding + /// The full key is encoded as: `prefix || asset || BE(!volume) || position` + pub(crate) fn key( + epoch_index: u64, + asset: &asset::Id, + position: &position::Id, + volume: Amount, + ) -> [u8; 125] { + let prefix_bytes = prefix(epoch_index); + let mut key = [0u8; 125]; + key[0..45].copy_from_slice(prefix_bytes.as_bytes()); + key[45..45 + 32].copy_from_slice(&asset.to_bytes()); + key[45 + 32..45 + 32 + 16].copy_from_slice(&(!volume).to_be_bytes()); + key[45 + 32 + 16..45 + 32 + 16 + 32].copy_from_slice(&position.0); + key + } + + /// Parse a raw key into its constituent parts. + /// + /// # Errors + /// This function will return an error if the key is not 125 bytes. Or, if the + /// key contains an invalid asset or position identifier. + pub(crate) fn parse_key(key: &[u8]) -> Result<(asset::Id, Amount, position::Id)> { + ensure!(key.len() == 125, "key must be 125 bytes"); + + // Skip the first 45 bytes, which is the prefix. + let raw_asset: [u8; 32] = key[45..45 + 32].try_into()?; + let asset: asset::Id = raw_asset.try_into()?; + + let raw_amount: [u8; 16] = key[45 + 32..45 + 32 + 16].try_into()?; + let amount_complement = Amount::from_be_bytes(raw_amount); + let amount = !amount_complement; + + let raw_position_id: [u8; 32] = + key[45 + 32 + 16..45 + 32 + 16 + 32].try_into()?; + let position_id = position::Id(raw_position_id); + + Ok((asset, amount, position_id)) + } + } + } + } +} + pub(crate) mod engine { use super::*; use crate::lp::BareTradingFunction;