From e7b81abe5c616e52b698a0fd437f407d09b5f791 Mon Sep 17 00:00:00 2001 From: Chris Czub Date: Fri, 26 Jan 2024 16:09:15 -0500 Subject: [PATCH] PR review --- .../dex/src/circuit_breaker/execution.rs | 18 +++++++++++++++--- crates/core/component/dex/src/component/arb.rs | 11 +++++++++-- .../dex/src/component/router/route_and_fill.rs | 7 +++++-- crates/core/component/dex/src/component/rpc.rs | 10 +++++++++- 4 files changed, 38 insertions(+), 8 deletions(-) diff --git a/crates/core/component/dex/src/circuit_breaker/execution.rs b/crates/core/component/dex/src/circuit_breaker/execution.rs index 3a5e3a0a54..f805587d65 100644 --- a/crates/core/component/dex/src/circuit_breaker/execution.rs +++ b/crates/core/component/dex/src/circuit_breaker/execution.rs @@ -8,6 +8,7 @@ const MAX_EXECUTIONS: u32 = 64; /// to the search and execution limits managed by the circuit breaker. /// /// The circuit breaker ensures the swap will not use unbounded time complexity. +#[derive(Debug, Clone)] pub(crate) struct ExecutionCircuitBreaker { /// The maximum number of times to perform path searches before stopping. pub max_path_searches: u32, @@ -20,11 +21,11 @@ pub(crate) struct ExecutionCircuitBreaker { } impl ExecutionCircuitBreaker { - pub fn new() -> Self { + pub fn new(max_path_searches: u32, max_executions: u32) -> Self { Self { - max_path_searches: MAX_PATH_SEARCHES, + max_path_searches, current_path_searches: 0, - max_executions: MAX_EXECUTIONS, + max_executions, current_executions: 0, } } @@ -34,3 +35,14 @@ impl ExecutionCircuitBreaker { || self.current_executions > self.max_executions } } + +impl Default for ExecutionCircuitBreaker { + fn default() -> Self { + Self { + max_path_searches: MAX_PATH_SEARCHES, + current_path_searches: 0, + max_executions: MAX_EXECUTIONS, + current_executions: 0, + } + } +} diff --git a/crates/core/component/dex/src/component/arb.rs b/crates/core/component/dex/src/component/arb.rs index 5170322504..b0e00b04cf 100644 --- a/crates/core/component/dex/src/component/arb.rs +++ b/crates/core/component/dex/src/component/arb.rs @@ -7,7 +7,7 @@ use penumbra_asset::{asset, Value}; use penumbra_chain::component::StateReadExt; use tracing::instrument; -use crate::SwapExecution; +use crate::{ExecutionCircuitBreaker, SwapExecution}; use super::{ router::{RouteAndFill, RoutingParams}, @@ -50,8 +50,15 @@ pub trait Arbitrage: StateWrite + Sized { amount: u64::MAX.into(), }; + let execution_circuit_breaker = ExecutionCircuitBreaker::default(); let swap_execution = this - .route_and_fill(arb_token, arb_token, flash_loan.amount, params) + .route_and_fill( + arb_token, + arb_token, + flash_loan.amount, + params, + execution_circuit_breaker, + ) .await?; let filled_input = swap_execution.input.amount; let output = swap_execution.output.amount; diff --git a/crates/core/component/dex/src/component/router/route_and_fill.rs b/crates/core/component/dex/src/component/router/route_and_fill.rs index f097fddd73..6369ec2798 100644 --- a/crates/core/component/dex/src/component/router/route_and_fill.rs +++ b/crates/core/component/dex/src/component/router/route_and_fill.rs @@ -47,6 +47,8 @@ pub trait HandleBatchSwaps: StateWrite + Sized { tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps"); + let execution_circuit_breaker = ExecutionCircuitBreaker::default(); + let swap_execution_1_for_2 = if delta_1.value() > 0 { Some( self.route_and_fill( @@ -54,6 +56,7 @@ pub trait HandleBatchSwaps: StateWrite + Sized { trading_pair.asset_2(), delta_1, params.clone(), + execution_circuit_breaker.clone(), ) .await?, ) @@ -69,6 +72,7 @@ pub trait HandleBatchSwaps: StateWrite + Sized { trading_pair.asset_1(), delta_2, params.clone(), + execution_circuit_breaker, ) .await?, ) @@ -129,6 +133,7 @@ pub trait RouteAndFill: StateWrite + Sized { asset_2: asset::Id, input: Amount, params: RoutingParams, + mut execution_circuit_breaker: ExecutionCircuitBreaker, ) -> Result where Self: 'static, @@ -145,8 +150,6 @@ pub trait RouteAndFill: StateWrite + Sized { let max_delta_1: Amount = MAX_RESERVE_AMOUNT.into(); - let mut execution_circuit_breaker = ExecutionCircuitBreaker::new(); - // Termination conditions: // 1. We have no more delta_1 remaining // 2. A path can no longer be found diff --git a/crates/core/component/dex/src/component/rpc.rs b/crates/core/component/dex/src/component/rpc.rs index 5967a78b11..8e97af6917 100644 --- a/crates/core/component/dex/src/component/rpc.rs +++ b/crates/core/component/dex/src/component/rpc.rs @@ -1,5 +1,6 @@ use std::{pin::Pin, sync::Arc}; +use crate::ExecutionCircuitBreaker; use async_stream::try_stream; use cnidarium::{StateDelta, Storage}; use futures::{StreamExt, TryStreamExt}; @@ -549,8 +550,15 @@ impl SimulationService for Server { let state = self.storage.latest_snapshot(); let mut state_tx = Arc::new(StateDelta::new(state)); + let execution_circuit_breaker = ExecutionCircuitBreaker::default(); let swap_execution = state_tx - .route_and_fill(input.asset_id, output_id, input.amount, routing_params) + .route_and_fill( + input.asset_id, + output_id, + input.amount, + routing_params, + execution_circuit_breaker, + ) .await .map_err(|e| tonic::Status::internal(format!("error simulating trade: {:#}", e)))?;