Skip to content

Commit

Permalink
change job queue status from cli
Browse files Browse the repository at this point in the history
  • Loading branch information
kaplanelad committed Jan 29, 2025
1 parent 7ca0e99 commit 7a0a52a
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 0 deletions.
30 changes: 30 additions & 0 deletions src/bgworker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,36 @@ impl Queue {
}
}

/// Change the status of jobs.
///
/// # Errors
/// - If no queue provider is configured, it will return an error indicating the lack of configuration.
/// - If the Redis provider is selected, it will return an error stating that clearing jobs is not supported.
/// - Any error in the underlying provider's job clearing logic will propagate from the respective function.
pub async fn change_status(&self, from: &JobStatus, to: &JobStatus) -> Result<()> {
tracing::debug!(from = ?from,to = ?to, "clear jobs by status");
match self {
#[cfg(feature = "bg_pg")]
Self::Postgres(pool, _, _) => pg::change_status(pool, from, to).await,
#[cfg(feature = "bg_sqlt")]
Self::Sqlite(pool, _, _) => sqlt::change_status(pool, from, to).await,
#[cfg(feature = "bg_redis")]
Self::Redis(_, _, _) => {
tracing::error!("Update status for redis provider not implemented");
Err(Error::string(
"Update status not supported for redis provider",
))
}
Self::None => {
tracing::error!(
"no queue provider is configured: compile with at least one queue provider \
feature"
);
Err(Error::string("provider not configure"))
}
}
}

/// Dumps the list of jobs to a YAML file at the specified path.
///
/// This function retrieves jobs from the queue, optionally filtered by their status, and
Expand Down
67 changes: 67 additions & 0 deletions src/bgworker/pg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,23 @@ pub async fn clear_jobs_older_than(
Ok(())
}

/// Change the status of jobs in the `pg_loco_queue` table.
///
/// This function changes the status of all jobs that currently have the `from` status
/// to the new `to` status.
///
/// # Errors
///
/// This function will return an error if it fails
pub async fn change_status(pool: &PgPool, from: &JobStatus, to: &JobStatus) -> Result<()> {
sqlx::query("UPDATE pg_loco_queue SET status = $1, updated_at = NOW() WHERE status = $2")
.bind(to.to_string())
.bind(from.to_string())
.execute(pool)
.await?;
Ok(())
}

/// Ping system
///
/// # Errors
Expand Down Expand Up @@ -888,4 +905,54 @@ mod tests {
2
);
}

#[sqlx::test]
async fn can_change_status(pool: PgPool) {
assert!(initialize_database(&pool).await.is_ok());

sqlx::query(
r"INSERT INTO pg_loco_queue (id, name, task_data, status, run_at,created_at, updated_at) VALUES
('job1', 'Test Job 1', '{}', 'completed', NOW(), NOW() - INTERVAL '20days', NOW()),
('job2', 'Test Job 2', '{}', 'failed', NOW(),NOW() - INTERVAL '15 days', NOW()),
('job3', 'Test Job 3', '{}', 'completed', NOW(),NOW() - INTERVAL '5 days', NOW()),
('job4', 'Test Job 3', '{}','cancelled', NOW(), NOW(), NOW())"
)
.execute(&pool)
.await
.unwrap();

assert_eq!(
get_jobs(&pool, Some(&vec![JobStatus::Failed]), None)
.await
.expect("get jobs")
.len(),
1
);
assert_eq!(
get_jobs(&pool, Some(&vec![JobStatus::Queued]), None)
.await
.expect("get jobs")
.len(),
0
);

change_status(&pool, &JobStatus::Failed, &JobStatus::Queued)
.await
.expect("update jobs");

assert_eq!(
get_jobs(&pool, Some(&vec![JobStatus::Failed]), None)
.await
.expect("get jobs")
.len(),
0
);
assert_eq!(
get_jobs(&pool, Some(&vec![JobStatus::Queued]), None)
.await
.expect("get jobs")
.len(),
1
);
}
}
60 changes: 60 additions & 0 deletions src/bgworker/sqlt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,25 @@ pub async fn clear_by_status(pool: &SqlitePool, status: Vec<JobStatus>) -> Resul
Ok(())
}

/// Change the status of jobs in the `sqlt_loco_queue` table.
///
/// This function changes the status of all jobs that currently have the `from` status
/// to the new `to` status.
///
/// # Errors
///
/// This function will return an error if it fails
pub async fn change_status(pool: &SqlitePool, from: &JobStatus, to: &JobStatus) -> Result<()> {
sqlx::query(
"UPDATE sqlt_loco_queue SET status = $1, updated_at = CURRENT_TIMESTAMP WHERE status = $2",
)
.bind(to.to_string())
.bind(from.to_string())
.execute(pool)
.await?;
Ok(())
}

/// Deletes jobs from the `sqlt_loco_queue` table that are older than a specified number of days.
///
/// This function removes jobs that have a `created_at` timestamp older than the provided
Expand Down Expand Up @@ -1085,4 +1104,45 @@ mod tests {
2
);
}

#[tokio::test]
async fn can_change_status() {
let tree_fs = tree_fs::TreeBuilder::default()
.drop(true)
.create()
.expect("create temp folder");
let pool = init(&tree_fs.root).await;

assert!(initialize_database(&pool).await.is_ok());
tests_cfg::queue::sqlite_seed_data(&pool).await;
let jobs = get_all_jobs(&pool).await;
let processing_job_count = jobs
.iter()
.filter(|job| job.status == JobStatus::Processing)
.count();
let queued_job_count = jobs
.iter()
.filter(|job| job.status == JobStatus::Queued)
.count();

assert!(processing_job_count > 0);
assert_eq!(queued_job_count, 5);
assert!(
change_status(&pool, &JobStatus::Processing, &JobStatus::Queued)
.await
.is_ok()
);
let jobs = get_all_jobs(&pool).await;
let processing_job_count = jobs
.iter()
.filter(|job| job.status == JobStatus::Processing)
.count();
let queued_job_count = jobs
.iter()
.filter(|job| job.status == JobStatus::Queued)
.count();

assert_eq!(processing_job_count, 0);
assert_eq!(queued_job_count, 8);
}
}
10 changes: 10 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,15 @@ enum JobsCommands {
#[arg(short, long)]
file: PathBuf,
},
/// Change the status of jobs.
ChangeStatus {
/// Current status of the jobs to be updated.
#[arg(short, long)]
from: JobStatus,
/// New status to assign to the jobs.
#[arg(short, long)]
to: JobStatus,
},
}

/// Parse a single key-value pair
Expand Down Expand Up @@ -1044,6 +1053,7 @@ async fn handle_job_command<H: Hooks>(
Ok(())
}
JobsCommands::Import { file } => queue.import(file.as_path()).await,
JobsCommands::ChangeStatus { from, to } => queue.change_status(from, to).await,
}
}

Expand Down

0 comments on commit 7a0a52a

Please sign in to comment.