From 085d6bc49c6e62c8273b449a46f39a1cb10060b7 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 19 Dec 2024 14:16:24 +0530 Subject: [PATCH 1/3] update: added operation_job_status --- crates/orchestrator/src/jobs/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 2c791a27..cde5f82b 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -347,11 +347,12 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { let verification_status = job_handler.verify_job(config.clone(), &mut job).await?; tracing::Span::current().record("verification_status", format!("{:?}", &verification_status)); - let attributes = [ + let mut attributes = vec![ KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), KeyValue::new("operation_type", "verify_job"), KeyValue::new("operation_verification_status", format!("{:?}", &verification_status)), ]; + let mut operation_job_status: Option = None; match verification_status { JobVerificationStatus::Verified => { @@ -380,12 +381,13 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to Completed"); JobError::Other(OtherError(e)) })?; + operation_job_status = Some(JobStatus::Completed); } JobVerificationStatus::Rejected(e) => { tracing::warn!(job_id = ?id, error = ?e, "Job verification rejected"); - let mut new_job = job.clone(); - new_job.metadata.insert(JOB_METADATA_ERROR.to_string(), e); - new_job.status = JobStatus::VerificationFailed; + let mut new_job_metadata = job.metadata.clone(); + new_job_metadata.insert(JOB_METADATA_ERROR.to_string(), e); + operation_job_status = Some(JobStatus::VerificationFailed); let process_attempts = get_u64_from_metadata(&job.metadata, JOB_PROCESS_ATTEMPT_METADATA_KEY) .map_err(|e| JobError::Other(OtherError(e)))?; @@ -402,7 +404,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { &job, JobItemUpdates::new() .update_status(JobStatus::VerificationFailed) - .update_metadata(new_job.metadata) + .update_metadata(new_job_metadata) .build(), ) .await @@ -437,6 +439,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { tracing::error!(job_id = ?id, error = ?e, "Failed to update job status to VerificationTimeout"); JobError::Other(OtherError(e)) })?; + operation_job_status = Some(JobStatus::VerificationTimeout); return Ok(()); } let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; @@ -463,6 +466,8 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { } }; + attributes.push(KeyValue::new("operation_job_status", format!("{:?}", operation_job_status))); + tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); From cda15ddbb34fafa54908511918553e48a0dfee40 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 19 Dec 2024 15:14:30 +0530 Subject: [PATCH 2/3] update: added operation_job_status --- CHANGELOG.md | 2 +- crates/orchestrator/src/jobs/mod.rs | 49 ++++++++++++++++------------- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b89ef854..d4013b42 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,7 +81,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed -- refactor: instrumentations +- refactor: instrumentation - `is_worker_enabled` status check moved from `VerificationFailed` to `Failed` - refactor: static attributes for telemetry - refactor: aws setup for Event Bridge diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index cde5f82b..e57f23d0 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -440,33 +440,38 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { JobError::Other(OtherError(e)) })?; operation_job_status = Some(JobStatus::VerificationTimeout); - return Ok(()); - } - let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; + } else { + let metadata = increment_key_in_metadata(&job.metadata, JOB_VERIFICATION_ATTEMPT_METADATA_KEY)?; + + config + .database() + .update_job(&job, JobItemUpdates::new().update_metadata(metadata).build()) + .await + .map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata"); + JobError::Other(OtherError(e)) + })?; - config.database().update_job(&job, JobItemUpdates::new().update_metadata(metadata).build()).await.map_err( - |e| { - tracing::error!(job_id = ?id, error = ?e, "Failed to update job metadata"); + tracing::debug!(job_id = ?id, "Adding job back to verification queue"); + add_job_to_verification_queue( + job.id, + &job.job_type, + Duration::from_secs(job_handler.verification_polling_delay_seconds()), + config.clone(), + ) + .await + .map_err(|e| { + tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue"); JobError::Other(OtherError(e)) - }, - )?; - - tracing::debug!(job_id = ?id, "Adding job back to verification queue"); - add_job_to_verification_queue( - job.id, - &job.job_type, - Duration::from_secs(job_handler.verification_polling_delay_seconds()), - config.clone(), - ) - .await - .map_err(|e| { - tracing::error!(job_id = ?id, error = ?e, "Failed to add job to verification queue"); - JobError::Other(OtherError(e)) - })?; + })?; + } } }; - attributes.push(KeyValue::new("operation_job_status", format!("{:?}", operation_job_status))); + attributes.push(KeyValue::new( + "operation_job_status", + format!("{}", operation_job_status.expect("operation_job_status not found")), + )); tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); let duration = start.elapsed(); From dbed2f420a2fb4ad6e8b2a9cda41d8a2960bece2 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Thu, 19 Dec 2024 17:30:07 +0530 Subject: [PATCH 3/3] update: added operation_job_status --- crates/orchestrator/src/jobs/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e57f23d0..181d64f4 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -468,10 +468,9 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { } }; - attributes.push(KeyValue::new( - "operation_job_status", - format!("{}", operation_job_status.expect("operation_job_status not found")), - )); + if let Some(job_status) = operation_job_status { + attributes.push(KeyValue::new("operation_job_status", format!("{}", job_status))); + } tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); let duration = start.elapsed();