From 7a0a52ad02bb0c89124925fceabe64d4e97c8515 Mon Sep 17 00:00:00 2001 From: Elad Kaplan Date: Wed, 29 Jan 2025 15:31:11 +0200 Subject: [PATCH] change job queue status from cli --- src/bgworker/mod.rs | 30 ++++++++++++++++++++ src/bgworker/pg.rs | 67 ++++++++++++++++++++++++++++++++++++++++++++ src/bgworker/sqlt.rs | 60 +++++++++++++++++++++++++++++++++++++++ src/cli.rs | 10 +++++++ 4 files changed, 167 insertions(+) diff --git a/src/bgworker/mod.rs b/src/bgworker/mod.rs index 1fe9ba0a9..b94fb089a 100644 --- a/src/bgworker/mod.rs +++ b/src/bgworker/mod.rs @@ -444,6 +444,36 @@ impl Queue { } } + /// Change the status of jobs. + /// + /// # Errors + /// - If no queue provider is configured, it will return an error indicating the lack of configuration. + /// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported. + /// - Any error in the underlying provider's job clearing logic will propagate from the respective function. + pub async fn change_status(&self, from: &JobStatus, to: &JobStatus) -> Result<()> { + tracing::debug!(from = ?from,to = ?to, "clear jobs by status"); + match self { + #[cfg(feature = "bg_pg")] + Self::Postgres(pool, _, _) => pg::change_status(pool, from, to).await, + #[cfg(feature = "bg_sqlt")] + Self::Sqlite(pool, _, _) => sqlt::change_status(pool, from, to).await, + #[cfg(feature = "bg_redis")] + Self::Redis(_, _, _) => { + tracing::error!("Update status for redis provider not implemented"); + Err(Error::string( + "Update status not supported for redis provider", + )) + } + Self::None => { + tracing::error!( + "no queue provider is configured: compile with at least one queue provider \ + feature" + ); + Err(Error::string("provider not configure")) + } + } + } + /// Dumps the list of jobs to a YAML file at the specified path. /// /// This function retrieves jobs from the queue, optionally filtered by their status, and diff --git a/src/bgworker/pg.rs b/src/bgworker/pg.rs index ff7790ff4..0cad2158e 100644 --- a/src/bgworker/pg.rs +++ b/src/bgworker/pg.rs @@ -391,6 +391,23 @@ pub async fn clear_jobs_older_than( Ok(()) } +/// Change the status of jobs in the `pg_loco_queue` table. +/// +/// This function changes the status of all jobs that currently have the `from` status +/// to the new `to` status. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn change_status(pool: &PgPool, from: &JobStatus, to: &JobStatus) -> Result<()> { + sqlx::query("UPDATE pg_loco_queue SET status = $1, updated_at = NOW() WHERE status = $2") + .bind(to.to_string()) + .bind(from.to_string()) + .execute(pool) + .await?; + Ok(()) +} + /// Ping system /// /// # Errors @@ -888,4 +905,54 @@ mod tests { 2 ); } + + #[sqlx::test] + async fn can_change_status(pool: PgPool) { + assert!(initialize_database(&pool).await.is_ok()); + + sqlx::query( + r"INSERT INTO pg_loco_queue (id, name, task_data, status, run_at,created_at, updated_at) VALUES + ('job1', 'Test Job 1', '{}', 'completed', NOW(), NOW() - INTERVAL '20days', NOW()), + ('job2', 'Test Job 2', '{}', 'failed', NOW(),NOW() - INTERVAL '15 days', NOW()), + ('job3', 'Test Job 3', '{}', 'completed', NOW(),NOW() - INTERVAL '5 days', NOW()), + ('job4', 'Test Job 3', '{}','cancelled', NOW(), NOW(), NOW())" + ) + .execute(&pool) + .await + .unwrap(); + + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Failed]), None) + .await + .expect("get jobs") + .len(), + 1 + ); + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Queued]), None) + .await + .expect("get jobs") + .len(), + 0 + ); + + change_status(&pool, &JobStatus::Failed, &JobStatus::Queued) + .await + .expect("update jobs"); + + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Failed]), None) + .await + .expect("get jobs") + .len(), + 0 + ); + assert_eq!( + get_jobs(&pool, Some(&vec![JobStatus::Queued]), None) + .await + .expect("get jobs") + .len(), + 1 + ); + } } diff --git a/src/bgworker/sqlt.rs b/src/bgworker/sqlt.rs index 4d9e99dc0..8ee2dcd2f 100644 --- a/src/bgworker/sqlt.rs +++ b/src/bgworker/sqlt.rs @@ -418,6 +418,25 @@ pub async fn clear_by_status(pool: &SqlitePool, status: Vec) -> Resul Ok(()) } +/// Change the status of jobs in the `sqlt_loco_queue` table. +/// +/// This function changes the status of all jobs that currently have the `from` status +/// to the new `to` status. +/// +/// # Errors +/// +/// This function will return an error if it fails +pub async fn change_status(pool: &SqlitePool, from: &JobStatus, to: &JobStatus) -> Result<()> { + sqlx::query( + "UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE status = $2", + ) + .bind(to.to_string()) + .bind(from.to_string()) + .execute(pool) + .await?; + Ok(()) +} + /// Deletes jobs from the `sqlt_loco_queue` table that are older than a specified number of days. /// /// This function removes jobs that have a `created_at` timestamp older than the provided @@ -1085,4 +1104,45 @@ mod tests { 2 ); } + + #[tokio::test] + async fn can_change_status() { + let tree_fs = tree_fs::TreeBuilder::default() + .drop(true) + .create() + .expect("create temp folder"); + let pool = init(&tree_fs.root).await; + + assert!(initialize_database(&pool).await.is_ok()); + tests_cfg::queue::sqlite_seed_data(&pool).await; + let jobs = get_all_jobs(&pool).await; + let processing_job_count = jobs + .iter() + .filter(|job| job.status == JobStatus::Processing) + .count(); + let queued_job_count = jobs + .iter() + .filter(|job| job.status == JobStatus::Queued) + .count(); + + assert!(processing_job_count > 0); + assert_eq!(queued_job_count, 5); + assert!( + change_status(&pool, &JobStatus::Processing, &JobStatus::Queued) + .await + .is_ok() + ); + let jobs = get_all_jobs(&pool).await; + let processing_job_count = jobs + .iter() + .filter(|job| job.status == JobStatus::Processing) + .count(); + let queued_job_count = jobs + .iter() + .filter(|job| job.status == JobStatus::Queued) + .count(); + + assert_eq!(processing_job_count, 0); + assert_eq!(queued_job_count, 8); + } } diff --git a/src/cli.rs b/src/cli.rs index 224c24124..a39876325 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -566,6 +566,15 @@ enum JobsCommands { #[arg(short, long)] file: PathBuf, }, + /// Change the status of jobs. + ChangeStatus { + /// Current status of the jobs to be updated. + #[arg(short, long)] + from: JobStatus, + /// New status to assign to the jobs. + #[arg(short, long)] + to: JobStatus, + }, } /// Parse a single key-value pair @@ -1044,6 +1053,7 @@ async fn handle_job_command( Ok(()) } JobsCommands::Import { file } => queue.import(file.as_path()).await, + JobsCommands::ChangeStatus { from, to } => queue.change_status(from, to).await, } }