diff --git a/aws/rust-runtime/Cargo.lock b/aws/rust-runtime/Cargo.lock index 6902fcea4d..40597f0a7f 100644 --- a/aws/rust-runtime/Cargo.lock +++ b/aws/rust-runtime/Cargo.lock @@ -273,6 +273,14 @@ dependencies = [ "tracing", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.0" +dependencies = [ + "aws-smithy-runtime-api", + "once_cell", +] + [[package]] name = "aws-smithy-protocol-test" version = "0.63.0" @@ -292,10 +300,11 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" dependencies = [ "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-runtime-api", "aws-smithy-types", "bytes", diff --git a/aws/rust-runtime/aws-config/Cargo.lock b/aws/rust-runtime/aws-config/Cargo.lock index 865c24fad1..68ffff727e 100644 --- a/aws/rust-runtime/aws-config/Cargo.lock +++ b/aws/rust-runtime/aws-config/Cargo.lock @@ -45,7 +45,7 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.18" +version = "1.5.19" dependencies = [ "aws-credential-types", "aws-runtime", @@ -174,7 +174,7 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.8" +version = "1.2.9" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -204,7 +204,7 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.60.12" +version = "0.61.1" dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", @@ -227,6 +227,14 @@ dependencies = [ "aws-smithy-types", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.1" +dependencies = [ + "aws-smithy-runtime-api", + "once_cell", +] + [[package]] name = "aws-smithy-protocol-test" version = "0.63.0" @@ -254,10 +262,11 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" dependencies = [ "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-protocol-test", "aws-smithy-runtime-api", "aws-smithy-types", diff --git a/aws/rust-runtime/aws-config/Cargo.toml b/aws/rust-runtime/aws-config/Cargo.toml index 2fda830ae0..c99583082f 100644 --- a/aws/rust-runtime/aws-config/Cargo.toml +++ b/aws/rust-runtime/aws-config/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-config" -version = "1.5.18" +version = "1.5.19" authors = [ "AWS Rust SDK Team ", "Russell Cohen ", diff --git a/aws/rust-runtime/aws-config/src/http_credential_provider.rs b/aws/rust-runtime/aws-config/src/http_credential_provider.rs index 41911e16f0..3951463939 100644 --- a/aws/rust-runtime/aws-config/src/http_credential_provider.rs +++ b/aws/rust-runtime/aws-config/src/http_credential_provider.rs @@ -19,7 +19,7 @@ use aws_smithy_runtime::client::retries::classifiers::{ use aws_smithy_runtime_api::client::http::HttpConnectorSettings; use aws_smithy_runtime_api::client::interceptors::context::{Error, InterceptorContext}; use aws_smithy_runtime_api::client::orchestrator::{ - HttpResponse, OrchestratorError, SensitiveOutput, + HttpResponse, Metadata, OrchestratorError, SensitiveOutput, }; use aws_smithy_runtime_api::client::result::SdkError; use aws_smithy_runtime_api::client::retries::classifiers::ClassifyRetry; @@ -88,6 +88,7 @@ impl Builder { path: impl Into, ) -> HttpCredentialProvider { let provider_config = self.provider_config.unwrap_or_default(); + let path = path.into(); let mut builder = Operation::builder() .service_name("HttpCredentialProvider") @@ -104,6 +105,7 @@ impl Builder { .runtime_plugin(StaticRuntimePlugin::new().with_config({ let mut layer = Layer::new("SensitiveOutput"); layer.store_put(SensitiveOutput); + layer.store_put(Metadata::new(path.clone(), provider_name)); layer.freeze() })); if let Some(http_client) = provider_config.http_client() { @@ -126,7 +128,6 @@ impl Builder { } else { builder = builder.no_retry(); } - let path = path.into(); let operation = builder .serializer(move |input: HttpProviderAuth| { let mut http_req = http::Request::builder() diff --git a/aws/rust-runtime/aws-config/src/imds/client.rs b/aws/rust-runtime/aws-config/src/imds/client.rs index 3f23cfdc99..3dfed3fec4 100644 --- a/aws/rust-runtime/aws-config/src/imds/client.rs +++ b/aws/rust-runtime/aws-config/src/imds/client.rs @@ -21,7 +21,7 @@ use aws_smithy_runtime_api::client::endpoint::{ }; use aws_smithy_runtime_api::client::interceptors::context::InterceptorContext; use aws_smithy_runtime_api::client::orchestrator::{ - HttpRequest, OrchestratorError, SensitiveOutput, + HttpRequest, Metadata, OrchestratorError, SensitiveOutput, }; use aws_smithy_runtime_api::client::result::ConnectorError; use aws_smithy_runtime_api::client::result::SdkError; @@ -250,6 +250,7 @@ impl ImdsCommonRuntimePlugin { layer.store_put(retry_config); layer.store_put(timeout_config); layer.store_put(user_agent()); + layer.store_put(Metadata::new("get_credentials", "imds")); Self { config: layer.freeze(), diff --git a/aws/sdk/integration-tests/Cargo.toml b/aws/sdk/integration-tests/Cargo.toml index 6b6c3b4d0d..040d3d9432 100644 --- a/aws/sdk/integration-tests/Cargo.toml +++ b/aws/sdk/integration-tests/Cargo.toml @@ -11,6 +11,7 @@ members = [ "iam", "kms", "lambda", + "metrics", "no-default-features", "polly", "qldbsession", diff --git a/aws/sdk/integration-tests/metrics/Cargo.toml b/aws/sdk/integration-tests/metrics/Cargo.toml new file mode 100644 index 0000000000..24841e3a73 --- /dev/null +++ b/aws/sdk/integration-tests/metrics/Cargo.toml @@ -0,0 +1,25 @@ +# This Cargo.toml is unused in generated code. It exists solely to enable these tests to compile in-situ +[package] +name = "metrics-tests" +version = "0.1.0" +authors = ["AWS Rust SDK Team "] +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/smithy-lang/smithy-rs" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dev-dependencies] +aws-config = { path = "../../build/aws-sdk/sdk/aws-config", features = ["test-util", "behavior-version-latest"] } +aws-sdk-dynamodb = { path = "../../build/aws-sdk/sdk/dynamodb", features = ["test-util", "behavior-version-latest"] } +aws-sdk-s3 = { path = "../../build/aws-sdk/sdk/s3", features = ["test-util", "behavior-version-latest"] } +aws-smithy-observability = { path = "../../build/aws-sdk/sdk/aws-smithy-observability" } +aws-smithy-observability-otel = { path = "../../build/aws-sdk/sdk/aws-smithy-observability-otel" } +aws-smithy-runtime = { path = "../../build/aws-sdk/sdk/aws-smithy-runtime", features = ["client", "test-util"]} +aws-smithy-types = { path = "../../build/aws-sdk/sdk/aws-smithy-types" } +http = "0.2.0" +opentelemetry = { version = "0.26.0", features = ["metrics"] } +opentelemetry_sdk = { version = "0.26.0", features = ["metrics", "testing"] } +serial_test = "3.1.1" +tokio = { version = "1.23.1", features = ["full", "test-util"] } diff --git a/aws/sdk/integration-tests/metrics/tests/metrics.rs b/aws/sdk/integration-tests/metrics/tests/metrics.rs new file mode 100644 index 0000000000..8f738fb2e6 --- /dev/null +++ b/aws/sdk/integration-tests/metrics/tests/metrics.rs @@ -0,0 +1,207 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_config::retry::RetryConfig; +use aws_config::SdkConfig; +use aws_sdk_dynamodb::types::AttributeValue; +use aws_sdk_s3::config::{Credentials, Region, SharedCredentialsProvider}; +use aws_smithy_observability::global::set_telemetry_provider; +use aws_smithy_observability::TelemetryProvider; +use aws_smithy_observability_otel::meter::OtelMeterProvider; +use aws_smithy_runtime::client::http::test_util::{ReplayEvent, StaticReplayClient}; +use aws_smithy_types::body::SdkBody; +use opentelemetry::KeyValue; +use opentelemetry_sdk::metrics::data::{Histogram, ResourceMetrics}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; +use serial_test::serial; +use std::borrow::Cow; +use std::sync::Arc; + +// Note, all of these tests are written with a multi threaded runtime since OTel requires that to work +// and they are all run serially since they touch global state +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn service_clients_get_unique_scope_names() { + let (meter_provider, exporter) = init_metrics(); + let config = make_config(false); + make_s3_call(&config).await; + make_ddb_call(&config).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + let scope_names: &Vec> = &finished_metrics[0] + .scope_metrics + .iter() + .map(|scope_metric| scope_metric.scope.clone().name) + .collect(); + + // Metrics aren't necessarily aggregated in the order they were first emitted + assert!(scope_names.contains(&Cow::from("aws.sdk.rust.services.s3"))); + assert!(scope_names.contains(&Cow::from("aws.sdk.rust.services.dynamodb"))); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn correct_metrics_collected() { + let (meter_provider, exporter) = init_metrics(); + make_s3_call(&make_config(false)).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + let extracted_metric_names: &Vec> = &finished_metrics[0].scope_metrics[0] + .metrics + .iter() + .map(|metric| metric.name.clone()) + .collect(); + + // Correct metric names emitted + assert!(extracted_metric_names.contains(&Cow::from("smithy.client.call.duration"))); + assert!(extracted_metric_names.contains(&Cow::from("smithy.client.call.attempt.duration"))); + + let call_duration = + extract_metric_data::>(&finished_metrics, "smithy.client.call.duration") + .data_points[0] + .sum; + + let attempt_duration = extract_metric_data::>( + &finished_metrics, + "smithy.client.call.attempt.duration", + ) + .data_points[0] + .sum; + + // Both metrics have a non-zero value + assert!(call_duration > 0.0); + assert!(attempt_duration > 0.0); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +#[serial] +async fn metrics_have_expected_attributes() { + let (meter_provider, exporter) = init_metrics(); + make_s3_call(&make_config(true)).await; + + meter_provider.flush().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + // Both metrics contain the method and service attributes + let call_duration_attributes = + extract_metric_attributes(&finished_metrics, "smithy.client.call.duration"); + assert!(call_duration_attributes[0].contains(&KeyValue::new("rpc.method", "GetObject"))); + assert!(call_duration_attributes[0].contains(&KeyValue::new("rpc.service", "s3"))); + + let attempt_duration_attributes = + extract_metric_attributes(&finished_metrics, "smithy.client.call.attempt.duration"); + assert!(attempt_duration_attributes[0].contains(&KeyValue::new("rpc.method", "GetObject"))); + assert!(attempt_duration_attributes[0].contains(&KeyValue::new("rpc.service", "s3"))); + + // The attempt metric contains an attempt counter attribute that correctly increments + assert!(attempt_duration_attributes + .iter() + .find(|attrs| attrs.contains(&KeyValue::new("attempt", 1))) + .is_some()); + assert!(attempt_duration_attributes + .iter() + .find(|attrs| attrs.contains(&KeyValue::new("attempt", 2))) + .is_some()); +} + +// Util functions +fn init_metrics() -> (Arc, InMemoryMetricsExporter) { + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + let otel_mp = SdkMeterProvider::builder().with_reader(reader).build(); + + let sdk_mp = Arc::new(OtelMeterProvider::new(otel_mp)); + let sdk_ref = sdk_mp.clone(); + let sdk_tp = TelemetryProvider::builder().meter_provider(sdk_mp).build(); + + let _ = set_telemetry_provider(sdk_tp); + + (sdk_ref, exporter) +} + +fn new_replay_client(num_requests: usize, with_retry: bool) -> StaticReplayClient { + let mut events = Vec::with_capacity(num_requests); + let mut start = 0; + + if with_retry { + events.push(ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder() + .status(500) + .body(SdkBody::empty()) + .unwrap(), + )); + start += 1; + } + + for _ in start..num_requests { + events.push(ReplayEvent::new( + http::Request::builder().body(SdkBody::empty()).unwrap(), + http::Response::builder().body(SdkBody::empty()).unwrap(), + )) + } + StaticReplayClient::new(events) +} + +fn extract_metric_data<'a, T: 'static>( + metrics: &'a Vec, + metric_name: &str, +) -> &'a T { + &metrics[0].scope_metrics[0] + .metrics + .iter() + .find(|metric| metric.name == metric_name) + .unwrap() + .data + .as_any() + .downcast_ref::() + .unwrap() +} + +fn extract_metric_attributes<'a>( + metrics: &'a Vec, + metric_name: &str, +) -> Vec> { + extract_metric_data::>(metrics, metric_name) + .data_points + .iter() + .map(|dp| dp.attributes.clone()) + .collect() +} + +async fn make_s3_call(config: &SdkConfig) { + let s3_client = aws_sdk_s3::Client::new(config); + let _ = s3_client + .get_object() + .bucket("some-test-bucket") + .key("test.txt") + .send() + .await; +} + +async fn make_ddb_call(config: &SdkConfig) { + let ddb_client = aws_sdk_dynamodb::Client::new(&config); + let _ = ddb_client + .get_item() + .table_name("test-table") + .key("foo", AttributeValue::Bool(true)) + .send() + .await; +} + +fn make_config(with_retry: bool) -> SdkConfig { + SdkConfig::builder() + .credentials_provider(SharedCredentialsProvider::new(Credentials::for_tests())) + .region(Region::new("us-east-1")) + .http_client(new_replay_client(2, with_retry)) + .retry_config(RetryConfig::standard()) + .build() +} diff --git a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt index 4013386183..9da1cfc1b8 100644 --- a/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt +++ b/codegen-client/src/main/kotlin/software/amazon/smithy/rust/codegen/client/smithy/generators/client/FluentClientGenerator.kt @@ -269,12 +269,16 @@ private fun baseClientRuntimePluginsFn( let default_retry_partition = ${codegenContext.serviceShape.sdkId().dq()}; #{before_plugin_setup} + let scope = "aws.sdk.rust.services.${codegenContext.serviceShape.sdkId()}"; + let mut plugins = #{RuntimePlugins}::new() // defaults .with_client_plugins(#{default_plugins}( #{DefaultPluginParams}::new() .with_retry_partition_name(default_retry_partition) .with_behavior_version(config.behavior_version.expect(${behaviorVersionError.dq()})) + .with_time_source(config.runtime_components.time_source().unwrap_or_default()) + .with_scope(scope) )) // user config .with_client_plugin( diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index 7eba80108f..205bf20810 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -536,7 +536,7 @@ dependencies = [ [[package]] name = "aws-smithy-observability" -version = "0.1.0" +version = "0.1.1" dependencies = [ "aws-smithy-runtime-api", "once_cell", @@ -585,11 +585,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" dependencies = [ "approx", "aws-smithy-async", "aws-smithy-http", + "aws-smithy-observability", "aws-smithy-protocol-test", "aws-smithy-runtime-api", "aws-smithy-types", diff --git a/rust-runtime/aws-smithy-observability/Cargo.toml b/rust-runtime/aws-smithy-observability/Cargo.toml index 76bb6d8fe4..4ee320234e 100644 --- a/rust-runtime/aws-smithy-observability/Cargo.toml +++ b/rust-runtime/aws-smithy-observability/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-observability" -version = "0.1.0" +version = "0.1.1" authors = [ "AWS Rust SDK Team ", ] diff --git a/rust-runtime/aws-smithy-observability/src/attributes.rs b/rust-runtime/aws-smithy-observability/src/attributes.rs index 8b4f5ac661..1e68b3a2c0 100644 --- a/rust-runtime/aws-smithy-observability/src/attributes.rs +++ b/rust-runtime/aws-smithy-observability/src/attributes.rs @@ -25,7 +25,7 @@ pub enum AttributeValue { /// Structured telemetry metadata. #[non_exhaustive] -#[derive(Clone, Default)] +#[derive(Clone, Debug, Default)] pub struct Attributes { attrs: HashMap, } diff --git a/rust-runtime/aws-smithy-runtime/Cargo.toml b/rust-runtime/aws-smithy-runtime/Cargo.toml index 5ec7722d7e..60f75eaa1c 100644 --- a/rust-runtime/aws-smithy-runtime/Cargo.toml +++ b/rust-runtime/aws-smithy-runtime/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aws-smithy-runtime" -version = "1.7.8" +version = "1.7.9" authors = ["AWS Rust SDK Team ", "Zelda Hessler "] description = "The new smithy runtime crate" edition = "2021" @@ -23,6 +23,7 @@ wire-mock = ["test-util", "connector-hyper-0-14-x", "hyper-0-14?/server"] [dependencies] aws-smithy-async = { path = "../aws-smithy-async" } aws-smithy-http = { path = "../aws-smithy-http" } +aws-smithy-observability = { path = "../aws-smithy-observability" } aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true } aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" } aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x"] } diff --git a/rust-runtime/aws-smithy-runtime/src/client.rs b/rust-runtime/aws-smithy-runtime/src/client.rs index 33ccc1f09f..045022df75 100644 --- a/rust-runtime/aws-smithy-runtime/src/client.rs +++ b/rust-runtime/aws-smithy-runtime/src/client.rs @@ -53,3 +53,6 @@ pub mod sdk_feature; /// Smithy support-code for code generated waiters. pub mod waiters; + +/// Interceptor for collecting client metrics. +mod metrics; diff --git a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs index f6699907d7..dce6c038f6 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/defaults.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/defaults.rs @@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider; use crate::client::retries::strategy::StandardRetryStrategy; use crate::client::retries::RetryPartition; use aws_smithy_async::rt::sleep::default_async_sleep; -use aws_smithy_async::time::SystemTimeSource; +use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource, TimeSource}; use aws_smithy_runtime_api::box_error::BoxError; use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion; use aws_smithy_runtime_api::client::http::SharedHttpClient; @@ -33,6 +33,8 @@ use aws_smithy_types::timeout::TimeoutConfig; use std::borrow::Cow; use std::time::Duration; +use super::metrics::MetricsRuntimePlugin; + fn default_plugin(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin where CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder, @@ -211,6 +213,13 @@ fn enforce_content_length_runtime_plugin() -> Option { Some(EnforceContentLengthRuntimePlugin::new().into_shared()) } +fn metrics_runtime_plugin( + scope: &'static str, + time_source: SharedTimeSource, +) -> Option { + Some(MetricsRuntimePlugin::new(scope, time_source).into_shared()) +} + fn validate_stalled_stream_protection_config( components: &RuntimeComponentsBuilder, cfg: &ConfigBag, @@ -245,8 +254,11 @@ fn validate_stalled_stream_protection_config( #[non_exhaustive] #[derive(Debug, Default)] pub struct DefaultPluginParams { + // The retry_partition_name is also the service_name retry_partition_name: Option>, behavior_version: Option, + time_source: Option, + scope: Option<&'static str>, } impl DefaultPluginParams { @@ -266,6 +278,18 @@ impl DefaultPluginParams { self.behavior_version = Some(version); self } + + /// Sets the time source. + pub fn with_time_source(mut self, time_source: impl TimeSource + 'static) -> Self { + self.time_source = Some(SharedTimeSource::new(time_source)); + self + } + + /// Sets the metrics scope. + pub fn with_scope(mut self, scope: &'static str) -> Self { + self.scope = Some(scope); + self + } } /// All default plugins. @@ -289,6 +313,10 @@ pub fn default_plugins( default_timeout_config_plugin(), enforce_content_length_runtime_plugin(), default_stalled_stream_protection_config_plugin_v2(behavior_version), + metrics_runtime_plugin( + params.scope.unwrap_or("aws.sdk.rust.services.unknown"), + params.time_source.unwrap_or_default(), + ), ] .into_iter() .flatten() diff --git a/rust-runtime/aws-smithy-runtime/src/client/metrics.rs b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs new file mode 100644 index 0000000000..bc9fe47575 --- /dev/null +++ b/rust-runtime/aws-smithy-runtime/src/client/metrics.rs @@ -0,0 +1,229 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_async::time::SharedTimeSource; +use aws_smithy_observability::{ + global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes, + ObservabilityError, +}; +use aws_smithy_runtime_api::client::{ + interceptors::Intercept, orchestrator::Metadata, runtime_components::RuntimeComponentsBuilder, + runtime_plugin::RuntimePlugin, +}; +use aws_smithy_types::config_bag::{FrozenLayer, Layer, Storable, StoreReplace}; +use std::{borrow::Cow, sync::Arc, time::SystemTime}; + +/// Struct to hold metric data in the ConfigBag +#[derive(Debug, Clone)] +pub(crate) struct MeasurementsContainer { + call_start: SystemTime, + attempts: u32, + attempt_start: SystemTime, +} + +impl Storable for MeasurementsContainer { + type Storer = StoreReplace; +} + +/// Instruments for recording a single operation +#[derive(Debug, Clone)] +pub(crate) struct MetricsInterceptorInstruments { + pub(crate) operation_duration: Arc, + pub(crate) attempt_duration: Arc, +} + +impl MetricsInterceptorInstruments { + pub(crate) fn new(scope: &'static str) -> Result { + let meter = get_telemetry_provider()? + .meter_provider() + .get_meter(scope, None); + + Ok(Self{ + operation_duration: meter + .create_histogram("smithy.client.call.duration") + .set_units("s") + .set_description("Overall call duration (including retries and time to send or receive request and response body)") + .build(), + attempt_duration: meter + .create_histogram("smithy.client.call.attempt.duration") + .set_units("s") + .set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)") + .build(), + }) + } +} + +impl Storable for MetricsInterceptorInstruments { + type Storer = StoreReplace; +} + +#[derive(Debug)] +pub(crate) struct MetricsInterceptor { + // Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in + // the read_before_execution hook and that is when we need to start the timer for + // the operation. + time_source: SharedTimeSource, +} + +impl MetricsInterceptor { + pub(crate) fn new(time_source: SharedTimeSource) -> Result { + Ok(MetricsInterceptor { time_source }) + } + + pub(crate) fn get_attrs_from_cfg( + &self, + cfg: &aws_smithy_types::config_bag::ConfigBag, + ) -> Option { + let operation_metadata = cfg.load::(); + + if let Some(md) = operation_metadata { + let mut attributes = Attributes::new(); + attributes.set("rpc.service", AttributeValue::String(md.service().into())); + attributes.set("rpc.method", AttributeValue::String(md.name().into())); + + Some(attributes) + } else { + None + } + } + + pub(crate) fn get_measurements_and_instruments<'a>( + &self, + cfg: &'a aws_smithy_types::config_bag::ConfigBag, + ) -> (&'a MeasurementsContainer, &'a MetricsInterceptorInstruments) { + let measurements = cfg + .load::() + .expect("set in `read_before_execution`"); + + let instruments = cfg + .load::() + .expect("set in RuntimePlugin"); + + (measurements, instruments) + } +} + +impl Intercept for MetricsInterceptor { + fn name(&self) -> &'static str { + "MetricsInterceptor" + } + + fn read_before_execution( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let mut layer = Layer::new("MetricsInterceptor"); + + layer.store_put(MeasurementsContainer { + call_start: self.time_source.now(), + attempts: 0, + attempt_start: SystemTime::UNIX_EPOCH, + }); + cfg.push_layer(layer); + Ok(()) + } + + fn read_after_execution( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let (measurements, instruments) = self.get_measurements_and_instruments(cfg); + + let attributes = self.get_attrs_from_cfg(cfg); + + if let Some(attrs) = attributes { + let call_end = self.time_source.now(); + let call_duration = call_end.duration_since(measurements.call_start); + if let Ok(elapsed) = call_duration { + instruments + .operation_duration + .record(elapsed.as_secs_f64(), Some(&attrs), None); + } + } + + Ok(()) + } + + fn read_before_attempt( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let measurements = cfg + .get_mut::() + .expect("set in `read_before_execution`"); + + measurements.attempts += 1; + measurements.attempt_start = self.time_source.now(); + + Ok(()) + } + + fn read_after_attempt( + &self, + _context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>, + _runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents, + cfg: &mut aws_smithy_types::config_bag::ConfigBag, + ) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> { + let (measurements, instruments) = self.get_measurements_and_instruments(cfg); + + let attempt_end = self.time_source.now(); + let attempt_duration = attempt_end.duration_since(measurements.attempt_start); + let attributes = self.get_attrs_from_cfg(cfg); + + if let (Ok(elapsed), Some(mut attrs)) = (attempt_duration, attributes) { + attrs.set("attempt", AttributeValue::I64(measurements.attempts.into())); + + instruments + .attempt_duration + .record(elapsed.as_secs_f64(), Some(&attrs), None); + } + Ok(()) + } +} + +/// Runtime plugin that adds an interceptor for collecting metrics +#[derive(Debug, Default)] +pub(crate) struct MetricsRuntimePlugin { + scope: &'static str, + time_source: SharedTimeSource, +} + +impl MetricsRuntimePlugin { + /// Creates a runtime plugin which installs an interceptor for collecting metrics + pub(crate) fn new(scope: &'static str, time_source: SharedTimeSource) -> Self { + Self { scope, time_source } + } +} + +impl RuntimePlugin for MetricsRuntimePlugin { + fn runtime_components( + &self, + _current_components: &RuntimeComponentsBuilder, + ) -> Cow<'_, RuntimeComponentsBuilder> { + let interceptor = MetricsInterceptor::new(self.time_source.clone()); + if let Ok(interceptor) = interceptor { + Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor)) + } else { + Cow::Owned(RuntimeComponentsBuilder::new("Metrics")) + } + } + + fn config(&self) -> Option { + let instruments = MetricsInterceptorInstruments::new(self.scope); + + if let Ok(instruments) = instruments { + let mut cfg = Layer::new("MetricsInstruments"); + cfg.store_put(instruments); + Some(cfg.freeze()) + } else { + None + } + } +} diff --git a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs index 756f0b7423..98c2bd0b1c 100644 --- a/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs +++ b/rust-runtime/aws-smithy-runtime/src/client/orchestrator/operation.rs @@ -21,8 +21,8 @@ use aws_smithy_runtime_api::client::http::HttpClient; use aws_smithy_runtime_api::client::identity::SharedIdentityResolver; use aws_smithy_runtime_api::client::interceptors::context::{Error, Input, Output}; use aws_smithy_runtime_api::client::interceptors::Intercept; -use aws_smithy_runtime_api::client::orchestrator::HttpResponse; use aws_smithy_runtime_api::client::orchestrator::{HttpRequest, OrchestratorError}; +use aws_smithy_runtime_api::client::orchestrator::{HttpResponse, Metadata}; use aws_smithy_runtime_api::client::result::SdkError; use aws_smithy_runtime_api::client::retries::classifiers::ClassifyRetry; use aws_smithy_runtime_api::client::retries::SharedRetryStrategy; @@ -165,6 +165,7 @@ pub struct OperationBuilder { config: Layer, runtime_components: RuntimeComponentsBuilder, runtime_plugins: Vec, + scope: Option<&'static str>, _phantom: PhantomData<(I, O, E)>, } @@ -183,6 +184,7 @@ impl OperationBuilder<(), (), ()> { config: Layer::new("operation"), runtime_components: RuntimeComponentsBuilder::new("operation"), runtime_plugins: Vec::new(), + scope: None, _phantom: Default::default(), } } @@ -201,6 +203,12 @@ impl OperationBuilder { self } + /// Configures the metrics scope for the builder. + pub fn scope(mut self, scope: &'static str) -> Self { + self.scope = Some(scope); + self + } + /// Configures the http client for the builder. pub fn http_client(mut self, connector: impl HttpClient + 'static) -> Self { self.runtime_components.set_http_client(Some(connector)); @@ -320,6 +328,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -346,6 +355,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -369,6 +379,7 @@ impl OperationBuilder { config: self.config, runtime_components: self.runtime_components, runtime_plugins: self.runtime_plugins, + scope: self.scope, _phantom: Default::default(), } } @@ -377,14 +388,19 @@ impl OperationBuilder { pub fn build(self) -> Operation { let service_name = self.service_name.expect("service_name required"); let operation_name = self.operation_name.expect("operation_name required"); - + let time_source = self.runtime_components.time_source().unwrap_or_default(); + let mut config = self.config; + config.store_put(Metadata::new(operation_name.clone(), service_name.clone())); let mut runtime_plugins = RuntimePlugins::new() .with_client_plugins(default_plugins( - DefaultPluginParams::new().with_retry_partition_name(service_name.clone()), + DefaultPluginParams::new() + .with_retry_partition_name(service_name.clone()) + .with_time_source(time_source) + .with_scope(self.scope.unwrap_or("aws.sdk.rust.services.unknown")), )) .with_client_plugin( StaticRuntimePlugin::new() - .with_config(self.config.freeze()) + .with_config(config.freeze()) .with_runtime_components(self.runtime_components), ); for runtime_plugin in self.runtime_plugins {