Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin implementation of dex routing #1954

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions component/src/dex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ pub mod metrics;
pub mod state_key;

mod position_manager;
mod router;

pub use self::metrics::register_metrics;
pub use component::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::PositionManager;
pub use router::TradeRouter;

#[cfg(test)]
mod tests;
10 changes: 10 additions & 0 deletions component/src/dex/position_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Result;
use async_trait::async_trait;
use futures::TryStreamExt;
use penumbra_crypto::dex::{
lp::{
position::{self, Position},
Expand All @@ -18,6 +19,15 @@ pub trait PositionRead: StateRead {
self.get(&state_key::position_by_id(id)).await
}

/// Return all trading positions.
async fn positions(&self) -> Result<Vec<position::Metadata>> {
self.prefix(state_key::positions_prefix())
// The prefix stream returns keys and values, but we only want the values.
.map_ok(|(_key, metadata)| metadata)
.try_collect()
.await
}

async fn check_nonce_unused(&self, position: &Position) -> Result<()> {
if let Some(()) = self
.get_proto::<()>(&state_key::position_nonce(&position.nonce))
Expand Down
233 changes: 233 additions & 0 deletions component/src/dex/router.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
use anyhow::{Error, Result};
use std::collections::{BTreeMap, HashSet};
use tokio::task::JoinSet;

use penumbra_crypto::{
asset,
dex::{
execution::Path,
lp::{position, TradingFunction},
DirectedTradingPair, TradingPair,
},
Amount,
};

use super::position_manager::PositionRead;

/// The maximum number of hops allowed in a trade.
/// Prevents exploding the number of paths that need to be considered.
const MAX_HOPS: usize = 5;

/// Represent the distance between two assets in a weighted graph.
type TradeDistance = f64;

/// Finds the best route for a trade, based on a Bellman-Ford algorithm
/// across dex trading pairs with available liquidity.
pub struct TradeRouter<T: PositionRead> {
// TODO: maybe these should be in non-consensus storage
/// Maintains a map of best prices between assets.
pub optimal_prices: BTreeMap<asset::Id, TradeDistance>,
/// Maintains a map of the best predecessor (best priced position) for each asset.
pub predecessors: BTreeMap<asset::Id, Option<asset::Id>>,
/// The `TradeRouter` needs to be able to read trading positions from state.
state: T,
/// Tracks known liquidity positions when the `TradeRouter` is constructed.
positions: Vec<position::Metadata>,
}

impl<T: PositionRead> TradeRouter<T> {
pub async fn new(state: T) -> Result<Self> {
let positions = state.positions().await?;

Ok(Self {
optimal_prices: BTreeMap::new(),
predecessors: BTreeMap::new(),
state,
positions,
})
}

/// Finds the best route for a trade, based on a Bellman-Ford algorithm
/// across dex trading pairs with available liquidity.
///
/// This takes place against a state fork directly, rather than constructing separate in-memory structures.
pub async fn find_route(
&mut self,
trading_pair: &DirectedTradingPair,
amount: &Amount,
) -> Result<Path> {
// The distance from the source asset to itself is always 0. TODO: isn't it actually 1??? but that doesn't work for bellman-ford.
// are all the optimal prices actually 1.0 - effective price?
self.optimal_prices.entry(trading_pair.start).or_insert(0.0);

// Initialize predecessors for the source and target assets.
self.predecessors.entry(trading_pair.start).or_default();
self.predecessors.entry(trading_pair.end).or_default();

// For storing each unique asset.
let mut known_assets = HashSet::new();
known_assets.insert(trading_pair.start.0);
known_assets.insert(trading_pair.end.0);

// The distance from the source asset to all other assets is initially infinite.
// TODO: use a JoinSet to parallelize this
for position in self.state.positions().await?.iter() {
// Skip positions that are not opened.
if position.state != position::State::Opened {
continue;
}

let position_pair = position.position.phi.pair;

// If there's not a distance from the source asset to either asset of the position's trading pair,
// initialize it to infinite.
if position_pair.asset_1() != trading_pair.start {
self.optimal_prices
.entry(position_pair.asset_1())
.or_insert(f64::INFINITY);
}

if position_pair.asset_2() != trading_pair.start {
self.optimal_prices
.entry(position_pair.asset_2())
.or_insert(f64::INFINITY);
}

// Initialize all predecessors to None.
self.predecessors
.entry(position_pair.asset_1())
.or_default();
self.predecessors
.entry(position_pair.asset_2())
.or_default();

// Insert the position's trading pair's assets into the known assets set.
known_assets.insert(position_pair.asset_1().0);
known_assets.insert(position_pair.asset_2().0);
}

// Perform edge relaxation |V| - 1 times (where |V| is the number of unique assets present within positions).
for _ in 0..known_assets.len() - 1 {
// TODO: account for MAX_HOPS, binning, dust position exclusion
// For each position...
for position in self.state.positions().await?.iter() {
// Skip positions that are not opened.
if position.state != position::State::Opened {
continue;
}

// If the distance to the destination can be shortened by taking the edge, update the optimal path.

let composed_price=
self
.optimal_prices
.get(&position.position.phi.pair.asset_1())
.unwrap()
// TODO: this shouldn't be a simple addition, i think it needs to compose the two trading functions
+
position.position.phi.component.effective_price();
if *self
.optimal_prices
.get(&position.position.phi.pair.asset_1())
// Should be safe because all assets were initialized earlier
.expect("all assets should be initialized")
!= f64::INFINITY
&& composed_price < *self
.optimal_prices
.get(&position.position.phi.pair.asset_2())
.expect("all assets should be initialized")
{
self.optimal_prices.insert(
position.position.phi.pair.asset_2(),
self
.optimal_prices
.get(&position.position.phi.pair.asset_1())
.unwrap()
// TODO: this shouldn't be a simple addition, i think it needs to compose the two trading functions
+
position.position.phi.component.effective_price()
);
self.predecessors.insert(
position.position.phi.pair.asset_2(),
Some(position.position.phi.pair.asset_1()),
);
}
}
}

// Detect negative cycles.
for position in self.state.positions().await?.iter() {
// Skip positions that are not opened.
if position.state != position::State::Opened {
continue;
}

// If the destination gets a better price by taking the position, update the optimal path.
if *self
.optimal_prices
.get(&position.position.phi.pair.asset_1())
// Should be safe because all assets were initialized earlier
.expect("all assets should be initialized")
!= f64::INFINITY
&& self
.optimal_prices
.get(&position.position.phi.pair.asset_1())
.unwrap()
// TODO: should not be a simple addition, needs to compose the prices
+
position.position.phi.component.effective_price()
< *self
.optimal_prices
.get(&position.position.phi.pair.asset_2())
.expect("all assets should be initialized")
{
return Err(anyhow::anyhow!("graph contains negative weight cycle"));
}
}

// Calculate optimal path from start -> end
// The path begins as 0-length, from start to itself, with no fee.
let mut path = Path::new(trading_pair.start, trading_pair.start, TradingFunction::new(TradingPair::new(trading_pair.start, trading_pair.start), 0, amount.clone(), amount.clone())).expect("able to instantiate new path");
let mut current = Some(trading_pair.start);

loop {
let pred = self.predecessors.get(&current.unwrap()).expect("predecessors initialized");
if pred.is_none() {
break;
}

// TODO: use correct amounts and fees
path.extend(TradingFunction::new(TradingPair::new(pred.unwrap(), current.unwrap()), 0, 1.into(), 1.into()));
current = pred.clone();
}

Ok(path)
}
}


mod tests {
use std::sync::Arc;

use penumbra_storage::TempStorage;

use crate::TempStorageExt;

#[tokio::test]
async fn test_simple() -> anyhow::Result<()> {
// Create a storage backend for testing.
let storage = TempStorage::new().await?.apply_default_genesis().await?;

let mut state = Arc::new(storage.latest_state());

// Test trading a source asset for itself.

todo!();

// Test a single position between a source asset and target asset.

todo!();

Ok(())
}
}
5 changes: 5 additions & 0 deletions component/src/dex/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub fn position_by_id(id: &position::Id) -> String {
format!("dex/position/{}", id)
}

/// Prefix for finding all positions
pub fn positions_prefix() -> &'static str {
"dex/position/"
}

/// Encompasses non-consensus state keys.
pub(crate) mod internal {
use super::*;
Expand Down