Skip to content

Commit

Permalink
dex: pr review improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Apr 12, 2024
1 parent 9d430fd commit 78fbf51
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 81 deletions.
2 changes: 0 additions & 2 deletions crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ pub(crate) use arb::Arbitrage;
pub use circuit_breaker::ExecutionCircuitBreaker;
pub(crate) use circuit_breaker::ValueCircuitBreaker;
pub use dex::{Dex, StateReadExt, StateWriteExt};
// TODO(erwan): exposing a DEX interface to other components
// is useful but maybe we should restrict it to open/queue/close positions
pub use position_manager::{PositionManager, PositionRead};
pub use swap_manager::SwapManager;
#[cfg(test)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use cnidarium::StateWrite;
use penumbra_num::Amount;
use position::State::*;

use crate::lp::position::{self, Position, State};
use crate::lp::position::{self, Position};
use crate::state_key::engine;
use crate::DirectedTradingPair;
use penumbra_proto::{StateReadProto, StateWriteProto};
Expand Down Expand Up @@ -61,34 +61,62 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite {
new_state: &Position,
id: &position::Id,
) -> Result<()> {
match prev_state {
Some(prev_state) => match (prev_state.state, new_state.state) {
// We only want to update the index when we process active positions.
(Opened, Closed) => {}
(Opened, Opened) => {}
_ => return Ok(()),
},
None => {}
}

// We need to reconstruct the position's previous contribution and compute
// its new contribution to the index. We do this for each asset in the pair
// and short-circuit if all contributions are zero.
let canonical_pair = new_state.phi.pair;
let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2());

let (prev_a, prev_b) = prev_state
.as_ref()
.map(|p| {
(
p.reserves_for(pair_ab.start).expect("asset ids match"),
p.reserves_for(pair_ab.end).expect("asset ids match"),
)
})
.unwrap_or_else(|| (Amount::zero(), Amount::zero()));
// We reconstruct the position's *previous* contribution so that we can deduct them later:
let (prev_a, prev_b) = match prev_state {
// The position was just created, so its previous contributions are zero.
None => (Amount::zero(), Amount::zero()),
Some(prev) => match prev.state {
// The position was previously closed or withdrawn, so its previous contributions are zero.
Closed | Withdrawn { sequence: _ } => (Amount::zero(), Amount::zero()),
// The position's previous contributions are the reserves for the start and end assets.
_ => (
prev.reserves_for(pair_ab.start)
.expect("asset ids match for start"),
prev.reserves_for(pair_ab.end)
.expect("asset ids match for end"),
),
},
};

// For each asset, we compute the new position's contribution to the index:
let (new_a, new_b) = if matches!(new_state.state, Closed | Withdrawn { sequence: _ }) {
// The position is being closed or withdrawn, so its new contributions are zero.
// Note a withdrawn position MUST have zero reserves, so hardcoding this is extra.
(Amount::zero(), Amount::zero())
} else {
(
// The new amount of asset A:
new_state
.reserves_for(pair_ab.start)
.expect("asset ids match for start"),
// The new amount of asset B:
new_state
.reserves_for(pair_ab.end)
.expect("asset ids match for end"),
)
};

// If all contributions are zero, we can skip the update.
// This can happen if we're processing inactive transitions like `Closed -> Withdrawn`.
if prev_a == Amount::zero()
&& new_a == Amount::zero()
&& prev_b == Amount::zero()
&& new_b == Amount::zero()
{
return Ok(());
}

// A -> B
self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_state)
self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_a)
.await?;
// B -> A
self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_state)
self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_b)
.await?;

Ok(())
Expand All @@ -103,7 +131,7 @@ trait Inner: StateWrite {
id: &position::Id,
pair: DirectedTradingPair,
old_contrib: Amount,
new_position: &Position,
new_contrib: Amount,
) -> Result<()> {
let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair);

Expand All @@ -112,28 +140,13 @@ trait Inner: StateWrite {
.await?
.unwrap_or_default();

// The previous contribution for this position is supplied to us by
// the caller. This default to zero if the position was just created.
// We use this to compute a view of the tally that excludes the position
// we are currently processing (and avoid double-counting).
let old_contrib = old_contrib;

// The updated contribution is the total amount of base asset routable
// from an adjacent asset.
let new_contrib = new_position
.reserves_for(pair.start)
.expect("asset ids should match");

let new_tally = match new_position.state {
State::Opened => prev_tally
.saturating_sub(&old_contrib)
.saturating_add(&new_contrib),
State::Closed => prev_tally.saturating_sub(&old_contrib),
_ => unreachable!("inner impl is guarded"),
};
// To compute the new aggregate liquidity, we deduct the old contribution
// and add the new contribution. We use saturating arithmetic defensively.
let new_tally = prev_tally
.saturating_sub(&old_contrib)
.saturating_add(&new_contrib);

// If the update operation is a no-op, we can skip the update
// and return early.
// If the update operation is a no-op, we can skip the update and return early.
if prev_tally == new_tally {
tracing::debug!(
?prev_tally,
Expand Down
54 changes: 26 additions & 28 deletions crates/core/component/dex/src/component/position_manager/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ use anyhow::Result;
pub(super) trait PositionCounterRead: StateRead {
/// Returns the number of position for a [`TradingPair`].
/// If there were no counter initialized for a given pair, this default to zero.
async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 {
async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 {
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
self.get_position_count_from_key(path).await
}

async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 {
async fn get_position_count_from_key(&self, path: [u8; 99]) -> u32 {
let Some(raw_count) = self
.nonverifiable_get_raw(&path)
.await
Expand All @@ -26,10 +26,10 @@ pub(super) trait PositionCounterRead: StateRead {
};

// This is safe because we only increment the counter via [`Self::increase_position_counter`].
let raw_count: [u8; 2] = raw_count
let raw_count: [u8; 4] = raw_count
.try_into()
.expect("position counter is at most two bytes");
u16::from_be_bytes(raw_count)
u32::from_be_bytes(raw_count)
}
}

Expand All @@ -45,18 +45,17 @@ pub(crate) trait PositionCounter: StateWrite {
) -> Result<()> {
use position::State::*;
let trading_pair = new_state.phi.pair;

match prev_state {
Some(prev_state) => match (prev_state.state, new_state.state) {
(Opened, Closed) => {
let _ = self.decrement_position_counter(&trading_pair).await?;
}
_ => {}
},

None => {
match (prev_state.as_ref().map(|p| p.state), new_state.state) {
// Increment the counter whenever a new position is opened
(None, Opened) => {
let _ = self.increment_position_counter(&trading_pair).await?;
}
// Decrement the counter whenever an opened position is closed
(Some(Opened), Closed) => {
let _ = self.decrement_position_counter(&trading_pair).await?;
}
// Other state transitions don't affect the opened position counter
_ => {}
}
Ok(())
}
Expand All @@ -66,7 +65,7 @@ impl<T: StateWrite + ?Sized> PositionCounter for T {}
trait Inner: StateWrite {
/// Increment the number of position for a [`TradingPair`].
/// Returns the updated total, or an error if overflow occurred.
async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u32> {
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

Expand All @@ -79,7 +78,7 @@ trait Inner: StateWrite {

/// Decrement the number of positions for a [`TradingPair`], unless it would underflow.
/// Returns the updated total, or an error if underflow occurred.
async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u32> {
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

Expand All @@ -97,12 +96,13 @@ impl<T: StateWrite + ?Sized> Inner for T {}
// Silence the warnings until I find a fix.
#[allow(unused_imports)]
mod tests {
use cnidarium::{StateDelta, TempStorage};
use cnidarium::{StateDelta, StateWrite, TempStorage};
use penumbra_asset::{asset::REGISTRY, Value};

use crate::component::position_manager::counter::{
Inner, PositionCounter, PositionCounterRead,
};
use crate::state_key::engine;
use crate::TradingPair;

#[tokio::test]
Expand All @@ -114,22 +114,20 @@ mod tests {

let storage = TempStorage::new().await?;
let mut delta = StateDelta::new(storage.latest_snapshot());
let path = engine::counter::num_positions::by_trading_pair(&trading_pair);
// Manually set the counter to the maximum value
delta.nonverifiable_put_raw(path.to_vec(), u32::MAX.to_be_bytes().to_vec());

for i in 0..u16::MAX {
let total = delta.increment_position_counter(&trading_pair).await?;

anyhow::ensure!(
total == i + 1,
"the total amount should be total={}, found={total}",
i + 1
);
}
// Check that the counter is at the maximum value
let total = delta.get_position_count(&trading_pair).await;
assert_eq!(total, u32::MAX);

// Check that we can handle an overflow
assert!(delta
.increment_position_counter(&trading_pair)
.await
.is_err());
assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX);
assert_eq!(delta.get_position_count(&trading_pair).await, u32::MAX);

Ok(())
}
Expand All @@ -148,7 +146,7 @@ mod tests {
assert!(maybe_total.is_err());

let counter = delta.get_position_count(&trading_pair).await;
assert_eq!(counter, 0u16);
assert_eq!(counter, 0u32);
Ok(())
}
}
14 changes: 7 additions & 7 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ pub(crate) mod engine {

use super::*;

/// A prefix key that takes a start asset `A` (aka. base asset) and surface adjacent
/// assets `B` (aka. quote asset), in ascending order of the base liquidity available.
///
// An ordered encoding of every asset `B` routable from `A` based on the
// aggregate liquidity available to route from `B` to `A` (aka. the base liquidity).
//
/// # Encoding
/// The prefix key is encoded as `domain || A`.
pub(crate) fn starting_from(from: &asset::Id) -> [u8; 39] {
Expand All @@ -121,9 +121,8 @@ pub(crate) mod engine {
key
}

/// An entry in the routable asset index that implements the mapping between
/// a base asset `A` and a quote asset `B`, based on the aggregate liquidity
/// available to route from `B` to `A` (aka. the base liquidity).
/// A record that an asset `A` is routable to an asset `B` and contains the
/// aggregate liquidity available to route from `B` to `A` (aka. the base liquidity).
///
/// # Encoding
/// The full key is encoded as: `prefix || BE(aggregate_base_liquidity)`
Expand All @@ -136,7 +135,8 @@ pub(crate) mod engine {
}

/// A lookup index used to reconstruct and update the primary index entries.
/// It maps a directed trading pair `A -> B` to the current base liquidity available.
/// It maps a directed trading pair `A -> B` to the aggregate liquidity available
/// to route from `B` to `A` (aka. the base asset liquidity).
///
/// # Encoding
/// The lookup key is encoded as `prefix_lookup || start_asset|| end_asset`.
Expand Down

0 comments on commit 78fbf51

Please sign in to comment.