diff --git a/README.md b/README.md index 704d6147ca..fe4f7cfe05 100644 --- a/README.md +++ b/README.md @@ -184,7 +184,7 @@ for specific dates and for Zoom meeting links. "OTel Rust SIG" is the name of meeting for this group. Meeting notes are available as a public [Google -doc](https://docs.google.com/document/d/1tGKuCsSnyT2McDncVJrMgg74_z8V06riWZa0Sr79I_4/edit). +doc](https://docs.google.com/document/d/12upOzNk8c3SFTjsL6IRohCWMgzLKoknSCOOdMakbWo4/edit). If you have trouble accessing the doc, please get in touch on [Slack](https://cloud-native.slack.com/archives/C03GDP0H023). diff --git a/opentelemetry-otlp/CHANGELOG.md b/opentelemetry-otlp/CHANGELOG.md index ce4b760618..b7156e3ab6 100644 --- a/opentelemetry-otlp/CHANGELOG.md +++ b/opentelemetry-otlp/CHANGELOG.md @@ -6,7 +6,10 @@ - Feature flag "populate-logs-event-name" is removed as no longer relevant. LogRecord's `event_name()` is now automatically populated on the newly added "event_name" field in LogRecord proto definition. - +- Remove "grpc-tonic" feature from default, and instead add "http-proto" and + "reqwest-blocking-client" features as default, to align with the + specification. 
+ [2516](https://github.com/open-telemetry/opentelemetry-rust/pull/2516) ## 0.27.0 diff --git a/opentelemetry-otlp/Cargo.toml b/opentelemetry-otlp/Cargo.toml index cebe075082..0c7ee8fe94 100644 --- a/opentelemetry-otlp/Cargo.toml +++ b/opentelemetry-otlp/Cargo.toml @@ -62,7 +62,7 @@ internal-logs = ["tracing", "opentelemetry/internal-logs"] # add ons serialize = ["serde", "serde_json"] -default = ["grpc-tonic", "trace", "metrics", "logs", "internal-logs"] +default = ["http-proto", "reqwest-blocking-client", "trace", "metrics", "logs", "internal-logs"] # grpc using tonic grpc-tonic = ["tonic", "prost", "http", "tokio", "opentelemetry-proto/gen-tonic"] diff --git a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml index 4974c8a223..242ea0c1f8 100644 --- a/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp-http/Cargo.toml @@ -6,7 +6,6 @@ license = "Apache-2.0" publish = false [features] -default = ["reqwest-blocking"] reqwest-blocking = ["opentelemetry-otlp/reqwest-blocking-client"] hyper = ["opentelemetry-otlp/hyper-client"] @@ -14,7 +13,7 @@ hyper = ["opentelemetry-otlp/hyper-client"] once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio", "experimental_metrics_periodicreader_with_async_runtime"]} -opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs", "internal-logs"] , default-features = false} +opentelemetry-otlp = { path = "../..", features = ["http-proto", "http-json", "logs", "internal-logs"]} opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} tokio = { workspace = true, features = ["full"] } diff --git a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml index ad050bc338..735a9470d7 100644 --- 
a/opentelemetry-otlp/examples/basic-otlp/Cargo.toml +++ b/opentelemetry-otlp/examples/basic-otlp/Cargo.toml @@ -9,7 +9,7 @@ publish = false once_cell = { workspace = true } opentelemetry = { path = "../../../opentelemetry" } opentelemetry_sdk = { path = "../../../opentelemetry-sdk", features = ["rt-tokio"] } -opentelemetry-otlp = { path = "../../../opentelemetry-otlp" } +opentelemetry-otlp = { path = "../../../opentelemetry-otlp", features = ["grpc-tonic"] } tokio = { version = "1.0", features = ["full"] } opentelemetry-appender-tracing = { path = "../../../opentelemetry-appender-tracing", default-features = false} tracing = { workspace = true, features = ["std"]} diff --git a/opentelemetry-proto/Cargo.toml b/opentelemetry-proto/Cargo.toml index 5ab08257f3..ff7e428405 100644 --- a/opentelemetry-proto/Cargo.toml +++ b/opentelemetry-proto/Cargo.toml @@ -30,7 +30,7 @@ path = "tests/json_serde.rs" [features] default = ["full"] -full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde"] +full = ["gen-tonic", "trace", "logs", "metrics", "zpages", "with-serde", "internal-logs"] # crates used to generate rs files gen-tonic = ["gen-tonic-messages", "tonic/transport"] @@ -44,6 +44,7 @@ zpages = ["trace"] testing = ["opentelemetry/testing"] # add ons +internal-logs = ["tracing"] with-schemars = ["schemars"] with-serde = ["serde", "hex"] @@ -55,6 +56,7 @@ opentelemetry_sdk = { version = "0.27", default-features = false, path = "../ope schemars = { version = "0.8", optional = true } serde = { workspace = true, optional = true, features = ["serde_derive"] } hex = { version = "0.4.3", optional = true } +tracing = {workspace = true, optional = true} # optional for opentelemetry internal logging [dev-dependencies] opentelemetry = { features = ["testing"], path = "../opentelemetry" } diff --git a/opentelemetry-sdk/benches/metrics_counter.rs b/opentelemetry-sdk/benches/metrics_counter.rs index b4517d379b..3817fad144 100644 --- 
a/opentelemetry-sdk/benches/metrics_counter.rs +++ b/opentelemetry-sdk/benches/metrics_counter.rs @@ -1,16 +1,17 @@ /* The benchmark results: criterion = "0.5.1" - rustc 1.82.0 (f6e511eec 2024-10-15) - OS: Ubuntu 22.04.3 LTS (5.15.167.4-microsoft-standard-WSL2) - Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + rustc 1.83.0 (90b35a623 2024-11-26) + OS: Ubuntu 22.04.4 LTS (5.15.167.4-microsoft-standard-WSL2) + Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz 2.79 GHz RAM: 64.0 GB - | Test | Average time| - |--------------------------------|-------------| - | Counter_Add_Sorted | 172 ns | - | Counter_Add_Unsorted | 183 ns | - | Counter_Overflow | 562 ns | - | ThreadLocal_Random_Generator_5 | 37 ns | + | Test | Average time| + |-------------------------------------------------------|-------------| + | Counter_Add_Sorted | 160 ns | + | Counter_Add_Unsorted | 164 ns | + | Counter_Add_Sorted_With_Non_Static_Values | 238 ns | + | Counter_Overflow | 562 ns | + | ThreadLocal_Random_Generator_5 | 37 ns | */ use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; @@ -51,6 +52,15 @@ fn create_counter(name: &'static str) -> Counter<u64> { fn criterion_benchmark(c: &mut Criterion) { counter_add_sorted(c); counter_add_unsorted(c); + + let attribute_values: [String; 10] = (1..=10) + .map(|i| format!("value{}", i)) + .collect::<Vec<String>>() + .try_into() + .expect("Expected a Vec of length 10"); + + counter_add_sorted_with_non_static_values(c, attribute_values); + counter_overflow(c); random_generator(c); } @@ -127,6 +137,54 @@ fn counter_add_unsorted(c: &mut Criterion) { }); } +fn counter_add_sorted_with_non_static_values(c: &mut Criterion, attribute_values: [String; 10]) { + let counter = create_counter("Counter_Add_Sorted_With_Non_Static_Values"); + c.bench_function("Counter_Add_Sorted_With_Non_Static_Values", |b| { + b.iter_batched( + || { + // 4*4*10*10 = 1600 time series. 
+ CURRENT_RNG.with(|rng| { + let mut rng = rng.borrow_mut(); + [ + rng.gen_range(0..4), + rng.gen_range(0..4), + rng.gen_range(0..10), + rng.gen_range(0..10), + ] + }) + }, + |rands| { + let index_first_attribute = rands[0]; + let index_second_attribute = rands[1]; + let index_third_attribute = rands[2]; + let index_fourth_attribute = rands[3]; + counter.add( + 1, + &[ + KeyValue::new( + "attribute1", + attribute_values[index_first_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute2", + attribute_values[index_second_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute3", + attribute_values[index_third_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute4", + attribute_values[index_fourth_attribute].as_str().to_owned(), + ), + ], + ); + }, + BatchSize::SmallInput, + ); + }); +} + fn counter_overflow(c: &mut Criterion) { let counter = create_counter("Counter_Overflow"); // Cause overflow. diff --git a/opentelemetry-sdk/benches/metrics_histogram.rs b/opentelemetry-sdk/benches/metrics_histogram.rs index c6d5751dd6..8a0b3d8125 100644 --- a/opentelemetry-sdk/benches/metrics_histogram.rs +++ b/opentelemetry-sdk/benches/metrics_histogram.rs @@ -1,17 +1,18 @@ /* The benchmark results: criterion = "0.5.1" - rustc 1.82.0 (f6e511eec 2024-10-15) + rustc 1.83.0 (90b35a623 2024-11-26) OS: Ubuntu 22.04.4 LTS (5.15.167.4-microsoft-standard-WSL2) - Hardware: AMD EPYC 7763 64-Core Processor - 2.44 GHz, 16vCPUs, + Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80GHz 2.79 GHz RAM: 64.0 GB - | Test | Average time| - |--------------------------------|-------------| - | Histogram_Record | 225.04 ns | + | Test | Average time| + |-------------------------------------------------------|-------------| + | Histogram_Record | 186.24 ns | + | Histogram_Record_With_Non_Static_Values | 264.70 ns | */ -use criterion::{criterion_group, criterion_main, Criterion}; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion}; use 
opentelemetry::{ metrics::{Histogram, MeterProvider as _}, KeyValue, @@ -48,36 +49,97 @@ fn create_histogram(name: &'static str) -> Histogram<u64> { fn criterion_benchmark(c: &mut Criterion) { histogram_record(c); + + let attribute_values: [String; 10] = (1..=10) + .map(|i| format!("value{}", i)) + .collect::<Vec<String>>() + .try_into() + .expect("Expected a Vec of length 10"); + + histogram_record_with_non_static_values(c, attribute_values); } fn histogram_record(c: &mut Criterion) { let histogram = create_histogram("Histogram_Record"); c.bench_function("Histogram_Record", |b| { - b.iter(|| { - // 4*4*10*10 = 1600 time series. - let rands = CURRENT_RNG.with(|rng| { - let mut rng = rng.borrow_mut(); - [ - rng.gen_range(0..4), - rng.gen_range(0..4), - rng.gen_range(0..10), - rng.gen_range(0..10), - ] - }); - let index_first_attribute = rands[0]; - let index_second_attribute = rands[1]; - let index_third_attribute = rands[2]; - let index_fourth_attribute = rands[3]; - histogram.record( - 1, - &[ - KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]), - KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]), - KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]), - KeyValue::new("attribute4", ATTRIBUTE_VALUES[index_fourth_attribute]), - ], - ); - }); + b.iter_batched( + || { + // 4*4*10*10 = 1600 time series. 
+ CURRENT_RNG.with(|rng| { + let mut rng = rng.borrow_mut(); + [ + rng.gen_range(0..4), + rng.gen_range(0..4), + rng.gen_range(0..10), + rng.gen_range(0..10), + ] + }) + }, + |rands| { + let index_first_attribute = rands[0]; + let index_second_attribute = rands[1]; + let index_third_attribute = rands[2]; + let index_fourth_attribute = rands[3]; + histogram.record( + 1, + &[ + KeyValue::new("attribute1", ATTRIBUTE_VALUES[index_first_attribute]), + KeyValue::new("attribute2", ATTRIBUTE_VALUES[index_second_attribute]), + KeyValue::new("attribute3", ATTRIBUTE_VALUES[index_third_attribute]), + KeyValue::new("attribute4", ATTRIBUTE_VALUES[index_fourth_attribute]), + ], + ); + }, + BatchSize::SmallInput, + ); + }); +} + +fn histogram_record_with_non_static_values(c: &mut Criterion, attribute_values: [String; 10]) { + let histogram = create_histogram("Histogram_Record_With_Non_Static_Values"); + c.bench_function("Histogram_Record_With_Non_Static_Values", |b| { + b.iter_batched( + || { + // 4*4*10*10 = 1600 time series. 
+ CURRENT_RNG.with(|rng| { + let mut rng = rng.borrow_mut(); + [ + rng.gen_range(0..4), + rng.gen_range(0..4), + rng.gen_range(0..10), + rng.gen_range(0..10), + ] + }) + }, + |rands| { + let index_first_attribute = rands[0]; + let index_second_attribute = rands[1]; + let index_third_attribute = rands[2]; + let index_fourth_attribute = rands[3]; + histogram.record( + 1, + &[ + KeyValue::new( + "attribute1", + attribute_values[index_first_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute2", + attribute_values[index_second_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute3", + attribute_values[index_third_attribute].as_str().to_owned(), + ), + KeyValue::new( + "attribute4", + attribute_values[index_fourth_attribute].as_str().to_owned(), + ), + ], + ); + }, + BatchSize::SmallInput, + ); }); } diff --git a/opentelemetry-sdk/src/logs/log_processor.rs b/opentelemetry-sdk/src/logs/log_processor.rs index b6284b87c6..c70654ef56 100644 --- a/opentelemetry-sdk/src/logs/log_processor.rs +++ b/opentelemetry-sdk/src/logs/log_processor.rs @@ -199,8 +199,8 @@ impl LogProcessor for SimpleLogProcessor { #[allow(clippy::large_enum_variant)] #[derive(Debug)] enum BatchMessage { - /// Export logs, called when the log is emitted. - ExportLog(Box<(LogRecord, InstrumentationScope)>), + /// This is ONLY sent when the number of log records in the data channel has reached `max_export_batch_size`. + ExportLog(Arc<AtomicBool>), /// ForceFlush flushes the current buffer to the exporter. ForceFlush(mpsc::SyncSender<ExportResult>), /// Shut down the worker thread, push all logs in buffer to the exporter. @@ -209,6 +209,8 @@ enum BatchMessage { SetResource(Arc<Resource>), } +type LogsData = Box<(LogRecord, InstrumentationScope)>; + /// The `BatchLogProcessor` collects finished logs in a buffer and exports them /// in batches to the configured `LogExporter`. 
This processor is ideal for /// high-throughput environments, as it minimizes the overhead of exporting logs @@ -246,11 +248,15 @@ enum BatchMessage { /// .build(); /// pub struct BatchLogProcessor { - message_sender: SyncSender<BatchMessage>, + logs_sender: SyncSender<LogsData>, // Data channel to store log records and instrumentation scopes + message_sender: SyncSender<BatchMessage>, // Control channel to store control messages for the worker thread handle: Mutex<Option<thread::JoinHandle<()>>>, forceflush_timeout: Duration, shutdown_timeout: Duration, is_shutdown: AtomicBool, + export_log_message_sent: Arc<AtomicBool>, + current_batch_size: Arc<AtomicUsize>, + max_export_batch_size: usize, // Track dropped logs - we'll log this at shutdown dropped_logs_count: AtomicUsize, @@ -279,11 +285,8 @@ impl LogProcessor for BatchLogProcessor { } let result = self - .message_sender - .try_send(BatchMessage::ExportLog(Box::new(( - record.clone(), - instrumentation.clone(), - )))); + .logs_sender + .try_send(Box::new((record.clone(), instrumentation.clone()))); if result.is_err() { // Increment dropped logs count. The first time we have to drop a log, @@ -292,6 +295,37 @@ impl LogProcessor for BatchLogProcessor { otel_warn!(name: "BatchLogProcessor.LogDroppingStarted", message = "BatchLogProcessor dropped a LogRecord due to queue full/internal errors. No further log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped."); } + return; + } + + // At this point, sending the log record to the data channel was successful. + // Increment the current batch size and check if it has reached the max export batch size. + if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size + { + // Check if a control message for exporting logs is already sent to the worker thread. + // If not, send a control message to export logs. + // `export_log_message_sent` is set to false ONLY when the worker thread has processed the control message. 
+ + if !self.export_log_message_sent.load(Ordering::Relaxed) { + // This is a cost-efficient check as atomic load operations do not require exclusive access to cache line. + // Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false. + // Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures. + // We could have used compare_exchange as well here, but it's more verbose than swap. + if !self.export_log_message_sent.swap(true, Ordering::Relaxed) { + match self.message_sender.try_send(BatchMessage::ExportLog( + self.export_log_message_sent.clone(), + )) { + Ok(_) => { + // Control message sent successfully. + } + Err(_err) => { + // TODO: Log error + // If the control message could not be sent, reset the `export_log_message_sent` flag. + self.export_log_message_sent.store(false, Ordering::Relaxed); + } + } + } + } } } @@ -318,19 +352,9 @@ impl LogProcessor for BatchLogProcessor { } fn shutdown(&self) -> LogResult<()> { - // test and set is_shutdown flag if it is not set - if self - .is_shutdown - .swap(true, std::sync::atomic::Ordering::Relaxed) - { - otel_warn!( - name: "BatchLogProcessor.Shutdown.ProcessorShutdown", - message = "BatchLogProcessor has been shutdown. No further logs will be emitted." 
- ); - return LogResult::Err(LogError::AlreadyShutdown( - "BatchLogProcessor is already shutdown".into(), - )); - } + // Set is_shutdown to true + self.is_shutdown + .store(true, std::sync::atomic::Ordering::Relaxed); let dropped_logs = self.dropped_logs_count.load(Ordering::Relaxed); let max_queue_size = self.max_queue_size; @@ -388,8 +412,12 @@ impl BatchLogProcessor { where E: LogExporter + Send + Sync + 'static, { - let (message_sender, message_receiver) = mpsc::sync_channel(config.max_queue_size); + let (logs_sender, logs_receiver) = mpsc::sync_channel::<LogsData>(config.max_queue_size); + let (message_sender, message_receiver) = mpsc::sync_channel::<BatchMessage>(64); // Is this a reasonable bound? let max_queue_size = config.max_queue_size; + let max_export_batch_size = config.max_export_batch_size; + let current_batch_size = Arc::new(AtomicUsize::new(0)); + let current_batch_size_for_thread = current_batch_size.clone(); let handle = thread::Builder::new() .name("OpenTelemetry.Logs.BatchProcessor".to_string()) @@ -402,6 +430,50 @@ impl BatchLogProcessor { ); let mut last_export_time = Instant::now(); let mut logs = Vec::with_capacity(config.max_export_batch_size); + let current_batch_size = current_batch_size_for_thread; + + // This method gets up to `max_export_batch_size` amount of logs from the channel and exports them. + // It returns the result of the export operation. + // It expects the logs vec to be empty when it's called. + #[inline] + fn get_logs_and_export<E>( + logs_receiver: &mpsc::Receiver<LogsData>, + exporter: &E, + logs: &mut Vec<LogsData>, + last_export_time: &mut Instant, + current_batch_size: &AtomicUsize, + config: &BatchConfig, + ) -> ExportResult + where + E: LogExporter + Send + Sync + 'static, + { + let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs. 
+ let mut result = LogResult::Ok(()); + let mut total_exported_logs: usize = 0; + + while target > 0 && total_exported_logs < target { + // Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec + while let Ok(log) = logs_receiver.try_recv() { + logs.push(log); + if logs.len() == config.max_export_batch_size { + break; + } + } + + let count_of_logs = logs.len(); // Count of logs that will be exported + total_exported_logs += count_of_logs; + + result = export_with_timeout_sync( + config.max_export_timeout, + exporter, + logs, + last_export_time, + ); // This method clears the logs vec after exporting + + current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed); + } + result + } loop { let remaining_time = config @@ -410,37 +482,44 @@ impl BatchLogProcessor { .unwrap_or(config.scheduled_delay); match message_receiver.recv_timeout(remaining_time) { - Ok(BatchMessage::ExportLog(log)) => { - logs.push(log); - if logs.len() == config.max_export_batch_size { - otel_debug!( - name: "BatchLogProcessor.ExportingDueToBatchSize", - ); - let _ = export_with_timeout_sync( - config.max_export_timeout, - &mut exporter, - &mut logs, - &mut last_export_time, - ); - } + Ok(BatchMessage::ExportLog(export_log_message_sent)) => { + // Reset the export log message sent flag now it has has been processed. 
+ export_log_message_sent.store(false, Ordering::Relaxed); + + otel_debug!( + name: "BatchLogProcessor.ExportingDueToBatchSize", + ); + + let _ = get_logs_and_export( + &logs_receiver, + &exporter, + &mut logs, + &mut last_export_time, + &current_batch_size, + &config, + ); + } Ok(BatchMessage::ForceFlush(sender)) => { otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush"); - let result = export_with_timeout_sync( - config.max_export_timeout, - &mut exporter, + let result = get_logs_and_export( + &logs_receiver, + &exporter, &mut logs, &mut last_export_time, + &current_batch_size, + &config, ); let _ = sender.send(result); } Ok(BatchMessage::Shutdown(sender)) => { otel_debug!(name: "BatchLogProcessor.ExportingDueToShutdown"); - let result = export_with_timeout_sync( - config.max_export_timeout, - &mut exporter, + let result = get_logs_and_export( + &logs_receiver, + &exporter, &mut logs, &mut last_export_time, + &current_batch_size, + &config, ); let _ = sender.send(result); @@ -460,11 +539,14 @@ impl BatchLogProcessor { otel_debug!( name: "BatchLogProcessor.ExportingDueToTimer", ); - let _ = export_with_timeout_sync( - config.max_export_timeout, - &mut exporter, + + let _ = get_logs_and_export( + &logs_receiver, + &exporter, &mut logs, &mut last_export_time, + &current_batch_size, + &config, ); } Err(RecvTimeoutError::Disconnected) => { @@ -486,6 +568,7 @@ impl BatchLogProcessor { // Return batch processor with link to worker BatchLogProcessor { + logs_sender, message_sender, handle: Mutex::new(Some(handle)), forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable @@ -493,6 +576,9 @@ impl BatchLogProcessor { is_shutdown: AtomicBool::new(false), dropped_logs_count: AtomicUsize::new(0), max_queue_size, + export_log_message_sent: Arc::new(AtomicBool::new(false)), + current_batch_size, + max_export_batch_size, } } @@ -511,7 +597,7 @@ impl BatchLogProcessor { #[allow(clippy::vec_box)] fn export_with_timeout_sync( _: Duration, // TODO, enforcing timeout in exporter. 
- exporter: &mut E, + exporter: &E, batch: &mut Vec<Box<(LogRecord, InstrumentationScope)>>, last_export_time: &mut Instant, ) -> ExportResult diff --git a/opentelemetry-sdk/src/metrics/periodic_reader.rs b/opentelemetry-sdk/src/metrics/periodic_reader.rs index 2cee6c4d0d..ad9d4ccb54 100644 --- a/opentelemetry-sdk/src/metrics/periodic_reader.rs +++ b/opentelemetry-sdk/src/metrics/periodic_reader.rs @@ -415,14 +415,21 @@ impl PeriodicReaderInner { .send(Message::Shutdown(response_tx)) .map_err(|e| MetricError::Other(e.to_string()))?; - if let Ok(response) = response_rx.recv() { - if response { - Ok(()) - } else { + // TODO: Make this timeout configurable. + match response_rx.recv_timeout(Duration::from_secs(5)) { + Ok(response) => { + if response { + Ok(()) + } else { + Err(MetricError::Other("Failed to shutdown".into())) + } + } + Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other( + "Failed to shutdown due to Timeout".into(), + )), + Err(mpsc::RecvTimeoutError::Disconnected) => { Err(MetricError::Other("Failed to shutdown".into())) } - } else { - Err(MetricError::Other("Failed to shutdown".into())) } } }