diff --git a/genealogos-api/src/config.rs b/genealogos-api/src/config.rs new file mode 100644 index 0000000..10f364b --- /dev/null +++ b/genealogos-api/src/config.rs @@ -0,0 +1,17 @@ +use super::job_map::{GCInterval, GCStaleAfter}; + +#[derive(rocket::serde::Deserialize, Debug)] +#[serde(crate = "rocket::serde")] +pub struct Config { + #[serde(flatten)] + pub gc: GCConfig, +} + +#[derive(rocket::serde::Deserialize, Debug)] +#[serde(crate = "rocket::serde")] +pub struct GCConfig { + #[serde(default, rename = "gc_interval")] + pub interval: GCInterval, + #[serde(default, rename = "gc_stale_after")] + pub stale_after: GCStaleAfter, +} diff --git a/genealogos-api/src/jobs.rs b/genealogos-api/src/jobs.rs index f3cc532..87efc5f 100644 --- a/genealogos-api/src/jobs.rs +++ b/genealogos-api/src/jobs.rs @@ -90,9 +90,7 @@ pub async fn status( job_id: JobId, job_map: &rocket::State, ) -> Result { - let mut locked_map = job_map - .try_lock() - .map_err(|e| messages::ErrResponse::with_job_id(job_id, e))?; + let mut locked_map = job_map.lock().await; let status = locked_map.get(&job_id).unwrap_or(&JobStatus::Stopped); diff --git a/genealogos-api/src/jobs/job_map.rs b/genealogos-api/src/jobs/job_map.rs index 597f149..ffc3f39 100644 --- a/genealogos-api/src/jobs/job_map.rs +++ b/genealogos-api/src/jobs/job_map.rs @@ -36,7 +36,7 @@ impl ToString for JobStatus { JobStatus::Running(_) => "running".to_string(), JobStatus::Done(_, _) => "done".to_string(), JobStatus::Stopped => "stopped".to_string(), - JobStatus::Error(e) => e.to_owned(), + JobStatus::Error(e) => format!("Error: {}", e), } } } @@ -68,6 +68,26 @@ impl JobHashMap { } } +#[derive(rocket::serde::Deserialize, Debug)] +#[serde(crate = "rocket::serde")] +pub struct GCInterval(u64); + +#[derive(rocket::serde::Deserialize, Debug)] +#[serde(crate = "rocket::serde")] +pub struct GCStaleAfter(u64); + +impl Default for GCInterval { + fn default() -> Self { + Self(10) + } +} + +impl Default for GCStaleAfter { + fn default() -> Self { + Self(60 * 10) + } +} + /// The garbage collector will check for any stale jobs in the `JobMap` and remove them /// after a certain amount of time. The interval is how often the garbage collector /// will run, and the remove_after is when a job is considered stale. @@ -76,26 +96,33 @@ impl JobHashMap { /// /// # Arguments /// * `job_map` - A reference to the `JobMap` that contains all the jobs -/// * `interval` - How often the garbage collector will run -/// * `remove_after` - How long after a job is considered stale -pub async fn garbage_collector( - job_map: JobMap, - interval: time::Duration, - remove_after: time::Duration, -) { - let mut interval = time::interval(interval); +/// * `gc_config` - The configuration for the garbage collector +pub async fn garbage_collector(job_map: JobMap, gc_config: crate::config::GCConfig) { + let stale_after = time::Duration::from_secs(gc_config.stale_after.0); + let mut interval = time::interval(time::Duration::from_secs(gc_config.interval.0)); log::info!("Started the garbage collector"); loop { - log::info!("Collecting garbage"); interval.tick().await; - for (job_id, job_entry) in job_map.lock().await.0.iter_mut() { - if job_entry.last_updated.elapsed() > remove_after { - log::info!("Removing a stale job"); - job_map.lock().await.remove(job_id); + let mut count = 0; + + let mut job_map = job_map.lock().await; + log::info!("Current job count: {}", job_map.0.len()); + + // Retain allo jobs that are not stale + job_map.0.retain(|_, entry| { + if entry.last_updated.elapsed() < stale_after { + true + } else { + count += 1; + false } + }); + + if count > 0 { + log::info!("Removed {} stale jobs", count); } } } diff --git a/genealogos-api/src/main.rs b/genealogos-api/src/main.rs index 9cc0e27..426cfc3 100644 --- a/genealogos-api/src/main.rs +++ b/genealogos-api/src/main.rs @@ -10,6 +10,7 @@ use rocket::serde::json::Json; use rocket::tokio::sync::Mutex; use rocket::Request; +mod config; mod jobs; mod messages; @@ -76,7 +77,13 @@ fn rocket() -> _ { let job_map = Arc::new(Mutex::new(job_map::JobHashMap::new())); let job_map_clone = job_map.clone(); - rocket::build() + + let rocket = rocket::build(); + let figment = rocket.figment(); + + let config: config::Config = figment.extract().expect("Failed to load configuration"); + + rocket .attach(rocket::fairing::AdHoc::on_response("cors", |_req, resp| { Box::pin(async move { resp.set_header(rocket::http::Header::new( @@ -90,13 +97,7 @@ fn rocket() -> _ { |_| { Box::pin(async move { rocket::tokio::spawn(async move { - let interval = std::time::Duration::from_secs(10); - garbage_collector( - job_map_clone, - interval, - std::time::Duration::from_secs(5 * 60), - ) - .await; + garbage_collector(job_map_clone, config.gc).await; }); }) },