Skip to content

Commit

Permalink
dex: track total number of positions per pair (#4167)
Browse files Browse the repository at this point in the history
## Describe your changes

Working towards achieving #4077.

This PR adds a `PositionCounter` extension trait which implements a
`TradingPair` scoped position counter. I am not totally sold on a `u16`
for the counter type, yet, but it's useful for testing anyway.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

> This is consensus-breaking because it introduces a new implicit
validation rule: a hard-limit on the number of position opened for each
trading pair (via counter overflow). It could also be consensus breaking
if there was a bug in the DEX that allowed closed positions to be
updated, but my assumption is that it's not the case.
  • Loading branch information
erwanor authored Apr 8, 2024
1 parent 4fce953 commit 65b8498
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::{
impl ActionHandler for PositionOpen {
type CheckStatelessContext = ();
async fn check_stateless(&self, _context: ()) -> Result<()> {
// TODO(chris, erwan, henry): brainstorm safety on `TradingFunction`.
// Check:
// + reserves are at most 80 bits wide,
// + the trading function coefficients are at most 80 bits wide.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
let position = buy_1;
state_tx.index_position_by_price(&position, &position.id());
state_tx
.update_available_liquidity(&position, &None)
.update_available_liquidity(&None, &position)
.await
.expect("able to update liquidity");
state_tx.put(state_key::position_by_id(&id), position);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod arb;
pub(crate) mod circuit_breaker;
mod dex;
mod flow;
pub(crate) mod position_counter;
pub(crate) mod position_manager;
mod swap_manager;

Expand All @@ -21,6 +22,5 @@ pub(crate) use circuit_breaker::ValueCircuitBreaker;
pub use dex::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::{PositionManager, PositionRead};
pub use swap_manager::SwapManager;

#[cfg(test)]
pub(crate) mod tests;
118 changes: 118 additions & 0 deletions crates/core/component/dex/src/component/position_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use anyhow::bail;
use async_trait::async_trait;
use cnidarium::StateWrite;

use crate::state_key;
use crate::TradingPair;
use anyhow::Result;

#[async_trait]
pub(crate) trait PositionCounter: StateWrite {
/// Returns the number of position for a [`TradingPair`].
/// If there were no counter initialized for a given pair, this default to zero.
async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
self.get_position_count_from_key(path).await
}

async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 {
let Some(raw_count) = self
.nonverifiable_get_raw(&path)
.await
.expect("no deserialization failure")
else {
return 0;
};

// This is safe because we only increment the counter via a [`Self::increase_position_counter`].
let raw_count: [u8; 2] = raw_count
.try_into()
.expect("position counter is at most two bytes");
u16::from_be_bytes(raw_count)
}

/// Increment the number of position for a [`TradingPair`].
/// Returns the updated total, or an error if overflow occurred.
async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_add(1) else {
bail!("incrementing position counter would overflow")
};
self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec());
Ok(new_total)
}

/// Decrement the number of positions for a [`TradingPair`], unless it would underflow.
/// Returns the updated total, or an error if underflow occurred.
async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_sub(1) else {
bail!("decrementing position counter would underflow")
};
self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec());
Ok(new_total)
}
}
impl<T: StateWrite + ?Sized> PositionCounter for T {}

// For some reason, `rust-analyzer` is complaining about used imports.
// Silence the warnings until I find a fix.
#[allow(unused_imports)]
mod tests {
use cnidarium::{StateDelta, TempStorage};
use penumbra_asset::{asset::REGISTRY, Value};

use crate::component::position_counter::PositionCounter;
use crate::TradingPair;

#[tokio::test]
/// Test that we can detect overflows and that they are handled properly: increment is ignored / no crash.
async fn test_no_overflow() -> anyhow::Result<()> {
let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id();
let asset_b = REGISTRY.parse_denom("pizza").unwrap().id();
let trading_pair = TradingPair::new(asset_a, asset_b);

let storage = TempStorage::new().await?;
let mut delta = StateDelta::new(storage.latest_snapshot());

for i in 0..u16::MAX {
let total = delta.increment_position_counter(&trading_pair).await?;

anyhow::ensure!(
total == i + 1,
"the total amount should be total={}, found={total}",
i + 1
);
}

assert!(delta
.increment_position_counter(&trading_pair)
.await
.is_err());
assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX);

Ok(())
}

#[tokio::test]
/// Test that we can detect underflow and that they are handled properly: decrement is ignored / no crash.
async fn test_no_underflow() -> anyhow::Result<()> {
let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id();
let asset_b = REGISTRY.parse_denom("pizza").unwrap().id();
let trading_pair = TradingPair::new(asset_a, asset_b);

let storage = TempStorage::new().await?;
let mut delta = StateDelta::new(storage.latest_snapshot());

let maybe_total = delta.decrement_position_counter(&trading_pair).await;
assert!(maybe_total.is_err());

let counter = delta.get_position_count(&trading_pair).await;
assert_eq!(counter, 0u16);
Ok(())
}
}
46 changes: 43 additions & 3 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::event;
use crate::lp::position::State;
use crate::lp::Reserves;
use crate::{
component::position_counter::PositionCounter,
component::ValueCircuitBreaker,
lp::position::{self, Position},
state_key, DirectedTradingPair,
Expand Down Expand Up @@ -156,6 +157,17 @@ pub trait PositionManager: StateWrite + PositionRead {
prev_state.state
);

// Optimization: skip state update if the position is already closed.
// This can happen if the position was queued for closure and premptively
// closed by the DEX engine during execution (e.g. auto-closing).
if prev_state.state == position::State::Closed {
tracing::debug!(
?id,
"position is already closed so we can skip state updates"
);
return Ok(());
}

let new_state = {
let mut new_state = prev_state.clone();
new_state.state = position::State::Closed;
Expand Down Expand Up @@ -186,6 +198,17 @@ pub trait PositionManager: StateWrite + PositionRead {

/// Opens a new position, updating all necessary indexes and checking for
/// its nonexistence prior to being opened.
///
/// # Errors
/// This method returns an error if the position is malformed
/// e.g. it is set to a state other than `Opened`
/// or, it specifies a position identifier already used by another position.
///
/// An error can also occur if a DEX engine invariant is breached
/// e.g. overflowing the position counter (`u16::MAX`)
/// or, overflowing the value circuit breaker (`u128::MAX`)
///
/// In any of those cases, we do not want to allow a new position to be opened.
#[tracing::instrument(level = "debug", skip_all)]
async fn open_position(&mut self, position: position::Position) -> Result<()> {
// Double-check that the position is in the `Opened` state
Expand All @@ -202,6 +225,9 @@ pub trait PositionManager: StateWrite + PositionRead {
);
}

// Increase the position counter
self.increment_position_counter(&position.phi.pair).await?;

// Credit the DEX for the inflows from this position.
self.vcb_credit(position.reserves_1()).await?;
self.vcb_credit(position.reserves_2()).await?;
Expand Down Expand Up @@ -364,6 +390,8 @@ pub(crate) trait Inner: StateWrite {
prev_state: Option<Position>,
new_state: Position,
) -> Result<()> {
use position::State::*;

tracing::debug!(?prev_state, ?new_state, "updating position state");

let id = new_state.id();
Expand All @@ -375,13 +403,25 @@ pub(crate) trait Inner: StateWrite {
}

// Only index the position's liquidity if it is active.
if new_state.state == position::State::Opened {
if new_state.state == Opened {
self.index_position_by_price(&new_state, &id);
}

if new_state.state == Closed {
// Make sure that we don't double decrement the position
// counter if a position was queued for closure AND closed
// by the DEX engine.
let is_already_closed = prev_state
.as_ref()
.map_or(false, |old_position| old_position.state == Closed);
if !is_already_closed {
self.decrement_position_counter(&new_state.phi.pair).await?;
}
}

// Update the available liquidity for this position's trading pair.
// TODO: refactor and streamline this method while implementing eviction.
self.update_available_liquidity(&new_state, &prev_state)
self.update_available_liquidity(&prev_state, &new_state)
.await?;

self.put(state_key::position_by_id(&id), new_state);
Expand Down Expand Up @@ -580,8 +620,8 @@ pub(crate) trait Inner: StateWrite {

async fn update_available_liquidity(
&mut self,
position: &Position,
prev_position: &Option<Position>,
position: &Position,
) -> Result<()> {
// Since swaps may be performed in either direction, the available liquidity indices
// need to be calculated and stored for both the A -> B and B -> A directions.
Expand Down
19 changes: 19 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ pub(crate) mod internal {
use super::*;
use crate::lp::BareTradingFunction;

pub mod counter {
pub mod num_positions {
use crate::TradingPair;

pub fn prefix() -> &'static str {
"dex/internal/counter/num_positions/"
}

pub fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] {
let mut key = [0u8; 99];
let prefix_bytes = prefix().as_bytes();
let canonical_pair_bytes = trading_pair.to_bytes();

key[0..35].copy_from_slice(prefix_bytes);
key[35..99].copy_from_slice(&canonical_pair_bytes);
key
}
}
}
/// Find assets with liquidity positions from asset `from`, ordered by price.
pub mod routable_assets {
use penumbra_asset::asset;
Expand Down

0 comments on commit 65b8498

Please sign in to comment.