From 7f8ec10c62940ea1856fdc05fb79b0813a481390 Mon Sep 17 00:00:00 2001 From: Landon James Date: Tue, 18 Feb 2025 21:30:32 -0800 Subject: [PATCH 1/9] First pass at interceptor for collecting metrics --- aws/rust-runtime/Cargo.lock | 11 +- rust-runtime/Cargo.lock | 3 +- .../src/attributes.rs | 2 +- rust-runtime/aws-smithy-runtime/Cargo.toml | 3 +- rust-runtime/aws-smithy-runtime/src/client.rs | 3 + .../aws-smithy-runtime/src/client/defaults.rs | 41 +++- .../aws-smithy-runtime/src/client/metrics.rs | 221 ++++++++++++++++++ .../src/client/orchestrator/operation.rs | 7 +- 8 files changed, 279 insertions(+), 12 deletions(-) create mode 100644 rust-runtime/aws-smithy-runtime/src/client/metrics.rs diff --git a/aws/rust-runtime/Cargo.lock b/aws/rust-runtime/Cargo.lock index d19b4d58e7..8c51cc15a3 100644 --- a/aws/rust-runtime/Cargo.lock +++ b/aws/rust-runtime/Cargo.lock @@ -317,6 +317,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.0" +dependencies = [ + "aws-smithy-runtime-api", + "once_cell", +] + [[package]] name = "aws-smithy-protocol-test" version = "0.63.0" @@ -336,10 +344,11 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" dependencies = [ "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 3957416471..ad52cebdb5 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -629,11 +629,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" dependencies = [ "approx", "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-protocol-test", "aws-smithy-runtime-api", "aws-smithy-types", diff --git a/rust-runtime/aws-smithy-observability/src/attributes.rs b/rust-runtime/aws-smithy-observability/src/attributes.rs index 8b4f5ac661..1e68b3a2c0 100644 --- a/rust-runtime/aws-smithy-observability/src/attributes.rs +++ b/rust-runtime/aws-smithy-observability/src/attributes.rs @@ -25,7 +25,7 @@ pub enum AttributeValue { /// Structured telemetry metadata. #[non_exhaustive] -#[derive(Clone, Default)] +#[derive(Clone, Debug, Default)] pub struct Attributes { attrs: HashMap, } diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index 5ec7722d7e..60f75eaa1c 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" authors = ["AWS Rust SDK Team ", "Zelda Hessler "] description = "The new smithy runtime crate" edition = "2021" @@ -23,6 +23,7 @@ wire-mock = ["test-util", "connector-hyper-0-14-x", "hyper-0-14?/server"] [dependencies] aws-smithy-async = { path = "../aws-smithy-async" } aws-smithy-http = { path = "../aws-smithy-http" } +aws-smithy-observability = { path = "../aws-smithy-observability" } aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" } aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x"] } diff --git a/rust-runtime/aws-smithy-runtime/src/client.rs b/rust-runtime/aws-smithy-runtime/src/client.rs index 33ccc1f09f..f50c8a1df6 100644 --- a/rust-runtime/aws-smithy-runtime/src/client.rs +++ b/rust-runtime/aws-smithy-runtime/src/client.rs @@ -53,3 +53,6 @@ pub mod sdk_feature; /// Smithy support-code for code generated waiters. pub mod waiters; + +/// Interceptor for collecting client metrics. +pub mod metrics; diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index f6699907d7..0dbccf8961 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider; use crate::client::retries::strategy::StandardRetryStrategy; use crate::client::retries::RetryPartition; use aws_smithy_async::rt::sleep::default_async_sleep; -use aws_smithy_async::time::SystemTimeSource; +use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource}; use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion; use aws_smithy_runtime_api::client::http::SharedHttpClient; @@ -33,6 +33,8 @@ use aws_smithy_types::timeout::TimeoutConfig; use std::borrow::Cow; use std::time::Duration; +use super::metrics::MetricsRuntimePlugin; + fn default_plugin(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin where CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder, @@ -211,6 +213,14 @@ fn enforce_content_length_runtime_plugin() -> Option { Some(EnforceContentLengthRuntimePlugin::new().into_shared()) } +fn metrics_runtime_plugin( + service: impl Into>, + operation: impl Into>, + time_source: SharedTimeSource, +) -> Option { + Some(MetricsRuntimePlugin::new(service, operation, time_source).into_shared()) +} + fn validate_stalled_stream_protection_config( components: &RuntimeComponentsBuilder, cfg: &ConfigBag, @@ -245,8 +255,11 @@ fn validate_stalled_stream_protection_config( #[non_exhaustive] #[derive(Debug, Default)] pub struct DefaultPluginParams { + // The retry_partition_name is also the service_name retry_partition_name: Option>, behavior_version: Option, + operation_name: Option>, + time_source: Option, } impl DefaultPluginParams { @@ -266,6 +279,18 @@ impl DefaultPluginParams { self.behavior_version = Some(version); self } + + /// Sets the operation name. + pub fn with_operation_name(mut self, operation_name: impl Into>) -> Self { + self.operation_name = Some(operation_name.into()); + self + } + + /// Sets the time source. + pub fn with_time_source(mut self, time_source: SharedTimeSource) -> Self { + self.time_source = Some(time_source); + self + } } /// All default plugins. @@ -275,20 +300,24 @@ pub fn default_plugins( let behavior_version = params .behavior_version .unwrap_or_else(BehaviorVersion::latest); + let service_name = params + .retry_partition_name + .expect("retry_partition_name is required"); [ default_http_client_plugin(), default_identity_cache_plugin(), - default_retry_config_plugin( - params - .retry_partition_name - .expect("retry_partition_name is required"), - ), + default_retry_config_plugin(service_name.clone()), default_sleep_impl_plugin(), default_time_source_plugin(), default_timeout_config_plugin(), enforce_content_length_runtime_plugin(), default_stalled_stream_protection_config_plugin_v2(behavior_version), + metrics_runtime_plugin( + service_name, + params.operation_name.unwrap_or("unknown_operation".into()), + params.time_source.unwrap_or_default(), + ), ] .into_iter() .flatten() diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs new file mode 100644 index 0000000000..3819479977 --- /dev/null +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -0,0 +1,221 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_async::time::SharedTimeSource; +use aws_smithy_observability::{ + global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes, + ObservabilityError, +}; +use aws_smithy_runtime_api::client::{ + interceptors::Intercept, runtime_components::RuntimeComponentsBuilder, + runtime_plugin::RuntimePlugin, +}; +use aws_smithy_types::config_bag::{Layer, Storable, StoreReplace}; +use std::{borrow::Cow, sync::Arc, time::SystemTime}; + +/// Struct to hold metric data in the ConfigBag +#[derive(Debug, Clone)] +struct MeasurementsContainer { + call_start: SystemTime, + attempts: u32, + attempt_start: SystemTime, +} + +impl Storable for MeasurementsContainer { + type Storer = StoreReplace; +} + +/// Instruments for recording a single operation +#[derive(Debug, Clone)] +pub(crate) struct MetricsInterceptorInstruments { + pub(crate) operation_duration: Arc, + pub(crate) attempt_duration: Arc, +} + +impl MetricsInterceptorInstruments { + pub(crate) fn new() -> Result { + let meter = get_telemetry_provider()? + .meter_provider() + .get_meter("MetricsInterceptor", None); + + Ok(Self{ + operation_duration: meter + .create_histogram("smithy.client.call.duration") + .set_units("s") + .set_description("Overall call duration (including retries and time to send or receive request and response body)") + .build(), + attempt_duration: meter + .create_histogram("smithy.client.call.attempt.duration") + .set_units("s") + .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)") + .build(), + }) + } +} + +#[derive(Debug)] +pub(crate) struct MetricsInterceptor { + instruments: MetricsInterceptorInstruments, + service_name: Cow<'static, str>, + operation_name: Cow<'static, str>, + // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in + // the read_before_execution hook and that is when we need to start the timer for + // the operation. + time_source: SharedTimeSource, +} + +impl MetricsInterceptor { + pub(crate) fn new( + service_name: impl Into>, + operation_name: impl Into>, + time_source: SharedTimeSource, + ) -> Result { + Ok(MetricsInterceptor { + instruments: MetricsInterceptorInstruments::new()?, + service_name: service_name.into(), + operation_name: operation_name.into(), + time_source, + }) + } + + /// Get [Attributes] containing the service and method name for this operation + fn attributes(&self) -> Attributes { + let mut attributes = Attributes::new(); + attributes.set( + "rpc.service", + AttributeValue::String(self.service_name.clone().into()), + ); + attributes.set( + "rpc.method", + AttributeValue::String(self.operation_name.clone().into()), + ); + + attributes + } +} + +impl Intercept for MetricsInterceptor { + fn name(&self) -> &'static str { + "MetricsInterceptor" + } + + fn read_before_execution( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let mut layer = Layer::new("MetricsInterceptor"); + layer.store_put(MeasurementsContainer { + call_start: self.time_source.now(), + attempts: 0, + attempt_start: SystemTime::UNIX_EPOCH, + }); + cfg.push_layer(layer); + Ok(()) + } + + fn read_after_execution( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let measurements = cfg + .load::() + .expect("set in `read_before_execution`"); + + let call_duration = measurements.call_start.elapsed(); + if let Ok(elapsed) = call_duration { + self.instruments.operation_duration.record( + elapsed.as_secs_f64(), + Some(&self.attributes()), + None, + ); + } + + Ok(()) + } + + fn read_before_attempt( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let measurements = cfg + .get_mut::() + .expect("set in `read_before_execution`"); + + measurements.attempts = measurements.attempts + 1; + measurements.attempt_start = self.time_source.now(); + + Ok(()) + } + + fn read_after_attempt( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let measurements = cfg + .load::() + .expect("set in `read_before_execution`"); + + let attempt_duration = measurements.attempt_start.elapsed(); + let mut attributes = self.attributes(); + attributes.set("attempt", AttributeValue::I64(measurements.attempts.into())); + + if let Ok(elapsed) = attempt_duration { + self.instruments.attempt_duration.record( + elapsed.as_secs_f64(), + Some(&attributes), + None, + ); + } + Ok(()) + } +} + +/// Runtime plugin that adds a [MetricsInterceptor] +#[derive(Debug, Default)] +pub struct MetricsRuntimePlugin { + service: Cow<'static, str>, + operation: Cow<'static, str>, + time_source: SharedTimeSource, +} + +impl MetricsRuntimePlugin { + /// Creates a runtime plugin which installs a [MetricsInterceptor] + pub fn new( + service: impl Into>, + operation: impl Into>, + time_source: SharedTimeSource, + ) -> Self { + Self { + service: service.into(), + operation: operation.into(), + time_source, + } + } +} + +impl RuntimePlugin for MetricsRuntimePlugin { + fn runtime_components( + &self, + _current_components: &RuntimeComponentsBuilder, + ) -> Cow<'_, RuntimeComponentsBuilder> { + let interceptor = MetricsInterceptor::new( + self.service.clone(), + self.operation.clone(), + self.time_source.clone(), + ); + if let Ok(interceptor) = interceptor { + Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor)) + } else { + Cow::Owned(RuntimeComponentsBuilder::new("Metrics")) + } + } +} diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index 756f0b7423..a6a5b7bfd6 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -377,10 +377,13 @@ impl OperationBuilder { pub fn build(self) -> Operation { let service_name = self.service_name.expect("service_name required"); let operation_name = self.operation_name.expect("operation_name required"); - + let time_source = self.runtime_components.time_source().unwrap_or_default(); let mut runtime_plugins = RuntimePlugins::new() .with_client_plugins(default_plugins( - DefaultPluginParams::new().with_retry_partition_name(service_name.clone()), + DefaultPluginParams::new() + .with_retry_partition_name(service_name.clone()) + .with_operation_name(operation_name.clone()) + .with_time_source(time_source), )) .with_client_plugin( StaticRuntimePlugin::new() From b1d4bb708d424cf0b5573edef2316dfd4061b266 Mon Sep 17 00:00:00 2001 From: Landon James Date: Wed, 19 Feb 2025 11:10:26 -0800 Subject: [PATCH 2/9] Updates for CI failures Remove use of SystemTime.elapsed(), fix private type in pub docs, and style updates --- .../aws-smithy-runtime/src/client/metrics.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs index 3819479977..2f2266c68f 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -126,7 +126,8 @@ impl Intercept for MetricsInterceptor { .load::() .expect("set in `read_before_execution`"); - let call_duration = measurements.call_start.elapsed(); + let call_end = self.time_source.now(); + let call_duration = call_end.duration_since(measurements.call_start); if let Ok(elapsed) = call_duration { self.instruments.operation_duration.record( elapsed.as_secs_f64(), @@ -148,7 +149,7 @@ impl Intercept for MetricsInterceptor { .get_mut::() .expect("set in `read_before_execution`"); - measurements.attempts = measurements.attempts + 1; + measurements.attempts += 1; measurements.attempt_start = self.time_source.now(); Ok(()) @@ -164,11 +165,13 @@ impl Intercept for MetricsInterceptor { .load::() .expect("set in `read_before_execution`"); - let attempt_duration = measurements.attempt_start.elapsed(); - let mut attributes = self.attributes(); - attributes.set("attempt", AttributeValue::I64(measurements.attempts.into())); + let attempt_end = self.time_source.now(); + let attempt_duration = attempt_end.duration_since(measurements.attempt_start); if let Ok(elapsed) = attempt_duration { + let mut attributes = self.attributes(); + attributes.set("attempt", AttributeValue::I64(measurements.attempts.into())); + self.instruments.attempt_duration.record( elapsed.as_secs_f64(), Some(&attributes), @@ -179,7 +182,7 @@ impl Intercept for MetricsInterceptor { } } -/// Runtime plugin that adds a [MetricsInterceptor] +/// Runtime plugin that adds an interceptor for collecting metrics #[derive(Debug, Default)] pub struct MetricsRuntimePlugin { service: Cow<'static, str>, @@ -188,7 +191,7 @@ pub struct MetricsRuntimePlugin { } impl MetricsRuntimePlugin { - /// Creates a runtime plugin which installs a [MetricsInterceptor] + /// Creates a runtime plugin which installs an interceptor for collecting metrics pub fn new( service: impl Into>, operation: impl Into>, From 17845fd2a2173975ff944c798afd5509ee950b5e Mon Sep 17 00:00:00 2001 From: Landon James Date: Thu, 27 Feb 2025 10:57:43 -0800 Subject: [PATCH 3/9] Mostly working Metrics interceptor Fails for credentials clients with manually constructed operations --- .../aws-smithy-runtime/src/client/defaults.rs | 21 +--- .../aws-smithy-runtime/src/client/metrics.rs | 102 ++++++++---------- .../src/client/orchestrator/operation.rs | 1 - 3 files changed, 49 insertions(+), 75 deletions(-) diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index 0dbccf8961..10181d533a 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -213,12 +213,8 @@ fn enforce_content_length_runtime_plugin() -> Option { Some(EnforceContentLengthRuntimePlugin::new().into_shared()) } -fn metrics_runtime_plugin( - service: impl Into>, - operation: impl Into>, - time_source: SharedTimeSource, -) -> Option { - Some(MetricsRuntimePlugin::new(service, operation, time_source).into_shared()) +fn metrics_runtime_plugin(time_source: SharedTimeSource) -> Option { + Some(MetricsRuntimePlugin::new(time_source).into_shared()) } fn validate_stalled_stream_protection_config( @@ -258,7 +254,6 @@ pub struct DefaultPluginParams { // The retry_partition_name is also the service_name retry_partition_name: Option>, behavior_version: Option, - operation_name: Option>, time_source: Option, } @@ -280,12 +275,6 @@ impl DefaultPluginParams { self } - /// Sets the operation name. - pub fn with_operation_name(mut self, operation_name: impl Into>) -> Self { - self.operation_name = Some(operation_name.into()); - self - } - /// Sets the time source. pub fn with_time_source(mut self, time_source: SharedTimeSource) -> Self { self.time_source = Some(time_source); @@ -313,11 +302,7 @@ pub fn default_plugins( default_timeout_config_plugin(), enforce_content_length_runtime_plugin(), default_stalled_stream_protection_config_plugin_v2(behavior_version), - metrics_runtime_plugin( - service_name, - params.operation_name.unwrap_or("unknown_operation".into()), - params.time_source.unwrap_or_default(), - ), + metrics_runtime_plugin(params.time_source.unwrap_or_default()), ] .into_iter() .flatten() diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs index 2f2266c68f..4b65b06f71 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -9,7 +9,7 @@ use aws_smithy_observability::{ ObservabilityError, }; use aws_smithy_runtime_api::client::{ - interceptors::Intercept, runtime_components::RuntimeComponentsBuilder, + interceptors::Intercept, orchestrator::Metadata, runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin, }; use aws_smithy_types::config_bag::{Layer, Storable, StoreReplace}; @@ -58,8 +58,6 @@ impl MetricsInterceptorInstruments { #[derive(Debug)] pub(crate) struct MetricsInterceptor { instruments: MetricsInterceptorInstruments, - service_name: Cow<'static, str>, - operation_name: Cow<'static, str>, // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in // the read_before_execution hook and that is when we need to start the timer for // the operation. @@ -67,32 +65,28 @@ pub(crate) struct MetricsInterceptor { } impl MetricsInterceptor { - pub(crate) fn new( - service_name: impl Into>, - operation_name: impl Into>, - time_source: SharedTimeSource, - ) -> Result { + pub(crate) fn new(time_source: SharedTimeSource) -> Result { Ok(MetricsInterceptor { instruments: MetricsInterceptorInstruments::new()?, - service_name: service_name.into(), - operation_name: operation_name.into(), time_source, }) } - /// Get [Attributes] containing the service and method name for this operation - fn attributes(&self) -> Attributes { - let mut attributes = Attributes::new(); - attributes.set( - "rpc.service", - AttributeValue::String(self.service_name.clone().into()), - ); - attributes.set( - "rpc.method", - AttributeValue::String(self.operation_name.clone().into()), - ); - - attributes + pub(crate) fn get_attrs_from_cfg( + &self, + cfg: &aws_smithy_types::config_bag::ConfigBag, + ) -> Option { + let operation_metadata = cfg.load::(); + + if let Some(md) = operation_metadata { + let mut attributes = Attributes::new(); + attributes.set("rpc.service", AttributeValue::String(md.service().into())); + attributes.set("rpc.method", AttributeValue::String(md.name().into())); + + Some(attributes) + } else { + None + } } } @@ -106,6 +100,7 @@ impl Intercept for MetricsInterceptor { _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + println!("METRICS INTERCEPTOR RUNNING: read_before_execution"); let mut layer = Layer::new("MetricsInterceptor"); layer.store_put(MeasurementsContainer { call_start: self.time_source.now(), @@ -122,18 +117,25 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + println!("METRICS INTERCEPTOR RUNNING: read_after_execution"); let measurements = cfg .load::() .expect("set in `read_before_execution`"); - let call_end = self.time_source.now(); - let call_duration = call_end.duration_since(measurements.call_start); - if let Ok(elapsed) = call_duration { - self.instruments.operation_duration.record( - elapsed.as_secs_f64(), - Some(&self.attributes()), - None, - ); + let attributes = self.get_attrs_from_cfg(cfg); + + if let Some(attrs) = attributes { + println!("ATTRS: {:#?}", attrs); + + let call_end = self.time_source.now(); + let call_duration = call_end.duration_since(measurements.call_start); + if let Ok(elapsed) = call_duration { + self.instruments.operation_duration.record( + elapsed.as_secs_f64(), + Some(&attrs), + None, + ); + } } Ok(()) @@ -145,6 +147,8 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + println!("METRICS INTERCEPTOR RUNNING: read_before_attempt"); + let measurements = cfg .get_mut::() .expect("set in `read_before_execution`"); @@ -161,22 +165,22 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + println!("METRICS INTERCEPTOR RUNNING: read_after_attempt"); + let measurements = cfg .load::() .expect("set in `read_before_execution`"); let attempt_end = self.time_source.now(); let attempt_duration = attempt_end.duration_since(measurements.attempt_start); + let attributes = self.get_attrs_from_cfg(cfg); - if let Ok(elapsed) = attempt_duration { - let mut attributes = self.attributes(); - attributes.set("attempt", AttributeValue::I64(measurements.attempts.into())); + if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) { + attrs.set("attempt", AttributeValue::I64(measurements.attempts.into())); - self.instruments.attempt_duration.record( - elapsed.as_secs_f64(), - Some(&attributes), - None, - ); + self.instruments + .attempt_duration + .record(elapsed.as_secs_f64(), Some(&attrs), None); } Ok(()) } @@ -185,23 +189,13 @@ impl Intercept for MetricsInterceptor { /// Runtime plugin that adds an interceptor for collecting metrics #[derive(Debug, Default)] pub struct MetricsRuntimePlugin { - service: Cow<'static, str>, - operation: Cow<'static, str>, time_source: SharedTimeSource, } impl MetricsRuntimePlugin { /// Creates a runtime plugin which installs an interceptor for collecting metrics - pub fn new( - service: impl Into>, - operation: impl Into>, - time_source: SharedTimeSource, - ) -> Self { - Self { - service: service.into(), - operation: operation.into(), - time_source, - } + pub fn new(time_source: SharedTimeSource) -> Self { + Self { time_source } } } @@ -210,11 +204,7 @@ impl RuntimePlugin for MetricsRuntimePlugin { &self, _current_components: &RuntimeComponentsBuilder, ) -> Cow<'_, RuntimeComponentsBuilder> { - let interceptor = MetricsInterceptor::new( - self.service.clone(), - self.operation.clone(), - self.time_source.clone(), - ); + let interceptor = MetricsInterceptor::new(self.time_source.clone()); if let Ok(interceptor) = interceptor { Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor)) } else { diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index a6a5b7bfd6..8f07385996 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -382,7 +382,6 @@ impl OperationBuilder { .with_client_plugins(default_plugins( DefaultPluginParams::new() .with_retry_partition_name(service_name.clone()) - .with_operation_name(operation_name.clone()) .with_time_source(time_source), )) .with_client_plugin( From 6686595845bef68d0027d084ecc9c4c94cd32016 Mon Sep 17 00:00:00 2001 From: Landon James Date: Thu, 27 Feb 2025 14:09:23 -0800 Subject: [PATCH 4/9] Add metadata for manually constructured operations --- rust-runtime/Cargo.lock | 4 ++-- rust-runtime/aws-smithy-observability/Cargo.toml | 2 +- rust-runtime/aws-smithy-runtime/Cargo.toml | 2 +- rust-runtime/aws-smithy-runtime/src/client/metrics.rs | 9 +-------- .../src/client/orchestrator/operation.rs | 6 ++++-- 5 files changed, 9 insertions(+), 14 deletions(-) diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 1c9cd9dd9d..d0ac834d0d 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -536,7 +536,7 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.1.0" +version = "0.1.1" dependencies = [ "aws-smithy-runtime-api", "once_cell", @@ -585,7 +585,7 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.9" +version = "1.7.10" dependencies = [ "approx", "aws-smithy-async", diff --git a/rust-runtime/aws-smithy-observability/Cargo.toml b/rust-runtime/aws-smithy-observability/Cargo.toml index 76bb6d8fe4..4ee320234e 100644 --- a/rust-runtime/aws-smithy-observability/Cargo.toml +++ b/rust-runtime/aws-smithy-observability/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-observability" -version = "0.1.0" +version = "0.1.1" authors = [ "AWS Rust SDK Team ", ] diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index 60f75eaa1c..f07019ae52 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-runtime" -version = "1.7.9" +version = "1.7.10" authors = ["AWS Rust SDK Team ", "Zelda Hessler "] description = "The new smithy runtime crate" edition = "2021" diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs index 4b65b06f71..e73f6b6742 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -100,8 +100,8 @@ impl Intercept for MetricsInterceptor { _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - println!("METRICS INTERCEPTOR RUNNING: read_before_execution"); let mut layer = Layer::new("MetricsInterceptor"); + layer.store_put(MeasurementsContainer { call_start: self.time_source.now(), attempts: 0, @@ -117,7 +117,6 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - println!("METRICS INTERCEPTOR RUNNING: read_after_execution"); let measurements = cfg .load::() .expect("set in `read_before_execution`"); @@ -125,8 +124,6 @@ impl Intercept for MetricsInterceptor { let attributes = self.get_attrs_from_cfg(cfg); if let Some(attrs) = attributes { - println!("ATTRS: {:#?}", attrs); - let call_end = self.time_source.now(); let call_duration = call_end.duration_since(measurements.call_start); if let Ok(elapsed) = call_duration { @@ -147,8 +144,6 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - println!("METRICS INTERCEPTOR RUNNING: read_before_attempt"); - let measurements = cfg .get_mut::() .expect("set in `read_before_execution`"); @@ -165,8 +160,6 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - println!("METRICS INTERCEPTOR RUNNING: read_after_attempt"); - let measurements = cfg .load::() .expect("set in `read_before_execution`"); diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index 8f07385996..25177f18c0 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -21,8 +21,8 @@ use aws_smithy_runtime_api::client::http::HttpClient; use aws_smithy_runtime_api::client::identity::SharedIdentityResolver; use aws_smithy_runtime_api::client::interceptors::context::{Error, Input, Output}; use aws_smithy_runtime_api::client::interceptors::Intercept; -use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError}; +use aws_smithy_runtime_api::client::orchestrator::{HttpResponse, Metadata}; use aws_smithy_runtime_api::client::result::SdkError; use aws_smithy_runtime_api::client::retries::classifiers::ClassifyRetry; use aws_smithy_runtime_api::client::retries::SharedRetryStrategy; @@ -378,6 +378,8 @@ impl OperationBuilder { let service_name = self.service_name.expect("service_name required"); let operation_name = self.operation_name.expect("operation_name required"); let time_source = self.runtime_components.time_source().unwrap_or_default(); + let mut config = self.config; + config.store_put(Metadata::new(operation_name.clone(), service_name.clone())); let mut runtime_plugins = RuntimePlugins::new() .with_client_plugins(default_plugins( DefaultPluginParams::new() @@ -386,7 +388,7 @@ impl OperationBuilder { )) .with_client_plugin( StaticRuntimePlugin::new() - .with_config(self.config.freeze()) + .with_config(config.freeze()) .with_runtime_components(self.runtime_components), ); for runtime_plugin in self.runtime_plugins { From d7a3c3aabdd71b3c31b5ed6dd2da0a09c50b022a Mon Sep 17 00:00:00 2001 From: Landon James Date: Thu, 27 Feb 2025 22:51:36 -0800 Subject: [PATCH 5/9] Address PR comments Add dynamic metrics scope via codegen --- .../client/FluentClientGenerator.kt | 4 ++ rust-runtime/aws-smithy-runtime/src/client.rs | 2 +- .../aws-smithy-runtime/src/client/defaults.rs | 19 ++++- .../aws-smithy-runtime/src/client/metrics.rs | 71 ++++++++++++------- .../src/client/orchestrator/operation.rs | 14 +++- 5 files changed, 81 insertions(+), 29 deletions(-) diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index 4013386183..9da1cfc1b8 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -269,12 +269,16 @@ private fun baseClientRuntimePluginsFn( let default_retry_partition = ${codegenContext.serviceShape.sdkId().dq()}; #{before_plugin_setup} + let scope = "aws.sdk.rust.services.${codegenContext.serviceShape.sdkId()}"; + let mut plugins = #{RuntimePlugins}::new() // defaults .with_client_plugins(#{default_plugins}( #{DefaultPluginParams}::new() .with_retry_partition_name(default_retry_partition) .with_behavior_version(config.behavior_version.expect(${behaviorVersionError.dq()})) + .with_time_source(config.runtime_components.time_source().unwrap_or_default()) + .with_scope(scope) )) // user config .with_client_plugin( diff --git a/rust-runtime/aws-smithy-runtime/src/client.rs b/rust-runtime/aws-smithy-runtime/src/client.rs index f50c8a1df6..045022df75 100644 --- a/rust-runtime/aws-smithy-runtime/src/client.rs +++ b/rust-runtime/aws-smithy-runtime/src/client.rs @@ -55,4 +55,4 @@ pub mod sdk_feature; pub mod waiters; /// Interceptor for collecting client metrics. -pub mod metrics; +mod metrics; diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index 10181d533a..d9e36b4708 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -213,8 +213,11 @@ fn enforce_content_length_runtime_plugin() -> Option { Some(EnforceContentLengthRuntimePlugin::new().into_shared()) } -fn metrics_runtime_plugin(time_source: SharedTimeSource) -> Option { - Some(MetricsRuntimePlugin::new(time_source).into_shared()) +fn metrics_runtime_plugin( + scope: &'static str, + time_source: SharedTimeSource, +) -> Option { + Some(MetricsRuntimePlugin::new(scope, time_source).into_shared()) } fn validate_stalled_stream_protection_config( @@ -255,6 +258,7 @@ pub struct DefaultPluginParams { retry_partition_name: Option>, behavior_version: Option, time_source: Option, + scope: Option<&'static str>, } impl DefaultPluginParams { @@ -280,6 +284,12 @@ impl DefaultPluginParams { self.time_source = Some(time_source); self } + + /// Sets the metrics scope. + pub fn with_scope(mut self, scope: &'static str) -> Self { + self.scope = Some(scope); + self + } } /// All default plugins. @@ -302,7 +312,10 @@ pub fn default_plugins( default_timeout_config_plugin(), enforce_content_length_runtime_plugin(), default_stalled_stream_protection_config_plugin_v2(behavior_version), - metrics_runtime_plugin(params.time_source.unwrap_or_default()), + metrics_runtime_plugin( + params.scope.unwrap_or("aws.sdk.rust.services.unknown"), + params.time_source.unwrap_or_default(), + ), ] .into_iter() .flatten() diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs index e73f6b6742..8123e03474 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -12,12 +12,12 @@ use aws_smithy_runtime_api::client::{ interceptors::Intercept, orchestrator::Metadata, runtime_components::RuntimeComponentsBuilder, runtime_plugin::RuntimePlugin, }; -use aws_smithy_types::config_bag::{Layer, Storable, StoreReplace}; +use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace}; use std::{borrow::Cow, sync::Arc, time::SystemTime}; /// Struct to hold metric data in the ConfigBag #[derive(Debug, Clone)] -struct MeasurementsContainer { +pub(crate) struct MeasurementsContainer { call_start: SystemTime, attempts: u32, attempt_start: SystemTime, @@ -35,10 +35,10 @@ pub(crate) struct MetricsInterceptorInstruments { } impl MetricsInterceptorInstruments { - pub(crate) fn new() -> Result { + pub(crate) fn new(scope: &'static str) -> Result { let meter = get_telemetry_provider()? .meter_provider() - .get_meter("MetricsInterceptor", None); + .get_meter(scope, None); Ok(Self{ operation_duration: meter @@ -55,9 +55,12 @@ impl MetricsInterceptorInstruments { } } +impl Storable for MetricsInterceptorInstruments { + type Storer = StoreReplace; +} + #[derive(Debug)] pub(crate) struct MetricsInterceptor { - instruments: MetricsInterceptorInstruments, // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in // the read_before_execution hook and that is when we need to start the timer for // the operation. @@ -66,10 +69,7 @@ pub(crate) struct MetricsInterceptor { impl MetricsInterceptor { pub(crate) fn new(time_source: SharedTimeSource) -> Result { - Ok(MetricsInterceptor { - instruments: MetricsInterceptorInstruments::new()?, - time_source, - }) + Ok(MetricsInterceptor { time_source }) } pub(crate) fn get_attrs_from_cfg( @@ -88,6 +88,21 @@ impl MetricsInterceptor { None } } + + pub(crate) fn get_measurements_and_instruments<'a>( + &self, + cfg: &'a aws_smithy_types::config_bag::ConfigBag, + ) -> (&'a MeasurementsContainer, &'a MetricsInterceptorInstruments) { + let measurements = cfg + .load::() + .expect("set in `read_before_execution`"); + + let instruments = cfg + .load::() + .expect("set in RuntimePlugin"); + + (measurements, instruments) + } } impl Intercept for MetricsInterceptor { @@ -117,9 +132,7 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - let measurements = cfg - .load::() - .expect("set in `read_before_execution`"); + let (measurements, instruments) = self.get_measurements_and_instruments(cfg); let attributes = self.get_attrs_from_cfg(cfg); @@ -127,11 +140,9 @@ impl Intercept for MetricsInterceptor { let call_end = self.time_source.now(); let call_duration = call_end.duration_since(measurements.call_start); if let Ok(elapsed) = call_duration { - self.instruments.operation_duration.record( - elapsed.as_secs_f64(), - Some(&attrs), - None, - ); + instruments + .operation_duration + .record(elapsed.as_secs_f64(), Some(&attrs), None); } } @@ -160,9 +171,7 @@ impl Intercept for MetricsInterceptor { _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, cfg: &mut aws_smithy_types::config_bag::ConfigBag, ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { - let measurements = cfg - .load::() - .expect("set in `read_before_execution`"); + let (measurements, instruments) = self.get_measurements_and_instruments(cfg); let attempt_end = self.time_source.now(); let attempt_duration = attempt_end.duration_since(measurements.attempt_start); @@ -171,7 +180,7 @@ impl Intercept for MetricsInterceptor { if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) { attrs.set("attempt", AttributeValue::I64(measurements.attempts.into())); - self.instruments + instruments .attempt_duration .record(elapsed.as_secs_f64(), Some(&attrs), None); } @@ -181,14 +190,15 @@ impl Intercept for MetricsInterceptor { /// Runtime plugin that adds an interceptor for collecting metrics #[derive(Debug, Default)] -pub struct MetricsRuntimePlugin { +pub(crate) struct MetricsRuntimePlugin { + scope: &'static str, time_source: SharedTimeSource, } impl MetricsRuntimePlugin { /// Creates a runtime plugin which installs an interceptor for collecting metrics - pub fn new(time_source: SharedTimeSource) -> Self { - Self { time_source } + pub(crate) fn new(scope: &'static str, time_source: SharedTimeSource) -> Self { + Self { scope, time_source } } } @@ -204,4 +214,17 @@ impl RuntimePlugin for MetricsRuntimePlugin { Cow::Owned(RuntimeComponentsBuilder::new("Metrics")) } } + + fn config(&self) -> Option { + let mut cfg = Layer::new("MetricsInstruments"); + + let instruments = MetricsInterceptorInstruments::new(self.scope); + + if let Ok(instruments) = instruments { + cfg.store_put(instruments); + Some(cfg.freeze()) + } else { + None + } + } } diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index 25177f18c0..98c2bd0b1c 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -165,6 +165,7 @@ pub struct OperationBuilder { config: Layer, runtime_components: RuntimeComponentsBuilder, runtime_plugins: Vec, + scope: Option<&'static str>, _phantom: PhantomData<(I, O, E)>, } @@ -183,6 +184,7 @@ impl OperationBuilder<(), (), ()> { config: Layer::new("operation"), runtime_components: RuntimeComponentsBuilder::new("operation"), runtime_plugins: Vec::new(), + scope: None, _phantom: Default::default(), } } @@ -201,6 +203,12 @@ impl OperationBuilder { self } + /// Configures the metrics scope for the builder. + pub fn scope(mut self, scope: &'static str) -> Self { + self.scope = Some(scope); + self + } + /// Configures the http client for the builder. pub fn http_client(mut self, connector: impl HttpClient + 'static) -> Self { self.runtime_components.set_http_client(Some(connector)); @@ -320,6 +328,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -346,6 +355,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -369,6 +379,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -384,7 +395,8 @@ impl OperationBuilder { .with_client_plugins(default_plugins( DefaultPluginParams::new() .with_retry_partition_name(service_name.clone()) - .with_time_source(time_source), + .with_time_source(time_source) + .with_scope(self.scope.unwrap_or("aws.sdk.rust.services.unknown")), )) .with_client_plugin( StaticRuntimePlugin::new() From a8797d7a0945f55c5ebb455d5695df02874c897b Mon Sep 17 00:00:00 2001 From: Landon James Date: Fri, 28 Feb 2025 20:54:44 -0800 Subject: [PATCH 6/9] Add operation metadata for IMDS/http credentials --- aws/rust-runtime/aws-config/Cargo.lock | 13 +++++++++++-- .../aws-config/src/http_credential_provider.rs | 5 +++-- aws/rust-runtime/aws-config/src/imds/client.rs | 3 ++- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/aws/rust-runtime/aws-config/Cargo.lock b/aws/rust-runtime/aws-config/Cargo.lock index 865c24fad1..d7051b43b2 100644 --- a/aws/rust-runtime/aws-config/Cargo.lock +++ b/aws/rust-runtime/aws-config/Cargo.lock @@ -174,7 +174,7 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.8" +version = "1.2.9" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -227,6 +227,14 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.1" +dependencies = [ + "aws-smithy-runtime-api", + "once_cell", +] + [[package]] name = "aws-smithy-protocol-test" version = "0.63.0" @@ -254,10 +262,11 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.10" dependencies = [ "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-protocol-test", "aws-smithy-runtime-api", "aws-smithy-types", diff --git a/aws/rust-runtime/aws-config/src/http_credential_provider.rs b/aws/rust-runtime/aws-config/src/http_credential_provider.rs index 41911e16f0..3951463939 100644 --- a/aws/rust-runtime/aws-config/src/http_credential_provider.rs +++ b/aws/rust-runtime/aws-config/src/http_credential_provider.rs @@ -19,7 +19,7 @@ use aws_smithy_runtime::client::retries::classifiers::{ use aws_smithy_runtime_api::client::http::HttpConnectorSettings; use aws_smithy_runtime_api::client::interceptors::context::{Error, InterceptorContext}; use aws_smithy_runtime_api::client::orchestrator::{ - HttpResponse, OrchestratorError, SensitiveOutput, + HttpResponse, Metadata, OrchestratorError, SensitiveOutput, }; use aws_smithy_runtime_api::client::result::SdkError; use aws_smithy_runtime_api::client::retries::classifiers::ClassifyRetry; @@ -88,6 +88,7 @@ impl Builder { path: impl Into, ) -> HttpCredentialProvider { let provider_config = self.provider_config.unwrap_or_default(); + let path = path.into(); let mut builder = Operation::builder() .service_name("HttpCredentialProvider") @@ -104,6 +105,7 @@ impl Builder { .runtime_plugin(StaticRuntimePlugin::new().with_config({ let mut layer = Layer::new("SensitiveOutput"); layer.store_put(SensitiveOutput); + layer.store_put(Metadata::new(path.clone(), provider_name)); layer.freeze() })); if let Some(http_client) = provider_config.http_client() { @@ -126,7 +128,6 @@ impl Builder { } else { builder = builder.no_retry(); } - let path = path.into(); let operation = builder .serializer(move |input: HttpProviderAuth| { let mut http_req = http::Request::builder() diff --git a/aws/rust-runtime/aws-config/src/imds/client.rs b/aws/rust-runtime/aws-config/src/imds/client.rs index 3f23cfdc99..3dfed3fec4 100644 --- a/aws/rust-runtime/aws-config/src/imds/client.rs +++ b/aws/rust-runtime/aws-config/src/imds/client.rs @@ -21,7 +21,7 @@ use aws_smithy_runtime_api::client::endpoint::{ }; use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; use aws_smithy_runtime_api::client::orchestrator::{ - HttpRequest, OrchestratorError, SensitiveOutput, + HttpRequest, Metadata, OrchestratorError, SensitiveOutput, }; use aws_smithy_runtime_api::client::result::ConnectorError; use aws_smithy_runtime_api::client::result::SdkError; @@ -250,6 +250,7 @@ impl ImdsCommonRuntimePlugin { layer.store_put(retry_config); layer.store_put(timeout_config); layer.store_put(user_agent()); + layer.store_put(Metadata::new("get_credentials", "imds")); Self { config: layer.freeze(), From 567cab6984161f4c5761d9e85ea5492703a773db Mon Sep 17 00:00:00 2001 From: Landon James Date: Wed, 5 Mar 2025 11:03:33 -0800 Subject: [PATCH 7/9] Add integration tests for metrics --- aws/sdk/integration-tests/Cargo.toml | 1 + aws/sdk/integration-tests/metrics/Cargo.toml | 25 +++ .../metrics/tests/metrics.rs | 207 ++++++++++++++++++ rust-runtime/Cargo.lock | 2 +- rust-runtime/aws-smithy-runtime/Cargo.toml | 2 +- .../aws-smithy-runtime/src/client/defaults.rs | 9 +- 6 files changed, 240 insertions(+), 6 deletions(-) create mode 100644 aws/sdk/integration-tests/metrics/Cargo.toml create mode 100644 aws/sdk/integration-tests/metrics/tests/metrics.rs diff --git a/aws/sdk/integration-tests/Cargo.toml b/aws/sdk/integration-tests/Cargo.toml index 6b6c3b4d0d..040d3d9432 100644 --- a/aws/sdk/integration-tests/Cargo.toml +++ b/aws/sdk/integration-tests/Cargo.toml @@ -11,6 +11,7 @@ members = [ "iam", "kms", "lambda", + "metrics", "no-default-features", "polly", "qldbsession", diff --git a/aws/sdk/integration-tests/metrics/Cargo.toml b/aws/sdk/integration-tests/metrics/Cargo.toml new file mode 100644 index 0000000000..24841e3a73 --- /dev/null +++ b/aws/sdk/integration-tests/metrics/Cargo.toml @@ -0,0 +1,25 @@ +# This Cargo.toml is unused in generated code. It exists solely to enable these tests to compile in-situ +[package] +name = "metrics-tests" +version = "0.1.0" +authors = ["AWS Rust SDK Team "] +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/smithy-lang/smithy-rs" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dev-dependencies] +aws-config = { path = "../../build/aws-sdk/sdk/aws-config", features = ["test-util", "behavior-version-latest"] } +aws-sdk-dynamodb = { path = "../../build/aws-sdk/sdk/dynamodb", features = ["test-util", "behavior-version-latest"] } +aws-sdk-s3 = { path = "../../build/aws-sdk/sdk/s3", features = ["test-util", "behavior-version-latest"] } +aws-smithy-observability = { path = "../../build/aws-sdk/sdk/aws-smithy-observability" } +aws-smithy-observability-otel = { path = "../../build/aws-sdk/sdk/aws-smithy-observability-otel" } +aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["client", "test-util"]} +aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" } +http = "0.2.0" +opentelemetry = { version = "0.26.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.26.0", features = ["metrics", "testing"] } +serial_test = "3.1.1" +tokio = { version = "1.23.1", features = ["full", "test-util"] } diff --git a/aws/sdk/integration-tests/metrics/tests/metrics.rs b/aws/sdk/integration-tests/metrics/tests/metrics.rs new file mode 100644 index 0000000000..8f738fb2e6 --- /dev/null +++ b/aws/sdk/integration-tests/metrics/tests/metrics.rs @@ -0,0 +1,207 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_config::retry::RetryConfig; +use aws_config::SdkConfig; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; +use aws_smithy_observability::global::set_telemetry_provider; +use aws_smithy_observability::TelemetryProvider; +use aws_smithy_observability_otel::meter::OtelMeterProvider; +use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; +use aws_smithy_types::body::SdkBody; +use opentelemetry::KeyValue; +use opentelemetry_sdk::metrics::data::{Histogram, ResourceMetrics}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; +use serial_test::serial; +use std::borrow::Cow; +use std::sync::Arc; + +// Note, all of these tests are written with a multi threaded runtime since OTel requires that to work +// and they are all run serially since they touch global state +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn service_clients_get_unique_scope_names() { + let (meter_provider, exporter) = init_metrics(); + let config = make_config(false); + make_s3_call(&config).await; + make_ddb_call(&config).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + let scope_names: &Vec> = &finished_metrics[0] + .scope_metrics + .iter() + .map(|scope_metric| scope_metric.scope.clone().name) + .collect(); + + // Metrics aren't necessarily aggregated in the order they were first emitted + assert!(scope_names.contains(&Cow::from("aws.sdk.rust.services.s3"))); + assert!(scope_names.contains(&Cow::from("aws.sdk.rust.services.dynamodb"))); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn correct_metrics_collected() { + let (meter_provider, exporter) = init_metrics(); + make_s3_call(&make_config(false)).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + let extracted_metric_names: &Vec> = &finished_metrics[0].scope_metrics[0] + .metrics + .iter() + .map(|metric| metric.name.clone()) + .collect(); + + // Correct metric names emitted + assert!(extracted_metric_names.contains(&Cow::from("smithy.client.call.duration"))); + assert!(extracted_metric_names.contains(&Cow::from("smithy.client.call.attempt.duration"))); + + let call_duration = + extract_metric_data::>(&finished_metrics, "smithy.client.call.duration") + .data_points[0] + .sum; + + let attempt_duration = extract_metric_data::>( + &finished_metrics, + "smithy.client.call.attempt.duration", + ) + .data_points[0] + .sum; + + // Both metrics have a non-zero value + assert!(call_duration > 0.0); + assert!(attempt_duration > 0.0); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn metrics_have_expected_attributes() { + let (meter_provider, exporter) = init_metrics(); + make_s3_call(&make_config(true)).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + // Both metrics contain the method and service attributes + let call_duration_attributes = + extract_metric_attributes(&finished_metrics, "smithy.client.call.duration"); + assert!(call_duration_attributes[0].contains(&KeyValue::new("rpc.method", "GetObject"))); + assert!(call_duration_attributes[0].contains(&KeyValue::new("rpc.service", "s3"))); + + let attempt_duration_attributes = + extract_metric_attributes(&finished_metrics, "smithy.client.call.attempt.duration"); + assert!(attempt_duration_attributes[0].contains(&KeyValue::new("rpc.method", "GetObject"))); + assert!(attempt_duration_attributes[0].contains(&KeyValue::new("rpc.service", "s3"))); + + // The attempt metric contains an attempt counter attribute that correctly increments + assert!(attempt_duration_attributes + .iter() + .find(|attrs| attrs.contains(&KeyValue::new("attempt", 1))) + .is_some()); + assert!(attempt_duration_attributes + .iter() + .find(|attrs| attrs.contains(&KeyValue::new("attempt", 2))) + .is_some()); +} + +// Util functions +fn init_metrics() -> (Arc, InMemoryMetricsExporter) { + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + let otel_mp = SdkMeterProvider::builder().with_reader(reader).build(); + + let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp)); + let sdk_ref = sdk_mp.clone(); + let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build(); + + let _ = set_telemetry_provider(sdk_tp); + + (sdk_ref, exporter) +} + +fn new_replay_client(num_requests: usize, with_retry: bool) -> StaticReplayClient { + let mut events = Vec::with_capacity(num_requests); + let mut start = 0; + + if with_retry { + events.push(ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(500) + .body(SdkBody::empty()) + .unwrap(), + )); + start += 1; + } + + for _ in start..num_requests { + events.push(ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder().body(SdkBody::empty()).unwrap(), + )) + } + StaticReplayClient::new(events) +} + +fn extract_metric_data<'a, T: 'static>( + metrics: &'a Vec, + metric_name: &str, +) -> &'a T { + &metrics[0].scope_metrics[0] + .metrics + .iter() + .find(|metric| metric.name == metric_name) + .unwrap() + .data + .as_any() + .downcast_ref::() + .unwrap() +} + +fn extract_metric_attributes<'a>( + metrics: &'a Vec, + metric_name: &str, +) -> Vec> { + extract_metric_data::>(metrics, metric_name) + .data_points + .iter() + .map(|dp| dp.attributes.clone()) + .collect() +} + +async fn make_s3_call(config: &SdkConfig) { + let s3_client = aws_sdk_s3::Client::new(config); + let _ = s3_client + .get_object() + .bucket("some-test-bucket") + .key("test.txt") + .send() + .await; +} + +async fn make_ddb_call(config: &SdkConfig) { + let ddb_client = aws_sdk_dynamodb::Client::new(&config); + let _ = ddb_client + .get_item() + .table_name("test-table") + .key("foo", AttributeValue::Bool(true)) + .send() + .await; +} + +fn make_config(with_retry: bool) -> SdkConfig { + SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests())) + .region(Region::new("us-east-1")) + .http_client(new_replay_client(2, with_retry)) + .retry_config(RetryConfig::standard()) + .build() +} diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 2acbb5e62a..205bf20810 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -585,7 +585,7 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.10" +version = "1.7.9" dependencies = [ "approx", "aws-smithy-async", diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index f07019ae52..60f75eaa1c 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-runtime" -version = "1.7.10" +version = "1.7.9" authors = ["AWS Rust SDK Team ", "Zelda Hessler "] description = "The new smithy runtime crate" edition = "2021" diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index d9e36b4708..c6788dddd4 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -299,14 +299,15 @@ pub fn default_plugins( let behavior_version = params .behavior_version .unwrap_or_else(BehaviorVersion::latest); - let service_name = params - .retry_partition_name - .expect("retry_partition_name is required"); [ default_http_client_plugin(), default_identity_cache_plugin(), - default_retry_config_plugin(service_name.clone()), + default_retry_config_plugin( + params + .retry_partition_name + .expect("retry_partition_name is required"), + ), default_sleep_impl_plugin(), default_time_source_plugin(), default_timeout_config_plugin(), From 745b7ab25b8d227a2a0d3a3fd3cbcc2730949fd1 Mon Sep 17 00:00:00 2001 From: Landon James Date: Wed, 5 Mar 2025 11:51:58 -0800 Subject: [PATCH 8/9] Bump aws-config version --- aws/rust-runtime/aws-config/Cargo.lock | 6 +++--- aws/rust-runtime/aws-config/Cargo.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aws/rust-runtime/aws-config/Cargo.lock b/aws/rust-runtime/aws-config/Cargo.lock index d7051b43b2..68ffff727e 100644 --- a/aws/rust-runtime/aws-config/Cargo.lock +++ b/aws/rust-runtime/aws-config/Cargo.lock @@ -45,7 +45,7 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.18" +version = "1.5.19" dependencies = [ "aws-credential-types", "aws-runtime", @@ -204,7 +204,7 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.12" +version = "0.61.1" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -262,7 +262,7 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.10" +version = "1.7.9" dependencies = [ "aws-smithy-async", "aws-smithy-http", diff --git a/aws/rust-runtime/aws-config/Cargo.toml b/aws/rust-runtime/aws-config/Cargo.toml index 2fda830ae0..c99583082f 100644 --- a/aws/rust-runtime/aws-config/Cargo.toml +++ b/aws/rust-runtime/aws-config/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-config" -version = "1.5.18" +version = "1.5.19" authors = [ "AWS Rust SDK Team ", "Russell Cohen ", From 3f75ec208ae7add1865e96cbf5b6b312e0d583ab Mon Sep 17 00:00:00 2001 From: Landon James Date: Thu, 6 Mar 2025 14:19:59 -0800 Subject: [PATCH 9/9] PR comments --- rust-runtime/aws-smithy-runtime/src/client/defaults.rs | 6 +++--- rust-runtime/aws-smithy-runtime/src/client/metrics.rs | 3 +-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index c6788dddd4..dce6c038f6 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider; use crate::client::retries::strategy::StandardRetryStrategy; use crate::client::retries::RetryPartition; use aws_smithy_async::rt::sleep::default_async_sleep; -use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource}; +use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource, TimeSource}; use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion; use aws_smithy_runtime_api::client::http::SharedHttpClient; @@ -280,8 +280,8 @@ impl DefaultPluginParams { } /// Sets the time source. - pub fn with_time_source(mut self, time_source: SharedTimeSource) -> Self { - self.time_source = Some(time_source); + pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self { + self.time_source = Some(SharedTimeSource::new(time_source)); self } diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs index 8123e03474..bc9fe47575 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -216,11 +216,10 @@ impl RuntimePlugin for MetricsRuntimePlugin { } fn config(&self) -> Option { - let mut cfg = Layer::new("MetricsInstruments"); - let instruments = MetricsInterceptorInstruments::new(self.scope); if let Ok(instruments) = instruments { + let mut cfg = Layer::new("MetricsInstruments"); cfg.store_put(instruments); Some(cfg.freeze()) } else {