From 65b84989030197894f688d2fb78c2a553fdf1310 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Mon, 8 Apr 2024 11:17:02 -0400 Subject: [PATCH] dex: track total number of positions per pair (#4167) ## Describe your changes Working towards achieving #4077. This PR adds a `PositionCounter` extension trait which implements a `TradingPair` scoped position counter. I am not totally sold on a `u16` for the counter type, yet, but it's useful for testing anyway. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > This is consensus-breaking because it introduces a new implicit validation rule: a hard-limit on the number of position opened for each trading pair (via counter overflow). It could also be consensus breaking if there was a bug in the DEX that allowed closed positions to be updated, but my assumption is that it's not the case. --- .../component/action_handler/position/open.rs | 1 - .../src/component/circuit_breaker/value.rs | 2 +- .../core/component/dex/src/component/mod.rs | 2 +- .../dex/src/component/position_counter.rs | 118 ++++++++++++++++++ .../dex/src/component/position_manager.rs | 46 ++++++- crates/core/component/dex/src/state_key.rs | 19 +++ 6 files changed, 182 insertions(+), 6 deletions(-) create mode 100644 crates/core/component/dex/src/component/position_counter.rs diff --git a/crates/core/component/dex/src/component/action_handler/position/open.rs b/crates/core/component/dex/src/component/action_handler/position/open.rs index cdc9dd2995..4f2445292b 100644 --- a/crates/core/component/dex/src/component/action_handler/position/open.rs +++ b/crates/core/component/dex/src/component/action_handler/position/open.rs @@ -13,7 +13,6 @@ use crate::{ impl ActionHandler for PositionOpen { type CheckStatelessContext = (); async fn check_stateless(&self, _context: ()) -> Result<()> { - // TODO(chris, erwan, henry): brainstorm safety on `TradingFunction`. // Check: // + reserves are at most 80 bits wide, // + the trading function coefficients are at most 80 bits wide. diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 8f4b6396b2..1824eda1a6 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -227,7 +227,7 @@ mod tests { let position = buy_1; state_tx.index_position_by_price(&position, &position.id()); state_tx - .update_available_liquidity(&position, &None) + .update_available_liquidity(&None, &position) .await .expect("able to update liquidity"); state_tx.put(state_key::position_by_id(&id), position); diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 871bd67748..8a6459ee47 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,6 +11,7 @@ mod arb; pub(crate) mod circuit_breaker; mod dex; mod flow; +pub(crate) mod position_counter; pub(crate) mod position_manager; mod swap_manager; @@ -21,6 +22,5 @@ pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; - #[cfg(test)] pub(crate) mod tests; diff --git a/crates/core/component/dex/src/component/position_counter.rs b/crates/core/component/dex/src/component/position_counter.rs new file mode 100644 index 0000000000..ed4e755e46 --- /dev/null +++ b/crates/core/component/dex/src/component/position_counter.rs @@ -0,0 +1,118 @@ +use anyhow::bail; +use async_trait::async_trait; +use cnidarium::StateWrite; + +use crate::state_key; +use crate::TradingPair; +use anyhow::Result; + +#[async_trait] +pub(crate) trait PositionCounter: StateWrite { + /// Returns the number of position for a [`TradingPair`]. + /// If there were no counter initialized for a given pair, this default to zero. + async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + self.get_position_count_from_key(path).await + } + + async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 { + let Some(raw_count) = self + .nonverifiable_get_raw(&path) + .await + .expect("no deserialization failure") + else { + return 0; + }; + + // This is safe because we only increment the counter via a [`Self::increase_position_counter`]. + let raw_count: [u8; 2] = raw_count + .try_into() + .expect("position counter is at most two bytes"); + u16::from_be_bytes(raw_count) + } + + /// Increment the number of position for a [`TradingPair`]. + /// Returns the updated total, or an error if overflow occurred. + async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + let prev = self.get_position_count_from_key(path).await; + + let Some(new_total) = prev.checked_add(1) else { + bail!("incrementing position counter would overflow") + }; + self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec()); + Ok(new_total) + } + + /// Decrement the number of positions for a [`TradingPair`], unless it would underflow. + /// Returns the updated total, or an error if underflow occurred. + async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + let prev = self.get_position_count_from_key(path).await; + + let Some(new_total) = prev.checked_sub(1) else { + bail!("decrementing position counter would underflow") + }; + self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec()); + Ok(new_total) + } +} +impl PositionCounter for T {} + +// For some reason, `rust-analyzer` is complaining about used imports. +// Silence the warnings until I find a fix. +#[allow(unused_imports)] +mod tests { + use cnidarium::{StateDelta, TempStorage}; + use penumbra_asset::{asset::REGISTRY, Value}; + + use crate::component::position_counter::PositionCounter; + use crate::TradingPair; + + #[tokio::test] + /// Test that we can detect overflows and that they are handled properly: increment is ignored / no crash. + async fn test_no_overflow() -> anyhow::Result<()> { + let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id(); + let asset_b = REGISTRY.parse_denom("pizza").unwrap().id(); + let trading_pair = TradingPair::new(asset_a, asset_b); + + let storage = TempStorage::new().await?; + let mut delta = StateDelta::new(storage.latest_snapshot()); + + for i in 0..u16::MAX { + let total = delta.increment_position_counter(&trading_pair).await?; + + anyhow::ensure!( + total == i + 1, + "the total amount should be total={}, found={total}", + i + 1 + ); + } + + assert!(delta + .increment_position_counter(&trading_pair) + .await + .is_err()); + assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX); + + Ok(()) + } + + #[tokio::test] + /// Test that we can detect underflow and that they are handled properly: decrement is ignored / no crash. + async fn test_no_underflow() -> anyhow::Result<()> { + let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id(); + let asset_b = REGISTRY.parse_denom("pizza").unwrap().id(); + let trading_pair = TradingPair::new(asset_a, asset_b); + + let storage = TempStorage::new().await?; + let mut delta = StateDelta::new(storage.latest_snapshot()); + + let maybe_total = delta.decrement_position_counter(&trading_pair).await; + assert!(maybe_total.is_err()); + + let counter = delta.get_position_count(&trading_pair).await; + assert_eq!(counter, 0u16); + Ok(()) + } +} diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 1b27fb573f..c22947c376 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -16,6 +16,7 @@ use crate::event; use crate::lp::position::State; use crate::lp::Reserves; use crate::{ + component::position_counter::PositionCounter, component::ValueCircuitBreaker, lp::position::{self, Position}, state_key, DirectedTradingPair, @@ -156,6 +157,17 @@ pub trait PositionManager: StateWrite + PositionRead { prev_state.state ); + // Optimization: skip state update if the position is already closed. + // This can happen if the position was queued for closure and premptively + // closed by the DEX engine during execution (e.g. auto-closing). + if prev_state.state == position::State::Closed { + tracing::debug!( + ?id, + "position is already closed so we can skip state updates" + ); + return Ok(()); + } + let new_state = { let mut new_state = prev_state.clone(); new_state.state = position::State::Closed; @@ -186,6 +198,17 @@ pub trait PositionManager: StateWrite + PositionRead { /// Opens a new position, updating all necessary indexes and checking for /// its nonexistence prior to being opened. + /// + /// # Errors + /// This method returns an error if the position is malformed + /// e.g. it is set to a state other than `Opened` + /// or, it specifies a position identifier already used by another position. + /// + /// An error can also occur if a DEX engine invariant is breached + /// e.g. overflowing the position counter (`u16::MAX`) + /// or, overflowing the value circuit breaker (`u128::MAX`) + /// + /// In any of those cases, we do not want to allow a new position to be opened. #[tracing::instrument(level = "debug", skip_all)] async fn open_position(&mut self, position: position::Position) -> Result<()> { // Double-check that the position is in the `Opened` state @@ -202,6 +225,9 @@ pub trait PositionManager: StateWrite + PositionRead { ); } + // Increase the position counter + self.increment_position_counter(&position.phi.pair).await?; + // Credit the DEX for the inflows from this position. self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; @@ -364,6 +390,8 @@ pub(crate) trait Inner: StateWrite { prev_state: Option, new_state: Position, ) -> Result<()> { + use position::State::*; + tracing::debug!(?prev_state, ?new_state, "updating position state"); let id = new_state.id(); @@ -375,13 +403,25 @@ pub(crate) trait Inner: StateWrite { } // Only index the position's liquidity if it is active. - if new_state.state == position::State::Opened { + if new_state.state == Opened { self.index_position_by_price(&new_state, &id); } + if new_state.state == Closed { + // Make sure that we don't double decrement the position + // counter if a position was queued for closure AND closed + // by the DEX engine. + let is_already_closed = prev_state + .as_ref() + .map_or(false, |old_position| old_position.state == Closed); + if !is_already_closed { + self.decrement_position_counter(&new_state.phi.pair).await?; + } + } + // Update the available liquidity for this position's trading pair. // TODO: refactor and streamline this method while implementing eviction. - self.update_available_liquidity(&new_state, &prev_state) + self.update_available_liquidity(&prev_state, &new_state) .await?; self.put(state_key::position_by_id(&id), new_state); @@ -580,8 +620,8 @@ pub(crate) trait Inner: StateWrite { async fn update_available_liquidity( &mut self, - position: &Position, prev_position: &Option, + position: &Position, ) -> Result<()> { // Since swaps may be performed in either direction, the available liquidity indices // need to be calculated and stored for both the A -> B and B -> A directions. diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index dad76e571e..05e3e85ee3 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -84,6 +84,25 @@ pub(crate) mod internal { use super::*; use crate::lp::BareTradingFunction; + pub mod counter { + pub mod num_positions { + use crate::TradingPair; + + pub fn prefix() -> &'static str { + "dex/internal/counter/num_positions/" + } + + pub fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] { + let mut key = [0u8; 99]; + let prefix_bytes = prefix().as_bytes(); + let canonical_pair_bytes = trading_pair.to_bytes(); + + key[0..35].copy_from_slice(prefix_bytes); + key[35..99].copy_from_slice(&canonical_pair_bytes); + key + } + } + } /// Find assets with liquidity positions from asset `from`, ordered by price. pub mod routable_assets { use penumbra_asset::asset;