From 1eb726a19943e51ae15b41180f697225602dceed Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 26 Dec 2024 11:38:29 +0530 Subject: [PATCH 01/17] feat: added retry endpoint for failed jobs --- CHANGELOG.md | 1 + crates/orchestrator/src/jobs/mod.rs | 4 +- crates/orchestrator/src/jobs/types.rs | 2 + crates/orchestrator/src/routes/job_routes.rs | 61 +++++++++++++++++++ .../src/tests/server/job_routes.rs | 38 ++++++++++++ 5 files changed, 104 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e8b0acf..3da02e9c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Added +- Added retry job endpoint for failed jobs - readme: setup instructions added - Added : Grafana dashboard - tests: http_client tests added diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 181d64f4..57939bd8 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -209,8 +209,8 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> match job.status { // we only want to process jobs that are in the created or verification failed state. // verification failed state means that the previous processing failed and we want to retry - JobStatus::Created | JobStatus::VerificationFailed => { - tracing::info!(job_id = ?id, status = ?job.status, "Job status is Created or VerificationFailed, proceeding with processing"); + JobStatus::Created | JobStatus::VerificationFailed | JobStatus::RetryAttempt => { + tracing::info!(job_id = ?id, status = ?job.status, "Job status is Created or VerificationFailed or RetryAttempt, proceeding with processing"); } _ => { tracing::warn!(job_id = ?id, status = ?job.status, "Job status is Invalid. 
Cannot process."); diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index 0eeba5e3..c621bacc 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -103,6 +103,8 @@ pub enum JobStatus { VerificationFailed, /// The job failed completing Failed, + /// The job is being retried + RetryAttempt, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index a10730af..21b801a3 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -10,6 +10,7 @@ use uuid::Uuid; use super::ApiResponse; use crate::config::Config; +use crate::jobs::types::{JobItemUpdates, JobStatus}; use crate::jobs::{process_job, verify_job, JobError}; use crate::metrics::ORCHESTRATOR_METRICS; @@ -73,6 +74,65 @@ async fn handle_verify_job_request( } } } + +async fn handle_retry_job_request( + Path(JobId { id }): Path, + State(config): State>, +) -> impl IntoResponse { + let job_id = match Uuid::parse_str(&id) { + Ok(id) => id, + Err(_) => { + return ApiResponse::::error((JobError::InvalidId { id }).to_string()).into_response(); + } + }; + + // Get the job and verify it's in a failed state + let job = match config.database().get_job_by_id(job_id).await { + Ok(Some(job)) => job, + Ok(None) => { + return ApiResponse::::error(JobError::JobNotFound { id: job_id }.to_string()) + .into_response(); + } + Err(e) => { + return ApiResponse::::error(e.to_string()).into_response(); + } + }; + + // Check if job is in a failed state + if job.status != JobStatus::Failed { + return ApiResponse::::error(format!( + "Job {} cannot be retried: current status is {:?}", + id, job.status + )) + .into_response(); + } + + // Update the job status to RetryAttempt + match config.database().update_job(&job, JobItemUpdates::new().update_status(JobStatus::RetryAttempt).build()).await + { + Ok(_) => { + // Process the job after successful status update + match process_job(job_id, config).await { + Ok(_) => { + let response = + JobApiResponse { job_id: job_id.to_string(), status: "retry_processing".to_string() }; + ApiResponse::success(response).into_response() + } + Err(e) => { + ORCHESTRATOR_METRICS + .failed_job_operations + .add(1.0, &[KeyValue::new("operation_type", "retry_job")]); + ApiResponse::::error(e.to_string()).into_response() + } + } + } + Err(e) => { + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "retry_job")]); + ApiResponse::::error(e.to_string()).into_response() + } + } +} + pub fn job_router(config: Arc) -> Router { Router::new().nest("/jobs", trigger_router(config.clone())) } @@ -81,5 +141,6 @@ fn trigger_router(config: Arc) -> Router { Router::new() .route("/:id/process", get(handle_process_job_request)) .route("/:id/verify", get(handle_verify_job_request)) + .route("/:id/retry", get(handle_retry_job_request)) .with_state(config) } diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index c2a3b0cc..eced0318 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -119,6 +119,44 @@ async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc)) { + let (addr, config) = setup_trigger.await; + + let job_type = JobType::DataSubmission; + + // Create a failed job + let job_item = build_job_item(job_type.clone(), 
JobStatus::Failed, 1); + let mut job_handler = MockJob::new(); + + // We expect process_job to be called since retry triggers processing + job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); + job_handler.expect_verification_polling_delay_seconds().return_const(1u64); + + config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.clone().id; + + let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); + ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); + + let client = hyper::Client::new(); + let response = client + .request(Request::builder().uri(format!("http://{}/jobs/{}/retry", addr, job_id)).body(Body::empty()).unwrap()) + .await + .unwrap(); + + // assertions + if let Some(job_fetched) = config.database().get_job_by_id(job_id).await.unwrap() { + assert_eq!(response.status(), 200); + assert_eq!(job_fetched.id, job_item.id); + assert_eq!(job_fetched.status, JobStatus::PendingVerification); + } else { + panic!("Could not get job from database") + } +} + #[rstest] #[tokio::test] async fn test_init_consumer() { From db81c351d41383de3383d33ae49c908cf12cb12f Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 26 Dec 2024 13:06:56 +0530 Subject: [PATCH 02/17] refactor: verify job will allow verification of the job with timeout status --- crates/orchestrator/src/jobs/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 57939bd8..c60cddbb 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -333,8 +333,8 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::Span::current().record("internal_id", job.internal_id.clone()); match job.status { - JobStatus::PendingVerification => { - tracing::debug!(job_id = ?id, "Job status is PendingVerification, proceeding with verification"); + JobStatus::PendingVerification | JobStatus::VerificationTimeout => { + tracing::info!(job_id = ?id, status = ?job.status, "Job status is PendingVerification or VerificationTimeout, proceeding with verification"); } _ => { tracing::error!(job_id = ?id, status = ?job.status, "Invalid job status for verification"); From d34594c16676a9e939853d55a564cec2e0197ef9 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 26 Dec 2024 13:07:56 +0530 Subject: [PATCH 03/17] chore: changelog updated --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3da02e9c..d9ec3d72 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -51,6 +51,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
## Changed +- verify_job now handles VerificationTimeout status - refactor: Readme and .env.example - refactor: http_mock version updated - refactor: prover-services renamed to prover-clients From c69b4be9bcef62d42502c38efe515179e34bfeaa Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 2 Jan 2025 18:42:16 +0530 Subject: [PATCH 04/17] chore: comments updated --- crates/orchestrator/src/jobs/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index c60cddbb..1dda406c 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -207,13 +207,12 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::debug!(job_id = ?id, status = ?job.status, "Current job status"); match job.status { - // we only want to process jobs that are in the created or verification failed state. - // verification failed state means that the previous processing failed and we want to retry + // Only process jobs that need initial processing or require a retry JobStatus::Created | JobStatus::VerificationFailed | JobStatus::RetryAttempt => { - tracing::info!(job_id = ?id, status = ?job.status, "Job status is Created or VerificationFailed or RetryAttempt, proceeding with processing"); + tracing::info!(job_id = ?id, status = ?job.status, "Processing job"); } _ => { - tracing::warn!(job_id = ?id, status = ?job.status, "Job status is Invalid. Cannot process."); + tracing::warn!(job_id = ?id, status = ?job.status, "Cannot process job with current status"); return Err(JobError::InvalidStatus { id, job_status: job.status }); } } From a5dd65d726b4e3c455b07e73a8a495fc8a61318f Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 01:47:47 +0530 Subject: [PATCH 05/17] refactor: business logic of retry jobs moved to jobs/mod.rs --- crates/orchestrator/src/jobs/mod.rs | 63 +++++++++- crates/orchestrator/src/jobs/types.rs | 2 +- crates/orchestrator/src/routes/error.rs | 35 ++++++ crates/orchestrator/src/routes/job_routes.rs | 121 ++++++------------- crates/orchestrator/src/routes/mod.rs | 44 +------ crates/orchestrator/src/routes/types.rs | 27 +++++ 6 files changed, 165 insertions(+), 127 deletions(-) create mode 100644 crates/orchestrator/src/routes/error.rs create mode 100644 crates/orchestrator/src/routes/types.rs diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 1dda406c..85a6a4f3 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -208,7 +208,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::debug!(job_id = ?id, status = ?job.status, "Current job status"); match job.status { // Only process jobs that need initial processing or require a retry - JobStatus::Created | JobStatus::VerificationFailed | JobStatus::RetryAttempt => { + JobStatus::Created | JobStatus::VerificationFailed | JobStatus::PendingRetry => { tracing::info!(job_id = ?id, status = ?job.status, "Processing job"); } _ => { @@ -479,6 +479,67 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { Ok(()) } +/// Retries a failed job by reprocessing it. +/// Only jobs with Failed status can be retried. 
+#[tracing::instrument(skip(config), fields(category = "general"), ret, err)] +pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let job = get_job(id, config.clone()).await?; + let internal_id = job.internal_id.clone(); + tracing::info!( + log_type = "starting", + category = "general", + function_type = "retry_job", + block_no = %internal_id, + "General retry job started for block" + ); + + if job.status != JobStatus::Failed { + tracing::error!( + job_id = ?id, + status = ?job.status, + "Cannot retry job: invalid status" + ); + return Err(JobError::InvalidStatus { id, job_status: job.status }); + } + + // Update job status to PendingRetry before processing + config + .database() + .update_job(&job, JobItemUpdates::new().update_status(JobStatus::PendingRetry).build()) + .await + .map_err(|e| { + tracing::error!( + job_id = ?id, + error = ?e, + "Failed to update job status to PendingRetry" + ); + JobError::Other(OtherError(e)) + })?; + + let result = process_job(job.id, config.clone()).await; + + if let Err(e) = &result { + tracing::error!( + log_type = "error", + category = "general", + function_type = "retry_job", + block_no = %internal_id, + error = %e, + "General retry job failed for block" + ); + } else { + tracing::info!( + log_type = "completed", + category = "general", + function_type = "retry_job", + block_no = %internal_id, + "General retry job completed for block" + ); + } + + result +} + /// Terminates the job and updates the status of the job in the DB. /// Logs error if the job status `Completed` is existing on DL queue. #[tracing::instrument(skip(config), fields(job_status, job_type), ret, err)] diff --git a/crates/orchestrator/src/jobs/types.rs b/crates/orchestrator/src/jobs/types.rs index c621bacc..32954b58 100644 --- a/crates/orchestrator/src/jobs/types.rs +++ b/crates/orchestrator/src/jobs/types.rs @@ -104,7 +104,7 @@ pub enum JobStatus { /// The job failed completing Failed, /// The job is being retried - RetryAttempt, + PendingRetry, } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] diff --git a/crates/orchestrator/src/routes/error.rs b/crates/orchestrator/src/routes/error.rs new file mode 100644 index 00000000..55c118c7 --- /dev/null +++ b/crates/orchestrator/src/routes/error.rs @@ -0,0 +1,35 @@ +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::Json; + +use super::types::ApiResponse; + +#[derive(Debug, thiserror::Error)] +pub enum JobRouteError { + #[error("Invalid job ID: {0}")] + InvalidId(String), + #[error("Job not found: {0}")] + NotFound(String), + #[error("Job processing error: {0}")] + ProcessingError(String), + #[error("Invalid job state: {0}")] + InvalidJobState(String), + #[error("Database error")] + DatabaseError, +} + +impl IntoResponse for JobRouteError { + fn into_response(self) -> Response { + let (status, message) = match self { + JobRouteError::InvalidId(id) => (StatusCode::BAD_REQUEST, format!("Invalid job ID: {}", id)), + JobRouteError::NotFound(id) => (StatusCode::NOT_FOUND, format!("Job not found: {}", id)), + JobRouteError::ProcessingError(msg) => { + (StatusCode::INTERNAL_SERVER_ERROR, format!("Processing error: {}", msg)) + } + JobRouteError::InvalidJobState(msg) => (StatusCode::CONFLICT, format!("Invalid job state: {}", msg)), + JobRouteError::DatabaseError => (StatusCode::INTERNAL_SERVER_ERROR, "Database error occurred".to_string()), + }; + + (status, Json(ApiResponse::error(message))).into_response() + } +} diff --git a/crates/orchestrator/src/routes/job_routes.rs 
b/crates/orchestrator/src/routes/job_routes.rs index 21b801a3..6e3da7b6 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -3,140 +3,89 @@ use std::sync::Arc; use axum::extract::{Path, State}; use axum::response::IntoResponse; use axum::routing::get; -use axum::Router; +use axum::{Json, Router}; use opentelemetry::KeyValue; -use serde::{Deserialize, Serialize}; +use tracing::{error, info, instrument}; use uuid::Uuid; -use super::ApiResponse; +use super::error::JobRouteError; +use super::types::{ApiResponse, JobId, JobRouteResult}; use crate::config::Config; -use crate::jobs::types::{JobItemUpdates, JobStatus}; -use crate::jobs::{process_job, verify_job, JobError}; +use crate::jobs::{process_job, retry_job, verify_job}; use crate::metrics::ORCHESTRATOR_METRICS; -#[derive(Deserialize)] -struct JobId { - id: String, -} - -#[derive(Serialize)] -struct JobApiResponse { - job_id: String, - status: String, -} - +#[instrument(skip(config), fields(job_id = %id))] async fn handle_process_job_request( Path(JobId { id }): Path, State(config): State>, -) -> impl IntoResponse { - // Parse UUID - let job_id = match Uuid::parse_str(&id) { - Ok(id) => id, - Err(_) => { - return ApiResponse::::error((JobError::InvalidId { id }).to_string()).into_response(); - } - }; +) -> JobRouteResult { + let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - // Process job match process_job(job_id, config).await { Ok(_) => { - let response = JobApiResponse { job_id: job_id.to_string(), status: "completed".to_string() }; - ApiResponse::success(response).into_response() + info!("Job processed successfully"); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]); + + Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { + error!(error = %e, "Failed to process job"); ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]); - ApiResponse::::error(e.to_string()).into_response() + Err(JobRouteError::ProcessingError(e.to_string())) } } } +#[instrument(skip(config), fields(job_id = %id))] async fn handle_verify_job_request( Path(JobId { id }): Path, State(config): State>, -) -> impl IntoResponse { - // Parse UUID - let job_id = match Uuid::parse_str(&id) { - Ok(id) => id, - Err(_) => { - return ApiResponse::::error((JobError::InvalidId { id }).to_string()).into_response(); - } - }; +) -> JobRouteResult { + let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - // Verify job match verify_job(job_id, config).await { Ok(_) => { - let response = JobApiResponse { job_id: job_id.to_string(), status: "verified".to_string() }; - ApiResponse::success(response).into_response() + info!("Job verified successfully"); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]); + + Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { + error!(error = %e, "Failed to verify job"); ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]); - ApiResponse::::error(e.to_string()).into_response() + Err(JobRouteError::ProcessingError(e.to_string())) } } } +#[instrument(skip(config), fields(job_id = %id))] async fn handle_retry_job_request( Path(JobId { id }): Path, State(config): State>, -) -> impl IntoResponse { - let job_id = match Uuid::parse_str(&id) { - Ok(id) => id, - Err(_) => { - return 
ApiResponse::::error((JobError::InvalidId { id }).to_string()).into_response(); - } - }; - - // Get the job and verify it's in a failed state - let job = match config.database().get_job_by_id(job_id).await { - Ok(Some(job)) => job, - Ok(None) => { - return ApiResponse::::error(JobError::JobNotFound { id: job_id }.to_string()) - .into_response(); - } - Err(e) => { - return ApiResponse::::error(e.to_string()).into_response(); - } - }; - - // Check if job is in a failed state - if job.status != JobStatus::Failed { - return ApiResponse::::error(format!( - "Job {} cannot be retried: current status is {:?}", - id, job.status - )) - .into_response(); - } +) -> JobRouteResult { + let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - // Update the job status to RetryAttempt - match config.database().update_job(&job, JobItemUpdates::new().update_status(JobStatus::RetryAttempt).build()).await - { + match retry_job(job_id, config).await { Ok(_) => { - // Process the job after successful status update - match process_job(job_id, config).await { - Ok(_) => { - let response = - JobApiResponse { job_id: job_id.to_string(), status: "retry_processing".to_string() }; - ApiResponse::success(response).into_response() - } - Err(e) => { - ORCHESTRATOR_METRICS - .failed_job_operations - .add(1.0, &[KeyValue::new("operation_type", "retry_job")]); - ApiResponse::::error(e.to_string()).into_response() - } - } + info!("Job retry initiated successfully"); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "retry_job")]); + + Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { + error!(error = %e, "Failed to retry job"); ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "retry_job")]); - ApiResponse::::error(e.to_string()).into_response() + Err(JobRouteError::ProcessingError(e.to_string())) } } } +/// Creates a router for job-related endpoints pub fn job_router(config: Arc) -> Router { Router::new().nest("/jobs", trigger_router(config.clone())) } +/// Creates the nested router for job trigger endpoints fn trigger_router(config: Arc) -> Router { Router::new() .route("/:id/process", get(handle_process_job_request)) diff --git a/crates/orchestrator/src/routes/mod.rs b/crates/orchestrator/src/routes/mod.rs index b43b6a7e..652a7db7 100644 --- a/crates/orchestrator/src/routes/mod.rs +++ b/crates/orchestrator/src/routes/mod.rs @@ -2,16 +2,17 @@ use std::net::SocketAddr; use std::sync::Arc; use app_routes::{app_router, handler_404}; -use axum::http::StatusCode; -use axum::response::{IntoResponse, Response}; -use axum::{Json, Router}; +use axum::Router; use job_routes::job_router; -use serde::Serialize; use crate::config::Config; pub mod app_routes; +pub mod error; pub mod job_routes; +pub mod types; + +pub use error::JobRouteError; #[derive(Debug, Clone)] pub struct ServerParams { @@ -19,41 +20,6 @@ pub struct ServerParams { pub port: u16, } -#[derive(Debug, Serialize)] -struct ApiResponse -where - T: Serialize, -{ - data: Option, - error: Option, -} - -impl ApiResponse -where - T: Serialize, -{ - pub fn success(data: T) -> Self { - Self { data: Some(data), error: None } - } - - pub fn error(message: impl Into) -> Self { - Self { data: None, error: Some(message.into()) } - } -} - -impl IntoResponse for ApiResponse -where - T: Serialize, -{ - fn into_response(self) -> Response { - let status = if self.error.is_some() { StatusCode::INTERNAL_SERVER_ERROR } else { StatusCode::OK }; - - let json = Json(self); - 
- (status, json).into_response() - } -} - pub async fn setup_server(config: Arc) -> SocketAddr { let (api_server_url, listener) = get_server_url(config.server_config()).await; diff --git a/crates/orchestrator/src/routes/types.rs b/crates/orchestrator/src/routes/types.rs new file mode 100644 index 00000000..b7b59031 --- /dev/null +++ b/crates/orchestrator/src/routes/types.rs @@ -0,0 +1,27 @@ +use axum::response::Response; +use serde::{Deserialize, Serialize}; + +use super::error::JobRouteError; + +#[derive(Deserialize)] +pub struct JobId { + pub id: String, +} + +#[derive(Serialize)] +pub struct ApiResponse { + pub success: bool, + pub message: Option, +} + +impl ApiResponse { + pub fn success() -> Self { + Self { success: true, message: None } + } + + pub fn error(message: String) -> Self { + Self { success: false, message: Some(message) } + } +} + +pub type JobRouteResult = Result; From c4a492474493b890e13016da899a525ea723c04b Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 02:19:12 +0530 Subject: [PATCH 06/17] test: test added for retry job endpoint when status not failed --- crates/orchestrator/src/routes/error.rs | 33 +++++++++++----- crates/orchestrator/src/routes/job_routes.rs | 1 + .../src/tests/server/job_routes.rs | 38 +++++++++++++------ 3 files changed, 51 insertions(+), 21 deletions(-) diff --git a/crates/orchestrator/src/routes/error.rs b/crates/orchestrator/src/routes/error.rs index 55c118c7..3cd8d42f 100644 --- a/crates/orchestrator/src/routes/error.rs +++ b/crates/orchestrator/src/routes/error.rs @@ -16,20 +16,35 @@ pub enum JobRouteError { InvalidJobState(String), #[error("Database error")] DatabaseError, + #[error("Invalid status: {id}: {job_status}")] + InvalidStatus { id: String, job_status: String }, } impl IntoResponse for JobRouteError { fn into_response(self) -> Response { - let (status, message) = match self { - JobRouteError::InvalidId(id) => (StatusCode::BAD_REQUEST, format!("Invalid job ID: {}", id)), - JobRouteError::NotFound(id) => (StatusCode::NOT_FOUND, format!("Job not found: {}", id)), + match self { + JobRouteError::InvalidId(id) => { + (StatusCode::BAD_REQUEST, Json(ApiResponse::error(format!("Invalid job ID: {}", id)))).into_response() + } + JobRouteError::NotFound(id) => { + (StatusCode::NOT_FOUND, Json(ApiResponse::error(format!("Job not found: {}", id)))).into_response() + } JobRouteError::ProcessingError(msg) => { - (StatusCode::INTERNAL_SERVER_ERROR, format!("Processing error: {}", msg)) + (StatusCode::BAD_REQUEST, Json(ApiResponse::error(format!("Processing error: {}", msg)))) + .into_response() } - JobRouteError::InvalidJobState(msg) => (StatusCode::CONFLICT, format!("Invalid job state: {}", msg)), - JobRouteError::DatabaseError => (StatusCode::INTERNAL_SERVER_ERROR, "Database error occurred".to_string()), - }; - - (status, Json(ApiResponse::error(message))).into_response() + JobRouteError::InvalidJobState(msg) => { + (StatusCode::CONFLICT, Json(ApiResponse::error(format!("Invalid job state: {}", msg)))).into_response() + } + JobRouteError::DatabaseError => { + (StatusCode::INTERNAL_SERVER_ERROR, Json(ApiResponse::error("Database error occurred".to_string()))) + .into_response() + } + JobRouteError::InvalidStatus { id, job_status } => ( + StatusCode::BAD_REQUEST, + Json(ApiResponse::error(format!("Cannot retry job {id}: invalid status {job_status}"))), + ) + .into_response(), + } } } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 6e3da7b6..e4e9cf1d 100644 --- 
a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -64,6 +64,7 @@ async fn handle_retry_job_request( State(config): State<Arc<Config>>, ) -> JobRouteResult { let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; + println!("retry_job_request: {:?}", job_id); match retry_job(job_id, config).await { Ok(_) => { diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index eced0318..2f119be5 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -119,19 +119,26 @@ async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc)) { +#[case::failed_job(JobStatus::Failed, 200)] +#[case::pending_verification_job(JobStatus::PendingVerification, 400)] +#[case::completed_job(JobStatus::Completed, 400)] +#[tokio::test] +async fn test_trigger_retry_job( + #[future] setup_trigger: (SocketAddr, Arc<Config>), + #[case] initial_status: JobStatus, + #[case] expected_response_status: u16, +) { let (addr, config) = setup_trigger.await; - let job_type = JobType::DataSubmission; - // Create a failed job - let job_item = build_job_item(job_type.clone(), JobStatus::Failed, 1); + let job_type = JobType::DataSubmission; + let job_item = build_job_item(job_type.clone(), initial_status.clone(), 1); let mut job_handler = MockJob::new(); - // We expect process_job to be called since retry triggers processing - job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); + job_handler + .expect_process_job() + .times(if expected_response_status == 200 { 1 } else { 0 }) + .returning(move |_, _| Ok("0xbeef".to_string())); job_handler.expect_verification_polling_delay_seconds().return_const(1u64); config.database().create_job(job_item.clone()).await.unwrap(); @@ -139,7 +146,10 @@ async fn test_trigger_retry_job(#[future] setup_trigger: (SocketAddr, Arc> = Arc::new(Box::new(job_handler)); let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); + ctx.expect() + .times(if expected_response_status == 200 { 1 } else { 0 }) + .with(eq(job_type)) + .returning(move |_| Arc::clone(&job_handler)); let client = hyper::Client::new(); let response = client @@ -147,11 +157,15 @@ async fn test_trigger_retry_job(#[future] setup_trigger: (SocketAddr, Arc Date: Fri, 3 Jan 2025 02:28:04 +0530 Subject: [PATCH 07/17] chore: comments resolved regarding docs update --- crates/orchestrator/src/jobs/mod.rs | 6 +++++- crates/orchestrator/src/routes/job_routes.rs | 11 ++++++++--- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 85a6a4f3..fe00a86c 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -207,7 +207,9 @@ pub async fn process_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> tracing::debug!(job_id = ?id, status = ?job.status, "Current job status"); match job.status { - // Only process jobs that need initial processing or require a retry + // we only want to process jobs that are in the created or verification failed state, or if it's been called from + // the retry endpoint (in this case it would be PendingRetry status). Verification failed state means + // that the previous processing failed and we want to retry JobStatus::Created | JobStatus::VerificationFailed | JobStatus::PendingRetry => { tracing::info!(job_id = ?id, status = 
?job.status, "Processing job"); } @@ -332,6 +334,8 @@ pub async fn verify_job(id: Uuid, config: Arc<Config>) -> Result<(), JobError> { tracing::Span::current().record("internal_id", job.internal_id.clone()); match job.status { + // it's okay to retry the job if it's verificationTimeout, because we are just adding job again to the + // verification queue JobStatus::PendingVerification | JobStatus::VerificationTimeout => { tracing::info!(job_id = ?id, status = ?job.status, "Job status is PendingVerification or VerificationTimeout, proceeding with verification"); } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index e4e9cf1d..24c59b25 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -64,18 +64,23 @@ async fn handle_retry_job_request( State(config): State<Arc<Config>>, ) -> JobRouteResult { let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - println!("retry_job_request: {:?}", job_id); match retry_job(job_id, config).await { Ok(_) => { info!("Job retry initiated successfully"); - ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "retry_job")]); + ORCHESTRATOR_METRICS.successful_job_operations.add( + 1.0, + &[KeyValue::new("operation_type", "process_job"), KeyValue::new("operation_info", "retry_job")], + ); Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { error!(error = %e, "Failed to retry job"); - ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "retry_job")]); + ORCHESTRATOR_METRICS.failed_job_operations.add( + 1.0, + &[KeyValue::new("operation_type", "process_job"), KeyValue::new("operation_info", "retry_job")], + ); Err(JobRouteError::ProcessingError(e.to_string())) } } } From c43646958a0618c1a131ef39686ef1b3d3c0ea82 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 02:44:29 +0530 Subject: [PATCH 08/17] docs: updated for the routes module --- crates/orchestrator/src/jobs/mod.rs | 194 ++++++++++++++++++- crates/orchestrator/src/routes/app_routes.rs | 62 ++++++ crates/orchestrator/src/routes/error.rs | 68 +++++++ crates/orchestrator/src/routes/job_routes.rs | 77 +++++++- crates/orchestrator/src/routes/mod.rs | 60 ++++++ crates/orchestrator/src/routes/types.rs | 73 +++++++ 6 files changed, 529 insertions(+), 5 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index fe00a86c..d8711a47 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -40,47 +40,62 @@ pub mod state_update_job; pub mod types; use thiserror::Error; +/// Error types for job-related operations in the orchestrator #[derive(Error, Debug, PartialEq)] pub enum JobError { + /// Indicates an invalid job ID was provided #[error("Job id {id:?} is invalid.")] InvalidId { id: String }, + /// Indicates an attempt to create a duplicate job #[error("Job already exists for internal_id {internal_id:?} and job_type {job_type:?}. Skipping!")] JobAlreadyExists { internal_id: String, job_type: JobType }, + /// Indicates the job is in an invalid status for the requested operation #[error("Invalid status {job_status:?} for job with id {id:?}. 
Cannot process.")] InvalidStatus { id: Uuid, job_status: JobStatus }, + /// Indicates the requested job could not be found #[error("Failed to find job with id {id:?}")] JobNotFound { id: Uuid }, + /// Indicates a metadata counter would overflow if incremented #[error("Incrementing key {} in metadata would exceed u64::MAX", key)] KeyOutOfBounds { key: String }, + /// Wraps errors from DA layer operations #[error("DA Error: {0}")] DaJobError(#[from] DaError), + /// Wraps errors from proving operations #[error("Proving Error: {0}")] ProvingJobError(#[from] ProvingError), + /// Wraps errors from state update operations #[error("Proving Error: {0}")] StateUpdateJobError(#[from] StateUpdateError), + /// Wraps errors from SNOS operations #[error("Snos Error: {0}")] SnosJobError(#[from] SnosError), + /// Wraps errors from queue handling operations #[error("Queue Handling Error: {0}")] ConsumptionError(#[from] ConsumptionError), + /// Wraps errors from fact operations #[error("Fact Error: {0}")] FactError(#[from] FactError), + /// Wraps general errors that don't fit other categories #[error("Other error: {0}")] Other(#[from] OtherError), } -// ==================================================== /// Wrapper Type for Other(<>) job type +/// +/// Provides a generic error type for cases that don't fit into specific error categories +/// while maintaining error chain context. #[derive(Debug)] pub struct OtherError(color_eyre::eyre::Error); @@ -109,42 +124,89 @@ impl From for OtherError { OtherError(eyre!(error_string)) } } -// ==================================================== /// Job Trait /// /// The Job trait is used to define the methods that a job /// should implement to be used as a job for the orchestrator. The orchestrator automatically /// handles queueing and processing of jobs as long as they implement the trait. +/// +/// # Implementation Requirements +/// Implementors must be both `Send` and `Sync` to work with the async processing system. #[automock] #[async_trait] pub trait Job: Send + Sync { /// Should build a new job item and return it + /// + /// # Arguments + /// * `config` - Shared configuration for the job + /// * `internal_id` - Unique identifier for internal tracking + /// * `metadata` - Additional key-value pairs associated with the job + /// + /// # Returns + /// * `Result` - The created job item or an error async fn create_job( &self, config: Arc, internal_id: String, metadata: HashMap, ) -> Result; + /// Should process the job and return the external_id which can be used to /// track the status of the job. For example, a DA job will submit the state diff /// to the DA layer and return the txn hash. + /// + /// # Arguments + /// * `config` - Shared configuration for the job + /// * `job` - Mutable reference to the job being processed + /// + /// # Returns + /// * `Result` - External tracking ID or an error async fn process_job(&self, config: Arc, job: &mut JobItem) -> Result; + /// Should verify the job and return the status of the verification. For example, /// a DA job will verify the inclusion of the state diff in the DA layer and return /// the status of the verification. + /// + /// # Arguments + /// * `config` - Shared configuration for the job + /// * `job` - Mutable reference to the job being verified + /// + /// # Returns + /// * `Result` - Current verification status or an error async fn verify_job(&self, config: Arc, job: &mut JobItem) -> Result; + /// Should return the maximum number of attempts to process the job. 
A new attempt is made /// every time the verification returns `JobVerificationStatus::Rejected` fn max_process_attempts(&self) -> u64; + /// Should return the maximum number of attempts to verify the job. A new attempt is made /// every few seconds depending on the result `verification_polling_delay_seconds` fn max_verification_attempts(&self) -> u64; + /// Should return the number of seconds to wait before polling for verification fn verification_polling_delay_seconds(&self) -> u64; } /// Creates the job in the DB in the created state and adds it to the process queue +/// +/// # Arguments +/// * `job_type` - Type of job to create +/// * `internal_id` - Unique identifier for internal tracking +/// * `metadata` - Additional key-value pairs for the job +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # Metrics +/// * Records block gauge +/// * Updates successful job operations count +/// * Records job response time +/// +/// # Notes +/// * Skips creation if job already exists with same internal_id and job_type +/// * Automatically adds the job to the process queue upon successful creation #[tracing::instrument(fields(category = "general"), skip(config), ret, err)] pub async fn create_job( job_type: JobType, @@ -194,6 +256,29 @@ pub async fn create_job( /// Processes the job, increments the process attempt count and updates the status of the job in the /// DB. It then adds the job to the verification queue. +/// +/// # Arguments +/// * `id` - UUID of the job to process +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # State Transitions +/// * `Created` -> `LockedForProcessing` -> `PendingVerification` +/// * `VerificationFailed` -> `LockedForProcessing` -> `PendingVerification` +/// * `PendingRetry` -> `LockedForProcessing` -> `PendingVerification` +/// +/// # Metrics +/// * Updates block gauge +/// * Records successful job operations +/// * Tracks job response time +/// +/// # Notes +/// * Only processes jobs in Created, VerificationFailed, or PendingRetry status +/// * Updates job version to prevent concurrent processing +/// * Adds processing completion timestamp to metadata +/// * Automatically adds job to verification queue upon successful processing #[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)] pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> { let start = Instant::now(); @@ -317,6 +402,28 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> /// retries processing the job if the max attempts have not been exceeded. If the max attempts have /// been exceeded, it marks the job as timed out. If the verification is still pending, it pushes /// the job back to the queue. 
+/// +/// # Arguments +/// * `id` - UUID of the job to verify +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # State Transitions +/// * `PendingVerification` -> `Completed` (on successful verification) +/// * `PendingVerification` -> `VerificationFailed` (on verification rejection) +/// * `PendingVerification` -> `VerificationTimeout` (max attempts reached) +/// +/// # Metrics +/// * Records verification time if processing completion timestamp exists +/// * Updates block gauge and job operation metrics +/// * Tracks successful operations and response time +/// +/// # Notes +/// * Only jobs in `PendingVerification` or `VerificationTimeout` status can be verified +/// * Automatically retries processing if verification fails and max attempts not reached +/// * Removes processing_completed_at from metadata upon successful verification #[tracing::instrument( skip(config), fields(category = "general", job, job_type, internal_id, verification_status), @@ -485,6 +592,21 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { /// Retries a failed job by reprocessing it. /// Only jobs with Failed status can be retried. +/// +/// # Arguments +/// * `id` - UUID of the job to retry +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # State Transitions +/// * `Failed` -> `PendingRetry` -> (normal processing flow) +/// +/// # Notes +/// * Only jobs in Failed status can be retried +/// * Transitions through PendingRetry status before normal processing +/// * Uses standard process_job function after status update #[tracing::instrument(skip(config), fields(category = "general"), ret, err)] pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; @@ -545,7 +667,18 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { } /// Terminates the job and updates the status of the job in the DB. -/// Logs error if the job status `Completed` is existing on DL queue. 
+/// +/// # Arguments +/// * `id` - UUID of the job to handle failure for +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # Notes +/// * Logs error if the job status `Completed` is existing on DL queue +/// * Updates job status to Failed and records failure reason in metadata +/// * Updates metrics for failed jobs #[tracing::instrument(skip(config), fields(job_status, job_type), ret, err)] pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?.clone(); @@ -561,6 +694,20 @@ pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), Job .await } +/// Moves a job to the Failed state with the provided reason +/// +/// # Arguments +/// * `job` - Reference to the job to mark as failed +/// * `config` - Shared configuration +/// * `reason` - Failure reason to record in metadata +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # Notes +/// * Skips processing if job is already in Failed status +/// * Records failure reason in job metadata +/// * Updates metrics for failed jobs async fn move_job_to_failed(job: &JobItem, config: Arc, reason: String) -> Result<(), JobError> { if job.status == JobStatus::Completed { tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue"); @@ -597,6 +744,14 @@ async fn move_job_to_failed(job: &JobItem, config: Arc, reason: String) } } +/// Retrieves a job by its ID from the database +/// +/// # Arguments +/// * `id` - UUID of the job to retrieve +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result` - The job if found, or JobNotFound error async fn get_job(id: Uuid, config: Arc) -> Result { let job = config.database().get_job_by_id(id).await.map_err(|e| JobError::Other(OtherError(e)))?; match job { @@ -605,6 +760,18 @@ async fn get_job(id: Uuid, config: Arc) -> Result { } } +/// Increments a numeric value in the job metadata +/// +/// # Arguments +/// * `metadata` - Current metadata map +/// * `key` - Key to increment +/// +/// # Returns +/// * `Result, JobError>` - Updated metadata or an error +/// +/// # Errors +/// * Returns KeyOutOfBounds if incrementing would exceed u64::MAX +/// * Returns error if value cannot be parsed as u64 pub fn increment_key_in_metadata( metadata: &HashMap, key: &str, @@ -620,6 +787,18 @@ pub fn increment_key_in_metadata( Ok(new_metadata) } +/// Retrieves a u64 value from the metadata map +/// +/// # Arguments +/// * `metadata` - Metadata map to search +/// * `key` - Key to retrieve +/// +/// # Returns +/// * `color_eyre::Result` - The parsed value or an error +/// +/// # Notes +/// * Returns 0 if the key doesn't exist in the metadata +/// * Wraps parsing errors with additional context fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { metadata .get(key) @@ -632,10 +811,12 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color mod tests { use super::*; + /// Tests for increment_key_in_metadata function mod test_increment_key_in_metadata { use super::*; #[test] + /// Tests incrementing a non-existent key (should start at 0) fn key_does_not_exist() { let metadata = HashMap::new(); let key = "test_key"; @@ -644,6 +825,7 @@ mod tests { } #[test] + /// Tests incrementing an existing numeric value fn key_exists_with_numeric_value() { let mut metadata = HashMap::new(); metadata.insert("test_key".to_string(), "41".to_string()); @@ -653,6 +835,7 @@ mod tests { } 
#[test] + /// Tests handling of non-numeric values fn key_exists_with_non_numeric_value() { let mut metadata = HashMap::new(); metadata.insert("test_key".to_string(), "not_a_number".to_string()); @@ -662,6 +845,7 @@ mod tests { } #[test] + /// Tests overflow handling at u64::MAX fn key_exists_with_max_u64_value() { let mut metadata = HashMap::new(); metadata.insert("test_key".to_string(), u64::MAX.to_string()); @@ -671,10 +855,12 @@ mod tests { } } + /// Tests for get_u64_from_metadata function mod test_get_u64_from_metadata { use super::*; #[test] + /// Tests retrieving a valid u64 value fn key_exists_with_valid_u64_value() { let mut metadata = HashMap::new(); metadata.insert("key1".to_string(), "12345".to_string()); @@ -683,6 +869,7 @@ mod tests { } #[test] + /// Tests handling of invalid numeric strings fn key_exists_with_invalid_value() { let mut metadata = HashMap::new(); metadata.insert("key2".to_string(), "not_a_number".to_string()); @@ -691,6 +878,7 @@ mod tests { } #[test] + /// Tests default behavior when key doesn't exist fn key_does_not_exist() { let metadata = HashMap::::new(); let result = get_u64_from_metadata(&metadata, "key3").unwrap(); diff --git a/crates/orchestrator/src/routes/app_routes.rs b/crates/orchestrator/src/routes/app_routes.rs index 205c8790..a416ed36 100644 --- a/crates/orchestrator/src/routes/app_routes.rs +++ b/crates/orchestrator/src/routes/app_routes.rs @@ -3,18 +3,80 @@ use axum::response::IntoResponse; use axum::routing::get; use axum::Router; +/// Creates the main application router with basic health check and development routes. +/// +/// This router provides fundamental application endpoints including: +/// - Health check endpoint at `/health` +/// - Development routes under `/v1/dev` +/// +/// The router is used in conjunction with job routes to form the complete API surface. +/// See the main router setup in: +/// ```rust:crates/orchestrator/src/routes/mod.rs +/// startLine: 23 +/// endLine: 35 +/// ``` +/// +/// # Returns +/// * `Router` - Configured application router with health and dev routes +/// +/// # Examples +/// ``` +/// let app = app_router(); +/// // Health check endpoint will respond with "UP" +/// // GET /health -> 200 OK +/// ``` pub fn app_router() -> Router { Router::new().route("/health", get(root)).nest("/v1/dev", dev_routes()) } +/// Health check endpoint handler. +/// +/// Returns a simple "UP" response to indicate the service is running. +/// This endpoint is commonly used by load balancers and monitoring systems +/// to verify service availability. +/// +/// # Returns +/// * `&'static str` - Always returns "UP" +/// +/// # Examples +/// Used in tests as shown in: +/// ```rust:crates/orchestrator/src/tests/server/mod.rs +/// startLine: 12 +/// endLine: 32 +/// ``` async fn root() -> &'static str { "UP" } +/// Handles 404 Not Found responses for the application. +/// +/// This handler is used as a fallback when no other routes match the request. +/// It provides a consistent error response format across the application. +/// +/// # Returns +/// * `impl IntoResponse` - Returns a 404 status code with a descriptive message +/// +/// # Examples +/// ``` +/// // When accessing an undefined route: +/// // GET /undefined -> 404 Not Found +/// // Response: "The requested resource was not found" +/// ``` pub async fn handler_404() -> impl IntoResponse { (StatusCode::NOT_FOUND, "The requested resource was not found") } +/// Creates a router for development-only endpoints. 
+/// +/// This router is nested under `/v1/dev` and is intended for +/// development and testing purposes. Currently empty but provides +/// a location for adding development-specific endpoints. +/// +/// # Returns +/// * `Router` - Empty router for development endpoints +/// +/// # Security +/// These routes should be disabled or properly secured in production environments. fn dev_routes() -> Router { Router::new() } diff --git a/crates/orchestrator/src/routes/error.rs b/crates/orchestrator/src/routes/error.rs index 3cd8d42f..fb7a1cd6 100644 --- a/crates/orchestrator/src/routes/error.rs +++ b/crates/orchestrator/src/routes/error.rs @@ -4,22 +4,90 @@ use axum::Json; use super::types::ApiResponse; +/// Represents errors that can occur during job route handling operations. +/// +/// This enum implements both `Debug` and the custom `Error` trait from thiserror, +/// providing formatted error messages for each variant. +/// +/// # Error Variants +/// Each variant maps to a specific HTTP status code when converted to a response: +/// * `InvalidId` - 400 Bad Request +/// * `NotFound` - 404 Not Found +/// * `ProcessingError` - 400 Bad Request +/// * `InvalidJobState` - 409 Conflict +/// * `DatabaseError` - 500 Internal Server Error +/// * `InvalidStatus` - 400 Bad Request +/// +/// # Examples +/// ``` +/// use crate::routes::error::JobRouteError; +/// +/// // Creating an invalid ID error +/// let error = JobRouteError::InvalidId("123-invalid".to_string()); +/// +/// // Creating a processing error +/// let error = JobRouteError::ProcessingError("Failed to process job".to_string()); +/// ``` #[derive(Debug, thiserror::Error)] pub enum JobRouteError { + /// Indicates that the provided job ID is not valid (e.g., not a valid UUID) #[error("Invalid job ID: {0}")] InvalidId(String), + + /// Indicates that the requested job could not be found in the system #[error("Job not found: {0}")] NotFound(String), + + /// Represents errors that occur during job processing #[error("Job processing error: {0}")] ProcessingError(String), + + /// Indicates that the job is in an invalid state for the requested operation #[error("Invalid job state: {0}")] InvalidJobState(String), + + /// Represents errors from database operations #[error("Database error")] DatabaseError, + + /// Indicates that the job status is invalid for the requested operation + /// Contains both the job ID and the current status #[error("Invalid status: {id}: {job_status}")] InvalidStatus { id: String, job_status: String }, } +/// Implementation of axum's `IntoResponse` trait for converting errors into HTTP responses. +/// +/// This implementation ensures that each error variant is mapped to an appropriate +/// HTTP status code and formatted response body. +/// +/// # Response Format +/// All responses are returned as JSON with the following structure: +/// ```json +/// { +/// "success": false, +/// "message": "Error message here" +/// } +/// ``` +/// +/// # Status Code Mapping +/// * `InvalidId` -> 400 Bad Request +/// * `NotFound` -> 404 Not Found +/// * `ProcessingError` -> 400 Bad Request +/// * `InvalidJobState` -> 409 Conflict +/// * `DatabaseError` -> 500 Internal Server Error +/// * `InvalidStatus` -> 400 Bad Request +/// +/// # Examples +/// This implementation is used automatically when returning errors from route handlers: +/// ```rust +/// async fn handle_job(id: String) -> Result { +/// if !is_valid_id(&id) { +/// return Err(JobRouteError::InvalidId(id)); +/// } +/// // ... 
rest of handler +/// } +/// ``` impl IntoResponse for JobRouteError { fn into_response(self) -> Response { match self { diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 24c59b25..1705eded 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -14,6 +14,25 @@ use crate::config::Config; use crate::jobs::{process_job, retry_job, verify_job}; use crate::metrics::ORCHESTRATOR_METRICS; +/// Handles HTTP requests to process a job. +/// +/// This endpoint initiates the processing of a job identified by its UUID. It performs the +/// following: +/// 1. Validates and parses the job ID from the URL path parameter +/// 2. Calls the job processing logic +/// 3. Records metrics for successful/failed operations +/// 4. Returns an appropriate API response +/// +/// # Arguments +/// * `Path(JobId { id })` - The job ID extracted from the URL path +/// * `State(config)` - Shared application configuration +/// +/// # Returns +/// * `JobRouteResult` - Success response or error details +/// +/// # Errors +/// * `JobRouteError::InvalidId` - If the provided ID is not a valid UUID +/// * `JobRouteError::ProcessingError` - If job processing fails #[instrument(skip(config), fields(job_id = %id))] async fn handle_process_job_request( Path(JobId { id }): Path, @@ -36,6 +55,24 @@ async fn handle_process_job_request( } } +/// Handles HTTP requests to verify a job's status. +/// +/// This endpoint checks the current status and validity of a job. It performs: +/// 1. Validates and parses the job ID +/// 2. Verifies the job's current state +/// 3. Records metrics for the verification attempt +/// 4. Returns the verification result +/// +/// # Arguments +/// * `Path(JobId { id })` - The job ID extracted from the URL path +/// * `State(config)` - Shared application configuration +/// +/// # Returns +/// * `JobRouteResult` - Success response or error details +/// +/// # Errors +/// * `JobRouteError::InvalidId` - If the provided ID is not a valid UUID +/// * `JobRouteError::ProcessingError` - If verification fails #[instrument(skip(config), fields(job_id = %id))] async fn handle_verify_job_request( Path(JobId { id }): Path, @@ -58,6 +95,24 @@ async fn handle_verify_job_request( } } +/// Handles HTTP requests to retry a failed job. +/// +/// This endpoint attempts to retry a previously failed job. It: +/// 1. Validates and parses the job ID +/// 2. Initiates the retry process +/// 3. Records metrics with additional retry context +/// 4. Returns the retry attempt result +/// +/// # Arguments +/// * `Path(JobId { id })` - The job ID extracted from the URL path +/// * `State(config)` - Shared application configuration +/// +/// # Returns +/// * `JobRouteResult` - Success response or error details +/// +/// # Errors +/// * `JobRouteError::InvalidId` - If the provided ID is not a valid UUID +/// * `JobRouteError::ProcessingError` - If retry attempt fails #[instrument(skip(config), fields(job_id = %id))] async fn handle_retry_job_request( Path(JobId { id }): Path, @@ -86,12 +141,30 @@ async fn handle_retry_job_request( } } -/// Creates a router for job-related endpoints +/// Creates a router for job-related endpoints. +/// +/// This function sets up the main router for all job-related operations, +/// nesting the specific job trigger endpoints under the "/jobs" path. 
+/// +/// # Arguments +/// * `config` - Shared application configuration +/// +/// # Returns +/// * `Router` - Configured router with all job endpoints pub fn job_router(config: Arc) -> Router { Router::new().nest("/jobs", trigger_router(config.clone())) } -/// Creates the nested router for job trigger endpoints +/// Creates the nested router for job trigger endpoints. +/// +/// Sets up specific routes for processing, verifying, and retrying jobs. +/// All endpoints are configured as GET requests and share the application config. +/// +/// # Arguments +/// * `config` - Shared application configuration +/// +/// # Returns +/// * `Router` - Configured router with trigger endpoints fn trigger_router(config: Arc) -> Router { Router::new() .route("/:id/process", get(handle_process_job_request)) diff --git a/crates/orchestrator/src/routes/mod.rs b/crates/orchestrator/src/routes/mod.rs index 652a7db7..b2fd2036 100644 --- a/crates/orchestrator/src/routes/mod.rs +++ b/crates/orchestrator/src/routes/mod.rs @@ -7,6 +7,14 @@ use job_routes::job_router; use crate::config::Config; +/// Routes module for the orchestrator service. +/// +/// This module provides the core routing and server setup functionality, organizing +/// different route handlers into submodules: +/// - `app_routes`: General application routes (e.g., health checks) +/// - `job_routes`: Job processing and management routes +/// - `error`: Error handling and HTTP response mapping +/// - `types`: Shared type definitions for route handlers pub mod app_routes; pub mod error; pub mod job_routes; @@ -14,12 +22,45 @@ pub mod types; pub use error::JobRouteError; +/// Configuration parameters for the HTTP server. +/// +/// Contains the necessary information to bind and start the server. +/// +/// # Examples +/// ``` +/// let params = ServerParams { host: "127.0.0.1".to_string(), port: 8080 }; +/// ``` #[derive(Debug, Clone)] pub struct ServerParams { + /// The host address to bind to (e.g., "127.0.0.1", "0.0.0.0") pub host: String, + /// The port number to listen on pub port: u16, } +/// Sets up and starts the HTTP server with configured routes. +/// +/// This function: +/// 1. Initializes the server with the provided configuration +/// 2. Sets up all route handlers (both app and job routes) +/// 3. Starts the server in a separate tokio task +/// +/// # Arguments +/// * `config` - Shared application configuration +/// +/// # Returns +/// * `SocketAddr` - The bound address of the server +/// +/// # Panics +/// * If the server fails to start +/// * If the address cannot be bound +/// +/// # Examples +/// ``` +/// let config = Arc::new(Config::new()); +/// let addr = setup_server(config).await; +/// println!("Server listening on {}", addr); +/// ``` pub async fn setup_server(config: Arc) -> SocketAddr { let (api_server_url, listener) = get_server_url(config.server_config()).await; @@ -34,6 +75,25 @@ pub async fn setup_server(config: Arc) -> SocketAddr { api_server_url } +/// Creates a TCP listener and returns its address. +/// +/// This function handles the low-level socket binding and address resolution. 
+/// +/// # Arguments +/// * `server_params` - Configuration for the server binding +/// +/// # Returns +/// * `(SocketAddr, TcpListener)` - The bound address and the TCP listener +/// +/// # Panics +/// * If binding to the specified address fails +/// * If the listener cannot be created +/// +/// # Examples +/// ``` +/// let params = ServerParams { host: "127.0.0.1".to_string(), port: 8080 }; +/// let (addr, listener) = get_server_url(¶ms).await; +/// ``` pub async fn get_server_url(server_params: &ServerParams) -> (SocketAddr, tokio::net::TcpListener) { let address = format!("{}:{}", server_params.host, server_params.port); let listener = tokio::net::TcpListener::bind(address.clone()).await.expect("Failed to get listener"); diff --git a/crates/orchestrator/src/routes/types.rs b/crates/orchestrator/src/routes/types.rs index b7b59031..fec1f286 100644 --- a/crates/orchestrator/src/routes/types.rs +++ b/crates/orchestrator/src/routes/types.rs @@ -3,25 +3,98 @@ use serde::{Deserialize, Serialize}; use super::error::JobRouteError; +/// Represents a job identifier in API requests. +/// +/// This struct is used to deserialize job IDs from incoming HTTP requests, +/// particularly in path parameters. +/// +/// # Examples +/// ``` +/// let job_id = JobId { id: "123e4567-e89b-12d3-a456-426614174000".to_string() }; +/// ``` #[derive(Deserialize)] pub struct JobId { + /// The string representation of the job's UUID pub id: String, } +/// Represents a standardized API response structure. +/// +/// This struct provides a consistent format for all API responses, including +/// both successful operations and errors. It implements serialization for +/// converting responses to JSON. +/// +/// # Fields +/// * `success` - Indicates whether the operation was successful +/// * `message` - Optional message providing additional details (typically used for errors) +/// +/// # Examples +/// ``` +/// // Success response +/// let response = ApiResponse::success(); +/// assert_eq!(response.success, true); +/// assert_eq!(response.message, None); +/// +/// // Error response +/// let response = ApiResponse::error("Invalid job ID".to_string()); +/// assert_eq!(response.success, false); +/// assert_eq!(response.message, Some("Invalid job ID".to_string())); +/// ``` #[derive(Serialize)] pub struct ApiResponse { + /// Indicates if the operation was successful pub success: bool, + /// Optional message, typically used for error details pub message: Option, } impl ApiResponse { + /// Creates a successful response with no message. + /// + /// # Returns + /// Returns an `ApiResponse` with `success` set to `true` and no message. + /// + /// # Examples + /// ``` + /// let response = ApiResponse::success(); + /// assert_eq!(response.success, true); + /// ``` pub fn success() -> Self { Self { success: true, message: None } } + /// Creates an error response with the specified message. + /// + /// # Arguments + /// * `message` - The error message to include in the response + /// + /// # Returns + /// Returns an `ApiResponse` with `success` set to `false` and the provided message. + /// + /// # Examples + /// ``` + /// let response = ApiResponse::error("Operation failed".to_string()); + /// assert_eq!(response.success, false); + /// assert_eq!(response.message, Some("Operation failed".to_string())); + /// ``` pub fn error(message: String) -> Self { Self { success: false, message: Some(message) } } } +/// Type alias for the result type used in job route handlers. 
+/// +/// This type combines axum's `Response` type with our custom `JobRouteError`, +/// providing a consistent error handling pattern across all job-related routes. +/// +/// # Examples +/// ``` +/// async fn handle_job() -> JobRouteResult { +/// // Success case +/// Ok(Json(ApiResponse::success()).into_response()) +/// +/// // Error case +/// Err(JobRouteError::NotFound("123".to_string())) +/// } +/// ``` pub type JobRouteResult = Result; From 0b9e51cc9804a21dac5aa60b9a95529854d4ed29 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 13:13:24 +0530 Subject: [PATCH 09/17] chore: comment updated in the verify job --- crates/orchestrator/src/jobs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index d8711a47..92f10882 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -444,7 +444,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { // it's okay to retry the job if it's verificationTimeout, because we are just adding job again to the // verification queue JobStatus::PendingVerification | JobStatus::VerificationTimeout => { - tracing::info!(job_id = ?id, status = ?job.status, "Job status is PendingVerification or VerificationTimeout, proceeding with verification"); + tracing::info!(job_id = ?id, status = ?job.status, "Proceeding with verification"); } _ => { tracing::error!(job_id = ?id, status = ?job.status, "Invalid job status for verification"); From ec654c96ac38096adab250c210b7e1a9b7207868 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 14:40:41 +0530 Subject: [PATCH 10/17] chore: removed redundant comments --- crates/orchestrator/src/routes/app_routes.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/orchestrator/src/routes/app_routes.rs b/crates/orchestrator/src/routes/app_routes.rs index a416ed36..c31a0a0b 100644 --- a/crates/orchestrator/src/routes/app_routes.rs +++ b/crates/orchestrator/src/routes/app_routes.rs @@ -19,11 +19,6 @@ use axum::Router; /// # Returns /// * `Router` - Configured application router with health and dev routes /// -/// # Examples -/// ``` -/// let app = app_router(); -/// // Health check endpoint will respond with "UP" -/// // GET /health -> 200 OK /// ``` pub fn app_router() -> Router { Router::new().route("/health", get(root)).nest("/v1/dev", dev_routes()) @@ -38,11 +33,6 @@ pub fn app_router() -> Router { /// # Returns /// * `&'static str` - Always returns "UP" /// -/// # Examples -/// Used in tests as shown in: -/// ```rust:crates/orchestrator/src/tests/server/mod.rs -/// startLine: 12 -/// endLine: 32 /// ``` async fn root() -> &'static str { "UP" From 2373eb7f3123e68498b6fc8815984b2d74d1c235 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Fri, 3 Jan 2025 15:00:59 +0530 Subject: [PATCH 11/17] chore: tests for retry endpoint separated --- .../src/tests/server/job_routes.rs | 67 +++++++++++-------- 1 file changed, 38 insertions(+), 29 deletions(-) diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 2f119be5..d0918559 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -119,26 +119,17 @@ async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc), - #[case] initial_status: JobStatus, - #[case] expected_response_status: u16, -) { +#[rstest] +async fn test_trigger_retry_job_when_failed(#[future] 
setup_trigger: (SocketAddr, Arc)) { let (addr, config) = setup_trigger.await; let job_type = JobType::DataSubmission; - let job_item = build_job_item(job_type.clone(), initial_status.clone(), 1); + let job_item = build_job_item(job_type.clone(), JobStatus::Failed, 1); let mut job_handler = MockJob::new(); - job_handler - .expect_process_job() - .times(if expected_response_status == 200 { 1 } else { 0 }) - .returning(move |_, _| Ok("0xbeef".to_string())); + // Expect process_job to be called once for failed jobs + job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); job_handler.expect_verification_polling_delay_seconds().return_const(1u64); config.database().create_job(job_item.clone()).await.unwrap(); @@ -146,10 +137,7 @@ async fn test_trigger_retry_job( let job_handler: Arc> = Arc::new(Box::new(job_handler)); let ctx = mock_factory::get_job_handler_context(); - ctx.expect() - .times(if expected_response_status == 200 { 1 } else { 0 }) - .with(eq(job_type)) - .returning(move |_| Arc::clone(&job_handler)); + ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); let client = hyper::Client::new(); let response = client @@ -157,18 +145,39 @@ async fn test_trigger_retry_job( .await .unwrap(); - assert_eq!(response.status(), expected_response_status); + assert_eq!(response.status(), 200); - if let Some(job_fetched) = config.database().get_job_by_id(job_id).await.unwrap() { - if expected_response_status == 200 { - assert_eq!(job_fetched.id, job_item.id); - assert_eq!(job_fetched.status, JobStatus::PendingVerification); - } else { - assert_eq!(job_fetched.status, initial_status); - } - } else { - panic!("Could not get job from database") - } + let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); + assert_eq!(job_fetched.id, job_item.id); + assert_eq!(job_fetched.status, JobStatus::PendingVerification); +} + +#[rstest] +#[case::pending_verification_job(JobStatus::PendingVerification)] +#[case::completed_job(JobStatus::Completed)] +#[tokio::test] +async fn test_trigger_retry_job_not_allowed( + #[future] setup_trigger: (SocketAddr, Arc), + #[case] initial_status: JobStatus, +) { + let (addr, config) = setup_trigger.await; + let job_type = JobType::DataSubmission; + + let job_item = build_job_item(job_type.clone(), initial_status.clone(), 1); + + config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.clone().id; + + let client = hyper::Client::new(); + let response = client + .request(Request::builder().uri(format!("http://{}/jobs/{}/retry", addr, job_id)).body(Body::empty()).unwrap()) + .await + .unwrap(); + + assert_eq!(response.status(), 400); + + let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); + assert_eq!(job_fetched.status, initial_status); } #[rstest] From bc710fdb7ec2d59eee43a0bc799b13081c1a5273 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Mon, 6 Jan 2025 21:36:44 +0530 Subject: [PATCH 12/17] refactor: adding retry jobs directly to the queue now --- crates/orchestrator/src/jobs/mod.rs | 27 ++++--- crates/orchestrator/src/tests/jobs/mod.rs | 72 ++++++++++++++++++- .../src/tests/server/job_routes.rs | 29 ++++---- 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 44743aae..2f2250b4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ 
-642,28 +642,27 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { JobError::Other(OtherError(e)) })?; - let result = process_job(job.id, config.clone()).await; - - if let Err(e) = &result { + add_job_to_process_queue(job.id, &job.job_type, config.clone()).await.map_err(|e| { tracing::error!( log_type = "error", category = "general", function_type = "retry_job", block_no = %internal_id, error = %e, - "General retry job failed for block" - ); - } else { - tracing::info!( - log_type = "completed", - category = "general", - function_type = "retry_job", - block_no = %internal_id, - "General retry job completed for block" + "Failed to add job to process queue" ); - } + JobError::Other(OtherError(e)) + })?; + + tracing::info!( + log_type = "completed", + category = "general", + function_type = "retry_job", + block_no = %internal_id, + "Successfully queued job for retry" + ); - result + Ok(()) } /// Terminates the job and updates the status of the job in the DB. diff --git a/crates/orchestrator/src/tests/jobs/mod.rs b/crates/orchestrator/src/tests/jobs/mod.rs index c34fe0ff..c1b3fb72 100644 --- a/crates/orchestrator/src/tests/jobs/mod.rs +++ b/crates/orchestrator/src/tests/jobs/mod.rs @@ -16,7 +16,8 @@ use crate::jobs::constants::{ use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::{ - create_job, handle_job_failure, increment_key_in_metadata, process_job, verify_job, Job, JobError, MockJob, + create_job, handle_job_failure, increment_key_in_metadata, process_job, retry_job, verify_job, Job, JobError, + MockJob, }; use crate::queue::job_queue::QueueNameForJobType; use crate::queue::QueueType; @@ -759,3 +760,72 @@ async fn handle_job_failure_job_status_completed_works(#[case] job_type: JobType assert_eq!(job_fetched, job_expected); } + +#[rstest] +#[tokio::test] +async fn test_retry_job_adds_to_process_queue() { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // Create a failed job + let job_item = build_job_item(JobType::DataSubmission, JobStatus::Failed, 1); + services.config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.id; + + // Retry the job + assert!(retry_job(job_id, services.config.clone()).await.is_ok()); + + // Verify job status was updated to PendingRetry + let updated_job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap(); + assert_eq!(updated_job.status, JobStatus::PendingRetry); + + // Wait for message to be processed + tokio::time::sleep(Duration::from_secs(5)).await; + + // Verify message was added to process queue + let consumed_messages = + services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await.unwrap(); + + let consumed_message_payload: MessagePayloadType = consumed_messages.payload_serde_json().unwrap().unwrap(); + assert_eq!(consumed_message_payload.id, job_id); +} + +#[rstest] +#[case::pending_verification(JobStatus::PendingVerification)] +#[case::completed(JobStatus::Completed)] +#[case::created(JobStatus::Created)] +#[tokio::test] +async fn test_retry_job_invalid_status(#[case] initial_status: JobStatus) { + let services = TestConfigBuilder::new() + .configure_database(ConfigType::Actual) + .configure_queue_client(ConfigType::Actual) + .build() + .await; + + // Create a job with non-Failed status + let job_item = 
build_job_item(JobType::DataSubmission, initial_status.clone(), 1); + services.config.database().create_job(job_item.clone()).await.unwrap(); + let job_id = job_item.id; + + // Attempt to retry the job + let result = retry_job(job_id, services.config.clone()).await; + assert!(result.is_err()); + + if let Err(error) = result { + assert_matches!(error, JobError::InvalidStatus { .. }); + } + + // Verify job status was not changed + let job = services.config.database().get_job_by_id(job_id).await.unwrap().unwrap(); + assert_eq!(job.status, initial_status); + + // Wait briefly to ensure no messages were added + tokio::time::sleep(Duration::from_secs(5)).await; + + // Verify no message was added to process queue + let queue_result = services.config.queue().consume_message_from_queue(job_item.job_type.process_queue_name()).await; + assert_matches!(queue_result, Err(QueueError::NoData)); +} diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index d0918559..2b513caf 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -17,6 +17,7 @@ use crate::jobs::job_handler_factory::mock_factory; use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; use crate::jobs::{Job, MockJob}; use crate::queue::init_consumers; +use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType}; use crate::tests::config::{ConfigType, TestConfigBuilder}; #[fixture] @@ -126,35 +127,32 @@ async fn test_trigger_retry_job_when_failed(#[future] setup_trigger: (SocketAddr let job_type = JobType::DataSubmission; let job_item = build_job_item(job_type.clone(), JobStatus::Failed, 1); - let mut job_handler = MockJob::new(); - - // Expect process_job to be called once for failed jobs - job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); - job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; - let job_handler: Arc> = Arc::new(Box::new(job_handler)); - let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); - let client = hyper::Client::new(); let response = client .request(Request::builder().uri(format!("http://{}/jobs/{}/retry", addr, job_id)).body(Body::empty()).unwrap()) .await .unwrap(); - assert_eq!(response.status(), 200); + // Verify job was added to process queue + let queue_message = config.queue().consume_message_from_queue(job_type.process_queue_name()).await.unwrap(); + + let message_payload: JobQueueMessage = queue_message.payload_serde_json().unwrap().unwrap(); + assert_eq!(message_payload.id, job_id); + + // Verify job status changed to PendingRetry let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); assert_eq!(job_fetched.id, job_item.id); - assert_eq!(job_fetched.status, JobStatus::PendingVerification); + assert_eq!(job_fetched.status, JobStatus::PendingRetry); } #[rstest] #[case::pending_verification_job(JobStatus::PendingVerification)] #[case::completed_job(JobStatus::Completed)] +#[case::created_job(JobStatus::Created)] #[tokio::test] async fn test_trigger_retry_job_not_allowed( #[future] setup_trigger: (SocketAddr, Arc), @@ -164,7 +162,6 @@ async fn test_trigger_retry_job_not_allowed( let job_type = JobType::DataSubmission; let job_item = 
build_job_item(job_type.clone(), initial_status.clone(), 1); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; @@ -174,10 +171,16 @@ async fn test_trigger_retry_job_not_allowed( .await .unwrap(); + // Verify request was rejected assert_eq!(response.status(), 400); + // Verify job status hasn't changed let job_fetched = config.database().get_job_by_id(job_id).await.unwrap().expect("Could not get job from database"); assert_eq!(job_fetched.status, initial_status); + + // Verify no message was added to the queue + let queue_result = config.queue().consume_message_from_queue(job_type.process_queue_name()).await; + assert!(queue_result.is_err(), "Queue should be empty - no message should be added for non-Failed jobs"); } #[rstest] From 3b30773adc59c873c025d0824abf0123e3df256b Mon Sep 17 00:00:00 2001 From: mohiiit Date: Tue, 7 Jan 2025 12:00:17 +0530 Subject: [PATCH 13/17] chore: removed redundant comment --- crates/orchestrator/src/routes/app_routes.rs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/crates/orchestrator/src/routes/app_routes.rs b/crates/orchestrator/src/routes/app_routes.rs index c31a0a0b..b6e90176 100644 --- a/crates/orchestrator/src/routes/app_routes.rs +++ b/crates/orchestrator/src/routes/app_routes.rs @@ -9,13 +9,6 @@ use axum::Router; /// - Health check endpoint at `/health` /// - Development routes under `/v1/dev` /// -/// The router is used in conjunction with job routes to form the complete API surface. -/// See the main router setup in: -/// ```rust:crates/orchestrator/src/routes/mod.rs -/// startLine: 23 -/// endLine: 35 -/// ``` -/// /// # Returns /// * `Router` - Configured application router with health and dev routes /// From 6b65a1239a9804439978db43b826cad896720b49 Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 9 Jan 2025 14:28:42 +0530 Subject: [PATCH 14/17] refactor: process and verify endpoint adds job to the queue now --- crates/orchestrator/src/jobs/constants.rs | 2 + crates/orchestrator/src/jobs/mod.rs | 134 +++++++++++++++++-- crates/orchestrator/src/routes/job_routes.rs | 35 +++-- 3 files changed, 140 insertions(+), 31 deletions(-) diff --git a/crates/orchestrator/src/jobs/constants.rs b/crates/orchestrator/src/jobs/constants.rs index 07992401..c1e600ca 100644 --- a/crates/orchestrator/src/jobs/constants.rs +++ b/crates/orchestrator/src/jobs/constants.rs @@ -1,5 +1,7 @@ pub const JOB_PROCESS_ATTEMPT_METADATA_KEY: &str = "process_attempt_no"; +pub const JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY: &str = "process_retry_attempt_no"; pub const JOB_VERIFICATION_ATTEMPT_METADATA_KEY: &str = "verification_attempt_no"; +pub const JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY: &str = "verification_retry_attempt_no"; pub const JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY: &str = "blocks_number_to_settle"; pub const JOB_METADATA_STATE_UPDATE_FETCH_FROM_TESTS: &str = "fetch_from_test_data"; pub const JOB_METADATA_STATE_UPDATE_ATTEMPT_PREFIX: &str = "attempt_tx_hashes_"; diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 0c9f6d4c..acb0c7a6 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -22,7 +22,7 @@ use types::{ExternalId, JobItemUpdates}; use uuid::Uuid; use crate::config::Config; -use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY}; +use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, 
JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY}; #[double] use crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; @@ -125,6 +125,12 @@ impl From for OtherError { } } +impl From for JobError { + fn from(err: color_eyre::Report) -> Self { + JobError::Other(OtherError(err)) + } +} + /// Job Trait /// /// The Job trait is used to define the methods that a job @@ -613,6 +619,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; let internal_id = job.internal_id.clone(); + tracing::info!( log_type = "starting", category = "general", @@ -630,10 +637,25 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { return Err(JobError::InvalidStatus { id, job_status: job.status }); } - // Update job status to PendingRetry before processing + // Increment the process retry counter + let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; + + tracing::debug!( + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + "Incrementing process retry attempt counter" + ); + + // Update job status and metadata to PendingRetry before processing config .database() - .update_job(&job, JobItemUpdates::new().update_status(JobStatus::PendingRetry).build()) + .update_job( + &job, + JobItemUpdates::new() + .update_status(JobStatus::PendingRetry) + .update_metadata(metadata) + .build() + ) .await .map_err(|e| { tracing::error!( @@ -828,18 +850,104 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color .wrap_err(format!("Failed to parse u64 from metadata key '{}'", key)) } -fn register_block_gauge(job: &JobItem, attributes: &[KeyValue]) -> Result<(), JobError> { - let block_number = if let JobType::StateTransition = job.job_type { - parse_string( - job.external_id - .unwrap_string() - .map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?, +/// Queues a job for processing by adding it to the process queue +/// +/// # Arguments +/// * `id` - UUID of the job to process +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # State Transitions +/// * Any valid state -> PendingProcess +#[tracing::instrument(skip(config), fields(category = "general"), ret, err)] +pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<(), JobError> { + let job = get_job(id, config.clone()).await?; + + // Increment the process retry counter + let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; + + tracing::debug!( + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + "Incrementing process retry attempt counter" + ); + + // Update job status and metadata to indicate it's queued for processing + config + .database() + .update_job( + &job, + JobItemUpdates::new() + .update_status(JobStatus::PendingRetry) + .update_metadata(metadata) + .build() ) - } else { - parse_string(&job.internal_id) - }?; + .await + .map_err(|e| JobError::Other(OtherError(e)))?; - ORCHESTRATOR_METRICS.block_gauge.record(block_number, attributes); + // Add to process queue + add_job_to_process_queue(id, &job.job_type, config) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + + Ok(()) +} + 
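+// Illustrative call-site sketch for `queue_job_for_processing`: a condensed
+// version of `handle_process_job_request` in `routes/job_routes.rs` (the real
+// handler matches on the result and records metrics instead of using `?`):
+//
+//     let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?;
+//     queue_job_for_processing(job_id, config).await?;
+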
+/// Queues a job for verification by adding it to the verification queue +/// +/// # Arguments +/// * `id` - UUID of the job to verify +/// * `config` - Shared configuration +/// +/// # Returns +/// * `Result<(), JobError>` - Success or an error +/// +/// # Notes +/// * Resets verification attempt count to 0 +/// * Sets appropriate delay for verification polling +#[tracing::instrument(skip(config), fields(category = "general"), ret, err)] +pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result<(), JobError> { + let job = get_job(id, config.clone()).await?; + let job_handler = factory::get_job_handler(&job.job_type).await; + + // Reset verification attempts and increment retry counter + let mut metadata = job.metadata.clone(); + metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + + // Increment the retry counter using the existing helper function + metadata = increment_key_in_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?; + + tracing::debug!( + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?, + "Incrementing verification retry attempt counter" + ); + + // Update job status and metadata + config + .database() + .update_job( + &job, + JobItemUpdates::new() + .update_status(JobStatus::PendingVerification) + .update_metadata(metadata) + .build(), + ) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + + // Add to verification queue with appropriate delay + add_job_to_verification_queue( + id, + &job.job_type, + Duration::from_secs(job_handler.verification_polling_delay_seconds()), + config, + ) + .await + .map_err(|e| JobError::Other(OtherError(e)))?; + Ok(()) } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 1705eded..ce51cbcd 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use super::error::JobRouteError; use super::types::{ApiResponse, JobId, JobRouteResult}; use crate::config::Config; -use crate::jobs::{process_job, retry_job, verify_job}; +use crate::jobs::{retry_job, queue_job_for_processing, queue_job_for_verification}; use crate::metrics::ORCHESTRATOR_METRICS; /// Handles HTTP requests to process a job. @@ -40,16 +40,15 @@ async fn handle_process_job_request( ) -> JobRouteResult { let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - match process_job(job_id, config).await { + match queue_job_for_processing(job_id, config).await { Ok(_) => { - info!("Job processed successfully"); - ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]); - + info!("Job queued for processing successfully"); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]); Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { - error!(error = %e, "Failed to process job"); - ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "process_job")]); + error!(error = %e, "Failed to queue job for processing"); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]); Err(JobRouteError::ProcessingError(e.to_string())) } } @@ -57,11 +56,12 @@ async fn handle_process_job_request( /// Handles HTTP requests to verify a job's status. /// -/// This endpoint checks the current status and validity of a job. 
It performs: +/// This endpoint queues the job for verification by: /// 1. Validates and parses the job ID -/// 2. Verifies the job's current state -/// 3. Records metrics for the verification attempt -/// 4. Returns the verification result +/// 2. Adds the job to the verification queue +/// 3. Resets verification attempt counter +/// 4. Records metrics for the queue operation +/// 5. Returns immediate response /// /// # Arguments /// * `Path(JobId { id })` - The job ID extracted from the URL path @@ -72,7 +72,7 @@ async fn handle_process_job_request( /// /// # Errors /// * `JobRouteError::InvalidId` - If the provided ID is not a valid UUID -/// * `JobRouteError::ProcessingError` - If verification fails +/// * `JobRouteError::ProcessingError` - If queueing for verification fails #[instrument(skip(config), fields(job_id = %id))] async fn handle_verify_job_request( Path(JobId { id }): Path, @@ -80,16 +80,15 @@ async fn handle_verify_job_request( ) -> JobRouteResult { let job_id = Uuid::parse_str(&id).map_err(|_| JobRouteError::InvalidId(id.clone()))?; - match verify_job(job_id, config).await { + match queue_job_for_verification(job_id, config).await { Ok(_) => { - info!("Job verified successfully"); - ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]); - + info!("Job queued for verification successfully"); + ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]); Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { - error!(error = %e, "Failed to verify job"); - ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "verify_job")]); + error!(error = %e, "Failed to queue job for verification"); + ORCHESTRATOR_METRICS.failed_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]); Err(JobRouteError::ProcessingError(e.to_string())) } } From b317831e4aee74de4f70ade6ad4b198c864bb33a Mon Sep 17 00:00:00 2001 From: mohiiit Date: Thu, 9 Jan 2025 15:04:27 +0530 Subject: [PATCH 15/17] refactor: test updated for routes --- crates/orchestrator/src/jobs/mod.rs | 62 ++++++------- crates/orchestrator/src/routes/job_routes.rs | 6 +- .../src/tests/server/job_routes.rs | 92 ++++++++++++------- 3 files changed, 91 insertions(+), 69 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index acb0c7a6..c057e6c4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -22,7 +22,10 @@ use types::{ExternalId, JobItemUpdates}; use uuid::Uuid; use crate::config::Config; -use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY}; +use crate::jobs::constants::{ + JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, + JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, +}; #[double] use crate::jobs::job_handler_factory::factory; use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus}; @@ -639,10 +642,10 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { // Increment the process retry counter let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = 
get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing process retry attempt counter" ); @@ -650,11 +653,8 @@ pub async fn retry_job(id: Uuid, config: Arc) -> Result<(), JobError> { config .database() .update_job( - &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingRetry) - .update_metadata(metadata) - .build() + &job, + JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(), ) .await .map_err(|e| { @@ -842,7 +842,7 @@ pub fn increment_key_in_metadata( /// # Notes /// * Returns 0 if the key doesn't exist in the metadata /// * Wraps parsing errors with additional context -fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { +pub fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color_eyre::Result { metadata .get(key) .unwrap_or(&"0".to_string()) @@ -864,34 +864,29 @@ fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> color #[tracing::instrument(skip(config), fields(category = "general"), ret, err)] pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; - + // Increment the process retry counter let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing process retry attempt counter" ); - + // Update job status and metadata to indicate it's queued for processing config .database() .update_job( - &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingRetry) - .update_metadata(metadata) - .build() + &job, + JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(), ) .await .map_err(|e| JobError::Other(OtherError(e)))?; // Add to process queue - add_job_to_process_queue(id, &job.job_type, config) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - + add_job_to_process_queue(id, &job.job_type, config).await.map_err(|e| JobError::Other(OtherError(e)))?; + Ok(()) } @@ -911,29 +906,26 @@ pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<( pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; let job_handler = factory::get_job_handler(&job.job_type).await; - + // Reset verification attempts and increment retry counter let mut metadata = job.metadata.clone(); metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); - + // Increment the retry counter using the existing helper function metadata = increment_key_in_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?; - + tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?, + job_id = ?id, + retry_count = get_u64_from_metadata(&metadata, JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY)?, "Incrementing verification retry attempt counter" ); - + // Update job status and metadata config .database() .update_job( &job, - JobItemUpdates::new() - .update_status(JobStatus::PendingVerification) - .update_metadata(metadata) - .build(), + JobItemUpdates::new().update_status(JobStatus::PendingVerification).update_metadata(metadata).build(), ) .await .map_err(|e| JobError::Other(OtherError(e)))?; @@ -947,7 
+939,7 @@ pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result ) .await .map_err(|e| JobError::Other(OtherError(e)))?; - + Ok(()) } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index ce51cbcd..6503b498 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -11,7 +11,7 @@ use uuid::Uuid; use super::error::JobRouteError; use super::types::{ApiResponse, JobId, JobRouteResult}; use crate::config::Config; -use crate::jobs::{retry_job, queue_job_for_processing, queue_job_for_verification}; +use crate::jobs::{queue_job_for_processing, queue_job_for_verification, retry_job}; use crate::metrics::ORCHESTRATOR_METRICS; /// Handles HTTP requests to process a job. @@ -43,7 +43,9 @@ async fn handle_process_job_request( match queue_job_for_processing(job_id, config).await { Ok(_) => { info!("Job queued for processing successfully"); - ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_process")]); + ORCHESTRATOR_METRICS + .successful_job_operations + .add(1.0, &[KeyValue::new("operation_type", "queue_process")]); Ok(Json(ApiResponse::success()).into_response()) } Err(e) => { diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 2b513caf..8a64baf0 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -1,6 +1,8 @@ use core::panic; +use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use std::time::Duration; use chrono::{SubsecRound as _, Utc}; use hyper::{Body, Request}; @@ -13,9 +15,13 @@ use utils::env_utils::get_env_var_or_panic; use uuid::Uuid; use crate::config::Config; +use crate::jobs::constants::{ + JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY, + JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY, +}; use crate::jobs::job_handler_factory::mock_factory; -use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType, JobVerificationStatus}; -use crate::jobs::{Job, MockJob}; +use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; +use crate::jobs::{get_u64_from_metadata, Job, MockJob}; use crate::queue::init_consumers; use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType}; use crate::tests::config::{ConfigType, TestConfigBuilder}; @@ -46,23 +52,12 @@ async fn setup_trigger() -> (SocketAddr, Arc) { #[rstest] async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc)) { let (addr, config) = setup_trigger.await; - let job_type = JobType::DataSubmission; let job_item = build_job_item(job_type.clone(), JobStatus::Created, 1); - let mut job_handler = MockJob::new(); - - job_handler.expect_process_job().times(1).returning(move |_, _| Ok("0xbeef".to_string())); - config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; - job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - - let job_handler: Arc> = Arc::new(Box::new(job_handler)); - let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); - let client = hyper::Client::new(); let response = client .request( @@ -71,12 +66,23 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc)) { let (addr, config) = setup_trigger.await; - let job_type = JobType::DataSubmission; - let job_item 
= build_job_item(job_type.clone(), JobStatus::PendingVerification, 1); - let mut job_handler = MockJob::new(); + // Create a simple job without initial metadata + let mut job_item = build_job_item(job_type.clone(), JobStatus::PendingVerification, 1); - job_handler.expect_verify_job().times(1).returning(move |_, _| Ok(JobVerificationStatus::Verified)); + // Initialize metadata with verification counters + let mut metadata = HashMap::new(); + metadata.insert(JOB_VERIFICATION_RETRY_ATTEMPT_METADATA_KEY.to_string(), "0".to_string()); + metadata.insert(JOB_VERIFICATION_ATTEMPT_METADATA_KEY.to_string(), "10".to_string()); + job_item.metadata = metadata; config.database().create_job(job_item.clone()).await.unwrap(); let job_id = job_item.clone().id; + // Set up mock job handler + let mut job_handler = MockJob::new(); job_handler.expect_verification_polling_delay_seconds().return_const(1u64); - let job_handler: Arc> = Arc::new(Box::new(job_handler)); + let ctx = mock_factory::get_job_handler_context(); - ctx.expect().times(1).with(eq(job_type)).returning(move |_| Arc::clone(&job_handler)); + ctx.expect().with(eq(job_type.clone())).times(1).returning(move |_| Arc::clone(&job_handler)); let client = hyper::Client::new(); let response = client @@ -109,15 +120,29 @@ async fn test_trigger_verify_job(#[future] setup_trigger: (SocketAddr, Arc Date: Tue, 14 Jan 2025 18:46:01 +0530 Subject: [PATCH 16/17] chore: comment resolved --- crates/orchestrator/src/jobs/mod.rs | 34 ++++++------------- crates/orchestrator/src/routes/job_routes.rs | 6 ++-- crates/orchestrator/src/routes/types.rs | 4 +-- .../src/tests/server/job_routes.rs | 8 +---- 4 files changed, 16 insertions(+), 36 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index c057e6c4..48571852 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -451,8 +451,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::Span::current().record("internal_id", job.internal_id.clone()); match job.status { - // it's okay to retry the job if it's verificationTimeout, because we are just adding job again to the - // verification queue + // Jobs with `VerificationTimeout` will be retired manually after resetting verification attempt number to 0. 
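+ // (such jobs are retried via `queue_job_for_verification`, which resets the verification attempt counter to 0 before re-adding the job to the verification queue)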
JobStatus::PendingVerification | JobStatus::VerificationTimeout => { tracing::info!(job_id = ?id, status = ?job.status, "Proceeding with verification"); } @@ -865,27 +864,11 @@ pub fn get_u64_from_metadata(metadata: &HashMap, key: &str) -> c pub async fn queue_job_for_processing(id: Uuid, config: Arc) -> Result<(), JobError> { let job = get_job(id, config.clone()).await?; - // Increment the process retry counter - let metadata = increment_key_in_metadata(&job.metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?; - - tracing::debug!( - job_id = ?id, - retry_count = get_u64_from_metadata(&metadata, JOB_PROCESS_RETRY_ATTEMPT_METADATA_KEY)?, - "Incrementing process retry attempt counter" - ); - - // Update job status and metadata to indicate it's queued for processing - config - .database() - .update_job( - &job, - JobItemUpdates::new().update_status(JobStatus::PendingRetry).update_metadata(metadata).build(), - ) - .await - .map_err(|e| JobError::Other(OtherError(e)))?; - - // Add to process queue - add_job_to_process_queue(id, &job.job_type, config).await.map_err(|e| JobError::Other(OtherError(e)))?; + // Add to process queue directly + add_job_to_process_queue(id, &job.job_type, config).await.map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to add job to process queue"); + JobError::Other(OtherError(e)) + })?; Ok(()) } @@ -938,7 +921,10 @@ pub async fn queue_job_for_verification(id: Uuid, config: Arc) -> Result config, ) .await - .map_err(|e| JobError::Other(OtherError(e)))?; + .map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue"); + JobError::Other(OtherError(e)) + })?; Ok(()) } diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index 6503b498..f3517a24 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -46,7 +46,7 @@ async fn handle_process_job_request( ORCHESTRATOR_METRICS .successful_job_operations .add(1.0, &[KeyValue::new("operation_type", "queue_process")]); - Ok(Json(ApiResponse::success()).into_response()) + Ok(Json(ApiResponse::success(Some(format!("Job with id {} queued for processing", id)))).into_response()) } Err(e) => { error!(error = %e, "Failed to queue job for processing"); @@ -86,7 +86,7 @@ async fn handle_verify_job_request( Ok(_) => { info!("Job queued for verification successfully"); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &[KeyValue::new("operation_type", "queue_verify")]); - Ok(Json(ApiResponse::success()).into_response()) + Ok(Json(ApiResponse::success(Some(format!("Job with id {} queued for verification", id)))).into_response()) } Err(e) => { error!(error = %e, "Failed to queue job for verification"); @@ -129,7 +129,7 @@ async fn handle_retry_job_request( &[KeyValue::new("operation_type", "process_job"), KeyValue::new("operation_info", "retry_job")], ); - Ok(Json(ApiResponse::success()).into_response()) + Ok(Json(ApiResponse::success(Some(format!("Job with id {} retry initiated", id)))).into_response()) } Err(e) => { error!(error = %e, "Failed to retry job"); diff --git a/crates/orchestrator/src/routes/types.rs b/crates/orchestrator/src/routes/types.rs index fec1f286..aee1bd3d 100644 --- a/crates/orchestrator/src/routes/types.rs +++ b/crates/orchestrator/src/routes/types.rs @@ -59,8 +59,8 @@ impl ApiResponse { /// let response = ApiResponse::success(); /// assert_eq!(response.success, true); /// ``` - pub fn success() -> Self { - Self { success: true, message: None } + pub fn 
success(message: Option) -> Self { + Self { success: true, message } } /// Creates an error response with the specified message. diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 8a64baf0..5ef00f2f 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -77,12 +77,7 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc Date: Tue, 14 Jan 2025 18:52:50 +0530 Subject: [PATCH 17/17] refactor: testing messages of routes as well --- crates/orchestrator/src/routes/types.rs | 2 +- .../orchestrator/src/tests/server/job_routes.rs | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/crates/orchestrator/src/routes/types.rs b/crates/orchestrator/src/routes/types.rs index aee1bd3d..11679d53 100644 --- a/crates/orchestrator/src/routes/types.rs +++ b/crates/orchestrator/src/routes/types.rs @@ -40,7 +40,7 @@ pub struct JobId { /// assert_eq!(response.success, false); /// assert_eq!(response.message, Some("Invalid job ID".to_string())); /// ``` -#[derive(Serialize)] +#[derive(Serialize, Deserialize)] pub struct ApiResponse { /// Indicates if the operation was successful pub success: bool, diff --git a/crates/orchestrator/src/tests/server/job_routes.rs b/crates/orchestrator/src/tests/server/job_routes.rs index 5ef00f2f..e403f8bd 100644 --- a/crates/orchestrator/src/tests/server/job_routes.rs +++ b/crates/orchestrator/src/tests/server/job_routes.rs @@ -24,6 +24,7 @@ use crate::jobs::types::{ExternalId, JobItem, JobStatus, JobType}; use crate::jobs::{get_u64_from_metadata, Job, MockJob}; use crate::queue::init_consumers; use crate::queue::job_queue::{JobQueueMessage, QueueNameForJobType}; +use crate::routes::types::ApiResponse; use crate::tests::config::{ConfigType, TestConfigBuilder}; #[fixture] @@ -66,8 +67,12 @@ async fn test_trigger_process_job(#[future] setup_trigger: (SocketAddr, Arc