Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve message handling and pause fetching if no one is online #45

Merged
merged 4 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 109 additions & 80 deletions robusta/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::collections::HashMap;
use std::fs;
use std::future::Future;
use std::io::Write;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

Expand All @@ -19,6 +21,7 @@ use axum::{
use futures_util::SinkExt;
use lazy_static::lazy_static;
use reqwest::StatusCode;
use std::sync::atomic::AtomicBool;
use tokio::sync::mpsc::{Receiver, Sender};
use tower::util::ServiceExt;
use tower_http::{
Expand All @@ -42,6 +45,13 @@ const TEAMS_FILE: &str = "teams.json";
/// The name used for the Mr. X team.
const MRX: &str = "Mr. X";

static PLAYERS_ONLINE: AtomicBool = AtomicBool::new(true);

const USER_ONLINE_CHECK_MS: u64 = 100;
const TRIAS_FETCH_TIMEOUT_S: u64 = 20;
const USER_UPDATE_MS: u64 = 500;
const USER_PROCESS_UPDATE_BATCHES: u64 = 50;

#[derive(Debug)]
enum InputMessage {
Client(ClientMessage, u32),
Expand All @@ -61,7 +71,7 @@ struct Client {
id: u32,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
struct ClientConnection {
id: u32,
team_id: u32,
Expand Down Expand Up @@ -271,33 +281,47 @@ async fn main() {
update_bindings();

info!("Starting server");
let (send, recv) = tokio::sync::mpsc::channel(100);
let (send, recv) = tokio::sync::mpsc::channel(10000);
let state = load_state(send.clone());

if *FETCH_TRAINS {
kvv::init().await;

// fetch departures every 60 seconds and send them to the game logic queue
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
loop {
interval.tick().await;
let departures = kvv::fetch_departures_for_region().await;
if departures.is_empty() {
warn!("Fetched no departures");
}
if let Err(err) = send
.send(InputMessage::Server(ServerMessage::Departures(departures)))
.await
{
error!("Error while fetching data: {err}")
let fetch_trains = if *FETCH_TRAINS {
Box::pin(async move {
kvv::init().await;
// fetch departures periodically and send them to the game logic queue if at least one player is online
tokio::spawn(async move {
let mut user_online_check_interval = tokio::time::interval(Duration::from_millis(USER_ONLINE_CHECK_MS));
let mut timeout = tokio::time::interval(Duration::from_secs(TRIAS_FETCH_TIMEOUT_S));
loop {
TrueDoctor marked this conversation as resolved.
Show resolved Hide resolved
// Skip fetching updates if no player is currently connected
if !PLAYERS_ONLINE.load(std::sync::atomic::Ordering::Relaxed) {
user_online_check_interval.tick().await;
continue;
}

let departures = kvv::fetch_departures_for_region().await;
if departures.is_empty() {
warn!("Fetched no departures");
}
if let Err(err) = send
.send(InputMessage::Server(ServerMessage::Departures(departures)))
.await
{
error!("Error while fetching data: {err}")
}
timeout.tick().await;
}
}
});
}
});
}) as Pin<Box<dyn Future<Output = ()>>>
} else {
Box::pin(std::future::ready(())) as Pin<Box<dyn Future<Output = ()>>>
};

info!("Starting game loop");
tokio::spawn(run_game_loop(recv, state.clone()));
let move_state = state.clone();
let game_loop = async move {
fetch_trains.await;
konsumlamm marked this conversation as resolved.
Show resolved Hide resolved
tokio::spawn(run_game_loop(recv, move_state)).await
};

let api = Router::new()
.route("/create-team", post(create_team))
Expand All @@ -321,10 +345,8 @@ async fn main() {

let port = dotenv::var("PORT").unwrap_or_else(|_| "3000".to_string());
// run it with hyper on localhost:3000
axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap())
.serve(app.into_make_service())
.await
.unwrap();
let axum_server = axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()).serve(app.into_make_service());
futures_util::join!(axum_server, game_loop).0.unwrap();
}

fn update_bindings() {
Expand Down Expand Up @@ -382,61 +404,69 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.expect("failed to initialize rolling file appender");

// the time for a single frame
let mut interval = tokio::time::interval(Duration::from_millis(500));
let interval_ms = USER_UPDATE_MS;
let batches = USER_PROCESS_UPDATE_BATCHES;

loop {
interval.tick().await;
let mut interval = tokio::time::interval(Duration::from_millis(interval_ms / batches));

// handle messages
let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
match msg {
InputMessage::Client(msg, id) => {
info!("Got message from client {}: {:?}", id, msg);
match msg {
ClientMessage::Position { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = (long + team.long) / 2.;
team.lat = (lat + team.lat) / 2.;
loop {
// Process game updates in smaller batches to reduce latency
for _ in 0..batches {
interval.tick().await;

// handle messages
let mut state = state.lock().await;
while let Ok(msg) = recv.try_recv() {
match msg {
InputMessage::Client(msg, id) => {
info!("Got message from client {}: {:?}", id, msg);
match msg {
ClientMessage::Position { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = (long + team.long) / 2.;
team.lat = (lat + team.lat) / 2.;
}
}
}
ClientMessage::SetTeamPosition { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = long;
team.lat = lat;
ClientMessage::SetTeamPosition { long, lat } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.long = long;
team.lat = lat;
}
}
}
ClientMessage::Message(msg) => {
info!("Got message: {}", msg);
}
ClientMessage::JoinTeam { team_id } => {
let Some(client) = state.client_mut(id) else {
warn!("Client {} not found", id);
continue;
};
client.team_id = team_id;
}
ClientMessage::EmbarkTrain { train_id } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = Some(train_id);
ClientMessage::Message(msg) => {
info!("Got message: {}", msg);
}
}
ClientMessage::DisembarkTrain => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = None;
ClientMessage::JoinTeam { team_id } => {
let Some(client) = state.client_mut(id) else {
warn!("Client {} not found", id);
continue;
};
client.team_id = team_id;
}
ClientMessage::EmbarkTrain { train_id } => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = Some(train_id);
}
}
ClientMessage::DisembarkTrain => {
if let Some(team) = state.team_mut_by_client_id(id) {
team.on_train = None;
}
}
}
}
}
InputMessage::Server(ServerMessage::Departures(deps)) => {
departures = deps;
}
InputMessage::Server(ServerMessage::ClientDisconnected(id)) => {
info!("Client {} disconnected", id);
state.connections.retain(|x| x.id != id);
InputMessage::Server(ServerMessage::Departures(deps)) => {
departures = deps;
}
InputMessage::Server(ServerMessage::ClientDisconnected(id)) => {
info!("Client {} disconnected", id);
state.connections.retain(|x| x.id != id);
}
}
}
PLAYERS_ONLINE.store(!state.connections.is_empty(), std::sync::atomic::Ordering::Relaxed);
}
let mut state = state.lock().await;

// compute train positions
let time = chrono::Utc::now();
Expand Down Expand Up @@ -471,7 +501,7 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
fs::write(TEAMS_FILE, serde_json::to_string_pretty(&game_state.teams).unwrap()).unwrap();

// send game state to clients
for connection in state.connections.iter_mut() {
for connection in state.connections.iter() {
let game_state = GameState {
teams: game_state
.teams
Expand All @@ -481,14 +511,13 @@ async fn run_game_loop(mut recv: Receiver<InputMessage>, state: SharedState) {
.collect(),
trains: game_state.trains.clone(),
};
if let Err(err) = connection
.send
.send(ClientResponse::GameState(game_state.clone()))
.await
{
error!("failed to send game state to client {}: {}", connection.id, err);
continue;
}
// Distributes updates concurrently
let connection = connection.clone();
tokio::task::spawn(async move {
if let Err(err) = connection.send.send(ClientResponse::GameState(game_state)).await {
error!("failed to send game state to client {}: {}", connection.id, err);
}
});
}
}
}
25 changes: 4 additions & 21 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,27 @@ let
};

# Define the rustc we need
rustc-wasm = pkgs.rust-bin.stable.latest.default.override {
targets = [ "wasm32-unknown-unknown" ];
# wasm-pack needs this
extensions = [ "rust-src" ];
rustc = pkgs.rust-bin.stable.latest.default.override {
extensions = [ "clippy" "rust-analyzer" ];
};

lib = pkgs.lib;
in
# Make a shell with the dependencies we need
pkgs.mkShell {
buildInputs = [
rustc-wasm
rustc
pkgs.nodejs
pkgs.cargo
pkgs.cargo-watch
pkgs.wasm-pack
pkgs.clang

pkgs.openssl
pkgs.openssl.dev
pkgs.glib
pkgs.gtk3
pkgs.libsoup
pkgs.webkitgtk
pkgs.freetype
pkgs.freetype.dev

pkgs.pkg-config

# Use Mold as a Linke
# Use Mold as a Linker
pkgs.mold

# Vulkan
#pkgs.glslang
pkgs.shaderc
pkgs.vulkan-headers
pkgs.vulkan-loader
#pkgs.vulkan-validation-layers
];

# Hacky way to run cago through Mold
Expand Down
Loading