diff --git a/Cargo.lock b/Cargo.lock index 120b6c3f..f860ab3d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,7 @@ dependencies = [ "saluki-io", "saluki-metadata", "serde", + "serde_json", "stringtheory", "tikv-jemalloc-ctl", "tikv-jemallocator", diff --git a/bin/agent-data-plane/Cargo.toml b/bin/agent-data-plane/Cargo.toml index 8c635127..87febcfb 100644 --- a/bin/agent-data-plane/Cargo.toml +++ b/bin/agent-data-plane/Cargo.toml @@ -25,14 +25,24 @@ saluki-health = { workspace = true } saluki-io = { workspace = true } saluki-metadata = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } stringtheory = { workspace = true } -tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "signal"] } +tokio = { workspace = true, features = [ + "macros", + "rt", + "rt-multi-thread", + "signal", +] } tokio-rustls = { workspace = true } tracing = { workspace = true } [target.'cfg(target_os = "linux")'.dependencies] tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] } -tikv-jemallocator = { workspace = true, features = ["background_threads", "unprefixed_malloc_on_supported_platforms", "stats"] } +tikv-jemallocator = { workspace = true, features = [ + "background_threads", + "unprefixed_malloc_on_supported_platforms", + "stats", +] } [build-dependencies] chrono = { workspace = true } diff --git a/bin/agent-data-plane/src/components/remapper/mod.rs b/bin/agent-data-plane/src/components/remapper/mod.rs index ce48cf24..cf9f3118 100644 --- a/bin/agent-data-plane/src/components/remapper/mod.rs +++ b/bin/agent-data-plane/src/components/remapper/mod.rs @@ -68,10 +68,10 @@ impl MemoryBounds for AgentTelemetryRemapperConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::() + .with_single_value::("component struct") // We also allocate the backing storage for the string interner up front, which is used by our context // resolver. - .with_fixed_amount(self.context_string_interner_bytes.as_u64() as usize); + .with_fixed_amount("string interner", self.context_string_interner_bytes.as_u64() as usize); } } diff --git a/bin/agent-data-plane/src/main.rs b/bin/agent-data-plane/src/main.rs index 07a753fc..67c565b3 100644 --- a/bin/agent-data-plane/src/main.rs +++ b/bin/agent-data-plane/src/main.rs @@ -10,7 +10,7 @@ use std::{ time::{Duration, Instant}, }; -use memory_accounting::ComponentRegistry; +use memory_accounting::{ComponentBounds, ComponentRegistry}; use saluki_app::{api::APIBuilder, logging::LoggingAPIHandler, prelude::*}; use saluki_components::{ destinations::{ @@ -128,7 +128,17 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result // Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any. let bounds_configuration = MemoryBoundsConfiguration::try_from_config(&configuration)?; - let memory_limiter = initialize_memory_bounds(bounds_configuration, component_registry)?; + let memory_limiter = initialize_memory_bounds(bounds_configuration, &component_registry)?; + + if let Ok(val) = std::env::var("DD_ADP_WRITE_SIZING_GUIDE") { + if val != "false" { + if let Err(error) = write_sizing_guide(component_registry.as_bounds()) { + warn!("Failed to write sizing guide: {}", error); + } else { + return Ok(()); + } + } + } // Bounds validation succeeded, so now we'll build and spawn the topology. 
let built_topology = blueprint.build().await?; @@ -262,6 +272,28 @@ async fn create_topology( Ok(blueprint) } +fn write_sizing_guide(bounds: ComponentBounds) -> Result<(), GenericError> { + use std::{ + fs::File, + io::{BufWriter, Write}, + }; + + let template = include_str!("sizing_guide_template.html"); + let mut output = BufWriter::new(File::create("sizing_guide.html")?); + for line in template.lines() { + if line.trim() == "" { + serde_json::to_writer_pretty(&mut output, &bounds.to_exprs())?; + } else { + output.write_all(line.as_bytes())?; + } + output.write_all(b"\n")?; + } + info!("Wrote sizing guide to sizing_guide.html"); + output.flush()?; + + Ok(()) +} + async fn spawn_unprivileged_api( configuration: &GenericConfiguration, api_builder: APIBuilder, ) -> Result<(), GenericError> { diff --git a/bin/agent-data-plane/src/sizing_guide_template.html b/bin/agent-data-plane/src/sizing_guide_template.html new file mode 100644 index 00000000..82c21d8c --- /dev/null +++ b/bin/agent-data-plane/src/sizing_guide_template.html @@ -0,0 +1,295 @@ + + + + + + ADP Memory Bounds Calculator + + + +

[sizing_guide_template.html (new file, 295 lines): static HTML/JS for the "ADP Memory Bounds Calculator" page, with input sections for workload and config values and a results section that displays the computed firm memory bound; markup and script elided.]
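For reference, the calculator page consumes the expression tree that `write_sizing_guide` embeds into the template via `ComponentBounds::to_exprs()`. Below is a minimal sketch of what a single `UsageExpr` serializes to, using only the constructors added in this change; the numeric values are illustrative, not ADP defaults, and it assumes `memory_accounting` and `serde_json` are available in the workspace.

    use memory_accounting::UsageExpr;

    fn main() {
        // Symbolic form of the DogStatsD buffer accounting: buffer_count * buffer_size.
        // Values here are placeholders for whatever the configuration resolves to.
        let expr = UsageExpr::product(
            "buffers",
            UsageExpr::config("dogstatsd_buffer_count", 512),
            UsageExpr::config("dogstatsd_buffer_size", 8192),
        );

        // With the internally tagged representation (#[serde(tag = "type")]),
        // this prints roughly:
        // {
        //   "type": "Product",
        //   "values": [
        //     { "type": "Config", "name": "dogstatsd_buffer_count", "value": 512 },
        //     { "type": "Config", "name": "dogstatsd_buffer_size", "value": 8192 }
        //   ]
        // }
        println!("{}", serde_json::to_string_pretty(&expr).unwrap());
    }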
+ + + + + + diff --git a/lib/memory-accounting/src/api.rs b/lib/memory-accounting/src/api.rs index eea73256..22c6829f 100644 --- a/lib/memory-accounting/src/api.rs +++ b/lib/memory-accounting/src/api.rs @@ -69,8 +69,9 @@ impl MemoryRegistryState { component_usage.insert( component_name.to_string(), ComponentUsage { - minimum_required_bytes: bounds.self_minimum_required_bytes, - firm_limit_bytes: bounds.self_firm_limit_bytes, + // TODO: figure out what's actually going on here, this might not be a valid change + minimum_required_bytes: bounds.total_minimum_required_bytes(), + firm_limit_bytes: bounds.total_firm_limit_bytes(), actual_live_bytes: stats_snapshot.live_bytes(), }, ); diff --git a/lib/memory-accounting/src/lib.rs b/lib/memory-accounting/src/lib.rs index f3eb903f..12c36f5c 100644 --- a/lib/memory-accounting/src/lib.rs +++ b/lib/memory-accounting/src/lib.rs @@ -74,6 +74,9 @@ pub mod allocator; //mod builder; mod api; mod registry; + +use serde::Serialize; + pub use self::registry::{ComponentRegistry, MemoryBoundsBuilder}; mod grant; @@ -116,11 +119,90 @@ where } } +/// Represents a memory usage expression for a component. +#[derive(Clone, Debug, Serialize)] +#[serde(tag = "type")] +pub enum UsageExpr { + /// A config value + Config { + /// The name + name: String, + /// The value + value: usize, + }, + + /// A struct size + StructSize { + /// The value + name: String, + /// The value + value: usize, + }, + + /// A constant value + Constant { + /// The name + name: String, + /// The value + value: usize, + }, + + /// A product of subexpressions + Product { + /// Values to multiply + values: Vec, + }, + + /// A sum of subexpressions + Sum { + /// Values to add + values: Vec, + }, +} + +impl UsageExpr { + /// Creates a new usage expression that is a config value. + pub fn config(s: impl Into, value: usize) -> Self { + Self::Config { name: s.into(), value } + } + + /// Creates a new usage expression that is a constant value. + pub fn constant(s: impl Into, value: usize) -> Self { + Self::Constant { name: s.into(), value } + } + + /// Creates a new usage expression that is a struct size. + pub fn struct_size(s: impl Into) -> Self { + Self::StructSize { + name: s.into(), + value: std::mem::size_of::(), + } + } + + /// Creates a new usage expression that is the product of two subexpressions. + pub fn product(_s: impl Into, lhs: UsageExpr, rhs: UsageExpr) -> Self { + Self::Product { values: vec![lhs, rhs] } + } + + /// Creates a new usage expression that is the sum of two subexpressions. + pub fn sum(_s: impl Into, lhs: UsageExpr, rhs: UsageExpr) -> Self { + Self::Sum { values: vec![lhs, rhs] } + } + + fn evaluate(&self) -> usize { + match self { + Self::Config { value, .. } | Self::StructSize { value, .. } | Self::Constant { value, .. } => *value, + Self::Product { values } => values.iter().map(UsageExpr::evaluate).product(), + Self::Sum { values } => values.iter().map(UsageExpr::evaluate).sum(), + } + } +} + /// Memory bounds for a component. #[derive(Clone, Debug, Default)] pub struct ComponentBounds { - self_minimum_required_bytes: usize, - self_firm_limit_bytes: usize, + self_minimum_required_bytes: Vec, + self_firm_limit_bytes: Vec, subcomponents: HashMap, } @@ -128,6 +210,9 @@ impl ComponentBounds { /// Gets the total minimum required bytes for this component and all subcomponents. 
pub fn total_minimum_required_bytes(&self) -> usize { self.self_minimum_required_bytes + .iter() + .map(UsageExpr::evaluate) + .sum::() + self .subcomponents .values() @@ -140,7 +225,14 @@ impl ComponentBounds { /// The firm limit includes the minimum required bytes. pub fn total_firm_limit_bytes(&self) -> usize { self.self_minimum_required_bytes - + self.self_firm_limit_bytes + .iter() + .map(UsageExpr::evaluate) + .sum::() + + self + .self_firm_limit_bytes + .iter() + .map(UsageExpr::evaluate) + .sum::() + self .subcomponents .values() @@ -154,4 +246,34 @@ impl ComponentBounds { pub fn subcomponents(&self) -> impl IntoIterator { self.subcomponents.iter() } + + /// Returns a tree of all bound expressions for this component and its subcomponents as JSON. + pub fn to_exprs(&self) -> Vec { + let path = vec!["root".to_string()]; + let mut stack = vec![(path, self)]; + let mut output = Vec::new(); + + while let Some((path, cb)) = stack.pop() { + for expr in &cb.self_minimum_required_bytes { + output.push(serde_json::json!({ + "name": format!("{}.min", path.join(".")), + "expr": expr, + })); + } + for expr in &cb.self_firm_limit_bytes { + output.push(serde_json::json!({ + "name": format!("{}.firm", path.join(".")), + "expr": expr, + })); + } + + for (name, subcomponent) in cb.subcomponents() { + let mut path = path.clone(); + path.push(name.clone()); + stack.push((path, subcomponent)); + } + } + + output + } } diff --git a/lib/memory-accounting/src/registry.rs b/lib/memory-accounting/src/registry.rs index f82b7f63..8075b42d 100644 --- a/lib/memory-accounting/src/registry.rs +++ b/lib/memory-accounting/src/registry.rs @@ -4,6 +4,7 @@ use std::{ sync::{Arc, Mutex, MutexGuard}, }; +use crate::UsageExpr; use crate::{ allocator::{AllocationGroupRegistry, AllocationGroupToken}, api::MemoryAPIHandler, @@ -103,8 +104,8 @@ impl ComponentMetadata { pub fn as_bounds(&self) -> ComponentBounds { let mut bounds = ComponentBounds::default(); - bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes; - bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes; + bounds.self_firm_limit_bytes = self.bounds.self_firm_limit_bytes.clone(); + bounds.self_minimum_required_bytes = self.bounds.self_minimum_required_bytes.clone(); for (name, subcomponent) in self.subcomponents.iter() { let subcomponent = subcomponent.lock().unwrap(); @@ -119,7 +120,7 @@ impl ComponentMetadata { /// A registry for components for tracking memory bounds and runtime memory usage. /// /// This registry provides a unified interface for declaring the memory bounds of a "component", as well as registering -/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`. +/// that component for runtime memory usage tracking when using the tracking allocator implementation in `memory-accounting`. /// /// ## Components /// @@ -199,7 +200,7 @@ impl ComponentRegistry { /// /// - when a component has invalid bounds (e.g. 
minimum required bytes higher than firm limit) /// - when the combined total of the firm limit for all components exceeds the effective limit - pub fn verify_bounds(&mut self, initial_grant: MemoryGrant) -> Result { + pub fn verify_bounds(&self, initial_grant: MemoryGrant) -> Result { let bounds = self.inner.lock().unwrap().as_bounds(); BoundsVerifier::new(initial_grant, bounds).verify() } @@ -212,6 +213,11 @@ impl ComponentRegistry { pub fn api_handler(&self) -> MemoryAPIHandler { MemoryAPIHandler::from_state(Arc::clone(&self.inner)) } + + /// Gets the total minimum required bytes for this component and all subcomponents. + pub fn as_bounds(&self) -> ComponentBounds { + self.inner.lock().unwrap().as_bounds() + } } impl Default for ComponentRegistry { @@ -235,18 +241,18 @@ pub(crate) mod private { // Simple trait-based builder state approach so we can use a single builder view to modify either the minimum required // or firm limit amounts. pub trait BoundsMutator: private::Sealed { - fn add_usage(bounds: &mut ComponentBounds, amount: usize); + fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr); } impl BoundsMutator for Minimum { - fn add_usage(bounds: &mut ComponentBounds, amount: usize) { - bounds.self_minimum_required_bytes = bounds.self_minimum_required_bytes.saturating_add(amount); + fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) { + bounds.self_minimum_required_bytes.push(expr) } } impl BoundsMutator for Firm { - fn add_usage(bounds: &mut ComponentBounds, amount: usize) { - bounds.self_firm_limit_bytes = bounds.self_firm_limit_bytes.saturating_add(amount); + fn add_usage(bounds: &mut ComponentBounds, expr: UsageExpr) { + bounds.self_firm_limit_bytes.push(expr) } } @@ -315,27 +321,6 @@ impl<'a> MemoryBoundsBuilder<'a> { self } - /// Merges a set of existing `ComponentBounds` into the current builder. - pub fn merge_existing(&mut self, existing: &ComponentBounds) -> &mut Self { - let mut bounds_builder = self.inner.bounds_builder(); - bounds_builder - .minimum() - .with_fixed_amount(existing.self_minimum_required_bytes); - bounds_builder.firm().with_fixed_amount(existing.self_firm_limit_bytes); - - for (name, existing_subcomponent) in &existing.subcomponents { - let subcomponent = self.inner.get_or_create(name); - let mut builder = MemoryBoundsBuilder { - inner: subcomponent, - _lt: PhantomData, - }; - - builder.merge_existing(existing_subcomponent); - } - - self - } - #[cfg(test)] pub(crate) fn as_bounds(&self) -> ComponentBounds { self.inner.inner.lock().unwrap().as_bounds() @@ -363,16 +348,16 @@ impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> { /// This is useful for tracking the expected memory usage of a single instance of a type if that type is heap /// allocated. For example, components that are spawned by a topology generally end up being boxed, which means a /// heap allocation exists that is the size of the component type. - pub fn with_single_value(&mut self) -> &mut Self { - S::add_usage(&mut self.inner.bounds, std::mem::size_of::()); + pub fn with_single_value(&mut self, name: impl Into) -> &mut Self { + S::add_usage(&mut self.inner.bounds, UsageExpr::struct_size::(name)); self } /// Accounts for a fixed amount of memory usage. /// /// This is a catch-all for directly accounting for a specific number of bytes. 
- pub fn with_fixed_amount(&mut self, chunk_size: usize) -> &mut Self { - S::add_usage(&mut self.inner.bounds, chunk_size); + pub fn with_fixed_amount(&mut self, name: impl Into, chunk_size: usize) -> &mut Self { + S::add_usage(&mut self.inner.bounds, UsageExpr::constant(name, chunk_size)); self } @@ -380,8 +365,15 @@ impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> { /// /// This can be used to track the expected memory usage of generalized containers like `Vec`, where items are /// homogenous and allocated contiguously. - pub fn with_array(&mut self, len: usize) -> &mut Self { - S::add_usage(&mut self.inner.bounds, len * std::mem::size_of::()); + pub fn with_array(&mut self, name: impl Into, len: usize) -> &mut Self { + S::add_usage( + &mut self.inner.bounds, + UsageExpr::product( + "array", + UsageExpr::struct_size::(name), + UsageExpr::constant("len", len), + ), + ); self } @@ -389,11 +381,24 @@ impl<'a, S: BoundsMutator> BoundsBuilder<'a, S> { /// /// This can be used to track the expected memory usage of generalized maps like `HashMap`, where keys and /// values are - pub fn with_map(&mut self, len: usize) -> &mut Self { + pub fn with_map(&mut self, name: impl Into, len: usize) -> &mut Self { S::add_usage( &mut self.inner.bounds, - len * (std::mem::size_of::() + std::mem::size_of::()), + UsageExpr::product( + "map", + UsageExpr::sum( + name, + UsageExpr::struct_size::("key"), + UsageExpr::struct_size::("value"), + ), + UsageExpr::constant("len", len), + ), ); self } + + pub fn with_expr(&mut self, expr: UsageExpr) -> &mut Self { + S::add_usage(&mut self.inner.bounds, expr); + self + } } diff --git a/lib/memory-accounting/src/test_util.rs b/lib/memory-accounting/src/test_util.rs index 619a5a14..5b01b2d6 100644 --- a/lib/memory-accounting/src/test_util.rs +++ b/lib/memory-accounting/src/test_util.rs @@ -17,8 +17,10 @@ impl BoundedComponent { impl MemoryBounds for BoundedComponent { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - builder.minimum().with_fixed_amount(self.minimum_required.unwrap_or(0)); - builder.firm().with_fixed_amount(self.firm_limit); + builder + .minimum() + .with_fixed_amount("min amount", self.minimum_required.unwrap_or(0)); + builder.firm().with_fixed_amount("firm limit", self.firm_limit); } } diff --git a/lib/saluki-app/src/memory.rs b/lib/saluki-app/src/memory.rs index c382be9f..69944433 100644 --- a/lib/saluki-app/src/memory.rs +++ b/lib/saluki-app/src/memory.rs @@ -119,7 +119,7 @@ impl MemoryBoundsConfiguration { /// /// If the bounds could not be validated, an error is returned. pub fn initialize_memory_bounds( - configuration: MemoryBoundsConfiguration, mut component_registry: ComponentRegistry, + configuration: MemoryBoundsConfiguration, component_registry: &ComponentRegistry, ) -> Result { let initial_grant = match configuration.memory_limit { Some(limit) => MemoryGrant::with_slop_factor(limit.as_u64() as usize, configuration.memory_slop_factor)?, diff --git a/lib/saluki-components/src/destinations/blackhole/mod.rs b/lib/saluki-components/src/destinations/blackhole/mod.rs index f57e64aa..24b685b1 100644 --- a/lib/saluki-components/src/destinations/blackhole/mod.rs +++ b/lib/saluki-components/src/destinations/blackhole/mod.rs @@ -29,7 +29,7 @@ impl DestinationBuilder for BlackholeConfiguration { impl MemoryBounds for BlackholeConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // Capture the size of the heap allocation when the component is built. 
- builder.minimum().with_single_value::(); + builder.minimum().with_single_value::("blackhole"); } } diff --git a/lib/saluki-components/src/destinations/datadog/events_service_checks/mod.rs b/lib/saluki-components/src/destinations/datadog/events_service_checks/mod.rs index 4f84e75f..e337861c 100644 --- a/lib/saluki-components/src/destinations/datadog/events_service_checks/mod.rs +++ b/lib/saluki-components/src/destinations/datadog/events_service_checks/mod.rs @@ -71,9 +71,9 @@ impl MemoryBounds for DatadogEventsServiceChecksConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::() + .with_single_value::("component struct") // Capture the size of the requests channel. - .with_array::<(usize, Request)>(32); + .with_array::<(usize, Request)>("requests channel", 32); } } diff --git a/lib/saluki-components/src/destinations/datadog/metrics/mod.rs b/lib/saluki-components/src/destinations/datadog/metrics/mod.rs index fb6f216f..e3918044 100644 --- a/lib/saluki-components/src/destinations/datadog/metrics/mod.rs +++ b/lib/saluki-components/src/destinations/datadog/metrics/mod.rs @@ -8,7 +8,7 @@ use saluki_core::{ pooling::{FixedSizeObjectPool, ObjectPool}, }; use saluki_error::GenericError; -use saluki_event::DataType; +use saluki_event::{metric::Metric, DataType}; use saluki_io::buf::{BytesBuffer, FixedSizeVec, FrozenChunkedBytesBuffer}; use saluki_metrics::MetricsBuilder; use serde::Deserialize; @@ -24,6 +24,10 @@ use self::request_builder::{MetricsEndpoint, RequestBuilder}; const RB_BUFFER_POOL_COUNT: usize = 128; const RB_BUFFER_POOL_BUF_SIZE: usize = 32_768; +const fn default_max_metrics_per_payload() -> usize { + 10_000 +} + /// Datadog Metrics destination. /// /// Forwards metrics to the Datadog platform. It can handle both series and sketch metrics, and only utilizes the latest @@ -45,6 +49,17 @@ pub struct DatadogMetricsConfiguration { #[serde(skip)] config_refresher: Option, + + /// Maximum number of input metrics to encode into a single request payload. + /// + /// This applies both to the series and sketches endpoints. + /// + /// Defaults to 10,000. + #[serde( + rename = "serializer_max_metrics_per_payload", + default = "default_max_metrics_per_payload" + )] + max_metrics_per_payload: usize, } impl DatadogMetricsConfiguration { @@ -78,8 +93,14 @@ impl DestinationBuilder for DatadogMetricsConfiguration { // Create our request builders. let rb_buffer_pool = create_request_builder_buffer_pool(); - let series_request_builder = RequestBuilder::new(MetricsEndpoint::Series, rb_buffer_pool.clone()).await?; - let sketches_request_builder = RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool).await?; + let series_request_builder = RequestBuilder::new( + MetricsEndpoint::Series, + rb_buffer_pool.clone(), + self.max_metrics_per_payload, + ) + .await?; + let sketches_request_builder = + RequestBuilder::new(MetricsEndpoint::Sketches, rb_buffer_pool, self.max_metrics_per_payload).await?; Ok(Box::new(DatadogMetrics { series_request_builder, @@ -101,16 +122,20 @@ impl MemoryBounds for DatadogMetricsConfiguration { // Capture the size of the heap allocation when the component is built. // // TODO: This type signature is _ugly_, and it would be nice to improve it somehow. - .with_single_value::>>() + .with_single_value::>>("component struct") // Capture the size of our buffer pool. - .with_fixed_amount(rb_buffer_pool_size) - // Capture the size of the scratch buffer which may grow up to the uncompressed limit. 
- .with_fixed_amount(MetricsEndpoint::Series.uncompressed_size_limit()) - .with_fixed_amount(MetricsEndpoint::Sketches.uncompressed_size_limit()) + .with_fixed_amount("buffer pool", rb_buffer_pool_size) // Capture the size of the requests channel. // // TODO: This type signature is _ugly_, and it would be nice to improve it somehow. - .with_array::<(usize, Request)>(32); + .with_array::<(usize, Request)>("requests channel", 32); + + builder + .firm() + // Capture the size of the "split re-encode" buffers in the request builders, which is where we keep owned + // versions of metrics that we encode in case we need to actually re-encode them during a split operation. + .with_array::("series metrics split re-encode buffer", self.max_metrics_per_payload) + .with_array::("sketch metrics split re-encode buffer", self.max_metrics_per_payload); } } diff --git a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs index da15ac98..b59f3816 100644 --- a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs +++ b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs @@ -13,7 +13,7 @@ use saluki_io::{ }; use snafu::{ResultExt, Snafu}; use tokio::io::AsyncWriteExt as _; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; pub(super) const SCRATCH_BUF_CAPACITY: usize = 8192; @@ -117,16 +117,20 @@ where compressor: Compressor>, compression_estimator: CompressionEstimator, uncompressed_len: usize, - metrics_written: usize, - scratch_buf_lens: Vec, + compressed_len_limit: usize, + uncompressed_len_limit: usize, + max_metrics_per_payload: usize, + encoded_metrics: Vec, } impl RequestBuilder where O: ObjectPool + 'static, { - /// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI. - pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result { + /// Creates a new `RequestBuilder` for the given endpoint. + pub async fn new( + endpoint: MetricsEndpoint, buffer_pool: O, max_metrics_per_payload: usize, + ) -> Result { let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool); let compressor = create_compressor(&chunked_buffer_pool).await; Ok(Self { @@ -137,17 +141,28 @@ where compressor, compression_estimator: CompressionEstimator::default(), uncompressed_len: 0, - metrics_written: 0, - scratch_buf_lens: Vec::new(), + compressed_len_limit: endpoint.compressed_size_limit(), + uncompressed_len_limit: endpoint.uncompressed_size_limit(), + max_metrics_per_payload, + encoded_metrics: Vec::new(), }) } + /// Configures custom (un)compressed length limits for the request builder. + /// + /// Used specifically for testing purposes. + #[cfg(test)] + fn set_custom_len_limits(&mut self, uncompressed_len_limit: usize, compressed_len_limit: usize) { + self.uncompressed_len_limit = uncompressed_len_limit; + self.compressed_len_limit = compressed_len_limit; + } + /// Attempts to encode a metric and write it to the current request payload. /// /// If the metric can't be encoded due to size constraints, `Ok(Some(metric))` will be returned, and the caller must /// call `flush` before attempting to encode the same metric again. Otherwise, `Ok(None)` is returned. /// - /// ## Errors + /// # Errors /// /// If the given metric is not valid for the endpoint this request builder is configured for, or if there is an /// error during compression of the encoded metric, an error will be returned. 
@@ -161,28 +176,27 @@ where }); } + // Make sure we haven't hit the maximum number of metrics per payload. + if self.encoded_metrics.len() >= self.max_metrics_per_payload { + return Ok(Some(metric)); + } + // Encode the metric and then see if it will fit into the current request payload. // // If not, we return the original metric, signaling to the caller that they need to flush the current request // payload before encoding additional metrics. let encoded_metric = encode_single_metric(&metric); - let previous_len = self.scratch_buf.len(); encoded_metric.write(&mut self.scratch_buf)?; - let encoded_len = self.scratch_buf.len() - previous_len; - self.scratch_buf_lens.push(encoded_len); // If the metric can't fit into the current request payload based on the uncompressed size limit, or isn't // likely to fit into the current request payload based on the estimated compressed size limit, then return it // to the caller: this indicates that a flush must happen before trying to encode the same metric again. - // - // TODO: Use of the estimated compressed size limit is a bit of a stopgap to avoid having to do full incremental - // request building. We can still improve it, but the only sure-fire way to not exceed the (un)compressed - // payload size limits is to be able to re-do the encoding/compression process in smaller chunks. + let encoded_len = self.scratch_buf.len(); let new_uncompressed_len = self.uncompressed_len + encoded_len; - if new_uncompressed_len > self.endpoint.uncompressed_size_limit() + if new_uncompressed_len > self.uncompressed_len_limit || self .compression_estimator - .would_write_exceed_threshold(encoded_len, self.endpoint.compressed_size_limit()) + .would_write_exceed_threshold(encoded_len, self.compressed_len_limit) { trace!( encoded_len, @@ -195,13 +209,10 @@ where } // Write the scratch buffer to the compressor. - self.compressor - .write_all(&self.scratch_buf[previous_len..]) - .await - .context(Io)?; + self.compressor.write_all(&self.scratch_buf[..]).await.context(Io)?; self.compression_estimator.track_write(&self.compressor, encoded_len); self.uncompressed_len += encoded_len; - self.metrics_written += 1; + self.encoded_metrics.push(metric); trace!( encoded_len, @@ -220,7 +231,7 @@ where /// /// This attempts to split the request payload into two smaller payloads if the original request payload is too large. /// - /// ## Errors + /// # Errors /// /// If an error occurs while finalizing the compressor or creating the request, an error will be returned. pub async fn flush(&mut self) -> Vec), RequestBuilderError>> { @@ -230,9 +241,6 @@ where // Clear our internal state and finalize the compressor. We do it in this order so that if finalization fails, // somehow, the request builder is in a default state and encoding can be attempted again. - let metrics_written = self.metrics_written; - self.metrics_written = 0; - let uncompressed_len = self.uncompressed_len; self.uncompressed_len = 0; @@ -240,18 +248,39 @@ where let new_compressor = create_compressor(&self.buffer_pool).await; let mut compressor = std::mem::replace(&mut self.compressor, new_compressor); + if let Err(e) = compressor.flush().await.context(Io) { + let metrics_dropped = self.clear_encoded_metrics(); + + // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here. + error!( + metrics_dropped, + "Failed to finalize compressor while building request. Metrics have been dropped." 
+ ); + + return vec![Err(e)]; + } + if let Err(e) = compressor.shutdown().await.context(Io) { - self.clear_scratch_buffer(); + let metrics_dropped = self.clear_encoded_metrics(); + + // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here. + error!( + metrics_dropped, + "Failed to finalize compressor while building request. Metrics have been dropped." + ); + return vec![Err(e)]; } let buffer = compressor.into_inner().freeze(); let compressed_len = buffer.len(); - let compressed_limit = self.endpoint.compressed_size_limit(); + let compressed_limit = self.compressed_len_limit; if compressed_len > compressed_limit { // Single metric is unable to be split. - if self.scratch_buf_lens.len() == 1 { + if self.encoded_metrics.len() == 1 { + let _ = self.clear_encoded_metrics(); + return vec![Err(RequestBuilderError::PayloadTooLarge { compressed_size_bytes: compressed_len, compressed_limit_bytes: compressed_limit, @@ -261,53 +290,105 @@ where return self.split_request().await; } - debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, "Flushing request."); + let metrics_written = self.clear_encoded_metrics(); + debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, metrics_written, "Flushing request."); - self.clear_scratch_buffer(); vec![self.create_request(buffer).map(|req| (metrics_written, req))] } - fn clear_scratch_buffer(&mut self) { - self.scratch_buf.clear(); - self.scratch_buf_lens.clear(); + fn clear_encoded_metrics(&mut self) -> usize { + let len = self.encoded_metrics.len(); + self.encoded_metrics.clear(); + len } async fn split_request(&mut self) -> Vec), RequestBuilderError>> { + // Nothing to do if we have no encoded metrics. let mut requests = Vec::new(); - - if self.scratch_buf_lens.is_empty() { + if self.encoded_metrics.is_empty() { return requests; } - let lens_pivot = self.scratch_buf_lens.len() / 2; - let first_half_metrics_len = self.scratch_buf_lens.len() - lens_pivot; - - let scratch_buf_pivot = self.scratch_buf_lens.iter().take(first_half_metrics_len).sum(); - assert!(scratch_buf_pivot < self.scratch_buf.len()); - - let first_half_scratch_buf = &self.scratch_buf[0..scratch_buf_pivot]; - let second_half_scratch_buf = &self.scratch_buf[scratch_buf_pivot..]; + // We're going to attempt to split all of the previously-encoded metrics between two _new_ compressed payloads, + // with the goal that each payload will be under the compressed size limit. + // + // We achieve this by temporarily consuming the "encoded metrics" buffer, feeding the first half of it back to + // ourselves by re-encoding and then flushing, and then doing the same thing with the second half. If either + // half fails to properly encode, we give up entirely. + // + // We specifically manage the control flow so that we always restore the original "encoded metrics" buffer to + // the builder (albeit cleared) before returning, so that we don't waste its allocation as it's been sized up + // over time. + // + // We can do this by swapping it out with a new `Vec` since empty vectors don't allocate at all. 
+ let mut encoded_metrics = std::mem::take(&mut self.encoded_metrics); + let encoded_metrics_pivot = encoded_metrics.len() / 2; - let mut compressor_half_one = create_compressor(&self.buffer_pool).await; + let first_half_encoded_metrics = &encoded_metrics[0..encoded_metrics_pivot]; + let second_half_encoded_metrics = &encoded_metrics[encoded_metrics_pivot..]; - if let Err(e) = compressor_half_one.write_all(first_half_scratch_buf).await.context(Io) { - requests.push(Err(e)); + // TODO: We're duplicating functionality here between `encode`/`flush`, but this makes it a lot easier to skip + // over the normal behavior that would do all the storing of encoded metrics, trying to split the payload, etc, + // since we want to avoid that and avoid any recursion in general. + // + // We should consider if there's a better way to split out some of this into common methods or something. + if let Some(request) = self.try_split_request(first_half_encoded_metrics).await { + requests.push(request); } - match self.finalize(compressor_half_one).await { - Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))), - Err(e) => requests.push(Err(e)), + + if let Some(request) = self.try_split_request(second_half_encoded_metrics).await { + requests.push(request); } - let mut compressor_half_two = create_compressor(&self.buffer_pool).await; - if let Err(e) = compressor_half_two.write_all(second_half_scratch_buf).await.context(Io) { - requests.push(Err(e)); + // Restore our original "encoded metrics" buffer before finishing up, but also clear it. + encoded_metrics.clear(); + self.encoded_metrics = encoded_metrics; + + requests + } + + async fn try_split_request( + &mut self, metrics: &[Metric], + ) -> Option), RequestBuilderError>> { + let mut uncompressed_len = 0; + let mut compressor = create_compressor(&self.buffer_pool).await; + + for metric in metrics { + // Encode each metric and write it to our compressor. + // + // We skip any of the typical payload size checks here, because we already know we at least fit these + // metrics into the previous attempted payload, so there's no reason to redo all of that here. + let encoded_metric = encode_single_metric(metric); + + if let Err(e) = encoded_metric.write(&mut self.scratch_buf) { + return Some(Err(e)); + } + + if let Err(e) = compressor.write_all(&self.scratch_buf[..]).await.context(Io) { + return Some(Err(e)); + } + + uncompressed_len += self.scratch_buf.len(); } - match self.finalize(compressor_half_two).await { - Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))), - Err(e) => requests.push(Err(e)), + + // Make sure we haven't exceeded our uncompressed size limit. + // + // Again, this should never happen since we've already gone through this the first time but we're just being + // extra sure here since the interface allows for it to happen. :shrug: + if uncompressed_len > self.uncompressed_len_limit { + let metrics_dropped = metrics.len(); + + // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here. + error!(uncompressed_len, metrics_dropped, "Uncompressed size limit exceeded while splitting request. This should never occur. 
Metrics have been dropped."); + + return None; } - self.clear_scratch_buffer(); - requests + + Some( + self.finalize(compressor) + .await + .and_then(|buffer| self.create_request(buffer).map(|request| (metrics.len(), request))), + ) } async fn finalize( @@ -316,7 +397,7 @@ where compressor.shutdown().await.context(Io)?; let buffer = compressor.into_inner().freeze(); let compressed_len = buffer.len(); - let compressed_limit = self.endpoint.compressed_size_limit(); + let compressed_limit = self.compressed_len_limit; if compressed_len > compressed_limit { return Err(RequestBuilderError::PayloadTooLarge { compressed_size_bytes: compressed_len, @@ -365,6 +446,7 @@ impl EncodedMetric { } fn write(&self, buf: &mut Vec) -> Result<(), RequestBuilderError> { + buf.clear(); let mut output_stream = CodedOutputStream::vec(buf); // Write the field tag. @@ -561,9 +643,15 @@ fn origin_metadata_to_proto_metadata(product: u32, subproduct: u32, product_deta #[cfg(test)] mod tests { + use saluki_core::pooling::FixedSizeObjectPool; use saluki_event::metric::Metric; + use saluki_io::buf::{BytesBuffer, FixedSizeVec}; - use super::encode_sketch_metric; + use super::{encode_sketch_metric, MetricsEndpoint, RequestBuilder}; + + fn create_request_builder_buffer_pool() -> FixedSizeObjectPool { + FixedSizeObjectPool::with_builder("test_pool", 8, || FixedSizeVec::with_capacity(64)) + } #[test] fn histogram_vs_sketch_identical_payload() { @@ -581,4 +669,74 @@ mod tests { assert_eq!(histogram_payload, distribution_payload); } + + #[tokio::test] + async fn split_oversized_request() { + // Generate some metrics that will exceed the compressed size limit. + let counter1 = Metric::counter(("abcdefg", &["345", "678"][..]), 1.0); + let counter2 = Metric::counter(("hijklmn", &["9!@", "#$%"][..]), 1.0); + let counter3 = Metric::counter(("opqrstu", &["^&*", "()A"][..]), 1.0); + let counter4 = Metric::counter(("vwxyz12", &["BCD", "EFG"][..]), 1.0); + + // Create a regular ol' request builder with normal (un)compressed size limits. + let buffer_pool = create_request_builder_buffer_pool(); + let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX) + .await + .expect("should not fail to create request builder"); + + // Encode the metrics, which should all fit into the request payload. + let metrics = vec![counter1, counter2, counter3, counter4]; + for metric in metrics { + match request_builder.encode(metric).await { + Ok(None) => {} + Ok(Some(_)) => panic!("initial encode should never fail to fit encoded metric payload"), + Err(e) => panic!("initial encode should never fail: {}", e), + } + } + + // Now we attempt to flush, but first, we'll adjust our limits to force the builder to split the request. + // + // We've chosen 96 because it's just under where the compressor should land when compressing all four metrics. + // This value may need to change in the future if we change to a different compression algorithm. + request_builder.set_custom_len_limits(MetricsEndpoint::Series.compressed_size_limit(), 96); + let requests = request_builder.flush().await; + assert_eq!(requests.len(), 2); + } + + #[tokio::test] + async fn obeys_max_metrics_per_payload() { + // Generate some simple metrics. 
+ let counter1 = Metric::counter(("abcdefg", &["345", "678"][..]), 1.0); + let counter2 = Metric::counter(("hijklmn", &["9!@", "#$%"][..]), 1.0); + let counter3 = Metric::counter(("opqrstu", &["^&*", "()A"][..]), 1.0); + + // Create a regular ol' request builder with normal (un)compressed size limits, and no limit on the number of + // metrics per payload. + // + // We should be able to encode the three metrics without issue. + let buffer_pool = create_request_builder_buffer_pool(); + let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, usize::MAX) + .await + .expect("should not fail to create request builder"); + + assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap()); + assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap()); + assert_eq!(None, request_builder.encode(counter3.clone()).await.unwrap()); + + // Now create a request builder with normal (un)compressed size limits, but a limit of 2 metrics per payload. + // + // We should only be able to encode two of the three metrics before we're signaled to flush. + let buffer_pool = create_request_builder_buffer_pool(); + let mut request_builder = RequestBuilder::new(MetricsEndpoint::Series, buffer_pool, 2) + .await + .expect("should not fail to create request builder"); + + assert_eq!(None, request_builder.encode(counter1.clone()).await.unwrap()); + assert_eq!(None, request_builder.encode(counter2.clone()).await.unwrap()); + assert_eq!(Some(counter3.clone()), request_builder.encode(counter3).await.unwrap()); + + // Since we know we could fit the same three metrics in the first request builder when there was no limit on the + // number of metrics per payload, we know we're not being instructed to flush here due to hitting (un)compressed + // size limits. + } } diff --git a/lib/saluki-components/src/destinations/datadog/status_flare/mod.rs b/lib/saluki-components/src/destinations/datadog/status_flare/mod.rs index ce76c101..f56cd566 100644 --- a/lib/saluki-components/src/destinations/datadog/status_flare/mod.rs +++ b/lib/saluki-components/src/destinations/datadog/status_flare/mod.rs @@ -82,7 +82,7 @@ impl MemoryBounds for DatadogStatusFlareConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::(); + .with_single_value::("component struct"); } } diff --git a/lib/saluki-components/src/destinations/prometheus/mod.rs b/lib/saluki-components/src/destinations/prometheus/mod.rs index 30e5ceec..723e7840 100644 --- a/lib/saluki-components/src/destinations/prometheus/mod.rs +++ b/lib/saluki-components/src/destinations/prometheus/mod.rs @@ -91,20 +91,21 @@ impl MemoryBounds for PrometheusConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::() + .with_single_value::("component struct") // This isn't _really_ bounded since the string buffer could definitely grow larger if the metric name was // larger, but the default buffer size is far beyond any typical metric name that it should almost never // grow beyond this initially allocated size. 
- .with_fixed_amount(NAME_NORMALIZATION_BUFFER_SIZE); + .with_fixed_amount("name normalization buffer size", NAME_NORMALIZATION_BUFFER_SIZE); + builder .firm() // Even though our context map is really the Prometheus context to a map of context/value pairs, we're just // simplifying things here because the ratio of true "contexts" to Prometheus contexts should be very high, // high enough to make this a reasonable approximation. - .with_map::(CONTEXT_LIMIT) - .with_fixed_amount(PAYLOAD_SIZE_LIMIT_BYTES) - .with_fixed_amount(PAYLOAD_BUFFER_SIZE_LIMIT_BYTES) - .with_fixed_amount(TAGS_BUFFER_SIZE_LIMIT_BYTES); + .with_map::("state map", CONTEXT_LIMIT) + .with_fixed_amount("payload size", PAYLOAD_SIZE_LIMIT_BYTES) + .with_fixed_amount("payload buffer", PAYLOAD_BUFFER_SIZE_LIMIT_BYTES) + .with_fixed_amount("tags buffer", TAGS_BUFFER_SIZE_LIMIT_BYTES); } } diff --git a/lib/saluki-components/src/sources/dogstatsd/mod.rs b/lib/saluki-components/src/sources/dogstatsd/mod.rs index 12a881f7..fdcc847d 100644 --- a/lib/saluki-components/src/sources/dogstatsd/mod.rs +++ b/lib/saluki-components/src/sources/dogstatsd/mod.rs @@ -4,7 +4,7 @@ use std::{num::NonZeroUsize, time::Duration}; use async_trait::async_trait; use bytes::{Buf, BufMut}; use bytesize::ByteSize; -use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr}; use metrics::{Counter, Gauge, Histogram}; use saluki_config::GenericConfiguration; use saluki_context::{ContextResolver, ContextResolverBuilder}; @@ -352,12 +352,19 @@ impl MemoryBounds for DogStatsDConfiguration { builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::() + .with_single_value::("source struct") // We allocate our I/O buffers entirely up front. - .with_fixed_amount(self.buffer_count * get_adjusted_buffer_size(self.buffer_size)) + .with_expr(UsageExpr::product( + "buffers", + UsageExpr::config("dogstatsd_buffer_count", self.buffer_count), + UsageExpr::config("dogstatsd_buffer_size", get_adjusted_buffer_size(self.buffer_size)), + )) // We also allocate the backing storage for the string interner up front, which is used by our context // resolver. - .with_fixed_amount(self.context_string_interner_bytes.as_u64() as usize); + .with_expr(UsageExpr::config( + "dogstatsd_string_interner_size", + self.context_string_interner_bytes.as_u64() as usize, + )); } } diff --git a/lib/saluki-components/src/sources/internal_metrics/mod.rs b/lib/saluki-components/src/sources/internal_metrics/mod.rs index 53f2ecd5..e4f2a8ba 100644 --- a/lib/saluki-components/src/sources/internal_metrics/mod.rs +++ b/lib/saluki-components/src/sources/internal_metrics/mod.rs @@ -33,7 +33,9 @@ impl SourceBuilder for InternalMetricsConfiguration { impl MemoryBounds for InternalMetricsConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // Capture the size of the heap allocation when the component is built. 
- builder.minimum().with_single_value::(); + builder + .minimum() + .with_single_value::("component struct"); } } diff --git a/lib/saluki-components/src/transforms/aggregate/mod.rs b/lib/saluki-components/src/transforms/aggregate/mod.rs index 3c8c1e14..615e4877 100644 --- a/lib/saluki-components/src/transforms/aggregate/mod.rs +++ b/lib/saluki-components/src/transforms/aggregate/mod.rs @@ -1,16 +1,19 @@ -use std::{num::NonZeroU64, time::Duration}; +use std::{ + num::NonZeroU64, + time::{Duration, Instant}, +}; use async_trait::async_trait; use hashbrown::{hash_map::Entry, HashMap}; -use memory_accounting::{MemoryBounds, MemoryBoundsBuilder}; +use memory_accounting::{MemoryBounds, MemoryBoundsBuilder, UsageExpr}; use saluki_config::GenericConfiguration; use saluki_context::Context; use saluki_core::{ components::{transforms::*, ComponentContext}, observability::ComponentMetricsExt as _, - pooling::ObjectPool, + pooling::{ElasticObjectPool, ObjectPool}, topology::{ - interconnect::{BufferedForwarder, FixedSizeEventBuffer}, + interconnect::{BufferedForwarder, FixedSizeEventBuffer, FixedSizeEventBufferInner, Forwarder}, OutputDefinition, }, }; @@ -20,7 +23,10 @@ use saluki_event::{metric::*, DataType, Event}; use saluki_metrics::MetricsBuilder; use serde::Deserialize; use smallvec::SmallVec; -use tokio::{select, time::interval_at}; +use tokio::{ + select, + time::{interval, interval_at}, +}; use tracing::{debug, error, trace}; mod telemetry; @@ -29,11 +35,14 @@ use self::telemetry::Telemetry; mod config; use self::config::HistogramConfiguration; +const PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL: Duration = Duration::from_secs(2); +const PASSTHROUGH_EVENT_BUFFERS_MAX: usize = 16; + const fn default_window_duration() -> Duration { Duration::from_secs(10) } -const fn default_flush_interval() -> Duration { +const fn default_primary_flush_interval() -> Duration { Duration::from_secs(15) } @@ -45,10 +54,18 @@ const fn default_counter_expiry_seconds() -> Option { Some(300) } -const fn default_forward_timestamped_metrics() -> bool { +const fn default_passthrough_timestamped_metrics() -> bool { true } +const fn default_passthrough_idle_flush_timeout() -> Duration { + Duration::from_secs(1) +} + +const fn default_passthrough_event_buffer_len() -> usize { + 2048 +} + /// Aggregate transform. /// /// Aggregates metrics into fixed-size windows, flushing them at a regular interval. @@ -86,8 +103,8 @@ pub struct AggregateConfiguration { /// systems, etc) and the frequency of updates (how often updates to a metric are emitted). /// /// Defaults to 15 seconds. - #[serde(rename = "aggregate_flush_interval", default = "default_flush_interval")] - flush_interval: Duration, + #[serde(rename = "aggregate_flush_interval", default = "default_primary_flush_interval")] + primary_flush_interval: Duration, /// Maximum number of contexts to aggregate per window. /// @@ -131,7 +148,7 @@ pub struct AggregateConfiguration { #[serde(alias = "dogstatsd_expiry_seconds", default = "default_counter_expiry_seconds")] counter_expiry_seconds: Option, - /// Whether or not to immediately forward metrics with pre-defined timestamps. + /// Whether or not to immediately forward (passthrough) metrics with pre-defined timestamps. /// /// When enabled, this causes the aggregator to immediately forward metrics that already have a timestamp present. /// Only metrics without a timestamp will be aggregated. This can be useful when metrics are already pre-aggregated @@ -141,9 +158,35 @@ pub struct AggregateConfiguration { /// Defaults to `true`. 
#[serde( rename = "dogstatsd_no_aggregation_pipeline", - default = "default_forward_timestamped_metrics" + default = "default_passthrough_timestamped_metrics" + )] + passthrough_timestamped_metrics: bool, + + /// How often to flush buffered passthrough metrics. + /// + /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in + /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum + /// amount of time that passthrough metrics will be buffered before being forwarded. + /// + /// Defaults to 1 seconds. + #[serde( + rename = "aggregate_passthrough_idle_flush_timeout", + default = "default_passthrough_idle_flush_timeout" )] - forward_timestamped_metrics: bool, + passthrough_idle_flush_timeout: Duration, + + /// Length of event buffers used exclusive for passthrough metrics. + /// + /// While passthrough metrics are not re-aggregated by the transform, they will still be temporarily buffered in + /// order to optimize the efficiency of processing them in the next component. This setting controls the maximum + /// number of passthrough metrics that can be buffered in a single batch before being forwarded. + /// + /// Defaults to 2048. + #[serde( + rename = "dogstatsd_no_aggregation_pipeline_batch_size", + default = "default_passthrough_event_buffer_len" + )] + passthrough_event_buffer_len: usize, /// Histogram aggregation configuration. /// @@ -163,11 +206,13 @@ impl AggregateConfiguration { pub fn with_defaults() -> Self { Self { window_duration: default_window_duration(), - flush_interval: default_flush_interval(), + primary_flush_interval: default_primary_flush_interval(), context_limit: default_context_limit(), flush_open_windows: false, counter_expiry_seconds: default_counter_expiry_seconds(), - forward_timestamped_metrics: default_forward_timestamped_metrics(), + passthrough_timestamped_metrics: default_passthrough_timestamped_metrics(), + passthrough_idle_flush_timeout: default_passthrough_idle_flush_timeout(), + passthrough_event_buffer_len: default_passthrough_event_buffer_len(), hist_config: HistogramConfiguration::default(), } } @@ -175,15 +220,26 @@ impl AggregateConfiguration { #[async_trait] impl TransformBuilder for AggregateConfiguration { - async fn build(&self, _context: ComponentContext) -> Result, GenericError> { + async fn build(&self, context: ComponentContext) -> Result, GenericError> { + let metrics_builder = MetricsBuilder::from_component_context(context); + let telemetry = Telemetry::new(&metrics_builder); + + let state = AggregationState::new( + self.window_duration, + self.context_limit, + self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs), + self.hist_config.clone(), + telemetry.clone(), + ); + Ok(Box::new(Aggregate { - window_duration: self.window_duration, - flush_interval: self.flush_interval, - context_limit: self.context_limit, + state, + telemetry, + primary_flush_interval: self.primary_flush_interval, flush_open_windows: self.flush_open_windows, - counter_expiry_seconds: self.counter_expiry_seconds, - forward_timestamped_metrics: self.forward_timestamped_metrics, - hist_config: self.hist_config.clone(), + passthrough_timestamped_metrics: self.passthrough_timestamped_metrics, + passthrough_idle_flush_timeout: self.passthrough_idle_flush_timeout, + passthrough_event_buffer_len: self.passthrough_event_buffer_len, })) } @@ -208,25 +264,43 @@ impl MemoryBounds for AggregateConfiguration { // // However, there could be many more 
values in a single metric, and we don't account for that. + let passthrough_event_buffer_min_elements = self.passthrough_event_buffer_len; + let passthrough_event_buffer_max_elements = + self.passthrough_event_buffer_len * (PASSTHROUGH_EVENT_BUFFERS_MAX - 1); + builder .minimum() // Capture the size of the heap allocation when the component is built. - .with_single_value::(); + .with_single_value::("component struct") + .with_array::( + "passthrough event buffer pool (minimum)", + passthrough_event_buffer_min_elements, + ); builder .firm() // Account for the aggregation state map, where we map contexts to the merged metric. - .with_map::(self.context_limit); + .with_expr(UsageExpr::product( + "aggregation state map", + UsageExpr::sum( + "context map entry", + UsageExpr::struct_size::("context"), + UsageExpr::struct_size::("aggregated metric"), + ), + UsageExpr::config("aggregate_context_limit", self.context_limit), + )) + // Upper bound of our passthrough event buffer object pool. + .with_array::("passthrough event buffer pool", passthrough_event_buffer_max_elements); } } pub struct Aggregate { - window_duration: Duration, - flush_interval: Duration, - context_limit: usize, + state: AggregationState, + telemetry: Telemetry, + primary_flush_interval: Duration, flush_open_windows: bool, - counter_expiry_seconds: Option, - forward_timestamped_metrics: bool, - hist_config: HistogramConfiguration, + passthrough_timestamped_metrics: bool, + passthrough_idle_flush_timeout: Duration, + passthrough_event_buffer_len: usize, } #[async_trait] @@ -234,44 +308,44 @@ impl Transform for Aggregate { async fn run(mut self: Box, mut context: TransformContext) -> Result<(), GenericError> { let mut health = context.take_health_handle(); - let metrics_builder = MetricsBuilder::from_component_context(context.component_context()); - let telemetry = Telemetry::new(&metrics_builder); - - let mut state = AggregationState::new( - self.window_duration, - self.context_limit, - self.counter_expiry_seconds.filter(|s| *s != 0).map(Duration::from_secs), - self.hist_config, - telemetry.clone(), + let mut primary_flush = interval_at( + tokio::time::Instant::now() + self.primary_flush_interval, + self.primary_flush_interval, ); + let mut final_primary_flush = false; - let mut flush = interval_at(tokio::time::Instant::now() + self.flush_interval, self.flush_interval); - - let metrics_builder = MetricsBuilder::from_component_context(context.component_context()); - let telemetry = Telemetry::new(&metrics_builder); + let passthrough_flush = interval(PASSTHROUGH_IDLE_FLUSH_CHECK_INTERVAL); + let mut passthrough_batcher = PassthroughBatcher::new( + self.passthrough_event_buffer_len, + self.passthrough_idle_flush_timeout, + self.telemetry.clone(), + ) + .await; health.mark_ready(); debug!("Aggregation transform started."); - let mut final_flush = false; + tokio::pin!(passthrough_flush); loop { select! { _ = health.live() => continue, - _ = flush.tick() => { + _ = primary_flush.tick() => { // We've reached the end of the current window. Flush our aggregation state and forward the metrics // onwards. Regardless of whether any metrics were aggregated, we always update the aggregation // state to track the start time of the current aggregation window. 
- if !state.is_empty() { + if !self.state.is_empty() { debug!("Flushing aggregated metrics..."); - let should_flush_open_windows = final_flush && self.flush_open_windows; + let should_flush_open_windows = final_primary_flush && self.flush_open_windows; let mut forwarder = context.forwarder().buffered().expect("default output should always exist"); - if let Err(e) = state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await { + if let Err(e) = self.state.flush(get_unix_timestamp(), should_flush_open_windows, &mut forwarder).await { error!(error = %e, "Failed to flush aggregation state."); } + self.telemetry.increment_flushes(); + match forwarder.flush().await { Ok(aggregated_events) => debug!(aggregated_events, "Forwarded events."), Err(e) => error!(error = %e, "Failed to flush aggregated events."), @@ -279,56 +353,60 @@ impl Transform for Aggregate { } // If this is the final flush, we break out of the loop. - if final_flush { + if final_primary_flush { debug!("All aggregation complete."); break } }, - maybe_events = context.event_stream().next(), if !final_flush => match maybe_events { + _ = passthrough_flush.tick() => passthrough_batcher.try_flush(context.forwarder()).await, + maybe_events = context.event_stream().next(), if !final_primary_flush => match maybe_events { Some(events) => { trace!(events_len = events.len(), "Received events."); - let mut forwarder = context.forwarder().buffered().expect("default output should always exist"); let current_time = get_unix_timestamp(); + let mut processed_passthrough_metrics = false; for event in events { if let Some(metric) = event.try_into_metric() { - let metric = if self.forward_timestamped_metrics { - // If we're configured to forward timestamped metrics immediately, then we need to - // try to handle any timestamped values in this metric. If we get back `Some(...)`, - // it's either the original metric because no values had timestamps _or_ it's a - // modified version of the metric after all timestamped values were split out and - // directly forwarded. - match handle_forward_timestamped_metric(metric, &mut forwarder, &telemetry).await { - Ok(None) => continue, - Ok(Some(metric)) => metric, - Err(e) => { - error!(error = %e, "Failed to handle timestamped metric."); - continue; - } + let metric = if self.passthrough_timestamped_metrics { + // Try splitting out any timestamped values, and if we have any, we'll buffer them + // separately and process the remaining nontimestamped metric (if any) by + // aggregating it like normal. + let (maybe_timestamped_metric, maybe_nontimestamped_metric) = try_split_timestamped_values(metric); + + // If we have a timestamped metric, then batch it up out-of-band. + if let Some(timestamped_metric) = maybe_timestamped_metric { + passthrough_batcher.push_metric(timestamped_metric, context.forwarder()).await; + processed_passthrough_metrics = true; + } + + // If we have an nontimestamped metric, we'll process it like normal. + // + // Otherwise, continue to the next event. 
+ match maybe_nontimestamped_metric { + Some(metric) => metric, + None => continue, } } else { metric }; - if !state.insert(current_time, metric) { + if !self.state.insert(current_time, metric) { trace!("Dropping metric due to context limit."); - telemetry.increment_events_dropped(); + self.telemetry.increment_events_dropped(); } } } - match forwarder.flush().await { - Ok(unaggregated_events) => debug!(unaggregated_events, "Forwarded events."), - Err(e) => error!(error = %e, "Failed to flush unaggregated events."), + if processed_passthrough_metrics { + passthrough_batcher.update_last_processed_at(); } }, None => { // We've reached the end of our input stream, so mark ourselves for a final flush and reset the // interval so it ticks immediately on the next loop iteration. - final_flush = true; - - flush.reset_immediately(); + final_primary_flush = true; + primary_flush.reset_immediately(); debug!("Aggregation transform stopping..."); } @@ -336,38 +414,119 @@ impl Transform for Aggregate { } } + // Do a final flush of any timestamped metrics that we've buffered up. + passthrough_batcher.try_flush(context.forwarder()).await; + debug!("Aggregation transform stopped."); Ok(()) } } -async fn handle_forward_timestamped_metric( - mut metric: Metric, forwarder: &mut BufferedForwarder<'_, O>, telemetry: &Telemetry, -) -> Result, GenericError> -where - O: ObjectPool, -{ +fn try_split_timestamped_values(mut metric: Metric) -> (Option, Option) { if metric.values().all_timestamped() { - // All the values are timestamped, so take and forward the metric as-is. - forwarder.push(Event::Metric(metric)).await?; - - telemetry.increment_passthrough_metrics(); - - Ok(None) + (Some(metric), None) } else if metric.values().any_timestamped() { - // Only _some_ of the values are timestamped, so split out those timestamped ones, forward them, and then hand - // back the now-modified original metric. + // Only _some_ of the values are timestamped, so we'll split the timestamped values into a new metric. let new_metric_values = metric.values_mut().split_timestamped(); let new_metric = Metric::from_parts(metric.context().clone(), new_metric_values, metric.metadata().clone()); - forwarder.push(Event::Metric(new_metric)).await?; - telemetry.increment_passthrough_metrics(); - - Ok(Some(metric)) + (Some(new_metric), Some(metric)) } else { // No timestamped values, so we need to aggregate this metric. - Ok(Some(metric)) + (None, Some(metric)) + } +} + +struct PassthroughBatcher { + buffer_pool: ElasticObjectPool, + active_buffer: FixedSizeEventBuffer, + active_buffer_start: Instant, + last_processed_at: Instant, + idle_flush_timeout: Duration, + telemetry: Telemetry, +} + +impl PassthroughBatcher { + async fn new(event_buffer_len: usize, idle_flush_timeout: Duration, telemetry: Telemetry) -> Self { + let (buffer_pool, pool_shrinker) = ElasticObjectPool::::with_builder( + "agg_passthrough_event_buffers", + 1, + PASSTHROUGH_EVENT_BUFFERS_MAX, + move || FixedSizeEventBufferInner::with_capacity(event_buffer_len), + ); + tokio::spawn(pool_shrinker); + + let active_buffer = buffer_pool.acquire().await; + + Self { + buffer_pool, + active_buffer, + active_buffer_start: Instant::now(), + last_processed_at: Instant::now(), + idle_flush_timeout, + telemetry, + } + } + + async fn push_metric(&mut self, metric: Metric, forwarder: &Forwarder) { + // Try pushing the metric into our active buffer. + // + // If our active buffer is full, then we'll flush the buffer, grab a new one, and push the metric into it. 
+ if let Some(event) = self.active_buffer.try_push(Event::Metric(metric)) { + debug!("Passthrough event buffer was full. Flushing..."); + self.forward_events(forwarder).await; + + if self.active_buffer.try_push(event).is_some() { + error!("Event buffer is full even after forwarding events. Dropping event."); + self.telemetry.increment_events_dropped(); + return; + } + } + + // If this is the first metric in the buffer, we've started a new batch, so track when it started. + if self.active_buffer.len() == 1 { + self.active_buffer_start = Instant::now(); + } + + self.telemetry.increment_passthrough_metrics(); + } + + fn update_last_processed_at(&mut self) { + // We expose this as a standalone method, rather than just doing it automatically in `push_metric`, because + // otherwise we might be calling this 10-20K times per second, instead of simply doing it after the end of each + // input event buffer in the transform's main loop, which should be much less frequent. + self.last_processed_at = Instant::now(); + } + + async fn try_flush(&mut self, forwarder: &Forwarder) { + // If our active buffer isn't empty, and we've exceeded our idle flush timeout, then flush the buffer. + if !self.active_buffer.is_empty() && self.last_processed_at.elapsed() >= self.idle_flush_timeout { + debug!("Passthrough processing exceeded idle flush timeout. Flushing..."); + + self.forward_events(forwarder).await; + } + } + + async fn forward_events(&mut self, forwarder: &Forwarder) { + if !self.active_buffer.is_empty() { + let unaggregated_events = self.active_buffer.len(); + + // Track how long this batch was alive for. + let batch_duration = self.active_buffer_start.elapsed(); + self.telemetry.record_passthrough_batch_duration(batch_duration); + + self.telemetry.increment_passthrough_flushes(); + + // Swap our active buffer with a new, empty one, and then forward the old one. 
+ let new_active_buffer = self.buffer_pool.acquire().await; + let old_active_buffer = std::mem::replace(&mut self.active_buffer, new_active_buffer); + + match forwarder.forward_buffer(old_active_buffer).await { + Ok(()) => debug!(unaggregated_events, "Forwarded events."), + Err(e) => error!(error = %e, "Failed to flush unaggregated events."), + } + } } } diff --git a/lib/saluki-components/src/transforms/aggregate/telemetry.rs b/lib/saluki-components/src/transforms/aggregate/telemetry.rs index c71ea823..c022ea32 100644 --- a/lib/saluki-components/src/transforms/aggregate/telemetry.rs +++ b/lib/saluki-components/src/transforms/aggregate/telemetry.rs @@ -1,4 +1,6 @@ -use metrics::{Counter, Gauge}; +use std::time::Duration; + +use metrics::{Counter, Gauge, Histogram}; use saluki_event::metric::MetricValues; use saluki_metrics::MetricsBuilder; @@ -52,8 +54,11 @@ impl MetricTypedGauge { pub struct Telemetry { active_contexts: Gauge, active_contexts_by_type: MetricTypedGauge, - passthrough_metrics: Counter, events_dropped: Counter, + flushes: Counter, + passthrough_metrics: Counter, + passthrough_flushes: Counter, + passthrough_batch_duration: Histogram, } impl Telemetry { @@ -61,9 +66,12 @@ impl Telemetry { Self { active_contexts: builder.register_debug_gauge("aggregate_active_contexts"), active_contexts_by_type: MetricTypedGauge::new(builder, "aggregate_active_contexts_by_type"), - passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"), events_dropped: builder .register_debug_counter_with_tags("component_events_dropped_total", ["intentional:true"]), + flushes: builder.register_debug_counter("aggregate_flushes_total"), + passthrough_metrics: builder.register_debug_counter("aggregate_passthrough_metrics_total"), + passthrough_flushes: builder.register_debug_counter("aggregate_passthrough_flushes_total"), + passthrough_batch_duration: builder.register_debug_histogram("aggregate_passthrough_batch_duration_secs"), } } @@ -72,8 +80,11 @@ impl Telemetry { Self { active_contexts: Gauge::noop(), active_contexts_by_type: MetricTypedGauge::noop(), - passthrough_metrics: Counter::noop(), events_dropped: Counter::noop(), + flushes: Counter::noop(), + passthrough_metrics: Counter::noop(), + passthrough_flushes: Counter::noop(), + passthrough_batch_duration: Histogram::noop(), } } @@ -87,11 +98,23 @@ impl Telemetry { self.active_contexts_by_type.for_values(values).decrement(1); } + pub fn increment_events_dropped(&self) { + self.events_dropped.increment(1); + } + + pub fn increment_flushes(&self) { + self.flushes.increment(1); + } + pub fn increment_passthrough_metrics(&self) { self.passthrough_metrics.increment(1); } - pub fn increment_events_dropped(&self) { - self.events_dropped.increment(1); + pub fn increment_passthrough_flushes(&self) { + self.passthrough_flushes.increment(1); + } + + pub fn record_passthrough_batch_duration(&self, duration: Duration) { + self.passthrough_batch_duration.record(duration.as_secs_f64()); } } diff --git a/lib/saluki-components/src/transforms/chained/mod.rs b/lib/saluki-components/src/transforms/chained/mod.rs index 6f2e00ce..bf93ad8f 100644 --- a/lib/saluki-components/src/transforms/chained/mod.rs +++ b/lib/saluki-components/src/transforms/chained/mod.rs @@ -39,7 +39,7 @@ impl ChainedConfiguration { impl MemoryBounds for ChainedConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // Capture the size of the heap allocation when the component is built. 
- builder.minimum().with_single_value::(); + builder.minimum().with_single_value::("component struct"); for (subtransform_id, subtransform_builder) in self.subtransform_builders.iter() { let mut subtransform_bounds_builder = builder.subcomponent(subtransform_id); diff --git a/lib/saluki-components/src/transforms/dogstatsd_prefix_filter/mod.rs b/lib/saluki-components/src/transforms/dogstatsd_prefix_filter/mod.rs index cf8a722b..def20959 100644 --- a/lib/saluki-components/src/transforms/dogstatsd_prefix_filter/mod.rs +++ b/lib/saluki-components/src/transforms/dogstatsd_prefix_filter/mod.rs @@ -96,7 +96,9 @@ impl TransformBuilder for DogstatsDPrefixFilterConfiguration { impl MemoryBounds for DogstatsDPrefixFilterConfiguration { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // Capture the size of the heap allocation when the component is built. - builder.minimum().with_single_value::(); + builder + .minimum() + .with_single_value::("component struct"); } } diff --git a/lib/saluki-components/src/transforms/host_enrichment/mod.rs b/lib/saluki-components/src/transforms/host_enrichment/mod.rs index e00ac7ea..5308c6bf 100644 --- a/lib/saluki-components/src/transforms/host_enrichment/mod.rs +++ b/lib/saluki-components/src/transforms/host_enrichment/mod.rs @@ -45,7 +45,9 @@ impl MemoryBounds for HostEnrichmentConfiguration { // Not a relevant problem _right now_, but a _potential_ problem in the future. :shrug: // Capture the size of the heap allocation when the component is built. - builder.minimum().with_single_value::(); + builder + .minimum() + .with_single_value::("component struct"); } } diff --git a/lib/saluki-core/src/topology/blueprint.rs b/lib/saluki-core/src/topology/blueprint.rs index a5cc7fcb..cb701912 100644 --- a/lib/saluki-core/src/topology/blueprint.rs +++ b/lib/saluki-core/src/topology/blueprint.rs @@ -67,10 +67,10 @@ impl TopologyBlueprint { let mut event_buffer_bounds_builder = bounds_builder.subcomponent("buffer_pools/event_buffer"); event_buffer_bounds_builder .minimum() - .with_fixed_amount(GLOBAL_EVENT_BUFFER_POOL_SIZE_MIN); + .with_fixed_amount("global event buffer pool", GLOBAL_EVENT_BUFFER_POOL_SIZE_MIN); event_buffer_bounds_builder .firm() - .with_fixed_amount(GLOBAL_EVENT_BUFFER_POOL_SIZE_FIRM); + .with_fixed_amount("global event buffer pool", GLOBAL_EVENT_BUFFER_POOL_SIZE_FIRM); Self { graph: Graph::default(), @@ -87,7 +87,7 @@ impl TopologyBlueprint { .get_or_create("interconnects") .bounds_builder() .minimum() - .with_array::(128); + .with_array::("fixed size event buffers", 128); } /// Adds a source component to the blueprint. 
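Note on the pattern applied throughout the hunks above: every memory-bounds builder call now carries a short, human-readable label alongside the existing size information, and the aggregate transform goes a step further by expressing its firm bound as a named `UsageExpr` tree. As a rough illustration of the labeling convention only, here is a minimal sketch of a `MemoryBounds` implementation for a hypothetical `ExampleTransformConfiguration`; the import path, type parameters, and exact method signatures are inferred from the calls shown in this diff rather than taken from the `memory_accounting` crate's documentation, so treat it as an assumption-laden sketch rather than the crate's actual API.

use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};

// Hypothetical component configuration, not part of this patch.
struct ExampleTransformConfiguration {
    // Hypothetical setting that caps how many contexts the component tracks.
    context_limit: usize,
}

impl MemoryBounds for ExampleTransformConfiguration {
    fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {
        builder
            .minimum()
            // Capture the size of the heap allocation when the component is built.
            .with_single_value::<Self>("component struct");

        builder
            .firm()
            // Account for the per-context state map, sized by the configured limit.
            // Key/value types here are placeholders for illustration.
            .with_map::<String, u64>("context state map", self.context_limit);
    }
}

The labels are what the generated sizing guide surfaces, so the convention in this diff favors short, allocation-specific names ("component struct", "string interner", "entity ids") over type names.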
diff --git a/lib/saluki-env/src/host/providers/fixed.rs b/lib/saluki-env/src/host/providers/fixed.rs index e2483c04..991feb0d 100644 --- a/lib/saluki-env/src/host/providers/fixed.rs +++ b/lib/saluki-env/src/host/providers/fixed.rs @@ -37,6 +37,6 @@ impl HostProvider for FixedHostProvider { impl MemoryBounds for FixedHostProvider { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - builder.minimum().with_single_value::(); + builder.minimum().with_single_value::("component struct"); } } diff --git a/lib/saluki-env/src/host/providers/remote_agent.rs b/lib/saluki-env/src/host/providers/remote_agent.rs index 4ea2fd43..a3a4090b 100644 --- a/lib/saluki-env/src/host/providers/remote_agent.rs +++ b/lib/saluki-env/src/host/providers/remote_agent.rs @@ -54,6 +54,6 @@ impl HostProvider for RemoteAgentHostProvider { impl MemoryBounds for RemoteAgentHostProvider { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { - builder.minimum().with_single_value::(); + builder.minimum().with_single_value::("component struct"); } } diff --git a/lib/saluki-env/src/workload/aggregator.rs b/lib/saluki-env/src/workload/aggregator.rs index c3dc02d0..0e0723c6 100644 --- a/lib/saluki-env/src/workload/aggregator.rs +++ b/lib/saluki-env/src/workload/aggregator.rs @@ -105,7 +105,7 @@ impl MemoryBounds for MetadataAggregator { builder .firm() // Operations channel. - .with_array::(OPERATIONS_CHANNEL_SIZE); + .with_array::("metadata ops channel", OPERATIONS_CHANNEL_SIZE); for store in &self.stores { builder.with_subcomponent(store.name(), store); diff --git a/lib/saluki-env/src/workload/collectors/cgroups.rs b/lib/saluki-env/src/workload/collectors/cgroups.rs index 835c2049..70fe1bf4 100644 --- a/lib/saluki-env/src/workload/collectors/cgroups.rs +++ b/lib/saluki-env/src/workload/collectors/cgroups.rs @@ -105,9 +105,9 @@ impl MemoryBounds for CgroupsMetadataCollector { builder .minimum() // Pre-allocated operation batch buffer. This is only the minimum, as it could grow larger. - .with_array::(64); + .with_array::("metadata operations", 64); // TODO: Kind of a throwaway calculation because nothing about the reader can really be bounded at the moment. - builder.firm().with_fixed_amount(std::mem::size_of::()); + builder.firm().with_single_value::("component struct"); } } @@ -117,21 +117,18 @@ fn traverse_cgroups( let start = std::time::Instant::now(); let child_cgroups = reader.get_child_cgroups(); + let child_cgroups_len = child_cgroups.len(); for child_cgroup in child_cgroups { - let cgroup_name = child_cgroup.name; - let container_id = child_cgroup.container_id; - debug!(%container_id, %cgroup_name, "Found container control group."); - // Create an ancestry link between the container inode and the container ID. 
- let entity_id = EntityId::ContainerInode(child_cgroup.ino); - let ancestor_entity_id = EntityId::Container(container_id); + let entity_id = EntityId::ContainerInode(child_cgroup.inode()); + let ancestor_entity_id = EntityId::Container(child_cgroup.into_container_id()); let operation = MetadataOperation::link_ancestor(entity_id, ancestor_entity_id); operations.push(operation); } let elapsed = start.elapsed(); - tracing::info!(elapsed = ?elapsed, "Traversed cgroups."); + debug!(elapsed = ?elapsed, child_cgroups_len, "Traversed cgroups."); Ok((reader, operations)) } diff --git a/lib/saluki-env/src/workload/collectors/containerd.rs b/lib/saluki-env/src/workload/collectors/containerd.rs index 331a86a4..07143158 100644 --- a/lib/saluki-env/src/workload/collectors/containerd.rs +++ b/lib/saluki-env/src/workload/collectors/containerd.rs @@ -94,7 +94,9 @@ impl MemoryBounds for ContainerdMetadataCollector { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // TODO: Kind of a throwaway calculation because nothing about the gRPC client can really be bounded at the // moment, and we also don't have any way to know the number of namespaces we'll be monitoring a priori. - builder.firm().with_fixed_amount(std::mem::size_of::()); + builder + .firm() + .with_fixed_amount("self struct", std::mem::size_of::()); } } diff --git a/lib/saluki-env/src/workload/collectors/remote_agent/tagger.rs b/lib/saluki-env/src/workload/collectors/remote_agent/tagger.rs index 187d020d..aecaf56a 100644 --- a/lib/saluki-env/src/workload/collectors/remote_agent/tagger.rs +++ b/lib/saluki-env/src/workload/collectors/remote_agent/tagger.rs @@ -168,7 +168,9 @@ impl MemoryBounds for RemoteAgentTaggerMetadataCollector { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // TODO: Kind of a throwaway calculation because nothing about the gRPC client can really be bounded at the // moment. - builder.firm().with_fixed_amount(std::mem::size_of::()); + builder + .firm() + .with_fixed_amount("self struct", std::mem::size_of::()); } } diff --git a/lib/saluki-env/src/workload/collectors/remote_agent/workloadmeta.rs b/lib/saluki-env/src/workload/collectors/remote_agent/workloadmeta.rs index 91f9d1c3..448e8e67 100644 --- a/lib/saluki-env/src/workload/collectors/remote_agent/workloadmeta.rs +++ b/lib/saluki-env/src/workload/collectors/remote_agent/workloadmeta.rs @@ -96,7 +96,9 @@ impl MemoryBounds for RemoteAgentWorkloadMetadataCollector { fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) { // TODO: Kind of a throwaway calculation because nothing about the gRPC client can really be bounded at the // moment. - builder.firm().with_fixed_amount(std::mem::size_of::()); + builder + .firm() + .with_fixed_amount("self struct", std::mem::size_of::()); } } diff --git a/lib/saluki-env/src/workload/helpers/cgroups.rs b/lib/saluki-env/src/workload/helpers/cgroups.rs index 50428131..575e26ec 100644 --- a/lib/saluki-env/src/workload/helpers/cgroups.rs +++ b/lib/saluki-env/src/workload/helpers/cgroups.rs @@ -13,7 +13,7 @@ use regex::Regex; use saluki_config::GenericConfiguration; use saluki_error::{generic_error, ErrorContext as _, GenericError}; use stringtheory::{interning::GenericMapInterner, MetaString}; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; use crate::features::{Feature, FeatureDetector}; @@ -44,7 +44,7 @@ impl CgroupsConfiguration { config: &GenericConfiguration, feature_detector: FeatureDetector, ) -> Result { let procfs_root = match config.try_get_typed::("container_proc_root")? 
{ - Some(procfs_root) => procfs_root, + Some(path) => path, None => { if feature_detector.is_feature_available(Feature::HostMappedProcfs) { PathBuf::from(DEFAULT_HOST_MAPPED_PROCFS_ROOT) @@ -55,7 +55,7 @@ impl CgroupsConfiguration { }; let cgroupfs_root = match config.try_get_typed::("container_cgroup_root")? { - Some(procfs_root) => procfs_root, + Some(path) => path, None => { if feature_detector.is_feature_available(Feature::HostMappedProcfs) { PathBuf::from(DEFAULT_HOST_MAPPED_CGROUPFS_ROOT) @@ -87,6 +87,10 @@ impl CgroupsConfiguration { } /// Reader for querying control groups being used for containerization. +/// +/// This reader is capable of querying both cgroups v1 and v2 hierarchies, and can be used to find cgroups -- either +/// within the entire hierarchy, or for a specific process ID -- that are mapped specifically to containers. A simple +/// naming heuristic is used to both identify and extract container IDs from cgroup names. #[derive(Clone)] pub struct CgroupsReader { procfs_path: PathBuf, @@ -95,6 +99,16 @@ pub struct CgroupsReader { } impl CgroupsReader { + /// Creates a new `CgroupsReader` from the given configuration and interner. + /// + /// If either a valid cgroups v1 or v2 hierarchy is found, `Ok(Some)` is returned with the reader. Otherwise, + /// `Ok(None)` is returned. + /// + /// The provided interner will be used exclusively for handling container IDs. + /// + /// # Errors + /// + /// If there is an I/O error while attempting to query the current cgroups hierarchy, an error will be returned. pub fn try_from_config( config: &CgroupsConfiguration, interner: GenericMapInterner, ) -> Result, GenericError> { @@ -112,21 +126,19 @@ impl CgroupsReader { let metadata = match cgroup_path.metadata() { Ok(metadata) => metadata, Err(e) => { - debug!(error = %e, "Failed to get metadata for possible cgroup controller path '{}'.", cgroup_path.display()); + trace!(error = %e, cgroup_controller_path = %cgroup_path.display(), "Failed to get metadata for possible cgroup controller path."); return None; } }; - debug!( + trace!( controller_inode = metadata.ino(), - "Found valid cgroups controller for container '{}' at '{}'.", - container_id, - cgroup_path.display() + %container_id, + cgroup_controller_path = %cgroup_path.display(), + "Found valid cgroups controller for container.", ); return Some(Cgroup { - name: name.to_string(), - path: cgroup_path.to_path_buf(), ino: metadata.ino(), container_id, }); @@ -136,10 +148,26 @@ impl CgroupsReader { None } + /// Gets a cgroup for the given process ID. + /// + /// This method will attempt to find the cgroup for the given process ID by looking at the `/proc//cgroup` + /// file. If the process ID does not exist or is not attached to a cgroup, `None` will be returned. pub fn get_cgroup_by_pid(&self, pid: u32) -> Option { // See if the given process ID exists in the proc filesystem _and_ if there's a cgroup path for it. 
        let proc_pid_cgroup_path = self.procfs_path.join(pid.to_string()).join("cgroup");
-        let lines = read_lines(&proc_pid_cgroup_path).ok()?;
+        let lines = match read_lines(&proc_pid_cgroup_path) {
+            Ok(lines) => lines,
+            Err(e) => match e.kind() {
+                io::ErrorKind::NotFound => {
+                    debug!(pid, cgroup_lookup_path = %proc_pid_cgroup_path.display(), "Process does not exist or is not attached to a cgroup.");
+                    return None;
+                }
+                _ => {
+                    debug!(error = %e, pid, cgroup_lookup_path = %proc_pid_cgroup_path.display(), "Failed to read cgroup file for process.");
+                    return None;
+                }
+            },
+        };
        let base_controller_name = self.hierarchy_reader.base_controller();
@@ -157,9 +185,12 @@ impl CgroupsReader {
            }
        }
+        debug!(pid, cgroup_lookup_path = %proc_pid_cgroup_path.display(), base_controller_name, "Could not find matching base cgroup controller for process.");
+
        None
    }
+    /// Gets all child cgroups in the current cgroups hierarchy.
    pub fn get_child_cgroups(&self) -> Vec {
        let mut cgroups = Vec::new();
        let mut visit = |path: &Path| {
@@ -198,7 +229,7 @@ enum HierarchyReader {
 }
 impl HierarchyReader {
-    pub fn try_from_config(config: &CgroupsConfiguration) -> Result, GenericError> {
+    fn try_from_config(config: &CgroupsConfiguration) -> Result, GenericError> {
        // Open the mount file from procfs to scan through and find any cgroups subsystems.
        let mounts_path = config.procfs_path().join("mounts");
        let mount_entries = read_lines(&mounts_path)
@@ -215,7 +246,19 @@ impl HierarchyReader {
            let maybe_cgroup_path = fields.nth(1);
            let maybe_fs_type = fields.nth(0);
-            if let (Some(cgroup_path), Some(fs_type)) = (maybe_cgroup_path, maybe_fs_type) {
+            if let (Some(raw_cgroup_path), Some(fs_type)) = (maybe_cgroup_path, maybe_fs_type) {
+                let cgroup_path = Path::new(raw_cgroup_path);
+
+                // Make sure this path is rooted within our configured cgroupfs path.
+                //
+                // When we're inside a container that has a host-mapped cgroupfs path, the `mounts` file might end up
+                // having duplicate entries (like one set as `/sys/fs/cgroup` and another set as `/host/sys/fs/cgroup`,
+                // etc)... and we want to use the one that matches our configured cgroupfs path as that's the one that
+                // will actually have the cgroups we care about.
+                if !cgroup_path.starts_with(config.cgroupfs_path()) {
+                    continue;
+                }
+
                match fs_type {
                    // For cgroups v1, we have to go through all mounts we see to build a full list of enabled controllers.
                    "cgroup" => process_cgroupv1_mount_entry(cgroup_path, &mut controllers),
@@ -276,14 +319,25 @@ impl HierarchyReader {
    }
 }
+/// A container cgroup.
 pub struct Cgroup {
-    pub name: String,
-    pub path: PathBuf,
-    pub ino: u64,
-    pub container_id: MetaString,
+    ino: u64,
+    container_id: MetaString,
+}
+
+impl Cgroup {
+    /// Returns the inode of the cgroup controller.
+    pub fn inode(&self) -> u64 {
+        self.ino
+    }
+
+    /// Consumes `self` and returns the container ID.
+    pub fn into_container_id(self) -> MetaString {
+        self.container_id
+    }
 }
-pub struct CgroupControllerEntry<'a> {
+struct CgroupControllerEntry<'a> {
    id: usize,
    name: Option<&'a str>,
    path: &'a Path,
@@ -309,9 +363,9 @@ impl<'a> CgroupControllerEntry<'a> {
    }
 }
-fn process_cgroupv1_mount_entry(cgroup_path: &str, controllers: &mut HashMap) {
+fn process_cgroupv1_mount_entry(cgroup_path: &Path, controllers: &mut HashMap) {
    // Split the cgroup path, since there can be multiple controllers mounted at the same path. 
- let path_controllers = Path::new(cgroup_path) + let path_controllers = cgroup_path .file_name() .and_then(|s| s.to_str().map(|s| s.split(','))) .into_iter() @@ -320,7 +374,7 @@ fn process_cgroupv1_mount_entry(cgroup_path: &str, controllers: &mut HashMap Result, GenericError> { - let root = PathBuf::from(cgroup_path); - +fn process_cgroupv2_mount_entry(cgroup_path: &Path) -> Result, GenericError> { // Read and get the list of active/enabled controllers. - let controllers_path = root.join(CGROUPS_V2_CONTROLLERS_FILE); + let controllers_path = cgroup_path.join(CGROUPS_V2_CONTROLLERS_FILE); let controllers = read_lines(&controllers_path) .with_error_context(|| { format!( @@ -346,7 +398,7 @@ fn process_cgroupv2_mount_entry(cgroup_path: &str) -> Result>(); Ok(Some(HierarchyReader::V2 { - root: PathBuf::from(cgroup_path), + root: cgroup_path.to_path_buf(), controllers, })) } @@ -416,3 +468,51 @@ fn extract_container_id(cgroup_name: &str, interner: &GenericMapInterner) -> Opt } }) } + +#[cfg(test)] +mod tests { + use std::{num::NonZeroUsize, path::Path}; + + use stringtheory::{interning::GenericMapInterner, MetaString}; + + use super::{extract_container_id, CgroupControllerEntry}; + + #[test] + fn parse_controller_entry_cgroups_v1() { + let controller_id = 12; + let controller_name = "memory"; + let controller_path_raw = "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod095a9475_4c4f_4726_912c_65743701ef3f.slice/cri-containerd-06d914d2013e51a777feead523895935e33d8ad725b3251ac74c491b3d55d8fe.scope"; + let controller_path = Path::new(controller_path_raw); + let raw = format!("{}:{}:{}", controller_id, controller_name, controller_path_raw); + + let entry = CgroupControllerEntry::try_from_str(&raw).unwrap(); + assert_eq!(entry.id, controller_id); + assert_eq!(entry.name, Some(controller_name)); + assert_eq!(entry.path, controller_path); + } + + #[test] + fn parse_controller_entry_cgroups_v2() { + let controller_id = 0; + let controller_path_raw = + "/system.slice/docker-0b96e72f48e169638a735c0a05adcfc9d6aba2bf6697b627f1635b4f00ea011d.scope"; + let controller_path = Path::new(controller_path_raw); + let raw = format!("{}::{}", controller_id, controller_path_raw); + + let entry = CgroupControllerEntry::try_from_str(&raw).unwrap(); + assert_eq!(entry.id, controller_id); + assert_eq!(entry.name, None); + assert_eq!(entry.path, controller_path); + } + + #[test] + fn extract_container_id_cri_containerd() { + let expected_container_id = + MetaString::from("06d914d2013e51a777feead523895935e33d8ad725b3251ac74c491b3d55d8fe"); + let raw = format!("cri-containerd-{}.scope", expected_container_id); + let interner = GenericMapInterner::new(NonZeroUsize::new(1024).unwrap()); + + let actual_container_id = extract_container_id(&raw, &interner); + assert_eq!(Some(expected_container_id), actual_container_id); + } +} diff --git a/lib/saluki-env/src/workload/on_demand_pid.rs b/lib/saluki-env/src/workload/on_demand_pid.rs index a0180a12..513e8472 100644 --- a/lib/saluki-env/src/workload/on_demand_pid.rs +++ b/lib/saluki-env/src/workload/on_demand_pid.rs @@ -61,7 +61,7 @@ impl OnDemandPIDResolver { // If we don't have a mapping, query the host OS for it. 
match self.inner.cgroups_reader.get_cgroup_by_pid(process_id) { Some(cgroup) => { - let container_eid = EntityId::Container(cgroup.container_id); + let container_eid = EntityId::Container(cgroup.into_container_id()); debug!("Resolved PID {} to container ID {}.", process_id, container_eid); diff --git a/lib/saluki-env/src/workload/providers/remote_agent/mod.rs b/lib/saluki-env/src/workload/providers/remote_agent/mod.rs index e067a214..8e03bcc9 100644 --- a/lib/saluki-env/src/workload/providers/remote_agent/mod.rs +++ b/lib/saluki-env/src/workload/providers/remote_agent/mod.rs @@ -81,7 +81,7 @@ impl RemoteAgentWorkloadProvider { provider_bounds .subcomponent("string_interner") .firm() - .with_fixed_amount(string_interner_size_bytes.get()); + .with_fixed_amount("string interner", string_interner_size_bytes.get()); // Construct our aggregator, and add any collectors based on the detected features we've been given. let aggregator_health = health_registry diff --git a/lib/saluki-env/src/workload/stores/external_data.rs b/lib/saluki-env/src/workload/stores/external_data.rs index 9c88dab9..86dac4d3 100644 --- a/lib/saluki-env/src/workload/stores/external_data.rs +++ b/lib/saluki-env/src/workload/stores/external_data.rs @@ -143,10 +143,10 @@ impl MemoryBounds for ExternalDataStore { builder .firm() // Active entities. - .with_array::(self.entity_limit()) + .with_array::("entity ids", self.entity_limit()) // Forward and reverse mappings. - .with_map::(self.entity_limit()) - .with_map::(self.entity_limit()); + .with_map::("ext data entity map", self.entity_limit()) + .with_map::("entity ext data map", self.entity_limit()); } } diff --git a/lib/saluki-env/src/workload/stores/tag.rs b/lib/saluki-env/src/workload/stores/tag.rs index ef514594..f50307d9 100644 --- a/lib/saluki-env/src/workload/stores/tag.rs +++ b/lib/saluki-env/src/workload/stores/tag.rs @@ -419,19 +419,19 @@ impl MemoryBounds for TagStore { builder .firm() // Active entities. - .with_array::(self.entity_limit()) + .with_array::("entity ids", self.entity_limit()) // Entity hierarchy mappings. // // See TODO note about why this is an estimate. - .with_map::(self.entity_limit()) + .with_map::("entity id map", self.entity_limit()) // Low cardinality entity tags. - .with_map::(self.entity_limit()) + .with_map::("entity tagset map", self.entity_limit()) // High cardinality entity tags. - .with_map::(self.entity_limit()) + .with_map::("entity tagset map", self.entity_limit()) // Unified low cardinality entity tags. - .with_map::(self.entity_limit()) + .with_map::("entity tagset map", self.entity_limit()) // Unified high cardinality entity tags. - .with_map::(self.entity_limit()); + .with_map::("entity tagset map", self.entity_limit()); } } diff --git a/lib/saluki-event/src/metric/mod.rs b/lib/saluki-event/src/metric/mod.rs index 2e2926ab..27cc4ff4 100644 --- a/lib/saluki-event/src/metric/mod.rs +++ b/lib/saluki-event/src/metric/mod.rs @@ -30,7 +30,7 @@ pub use self::value::{HistogramPoints, HistogramSummary, MetricValues, ScalarPoi /// /// The metadata contains ancillary data related to the metric, such as the timestamp, sample rate, and origination /// information like hostname and sender. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct Metric { context: Context, values: MetricValues,
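One detail of the passthrough batching added earlier in this diff is worth restating on its own: the idle-flush decision reduces to comparing the time elapsed since the last processed passthrough metric against the configured timeout, and it only fires for a non-empty buffer. The following standalone sketch restates that condition using only standard library types; the function name and shape are illustrative and are not part of the actual PassthroughBatcher API.

use std::time::{Duration, Instant};

// Illustrative restatement of the idle-flush condition: flush only when the buffer holds
// events and nothing new has been processed within the idle timeout window. `buffer_len`
// stands in for the real event buffer's length.
fn should_idle_flush(buffer_len: usize, last_processed_at: Instant, idle_flush_timeout: Duration) -> bool {
    buffer_len > 0 && last_processed_at.elapsed() >= idle_flush_timeout
}

fn main() {
    let last_processed_at = Instant::now();

    // With a zero timeout, any non-empty buffer is immediately eligible for an idle flush.
    assert!(should_idle_flush(8, last_processed_at, Duration::ZERO));

    // An empty buffer never triggers an idle flush, regardless of the timeout.
    assert!(!should_idle_flush(0, last_processed_at, Duration::ZERO));
}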