From 78fbf5141c6c88ce9d0157f604a11a683dc2610f Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 12 Apr 2024 19:59:31 -0400 Subject: [PATCH] dex: pr review improvements --- .../core/component/dex/src/component/mod.rs | 2 - .../position_manager/base_liquidity_index.rs | 101 ++++++++++-------- .../src/component/position_manager/counter.rs | 54 +++++----- crates/core/component/dex/src/state_key.rs | 14 +-- 4 files changed, 90 insertions(+), 81 deletions(-) diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index a1baa22118..c6a4101ff3 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -19,8 +19,6 @@ pub(crate) use arb::Arbitrage; pub use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; -// TODO(erwan): exposing a DEX interface to other components -// is useful but maybe we should restrict it to open/queue/close positions pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; #[cfg(test)] diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index a038270486..ac55ec97fd 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -3,7 +3,7 @@ use cnidarium::StateWrite; use penumbra_num::Amount; use position::State::*; -use crate::lp::position::{self, Position, State}; +use crate::lp::position::{self, Position}; use crate::state_key::engine; use crate::DirectedTradingPair; use penumbra_proto::{StateReadProto, StateWriteProto}; @@ -61,34 +61,62 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite { new_state: &Position, id: &position::Id, ) -> Result<()> { - match prev_state { - Some(prev_state) => match (prev_state.state, new_state.state) { - // We only want to update the index when we process active positions. - (Opened, Closed) => {} - (Opened, Opened) => {} - _ => return Ok(()), - }, - None => {} - } - + // We need to reconstruct the position's previous contribution and compute + // its new contribution to the index. We do this for each asset in the pair + // and short-circuit if all contributions are zero. let canonical_pair = new_state.phi.pair; let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let (prev_a, prev_b) = prev_state - .as_ref() - .map(|p| { - ( - p.reserves_for(pair_ab.start).expect("asset ids match"), - p.reserves_for(pair_ab.end).expect("asset ids match"), - ) - }) - .unwrap_or_else(|| (Amount::zero(), Amount::zero())); + // We reconstruct the position's *previous* contribution so that we can deduct them later: + let (prev_a, prev_b) = match prev_state { + // The position was just created, so its previous contributions are zero. + None => (Amount::zero(), Amount::zero()), + Some(prev) => match prev.state { + // The position was previously closed or withdrawn, so its previous contributions are zero. + Closed | Withdrawn { sequence: _ } => (Amount::zero(), Amount::zero()), + // The position's previous contributions are the reserves for the start and end assets. + _ => ( + prev.reserves_for(pair_ab.start) + .expect("asset ids match for start"), + prev.reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ), + }, + }; + + // For each asset, we compute the new position's contribution to the index: + let (new_a, new_b) = if matches!(new_state.state, Closed | Withdrawn { sequence: _ }) { + // The position is being closed or withdrawn, so its new contributions are zero. + // Note a withdrawn position MUST have zero reserves, so hardcoding this is extra. + (Amount::zero(), Amount::zero()) + } else { + ( + // The new amount of asset A: + new_state + .reserves_for(pair_ab.start) + .expect("asset ids match for start"), + // The new amount of asset B: + new_state + .reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ) + }; + + // If all contributions are zero, we can skip the update. + // This can happen if we're processing inactive transitions like `Closed -> Withdrawn`. + if prev_a == Amount::zero() + && new_a == Amount::zero() + && prev_b == Amount::zero() + && new_b == Amount::zero() + { + return Ok(()); + } // A -> B - self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_state) + self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_a) .await?; // B -> A - self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_state) + self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_b) .await?; Ok(()) @@ -103,7 +131,7 @@ trait Inner: StateWrite { id: &position::Id, pair: DirectedTradingPair, old_contrib: Amount, - new_position: &Position, + new_contrib: Amount, ) -> Result<()> { let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair); @@ -112,28 +140,13 @@ trait Inner: StateWrite { .await? .unwrap_or_default(); - // The previous contribution for this position is supplied to us by - // the caller. This default to zero if the position was just created. - // We use this to compute a view of the tally that excludes the position - // we are currently processing (and avoid double-counting). - let old_contrib = old_contrib; - - // The updated contribution is the total amount of base asset routable - // from an adjacent asset. - let new_contrib = new_position - .reserves_for(pair.start) - .expect("asset ids should match"); - - let new_tally = match new_position.state { - State::Opened => prev_tally - .saturating_sub(&old_contrib) - .saturating_add(&new_contrib), - State::Closed => prev_tally.saturating_sub(&old_contrib), - _ => unreachable!("inner impl is guarded"), - }; + // To compute the new aggregate liquidity, we deduct the old contribution + // and add the new contribution. We use saturating arithmetic defensively. + let new_tally = prev_tally + .saturating_sub(&old_contrib) + .saturating_add(&new_contrib); - // If the update operation is a no-op, we can skip the update - // and return early. + // If the update operation is a no-op, we can skip the update and return early. if prev_tally == new_tally { tracing::debug!( ?prev_tally, diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 6112adb3b6..5185fe1f2d 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -11,12 +11,12 @@ use anyhow::Result; pub(super) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. - async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { + async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 { let path = engine::counter::num_positions::by_trading_pair(trading_pair); self.get_position_count_from_key(path).await } - async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 { + async fn get_position_count_from_key(&self, path: [u8; 99]) -> u32 { let Some(raw_count) = self .nonverifiable_get_raw(&path) .await @@ -26,10 +26,10 @@ pub(super) trait PositionCounterRead: StateRead { }; // This is safe because we only increment the counter via [`Self::increase_position_counter`]. - let raw_count: [u8; 2] = raw_count + let raw_count: [u8; 4] = raw_count .try_into() .expect("position counter is at most two bytes"); - u16::from_be_bytes(raw_count) + u32::from_be_bytes(raw_count) } } @@ -45,18 +45,17 @@ pub(crate) trait PositionCounter: StateWrite { ) -> Result<()> { use position::State::*; let trading_pair = new_state.phi.pair; - - match prev_state { - Some(prev_state) => match (prev_state.state, new_state.state) { - (Opened, Closed) => { - let _ = self.decrement_position_counter(&trading_pair).await?; - } - _ => {} - }, - - None => { + match (prev_state.as_ref().map(|p| p.state), new_state.state) { + // Increment the counter whenever a new position is opened + (None, Opened) => { let _ = self.increment_position_counter(&trading_pair).await?; } + // Decrement the counter whenever an opened position is closed + (Some(Opened), Closed) => { + let _ = self.decrement_position_counter(&trading_pair).await?; + } + // Other state transitions don't affect the opened position counter + _ => {} } Ok(()) } @@ -66,7 +65,7 @@ impl PositionCounter for T {} trait Inner: StateWrite { /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. - async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -79,7 +78,7 @@ trait Inner: StateWrite { /// Decrement the number of positions for a [`TradingPair`], unless it would underflow. /// Returns the updated total, or an error if underflow occurred. - async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -97,12 +96,13 @@ impl Inner for T {} // Silence the warnings until I find a fix. #[allow(unused_imports)] mod tests { - use cnidarium::{StateDelta, TempStorage}; + use cnidarium::{StateDelta, StateWrite, TempStorage}; use penumbra_asset::{asset::REGISTRY, Value}; use crate::component::position_manager::counter::{ Inner, PositionCounter, PositionCounterRead, }; + use crate::state_key::engine; use crate::TradingPair; #[tokio::test] @@ -114,22 +114,20 @@ mod tests { let storage = TempStorage::new().await?; let mut delta = StateDelta::new(storage.latest_snapshot()); + let path = engine::counter::num_positions::by_trading_pair(&trading_pair); + // Manually set the counter to the maximum value + delta.nonverifiable_put_raw(path.to_vec(), u32::MAX.to_be_bytes().to_vec()); - for i in 0..u16::MAX { - let total = delta.increment_position_counter(&trading_pair).await?; - - anyhow::ensure!( - total == i + 1, - "the total amount should be total={}, found={total}", - i + 1 - ); - } + // Check that the counter is at the maximum value + let total = delta.get_position_count(&trading_pair).await; + assert_eq!(total, u32::MAX); + // Check that we can handle an overflow assert!(delta .increment_position_counter(&trading_pair) .await .is_err()); - assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX); + assert_eq!(delta.get_position_count(&trading_pair).await, u32::MAX); Ok(()) } @@ -148,7 +146,7 @@ mod tests { assert!(maybe_total.is_err()); let counter = delta.get_position_count(&trading_pair).await; - assert_eq!(counter, 0u16); + assert_eq!(counter, 0u32); Ok(()) } } diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 7cf1aac337..6b302fd46e 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -109,9 +109,9 @@ pub(crate) mod engine { use super::*; - /// A prefix key that takes a start asset `A` (aka. base asset) and surface adjacent - /// assets `B` (aka. quote asset), in ascending order of the base liquidity available. - /// + // An ordered encoding of every asset `B` routable from `A` based on the + // aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). + // /// # Encoding /// The prefix key is encoded as `domain || A`. pub(crate) fn starting_from(from: &asset::Id) -> [u8; 39] { @@ -121,9 +121,8 @@ pub(crate) mod engine { key } - /// An entry in the routable asset index that implements the mapping between - /// a base asset `A` and a quote asset `B`, based on the aggregate liquidity - /// available to route from `B` to `A` (aka. the base liquidity). + /// A record that an asset `A` is routable to an asset `B` and contains the + /// aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). /// /// # Encoding /// The full key is encoded as: `prefix || BE(aggregate_base_liquidity)` @@ -136,7 +135,8 @@ pub(crate) mod engine { } /// A lookup index used to reconstruct and update the primary index entries. - /// It maps a directed trading pair `A -> B` to the current base liquidity available. + /// It maps a directed trading pair `A -> B` to the aggregate liquidity available + /// to route from `B` to `A` (aka. the base asset liquidity). /// /// # Encoding /// The lookup key is encoded as `prefix_lookup || start_asset|| end_asset`.