Skip to content

Commit

Permalink
feat(opentelemetry): add memory metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdan.vidrean committed Nov 5, 2024
1 parent b49a2ff commit cf5fe79
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 50 deletions.
250 changes: 217 additions & 33 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 12 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ authors = ["Augusto César Dias <[email protected]>"]
edition = "2021"

[features]
default = ["reqwest"]
default = ["reqwest", "opentelemetry"]
reqwest = ["dep:reqwest"]
s3 = [
"dep:aws-config",
"dep:aws-sdk-s3",
"dep:aws-sdk-sts",
"dep:aws-credential-types",
]
opentelemetry = [
"dep:opentelemetry",
"dep:opentelemetry_sdk",
"dep:opentelemetry-otlp",
"dep:sysinfo",
]

[dependencies]
tokio = { version = "1.35.1", features = ["rt", "rt-multi-thread", "macros"] }
Expand All @@ -40,9 +46,11 @@ aws-credential-types = { version = "1.1.4", optional = true }
async-trait = "0.1.75"
thiserror = "1.0.53"
rayon = "1.8.1"
opentelemetry = { version = "0.26.0", features = ["default"] }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.26.0", features = ["grpc-tonic"] }
opentelemetry = { version = "0.26.0", features = ["default"], optional = true }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"], optional = true }
opentelemetry-otlp = { version = "0.26.0", features = ["grpc-tonic"], optional = true }
sysinfo = { version = "0.32.0", optional = true }
tokio-cron-scheduler = "0.13.0"

[dev-dependencies.reqwest]
version = "0.11.24"
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ RUN apk add --update --no-cache --repository https://dl-cdn.alpinelinux.org/alpi
libpng-dev=1.6.44-r0 \
librsvg-dev=2.56.3-r0 \
libwebp-dev=1.3.2-r0 \
openssl-dev=3.1.7-r0 \
openssl-dev=3.1.7-r1 \
orc-dev=0.4.39-r0 \
pkgconf=1.9.5-r0 \
tiff-dev=4.5.1-r0
Expand Down Expand Up @@ -64,7 +64,7 @@ RUN apk add --update --no-cache \
libpng=1.6.44-r0 \
librsvg=2.56.3-r0 \
libwebp=1.3.2-r0 \
openssl=3.1.7-r0 \
openssl=3.1.7-r1 \
orc=0.4.39-r0 \
tiff=4.5.1-r0

Expand Down
3 changes: 2 additions & 1 deletion config/default.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@
"s3_secret": "rootpassword",
"s3_endpoint": "http://localhost:2969/",
"s3_bucket": "dali-private",
"max_file_size" : 41943040
"max_file_size" : 41943040,
"otel_collector_endpoint": "http://localhost:4317"
}
8 changes: 5 additions & 3 deletions dev-env-resources/otel-collector-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ exporters:
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:9090"
endpoint: 0.0.0.0:9090
logging:
verbosity: detailed
service:
extensions: [health_check]
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [otlp]
exporters: [otlp, logging]
metrics:
receivers: [otlp]
exporters: [prometheus]
exporters: [prometheus, logging]
7 changes: 7 additions & 0 deletions dev-env-resources/prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
global:
scrape_interval: 5s

scrape_configs:
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:9090']
1 change: 1 addition & 0 deletions src/commons/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub struct Configuration {
pub s3_endpoint: Option<String>,
pub s3_bucket: Option<String>,
pub max_file_size: Option<u32>,
pub otel_collector_endpoint: Option<String>,
}

impl fmt::Display for Configuration {
Expand Down
78 changes: 74 additions & 4 deletions src/commons/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,36 @@
// (c) Copyright 2019-2024 OLX

#![cfg(feature = "opentelemetry")]
use std::time::Duration;

use log::warn;
use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::{Config, RandomIdGenerator, Sampler};
use opentelemetry_sdk::Resource;
use sysinfo::{MemoryRefreshKind, System};
use tokio::{task, time};

use super::config::Configuration;

/// Initialises the OpenTelemetry tracer and the memory gauges.
///
/// Reads `otel_collector_endpoint` from `config`. When it is absent a warning
/// is logged and nothing is initialised; otherwise the endpoint is handed to
/// `init_tracer` and then to `init_memory_gauges`.
pub async fn init_opentelemetry(config: &Configuration) {
    // Bail out early when no collector endpoint is configured.
    let Some(endpoint) = config.otel_collector_endpoint.clone() else {
        warn!("the hostname for the otel collector is missing from the configuration parameters");
        return;
    };

    init_tracer(endpoint.clone());
    // The last consumer takes ownership of the endpoint, so no extra clone is needed.
    init_memory_gauges(endpoint).await;
}

pub fn init_tracer() {
fn init_tracer(otel_collector_endpoint: String) {
let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_endpoint("http://localhost:4317")
.with_endpoint(otel_collector_endpoint)
.with_timeout(Duration::from_secs(3)),
)
.with_trace_config(
Expand All @@ -28,7 +44,61 @@ pub fn init_tracer() {
.install_batch(opentelemetry_sdk::runtime::Tokio);

if tracer_provider.is_err() {
panic!("open telemetry couldn't be initiated. received error");
panic!(
"failed to initiate the tracer provider for opentelemetry. received error {}",
tracer_provider.err().unwrap()
);
}
global::set_tracer_provider(tracer_provider.unwrap());
}

/// Installs the global meter provider and starts a background task that
/// periodically reports memory usage.
///
/// The metrics pipeline exports over gRPC (tonic) to `otel_collector_endpoint`
/// with a 3 second timeout. Once the provider is registered globally, a Tokio
/// task is spawned that refreshes the RAM statistics every 30 seconds and
/// records them on the `dali_free_memory` and `dali_used_memory` gauges,
/// after dividing the raw values by 1,000,000.
///
/// # Panics
///
/// Panics when the meter provider cannot be built.
async fn init_memory_gauges(otel_collector_endpoint: String) {
    // Divisor applied to the values reported by `sysinfo` before recording them
    // as megabytes (presumably the raw values are bytes; see the sysinfo docs).
    const UNITS_PER_MEGABYTE: u64 = 1_000_000;
    // How often the gauges are refreshed.
    const SAMPLING_INTERVAL: Duration = Duration::from_secs(30);

    let meter_provider = opentelemetry_otlp::new_pipeline()
        .metrics(opentelemetry_sdk::runtime::Tokio)
        .with_exporter(
            opentelemetry_otlp::new_exporter()
                .tonic()
                .with_protocol(opentelemetry_otlp::Protocol::Grpc)
                .with_endpoint(otel_collector_endpoint)
                .with_timeout(Duration::from_secs(3)),
        )
        .build()
        .unwrap_or_else(|err| {
            panic!(
                "failed to initiate the meter provider for opentelemetry. received error {}",
                err
            )
        });
    global::set_meter_provider(meter_provider);

    // The join handle is intentionally not awaited: the sampling loop never
    // finishes, so the task stays detached.
    let _memory_task = task::spawn(async {
        let meter = global::meter_provider().meter("Dali - Memory Meter");
        let used_memory_gauge = meter
            .u64_gauge("dali_used_memory")
            .with_description("Used memory in megabytes (MB)")
            .init();
        let free_memory_gauge = meter
            .u64_gauge("dali_free_memory")
            .with_description("Free memory in megabytes (MB)")
            .init();

        // Only RAM figures are ever read, so start from an empty `System`
        // instead of eagerly loading every subsystem with `new_all()`.
        let mut system = System::new();
        let mut interval = time::interval(SAMPLING_INTERVAL);

        loop {
            interval.tick().await;

            system.refresh_memory_specifics(MemoryRefreshKind::new().with_ram());
            free_memory_gauge.record(
                system.free_memory() / UNITS_PER_MEGABYTE,
                &[KeyValue::new("type", "free")],
            );
            used_memory_gauge.record(
                system.used_memory() / UNITS_PER_MEGABYTE,
                &[KeyValue::new("type", "used")],
            );
        }
    });
}
5 changes: 4 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ mod routes;

#[tokio::main]
async fn main() {
commons::open_telemetry::init_tracer();
let config = Configuration::new().expect("Failed to load application configuration.");
#[cfg(feature = "opentelemetry")]
{
commons::open_telemetry::init_opentelemetry(&config).await;
}
println!(r#"{{"configuration": {}}}"#, config);

set_up_logging(&config);
Expand Down
38 changes: 36 additions & 2 deletions src/routes/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use axum::{
use core::str;
use futures::future::join_all;
use log::{error, warn};
#[cfg(feature = "opentelemetry")]
use opentelemetry::{
global::{self, BoxedSpan, ObjectSafeSpan},
trace::{Status, Tracer, TracerProvider},
Expand Down Expand Up @@ -134,8 +135,11 @@ pub async fn process_image(
}): State<AppState>,
ProcessImageRequestExtractor(params): ProcessImageRequestExtractor<ProcessImageRequest>,
) -> Result<Response<Body>, ImageProcessingError> {
#[cfg(feature = "opentelemetry")]
let tracer = global::tracer_provider().tracer("dali");
#[cfg(feature = "opentelemetry")]
let mut span = tracer.start("ImageProcessing");

let now = SystemTime::now();
let main_img = image_provider
.get_file(&params.image_address, &config)
Expand Down Expand Up @@ -184,33 +188,63 @@ pub async fn process_image(
let processed_image = recv.await.map_err(|e| {
let error_message = format!("failed to join the thread which process the image. error: {}", e);
error!("{}", error_message);
#[cfg(feature = "opentelemetry")]
span.set_status(Status::error(error_message));
ImageProcessingError::ProcessingWorkerJoinError
})?
.map_err(|e| {
let error_message = format!("the image processing has failed for the resource with the error: {}. libvips raw error is: {}",
e, vips_app.error_buffer().unwrap_or("").replace("\n", ". "));
error!("{}", error_message);
#[cfg(feature = "opentelemetry")]
span.set_status(Status::error(error_message));
ImageProcessingError::LibvipsProcessingFailed(e)
})?;

log_size_metrics(&format, total_input_size, processed_image.len(), &mut span);
#[cfg(not(feature = "opentelemetry"))]
log_size_metrics(&format, total_input_size, processed_image.len());
#[cfg(feature = "opentelemetry")]
log_size_metrics_with_otel(&format, total_input_size, processed_image.len(), &mut span);

let mut response_builder = Response::builder().status(StatusCode::OK);
for (key, value) in main_img.response_headers.into_iter() {
if !HEADERS_DETERMINED_BY_DALI.contains(&key.to_lowercase().as_str()) {
response_builder = response_builder.header(key, value);
}
}

#[cfg(feature = "opentelemetry")]
span.set_status(Status::Ok);
Ok(response_builder
.header("Content-Type", format!("image/{}", format))
.body(Body::from(processed_image))
.unwrap())
}

fn log_size_metrics(
/// Records the input and output sizes of a processed image on the
/// `INPUT_SIZE` / `OUTPUT_SIZE` metrics that correspond to `format`.
///
/// Only compiled when the `opentelemetry` feature is disabled.
///
/// * `format` - selects which per-format metric pair receives the observation.
/// * `input_size` - size observed on `INPUT_SIZE`.
/// * `response_length` - size observed on `OUTPUT_SIZE`.
#[cfg(not(feature = "opentelemetry"))]
fn log_size_metrics(format: &ImageFormat, input_size: usize, response_length: usize) {
    // Convert once up front; every arm observes the same two values.
    let observed_input = input_size as f64;
    let observed_output = response_length as f64;

    match format {
        ImageFormat::Jpeg => {
            INPUT_SIZE.jpeg.observe(observed_input);
            OUTPUT_SIZE.jpeg.observe(observed_output);
        }
        ImageFormat::Heic => {
            INPUT_SIZE.heic.observe(observed_input);
            OUTPUT_SIZE.heic.observe(observed_output);
        }
        ImageFormat::Webp => {
            INPUT_SIZE.webp.observe(observed_input);
            OUTPUT_SIZE.webp.observe(observed_output);
        }
        ImageFormat::Png => {
            INPUT_SIZE.png.observe(observed_input);
            OUTPUT_SIZE.png.observe(observed_output);
        }
    }
}

#[cfg(feature = "opentelemetry")]
fn log_size_metrics_with_otel(
format: &ImageFormat,
input_size: usize,
response_length: usize,
Expand Down

0 comments on commit cf5fe79

Please sign in to comment.