Skip to content

Commit

Permalink
inbound-outbound socket connection and message forwarding
Browse files Browse the repository at this point in the history
Signed-off-by: onur-ozkan <[email protected]>
  • Loading branch information
onur-ozkan committed Jan 8, 2024
1 parent 820940c commit 8c82ae3
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 93 deletions.
4 changes: 2 additions & 2 deletions main/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ mod ctx;
mod db;
#[path = "net/http.rs"]
mod http;
#[path = "net/websocket.rs"]
mod websocket;
#[path = "security/jwt.rs"]
mod jwt;
#[path = "security/proof_of_funding.rs"]
Expand All @@ -22,6 +20,8 @@ mod rate_limiter;
mod rpc;
#[path = "security/sign.rs"]
mod sign;
#[path = "net/websocket.rs"]
mod websocket;

#[cfg(all(target_os = "linux", target_arch = "x86_64", target_env = "gnu"))]
#[global_allocator]
Expand Down
137 changes: 46 additions & 91 deletions main/src/net/websocket.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::time::Duration;

use futures_util::{FutureExt, SinkExt, StreamExt};
use hyper::{header::HeaderValue, Body, Request, upgrade, Response};
use hyper::{header::HeaderValue, upgrade, Body, Request, Response};
use log::{error, info};
use tokio::{io, net::TcpListener, sync, time};
use tokio_tungstenite::{tungstenite::{handshake, Message, Error}, WebSocketStream};
use tokio_tungstenite::{
tungstenite::{handshake, Error, Message},
WebSocketStream,
};

use crate::GenericResult;

Expand All @@ -20,26 +23,58 @@ pub(crate) fn is_websocket_req(req: &Request<Body>) -> bool {
pub(crate) async fn spawn_proxy(mut req: Request<Body>) -> GenericResult<Response<Body>> {
let _inbound_route = req.uri().clone();

let _outbound_addr =
"wss://necessary-quaint-road.ethereum-sepolia.quiknode.pro/3173295b7544258f98517fac5bdaa8d02349594a";
let outbound_addr = "wss://necessary-quaint-road.ethereum-sepolia.quiknode.pro/3173295b7544258f98517fac5bdaa8d02349594a";
let response = match handshake::server::create_response_with_body(&req, || Body::empty()) {
Ok(response) => {
tokio::spawn(async move {
match upgrade::on(&mut req).await {
Ok(upgraded) => {
let ws_stream = WebSocketStream::from_raw_socket(
let mut inbound_socket = WebSocketStream::from_raw_socket(
upgraded,
tokio_tungstenite::tungstenite::protocol::Role::Server,
None,
)
.await;

let (ws_write, ws_read) = ws_stream.split();

match ws_read.forward(ws_write).await {
Ok(_) => {}
Err(Error::ConnectionClosed) => println!("TODO"),
Err(e) => println!("TODO"),
match tokio_tungstenite::connect_async(outbound_addr).await {
Ok((mut outbound_socket, _)) => {
let mut keepalive_interval =
time::interval(Duration::from_secs(10));

loop {
futures_util::select! {
_ = keepalive_interval.tick().fuse() => {
outbound_socket.send(Message::Ping(Vec::new())).await.unwrap();
inbound_socket.send(Message::Ping(Vec::new())).await.unwrap();
}

msg = outbound_socket.next() => {
match msg {
Some(Ok(msg)) => {
inbound_socket.send(msg).await.unwrap();
},
_ => {
break;
}
};
},

msg = inbound_socket.next() => {
match msg {
Some(Ok(msg)) => {
outbound_socket.send(msg).await.unwrap();
},
_ => {
break;
}
};
}
};
}
}
e => {
panic!("{e:?}");
}
};
}
Err(e) => println!("TODO"),
Expand All @@ -55,84 +90,4 @@ pub(crate) async fn spawn_proxy(mut req: Request<Body>) -> GenericResult<Respons
};

Ok(response)

// let (tx, _) = sync::broadcast::channel(10);

// let bind_addr = "127.0.0.1:6678";
// let downstream_addresses = [
// "wss://necessary-quaint-road.ethereum-sepolia.quiknode.pro/3173295b7544258f98517fac5bdaa8d02349594a",
// ];

// let server = TcpListener::bind(bind_addr).await?;
// info!("Listening on {bind_addr}");

// for addr in downstream_addresses {
// let tx = tx.clone();
// tokio::spawn(async move {
// loop {
// match tokio_tungstenite::connect_async(addr).await {
// Ok((mut socket, _)) => {
// info!("Outgoing connection to {addr}");
// time::sleep(Duration::from_secs(1)).await;
// socket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap();
// socket
// .for_each(|msg| async {
// println!("oooooo {:?}", msg);
// if let Ok(msg) = msg {
// tx.send(msg).ok();
// }
// })
// .await;
// info!("closed to {addr}");
// }
// e => {
// panic!("{e:?}");
// }
// }

// time::sleep(Duration::from_secs(1)).await;
// }
// });
// }

// while let Ok((stream, socket)) = server.accept().await {
// let mut rx = tx.subscribe();
// tokio::spawn(async move {
// info!("Open: {socket}");
// let mut websocket = tokio_tungstenite::accept_async(stream).await.unwrap();
// let mut keepalive_interval = time::interval(Duration::from_secs(30));
// loop {
// let msg = futures_util::select! {
// _ = keepalive_interval.tick().fuse() => {
// // Ensure that we don't let the WebSocket connection get timed out by
// // sending a periodic ping
// Some(Message::Ping(Vec::new()))
// }
// msg = rx.recv().fuse() => {
// Some(msg.unwrap())
// }
// msg = websocket.next() => {
// println!("msg {:?}", msg);
// if msg.is_none() {
// // Socket was closed
// break;
// }

// websocket.send("{\"id\":1,\"jsonrpc\":\"2.0\",\"method\":\"eth_subscribe\",\"params\":[\"newPendingTransactions\"]}".into()).await.unwrap();
// None
// }
// };

// println!("GOT MSG {:?}", msg);
// if let Some(msg) = msg {
// if let Err(e) = websocket.send(msg).await {
// error!("Send failed: {:?}", e);
// }
// }
// }
// info!("Closed: {socket}");
// });
// }

// Ok(())
}

0 comments on commit 8c82ae3

Please sign in to comment.