diff --git a/robusta/src/main.rs b/robusta/src/main.rs index bc26994..610b2f2 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use std::fs; +use std::future::Future; use std::io::Write; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -19,6 +21,7 @@ use axum::{ use futures_util::SinkExt; use lazy_static::lazy_static; use reqwest::StatusCode; +use std::sync::atomic::AtomicBool; use tokio::sync::mpsc::{Receiver, Sender}; use tower::util::ServiceExt; use tower_http::{ @@ -42,6 +45,13 @@ const TEAMS_FILE: &str = "teams.json"; /// The name used for the Mr. X team. const MRX: &str = "Mr. X"; +static PLAYERS_ONLINE: AtomicBool = AtomicBool::new(true); + +const USER_ONLINE_CHECK_MS: u64 = 100; +const TRIAS_FETCH_TIMEOUT_S: u64 = 20; +const USER_UPDATE_MS: u64 = 500; +const USER_PROCESS_UPDATE_BATCHES: u64 = 50; + #[derive(Debug)] enum InputMessage { Client(ClientMessage, u32), @@ -61,7 +71,7 @@ struct Client { id: u32, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct ClientConnection { id: u32, team_id: u32, @@ -271,33 +281,50 @@ async fn main() { update_bindings(); info!("Starting server"); - let (send, recv) = tokio::sync::mpsc::channel(100); + let (send, recv) = tokio::sync::mpsc::channel(10000); let state = load_state(send.clone()); - if *FETCH_TRAINS { - kvv::init().await; - - // fetch departures every 60 seconds and send them to the game logic queue - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(60)); - loop { - interval.tick().await; - let departures = kvv::fetch_departures_for_region().await; - if departures.is_empty() { - warn!("Fetched no departures"); - } - if let Err(err) = send - .send(InputMessage::Server(ServerMessage::Departures(departures))) - .await - { - error!("Error while fetching data: {err}") + let fetch_trains = if *FETCH_TRAINS { + Box::pin(async move { + kvv::init().await; + // fetch departures periodically and send them to the game logic queue if at least one player is online + tokio::spawn(async move { + let mut user_online_check_interval = tokio::time::interval(Duration::from_millis(USER_ONLINE_CHECK_MS)); + let mut timeout = tokio::time::interval(Duration::from_secs(TRIAS_FETCH_TIMEOUT_S)); + // the first tick completes immediately + user_online_check_interval.tick().await; + timeout.tick().await; + loop { + // Skip fetching updates if no player is currently connected + if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) { + user_online_check_interval.tick().await; + continue; + } + + let departures = kvv::fetch_departures_for_region().await; + if departures.is_empty() { + warn!("Fetched no departures"); + } + if let Err(err) = send + .send(InputMessage::Server(ServerMessage::Departures(departures))) + .await + { + error!("Error while fetching data: {err}") + } + timeout.tick().await; } - } - }); - } + }); + }) as Pin>> + } else { + Box::pin(std::future::ready(())) as Pin>> + }; info!("Starting game loop"); - tokio::spawn(run_game_loop(recv, state.clone())); + let move_state = state.clone(); + let game_loop = async move { + fetch_trains.await; + tokio::spawn(run_game_loop(recv, move_state)).await + }; let api = Router::new() .route("/create-team", post(create_team)) @@ -321,10 +348,8 @@ async fn main() { let port = dotenv::var("PORT").unwrap_or_else(|_| "3000".to_string()); // run it with hyper on localhost:3000 - axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); + let axum_server = axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()).serve(app.into_make_service()); + futures_util::join!(axum_server, game_loop).0.unwrap(); } fn update_bindings() { @@ -382,61 +407,69 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .expect("failed to initialize rolling file appender"); // the time for a single frame - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let interval_ms = USER_UPDATE_MS; + let batches = USER_PROCESS_UPDATE_BATCHES; - loop { - interval.tick().await; + let mut interval = tokio::time::interval(Duration::from_millis(interval_ms / batches)); - // handle messages - let mut state = state.lock().await; - while let Ok(msg) = recv.try_recv() { - match msg { - InputMessage::Client(msg, id) => { - info!("Got message from client {}: {:?}", id, msg); - match msg { - ClientMessage::Position { long, lat } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.long = (long + team.long) / 2.; - team.lat = (lat + team.lat) / 2.; + loop { + // Process game updates in smaller batches to reduce latency + for _ in 0..batches { + interval.tick().await; + + // handle messages + let mut state = state.lock().await; + while let Ok(msg) = recv.try_recv() { + match msg { + InputMessage::Client(msg, id) => { + info!("Got message from client {}: {:?}", id, msg); + match msg { + ClientMessage::Position { long, lat } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.long = (long + team.long) / 2.; + team.lat = (lat + team.lat) / 2.; + } } - } - ClientMessage::SetTeamPosition { long, lat } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.long = long; - team.lat = lat; + ClientMessage::SetTeamPosition { long, lat } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.long = long; + team.lat = lat; + } } - } - ClientMessage::Message(msg) => { - info!("Got message: {}", msg); - } - ClientMessage::JoinTeam { team_id } => { - let Some(client) = state.client_mut(id) else { - warn!("Client {} not found", id); - continue; - }; - client.team_id = team_id; - } - ClientMessage::EmbarkTrain { train_id } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.on_train = Some(train_id); + ClientMessage::Message(msg) => { + info!("Got message: {}", msg); } - } - ClientMessage::DisembarkTrain => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.on_train = None; + ClientMessage::JoinTeam { team_id } => { + let Some(client) = state.client_mut(id) else { + warn!("Client {} not found", id); + continue; + }; + client.team_id = team_id; + } + ClientMessage::EmbarkTrain { train_id } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.on_train = Some(train_id); + } + } + ClientMessage::DisembarkTrain => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.on_train = None; + } } } } - } - InputMessage::Server(ServerMessage::Departures(deps)) => { - departures = deps; - } - InputMessage::Server(ServerMessage::ClientDisconnected(id)) => { - info!("Client {} disconnected", id); - state.connections.retain(|x| x.id != id); + InputMessage::Server(ServerMessage::Departures(deps)) => { + departures = deps; + } + InputMessage::Server(ServerMessage::ClientDisconnected(id)) => { + info!("Client {} disconnected", id); + state.connections.retain(|x| x.id != id); + } } } + PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::Relaxed); } + let mut state = state.lock().await; // compute train positions let time = chrono::Utc::now(); @@ -471,7 +504,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap(); // send game state to clients - for connection in state.connections.iter_mut() { + for connection in state.connections.iter() { let game_state = GameState { teams: game_state .teams @@ -481,14 +514,13 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .collect(), trains: game_state.trains.clone(), }; - if let Err(err) = connection - .send - .send(ClientResponse::GameState(game_state.clone())) - .await - { - error!("failed to send game state to client {}: {}", connection.id, err); - continue; - } + // Distributes updates concurrently + let connection = connection.clone(); + tokio::task::spawn(async move { + if let Err(err) = connection.send.send(ClientResponse::GameState(game_state)).await { + error!("failed to send game state to client {}: {}", connection.id, err); + } + }); } } } diff --git a/shell.nix b/shell.nix index 1fb845e..c47274d 100644 --- a/shell.nix +++ b/shell.nix @@ -28,10 +28,8 @@ let }; # Define the rustc we need - rustc-wasm = pkgs.rust-bin.stable.latest.default.override { - targets = [ "wasm32-unknown-unknown" ]; - # wasm-pack needs this - extensions = [ "rust-src" ]; + rustc = pkgs.rust-bin.stable.latest.default.override { + extensions = [ "clippy" "rust-analyzer" ]; }; lib = pkgs.lib; @@ -39,33 +37,18 @@ in # Make a shell with the dependencies we need pkgs.mkShell { buildInputs = [ - rustc-wasm + rustc pkgs.nodejs pkgs.cargo - pkgs.cargo-watch - pkgs.wasm-pack pkgs.clang pkgs.openssl pkgs.openssl.dev - pkgs.glib - pkgs.gtk3 - pkgs.libsoup - pkgs.webkitgtk - pkgs.freetype - pkgs.freetype.dev pkgs.pkg-config - # Use Mold as a Linke + # Use Mold as a Linker pkgs.mold - - # Vulkan - #pkgs.glslang - pkgs.shaderc - pkgs.vulkan-headers - pkgs.vulkan-loader - #pkgs.vulkan-validation-layers ]; # Hacky way to run cago through Mold