Skip to content

Commit

Permalink
PR review
Browse files Browse the repository at this point in the history
  • Loading branch information
zbuc committed Jan 26, 2024
1 parent 5aef9de commit e7b81ab
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 8 deletions.
18 changes: 15 additions & 3 deletions crates/core/component/dex/src/circuit_breaker/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ const MAX_EXECUTIONS: u32 = 64;
/// to the search and execution limits managed by the circuit breaker.
///
/// The circuit breaker ensures the swap will not use unbounded time complexity.
#[derive(Debug, Clone)]
pub(crate) struct ExecutionCircuitBreaker {
/// The maximum number of times to perform path searches before stopping.
pub max_path_searches: u32,
Expand All @@ -20,11 +21,11 @@ pub(crate) struct ExecutionCircuitBreaker {
}

impl ExecutionCircuitBreaker {
pub fn new() -> Self {
pub fn new(max_path_searches: u32, max_executions: u32) -> Self {
Self {
max_path_searches: MAX_PATH_SEARCHES,
max_path_searches,
current_path_searches: 0,
max_executions: MAX_EXECUTIONS,
max_executions,
current_executions: 0,
}
}
Expand All @@ -34,3 +35,14 @@ impl ExecutionCircuitBreaker {
|| self.current_executions > self.max_executions
}
}

impl Default for ExecutionCircuitBreaker {
fn default() -> Self {
Self {
max_path_searches: MAX_PATH_SEARCHES,
current_path_searches: 0,
max_executions: MAX_EXECUTIONS,
current_executions: 0,
}
}
}
11 changes: 9 additions & 2 deletions crates/core/component/dex/src/component/arb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use penumbra_asset::{asset, Value};
use penumbra_chain::component::StateReadExt;
use tracing::instrument;

use crate::SwapExecution;
use crate::{ExecutionCircuitBreaker, SwapExecution};

use super::{
router::{RouteAndFill, RoutingParams},
Expand Down Expand Up @@ -50,8 +50,15 @@ pub trait Arbitrage: StateWrite + Sized {
amount: u64::MAX.into(),
};

let execution_circuit_breaker = ExecutionCircuitBreaker::default();
let swap_execution = this
.route_and_fill(arb_token, arb_token, flash_loan.amount, params)
.route_and_fill(
arb_token,
arb_token,
flash_loan.amount,
params,
execution_circuit_breaker,
)
.await?;
let filled_input = swap_execution.input.amount;
let output = swap_execution.output.amount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,16 @@ pub trait HandleBatchSwaps: StateWrite + Sized {

tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");

let execution_circuit_breaker = ExecutionCircuitBreaker::default();

let swap_execution_1_for_2 = if delta_1.value() > 0 {
Some(
self.route_and_fill(
trading_pair.asset_1(),
trading_pair.asset_2(),
delta_1,
params.clone(),
execution_circuit_breaker.clone(),
)
.await?,
)
Expand All @@ -69,6 +72,7 @@ pub trait HandleBatchSwaps: StateWrite + Sized {
trading_pair.asset_1(),
delta_2,
params.clone(),
execution_circuit_breaker,
)
.await?,
)
Expand Down Expand Up @@ -129,6 +133,7 @@ pub trait RouteAndFill: StateWrite + Sized {
asset_2: asset::Id,
input: Amount,
params: RoutingParams,
mut execution_circuit_breaker: ExecutionCircuitBreaker,
) -> Result<SwapExecution>
where
Self: 'static,
Expand All @@ -145,8 +150,6 @@ pub trait RouteAndFill: StateWrite + Sized {

let max_delta_1: Amount = MAX_RESERVE_AMOUNT.into();

let mut execution_circuit_breaker = ExecutionCircuitBreaker::new();

// Termination conditions:
// 1. We have no more delta_1 remaining
// 2. A path can no longer be found
Expand Down
10 changes: 9 additions & 1 deletion crates/core/component/dex/src/component/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{pin::Pin, sync::Arc};

use crate::ExecutionCircuitBreaker;
use async_stream::try_stream;
use cnidarium::{StateDelta, Storage};
use futures::{StreamExt, TryStreamExt};
Expand Down Expand Up @@ -549,8 +550,15 @@ impl SimulationService for Server {

let state = self.storage.latest_snapshot();
let mut state_tx = Arc::new(StateDelta::new(state));
let execution_circuit_breaker = ExecutionCircuitBreaker::default();
let swap_execution = state_tx
.route_and_fill(input.asset_id, output_id, input.amount, routing_params)
.route_and_fill(
input.asset_id,
output_id,
input.amount,
routing_params,
execution_circuit_breaker,
)
.await
.map_err(|e| tonic::Status::internal(format!("error simulating trade: {:#}", e)))?;

Expand Down

0 comments on commit e7b81ab

Please sign in to comment.