Skip to content

Commit

Permalink
fix: connector loop error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
filipton committed Sep 1, 2024
1 parent 9db5356 commit b08eade
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions src/server/tunnel.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::structs::{SharedProxyState, TunnelError, TunnelRequest, TunnelSender};
use anyhow::{anyhow, Result};
use kanal::AsyncReceiver;
use std::{collections::HashMap, sync::Arc, time::Duration};
use tokio::{
io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
net::{TcpListener, TcpStream},
};
use tokio_rustls::{rustls::pki_types, TlsAcceptor, TlsConnector};
use tokio_rustls::{client::TlsStream, rustls::pki_types, TlsAcceptor, TlsConnector};

const PANEL_HTML: &str = include_str!("./resources/index.html");
const ERROR_HTML: &str = include_str!("./resources/error.html");
Expand Down Expand Up @@ -93,36 +94,12 @@ async fn connector_handler(

let (tx, rx) = kanal::unbounded_async::<TunnelRequest>();
state.insert_tunnel_connector(token, tx).await;

let mut pinger = tokio::time::interval(Duration::from_secs(15));
loop {
tokio::select! {
res = rx.recv() => {
let res = res?;
match res {
TunnelRequest::Close => return Ok(()),
TunnelRequest::Request { ssl, tunnel_id } => {
stream.write_u8(u8::from(ssl)).await?;
stream.write_u128(tunnel_id).await?;
}
}
}
res = stream.read_u8() => {
if res.is_err() {
state.remove_tunnel(token).await;
// tunnel is closed
return Ok(());
}
}
_ = pinger.tick() => {
stream.write_u8(0x69).await?;
let read = stream.read_u8().await?;
if read != 0x69 {
tracing::error!("Wrong pong response: {:x}", read);
}
}
}
let res = connector_loop(&mut stream, rx).await;
if let Err(e) = res {
tracing::error!("Connector loop: {e:?}");
}

state.remove_tunnel(token).await;
} else if connection_buff[0] == 1 {
// im the tunnel!
let tunnel_id = u128::from_be_bytes(connection_buff[26..42].try_into().unwrap());
Expand All @@ -137,6 +114,39 @@ async fn connector_handler(
Ok(())
}

async fn connector_loop(
stream: &mut TlsStream<TcpStream>,
rx: AsyncReceiver<TunnelRequest>,
) -> Result<()> {
let mut pinger = tokio::time::interval(Duration::from_secs(15));
loop {
tokio::select! {
res = rx.recv() => {
let res = res?;
match res {
TunnelRequest::Close => return Ok(()),
TunnelRequest::Request { ssl, tunnel_id } => {
stream.write_u8(u8::from(ssl)).await?;
stream.write_u128(tunnel_id).await?;
}
}
}
res = stream.read_u8() => {
if res.is_err() {
return Ok(());
}
}
_ = pinger.tick() => {
stream.write_u8(0x69).await?;
let read = stream.read_u8().await?;
if read != 0x69 {
tracing::error!("Wrong pong response: {:x}", read);
}
}
}
}
}

async fn remote_listener(addr: &str, state: SharedProxyState, ssl: bool) -> Result<()> {
tracing::info!("Remote listening on: {addr} (SSL: {ssl})");
let listener = TcpListener::bind(addr).await?;
Expand Down

0 comments on commit b08eade

Please sign in to comment.