Skip to content

Commit

Permalink
save dev state
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Dec 28, 2023
1 parent 453b49f commit ecd7688
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 1 deletion.
4 changes: 3 additions & 1 deletion main/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ mod ctx;
mod db;
#[path = "net/http.rs"]
mod http;
#[path = "net/websocket.rs"]
mod websocket;
#[path = "security/jwt.rs"]
mod jwt;
#[path = "security/proof_of_funding.rs"]
Expand All @@ -34,7 +36,7 @@ async fn main() -> GenericResult<()> {

let cfg = get_app_config();
// to panic if redis is not available
get_redis_connection(cfg).await;
// get_redis_connection(cfg).await;

serve(cfg).await
}
88 changes: 88 additions & 0 deletions main/src/net/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use std::time::Duration;

use futures_util::{FutureExt, SinkExt, StreamExt};
use log::{error, info};
use tokio::{io, net::TcpListener, sync, time};
use tokio_tungstenite::tungstenite::Message;

async fn spawn_proxy(_addr: &str) -> io::Result<()> {
let (tx, _) = sync::broadcast::channel(10);

let bind_addr = "127.0.0.1:6678";
let downstream_addresses = [
"wss://necessary-quaint-road.ethereum-sepolia.quiknode.pro/3173295b7544258f98517fac5bdaa8d02349594a",
];

let server = TcpListener::bind(bind_addr).await?;
info!("Listening on {bind_addr}");

for addr in downstream_addresses {
let tx = tx.clone();
tokio::spawn(async move {
loop {
match tokio_tungstenite::connect_async(addr).await {
Ok((mut socket, _)) => {
info!("Outgoing connection to {addr}");
time::sleep(Duration::from_secs(1)).await;
socket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap();
socket
.for_each(|msg| async {
println!("oooooo {:?}", msg);
if let Ok(msg) = msg {
tx.send(msg).ok();
}
})
.await;
info!("closed to {addr}");
}
e => {
panic!("{e:?}");
}
}

time::sleep(Duration::from_secs(1)).await;
}
});
}

while let Ok((stream, socket)) = server.accept().await {
let mut rx = tx.subscribe();
tokio::spawn(async move {
info!("Open: {socket}");
let mut websocket = tokio_tungstenite::accept_async(stream).await.unwrap();
let mut keepalive_interval = time::interval(Duration::from_secs(30));
loop {
let msg = futures_util::select! {
_ = keepalive_interval.tick().fuse() => {
// Ensure that we don't let the WebSocket connection get timed out by
// sending a periodic ping
Some(Message::Ping(Vec::new()))
}
msg = rx.recv().fuse() => {
Some(msg.unwrap())
}
msg = websocket.next() => {
println!("msg {:?}", msg);
if msg.is_none() {
// Socket was closed
break;
}

websocket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap();
None
}
};

println!("GOT MSG {:?}", msg);
if let Some(msg) = msg {
if let Err(e) = websocket.send(msg).await {
error!("Send failed: {:?}", e);
}
}
}
info!("Closed: {socket}");
});
}

Ok(())
}

0 comments on commit ecd7688

Please sign in to comment.