Skip to content

Commit

Permalink
feat(opentelemetry): integrate with open telemetry
Browse files Browse the repository at this point in the history
  • Loading branch information
bogdan.vidrean committed Nov 1, 2024
1 parent 631e3ba commit b49a2ff
Show file tree
Hide file tree
Showing 9 changed files with 403 additions and 40 deletions.
316 changes: 289 additions & 27 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ aws-credential-types = { version = "1.1.4", optional = true }
async-trait = "0.1.75"
thiserror = "1.0.53"
rayon = "1.8.1"
opentelemetry = { version = "0.26.0", features = ["default"] }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
opentelemetry-otlp = { version = "0.26.0", features = ["grpc-tonic"] }

[dev-dependencies.reqwest]
version = "0.11.24"
22 changes: 20 additions & 2 deletions dev-env-resources/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,23 @@ services:
/usr/bin/mc cp /test-image.jpeg myminio/dali-private;
exit 0;
"
otel-collector:
image: otel/opentelemetry-collector-contrib:0.110.0
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./otel-collector-config.yaml:/etc/otel-collector-config.yaml
ports:
- "4317:4317" # OTLP gRPC receiver
- "13133:13133" # healtcheck
jaeger:
image: jaegertracing/all-in-one:latest
ports:
- "6831:6831/udp" # UDP port for Jaeger agent
- "16686:16686" # Web UI
- "14268:14268" # HTTP port for spans
prometheus:
image: prom/prometheus:latest
volumes:
- ./prometheus.yml:/etc/prometheus/prometheus.yml
ports:
- "9090:9090"
26 changes: 26 additions & 0 deletions dev-env-resources/otel-collector-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
processors:
extensions:
health_check:
endpoint: "0.0.0.0:13133"
exporters:
otlp:
endpoint: jaeger:4317
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:9090"
service:
extensions: [health_check]
pipelines:
traces:
receivers: [otlp]
processors: []
exporters: [otlp]
metrics:
receivers: [otlp]
exporters: [prometheus]
Empty file.
1 change: 1 addition & 0 deletions src/commons/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

pub mod config;
pub mod errors;
pub mod open_telemetry;

use errors::InvalidSizeError;
use libvips::ops::Angle;
Expand Down
34 changes: 34 additions & 0 deletions src/commons/open_telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// (c) Copyright 2019-2024 OLX

use std::time::Duration;

use opentelemetry::{global, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::trace::{Config, RandomIdGenerator, Sampler};
use opentelemetry_sdk::Resource;

pub fn init_tracer() {
let tracer_provider = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_protocol(opentelemetry_otlp::Protocol::Grpc)
.with_endpoint("http://localhost:4317")
.with_timeout(Duration::from_secs(3)),
)
.with_trace_config(
Config::default()
.with_sampler(Sampler::AlwaysOn)
.with_id_generator(RandomIdGenerator::default())
.with_max_events_per_span(64)
.with_max_attributes_per_span(16)
.with_resource(Resource::new(vec![KeyValue::new("service.name", "dali")])),
)
.install_batch(opentelemetry_sdk::runtime::Tokio);

if tracer_provider.is_err() {
panic!("open telemetry couldn't be initiated. received error");
}
global::set_tracer_provider(tracer_provider.unwrap());
}
3 changes: 2 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// (c) Copyright 2019-2024 OLX
use std::env;
use std::sync::Arc;
use std::time::SystemTime;
Expand All @@ -13,14 +14,14 @@ use libvips::VipsApp;
use commons::config::Configuration;
use routes::metric::HTTP_DURATION;

// (c) Copyright 2019-2024 OLX
mod commons;
mod image_processor;
mod image_provider;
mod routes;

#[tokio::main]
async fn main() {
commons::open_telemetry::init_tracer();
let config = Configuration::new().expect("Failed to load application configuration.");
println!(r#"{{"configuration": {}}}"#, config);

Expand Down
38 changes: 28 additions & 10 deletions src/routes/image.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ use axum::{
use core::str;
use futures::future::join_all;
use log::{error, warn};
use opentelemetry::{
global::{self, BoxedSpan, ObjectSafeSpan},
trace::{Status, Tracer, TracerProvider},
KeyValue,
};
use serde::de::DeserializeOwned;
use serde_json::json;
use std::time::SystemTime;
Expand Down Expand Up @@ -129,6 +134,8 @@ pub async fn process_image(
}): State<AppState>,
ProcessImageRequestExtractor(params): ProcessImageRequestExtractor<ProcessImageRequest>,
) -> Result<Response<Body>, ImageProcessingError> {
let tracer = global::tracer_provider().tracer("dali");
let mut span = tracer.start("ImageProcessing");
let now = SystemTime::now();
let main_img = image_provider
.get_file(&params.image_address, &config)
Expand Down Expand Up @@ -175,48 +182,59 @@ pub async fn process_image(
let _ = send.send(image);
});
let processed_image = recv.await.map_err(|e| {
error!(
"failed to join the thread which process the image. error: {}",
e
);
let error_message = format!("failed to join the thread which process the image. error: {}", e);
error!("{}", error_message);
span.set_status(Status::error(error_message));
ImageProcessingError::ProcessingWorkerJoinError
})?
.map_err(|e| {
error!(
"the image processing has failed for the resource with the error: {}. libvips raw error is: {}",
e, vips_app.error_buffer().unwrap_or("").replace("\n", ". ")
);
let error_message = format!("the image processing has failed for the resource with the error: {}. libvips raw error is: {}",
e, vips_app.error_buffer().unwrap_or("").replace("\n", ". "));
error!("{}", error_message);
span.set_status(Status::error(error_message));
ImageProcessingError::LibvipsProcessingFailed(e)
})?;

log_size_metrics(&format, total_input_size, processed_image.len());
log_size_metrics(&format, total_input_size, processed_image.len(), &mut span);
let mut response_builder = Response::builder().status(StatusCode::OK);
for (key, value) in main_img.response_headers.into_iter() {
if !HEADERS_DETERMINED_BY_DALI.contains(&key.to_lowercase().as_str()) {
response_builder = response_builder.header(key, value);
}
}

span.set_status(Status::Ok);
Ok(response_builder
.header("Content-Type", format!("image/{}", format))
.body(Body::from(processed_image))
.unwrap())
}

fn log_size_metrics(format: &ImageFormat, input_size: usize, response_length: usize) {
fn log_size_metrics(
format: &ImageFormat,
input_size: usize,
response_length: usize,
span: &mut BoxedSpan,
) {
span.set_attribute(KeyValue::new("content-length", response_length as f64));
match format {
ImageFormat::Jpeg => {
span.set_attribute(KeyValue::new("content-type", "jpeg"));
INPUT_SIZE.jpeg.observe(input_size as f64);
OUTPUT_SIZE.jpeg.observe(response_length as f64);
}
ImageFormat::Heic => {
span.set_attribute(KeyValue::new("content-type", "heic"));
INPUT_SIZE.heic.observe(input_size as f64);
OUTPUT_SIZE.heic.observe(response_length as f64);
}
ImageFormat::Webp => {
span.set_attribute(KeyValue::new("content-type", "webp"));
INPUT_SIZE.webp.observe(input_size as f64);
OUTPUT_SIZE.webp.observe(response_length as f64);
}
ImageFormat::Png => {
span.set_attribute(KeyValue::new("content-type", "png"));
INPUT_SIZE.png.observe(input_size as f64);
OUTPUT_SIZE.png.observe(response_length as f64);
}
Expand Down

0 comments on commit b49a2ff

Please sign in to comment.