Skip to content

Commit

Permalink
fix: enable configuration for garbage collection
Browse files Browse the repository at this point in the history
  • Loading branch information
Erin van der Veen committed May 17, 2024
1 parent a48611c commit 4e155ad
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 25 deletions.
17 changes: 17 additions & 0 deletions genealogos-api/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use super::job_map::{GCInterval, GCStaleAfter};

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct Config {
#[serde(flatten)]
pub gc: GCConfig,
}

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCConfig {
#[serde(default, rename = "gc_interval")]
pub interval: GCInterval,
#[serde(default, rename = "gc_stale_after")]
pub stale_after: GCStaleAfter,
}
4 changes: 1 addition & 3 deletions genealogos-api/src/jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,7 @@ pub async fn status(
job_id: JobId,
job_map: &rocket::State<JobMap>,
) -> Result<messages::StatusResponse> {
let mut locked_map = job_map
.try_lock()
.map_err(|e| messages::ErrResponse::with_job_id(job_id, e))?;
let mut locked_map = job_map.lock().await;

let status = locked_map.get(&job_id).unwrap_or(&JobStatus::Stopped);

Expand Down
55 changes: 41 additions & 14 deletions genealogos-api/src/jobs/job_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl ToString for JobStatus {
JobStatus::Running(_) => "running".to_string(),
JobStatus::Done(_, _) => "done".to_string(),
JobStatus::Stopped => "stopped".to_string(),
JobStatus::Error(e) => e.to_owned(),
JobStatus::Error(e) => format!("Error: {}", e),
}
}
}
Expand Down Expand Up @@ -68,6 +68,26 @@ impl JobHashMap {
}
}

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCInterval(u64);

#[derive(rocket::serde::Deserialize, Debug)]
#[serde(crate = "rocket::serde")]
pub struct GCStaleAfter(u64);

impl Default for GCInterval {
fn default() -> Self {
Self(10)
}
}

impl Default for GCStaleAfter {
fn default() -> Self {
Self(60 * 10)
}
}

/// The garbage collector will check for any stale jobs in the `JobMap` and remove them
/// after a certain amount of time. The interval is how often the garbage collector
/// will run, and the remove_after is when a job is considered stale.
Expand All @@ -76,26 +96,33 @@ impl JobHashMap {
///
/// # Arguments
/// * `job_map` - A reference to the `JobMap` that contains all the jobs
/// * `interval` - How often the garbage collector will run
/// * `remove_after` - How long after a job is considered stale
pub async fn garbage_collector(
job_map: JobMap,
interval: time::Duration,
remove_after: time::Duration,
) {
let mut interval = time::interval(interval);
/// * `gc_config` - The configuration for the garbage collector
pub async fn garbage_collector(job_map: JobMap, gc_config: crate::config::GCConfig) {
let stale_after = time::Duration::from_secs(gc_config.stale_after.0);
let mut interval = time::interval(time::Duration::from_secs(gc_config.interval.0));

log::info!("Started the garbage collector");

loop {
log::info!("Collecting garbage");
interval.tick().await;

for (job_id, job_entry) in job_map.lock().await.0.iter_mut() {
if job_entry.last_updated.elapsed() > remove_after {
log::info!("Removing a stale job");
job_map.lock().await.remove(job_id);
let mut count = 0;

let mut job_map = job_map.lock().await;
log::info!("Current job count: {}", job_map.0.len());

// Retain allo jobs that are not stale
job_map.0.retain(|_, entry| {
if entry.last_updated.elapsed() < stale_after {
true
} else {
count += 1;
false
}
});

if count > 0 {
log::info!("Removed {} stale jobs", count);
}
}
}
17 changes: 9 additions & 8 deletions genealogos-api/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use rocket::serde::json::Json;
use rocket::tokio::sync::Mutex;
use rocket::Request;

mod config;
mod jobs;
mod messages;

Expand Down Expand Up @@ -76,7 +77,13 @@ fn rocket() -> _ {
let job_map = Arc::new(Mutex::new(job_map::JobHashMap::new()));

let job_map_clone = job_map.clone();
rocket::build()

let rocket = rocket::build();
let figment = rocket.figment();

let config: config::Config = figment.extract().expect("Failed to load configuration");

rocket
.attach(rocket::fairing::AdHoc::on_response("cors", |_req, resp| {
Box::pin(async move {
resp.set_header(rocket::http::Header::new(
Expand All @@ -90,13 +97,7 @@ fn rocket() -> _ {
|_| {
Box::pin(async move {
rocket::tokio::spawn(async move {
let interval = std::time::Duration::from_secs(10);
garbage_collector(
job_map_clone,
interval,
std::time::Duration::from_secs(5 * 60),
)
.await;
garbage_collector(job_map_clone, config.gc).await;
});
})
},
Expand Down

0 comments on commit 4e155ad

Please sign in to comment.