From d7ae80996892dfc63316211ed85631fe3742fcf7 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 27 Dec 2024 16:37:05 +0530 Subject: [PATCH 1/4] update: fixed block_state collection --- crates/orchestrator/src/jobs/mod.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 181d64f4..6ea5adb7 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -297,7 +297,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> JobError::Other(OtherError(e)) })?; - let attributes = [ + let attributes = vec![ KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), KeyValue::new("operation_type", "process_job"), ]; @@ -305,8 +305,8 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block"); let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); + register_block_gauge(&job, &attributes)?; Ok(()) } @@ -476,7 +476,7 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { let duration = start.elapsed(); ORCHESTRATOR_METRICS.successful_job_operations.add(1.0, &attributes); ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); + register_block_gauge(&job, &attributes)?; Ok(()) } @@ -497,6 +497,18 @@ pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), Job .await } +fn register_block_gauge(job: &JobItem, attributes: &Vec) -> Result<(), JobError> { + let block_number = if let JobType::StateTransition = job.job_type { + parse_string(job.external_id.unwrap_string() + .map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?) + } else { + parse_string(&job.internal_id) + }?; + + ORCHESTRATOR_METRICS.block_gauge.record(block_number, attributes); + Ok(()) +} + async fn move_job_to_failed(job: &JobItem, config: Arc, reason: String) -> Result<(), JobError> { if job.status == JobStatus::Completed { tracing::error!(job_id = ?job.id, job_status = ?job.status, "Invalid state exists on DL queue"); From a677ac4be7c350e3536dda860e425ac1a881b5bd Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 27 Dec 2024 16:37:57 +0530 Subject: [PATCH 2/4] change: changelog --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e8b0acf..141bd53d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,7 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## Fixed -- refactor: instrumentation +- refactor: instrumentations - `is_worker_enabled` status check moved from `VerificationFailed` to `Failed` - refactor: static attributes for telemetry - refactor: aws setup for Event Bridge From fd4d688dd62e419525c6ddb9cf5ebd0639922c30 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 27 Dec 2024 16:46:11 +0530 Subject: [PATCH 3/4] fix: lint --- crates/orchestrator/src/jobs/mod.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index 6ea5adb7..ebde3b8e 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -297,7 +297,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> JobError::Other(OtherError(e)) })?; - let attributes = vec![ + let attributes = vec![ KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), KeyValue::new("operation_type", "process_job"), ]; @@ -499,8 +499,11 @@ pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), Job fn register_block_gauge(job: &JobItem, attributes: &Vec) -> Result<(), JobError> { let block_number = if let JobType::StateTransition = job.job_type { - parse_string(job.external_id.unwrap_string() - .map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?) + parse_string( + job.external_id + .unwrap_string() + .map_err(|e| JobError::Other(OtherError::from(format!("Could not parse string: {e}"))))?, + ) } else { parse_string(&job.internal_id) }?; From 5e647f3812b921bf667d3719dc019d45e2384f63 Mon Sep 17 00:00:00 2001 From: Heemank Verma Date: Fri, 27 Dec 2024 18:20:19 +0530 Subject: [PATCH 4/4] update: lint fixes --- crates/orchestrator/src/jobs/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index ebde3b8e..e3e7dfde 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -497,7 +497,7 @@ pub async fn handle_job_failure(id: Uuid, config: Arc) -> Result<(), Job .await } -fn register_block_gauge(job: &JobItem, attributes: &Vec) -> Result<(), JobError> { +fn register_block_gauge(job: &JobItem, attributes: &[KeyValue]) -> Result<(), JobError> { let block_number = if let JobType::StateTransition = job.job_type { parse_string( job.external_id