From 7428b737b1efbf171b951af63245b7fedf6e3531 Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Tue, 5 Nov 2024 12:37:28 +0100 Subject: [PATCH 1/4] Improve message handling and pause fetching if no one is online --- robusta/src/main.rs | 194 +++++++++++++++++++++++++------------------- shell.nix | 25 +----- 2 files changed, 115 insertions(+), 104 deletions(-) diff --git a/robusta/src/main.rs b/robusta/src/main.rs index bc26994..35ed972 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -1,7 +1,9 @@ use std::collections::HashMap; use std::fs; +use std::future::Future; use std::io::Write; use std::path::PathBuf; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -16,11 +18,15 @@ use axum::{ routing::{get, get_service, post}, Json, Router, }; -use futures_util::SinkExt; +use futures_util::{FutureExt, SinkExt}; use lazy_static::lazy_static; use reqwest::StatusCode; -use tokio::sync::mpsc::{Receiver, Sender}; -use tower::util::ServiceExt; +use std::sync::atomic::AtomicBool; +use tokio::{ + pin, + sync::mpsc::{Receiver, Sender}, +}; +use tower::util::{Oneshot, ServiceExt}; use tower_http::{ cors::CorsLayer, services::{ServeDir, ServeFile}, @@ -42,6 +48,8 @@ const TEAMS_FILE: &str = "teams.json"; /// The name used for the Mr. X team. const MRX: &str = "Mr. X"; +static PLAYERS_ONLINE: AtomicBool = AtomicBool::new(true); + #[derive(Debug)] enum InputMessage { Client(ClientMessage, u32), @@ -61,7 +69,7 @@ struct Client { id: u32, } -#[derive(Debug)] +#[derive(Debug, Clone)] struct ClientConnection { id: u32, team_id: u32, @@ -271,33 +279,48 @@ async fn main() { update_bindings(); info!("Starting server"); - let (send, recv) = tokio::sync::mpsc::channel(100); + let (send, recv) = tokio::sync::mpsc::channel(10000); let state = load_state(send.clone()); - if *FETCH_TRAINS { - kvv::init().await; - - // fetch departures every 60 seconds and send them to the game logic queue - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(60)); - loop { - interval.tick().await; - let departures = kvv::fetch_departures_for_region().await; - if departures.is_empty() { - warn!("Fetched no departures"); - } - if let Err(err) = send - .send(InputMessage::Server(ServerMessage::Departures(departures))) - .await - { - error!("Error while fetching data: {err}") + let fetch_trains = if *FETCH_TRAINS { + Box::pin(async move { + kvv::init().await; + // fetch departures every 60 seconds and send them to the game logic queue + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_millis(100)); + let mut timeout = tokio::time::interval(Duration::from_secs(10)); + loop { + interval.tick().await; + // Skip fetching updates if no player is currently connected + if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) { + continue; + } + + let departures = kvv::fetch_departures_for_region().await; + if departures.is_empty() { + warn!("Fetched no departures"); + } + if let Err(err) = send + .send(InputMessage::Server(ServerMessage::Departures(departures))) + .await + { + error!("Error while fetching data: {err}") + } + timeout.tick().await; } - } - }); - } + }); + }) as Pin>> + } else { + Box::pin(std::future::ready(())) as Pin>> + }; + pin!(fetch_trains); info!("Starting game loop"); - tokio::spawn(run_game_loop(recv, state.clone())); + let move_state = state.clone(); + let game_loop = async move { + fetch_trains.await; + tokio::spawn(run_game_loop(recv, move_state)).await + }; let api = Router::new() .route("/create-team", post(create_team)) @@ -321,10 +344,8 @@ async fn main() { let port = dotenv::var("PORT").unwrap_or_else(|_| "3000".to_string()); // run it with hyper on localhost:3000 - axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()) - .serve(app.into_make_service()) - .await - .unwrap(); + let axum_server = axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()).serve(app.into_make_service()); + futures_util::join!(axum_server, game_loop).0.unwrap(); } fn update_bindings() { @@ -382,61 +403,69 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .expect("failed to initialize rolling file appender"); // the time for a single frame - let mut interval = tokio::time::interval(Duration::from_millis(500)); + let interval_ms = 500; + let batches = 50; - loop { - interval.tick().await; + let mut interval = tokio::time::interval(Duration::from_millis(interval_ms / batches)); - // handle messages - let mut state = state.lock().await; - while let Ok(msg) = recv.try_recv() { - match msg { - InputMessage::Client(msg, id) => { - info!("Got message from client {}: {:?}", id, msg); - match msg { - ClientMessage::Position { long, lat } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.long = (long + team.long) / 2.; - team.lat = (lat + team.lat) / 2.; + loop { + // Process game updates in smaller batches to reduce latency + for _ in 0..batches { + interval.tick().await; + + // handle messages + let mut state = state.lock().await; + while let Ok(msg) = recv.try_recv() { + match msg { + InputMessage::Client(msg, id) => { + info!("Got message from client {}: {:?}", id, msg); + match msg { + ClientMessage::Position { long, lat } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.long = (long + team.long) / 2.; + team.lat = (lat + team.lat) / 2.; + } } - } - ClientMessage::SetTeamPosition { long, lat } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.long = long; - team.lat = lat; + ClientMessage::SetTeamPosition { long, lat } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.long = long; + team.lat = lat; + } } - } - ClientMessage::Message(msg) => { - info!("Got message: {}", msg); - } - ClientMessage::JoinTeam { team_id } => { - let Some(client) = state.client_mut(id) else { - warn!("Client {} not found", id); - continue; - }; - client.team_id = team_id; - } - ClientMessage::EmbarkTrain { train_id } => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.on_train = Some(train_id); + ClientMessage::Message(msg) => { + info!("Got message: {}", msg); } - } - ClientMessage::DisembarkTrain => { - if let Some(team) = state.team_mut_by_client_id(id) { - team.on_train = None; + ClientMessage::JoinTeam { team_id } => { + let Some(client) = state.client_mut(id) else { + warn!("Client {} not found", id); + continue; + }; + client.team_id = team_id; + } + ClientMessage::EmbarkTrain { train_id } => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.on_train = Some(train_id); + } + } + ClientMessage::DisembarkTrain => { + if let Some(team) = state.team_mut_by_client_id(id) { + team.on_train = None; + } } } } - } - InputMessage::Server(ServerMessage::Departures(deps)) => { - departures = deps; - } - InputMessage::Server(ServerMessage::ClientDisconnected(id)) => { - info!("Client {} disconnected", id); - state.connections.retain(|x| x.id != id); + InputMessage::Server(ServerMessage::Departures(deps)) => { + departures = deps; + } + InputMessage::Server(ServerMessage::ClientDisconnected(id)) => { + info!("Client {} disconnected", id); + state.connections.retain(|x| x.id != id); + } } } + PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::SeqCst); } + let mut state = state.lock().await; // compute train positions let time = chrono::Utc::now(); @@ -471,7 +500,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap(); // send game state to clients - for connection in state.connections.iter_mut() { + for connection in state.connections.iter() { let game_state = GameState { teams: game_state .teams @@ -481,14 +510,13 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .collect(), trains: game_state.trains.clone(), }; - if let Err(err) = connection - .send - .send(ClientResponse::GameState(game_state.clone())) - .await - { - error!("failed to send game state to client {}: {}", connection.id, err); - continue; - } + // Distributes updates concurrently + let connection = connection.clone(); + tokio::task::spawn(async move { + if let Err(err) = connection.send.send(ClientResponse::GameState(game_state)).await { + error!("failed to send game state to client {}: {}", connection.id, err); + } + }); } } } diff --git a/shell.nix b/shell.nix index 1fb845e..c47274d 100644 --- a/shell.nix +++ b/shell.nix @@ -28,10 +28,8 @@ let }; # Define the rustc we need - rustc-wasm = pkgs.rust-bin.stable.latest.default.override { - targets = [ "wasm32-unknown-unknown" ]; - # wasm-pack needs this - extensions = [ "rust-src" ]; + rustc = pkgs.rust-bin.stable.latest.default.override { + extensions = [ "clippy" "rust-analyzer" ]; }; lib = pkgs.lib; @@ -39,33 +37,18 @@ in # Make a shell with the dependencies we need pkgs.mkShell { buildInputs = [ - rustc-wasm + rustc pkgs.nodejs pkgs.cargo - pkgs.cargo-watch - pkgs.wasm-pack pkgs.clang pkgs.openssl pkgs.openssl.dev - pkgs.glib - pkgs.gtk3 - pkgs.libsoup - pkgs.webkitgtk - pkgs.freetype - pkgs.freetype.dev pkgs.pkg-config - # Use Mold as a Linke + # Use Mold as a Linker pkgs.mold - - # Vulkan - #pkgs.glslang - pkgs.shaderc - pkgs.vulkan-headers - pkgs.vulkan-loader - #pkgs.vulkan-validation-layers ]; # Hacky way to run cago through Mold From 2de30eb8845c3e5b5564221fc65b28e4c8d296fe Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Wed, 13 Nov 2024 16:51:12 +0100 Subject: [PATCH 2/4] Refactor timeouts as configurable constants --- robusta/src/main.rs | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/robusta/src/main.rs b/robusta/src/main.rs index 35ed972..30cea6e 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -50,6 +50,11 @@ const MRX: &str = "Mr. X"; static PLAYERS_ONLINE: AtomicBool = AtomicBool::new(true); +const USER_ONLINE_CHECK_MS: u64 = 100; +const TRIAS_FETCH_TIMEOUT_S: u64 = 20; +const USER_UPDATE_MS: u64 = 500; +const USER_PROCESS_UPDATE_BATCHES: u64 = 50; + #[derive(Debug)] enum InputMessage { Client(ClientMessage, u32), @@ -285,14 +290,14 @@ async fn main() { let fetch_trains = if *FETCH_TRAINS { Box::pin(async move { kvv::init().await; - // fetch departures every 60 seconds and send them to the game logic queue + // fetch departures periodically and send them to the game logic queue if at least one player is online tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(100)); - let mut timeout = tokio::time::interval(Duration::from_secs(10)); + let mut user_online_check_interval = tokio::time::interval(Duration::from_millis(USER_ONLINE_CHECK_MS)); + let mut timeout = tokio::time::interval(Duration::from_secs(TRIAS_FETCH_TIMEOUT_S)); loop { - interval.tick().await; // Skip fetching updates if no player is currently connected if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) { + user_online_check_interval.tick().await; continue; } @@ -314,7 +319,6 @@ async fn main() { Box::pin(std::future::ready(())) as Pin>> }; - pin!(fetch_trains); info!("Starting game loop"); let move_state = state.clone(); let game_loop = async move { @@ -403,8 +407,8 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { .expect("failed to initialize rolling file appender"); // the time for a single frame - let interval_ms = 500; - let batches = 50; + let interval_ms = USER_UPDATE_MS; + let batches = USER_PROCESS_UPDATE_BATCHES; let mut interval = tokio::time::interval(Duration::from_millis(interval_ms / batches)); @@ -463,7 +467,7 @@ async fn run_game_loop(mut recv: Receiver, state: SharedState) { } } } - PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::SeqCst); + PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::Relaxed); } let mut state = state.lock().await; From 931d04ed4e309811879c9897b9b47e252ec7a7a5 Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Wed, 13 Nov 2024 16:53:17 +0100 Subject: [PATCH 3/4] Remove unused imports --- robusta/src/main.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/robusta/src/main.rs b/robusta/src/main.rs index 30cea6e..0f28849 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -18,15 +18,12 @@ use axum::{ routing::{get, get_service, post}, Json, Router, }; -use futures_util::{FutureExt, SinkExt}; +use futures_util::SinkExt; use lazy_static::lazy_static; use reqwest::StatusCode; use std::sync::atomic::AtomicBool; -use tokio::{ - pin, - sync::mpsc::{Receiver, Sender}, -}; -use tower::util::{Oneshot, ServiceExt}; +use tokio::sync::mpsc::{Receiver, Sender}; +use tower::util::ServiceExt; use tower_http::{ cors::CorsLayer, services::{ServeDir, ServeFile}, From 0b74ad341c9b755c324e9635e22b4cbce267440d Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Tue, 19 Nov 2024 11:37:06 +0100 Subject: [PATCH 4/4] Update robusta/src/main.rs Co-authored-by: konsumlamm <44230978+konsumlamm@users.noreply.github.com> --- robusta/src/main.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/robusta/src/main.rs b/robusta/src/main.rs index 0f28849..610b2f2 100644 --- a/robusta/src/main.rs +++ b/robusta/src/main.rs @@ -291,6 +291,9 @@ async fn main() { tokio::spawn(async move { let mut user_online_check_interval = tokio::time::interval(Duration::from_millis(USER_ONLINE_CHECK_MS)); let mut timeout = tokio::time::interval(Duration::from_secs(TRIAS_FETCH_TIMEOUT_S)); + // the first tick completes immediately + user_online_check_interval.tick().await; + timeout.tick().await; loop { // Skip fetching updates if no player is currently connected if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) {