From 5f81383d825d13557984295d804e9498a207181b Mon Sep 17 00:00:00 2001 From: Matheus Zaniolo Date: Fri, 3 Jan 2025 11:27:41 -0300 Subject: [PATCH] feat: Add OpenTelemetry setup function --- Cargo.toml | 35 +++ README.md | 66 ++++++ src/lib.rs | 3 + src/telemetry/config.rs | 105 +++++++++ src/telemetry/mod.rs | 345 ++++++++++++++++++++++++++++ src/telemetry/reqwest_middleware.rs | 42 ++++ 6 files changed, 596 insertions(+) create mode 100644 src/telemetry/config.rs create mode 100644 src/telemetry/mod.rs create mode 100644 src/telemetry/reqwest_middleware.rs diff --git a/Cargo.toml b/Cargo.toml index 1a795bd..988bc50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,12 +14,47 @@ time = { version = "0.3.36", optional = true } tracing = "0.1.40" url = { version = "2.5.2", features = ["serde"] } +# telemetry deps +async-trait = { version = "^0.1.51", optional = true } +http = { version = "1.2.0", optional = true } +once_cell = { version = "1.20.2", optional = true } +opentelemetry = { version = "0.27.1", optional = true } +opentelemetry-appender-tracing = { version = "0.27.0", optional = true } +opentelemetry-http = { version = "0.27.0", optional = true } +opentelemetry-otlp = { version = "0.27.0", optional = true } +opentelemetry-semantic-conventions = { version = "0.27.0", optional = true } +opentelemetry_sdk = { version = "0.27.1", features = [ + "rt-tokio", +], optional = true } +reqwest-middleware = { version = "0.4.0", optional = true } +tracing-opentelemetry = { version = "0.28.0", optional = true } +tracing-subscriber = { version = "0.3.19", features = [ + "env-filter", +], optional = true } + + [dev-dependencies] serde_json = "1.0.128" +tokio = { version = "1.43.0", features = ["full"] } [features] reqwest = ["dep:reqwest"] time = ["dep:time"] +telemetry = [ + "async-trait", + "http", + "once_cell", + "opentelemetry", + "opentelemetry-appender-tracing", + "opentelemetry-http", + "opentelemetry-otlp", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "reqwest", + "reqwest-middleware", + "tracing-opentelemetry", + "tracing-subscriber", +] [lints.rust] dead_code = "warn" diff --git a/README.md b/README.md index a2936e5..30af3d8 100644 --- a/README.md +++ b/README.md @@ -7,6 +7,72 @@ Random rust utility functions and types +## Telemetry + +For using this module the feature flag `telemetry` need to be added. +This module contains a set of helpers to work with OpenTelemetry logs, traces and metrics. + +### Setup + +For setup all that's needed it to run the function `famedly_rust_utils::famedly_rust_utils::telemetry::init_otel`. The function returns a guard that takes care of properly shutting down the providers. + +If no configuration is present the exporting of logs traces and metrics is disable and the stdout logging is enable. + +The functions on the crate exporting opentelemetry traces should be annotated with `tracing::instrument` to generate a new span for that function. Documentation on this macro can be found on the [here](https://docs.rs/tracing/latest/tracing/attr.instrument.html) + +The opentelemetry information is exported using gRPC to and opentelemetry collector. By default the expected endpoint is `http://localhots:4317` +The default level of logging and traces is `info` and the default filter directive is `opentelemetry=off,tonic=off,h2=off,reqwest=info,axum=info,hyper=info,hyper-tls=info,tokio=info,tower=info,josekit=info,openssl=info` + +```rust +#[tokio::main] +async fn main() { + let _guard = init_otel(config).unwrap(); + +} +``` + + +### Propagate the context + +A context can be propagated to allow linking the traces from two different services. This is done by injecting the context information on the request and retrieving it on the other service. + +#### reqwest + +For injecting the current context using the reqwest client we can warp a client on a [reqwest-middleware](https://crates.io/crates/reqwest-middleware) and use the `OtelMiddleware` middleware present on the crate. + +```rust +use famedly_rust_utils::telemetry::OtelMiddleware; + +let reqwest_client = reqwest::Client::builder().build().unwrap(); +let client = reqwest_middleware::ClientBuilder::new(reqwest_client) + // Insert the tracing middleware + .with(OtelMiddleware::default()) + .build(); +client.get("http://localhost").send().await; +``` + +### axum + +For retrieving a context using axum we can use the `OtelAxumLayer` from [axum_tracing_opentelemetry](https://crates.io/crates/axum-tracing-opentelemetry) + +> [!WARNING] +> This only seems to be working using the feature flag `tracing_level_info`. See the [issue](https://github.com/davidB/tracing-opentelemetry-instrumentation-sdk/issues/148) + +This layer should run as soon as possible + +```rust +use axum_tracing_opentelemetry::middleware::OtelAxumLayer; + +Router::new().layer(OtelAxumLayer::default()) + +``` + +### Metrics + +For adding metrics all that it's needed it to make a trace with specific prefix. The documentation on how it works is [here](https://docs.rs/tracing-opentelemetry/latest/tracing_opentelemetry/struct.MetricsLayer.html#usage) + +For adding metrics to axum servers creates like [tower-otel-http-metrics](https://github.com/francoposa/tower-otel-http-metrics) + ## Lints ```sh diff --git a/src/lib.rs b/src/lib.rs index dd5f5ac..363c01f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,9 @@ mod level_filter; #[cfg(feature = "reqwest")] /// Helpers for [reqwest] pub mod reqwest; +#[cfg(feature = "telemetry")] +/// Function to setup the telemetry tools +pub mod telemetry; pub use base_url::{BaseUrl, BaseUrlParseError}; pub use level_filter::LevelFilter; diff --git a/src/telemetry/config.rs b/src/telemetry/config.rs new file mode 100644 index 0000000..0b2c0e8 --- /dev/null +++ b/src/telemetry/config.rs @@ -0,0 +1,105 @@ +//! OpenTelemetry configuration +//! +//! Module containing the configuration struct for the OpenTelemetry +use std::str::FromStr as _; + +use serde::Deserialize; +use url::Url; + +use crate::LevelFilter; + +const DEFAULT_ENDPOINT: &str = "http://localhost:4317"; + +/// OpenTelemetry configuration +#[derive(Debug, Deserialize, Clone, Default)] +pub struct OtelConfig { + /// Enables logs on stdout + pub stdout: Option, + /// Configurations for exporting traces, metrics and logs + pub exporter: Option, +} + +/// Configuration for exporting OpenTelemetry data +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ExporterConfig { + /// gRPC endpoint for exporting using OTELP + pub endpoint: Option, + /// Application service name + pub service_name: String, + /// Application version + pub version: String, + + /// Logs exporting config + pub logs: Option, + /// Traces exporting config + pub traces: Option, + /// Metrics exporting config + pub metrics: Option, +} + +/// Stdout logs configuration +#[derive(Debug, Deserialize, Clone)] +pub struct StdoutLogsConfig { + /// Enables the stdout logs + pub enabled: bool, + /// Level for the crate + pub level: Option, + /// Level for the dependencies + pub general_level: Option, +} + +/// Provider configuration for OpenTelemetry export +#[derive(Debug, Deserialize, Clone, Default)] +pub struct ProviderConfig { + /// Enables provider + pub enabled: bool, + /// Level for the crate + pub level: Option, + /// Level for the dependencies + pub general_level: Option, +} + +impl ProviderConfig { + #[allow(clippy::expect_used)] + pub(crate) fn get_filter(&self, crate_name: &'static str) -> String { + format!( + "{},{}={}", + self.general_level.unwrap_or_default(), + crate_name, + self.level.unwrap_or_default() + ) + } +} + +impl StdoutLogsConfig { + #[allow(clippy::expect_used)] + pub(crate) fn get_filter(&self, crate_name: &'static str) -> String { + format!( + "{},{}={}", + self.general_level.unwrap_or_default(), + crate_name, + self.level.unwrap_or_default() + ) + } +} + +impl Default for StdoutLogsConfig { + fn default() -> Self { + Self { enabled: true, level: None, general_level: None } + } +} + +impl ExporterConfig { + #[allow(clippy::expect_used)] + pub(crate) fn get_endpoint(&self) -> Url { + self.endpoint + .clone() + .unwrap_or(Url::from_str(DEFAULT_ENDPOINT).expect("Error parsing default endpoint")) + } +} + +impl Default for LevelFilter { + fn default() -> Self { + LevelFilter(tracing::level_filters::LevelFilter::INFO) + } +} diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs new file mode 100644 index 0000000..fd3ded9 --- /dev/null +++ b/src/telemetry/mod.rs @@ -0,0 +1,345 @@ +//! OpenTelemetry initialization +//! +//! Lib containing the definitions and initializations of the OpenTelemetry +//! tools +use std::str::FromStr as _; + +use config::{OtelConfig, StdoutLogsConfig}; +use opentelemetry::{ + trace::{TraceError, TracerProvider as _}, + KeyValue, +}; +use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; +use opentelemetry_otlp::{LogExporter, SpanExporter, WithExportConfig as _}; +use opentelemetry_sdk::{ + logs::{LogError, LoggerProvider}, + metrics::{MeterProviderBuilder, MetricError, PeriodicReader, SdkMeterProvider}, + propagation::TraceContextPropagator, + runtime, + trace::{RandomIdGenerator, TracerProvider}, + Resource, +}; +use opentelemetry_semantic_conventions::{ + resource::{SERVICE_NAME, SERVICE_VERSION}, + SCHEMA_URL, +}; +use tracing_opentelemetry::{MetricsLayer, OpenTelemetryLayer}; +use tracing_subscriber::{ + layer::SubscriberExt as _, util::SubscriberInitExt as _, EnvFilter, Layer, +}; +use url::Url; + +pub mod config; +mod reqwest_middleware; +pub use reqwest_middleware::OtelMiddleware; + +fn resource(service_name: String, version: String) -> Resource { + Resource::from_schema_url( + [KeyValue::new(SERVICE_NAME, service_name), KeyValue::new(SERVICE_VERSION, version)], + SCHEMA_URL, + ) +} + +fn init_traces( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = SpanExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + let tracer_provider = TracerProvider::builder() + .with_id_generator(RandomIdGenerator::default()) + .with_resource(resource(service_name, version)) + .with_batch_exporter(exporter, runtime::Tokio) + .build(); + + opentelemetry::global::set_tracer_provider(tracer_provider.clone()); + Ok(tracer_provider) +} + +fn init_metrics( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = opentelemetry_otlp::MetricExporter::builder() + .with_tonic() + .with_endpoint(endpoint) + .with_temporality(opentelemetry_sdk::metrics::Temporality::default()) + .build()?; + + let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); + + let meter_provider = MeterProviderBuilder::default() + .with_resource(resource(service_name, version)) + .with_reader(reader) + .build(); + + Ok(meter_provider) +} + +fn init_logs( + endpoint: Url, + service_name: String, + version: String, +) -> Result { + let exporter = LogExporter::builder().with_tonic().with_endpoint(endpoint).build()?; + + Ok(LoggerProvider::builder() + .with_resource(resource(service_name, version)) + .with_batch_exporter(exporter, runtime::Tokio) + .build()) +} + +/// Initializes the OpenTelemetry +/// +/// example +/// ```rust +/// use famedly_rust_utils::telemetry; +/// +/// #[tokio::main] +/// async fn main() { +/// let _guard = telemetry::init_otel( +/// &telemetry::config::OtelConfig::default(), +/// env!("CARGO_CRATE_NAME"), +/// ); +/// +/// // ... +/// } +/// ``` +#[must_use = "The return is a guard for the providers and it need to be kept to properly shutdown them"] +pub fn init_otel( + config: &OtelConfig, + main_crate: &'static str, +) -> Result { + opentelemetry::global::set_text_map_propagator(TraceContextPropagator::default()); + + let stdout_layer = config + .stdout + .as_ref() + .or(Some(&StdoutLogsConfig::default())) + .and_then(|stdout| stdout.enabled.then_some(stdout)) + .map(|logger_config| { + let filter_fmt = EnvFilter::from_str(&logger_config.get_filter(main_crate))?; + Ok::<_, OtelInitError>( + tracing_subscriber::fmt::layer().with_thread_names(true).with_filter(filter_fmt), + ) + }) + .transpose()?; + + let (logger_provider, logs_layer) = config + .exporter + .as_ref() + .and_then(|exporter| { + exporter.logs.as_ref().and_then(|c| c.enabled.then_some(c)).map(|logger_config| { + let filter_otel = EnvFilter::from_str(&logger_config.get_filter(main_crate))?; + let logger_provider = init_logs( + exporter.get_endpoint(), + exporter.service_name.clone(), + exporter.version.clone(), + )?; + + // Create a new OpenTelemetryTracingBridge using the above LoggerProvider. + let logs_layer = OpenTelemetryTracingBridge::new(&logger_provider); + let logs_layer = logs_layer.with_filter(filter_otel); + + Ok::<_, OtelInitError>((Some(logger_provider), Some(logs_layer))) + }) + }) + .transpose()? + .unwrap_or((None, None)); + + let (tracer_provider, tracer_layer) = config + .exporter + .as_ref() + .and_then(|exporter| { + exporter.traces.as_ref().and_then(|c| c.enabled.then_some(c)).map(|tracer_config| { + let trace_filter = EnvFilter::from_str(&tracer_config.get_filter(main_crate))?; + let tracer_provider = init_traces( + exporter.get_endpoint(), + exporter.service_name.clone(), + exporter.version.clone(), + )?; + let tracer = tracer_provider.tracer(exporter.service_name.clone()); + let tracer_layer = OpenTelemetryLayer::new(tracer).with_filter(trace_filter); + Ok::<_, OtelInitError>((Some(tracer_provider), Some(tracer_layer))) + }) + }) + .transpose()? + .unwrap_or((None, None)); + + let (meter_provider, meter_layer) = config + .exporter + .as_ref() + .and_then(|exporter| { + exporter.metrics.as_ref().and_then(|c| c.enabled.then_some(c)).map(|meter_config| { + let metrics_filter = EnvFilter::from_str(&meter_config.get_filter(main_crate))?; + let meter_provider = init_metrics( + exporter.get_endpoint(), + exporter.service_name.clone(), + exporter.version.clone(), + )?; + let meter_layer = + MetricsLayer::new(meter_provider.clone()).with_filter(metrics_filter); + + Ok::<_, OtelInitError>((Some(meter_provider), Some(meter_layer))) + }) + }) + .transpose()? + .unwrap_or((None, None)); + + // Initialize the tracing subscriber with the stdout layer and + // layers for exporting over OpenTelemetry the logs, traces and metrics. + tracing_subscriber::registry() + .with(logs_layer) + .with(stdout_layer) + .with(meter_layer) + .with(tracer_layer) + .init(); + + Ok(ProvidersGuard { logger_provider, tracer_provider, meter_provider }) +} + +/// Guarding object to make sure the providers are properly shutdown +#[derive(Debug)] +#[allow(dead_code)] +pub struct ProvidersGuard { + logger_provider: Option, + tracer_provider: Option, + meter_provider: Option, +} + +// Necessary to call TracerProvider::shutdown() on exit +// due to a bug with flushing on global shutdown: +// https://github.com/open-telemetry/opentelemetry-rust/issues/1961 +impl Drop for ProvidersGuard { + fn drop(&mut self) { + // This causes a hang in testing. + // Some relevant information: + // https://github.com/open-telemetry/opentelemetry-rust/issues/536 + #[cfg(not(test))] + { + self.logger_provider.as_ref().inspect(|logger_provider| { + if let Err(err) = logger_provider.shutdown() { + tracing::error!("Could not shutdown LoggerProvider: {err}"); + } + }); + self.tracer_provider.as_ref().inspect(|tracer_provider| { + if let Err(err) = tracer_provider.shutdown() { + tracing::error!("Could not shutdown TracerProvider: {err}"); + } + }); + self.meter_provider.as_ref().inspect(|meter_provider| { + if let Err(err) = meter_provider.shutdown() { + tracing::error!("Could not shutdown MeterProvider: {err}"); + } + }); + } + } +} + +/// OpenTelemetry setup errors +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum OtelInitError { + #[error("Logger initialization error: {0}")] + LoggerInitError(#[from] LogError), + #[error("Tracer initialization error: {0}")] + TracerInitError(#[from] TraceError), + #[error("Meter initialization error: {0}")] + MeterInitError(#[from] MetricError), + #[error("Parsing EnvFilter directives error: {0}")] + EnvFilterError(#[from] tracing_subscriber::filter::ParseError), +} + +#[cfg(test)] +mod tests { + + use super::{ + config::{ExporterConfig, OtelConfig, ProviderConfig}, + init_otel, + }; + use crate::telemetry::config::StdoutLogsConfig; + + #[tokio::test] + async fn test_tracer_provider_enabled() { + let config = OtelConfig { + stdout: None, + exporter: Some(ExporterConfig { + traces: Some(ProviderConfig { enabled: true, ..Default::default() }), + ..Default::default() + }), + }; + let guard = init_otel(&config, env!("CARGO_PKG_NAME")).expect("Error initializing Otel"); + assert!(guard.tracer_provider.is_some()); + } + #[tokio::test] + async fn test_tracer_provider_disabled() { + let config_enabled_false = OtelConfig { + stdout: None, + exporter: Some(ExporterConfig { + traces: Some(ProviderConfig { enabled: false, ..Default::default() }), + ..Default::default() + }), + }; + let guard = init_otel(&config_enabled_false, env!("CARGO_PKG_NAME")) + .expect("Error initializing Otel"); + assert!(guard.tracer_provider.is_none()); + } + + // There seems to but a problem when testing the scenario when the meter + // provider is enable. The tests hangs when calling the shutdown from the + // PeriodicReader. For now we won't test this scenarios + //https://github.com/open-telemetry/opentelemetry-rust/issues/2056 + + #[tokio::test] + async fn test_meter_provider_disabled() { + let config_enabled_false = OtelConfig { + stdout: None, + exporter: Some(ExporterConfig { + metrics: Some(ProviderConfig { enabled: false, ..Default::default() }), + ..Default::default() + }), + }; + let guard = init_otel(&config_enabled_false, env!("CARGO_PKG_NAME")) + .expect("Error initializing Otel"); + assert!(guard.meter_provider.is_none()); + } + #[tokio::test] + async fn test_logger_provider_enabled() { + let config = OtelConfig { + stdout: None, + exporter: Some(ExporterConfig { + logs: Some(ProviderConfig { enabled: true, ..Default::default() }), + ..Default::default() + }), + }; + let guard = init_otel(&config, env!("CARGO_PKG_NAME")).expect("Error initializing Otel"); + assert!(guard.logger_provider.is_some()); + } + #[tokio::test] + async fn test_logger_provider_disabled() { + let config_enabled_false = OtelConfig { + stdout: None, + exporter: Some(ExporterConfig { + logs: Some(ProviderConfig { enabled: false, ..Default::default() }), + ..Default::default() + }), + }; + let guard = init_otel(&config_enabled_false, env!("CARGO_PKG_NAME")) + .expect("Error initializing Otel"); + assert!(guard.logger_provider.is_none()); + } + + #[tokio::test] + async fn test_exporter_config_none() { + let config_none = OtelConfig { + stdout: Some(StdoutLogsConfig { enabled: true, ..Default::default() }), + exporter: Some(ExporterConfig::default()), + }; + let guard = + init_otel(&config_none, env!("CARGO_PKG_NAME")).expect("Error initializing Otel"); + assert!(guard.meter_provider.is_none()); + assert!(guard.tracer_provider.is_none()); + assert!(guard.logger_provider.is_none()); + } +} diff --git a/src/telemetry/reqwest_middleware.rs b/src/telemetry/reqwest_middleware.rs new file mode 100644 index 0000000..cd366d2 --- /dev/null +++ b/src/telemetry/reqwest_middleware.rs @@ -0,0 +1,42 @@ +use http::Extensions; +use opentelemetry_http::HeaderInjector; +use reqwest::{Request, Response}; +use reqwest_middleware::{Middleware, Next, Result}; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt as _; + +/// Middleware for [reqwest-middleware] to propagate the Otel context +/// +/// Example +/// +/// ```rust +/// use famedly_rust_utils::telemetry; +/// +/// #[tokio::main] +/// async fn main() { +/// let reqwest_client = reqwest::Client::builder().build().unwrap(); +/// let client = reqwest_middleware::ClientBuilder::new(reqwest_client) +/// // Insert the tracing middleware +/// .with(telemetry::OtelMiddleware::default()) +/// .build(); +/// client.get("http://localhost").send().await; +/// } +/// ``` +#[derive(Debug, Default)] +pub struct OtelMiddleware; + +#[async_trait::async_trait] +impl Middleware for OtelMiddleware { + async fn handle( + &self, + mut req: Request, + extensions: &mut Extensions, + next: Next<'_>, + ) -> Result { + opentelemetry::global::get_text_map_propagator(|propagator| { + let cx = Span::current().context(); + propagator.inject_context(&cx, &mut HeaderInjector(req.headers_mut())); + }); + next.run(req, extensions).await + } +}