Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add interceptor for collecting client metrics #4021

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
11 changes: 10 additions & 1 deletion aws/rust-runtime/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion rust-runtime/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust-runtime/aws-smithy-observability/src/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub enum AttributeValue {

/// Structured telemetry metadata.
#[non_exhaustive]
#[derive(Clone, Default)]
#[derive(Clone, Debug, Default)]
pub struct Attributes {
attrs: HashMap<String, AttributeValue>,
}
Expand Down
3 changes: 2 additions & 1 deletion rust-runtime/aws-smithy-runtime/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "aws-smithy-runtime"
version = "1.7.8"
version = "1.7.9"
authors = ["AWS Rust SDK Team <[email protected]>", "Zelda Hessler <[email protected]>"]
description = "The new smithy runtime crate"
edition = "2021"
Expand All @@ -23,6 +23,7 @@ wire-mock = ["test-util", "connector-hyper-0-14-x", "hyper-0-14?/server"]
[dependencies]
aws-smithy-async = { path = "../aws-smithy-async" }
aws-smithy-http = { path = "../aws-smithy-http" }
aws-smithy-observability = { path = "../aws-smithy-observability" }
aws-smithy-protocol-test = { path = "../aws-smithy-protocol-test", optional = true }
aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" }
aws-smithy-types = { path = "../aws-smithy-types", features = ["http-body-0-4-x"] }
Expand Down
3 changes: 3 additions & 0 deletions rust-runtime/aws-smithy-runtime/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,6 @@ pub mod sdk_feature;

/// Smithy support-code for code generated waiters.
pub mod waiters;

/// Interceptor for collecting client metrics.
pub mod metrics;
41 changes: 35 additions & 6 deletions rust-runtime/aws-smithy-runtime/src/client/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::client::retries::strategy::standard::TokenBucketProvider;
use crate::client::retries::strategy::StandardRetryStrategy;
use crate::client::retries::RetryPartition;
use aws_smithy_async::rt::sleep::default_async_sleep;
use aws_smithy_async::time::SystemTimeSource;
use aws_smithy_async::time::{SharedTimeSource, SystemTimeSource};
use aws_smithy_runtime_api::box_error::BoxError;
use aws_smithy_runtime_api::client::behavior_version::BehaviorVersion;
use aws_smithy_runtime_api::client::http::SharedHttpClient;
Expand All @@ -33,6 +33,8 @@ use aws_smithy_types::timeout::TimeoutConfig;
use std::borrow::Cow;
use std::time::Duration;

use super::metrics::MetricsRuntimePlugin;

fn default_plugin<CompFn>(name: &'static str, components_fn: CompFn) -> StaticRuntimePlugin
where
CompFn: FnOnce(RuntimeComponentsBuilder) -> RuntimeComponentsBuilder,
Expand Down Expand Up @@ -211,6 +213,14 @@ fn enforce_content_length_runtime_plugin() -> Option<SharedRuntimePlugin> {
Some(EnforceContentLengthRuntimePlugin::new().into_shared())
}

fn metrics_runtime_plugin(
service: impl Into<Cow<'static, str>>,
operation: impl Into<Cow<'static, str>>,
time_source: SharedTimeSource,
) -> Option<SharedRuntimePlugin> {
Some(MetricsRuntimePlugin::new(service, operation, time_source).into_shared())
}

fn validate_stalled_stream_protection_config(
components: &RuntimeComponentsBuilder,
cfg: &ConfigBag,
Expand Down Expand Up @@ -245,8 +255,11 @@ fn validate_stalled_stream_protection_config(
#[non_exhaustive]
#[derive(Debug, Default)]
pub struct DefaultPluginParams {
// The retry_partition_name is also the service_name
retry_partition_name: Option<Cow<'static, str>>,
behavior_version: Option<BehaviorVersion>,
operation_name: Option<Cow<'static, str>>,
time_source: Option<SharedTimeSource>,
}

impl DefaultPluginParams {
Expand All @@ -266,6 +279,18 @@ impl DefaultPluginParams {
self.behavior_version = Some(version);
self
}

/// Sets the operation name.
pub fn with_operation_name(mut self, operation_name: impl Into<Cow<'static, str>>) -> Self {
self.operation_name = Some(operation_name.into());
self
}

/// Sets the time source.
pub fn with_time_source(mut self, time_source: SharedTimeSource) -> Self {
self.time_source = Some(time_source);
self
}
}

/// All default plugins.
Expand All @@ -275,20 +300,24 @@ pub fn default_plugins(
let behavior_version = params
.behavior_version
.unwrap_or_else(BehaviorVersion::latest);
let service_name = params
.retry_partition_name
.expect("retry_partition_name is required");

[
default_http_client_plugin(),
default_identity_cache_plugin(),
default_retry_config_plugin(
params
.retry_partition_name
.expect("retry_partition_name is required"),
),
default_retry_config_plugin(service_name.clone()),
default_sleep_impl_plugin(),
default_time_source_plugin(),
default_timeout_config_plugin(),
enforce_content_length_runtime_plugin(),
default_stalled_stream_protection_config_plugin_v2(behavior_version),
metrics_runtime_plugin(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't going to quite work the way we want.

The OperationBuilder isn't used in generated code and we apply default plugins when we create the client (see also here).

At that point we won't have the operation name.

We also want to be very careful to create the instruments and have them live on the client (i.e. we don't want them created per/operation, the attributes (dimensions) recorded with the metric will differentiate operations/services/etc). Default runtime plugins sort of gets us there (though you have to understand it's used differently by codegen than it is say by the runtime (e.g. IMDS cred provider in aws-config).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added this bit to codegen in d7a3c3a, but leaving this chunk here for the manually created credentials clients.

service_name,
params.operation_name.unwrap_or("unknown_operation".into()),
params.time_source.unwrap_or_default(),
),
]
.into_iter()
.flatten()
Expand Down
224 changes: 224 additions & 0 deletions rust-runtime/aws-smithy-runtime/src/client/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

use aws_smithy_async::time::SharedTimeSource;
use aws_smithy_observability::{
global::get_telemetry_provider, instruments::Histogram, AttributeValue, Attributes,
ObservabilityError,
};
use aws_smithy_runtime_api::client::{
interceptors::Intercept, runtime_components::RuntimeComponentsBuilder,
runtime_plugin::RuntimePlugin,
};
use aws_smithy_types::config_bag::{Layer, Storable, StoreReplace};
use std::{borrow::Cow, sync::Arc, time::SystemTime};

/// Struct to hold metric data in the ConfigBag
#[derive(Debug, Clone)]
struct MeasurementsContainer {
call_start: SystemTime,
attempts: u32,
attempt_start: SystemTime,
}

impl Storable for MeasurementsContainer {
type Storer = StoreReplace<Self>;
}

/// Instruments for recording a single operation
#[derive(Debug, Clone)]
pub(crate) struct MetricsInterceptorInstruments {
pub(crate) operation_duration: Arc<dyn Histogram>,
pub(crate) attempt_duration: Arc<dyn Histogram>,
}

impl MetricsInterceptorInstruments {
pub(crate) fn new() -> Result<Self, ObservabilityError> {
let meter = get_telemetry_provider()?
.meter_provider()
.get_meter("MetricsInterceptor", None);

Ok(Self{
operation_duration: meter
.create_histogram("smithy.client.call.duration")
.set_units("s")
.set_description("Overall call duration (including retries and time to send or receive request and response body)")
.build(),
attempt_duration: meter
.create_histogram("smithy.client.call.attempt.duration")
.set_units("s")
.set_description("The time it takes to connect to the service, send the request, and get back HTTP status code and headers (including time queued waiting to be sent)")
.build(),
})
}
}

#[derive(Debug)]
pub(crate) struct MetricsInterceptor {
instruments: MetricsInterceptorInstruments,
service_name: Cow<'static, str>,
operation_name: Cow<'static, str>,
// Holding a TimeSource here isn't ideal, but RuntimeComponents aren't available in
// the read_before_execution hook and that is when we need to start the timer for
// the operation.
time_source: SharedTimeSource,
}

impl MetricsInterceptor {
pub(crate) fn new(
service_name: impl Into<Cow<'static, str>>,
operation_name: impl Into<Cow<'static, str>>,
time_source: SharedTimeSource,
) -> Result<Self, ObservabilityError> {
Ok(MetricsInterceptor {
instruments: MetricsInterceptorInstruments::new()?,
service_name: service_name.into(),
operation_name: operation_name.into(),
time_source,
})
}

/// Get [Attributes] containing the service and method name for this operation
fn attributes(&self) -> Attributes {
let mut attributes = Attributes::new();
attributes.set(
"rpc.service",
AttributeValue::String(self.service_name.clone().into()),
);
attributes.set(
"rpc.method",
AttributeValue::String(self.operation_name.clone().into()),
);

attributes
}
}

impl Intercept for MetricsInterceptor {
fn name(&self) -> &'static str {
"MetricsInterceptor"
}

fn read_before_execution(
&self,
_context: &aws_smithy_runtime_api::client::interceptors::context::BeforeSerializationInterceptorContextRef<'_>,
cfg: &mut aws_smithy_types::config_bag::ConfigBag,
) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
let mut layer = Layer::new("MetricsInterceptor");
layer.store_put(MeasurementsContainer {
call_start: self.time_source.now(),
attempts: 0,
attempt_start: SystemTime::UNIX_EPOCH,
});
cfg.push_layer(layer);
Ok(())
}

fn read_after_execution(
&self,
_context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
_runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
cfg: &mut aws_smithy_types::config_bag::ConfigBag,
) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
let measurements = cfg
.load::<MeasurementsContainer>()
.expect("set in `read_before_execution`");

let call_end = self.time_source.now();
let call_duration = call_end.duration_since(measurements.call_start);
if let Ok(elapsed) = call_duration {
self.instruments.operation_duration.record(
elapsed.as_secs_f64(),
Some(&self.attributes()),
None,
);
}

Ok(())
}

fn read_before_attempt(
&self,
_context: &aws_smithy_runtime_api::client::interceptors::context::BeforeTransmitInterceptorContextRef<'_>,
_runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
cfg: &mut aws_smithy_types::config_bag::ConfigBag,
) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
let measurements = cfg
.get_mut::<MeasurementsContainer>()
.expect("set in `read_before_execution`");

measurements.attempts += 1;
measurements.attempt_start = self.time_source.now();

Ok(())
}

fn read_after_attempt(
&self,
_context: &aws_smithy_runtime_api::client::interceptors::context::FinalizerInterceptorContextRef<'_>,
_runtime_components: &aws_smithy_runtime_api::client::runtime_components::RuntimeComponents,
cfg: &mut aws_smithy_types::config_bag::ConfigBag,
) -> Result<(), aws_smithy_runtime_api::box_error::BoxError> {
let measurements = cfg
.load::<MeasurementsContainer>()
.expect("set in `read_before_execution`");

let attempt_end = self.time_source.now();
let attempt_duration = attempt_end.duration_since(measurements.attempt_start);

if let Ok(elapsed) = attempt_duration {
let mut attributes = self.attributes();
attributes.set("attempt", AttributeValue::I64(measurements.attempts.into()));

self.instruments.attempt_duration.record(
elapsed.as_secs_f64(),
Some(&attributes),
None,
);
}
Ok(())
}
}

/// Runtime plugin that adds an interceptor for collecting metrics
#[derive(Debug, Default)]
pub struct MetricsRuntimePlugin {
service: Cow<'static, str>,
operation: Cow<'static, str>,
time_source: SharedTimeSource,
}

impl MetricsRuntimePlugin {
/// Creates a runtime plugin which installs an interceptor for collecting metrics
pub fn new(
service: impl Into<Cow<'static, str>>,
operation: impl Into<Cow<'static, str>>,
time_source: SharedTimeSource,
) -> Self {
Self {
service: service.into(),
operation: operation.into(),
time_source,
}
}
}

impl RuntimePlugin for MetricsRuntimePlugin {
fn runtime_components(
&self,
_current_components: &RuntimeComponentsBuilder,
) -> Cow<'_, RuntimeComponentsBuilder> {
let interceptor = MetricsInterceptor::new(
self.service.clone(),
self.operation.clone(),
self.time_source.clone(),
);
if let Ok(interceptor) = interceptor {
Cow::Owned(RuntimeComponentsBuilder::new("Metrics").with_interceptor(interceptor))
} else {
Cow::Owned(RuntimeComponentsBuilder::new("Metrics"))
}
}
}
Loading
Loading