From ecd7688eb532bfd7e01cf3e78eb9832e205e8ea3 Mon Sep 17 00:00:00 2001 From: onur-ozkan Date: Thu, 28 Dec 2023 10:55:58 +0300 Subject: [PATCH] save dev state Signed-off-by: onur-ozkan --- main/src/main.rs | 4 +- main/src/net/websocket.rs | 88 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 1 deletion(-) create mode 100644 main/src/net/websocket.rs diff --git a/main/src/main.rs b/main/src/main.rs index 3cac011..aa57917 100644 --- a/main/src/main.rs +++ b/main/src/main.rs @@ -10,6 +10,8 @@ mod ctx; mod db; #[path = "net/http.rs"] mod http; +#[path = "net/websocket.rs"] +mod websocket; #[path = "security/jwt.rs"] mod jwt; #[path = "security/proof_of_funding.rs"] @@ -34,7 +36,7 @@ async fn main() -> GenericResult<()> { let cfg = get_app_config(); // to panic if redis is not available - get_redis_connection(cfg).await; + // get_redis_connection(cfg).await; serve(cfg).await } diff --git a/main/src/net/websocket.rs b/main/src/net/websocket.rs new file mode 100644 index 0000000..2525f41 --- /dev/null +++ b/main/src/net/websocket.rs @@ -0,0 +1,88 @@ +use std::time::Duration; + +use futures_util::{FutureExt, SinkExt, StreamExt}; +use log::{error, info}; +use tokio::{io, net::TcpListener, sync, time}; +use tokio_tungstenite::tungstenite::Message; + +async fn spawn_proxy(_addr: &str) -> io::Result<()> { + let (tx, _) = sync::broadcast::channel(10); + + let bind_addr = "127.0.0.1:6678"; + let downstream_addresses = [ + "wss://necessary-quaint-road.ethereum-sepolia.quiknode.pro/3173295b7544258f98517fac5bdaa8d02349594a", + ]; + + let server = TcpListener::bind(bind_addr).await?; + info!("Listening on {bind_addr}"); + + for addr in downstream_addresses { + let tx = tx.clone(); + tokio::spawn(async move { + loop { + match tokio_tungstenite::connect_async(addr).await { + Ok((mut socket, _)) => { + info!("Outgoing connection to {addr}"); + time::sleep(Duration::from_secs(1)).await; + socket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap(); + socket + .for_each(|msg| async { + println!("oooooo {:?}", msg); + if let Ok(msg) = msg { + tx.send(msg).ok(); + } + }) + .await; + info!("closed to {addr}"); + } + e => { + panic!("{e:?}"); + } + } + + time::sleep(Duration::from_secs(1)).await; + } + }); + } + + while let Ok((stream, socket)) = server.accept().await { + let mut rx = tx.subscribe(); + tokio::spawn(async move { + info!("Open: {socket}"); + let mut websocket = tokio_tungstenite::accept_async(stream).await.unwrap(); + let mut keepalive_interval = time::interval(Duration::from_secs(30)); + loop { + let msg = futures_util::select! { + _ = keepalive_interval.tick().fuse() => { + // Ensure that we don't let the WebSocket connection get timed out by + // sending a periodic ping + Some(Message::Ping(Vec::new())) + } + msg = rx.recv().fuse() => { + Some(msg.unwrap()) + } + msg = websocket.next() => { + println!("msg {:?}", msg); + if msg.is_none() { + // Socket was closed + break; + } + + websocket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap(); + None + } + }; + + println!("GOT MSG {:?}", msg); + if let Some(msg) = msg { + if let Err(e) = websocket.send(msg).await { + error!("Send failed: {:?}", e); + } + } + } + info!("Closed: {socket}"); + }); + } + + Ok(()) +}