diff --git a/.env.test b/.env.test index beba0eb7..73d1d6da 100644 --- a/.env.test +++ b/.env.test @@ -4,12 +4,18 @@ AWS_ACCESS_KEY_ID=AWS_ACCESS_KEY_ID AWS_SECRET_ACCESS_KEY=AWS_SECRET_ACCESS_KEY AWS_REGION=us-east-1 - # For Aws sdk AWS_ENDPOINT_URL=http://localhost.localstack.cloud:4566 # For Omniqueue AWS_DEFAULT_REGION=localhost +# For EventBridge + +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME=madara-orchestrator-worker-trigger +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_ROLE_NAME=madara-orchestrator-worker-trigger-role +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_POLICY_NAME=madara-orchestrator-worker-trigger-policy + + #### ALERTS #### @@ -46,7 +52,7 @@ MADARA_ORCHESTRATOR_GPS_VERIFIER_CONTRACT_ADDRESS=0x07ec0D28e50322Eb0C159B9090ec ## ATLANTIC ## -MADARA_ORCHESTRATOR_ATLANTIC_API_KEY="73307f1b-6464-412d-8cce-9473b5073cc2" +MADARA_ORCHESTRATOR_ATLANTIC_API_KEY="API-KEY" MADARA_ORCHESTRATOR_ATLANTIC_SERVICE_URL="https://atlantic.api.herodotus.cloud" MADARA_ORCHESTRATOR_ATLANTIC_MOCK_FACT_HASH="false" # Whether to use mock fact registry MADARA_ORCHESTRATOR_ATLANTIC_PROVER_TYPE="herodotus" # ("herodotus" | "starkware") @@ -59,6 +65,7 @@ MADARA_ORCHESTRATOR_ATLANTIC_RPC_NODE_URL=http://127.0.0.1:8545 MADARA_ORCHESTRATOR_SQS_PREFIX=madara_orchestrator MADARA_ORCHESTRATOR_SQS_SUFFIX=queue +MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME=madara_orchestrator_worker_trigger_queue MADARA_ORCHESTRATOR_SQS_BASE_QUEUE_URL=http://sqs.us-east-1.localhost.localstack.cloud:4566/000000000000 #### SETTLEMENT #### @@ -92,7 +99,6 @@ MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME=madara-orchestrator-test-bucket MADARA_ORCHESTRATOR_OTEL_SERVICE_NAME=orchestrator - #### SERVER #### MADARA_ORCHESTRATOR_HOST=127.0.0.1 diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fe8aaac..168dac2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -80,6 +80,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
## Fixed +- refactor: aws setup for Event Bridge - refactor: RUST_LOG filtering support - refactor: cargo.toml files cleaned - blob data formation process from state update diff --git a/Cargo.lock b/Cargo.lock index 97353e35..6c1dd252 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1779,7 +1779,7 @@ dependencies = [ "alloy 0.2.1", "async-trait", "base64 0.22.1", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "chrono", "color-eyre", "dotenvy", @@ -1910,15 +1910,16 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.4.0" +version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f42c2d4218de4dcd890a109461e2f799a1a2ba3bcd2cde9af88360f5df9266c6" +checksum = "a10d5c055aa540164d9561a0e2e74ad30f0dcf7393c3a92f6733ddf9c5762468" dependencies = [ "aws-credential-types", "aws-sigv4", "aws-smithy-async", "aws-smithy-eventstream", "aws-smithy-http", + "aws-smithy-runtime", "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", @@ -1955,6 +1956,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-sdk-iam" +version = "1.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94a8c7a2bff782d1eecd80f6d56547aae572c7b46a3f7253dd3dcd1e7fce27ed" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-query", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-smithy-xml", + "aws-types", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-s3" version = "1.38.0" @@ -1990,6 +2014,29 @@ dependencies = [ "url", ] +[[package]] +name = "aws-sdk-scheduler" +version = "1.49.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9c5c5a4a8389740229ff10f69962b3955aaa63ae2f297a4a366d563b4133591" +dependencies = [ + "aws-credential-types", + "aws-runtime", + "aws-smithy-async", + "aws-smithy-http", + "aws-smithy-json", + "aws-smithy-runtime", + "aws-smithy-runtime-api", + "aws-smithy-types", + "aws-types", + "bytes", + "fastrand 2.1.1", + "http 0.2.12", + "once_cell", + "regex-lite", + "tracing", +] + [[package]] name = "aws-sdk-sns" version = "1.40.0" @@ -2104,9 +2151,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.3" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5df1b0fa6be58efe9d4ccc257df0a53b89cd8909e86591a13ca54817c87517be" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" dependencies = [ "aws-credential-types", "aws-smithy-eventstream", @@ -2165,9 +2212,9 @@ dependencies = [ [[package]] name = "aws-smithy-eventstream" -version = "0.60.4" +version = "0.60.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858" +checksum = "cef7d0a272725f87e51ba2bf89f8c21e4df61b9e49ae1ac367a6d69916ef7c90" dependencies = [ "aws-smithy-types", "bytes", @@ -2176,9 +2223,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.9" +version = "0.60.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9cd0ae3d97daa0a2bf377a4d8e8e1362cae590c4a1aad0d40058ebca18eb91e" +checksum = "5c8bc3e8fdc6b8d07d976e301c02fe553f72a39b7a9fea820e023268467d7ab6" dependencies = [ 
"aws-smithy-eventstream", "aws-smithy-runtime-api", @@ -2216,9 +2263,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.6.3" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0abbf454960d0db2ad12684a1640120e7557294b0ff8e2f11236290a1b293225" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -2243,9 +2290,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -2260,9 +2307,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.4" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "273dcdfd762fae3e1650b8024624e7cd50e484e37abdab73a7a706188ad34543" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" dependencies = [ "base64-simd", "bytes", @@ -2286,9 +2333,9 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.60.8" +version = "0.60.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d123fbc2a4adc3c301652ba8e149bf4bc1d1725affb9784eb20c953ace06bf55" +checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc" dependencies = [ "xmlparser", ] @@ -3378,7 +3425,7 @@ dependencies = [ [[package]] name = "cairo-type-derive" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "proc-macro2", "quote", @@ -3419,7 +3466,7 @@ dependencies = [ [[package]] name = "cairo-vm" version = "1.0.1" -source = "git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization#6036ab9533e1754f1612fbea9c9d2a27063ca5f8" +source = "git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04#1fa902bafae507424b5ea83a625830ffe6b0eca5" dependencies = [ "anyhow", "ark-ff 0.4.2", @@ -4516,7 +4563,7 @@ dependencies = [ "async-trait", "bytes", "c-kzg", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "color-eyre", "dotenvy", "lazy_static", @@ -5074,7 +5121,7 @@ dependencies = [ "aws-config", "aws-sdk-s3", "bincode 1.3.3", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "dotenvy", "itertools 0.13.0", "log", @@ -6880,7 +6927,9 @@ dependencies = [ "aws-config", "aws-credential-types", "aws-sdk-eventbridge", + "aws-sdk-iam", "aws-sdk-s3", + "aws-sdk-scheduler", "aws-sdk-sns", "aws-sdk-sqs", "axum 0.7.5", @@ -6888,7 +6937,7 @@ dependencies = [ "bincode 1.3.3", "bytes", "c-kzg", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "chrono", "clap", 
"color-eyre", @@ -7677,13 +7726,13 @@ dependencies = [ [[package]] name = "prove_block" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "anyhow", "blockifier", "cairo-lang-starknet-classes", "cairo-lang-utils", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "clap", "env_logger", "futures-core", @@ -7712,7 +7761,7 @@ name = "prover-client-interface" version = "0.1.0" dependencies = [ "async-trait", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "gps-fact-checker", "mockall", "starknet-os", @@ -8140,7 +8189,7 @@ dependencies = [ [[package]] name = "rpc-client" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "log", "reqwest 0.11.27", @@ -8155,7 +8204,7 @@ dependencies = [ [[package]] name = "rpc-replay" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "blockifier", "cairo-lang-starknet-classes", @@ -9086,7 +9135,7 @@ dependencies = [ "alloy 0.2.1", "async-trait", "base64 0.22.1", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "color-eyre", "dotenvy", "gps-fact-checker", @@ -9518,7 +9567,7 @@ dependencies = [ [[package]] name = "starknet-os" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "anyhow", "ark-ec", @@ -9533,7 +9582,7 @@ dependencies = [ "cairo-lang-starknet", "cairo-lang-starknet-classes", "cairo-type-derive", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "futures", "futures-util", "heck 0.4.1", @@ -9567,11 +9616,11 @@ dependencies = [ [[package]] name = "starknet-os-types" version = "0.1.0" -source = "git+https://github.com/Mohiiit/snos?rev=2dcd3b0f0c6bfa368a802de12de29cd124c72bb8#2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" +source = "git+https://github.com/keep-starknet-strange/snos?branch=tmp%2Fsnos_devnet_zero_txs#2c472815ebebca927caa5382c3bb8e3c1bbf0d86" dependencies = [ "blockifier", "cairo-lang-starknet-classes", - "cairo-vm 1.0.1 (git+https://github.com/Moonsong-Labs/cairo-vm?branch=herman%2Ffix-pie-serialization)", + "cairo-vm 1.0.1 
(git+https://github.com/Moonsong-Labs/cairo-vm?branch=notlesh%2Fsnos-2024-11-04)", "flate2", "num-bigint", "serde", diff --git a/Cargo.toml b/Cargo.toml index bf817cbe..ea9ac4f8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -41,6 +41,8 @@ aws-sdk-s3 = { version = "1.38.0", features = ["behavior-version-latest"] } aws-sdk-eventbridge = { version = "1.41.0", features = [ "behavior-version-latest", ] } +aws-sdk-iam = "1.52.0" +aws-sdk-scheduler = "1.49.0" aws-sdk-sns = { version = "1.40.0", features = ["behavior-version-latest"] } aws-credential-types = { version = "1.2.1", features = [ "hardcoded-credentials", @@ -122,17 +124,17 @@ tracing-subscriber = { version = "0.3.18", features = [ tracing-opentelemetry = "0.26.0" # Cairo VM -cairo-vm = { git = "https://github.com/Moonsong-Labs/cairo-vm", branch = "herman/fix-pie-serialization", features = [ +cairo-vm = { git = "https://github.com/Moonsong-Labs/cairo-vm", branch = "notlesh/snos-2024-11-04", features = [ "cairo-1-hints", "extensive_hints", "mod_builtin", ] } + # Snos & Sharp (Starkware) # TODO: need to update this once the updated PR merges -starknet-os = { git = "https://github.com/Mohiiit/snos", rev = "2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" } -prove_block = { git = "https://github.com/Mohiiit/snos", rev = "2dcd3b0f0c6bfa368a802de12de29cd124c72bb8" } - +starknet-os = { git = "https://github.com/keep-starknet-strange/snos", branch = "tmp/snos_devnet_zero_txs" } +prove_block = { git = "https://github.com/keep-starknet-strange/snos", branch = "tmp/snos_devnet_zero_txs" } # Madara prover API madara-prover-common = { git = "https://github.com/Moonsong-Labs/madara-prover-api", branch = "od/use-latest-cairo-vm" } diff --git a/crates/orchestrator/Cargo.toml b/crates/orchestrator/Cargo.toml index c4ed2c47..e52076a0 100644 --- a/crates/orchestrator/Cargo.toml +++ b/crates/orchestrator/Cargo.toml @@ -74,6 +74,9 @@ url = { workspace = true } utils = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } +aws-sdk-iam = { workspace = true } +aws-sdk-scheduler = { workspace = true } + #Instrumentation opentelemetry = { workspace = true, features = ["metrics", "logs"] } opentelemetry-appender-tracing = { workspace = true, default-features = false } diff --git a/crates/orchestrator/src/cli/cron/event_bridge.rs b/crates/orchestrator/src/cli/cron/event_bridge.rs index 923d3736..e9e83c25 100644 --- a/crates/orchestrator/src/cli/cron/event_bridge.rs +++ b/crates/orchestrator/src/cli/cron/event_bridge.rs @@ -16,6 +16,14 @@ pub struct AWSEventBridgeCliArgs { pub cron_time: Option, /// The name of the event bridge trigger rule. 
- #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-event-bridge-trigger-rule-name"), help = "The name of the event bridge trigger rule.")] + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME", long, default_value = Some("madara-orchestrator-worker-trigger"), help = "The name of the event bridge trigger rule.")] pub trigger_rule_name: Option<String>, + + /// The name of the IAM role to assign to the event bridge trigger + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_ROLE_NAME", long, default_value = Some("madara-orchestrator-worker-trigger-role"), help = "The name of the Trigger Role to assign to the event bridge")] + pub trigger_role_name: Option<String>, + + /// The name of the IAM policy to assign to the event bridge trigger + #[arg(env = "MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_POLICY_NAME", long, default_value = Some("madara-orchestrator-worker-trigger-policy"), help = "The name of the Trigger Policy to assign to the event bridge")] + pub trigger_policy_name: Option<String>, } diff --git a/crates/orchestrator/src/cli/mod.rs b/crates/orchestrator/src/cli/mod.rs index 61f2b328..b17d9369 100644 --- a/crates/orchestrator/src/cli/mod.rs +++ b/crates/orchestrator/src/cli/mod.rs @@ -89,7 +89,7 @@ pub enum Commands { ), group( ArgGroup::new("prover") - .args(&["sharp"]) + .args(&["sharp", "atlantic"]) .required(true) .multiple(false) ), @@ -421,6 +421,16 @@ pub mod validate_params { .trigger_rule_name .clone() .expect("Trigger rule name is required"), + + trigger_role_name: aws_event_bridge_args + .trigger_role_name + .clone() + .expect("Trigger role name is required"), + + trigger_policy_name: aws_event_bridge_args + .trigger_policy_name + .clone() + .expect("Trigger policy name is required"), })) } else { Err("Only AWS Event Bridge is supported as of now".to_string()) @@ -709,7 +719,11 @@ pub mod validate_params { #[case(false, true)] #[case(false, false)] fn test_validate_storage_params(#[case] is_aws: bool, #[case] is_s3: bool) { - let aws_s3_args: AWSS3CliArgs = AWSS3CliArgs { aws_s3: is_s3, bucket_name: Some("".to_string()) }; + let aws_s3_args: AWSS3CliArgs = AWSS3CliArgs { + aws_s3: is_s3, + bucket_name: Some("".to_string()), + bucket_location_constraint: Some("".to_string()), + }; let aws_config_args: AWSConfigCliArgs = AWSConfigCliArgs { aws: is_aws, aws_access_key_id: "".to_string(), @@ -831,6 +845,8 @@ pub mod validate_params { target_queue_name: Some(String::from("test")), cron_time: Some(String::from("12")), trigger_rule_name: Some(String::from("test")), + trigger_role_name: Some(String::from("test-role")), + trigger_policy_name: Some(String::from("test-policy")), }; let aws_config_args: AWSConfigCliArgs = AWSConfigCliArgs { aws: is_aws, diff --git a/crates/orchestrator/src/cli/storage/aws_s3.rs b/crates/orchestrator/src/cli/storage/aws_s3.rs index d8da0692..75be1811 100644 --- a/crates/orchestrator/src/cli/storage/aws_s3.rs +++ b/crates/orchestrator/src/cli/storage/aws_s3.rs @@ -11,4 +11,8 @@ pub struct AWSS3CliArgs { /// The name of the S3 bucket. #[arg(env = "MADARA_ORCHESTRATOR_AWS_S3_BUCKET_NAME", long, default_value = Some("madara-orchestrator-bucket"))] pub bucket_name: Option<String>, + + /// The S3 Bucket Location Constraint.
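+ /// NOTE: as of this diff, AWSS3::new_with_args derives the location constraint from the configured AWS region (see data_storage/aws_s3/mod.rs) rather than from this flag.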
+ #[arg(env = "MADARA_ORCHESTRATOR_AWS_BUCKET_LOCATION_CONSTRAINT", long)] + pub bucket_location_constraint: Option, } diff --git a/crates/orchestrator/src/cron/event_bridge.rs b/crates/orchestrator/src/cron/event_bridge.rs index b8b5c6e3..112665f0 100644 --- a/crates/orchestrator/src/cron/event_bridge.rs +++ b/crates/orchestrator/src/cron/event_bridge.rs @@ -1,27 +1,36 @@ use std::time::Duration; +use async_std::task::sleep; use async_trait::async_trait; use aws_config::SdkConfig; -use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target}; -use aws_sdk_eventbridge::Client as EventBridgeClient; +use aws_sdk_scheduler::types::{FlexibleTimeWindow, FlexibleTimeWindowMode, Target}; +use aws_sdk_scheduler::Client as SchedulerClient; use aws_sdk_sqs::types::QueueAttributeName; use aws_sdk_sqs::Client as SqsClient; +use color_eyre::eyre::Ok; +use super::{get_worker_trigger_message, TriggerArns}; use crate::cron::Cron; +use crate::queue::job_queue::WorkerTriggerType; #[derive(Clone, Debug)] pub struct AWSEventBridgeValidatedArgs { pub target_queue_name: String, pub cron_time: Duration, pub trigger_rule_name: String, + pub trigger_role_name: String, + pub trigger_policy_name: String, } pub struct AWSEventBridge { target_queue_name: String, cron_time: Duration, trigger_rule_name: String, - client: EventBridgeClient, + client: SchedulerClient, queue_client: SqsClient, + iam_client: aws_sdk_iam::Client, + trigger_role_name: String, + trigger_policy_name: String, } impl AWSEventBridge { @@ -30,8 +39,11 @@ impl AWSEventBridge { target_queue_name: params.target_queue_name.clone(), cron_time: params.cron_time, trigger_rule_name: params.trigger_rule_name.clone(), - client: aws_sdk_eventbridge::Client::new(aws_config), + client: aws_sdk_scheduler::Client::new(aws_config), queue_client: aws_sdk_sqs::Client::new(aws_config), + iam_client: aws_sdk_iam::Client::new(aws_config), + trigger_role_name: params.trigger_role_name.clone(), + trigger_policy_name: params.trigger_policy_name.clone(), } } } @@ -39,18 +51,8 @@ impl AWSEventBridge { #[async_trait] #[allow(unreachable_patterns)] impl Cron for AWSEventBridge { - async fn create_cron(&self) -> color_eyre::Result<()> { - self.client - .put_rule() - .name(&self.trigger_rule_name) - .schedule_expression(duration_to_rate_string(self.cron_time)) - .state(RuleState::Enabled) - .send() - .await?; - - Ok(()) - } - async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()> { + async fn create_cron(&self) -> color_eyre::Result { + // Get Queue Info let queue_url = self.queue_client.get_queue_url().queue_name(&self.target_queue_name).send().await?; let queue_attributes = self @@ -62,20 +64,87 @@ impl Cron for AWSEventBridge { .await?; let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap(); - // Create the EventBridge target with the input transformer - let input_transformer = - InputTransformer::builder().input_paths_map("$.time", "time").input_template(message).build()?; + // Create IAM role for EventBridge + let role_name = format!("{}-{}", self.trigger_role_name, uuid::Uuid::new_v4()); + let assume_role_policy = r#"{ + "Version": "2012-10-17", + "Statement": [{ + "Effect": "Allow", + "Principal": { + "Service": "scheduler.amazonaws.com" + }, + "Action": "sts:AssumeRole" + }] + }"#; + + let create_role_resp = self + .iam_client + .create_role() + .role_name(&role_name) + .assume_role_policy_document(assume_role_policy) + .send() + .await?; + + let role_arn = 
create_role_resp.role().unwrap().arn(); + + // Create policy document for SQS access + let policy_document = format!( + r#"{{ + "Version": "2012-10-17", + "Statement": [{{ + "Effect": "Allow", + "Action": [ + "sqs:SendMessage" + ], + "Resource": "{}" + }}] + }}"#, + queue_arn + ); + + let policy_name = format!("{}-{}", self.trigger_policy_name, uuid::Uuid::new_v4()); + // Create and attach the policy + let policy_resp = + self.iam_client.create_policy().policy_name(&policy_name).policy_document(&policy_document).send().await?; + + let policy_arn = policy_resp.policy().unwrap().arn().unwrap().to_string(); + + // Attach the policy to the role + self.iam_client.attach_role_policy().role_name(&role_name).policy_arn(&policy_arn).send().await?; + + // Wait for the new IAM role and policy to propagate before they are referenced (IAM is eventually consistent) + sleep(Duration::from_secs(60)).await; + + Ok(TriggerArns { queue_arn: queue_arn.to_string(), role_arn: role_arn.to_string() }) + } + + async fn add_cron_target_queue( + &self, + trigger_type: &WorkerTriggerType, + trigger_arns: &TriggerArns, + ) -> color_eyre::Result<()> { + let trigger_name = format!("{}-{}", self.trigger_rule_name, trigger_type); + + // Disable the flexible time window so the schedule fires exactly at the configured rate + let flexible_time_window = FlexibleTimeWindow::builder().mode(FlexibleTimeWindowMode::Off).build()?; + + let message = get_worker_trigger_message(trigger_type.clone())?; + + // Create target for SQS queue + let target = Target::builder() + .arn(trigger_arns.queue_arn.clone()) + .role_arn(trigger_arns.role_arn.clone()) + .input(message) + .build()?; + + // Create the schedule self.client - .put_targets() - .rule(&self.trigger_rule_name) - .targets( - Target::builder() - .id(uuid::Uuid::new_v4().to_string()) - .arn(queue_arn) - .input_transformer(input_transformer) - .build()?, - ) + .create_schedule() + .name(trigger_name) + .schedule_expression_timezone("UTC") + .flexible_time_window(flexible_time_window) + .schedule_expression(duration_to_rate_string(self.cron_time)) + .target(target) .send() .await?; diff --git a/crates/orchestrator/src/cron/mod.rs b/crates/orchestrator/src/cron/mod.rs index 5f1ed51a..4673cea4 100644 --- a/crates/orchestrator/src/cron/mod.rs +++ b/crates/orchestrator/src/cron/mod.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; use lazy_static::lazy_static; -use crate::queue::job_queue::{WorkerTriggerMessage, WorkerTriggerType}; +use crate::queue::job_queue::WorkerTriggerType; pub mod event_bridge; @@ -14,20 +14,28 @@ lazy_static! { ]; } +#[derive(Debug, Clone)] +pub struct TriggerArns { + queue_arn: String, + role_arn: String, +} #[async_trait] pub trait Cron { - async fn create_cron(&self) -> color_eyre::Result<()>; - async fn add_cron_target_queue(&self, message: String) -> color_eyre::Result<()>; + async fn create_cron(&self) -> color_eyre::Result<TriggerArns>; + async fn add_cron_target_queue( + &self, + trigger_type: &WorkerTriggerType, + trigger_arns: &TriggerArns, + ) -> color_eyre::Result<()>; async fn setup(&self) -> color_eyre::Result<()> { - self.create_cron().await?; - for triggers in WORKER_TRIGGERS.iter() { - self.add_cron_target_queue(get_worker_trigger_message(triggers.clone())?).await?; + let trigger_arns = self.create_cron().await?; + for trigger in WORKER_TRIGGERS.iter() { + self.add_cron_target_queue(trigger, &trigger_arns).await?; } Ok(()) } } -fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result<String> { - let message = WorkerTriggerMessage { worker: worker_trigger_type }; - Ok(serde_json::to_string(&message)?)
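+/// Returns the plain trigger-type string (e.g. "Snos") that the EventBridge schedule delivers as the SQS message body; parse_worker_message parses it back via WorkerTriggerType::from_str.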
+pub fn get_worker_trigger_message(worker_trigger_type: WorkerTriggerType) -> color_eyre::Result { + Ok(worker_trigger_type.to_string()) } diff --git a/crates/orchestrator/src/data_storage/aws_s3/mod.rs b/crates/orchestrator/src/data_storage/aws_s3/mod.rs index 0ca6a3f9..6eeda7b0 100644 --- a/crates/orchestrator/src/data_storage/aws_s3/mod.rs +++ b/crates/orchestrator/src/data_storage/aws_s3/mod.rs @@ -1,6 +1,9 @@ +use std::str::FromStr; + use async_trait::async_trait; use aws_config::SdkConfig; use aws_sdk_s3::primitives::ByteStream; +use aws_sdk_s3::types::{BucketLocationConstraint, CreateBucketConfiguration}; use aws_sdk_s3::Client; use bytes::Bytes; use color_eyre::Result; @@ -18,6 +21,7 @@ pub struct AWSS3ValidatedArgs { pub struct AWSS3 { client: Client, bucket: String, + bucket_location_constraint: BucketLocationConstraint, } /// Implementation for AWS S3 client. Contains the function for : @@ -31,7 +35,11 @@ impl AWSS3 { // this is necessary for it to work with localstack in test cases s3_config_builder.set_force_path_style(Some(true)); let client = Client::from_conf(s3_config_builder.build()); - Self { client, bucket: s3_config.bucket_name.clone() } + + let region_str = aws_config.region().expect("Could not get region as string").to_string(); + let location_constraint: BucketLocationConstraint = BucketLocationConstraint::from_str(region_str.as_str()) + .expect("Could not get location constraint from region string"); + Self { client, bucket: s3_config.bucket_name.clone(), bucket_location_constraint: location_constraint } } } @@ -79,7 +87,22 @@ impl DataStorage for AWSS3 { } async fn create_bucket(&self, bucket_name: &str) -> Result<()> { - self.client.create_bucket().bucket(bucket_name).send().await?; + if self.bucket_location_constraint.as_str() == "us-east-1" { + self.client.create_bucket().bucket(bucket_name).send().await?; + return Ok(()); + } + + let bucket_configuration = Some( + CreateBucketConfiguration::builder().location_constraint(self.bucket_location_constraint.clone()).build(), + ); + + self.client + .create_bucket() + .bucket(bucket_name) + .set_create_bucket_configuration(bucket_configuration) + .send() + .await?; + Ok(()) } } diff --git a/crates/orchestrator/src/database/mongodb/mod.rs b/crates/orchestrator/src/database/mongodb/mod.rs index 4f18eae8..1c426017 100644 --- a/crates/orchestrator/src/database/mongodb/mod.rs +++ b/crates/orchestrator/src/database/mongodb/mod.rs @@ -1,3 +1,5 @@ +use std::time::Instant; + use async_std::stream::StreamExt; use async_trait::async_trait; use chrono::{SubsecRound, Utc}; @@ -10,6 +12,7 @@ use mongodb::options::{ UpdateOptions, }; use mongodb::{bson, Client, Collection}; +use opentelemetry::KeyValue; use url::Url; use utils::ToDocument; use uuid::Uuid; @@ -17,6 +20,7 @@ use uuid::Uuid; use crate::database::Database; use crate::jobs::types::{JobItem, JobItemUpdates, JobStatus, JobType}; use crate::jobs::JobError; +use crate::metrics::ORCHESTRATOR_METRICS; mod utils; @@ -67,6 +71,7 @@ impl MongoDb { impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn create_job(&self, job: JobItem) -> Result { + let start = Instant::now(); let options = UpdateOptions::builder().upsert(true).build(); let updates = job.to_document().map_err(|e| JobError::Other(e.into()))?; @@ -93,6 +98,12 @@ impl Database for MongoDb { .map_err(|e| JobError::Other(e.to_string().into()))?; if result.matched_count == 0 { + let attributes = [ + KeyValue::new("db_operation_name", "create_job"), + 
KeyValue::new("db_operation_job", format!("{:?}", job)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(job) } else { Err(JobError::JobAlreadyExists { internal_id: job.internal_id, job_type: job.job_type }) @@ -101,25 +112,40 @@ impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_job_by_id(&self, id: Uuid) -> Result> { + let start = Instant::now(); let filter = doc! { "id": id }; tracing::debug!(job_id = %id, category = "db_call", "Fetched job by ID"); + let attributes = [ + KeyValue::new("db_operation_name", "get_job_by_id"), + KeyValue::new("db_operation_id", format!("{:?}", id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, None).await?) } #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_job_by_internal_id_and_type(&self, internal_id: &str, job_type: &JobType) -> Result> { + let start = Instant::now(); let filter = doc! { "internal_id": internal_id, "job_type": mongodb::bson::to_bson(&job_type)?, }; tracing::debug!(internal_id = %internal_id, job_type = ?job_type, category = "db_call", "Fetched job by internal ID and type"); + let attributes = [ + KeyValue::new("db_operation_name", "get_job_by_internal_id_and_type"), + KeyValue::new("db_operation_id", format!("{:?}", internal_id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, None).await?) } #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn update_job(&self, current_job: &JobItem, updates: JobItemUpdates) -> Result { + let start = Instant::now(); // Filters to search for the job let filter = doc! { "id": current_job.id, @@ -154,6 +180,12 @@ impl Database for MongoDb { match result { Some(job) => { tracing::debug!(job_id = %current_job.id, category = "db_call", "Job updated successfully"); + let attributes = [ + KeyValue::new("db_operation_name", "update_job"), + KeyValue::new("db_operation_id", format!("{:?}", current_job.id)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(job) } None => { @@ -165,6 +197,7 @@ impl Database for MongoDb { #[tracing::instrument(skip(self), fields(function_type = "db_call"), ret, err)] async fn get_latest_job_by_type(&self, job_type: JobType) -> Result> { + let start = Instant::now(); let pipeline = vec![ doc! { "$match": { @@ -199,6 +232,12 @@ impl Database for MongoDb { match cursor.try_next().await? 
{ Some(doc) => { let job: JobItem = mongodb::bson::from_document(doc)?; + let attributes = [ + KeyValue::new("db_operation_name", "get_latest_job_by_type"), + KeyValue::new("db_operation_job_type", format!("{:?}", job_type)), + ]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(Some(job)) } None => Ok(None), @@ -233,6 +272,7 @@ impl Database for MongoDb { job_a_status: JobStatus, job_b_type: JobType, ) -> Result> { + let start = Instant::now(); // Convert enums to Bson strings let job_a_type_bson = Bson::String(format!("{:?}", job_a_type)); let job_a_status_bson = Bson::String(format!("{:?}", job_a_status)); @@ -323,6 +363,9 @@ impl Database for MongoDb { } tracing::debug!(job_count = vec_jobs.len(), category = "db_call", "Retrieved jobs without successor"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_without_successor")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(vec_jobs) } @@ -332,6 +375,7 @@ impl Database for MongoDb { job_type: JobType, job_status: JobStatus, ) -> Result> { + let start = Instant::now(); let filter = doc! { "job_type": bson::to_bson(&job_type)?, "status": bson::to_bson(&job_status)? @@ -339,6 +383,9 @@ impl Database for MongoDb { let find_options = FindOneOptions::builder().sort(doc! { "internal_id": -1 }).build(); tracing::debug!(job_type = ?job_type, job_status = ?job_status, category = "db_call", "Fetched latest job by type and status"); + let attributes = [KeyValue::new("db_operation_name", "get_latest_job_by_type_and_status")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(self.get_job_collection().find_one(filter, find_options).await?) } @@ -349,6 +396,7 @@ impl Database for MongoDb { job_status: JobStatus, internal_id: String, ) -> Result> { + let start = Instant::now(); let filter = doc! { "job_type": bson::to_bson(&job_type)?, "status": bson::to_bson(&job_status)?, @@ -361,11 +409,15 @@ impl Database for MongoDb { }; let jobs: Vec = self.get_job_collection().find(filter, None).await?.try_collect().await?; tracing::debug!(job_type = ?job_type, job_status = ?job_status, internal_id = internal_id, category = "db_call", "Fetched jobs after internal ID by job type"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_after_internal_id_by_job_type")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(jobs) } #[tracing::instrument(skip(self, limit), fields(function_type = "db_call"), ret, err)] async fn get_jobs_by_statuses(&self, job_status: Vec, limit: Option) -> Result> { + let start = Instant::now(); let filter = doc! { "status": { // TODO: Check that the conversion leads to valid output! 
@@ -377,6 +429,9 @@ impl Database for MongoDb { let jobs: Vec = self.get_job_collection().find(filter, find_options).await?.try_collect().await?; tracing::debug!(job_count = jobs.len(), category = "db_call", "Retrieved jobs by statuses"); + let attributes = [KeyValue::new("db_operation_name", "get_jobs_by_statuses")]; + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.db_calls_response_time.record(duration.as_secs_f64(), &attributes); Ok(jobs) } } diff --git a/crates/orchestrator/src/jobs/mod.rs b/crates/orchestrator/src/jobs/mod.rs index e86aa984..91b6f1e5 100644 --- a/crates/orchestrator/src/jobs/mod.rs +++ b/crates/orchestrator/src/jobs/mod.rs @@ -2,7 +2,7 @@ use std::collections::HashMap; use std::fmt; use std::panic::AssertUnwindSafe; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use async_trait::async_trait; use color_eyre::eyre::{eyre, Context}; @@ -151,6 +151,7 @@ pub async fn create_job( metadata: HashMap, config: Arc, ) -> Result<(), JobError> { + let start = Instant::now(); tracing::info!(log_type = "starting", category = "general", function_type = "create_job", job_type = ?job_type, block_no = %internal_id, "General create job started for block"); tracing::debug!( @@ -180,13 +181,16 @@ pub async fn create_job( .map_err(|e| JobError::Other(OtherError(e)))?; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job_type)), - KeyValue::new("type", "create_job"), - KeyValue::new("job", format!("{:?}", job_item)), + KeyValue::new("operation_job_type", format!("{:?}", job_type)), + KeyValue::new("operation_type", "create_job"), + KeyValue::new("operation_job", format!("{:?}", job_item)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "create_job", block_no = %internal_id, "General create job completed for block"); + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&internal_id)?, &attributes); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) } @@ -194,6 +198,7 @@ pub async fn create_job( /// DB. It then adds the job to the verification queue. 
#[tracing::instrument(skip(config), fields(category = "general", job, job_type, internal_id), ret, err)] pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let start = Instant::now(); let job = get_job(id, config.clone()).await?; let internal_id = job.internal_id.clone(); tracing::info!(log_type = "starting", category = "general", function_type = "process_job", block_no = %internal_id, "General process job started for block"); @@ -292,13 +297,16 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> })?; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job.job_type)), - KeyValue::new("type", "process_job"), - KeyValue::new("job", format!("{:?}", job)), + KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), + KeyValue::new("operation_type", "process_job"), + KeyValue::new("operation_job", format!("{:?}", job)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "process_job", block_no = %internal_id, "General process job completed for block"); + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); Ok(()) } @@ -315,6 +323,7 @@ pub async fn process_job(id: Uuid, config: Arc) -> Result<(), JobError> err )] pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { + let start = Instant::now(); let mut job = get_job(id, config.clone()).await?; let internal_id = job.internal_id.clone(); tracing::info!(log_type = "starting", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job started for block"); @@ -433,13 +442,16 @@ pub async fn verify_job(id: Uuid, config: Arc) -> Result<(), JobError> { }; let attributes = [ - KeyValue::new("job_type", format!("{:?}", job.job_type)), - KeyValue::new("type", "verify_job"), - KeyValue::new("job", format!("{:?}", job)), + KeyValue::new("operation_job_type", format!("{:?}", job.job_type)), + KeyValue::new("operation_type", "verify_job"), + KeyValue::new("operation_job", format!("{:?}", job)), ]; - ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); tracing::info!(log_type = "completed", category = "general", function_type = "verify_job", block_no = %internal_id, "General verify job completed for block"); + let duration = start.elapsed(); + ORCHESTRATOR_METRICS.successful_jobs.add(1.0, &attributes); + ORCHESTRATOR_METRICS.jobs_response_time.record(duration.as_secs_f64(), &attributes); + ORCHESTRATOR_METRICS.block_gauge.record(parse_string(&job.internal_id)?, &attributes); Ok(()) } diff --git a/crates/orchestrator/src/jobs/proving_job/mod.rs b/crates/orchestrator/src/jobs/proving_job/mod.rs index d9e69e32..138d2734 100644 --- a/crates/orchestrator/src/jobs/proving_job/mod.rs +++ b/crates/orchestrator/src/jobs/proving_job/mod.rs @@ -128,6 +128,9 @@ impl Job for ProvingJob { Ok(JobVerificationStatus::Pending) } TaskStatus::Succeeded => { + // TODO: call isValid on the contract over here to cross-verify whether the proof was registered on + // chain or not + tracing::info!(log_type = "completed", category = "proving", function_type = "verify_job", job_id = ?job.id, block_no = %internal_id, "Proving job verification completed."); Ok(JobVerificationStatus::Verified) } @@ 
-142,14 +145,14 @@ impl Job for ProvingJob { } fn max_process_attempts(&self) -> u64 { - 1 + 2 } fn max_verification_attempts(&self) -> u64 { - 1 + 1200 } fn verification_polling_delay_seconds(&self) -> u64 { - 60 + 30 } } diff --git a/crates/orchestrator/src/metrics.rs b/crates/orchestrator/src/metrics.rs index 0eade9f6..67679d17 100644 --- a/crates/orchestrator/src/metrics.rs +++ b/crates/orchestrator/src/metrics.rs @@ -1,14 +1,18 @@ use once_cell; use once_cell::sync::Lazy; -use opentelemetry::metrics::Gauge; +use opentelemetry::metrics::{Counter, Gauge}; use opentelemetry::{global, KeyValue}; -use utils::metrics::lib::{register_gauge_metric_instrument, Metrics}; +use utils::metrics::lib::{register_counter_metric_instrument, register_gauge_metric_instrument, Metrics}; use utils::register_metric; register_metric!(ORCHESTRATOR_METRICS, OrchestratorMetrics); pub struct OrchestratorMetrics { pub block_gauge: Gauge, + pub successful_jobs: Counter, + pub failed_jobs: Counter, + pub jobs_response_time: Gauge, + pub db_calls_response_time: Gauge, } impl Metrics for OrchestratorMetrics { @@ -31,6 +35,34 @@ impl Metrics for OrchestratorMetrics { "block".to_string(), ); - Self { block_gauge } + let successful_jobs = register_counter_metric_instrument( + &orchestrator_meter, + "successful_jobs".to_string(), + "A counter to show count of successful jobs over time".to_string(), + "jobs".to_string(), + ); + + let failed_jobs = register_counter_metric_instrument( + &orchestrator_meter, + "failed_jobs".to_string(), + "A counter to show count of failed jobs over time".to_string(), + "jobs".to_string(), + ); + + let jobs_response_time = register_gauge_metric_instrument( + &orchestrator_meter, + "jobs_response_time".to_string(), + "A gauge to show response time of jobs over time".to_string(), + "Time".to_string(), + ); + + let db_calls_response_time = register_gauge_metric_instrument( + &orchestrator_meter, + "db_calls_response_time".to_string(), + "A gauge to show response time of database calls over time".to_string(), + "Time".to_string(), + ); + + Self { block_gauge, successful_jobs, failed_jobs, jobs_response_time, db_calls_response_time } } } diff --git a/crates/orchestrator/src/queue/job_queue.rs b/crates/orchestrator/src/queue/job_queue.rs index 16b5eedd..1527f493 100644 --- a/crates/orchestrator/src/queue/job_queue.rs +++ b/crates/orchestrator/src/queue/job_queue.rs @@ -7,6 +7,7 @@ use color_eyre::eyre::Context; use color_eyre::Result as EyreResult; use omniqueue::{Delivery, QueueError}; use serde::{Deserialize, Deserializer, Serialize}; +use strum::Display; use thiserror::Error; use tokio::time::sleep; use uuid::Uuid; @@ -42,7 +43,8 @@ pub struct JobQueueMessage { pub id: Uuid, } -#[derive(Debug, Serialize, Deserialize, PartialEq, Clone)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Display)] +#[strum(serialize_all = "PascalCase")] pub enum WorkerTriggerType { Snos, Proving, @@ -80,13 +82,14 @@ impl<'de> Deserialize<'de> for WorkerTriggerMessage { where D: Deserializer<'de>, { - let s = String::deserialize(deserializer)?; - let s = s.trim_start_matches('{').trim_end_matches('}'); - let parts: Vec<&str> = s.split(':').collect(); - if parts.len() != 2 || parts[0] != "worker" { - return Err(serde::de::Error::custom("Invalid format")); + #[derive(Deserialize, Debug)] + struct Helper { + worker: String, } - Ok(WorkerTriggerMessage { worker: WorkerTriggerType::from_str(parts[1]).map_err(serde::de::Error::custom)?
}) + let helper = Helper::deserialize(deserializer)?; + Ok(WorkerTriggerMessage { + worker: WorkerTriggerType::from_str(&helper.worker).map_err(serde::de::Error::custom)?, + }) } } @@ -221,11 +224,15 @@ fn parse_job_message(message: &Delivery) -> Result, Cons .map_err(|e| ConsumptionError::Other(OtherError::from(e))) } +/// Using a string since LocalStack is currently unstable when deserializing maps. +/// Change this to accept a map once LocalStack is stable fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessage>, ConsumptionError> { - message - .payload_serde_json() - .wrap_err("Payload Serde Error") - .map_err(|e| ConsumptionError::Other(OtherError::from(e))) + let payload = message + .borrow_payload() + .ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?; + let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string(); + let trigger_type = WorkerTriggerType::from_str(message_string.as_str()).expect("Failed to parse worker trigger type"); + Ok(Some(WorkerTriggerMessage { worker: trigger_type })) } async fn handle_job_message( @@ -258,7 +265,8 @@ where .await .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; - match message.nack().await { + // not using `nack` as we don't want retries in case of failures + match message.ack().await { Ok(_) => Err(ConsumptionError::FailedToHandleJob { job_id: job_message.id, error_msg: "Job handling failed, message nack-ed".to_string(), @@ -302,7 +310,8 @@ where .await .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?; - message.nack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; + // not using `nack` as we don't want retries in case of failures + message.ack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?; Err(ConsumptionError::FailedToSpawnWorker { worker_trigger_type: job_message.worker, error_msg: "Worker handling failed, message nack-ed".to_string(), diff --git a/crates/orchestrator/src/queue/mod.rs b/crates/orchestrator/src/queue/mod.rs index cf9d9ece..98ce3e66 100644 --- a/crates/orchestrator/src/queue/mod.rs +++ b/crates/orchestrator/src/queue/mod.rs @@ -91,7 +91,7 @@ lazy_static!
{ }, QueueConfig { name: QueueType::UpdateStateJobProcessing, - visibility_timeout: 300, + visibility_timeout: 900, dlq_config: Some(DlqConfig { max_receive_count: 5, dlq_name: QueueType::JobHandleFailure }) }, QueueConfig { diff --git a/crates/orchestrator/src/routes/job_routes.rs b/crates/orchestrator/src/routes/job_routes.rs index aeeb25c2..636aa17e 100644 --- a/crates/orchestrator/src/routes/job_routes.rs +++ b/crates/orchestrator/src/routes/job_routes.rs @@ -4,12 +4,14 @@ use axum::extract::{Path, State}; use axum::response::IntoResponse; use axum::routing::get; use axum::Router; +use opentelemetry::KeyValue; use serde::{Deserialize, Serialize}; use uuid::Uuid; use super::ApiResponse; use crate::config::Config; use crate::jobs::{process_job, verify_job, JobError}; +use crate::metrics::ORCHESTRATOR_METRICS; #[derive(Deserialize)] struct JobId { @@ -40,7 +42,14 @@ async fn handle_process_job_request( let response = JobApiResponse { job_id: job_id.to_string(), status: "completed".to_string() }; ApiResponse::success(response).into_response() } - Err(e) => ApiResponse::::error(e.to_string()).into_response(), + Err(e) => { + let attributes = [ + KeyValue::new("operation_type", "process_job"), + KeyValue::new("operation_job_id", format!("{:?}", job_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ApiResponse::::error(e.to_string()).into_response() + } } } @@ -62,7 +71,14 @@ async fn handle_verify_job_request( let response = JobApiResponse { job_id: job_id.to_string(), status: "verified".to_string() }; ApiResponse::success(response).into_response() } - Err(e) => ApiResponse::::error(e.to_string()).into_response(), + Err(e) => { + let attributes = [ + KeyValue::new("operation_type", "verify_job"), + KeyValue::new("operation_job_id", format!("{:?}", job_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + ApiResponse::::error(e.to_string()).into_response() + } } } pub fn job_router(config: Arc) -> Router { diff --git a/crates/orchestrator/src/setup/mod.rs b/crates/orchestrator/src/setup/mod.rs index 6e1a3f0a..e14f00fc 100644 --- a/crates/orchestrator/src/setup/mod.rs +++ b/crates/orchestrator/src/setup/mod.rs @@ -1,5 +1,7 @@ use std::process::Command; +use std::time::Duration; +use async_std::task::sleep; use aws_config::SdkConfig; use crate::alerts::aws_sns::AWSSNS; @@ -25,24 +27,12 @@ pub enum SetupConfig { // Note: we are using println! instead of tracing::info! because telemetry is not yet initialized // and it get initialized during the run_orchestrator function. pub async fn setup_cloud(setup_cmd: &SetupCmd) -> color_eyre::Result<()> { - println!("Setting up cloud. ⏳"); // AWS + println!("Setting up cloud. ⏳"); let provider_params = setup_cmd.validate_provider_params().expect("Failed to validate provider params"); let provider_config = build_provider_config(&provider_params).await; - - // Data Storage - println!("Setting up data storage. ⏳"); - let data_storage_params = setup_cmd.validate_storage_params().expect("Failed to validate storage params"); let aws_config = provider_config.get_aws_client_or_panic(); - match data_storage_params { - StorageValidatedArgs::AWSS3(aws_s3_params) => { - let s3 = Box::new(AWSS3::new_with_args(&aws_s3_params, aws_config).await); - s3.setup(&StorageValidatedArgs::AWSS3(aws_s3_params.clone())).await? - } - } - println!("Data storage setup completed ✅"); - // Queues println!("Setting up queues. 
⏳"); let queue_params = setup_cmd.validate_queue_params().expect("Failed to validate queue params"); @@ -54,8 +44,25 @@ pub async fn setup_cloud(setup_cmd: &SetupCmd) -> color_eyre::Result<()> { } println!("Queues setup completed ✅"); + // Waiting for few seconds to let AWS index the queues + sleep(Duration::from_secs(20)).await; + + // Data Storage + println!("Setting up data storage. ⏳"); + let data_storage_params = setup_cmd.validate_storage_params().expect("Failed to validate storage params"); + + match data_storage_params { + StorageValidatedArgs::AWSS3(aws_s3_params) => { + let s3 = Box::new(AWSS3::new_with_args(&aws_s3_params, aws_config).await); + s3.setup(&StorageValidatedArgs::AWSS3(aws_s3_params.clone())).await? + } + } + println!("Data storage setup completed ✅"); + // Cron println!("Setting up cron. ⏳"); + // Sleeping for few seconds to let AWS index the newly created queues to be used for setting up cron + sleep(Duration::from_secs(100)).await; let cron_params = setup_cmd.validate_cron_params().expect("Failed to validate cron params"); match cron_params { CronValidatedArgs::AWSEventBridge(aws_event_bridge_params) => { diff --git a/crates/orchestrator/src/tests/common/mod.rs b/crates/orchestrator/src/tests/common/mod.rs index 7e5f7c9e..63065c69 100644 --- a/crates/orchestrator/src/tests/common/mod.rs +++ b/crates/orchestrator/src/tests/common/mod.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use ::uuid::Uuid; use aws_config::SdkConfig; +use aws_sdk_s3::Client as S3Client; use aws_sdk_sns::error::SdkError; use aws_sdk_sns::operation::create_topic::CreateTopicError; use chrono::{SubsecRound, Utc}; @@ -16,6 +17,7 @@ use strum::IntoEnumIterator as _; use crate::cli::alert::AlertValidatedArgs; use crate::cli::database::DatabaseValidatedArgs; use crate::cli::queue::QueueValidatedArgs; +use crate::cli::storage::StorageValidatedArgs; use crate::config::ProviderConfig; use crate::data_storage::aws_s3::{AWSS3ValidatedArgs, AWSS3}; use crate::data_storage::DataStorage; @@ -76,6 +78,59 @@ pub async fn drop_database(database_params: &DatabaseValidatedArgs) -> color_eyr Ok(()) } +pub async fn delete_storage( + provider_config: Arc, + data_storage_args: &StorageValidatedArgs, +) -> color_eyre::Result<()> { + match data_storage_args { + StorageValidatedArgs::AWSS3(s3_params) => { + let bucket_name = s3_params.bucket_name.clone(); + let aws_config = provider_config.get_aws_client_or_panic(); + + let mut s3_config_builder = aws_sdk_s3::config::Builder::from(aws_config); + // this is necessary for it to work with localstack in test cases + s3_config_builder.set_force_path_style(Some(true)); + let client = S3Client::from_conf(s3_config_builder.build()); + + // Check if bucket exists + match client.head_bucket().bucket(&bucket_name).send().await { + Ok(_) => { + println!("Bucket exists, proceeding with deletion"); + } + Err(e) => { + println!("Bucket '{}' does not exist or is not accessible: {}", &bucket_name, e); + return Ok(()); + } + } + + // First, delete all objects in the bucket (required for non-empty buckets) + let objects = client.list_objects_v2().bucket(&bucket_name).send().await?.contents().to_vec(); + + // If there are objects, delete them + if !objects.is_empty() { + let objects_to_delete: Vec<_> = objects + .iter() + .map(|obj| { + aws_sdk_s3::types::ObjectIdentifier::builder() + .key(obj.key().unwrap_or_default()) + .build() + .expect("Could not build object builder") + }) + .collect(); + + let delete = aws_sdk_s3::types::Delete::builder().set_objects(Some(objects_to_delete)).build()?; + 
+ client.delete_objects().bucket(&bucket_name).delete(delete).send().await?; + } + + // After deleting all objects, delete the bucket itself + client.delete_bucket().bucket(&bucket_name).send().await?; + + Ok(()) + } + } +} + // SQS structs & functions pub async fn create_queues( diff --git a/crates/orchestrator/src/tests/config.rs b/crates/orchestrator/src/tests/config.rs index d954ff9a..e46fdc23 100644 --- a/crates/orchestrator/src/tests/config.rs +++ b/crates/orchestrator/src/tests/config.rs @@ -17,6 +17,7 @@ use starknet::providers::JsonRpcClient; use url::Url; use utils::env_utils::{get_env_var_optional, get_env_var_or_panic}; +use super::common::delete_storage; use crate::alerts::aws_sns::AWSSNSValidatedArgs; use crate::alerts::Alerts; use crate::cli::alert::AlertValidatedArgs; @@ -230,6 +231,9 @@ impl TestConfigBuilder { let prover_client = implement_client::init_prover_client(prover_client_type, ¶ms.prover_params).await; + // Delete the Storage before use + delete_storage(provider_config.clone(), ¶ms.storage_params).await.expect("Could not delete storage"); + // External Dependencies let storage = implement_client::init_storage_client(storage_type, ¶ms.storage_params, provider_config.clone()).await; diff --git a/crates/orchestrator/src/tests/workers/update_state/mod.rs b/crates/orchestrator/src/tests/workers/update_state/mod.rs index d569e92d..c9cc4781 100644 --- a/crates/orchestrator/src/tests/workers/update_state/mod.rs +++ b/crates/orchestrator/src/tests/workers/update_state/mod.rs @@ -76,8 +76,8 @@ async fn update_state_worker_first_block_missing() { .build() .await; - // skiip first block from DA completion - let mut job_item = get_job_item_mock_by_id("1".to_string(), Uuid::new_v4()); + // skip first block from DA completion + let mut job_item = get_job_item_mock_by_id("2".to_string(), Uuid::new_v4()); job_item.status = JobStatus::Completed; job_item.job_type = JobType::DataSubmission; services.config.database().create_job(job_item).await.unwrap(); diff --git a/crates/orchestrator/src/workers/data_submission_worker.rs b/crates/orchestrator/src/workers/data_submission_worker.rs index 08c6068c..20e1955a 100644 --- a/crates/orchestrator/src/workers/data_submission_worker.rs +++ b/crates/orchestrator/src/workers/data_submission_worker.rs @@ -2,10 +2,12 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct DataSubmissionWorker; @@ -24,7 +26,18 @@ impl Worker for DataSubmissionWorker { .await?; for job in successful_proving_jobs { - create_job(JobType::DataSubmission, job.internal_id, HashMap::new(), config.clone()).await?; + match create_job(JobType::DataSubmission, job.internal_id.clone(), HashMap::new(), config.clone()).await { + Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new data submission job"), + Err(e) => { + tracing::warn!(block_id = %job.internal_id, error = %e, "Failed to create new data submission job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::DataSubmission)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", job.internal_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + } + } } tracing::trace!(log_type = "completed", category = "DataSubmissionWorker", 
"DataSubmissionWorker completed."); diff --git a/crates/orchestrator/src/workers/proving.rs b/crates/orchestrator/src/workers/proving.rs index 523f310d..145235ea 100644 --- a/crates/orchestrator/src/workers/proving.rs +++ b/crates/orchestrator/src/workers/proving.rs @@ -1,10 +1,12 @@ use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct ProvingWorker; @@ -25,7 +27,18 @@ impl Worker for ProvingWorker { for job in successful_snos_jobs { tracing::debug!(job_id = %job.internal_id, "Creating proof creation job for SNOS job"); - create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await?; + match create_job(JobType::ProofCreation, job.internal_id.to_string(), job.metadata, config.clone()).await { + Ok(_) => tracing::info!(block_id = %job.internal_id, "Successfully created new proving job"), + Err(e) => { + tracing::warn!(job_id = %job.internal_id, error = %e, "Failed to create new state transition job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::ProofCreation)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", job.internal_id)), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); + } + } } tracing::trace!(log_type = "completed", category = "ProvingWorker", "ProvingWorker completed."); diff --git a/crates/orchestrator/src/workers/snos.rs b/crates/orchestrator/src/workers/snos.rs index a346dd37..fc322f16 100644 --- a/crates/orchestrator/src/workers/snos.rs +++ b/crates/orchestrator/src/workers/snos.rs @@ -3,11 +3,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use starknet::providers::Provider; use crate::config::Config; use crate::jobs::create_job; use crate::jobs::types::JobType; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct SnosWorker; @@ -47,9 +49,15 @@ impl Worker for SnosWorker { for block_num in block_start..latest_block_number + 1 { match create_job(JobType::SnosRun, block_num.to_string(), HashMap::new(), config.clone()).await { - Ok(_) => {} + Ok(_) => tracing::info!(block_id = %block_num, "Successfully created new Snos job"), Err(e) => { - log::warn!("Failed to create job: {:?}", e); + tracing::warn!(block_id = %block_num, error = %e, "Failed to create new Snos job"); + let attributes = [ + KeyValue::new("operation_job_type", format!("{:?}", JobType::SnosRun)), + KeyValue::new("operation_type", format!("{:?}", "create_job")), + KeyValue::new("operation_internal_id", format!("{:?}", block_num.to_string())), + ]; + ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes); } } } diff --git a/crates/orchestrator/src/workers/update_state.rs b/crates/orchestrator/src/workers/update_state.rs index 961d2522..b2708c2d 100644 --- a/crates/orchestrator/src/workers/update_state.rs +++ b/crates/orchestrator/src/workers/update_state.rs @@ -2,11 +2,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use opentelemetry::KeyValue; use crate::config::Config; use crate::jobs::constants::JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY; use crate::jobs::create_job; use crate::jobs::types::{JobStatus, JobType}; +use crate::metrics::ORCHESTRATOR_METRICS; use crate::workers::Worker; pub struct 
@@ -101,7 +103,11 @@ impl Worker for UpdateStateWorker {
             }
         }
 
-        let blocks_to_process: Vec<u64> = find_successive_blocks_in_vector(blocks_to_process);
+        let mut blocks_to_process: Vec<u64> = find_successive_blocks_in_vector(blocks_to_process);
+
+        if blocks_to_process.len() > 10 {
+            blocks_to_process = blocks_to_process.into_iter().take(10).collect();
+        }
 
         let mut metadata = HashMap::new();
         metadata.insert(
@@ -112,9 +118,15 @@ impl Worker for UpdateStateWorker {
 
         // Creating a single job for all the pending blocks.
         let new_job_id = blocks_to_process[0].to_string();
         match create_job(JobType::StateTransition, new_job_id.clone(), metadata, config.clone()).await {
-            Ok(_) => tracing::info!(job_id = %new_job_id, "Successfully created new state transition job"),
+            Ok(_) => tracing::info!(block_id = %new_job_id, "Successfully created new state transition job"),
             Err(e) => {
                 tracing::error!(job_id = %new_job_id, error = %e, "Failed to create new state transition job");
+                let attributes = [
+                    KeyValue::new("operation_job_type", format!("{:?}", JobType::StateTransition)),
+                    KeyValue::new("operation_type", "create_job".to_string()),
+                    KeyValue::new("operation_internal_id", new_job_id.clone()),
+                ];
+                ORCHESTRATOR_METRICS.failed_jobs.add(1.0, &attributes);
                 return Err(e.into());
             }
         }
diff --git a/crates/prover-clients/atlantic-service/src/client.rs b/crates/prover-clients/atlantic-service/src/client.rs
index cddbc635..5b00436d 100644
--- a/crates/prover-clients/atlantic-service/src/client.rs
+++ b/crates/prover-clients/atlantic-service/src/client.rs
@@ -24,14 +24,14 @@ trait ProvingLayer: Send + Sync {
 struct EthereumLayer;
 impl ProvingLayer for EthereumLayer {
     fn customize_request<'a>(&self, request: RequestBuilder<'a>) -> RequestBuilder<'a> {
-        request.path("/l1/atlantic-query/proof-generation-verification")
+        request.path("v1").path("l1/atlantic-query/proof-generation-verification")
     }
 }
 
 struct StarknetLayer;
 impl ProvingLayer for StarknetLayer {
     fn customize_request<'a>(&self, request: RequestBuilder<'a>) -> RequestBuilder<'a> {
-        request.path("/l2/submit-sharp-query/from-proof-generation-to-proof-verification")
+        request.path("v1").path("l2/submit-sharp-query/from-proof-generation-to-proof-verification")
     }
 }
 
@@ -100,6 +100,7 @@ impl AtlanticClient {
             .client
             .request()
             .method(Method::GET)
+            .path("v1")
             .path("atlantic-query")
             .path(job_key)
             .send()
diff --git a/crates/prover-clients/atlantic-service/src/lib.rs b/crates/prover-clients/atlantic-service/src/lib.rs
index bd1a6e07..d10e11ba 100644
--- a/crates/prover-clients/atlantic-service/src/lib.rs
+++ b/crates/prover-clients/atlantic-service/src/lib.rs
@@ -1,6 +1,9 @@
 pub mod client;
 pub mod error;
 mod types;
+use std::str::FromStr;
+
+use alloy::primitives::B256;
 use async_trait::async_trait;
 use cairo_vm::types::layout_name::LayoutName;
 use gps_fact_checker::FactChecker;
@@ -9,6 +12,7 @@ use tempfile::NamedTempFile;
 use url::Url;
 
 use crate::client::AtlanticClient;
+use crate::types::AtlanticQueryStatus;
 
 pub const ATLANTIC_SETTINGS_NAME: &str = "atlantic";
 
@@ -54,36 +58,31 @@ impl ProverClient for AtlanticProverService {
                 let atlantic_job_response =
                     self.atlantic_client.add_job(pie_file_path, proof_layout, self.atlantic_api_key.clone()).await?;
 
-                // sleep for 2 seconds to make sure the job is submitted
-                tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
+                // sleep for 10 seconds to make sure the job is submitted
+                tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;
 
                 log::debug!("Successfully submitted task to atlantic: {:?}", atlantic_job_response);
                 // The temporary file will be automatically deleted when `temp_file` goes out of scope
-                Ok(atlantic_job_response.sharp_query_id)
+                Ok(atlantic_job_response.atlantic_query_id)
             }
         }
     }
 
     #[tracing::instrument(skip(self))]
     async fn get_task_status(&self, job_key: &str, fact: &str) -> Result<TaskStatus, ProverClientError> {
-        // let res = self.atlantic_client.get_job_status(job_key).await?;
-        //
-        // match res.sharp_query.status {
-        //     SharpQueryStatus::InProgress => Ok(TaskStatus::Processing),
-        //     SharpQueryStatus::Done => {
-        //         let fact = B256::from_str(fact).map_err(|e|
-        //             ProverClientError::FailedToConvertFact(e.to_string()))?;
-        //         if self.fact_checker.is_valid(&fact).await? {
-        //             Ok(TaskStatus::Succeeded)
-        //         } else {
-        //             Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact))))
-        //         }
-        //     }
-        //     SharpQueryStatus::Failed => {
-        //         Ok(TaskStatus::Failed("Task failed while processing on Atlantic side".to_string()))
-        //     }
-        // }
-
-        // TODO: Commented the above code since, atlantic infra is not able
-        // to prove snos blocks currently so to run e2e tests returning Succeeded.
-        Ok(TaskStatus::Succeeded)
+        let res = self.atlantic_client.get_job_status(job_key).await?;
+        match res.atlantic_query.status {
+            AtlanticQueryStatus::InProgress => Ok(TaskStatus::Processing),
+            AtlanticQueryStatus::Done => {
+                let fact = B256::from_str(fact).map_err(|e| ProverClientError::FailedToConvertFact(e.to_string()))?;
+                if self.fact_checker.is_valid(&fact).await? {
+                    Ok(TaskStatus::Succeeded)
+                } else {
+                    Ok(TaskStatus::Failed(format!("Fact {} is not valid or not registered", hex::encode(fact))))
+                }
+            }
+            AtlanticQueryStatus::Failed => {
+                Ok(TaskStatus::Failed("Task failed while processing on Atlantic side".to_string()))
+            }
+        }
     }
 }
diff --git a/crates/prover-clients/atlantic-service/src/types.rs b/crates/prover-clients/atlantic-service/src/types.rs
index a13e7169..3992bd9e 100644
--- a/crates/prover-clients/atlantic-service/src/types.rs
+++ b/crates/prover-clients/atlantic-service/src/types.rs
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct AtlanticAddJobResponse {
-    pub sharp_query_id: String,
+    pub atlantic_query_id: String,
 }
 
 #[derive(Default, Debug, Clone, Serialize, Deserialize)]
@@ -21,8 +21,8 @@ pub struct AtlanticGetStatusResponse {
 pub struct AtlanticQuery {
     pub id: String,
     pub submitted_by_client: String,
-    pub status: SharpQueryStatus,
-    pub step: SharpQueryStep,
+    pub status: AtlanticQueryStatus,
+    pub step: Option<AtlanticQueryStep>,
     pub program_hash: Option<String>,
     pub layout: Option<String>,
     pub program_fact_hash: Option<String>,
@@ -30,12 +30,12 @@ pub struct AtlanticQuery {
     pub prover: String,
     pub chain: String,
     pub price: String,
-    pub steps: Vec<SharpQueryStep>,
+    pub steps: Vec<AtlanticQueryStep>,
 }
 
 #[derive(Debug, Clone, Deserialize)]
 #[serde(rename_all = "SCREAMING_SNAKE_CASE")]
-pub enum SharpQueryStatus {
+pub enum AtlanticQueryStatus {
     InProgress,
     Done,
     Failed,
@@ -43,7 +43,7 @@ pub enum SharpQueryStatus {
 
 #[derive(Debug, Clone, Deserialize)]
 #[serde(rename_all = "SCREAMING_SNAKE_CASE")]
-pub enum SharpQueryStep {
+pub enum AtlanticQueryStep {
     ProofGeneration,
     FactHashGeneration,
     FactHashRegistration,
diff --git a/crates/prover-clients/atlantic-service/tests/lib.rs b/crates/prover-clients/atlantic-service/tests/lib.rs
index 3270d2af..f103aa56 100644
--- a/crates/prover-clients/atlantic-service/tests/lib.rs
+++ b/crates/prover-clients/atlantic-service/tests/lib.rs
@@ -11,9 +11,8 @@ use crate::constants::CAIRO_PIE_PATH;
 mod constants;
 
 #[tokio::test]
-async fn atlantic_client_submit_task_calls_correct_endpoint() {
+async fn atlantic_client_submit_task_when_mock_works() {
     let _ = env_logger::try_init();
-    color_eyre::install().expect("Unable to install color_eyre");
     dotenvy::from_filename("../.env.test").expect("Failed to load the .env file");
     let atlantic_params = AtlanticValidatedArgs {
         atlantic_api_key: get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_API_KEY"),
@@ -31,9 +30,9 @@ async fn atlantic_client_submit_task_calls_correct_endpoint() {
 
     // Create a mock for the submit endpoint
     let submit_mock = mock_server.mock(|when, then| {
-        when.method("POST").path("/l1/atlantic-query/proof-generation-verification");
+        when.method("POST").path("/v1/l1/atlantic-query/proof-generation-verification");
         then.status(200).header("content-type", "application/json").json_body(serde_json::json!({
-            "sharpQueryId": "mock_query_id_123"
+            "atlanticQueryId": "mock_query_id_123"
         }));
     });
 
@@ -48,3 +47,25 @@ async fn atlantic_client_submit_task_calls_correct_endpoint() {
     assert!(task_result.is_ok());
     submit_mock.assert();
 }
+
+#[tokio::test]
+async fn atlantic_client_get_task_status_works() {
+    let _ = env_logger::try_init();
+    dotenvy::from_filename("../.env.test").expect("Failed to load the .env file");
+    let atlantic_params = AtlanticValidatedArgs {
+        atlantic_api_key: get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_API_KEY"),
+        atlantic_service_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_SERVICE_URL")).unwrap(),
+        atlantic_rpc_node_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_RPC_NODE_URL")).unwrap(),
+        atlantic_mock_fact_hash: get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_MOCK_FACT_HASH"),
+        atlantic_prover_type: get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_PROVER_TYPE"),
+        atlantic_settlement_layer: get_env_var_or_panic("MADARA_ORCHESTRATOR_ATLANTIC_SETTLEMENT_LAYER"),
+        atlantic_verifier_contract_address: get_env_var_or_panic(
+            "MADARA_ORCHESTRATOR_ATLANTIC_VERIFIER_CONTRACT_ADDRESS",
+        ),
+    };
+    let atlantic_service = AtlanticProverService::new_with_args(&atlantic_params);
+
+    let atlantic_query_id = "01JDY6EKVQD8QYR8HE64WZC9VB";
+    let task_result = atlantic_service.atlantic_client.get_job_status(atlantic_query_id).await;
+    assert!(task_result.is_ok());
+}
diff --git a/crates/utils/src/metrics/lib.rs b/crates/utils/src/metrics/lib.rs
index d31b6d15..07f69c2d 100644
--- a/crates/utils/src/metrics/lib.rs
+++ b/crates/utils/src/metrics/lib.rs
@@ -1,4 +1,4 @@
-use opentelemetry::metrics::{Gauge, Meter};
+use opentelemetry::metrics::{Counter, Gauge, Meter};
 
 pub trait Metrics {
     fn register() -> Self;
@@ -19,3 +19,12 @@ pub fn register_gauge_metric_instrument(
 ) -> Gauge<f64> {
     crate_meter.f64_gauge(instrument_name).with_description(desc).with_unit(unit).init()
 }
+
+pub fn register_counter_metric_instrument(
+    crate_meter: &Meter,
+    instrument_name: String,
+    desc: String,
+    unit: String,
+) -> Counter<f64> {
+    crate_meter.f64_counter(instrument_name).with_description(desc).with_unit(unit).init()
+}
diff --git a/e2e-tests/tests.rs b/e2e-tests/tests.rs
index 89a63ee4..9430109d 100644
--- a/e2e-tests/tests.rs
+++ b/e2e-tests/tests.rs
@@ -4,6 +4,8 @@ use std::io::Read;
 use std::time::{Duration, Instant};
 
 use aws_config::meta::region::RegionProviderChain;
+use aws_sdk_eventbridge::types::{InputTransformer, RuleState, Target};
+use aws_sdk_sqs::types::QueueAttributeName;
 use chrono::{SubsecRound, Utc};
 use e2e_tests::anvil::AnvilSetup;
 use e2e_tests::mock_server::MockResponseBodyType;
@@ -13,6 +15,7 @@ use e2e_tests::utils::{get_mongo_db_client, read_state_update_from_file, vec_u8_
 use e2e_tests::{MongoDbServer, Orchestrator};
 use mongodb::bson::doc;
 use orchestrator::cli::database::DatabaseValidatedArgs;
+use orchestrator::cron::{get_worker_trigger_message, WORKER_TRIGGERS};
 use orchestrator::data_storage::DataStorage;
 use orchestrator::database::mongodb::MongoDBValidatedArgs;
 use orchestrator::jobs::constants::{JOB_METADATA_SNOS_BLOCK, JOB_METADATA_STATE_UPDATE_BLOCKS_TO_SETTLE_KEY};
@@ -49,7 +52,6 @@ struct Setup {
 }
 
 impl Setup {
-    /// Initialise a new setup
     pub async fn new(l2_block_number: String) -> Self {
         let db_params = DatabaseValidatedArgs::MongoDB(MongoDBValidatedArgs {
             connection_url: Url::parse(&get_env_var_or_panic("MADARA_ORCHESTRATOR_MONGODB_CONNECTION_URL"))
@@ -168,6 +170,15 @@ async fn test_orchestrator_workflow(#[case] l2_block_number: String) {
 
     println!("✅ Orchestrator setup completed.");
 
+    let trigger_rule_name = &get_env_var_or_panic("MADARA_ORCHESTRATOR_EVENT_BRIDGE_TRIGGER_RULE_NAME");
+    let target_queue_name = &get_env_var_or_panic("MADARA_ORCHESTRATOR_EVENT_BRIDGE_TARGET_QUEUE_NAME");
+
+    // Set up the EventBridge rules
+    create_event_bridge_rule(trigger_rule_name, target_queue_name)
+        .await
+        .expect("Unable to create event bridge rule");
+
     // Run orchestrator
     let mut orchestrator =
         Orchestrator::new(OrchestratorMode::Run, setup_config.envs()).expect("Failed to start orchestrator");
@@ -226,6 +237,58 @@ async fn test_orchestrator_workflow(#[case] l2_block_number: String) {
     assert!(test_result.is_ok(), "After Update State Job state DB state assertion failed.");
 }
 
+/// Adds the worker-trigger EventBridge rules needed by the localstack-based tests.
+/// This can be removed after https://github.com/localstack/localstack/issues/9861 is closed.
+async fn create_event_bridge_rule(trigger_rule_name: &String, target_queue_name: &String) -> color_eyre::Result<()> {
+    let aws_config = &aws_config::from_env().load().await;
+
+    let queue_client = aws_sdk_sqs::Client::new(aws_config);
+
+    let event_bridge_client = aws_sdk_eventbridge::Client::new(aws_config);
+
+    let queue_url = queue_client.get_queue_url().queue_name(target_queue_name).send().await?;
+
+    let queue_attributes = queue_client
+        .get_queue_attributes()
+        .queue_url(queue_url.queue_url.unwrap())
+        .attribute_names(QueueAttributeName::QueueArn)
+        .send()
+        .await?;
+    let queue_arn = queue_attributes.attributes().unwrap().get(&QueueAttributeName::QueueArn).unwrap();
+
+    // Create one EventBridge rule per worker trigger, each targeting the queue
+    // with an input transformer that injects the trigger message.
+    for trigger in WORKER_TRIGGERS.iter() {
+        let message = get_worker_trigger_message(trigger.clone())?;
+        let input_transformer =
+            InputTransformer::builder().input_paths_map("time", "$.time").input_template(message).build()?;
+
+        let trigger_name = format!("{}-{}", trigger_rule_name, trigger);
+        event_bridge_client
+            .put_rule()
+            .name(trigger_name.clone())
+            .schedule_expression("rate(1 minute)")
+            .state(RuleState::Enabled)
+            .send()
+            .await?;
+
+        event_bridge_client
+            .put_targets()
+            .rule(trigger_name.clone())
+            .targets(
+                Target::builder()
+                    .id(uuid::Uuid::new_v4().to_string())
+                    .arn(queue_arn)
+                    .input_transformer(input_transformer.clone())
+                    .build()?,
+            )
+            .send()
+            .await?;
+    }
+
+    Ok(())
+}
+
 /// Function to check db for expected state continuously
 async fn wait_for_db_state(
     timeout: Duration,