diff --git a/Cargo.toml b/Cargo.toml index 5850013..fc570c8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,4 +19,5 @@ clap = { version = "4.4.8", features = ["derive"] } serde_json = "1.0.113" log = "0.4.20" futures-util = "0.3.30" -RustQuant = "0.0.49" \ No newline at end of file +RustQuant = "0.0.49" +futures = "0.3.30" diff --git a/configs/stable_pool.toml b/configs/stable_pool.toml index 5c2b8c2..f219801 100644 --- a/configs/stable_pool.toml +++ b/configs/stable_pool.toml @@ -8,4 +8,4 @@ PoolAdmin = {} TokenAdmin = { token_data.TKN0 = { name = "TKN0", symbol = "Token 0", decimals = 18 } } [[changer]] -PriceChanger = { mu = 1, theta = 1, sigma = 1, cursor = 0, value = 1 } \ No newline at end of file +PriceChanger = { mu = 1, theta = 1, sigma = 1, cursor = 0, value = 1, seed = 69 } \ No newline at end of file diff --git a/src/behaviors/deployer.rs b/src/behaviors/deployer.rs index f86e187..c3226cd 100644 --- a/src/behaviors/deployer.rs +++ b/src/behaviors/deployer.rs @@ -15,8 +15,8 @@ use crate::bindings::uniswap_v3_factory::UniswapV3Factory; #[derive(Debug, Deserialize, Serialize)] pub struct DeploymentData { - factory: H160, - liquid_exchange: H160, + pub factory: H160, + pub liquid_exchange: H160, } impl DeploymentData { diff --git a/src/behaviors/pool_admin.rs b/src/behaviors/pool_admin.rs index 7616b56..1d111d2 100644 --- a/src/behaviors/pool_admin.rs +++ b/src/behaviors/pool_admin.rs @@ -42,7 +42,7 @@ impl Behavior for PoolAdmin { self.client = Some(client.clone()); self.messager = Some(messager.clone()); - Ok(None) + Ok(Some(messager.clone().stream().unwrap())) } async fn process(&mut self, event: Message) -> Result { @@ -172,7 +172,7 @@ mod tests { world.add_agent(pool_admin); world.add_agent(deployer); - world.run().await.expect("world failed to run"); + let _ = world.run().await.expect("world failed to run"); let env = world.environment; let mut world = World::new("univ3"); diff --git a/src/behaviors/price_changer.rs b/src/behaviors/price_changer.rs index cfed42f..10f5cfb 100644 --- a/src/behaviors/price_changer.rs +++ b/src/behaviors/price_changer.rs @@ -4,13 +4,14 @@ use anyhow::Result; use arbiter_core::middleware::ArbiterMiddleware; use arbiter_engine::messager::{Message, Messager}; use ethers::types::H160; +use futures::stream::StreamExt; use RustQuant::{ models::*, stochastics::{process::Trajectories, *}, }; use super::*; -use crate::bindings::liquid_exchange::LiquidExchange; +use crate::{behaviors::deployer::DeploymentData, bindings::liquid_exchange::LiquidExchange}; #[derive(Serialize, Deserialize)] pub struct PriceChanger { @@ -18,6 +19,8 @@ pub struct PriceChanger { sigma: f64, theta: f64, + seed: u64, + #[serde(skip)] #[serde(default = "trajectory_default")] pub current_chunk: Trajectories, @@ -25,6 +28,8 @@ pub struct PriceChanger { #[serde(skip)] pub client: Option>, + pub liquid_exchange: Option, + cursor: usize, value: f64, } @@ -43,25 +48,26 @@ impl fmt::Debug for PriceChanger { } #[derive(Debug, Serialize, Deserialize)] -pub struct PriceUpdate { - liquid_exchange: H160, -} +pub struct PriceUpdate; impl PriceChanger { /// Public constructor function to create a [`PriceChanger`] behaviour. - pub fn new(initial_value: f64, mu: f64, sigma: f64, theta: f64) -> Self { + pub fn new(initial_value: f64, mu: f64, sigma: f64, theta: f64, seed: u64) -> Self { let ou = OrnsteinUhlenbeck::new(mu, sigma, theta); // Chunk our price trajectory over 100 price points. - let current_chunk = ou.euler_maruyama(initial_value, 0.0, 100.0, 100_usize, 1_usize, false); + let current_chunk = + ou.seedable_euler_maruyama(initial_value, 0.0, 100.0, 100_usize, 1_usize, false, seed); Self { mu, sigma, theta, + seed, current_chunk, cursor: 0, client: None, + liquid_exchange: None, value: initial_value, } } @@ -72,17 +78,27 @@ impl Behavior for PriceChanger { async fn startup( &mut self, client: Arc, - _messager: Messager, + messager: Messager, ) -> Result>> { self.client = Some(client); - Ok(None) + while let Some(message) = messager.clone().stream().unwrap().next().await { + match serde_json::from_str::(&message.data) { + Ok(data) => { + self.liquid_exchange = Some(data.liquid_exchange); + break; + } + Err(_) => continue, + }; + } + + Ok(Some(messager.clone().stream().unwrap())) } async fn process(&mut self, event: Message) -> Result { let ou = OrnsteinUhlenbeck::new(self.mu, self.sigma, self.theta); - let query: PriceUpdate = match serde_json::from_str(&event.data) { + let _query: PriceUpdate = match serde_json::from_str(&event.data) { Ok(query) => query, Err(_) => { eprintln!("Failed to deserialize the event data into a PriceUpdate"); @@ -93,12 +109,13 @@ impl Behavior for PriceChanger { if self.cursor >= 99 { self.cursor = 0; self.value = self.current_chunk.paths.clone()[0][self.cursor]; - self.current_chunk = - ou.euler_maruyama(self.value, 0.0, 100.0, 100_usize, 1_usize, false); + self.current_chunk = ou.seedable_euler_maruyama( + self.value, 0.0, 100.0, 100_usize, 1_usize, false, self.seed, + ); } let liquid_exchange = - LiquidExchange::new(query.liquid_exchange, self.client.clone().unwrap()); + LiquidExchange::new(self.liquid_exchange.unwrap(), self.client.clone().unwrap()); let price = self.current_chunk.paths.clone()[0][self.cursor]; diff --git a/src/behaviors/token_admin.rs b/src/behaviors/token_admin.rs index 75c7b1f..9bbb92c 100644 --- a/src/behaviors/token_admin.rs +++ b/src/behaviors/token_admin.rs @@ -88,7 +88,7 @@ impl Behavior for TokenAdmin { let _ = messager.send(To::All, &message_content).await; - Ok(None) + Ok(Some(messager.clone().stream().unwrap())) } async fn process(&mut self, event: Message) -> Result {