Skip to content

Commit

Permalink
Improve message handling and pause fetching if no one is online (#45)
Browse files Browse the repository at this point in the history
* Refactor timeouts as configurable constants

---------

Co-authored-by: konsumlamm <[email protected]>
  • Loading branch information
TrueDoctor and konsumlamm authored Nov 19, 2024
1 parent cceb77a commit 1375eed
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 101 deletions.
192 changes: 112 additions & 80 deletions robusta/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::fs;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -19,6 +21,7 @@ use axum::{
use futures_util::SinkExt;
use lazy_static::lazy_static;
use reqwest::StatusCode;
use std::sync::atomic::AtomicBool;
use tokio::sync::mpsc::{Receiver, Sender};
use tower::util::ServiceExt;
use tower_http::{
Expand All @@ -42,6 +45,13 @@ const TEAMS_FILE: &str = "teams.json";
/// The name used for the Mr. X team.
const MRX: &str = "Mr. X";

static PLAYERS_ONLINE: AtomicBool = AtomicBool::new(true);

const USER_ONLINE_CHECK_MS: u64 = 100;
const TRIAS_FETCH_TIMEOUT_S: u64 = 20;
const USER_UPDATE_MS: u64 = 500;
const USER_PROCESS_UPDATE_BATCHES: u64 = 50;

#[derive(Debug)]
enum InputMessage {
Client(ClientMessage, u32),
Expand All @@ -61,7 +71,7 @@ struct Client {
id: u32,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct ClientConnection {
id: u32,
team_id: u32,
Expand Down Expand Up @@ -271,33 +281,50 @@ async fn main() {
update_bindings();

info!("Starting server");
let (send, recv) = tokio::sync::mpsc::channel(100);
let (send, recv) = tokio::sync::mpsc::channel(10000);
let state = load_state(send.clone());

if *FETCH_TRAINS {
kvv::init().await;

// fetch departures every 60 seconds and send them to the game logic queue
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let departures = kvv::fetch_departures_for_region().await;
if departures.is_empty() {
warn!("Fetched no departures");
}
if let Err(err) = send
.send(InputMessage::Server(ServerMessage::Departures(departures)))
.await
{
error!("Error while fetching data: {err}")
let fetch_trains = if *FETCH_TRAINS {
Box::pin(async move {
kvv::init().await;
// fetch departures periodically and send them to the game logic queue if at least one player is online
tokio::spawn(async move {
let mut user_online_check_interval = tokio::time::interval(Duration::from_millis(USER_ONLINE_CHECK_MS));
let mut timeout = tokio::time::interval(Duration::from_secs(TRIAS_FETCH_TIMEOUT_S));
// the first tick completes immediately
user_online_check_interval.tick().await;
timeout.tick().await;
loop {
// Skip fetching updates if no player is currently connected
if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) {
user_online_check_interval.tick().await;
continue;
}

let departures = kvv::fetch_departures_for_region().await;
if departures.is_empty() {
warn!("Fetched no departures");
}
if let Err(err) = send
.send(InputMessage::Server(ServerMessage::Departures(departures)))
.await
{
error!("Error while fetching data: {err}")
}
timeout.tick().await;
}
}
});
}
});
}) as Pin<Box<dyn Future<Output = ()>>>
} else {
Box::pin(std::future::ready(())) as Pin<Box<dyn Future<Output = ()>>>
};

info!("Starting game loop");
tokio::spawn(run_game_loop(recv, state.clone()));
let move_state = state.clone();
let game_loop = async move {
fetch_trains.await;
tokio::spawn(run_game_loop(recv, move_state)).await
};

let api = Router::new()
.route("/create-team", post(create_team))
Expand All @@ -321,10 +348,8 @@ async fn main() {

let port = dotenv::var("PORT").unwrap_or_else(|_| "3000".to_string());
// run it with hyper on localhost:3000
axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
let axum_server = axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()).serve(app.into_make_service());
futures_util::join!(axum_server, game_loop).0.unwrap();
}

fn update_bindings() {
Expand Down Expand Up @@ -382,61 +407,69 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.expect("failed to initialize rolling file appender");

// the time for a single frame
let mut interval = tokio::time::interval(Duration::from_millis(500));
let interval_ms = USER_UPDATE_MS;
let batches = USER_PROCESS_UPDATE_BATCHES;

loop {
interval.tick().await;
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms / batches));

// handle messages
let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
match msg {
InputMessage::Client(msg, id) => {
info!("Got message from client {}: {:?}", id, msg);
match msg {
ClientMessage::Position { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = (long + team.long) / 2.;
team.lat = (lat + team.lat) / 2.;
loop {
// Process game updates in smaller batches to reduce latency
for _ in 0..batches {
interval.tick().await;

// handle messages
let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
match msg {
InputMessage::Client(msg, id) => {
info!("Got message from client {}: {:?}", id, msg);
match msg {
ClientMessage::Position { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = (long + team.long) / 2.;
team.lat = (lat + team.lat) / 2.;
}
}
}
ClientMessage::SetTeamPosition { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = long;
team.lat = lat;
ClientMessage::SetTeamPosition { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = long;
team.lat = lat;
}
}
}
ClientMessage::Message(msg) => {
info!("Got message: {}", msg);
}
ClientMessage::JoinTeam { team_id } => {
let Some(client) = state.client_mut(id) else {
warn!("Client {} not found", id);
continue;
};
client.team_id = team_id;
}
ClientMessage::EmbarkTrain { train_id } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = Some(train_id);
ClientMessage::Message(msg) => {
info!("Got message: {}", msg);
}
}
ClientMessage::DisembarkTrain => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = None;
ClientMessage::JoinTeam { team_id } => {
let Some(client) = state.client_mut(id) else {
warn!("Client {} not found", id);
continue;
};
client.team_id = team_id;
}
ClientMessage::EmbarkTrain { train_id } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = Some(train_id);
}
}
ClientMessage::DisembarkTrain => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = None;
}
}
}
}
}
InputMessage::Server(ServerMessage::Departures(deps)) => {
departures = deps;
}
InputMessage::Server(ServerMessage::ClientDisconnected(id)) => {
info!("Client {} disconnected", id);
state.connections.retain(|x| x.id != id);
InputMessage::Server(ServerMessage::Departures(deps)) => {
departures = deps;
}
InputMessage::Server(ServerMessage::ClientDisconnected(id)) => {
info!("Client {} disconnected", id);
state.connections.retain(|x| x.id != id);
}
}
}
PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::Relaxed);
}
let mut state = state.lock().await;

// compute train positions
let time = chrono::Utc::now();
Expand Down Expand Up @@ -471,7 +504,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap();

// send game state to clients
for connection in state.connections.iter_mut() {
for connection in state.connections.iter() {
let game_state = GameState {
teams: game_state
.teams
Expand All @@ -481,14 +514,13 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.collect(),
trains: game_state.trains.clone(),
};
if let Err(err) = connection
.send
.send(ClientResponse::GameState(game_state.clone()))
.await
{
error!("failed to send game state to client {}: {}", connection.id, err);
continue;
}
// Distributes updates concurrently
let connection = connection.clone();
tokio::task::spawn(async move {
if let Err(err) = connection.send.send(ClientResponse::GameState(game_state)).await {
error!("failed to send game state to client {}: {}", connection.id, err);
}
});
}
}
}
25 changes: 4 additions & 21 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,27 @@ let
};

# Define the rustc we need
rustc-wasm = pkgs.rust-bin.stable.latest.default.override {
targets = [ "wasm32-unknown-unknown" ];
# wasm-pack needs this
extensions = [ "rust-src" ];
rustc = pkgs.rust-bin.stable.latest.default.override {
extensions = [ "clippy" "rust-analyzer" ];
};

lib = pkgs.lib;
in
# Make a shell with the dependencies we need
pkgs.mkShell {
buildInputs = [
rustc-wasm
rustc
pkgs.nodejs
pkgs.cargo
pkgs.cargo-watch
pkgs.wasm-pack
pkgs.clang

pkgs.openssl
pkgs.openssl.dev
pkgs.glib
pkgs.gtk3
pkgs.libsoup
pkgs.webkitgtk
pkgs.freetype
pkgs.freetype.dev

pkgs.pkg-config

# Use Mold as a Linke
# Use Mold as a Linker
pkgs.mold

# Vulkan
#pkgs.glslang
pkgs.shaderc
pkgs.vulkan-headers
pkgs.vulkan-loader
#pkgs.vulkan-validation-layers
];

# Hacky way to run cago through Mold
Expand Down

0 comments on commit 1375eed

Please sign in to comment.