diff --git a/crates/core/component/compact-block/src/component/manager.rs b/crates/core/component/compact-block/src/component/manager.rs index 8824a0c16a..2d38177ebe 100644 --- a/crates/core/component/compact-block/src/component/manager.rs +++ b/crates/core/component/compact-block/src/component/manager.rs @@ -2,7 +2,7 @@ use anyhow::{Context, Result}; use async_trait::async_trait; use cnidarium::StateWrite; #[cfg(feature = "component")] -use penumbra_dex::component::{StateReadExt, SwapManager as _}; +use penumbra_dex::component::SwapDataRead; use penumbra_fee::component::StateReadExt as _; use penumbra_governance::StateReadExt as _; use penumbra_proto::DomainType; diff --git a/crates/core/component/dex/src/component/action_handler/swap.rs b/crates/core/component/dex/src/component/action_handler/swap.rs index 8eb5e7a388..63fd35ca7c 100644 --- a/crates/core/component/dex/src/component/action_handler/swap.rs +++ b/crates/core/component/dex/src/component/action_handler/swap.rs @@ -9,7 +9,7 @@ use penumbra_proto::StateWriteProto; use penumbra_sct::component::source::SourceContext; use crate::{ - component::{position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager}, + component::{InternalDexWrite, StateReadExt, SwapDataRead, SwapDataWrite, SwapManager}, event, swap::{proof::SwapProofPublic, Swap}, }; diff --git a/crates/core/component/dex/src/component/arb.rs b/crates/core/component/dex/src/component/arb.rs index c67928f5a5..0aa06a26aa 100644 --- a/crates/core/component/dex/src/component/arb.rs +++ b/crates/core/component/dex/src/component/arb.rs @@ -10,14 +10,11 @@ use penumbra_sct::component::clock::EpochRead; use tracing::instrument; use crate::{ - component::{ExecutionCircuitBreaker, ValueCircuitBreaker}, + component::{ExecutionCircuitBreaker, InternalDexWrite, ValueCircuitBreaker}, event, SwapExecution, }; -use super::{ - router::{RouteAndFill, RoutingParams}, - StateWriteExt, -}; +use super::router::{RouteAndFill, RoutingParams}; #[async_trait] pub trait Arbitrage: StateWrite + Sized { diff --git a/crates/core/component/dex/src/component/chandelier.rs b/crates/core/component/dex/src/component/chandelier.rs index 966665c349..f8a819c650 100644 --- a/crates/core/component/dex/src/component/chandelier.rs +++ b/crates/core/component/dex/src/component/chandelier.rs @@ -271,7 +271,7 @@ mod tests { use crate::{ component::{ router::create_buy, tests::TempStorageExt as _, Dex, PositionManager as _, - StateReadExt as _, StateWriteExt as _, + SwapDataRead, SwapDataWrite, }, DirectedUnitPair, }; diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 46db88523e..5b75517ab0 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -64,7 +64,7 @@ mod tests { use crate::component::position_manager::price_index::PositionByPriceIndex; use crate::component::router::HandleBatchSwaps as _; - use crate::component::{StateReadExt as _, StateWriteExt as _}; + use crate::component::{InternalDexWrite, StateReadExt as _, SwapDataRead, SwapDataWrite}; use crate::lp::plan::PositionWithdrawPlan; use crate::{ component::{router::create_buy, tests::TempStorageExt}, diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 51cf8df30f..3f3f544b2f 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -1,19 +1,23 @@ -use std::{collections::BTreeMap, sync::Arc}; +use std::collections::BTreeSet; +use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; use cnidarium::{StateRead, StateWrite}; use cnidarium_component::Component; +use penumbra_asset::asset; use penumbra_asset::{Value, STAKING_TOKEN_ASSET_ID}; use penumbra_proto::{StateReadProto, StateWriteProto}; use tendermint::v0_37::abci; use tracing::instrument; +use crate::state_key::block_scoped; use crate::{ - component::flow::SwapFlow, event, genesis, state_key, BatchSwapOutputData, DexParameters, - DirectedTradingPair, SwapExecution, TradingPair, + component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key, + BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair, }; +use super::eviction_manager::EvictionManager; use super::{ chandelier::Chandelier, router::{HandleBatchSwaps, RoutingParams}, @@ -29,9 +33,7 @@ impl Component for Dex { #[instrument(name = "dex", skip(state, app_state))] async fn init_chain(mut state: S, app_state: Option<&Self::AppState>) { match app_state { - None => { - // Checkpoint -- no-op - } + None => { /* no-op */ } Some(app_state) => { state.put_dex_params(app_state.dex_params.clone()); } @@ -54,7 +56,6 @@ impl Component for Dex { // This has already happened in the action handlers for each `PositionOpen` action. // 2. For each batch swap during the block, calculate clearing prices and set in the JMT. - let routing_params = state.routing_params().await.expect("dex params are set"); for (trading_pair, swap_flows) in state.swap_flows() { @@ -113,7 +114,15 @@ impl Component for Dex { Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"), } - // 4. Close all positions queued for closure at the end of the block. + // 4. Inspect trading pairs that saw new position opened during this block, and + // evict their excess LPs if any are found. + let _ = Arc::get_mut(state) + .expect("state should be uniquely referenced after batch swaps complete") + .evict_positions() + .await + .map_err(|e| tracing::error!(?e, "error evicting positions, skipping")); + + // 5. Close all positions queued for closure at the end of the block. // It's important to do this after execution, to allow block-scoped JIT liquidity. Arc::get_mut(state) .expect("state should be uniquely referenced after batch swaps complete") @@ -135,9 +144,21 @@ impl Component for Dex { } } -/// Extension trait providing read access to dex data. +/// Provides public read access to DEX data. #[async_trait] pub trait StateReadExt: StateRead { + /// Gets the DEX parameters from the state. + async fn get_dex_params(&self) -> Result { + self.get(state_key::config::dex_params()) + .await? + .ok_or_else(|| anyhow::anyhow!("Missing DexParameters")) + } + + /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation. + async fn routing_params(&self) -> Result { + self.get_dex_params().await.map(RoutingParams::from) + } + async fn output_data( &self, height: u64, @@ -160,47 +181,67 @@ pub trait StateReadExt: StateRead { self.get(&state_key::arb_execution(height)).await } - /// Get the swap flow for the given trading pair accumulated in this block so far. - fn swap_flow(&self, pair: &TradingPair) -> SwapFlow { - self.swap_flows().get(pair).cloned().unwrap_or_default() - } - - fn swap_flows(&self) -> BTreeMap { - self.object_get::>(state_key::swap_flows()) + /// Return a set of [`TradingPair`]s for which liquidity positions were opened + /// during this block. + fn get_active_trading_pairs_in_block(&self) -> BTreeSet { + self.object_get(block_scoped::active::trading_pairs()) .unwrap_or_default() } - - fn pending_batch_swap_outputs(&self) -> im::OrdMap { - self.object_get(state_key::pending_outputs()) - .unwrap_or_default() - } - - /// Gets the DEX parameters from the state. - async fn get_dex_params(&self) -> Result { - self.get(state_key::config::dex_params()) - .await? - .ok_or_else(|| anyhow::anyhow!("Missing DexParameters")) - } - - /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation. - async fn routing_params(&self) -> Result { - let dex_params = self.get_dex_params().await?; - Ok(RoutingParams { - max_hops: dex_params.max_hops as usize, - fixed_candidates: Arc::new(dex_params.fixed_candidates), - price_limit: None, - }) - } } impl StateReadExt for T {} /// Extension trait providing write access to dex data. #[async_trait] -pub trait StateWriteExt: StateWrite + StateReadExt { +pub trait StateWriteExt: StateWrite { fn put_dex_params(&mut self, params: DexParameters) { self.put(state_key::config::dex_params().to_string(), params); } +} + +impl StateWriteExt for T {} + +/// The maximum number of "hot" asset identifiers to track for this block. +const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; + +/// Provide write access to internal dex data. +pub(crate) trait InternalDexWrite: StateWrite { + /// Adds an asset ID to the list of recently accessed assets, + /// making it a candidate for the current block's arbitrage routing. + /// + /// This ensures that assets associated with recently active positions + /// will be eligible for arbitrage if mispriced positions are opened. + #[tracing::instrument(level = "debug", skip_all)] + fn add_recently_accessed_asset( + &mut self, + asset_id: asset::Id, + fixed_candidates: Arc>, + ) { + let mut assets = self.recently_accessed_assets(); + + // Limit the number of recently accessed assets to prevent blowing + // up routing time. + if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT { + return; + } + + // If the asset is already in the fixed candidate list, don't insert it. + if fixed_candidates.contains(&asset_id) { + return; + } + + assets.insert(asset_id); + self.object_put(state_key::recently_accessed_assets(), assets); + } + + /// Mark a [`TradingPair`] as active during this block. + fn mark_trading_pair_as_active(&mut self, pair: TradingPair) { + let mut active_pairs = self.get_active_trading_pairs_in_block(); + + if active_pairs.insert(pair) { + self.object_put(block_scoped::active::trading_pairs(), active_pairs) + } + } async fn set_output_data( &mut self, @@ -258,44 +299,6 @@ pub trait StateWriteExt: StateWrite + StateReadExt { fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) { self.put(state_key::arb_execution(height), execution); } - - async fn put_swap_flow( - &mut self, - trading_pair: &TradingPair, - swap_flow: SwapFlow, - ) -> Result<()> { - // Credit the DEX for the swap inflows. - // - // Note that we credit the DEX for _all_ inflows, since we don't know - // how much will eventually be filled. - self.dex_vcb_credit(Value { - amount: swap_flow.0, - asset_id: trading_pair.asset_1, - }) - .await?; - self.dex_vcb_credit(Value { - amount: swap_flow.1, - asset_id: trading_pair.asset_2, - }) - .await?; - - // TODO: replace with IM struct later - let mut swap_flows = self.swap_flows(); - swap_flows.insert(*trading_pair, swap_flow); - self.object_put(state_key::swap_flows(), swap_flows); - - Ok(()) - } - - fn put_swap_execution_at_height( - &mut self, - height: u64, - pair: DirectedTradingPair, - swap_execution: SwapExecution, - ) { - let path = state_key::swap_execution(height, pair); - self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution); - } } -impl StateWriteExt for T {} +impl InternalDexWrite for T {} diff --git a/crates/core/component/dex/src/component/eviction_manager.rs b/crates/core/component/dex/src/component/eviction_manager.rs new file mode 100644 index 0000000000..59cea41da4 --- /dev/null +++ b/crates/core/component/dex/src/component/eviction_manager.rs @@ -0,0 +1,112 @@ +use crate::component::StateReadExt; +use crate::{component::position_manager::counter::PositionCounterRead, lp::position}; +use futures::{StreamExt as _, TryStreamExt}; +use std::collections::BTreeSet; + +use crate::state_key::eviction_queue; +use anyhow::Result; +use cnidarium::StateWrite; +use tracing::instrument; + +use crate::{component::PositionManager, DirectedTradingPair, TradingPair}; + +pub(crate) trait EvictionManager: StateWrite { + /// Evict liquidity positions that are in excess of the trading pair limit. + /// + /// # Overview + /// This method enforce the approximate limit on the number of + /// positions that can be active for a given trading pair, as defined + /// by [`max_positions_per_pair`](DexParameters#max_positions_per_pair). + /// + /// # Mechanism + /// + /// The eviction mechanism functions by inspecting every trading pair which + /// had LP opened during the block. For each of them, it computes the "excess" + /// amount of positions `M`, defined as follow: + /// `M = N - N_max` where N is the number of positions, + /// and N_max is a chain parameter. + /// + /// Since a [`TradingPair`] defines two possible directed pairs, we need + /// to ensure that we don't evict LPs if they provide important liquidity + /// to at least one direction of the pair. + /// + /// To do this effectively, we maintain a liquidity index which orders LPs + /// by ascending liquidity for each direction of a trading pair. This allow + /// us to easily fetch the worst M positions for each index, and only evict + /// overlapping LPs. + /// + /// This approach sidestep the problem of adjudicating which position deserve + /// to be preserved depending on the trading direction, and limit the number + /// of reads to 2*M. On the other hand, it means that the actual maximum number + /// of positions per pair is 2*N_max. + /// + /// ## Diagram + /// + /// Q_1: A -> B + /// ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ + /// ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃ + /// ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + /// ▽▼▼▼▼▼▼▼▽▽▼▽ + /// ┌─▶ find overlap + /// │ ▲▲▲▲△△△▲▲▲▲▲ Q_2: B -> A + /// │ ╔════════════╦━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ + /// │ ║ Bottom M ┃ M+1 │ M+2 │ . . . │ N-1 │ N ┃ + /// │ ╚════════════╩━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + /// │ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━▶ + /// │ Ordered by inventory + /// │ + /// │ ▽▼▼▼▼▼▼▼▽▽▼▽ + /// └───────▶ ∩ Across both indices, the + /// ▲▲▲▲△△△▲▲▲▲▲ bottom M positions overlap + /// k times where 0 <= k <= M + /// for N < 2*N_max + /// + #[instrument(skip_all, err, level = "trace")] + async fn evict_positions(&mut self) -> Result<()> { + let hot_pairs: BTreeSet = self.get_active_trading_pairs_in_block(); + let max_positions_per_pair = self.get_dex_params().await?.max_positions_per_pair; + + for pair in hot_pairs.iter() { + let total_positions = self.get_position_count(pair).await; + let overhead_size = total_positions.saturating_sub(max_positions_per_pair); + if overhead_size == 0 { + continue; + } + + let pair_ab = DirectedTradingPair::new(pair.asset_1(), pair.asset_2()); + let pair_ba = pair_ab.flip(); + let key_ab = eviction_queue::inventory_index::by_trading_pair(&pair_ab); + let key_ba = eviction_queue::inventory_index::by_trading_pair(&pair_ba); + + let stream_ab = self.nonverifiable_prefix_raw(&key_ab).boxed(); + let stream_ba = self.nonverifiable_prefix_raw(&key_ba).boxed(); + + let overhead_ab = stream_ab + .take(overhead_size as usize) + .and_then(|(k, _)| async move { + let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?; + Ok(position::Id(raw_id)) + }) + .try_collect::>() + .await?; + + let overhead_ba = stream_ba + .take(overhead_size as usize) + .and_then(|(k, _)| async move { + let raw_id = eviction_queue::inventory_index::parse_id_from_key(k)?; + Ok(position::Id(raw_id)) + }) + .try_collect::>() + .await?; + + let overlap = overhead_ab.intersection(&overhead_ba); + + for id in overlap { + self.close_position_by_id(id).await?; + } + } + Ok(()) + } +} + +impl EvictionManager for T {} diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 888df993d6..286b7717d6 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,16 +11,26 @@ mod arb; mod chandelier; pub(crate) mod circuit_breaker; mod dex; +mod eviction_manager; mod flow; mod position_manager; mod swap_manager; -pub use self::metrics::register_metrics; +pub use dex::{Dex, StateReadExt, StateWriteExt}; +pub use position_manager::PositionManager; + +// Read data from the Dex component; +pub use position_manager::PositionRead; +pub use swap_manager::SwapDataRead; + pub(crate) use arb::Arbitrage; pub(crate) use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; -pub use dex::{Dex, StateReadExt, StateWriteExt}; -pub use position_manager::{PositionManager, PositionRead}; -pub use swap_manager::SwapManager; +pub(crate) use dex::InternalDexWrite; +pub(crate) use swap_manager::SwapDataWrite; +pub(crate) use swap_manager::SwapManager; + #[cfg(test)] pub(crate) mod tests; + +pub use self::metrics::register_metrics; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index c40da0a79e..86e76d0d7a 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -14,6 +14,7 @@ use tap::Tap; use tracing::instrument; use crate::component::{ + dex::InternalDexWrite, dex::StateReadExt as _, position_manager::{ base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, @@ -33,11 +34,10 @@ use crate::{event, state_key}; use super::chandelier::Chandelier; const DYNAMIC_ASSET_LIMIT: usize = 10; -const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; mod base_liquidity_index; -mod counter; -mod inventory_index; +pub(crate) mod counter; +pub(crate) mod inventory_index; pub(crate) mod price_index; #[async_trait] @@ -274,6 +274,9 @@ pub trait PositionManager: StateWrite + PositionRead { position.phi.pair.asset_2(), routing_params.fixed_candidates, ); + // Mark the trading pair as active so that we can inspect it + // at the end of the block and garbage collect excess LPs. + self.mark_trading_pair_as_active(position.phi.pair); // Finally, record the new position state. self.record_proto(event::position_open(&position)); @@ -282,34 +285,6 @@ pub trait PositionManager: StateWrite + PositionRead { Ok(()) } - /// Adds an asset ID to the list of recently accessed assets, - /// making it a candidate for the current block's arbitrage routing. - /// - /// This ensures that assets associated with recently active positions - /// will be eligible for arbitrage if mispriced positions are opened. - #[tracing::instrument(level = "debug", skip_all)] - fn add_recently_accessed_asset( - &mut self, - asset_id: asset::Id, - fixed_candidates: Arc>, - ) { - let mut assets = self.recently_accessed_assets(); - - // Limit the number of recently accessed assets to prevent blowing - // up routing time. - if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT { - return; - } - - // If the asset is already in the fixed candidate list, don't insert it. - if fixed_candidates.contains(&asset_id) { - return; - } - - assets.insert(asset_id); - self.object_put(state_key::recently_accessed_assets(), assets); - } - /// Record execution against an opened position. /// /// IMPORTANT: This method can mutate its input state. diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 5185fe1f2d..023494f0dd 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -8,7 +8,7 @@ use crate::TradingPair; use anyhow::Result; #[async_trait] -pub(super) trait PositionCounterRead: StateRead { +pub(crate) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 { diff --git a/crates/core/component/dex/src/component/router/params.rs b/crates/core/component/dex/src/component/router/params.rs index 95b0022dfa..6d18a9c2c7 100644 --- a/crates/core/component/dex/src/component/router/params.rs +++ b/crates/core/component/dex/src/component/router/params.rs @@ -3,6 +3,8 @@ use std::sync::Arc; use penumbra_asset::asset; use penumbra_num::fixpoint::U128x128; +use crate::DexParameters; + #[derive(Debug, Clone)] pub struct RoutingParams { pub price_limit: Option, @@ -37,3 +39,19 @@ impl RoutingParams { } } } + +impl From for RoutingParams { + fn from( + DexParameters { + fixed_candidates, + max_hops, + .. + }: DexParameters, + ) -> Self { + Self { + fixed_candidates: Arc::new(fixed_candidates), + max_hops: max_hops as usize, + price_limit: None, + } + } +} diff --git a/crates/core/component/dex/src/component/router/route_and_fill.rs b/crates/core/component/dex/src/component/router/route_and_fill.rs index 139dc68589..be45696c2d 100644 --- a/crates/core/component/dex/src/component/router/route_and_fill.rs +++ b/crates/core/component/dex/src/component/router/route_and_fill.rs @@ -13,7 +13,7 @@ use crate::{ chandelier::Chandelier, flow::SwapFlow, router::{FillRoute, PathSearch, RoutingParams}, - ExecutionCircuitBreaker, PositionManager, StateWriteExt, + ExecutionCircuitBreaker, InternalDexWrite, PositionManager, }, lp::position::MAX_RESERVE_AMOUNT, BatchSwapOutputData, SwapExecution, TradingPair, diff --git a/crates/core/component/dex/src/component/router/tests.rs b/crates/core/component/dex/src/component/router/tests.rs index 80c065845a..4f94abcbd1 100644 --- a/crates/core/component/dex/src/component/router/tests.rs +++ b/crates/core/component/dex/src/component/router/tests.rs @@ -8,6 +8,8 @@ use penumbra_num::{fixpoint::U128x128, Amount}; use rand_core::OsRng; use std::sync::Arc; +use crate::component::SwapDataRead; +use crate::component::SwapDataWrite; use crate::lp::SellOrder; use crate::DexParameters; use crate::{ diff --git a/crates/core/component/dex/src/component/swap_manager.rs b/crates/core/component/dex/src/component/swap_manager.rs index 901f432b84..f2f59f25bd 100644 --- a/crates/core/component/dex/src/component/swap_manager.rs +++ b/crates/core/component/dex/src/component/swap_manager.rs @@ -1,37 +1,105 @@ +use std::collections::BTreeMap; + use async_trait::async_trait; -use cnidarium::StateWrite; +use cnidarium::{StateRead, StateWrite}; +use penumbra_asset::Value; use penumbra_sct::{component::tree::SctManager, CommitmentSource}; use penumbra_tct as tct; use tracing::instrument; -use crate::{state_key, swap::SwapPayload}; +use crate::component::circuit_breaker::value::ValueCircuitBreaker; +use crate::BatchSwapOutputData; +use crate::SwapExecution; +use crate::{ + component::flow::SwapFlow, state_key, swap::SwapPayload, DirectedTradingPair, TradingPair, +}; +use anyhow::Result; +use penumbra_proto::StateWriteProto; /// Manages the addition of new notes to the chain state. #[async_trait] -pub trait SwapManager: StateWrite { +pub(crate) trait SwapManager: StateWrite { #[instrument(skip(self, swap), fields(commitment = ?swap.commitment))] async fn add_swap_payload(&mut self, swap: SwapPayload, source: CommitmentSource) { - tracing::debug!("adding swap payload"); - - // 0. Record an ABCI event for transaction indexing. - //self.record(event::state_payload(&payload)); + tracing::trace!("adding swap payload"); - // 1. Insert it into the SCT, recording its source + // Record the swap commitment and its metadata in the SCT let position = self.add_sct_commitment(swap.commitment, source.clone()) .await - // TODO: why? can't we exceed the number of state commitments in a block? + // TODO(erwan): Tracked in #830: we should handle this gracefully .expect("inserting into the state commitment tree should not fail because we should budget commitments per block (currently unimplemented)"); - // 3. Finally, record it to be inserted into the compact block: + // Record the payload in object-storage so that we can include it in this block's [`CompactBlock`]. let mut payloads = self.pending_swap_payloads(); payloads.push_back((position, swap, source)); self.object_put(state_key::pending_payloads(), payloads); } +} +impl SwapManager for T {} + +pub trait SwapDataRead: StateRead { fn pending_swap_payloads(&self) -> im::Vector<(tct::Position, SwapPayload, CommitmentSource)> { self.object_get(state_key::pending_payloads()) .unwrap_or_default() } + + /// Get the swap flow for the given trading pair accumulated in this block so far. + fn swap_flow(&self, pair: &TradingPair) -> SwapFlow { + self.swap_flows().get(pair).cloned().unwrap_or_default() + } + + fn swap_flows(&self) -> BTreeMap { + self.object_get::>(state_key::swap_flows()) + .unwrap_or_default() + } + + fn pending_batch_swap_outputs(&self) -> im::OrdMap { + self.object_get(state_key::pending_outputs()) + .unwrap_or_default() + } } -impl SwapManager for T {} +impl SwapDataRead for T {} + +pub(crate) trait SwapDataWrite: StateWrite { + async fn put_swap_flow( + &mut self, + trading_pair: &TradingPair, + swap_flow: SwapFlow, + ) -> Result<()> { + // Credit the DEX for the swap inflows. + // + // Note that we credit the DEX for _all_ inflows, since we don't know + // how much will eventually be filled. + self.dex_vcb_credit(Value { + amount: swap_flow.0, + asset_id: trading_pair.asset_1, + }) + .await?; + self.dex_vcb_credit(Value { + amount: swap_flow.1, + asset_id: trading_pair.asset_2, + }) + .await?; + + // TODO: replace with IM struct later + let mut swap_flows = self.swap_flows(); + swap_flows.insert(*trading_pair, swap_flow); + self.object_put(state_key::swap_flows(), swap_flows); + + Ok(()) + } + + fn put_swap_execution_at_height( + &mut self, + height: u64, + pair: DirectedTradingPair, + swap_execution: SwapExecution, + ) { + let path = state_key::swap_execution(height, pair); + self.nonverifiable_put(path.as_bytes().to_vec(), swap_execution); + } +} + +impl SwapDataWrite for T {} diff --git a/crates/core/component/dex/src/component/tests.rs b/crates/core/component/dex/src/component/tests.rs index ae7bb3a527..1875c05861 100644 --- a/crates/core/component/dex/src/component/tests.rs +++ b/crates/core/component/dex/src/component/tests.rs @@ -8,6 +8,7 @@ use penumbra_asset::{asset, Value}; use penumbra_num::Amount; use rand_core::OsRng; +use crate::component::{SwapDataRead, SwapDataWrite}; use crate::lp::action::PositionOpen; use crate::lp::{position, SellOrder}; use crate::DexParameters; diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 5be9bc1a5c..6f0747d8f8 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -60,6 +60,14 @@ pub mod candlesticks { } } +pub mod block_scoped { + pub mod active { + pub fn trading_pairs() -> &'static str { + "dex/block_scoped/active/trading_pairs" + } + } +} + pub fn output_data(height: u64, trading_pair: TradingPair) -> String { format!( "dex/output/{:020}/{}/{}", @@ -214,6 +222,7 @@ pub(crate) mod eviction_queue { pub(crate) mod inventory_index { use crate::lp::position; use crate::DirectedTradingPair; + use anyhow::ensure; use penumbra_num::Amount; pub(crate) fn by_trading_pair(pair: &DirectedTradingPair) -> [u8; 107] { @@ -237,5 +246,11 @@ pub(crate) mod eviction_queue { full_key } + + pub(crate) fn parse_id_from_key(key: Vec) -> anyhow::Result<[u8; 32]> { + ensure!(key.len() == 155, "key must be 155 bytes"); + let k = &key[123..155]; + Ok(k.try_into()?) + } } }