From eb2dc436bb0d10a1b0f32632f179f1dd78216dac Mon Sep 17 00:00:00 2001 From: Toby Lawrence Date: Mon, 27 Jan 2025 20:25:35 -0500 Subject: [PATCH] wip --- .../datadog/metrics/request_builder.rs | 156 ++++++++++++------ 1 file changed, 102 insertions(+), 54 deletions(-) diff --git a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs index da15ac98..e01e73f5 100644 --- a/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs +++ b/lib/saluki-components/src/destinations/datadog/metrics/request_builder.rs @@ -13,7 +13,7 @@ use saluki_io::{ }; use snafu::{ResultExt, Snafu}; use tokio::io::AsyncWriteExt as _; -use tracing::{debug, trace}; +use tracing::{debug, error, trace}; pub(super) const SCRATCH_BUF_CAPACITY: usize = 8192; @@ -117,15 +117,14 @@ where compressor: Compressor>, compression_estimator: CompressionEstimator, uncompressed_len: usize, - metrics_written: usize, - scratch_buf_lens: Vec, + encoded_metrics: Vec, } impl RequestBuilder where O: ObjectPool + 'static, { - /// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI. + /// Creates a new `RequestBuilder` for the given endpoint. pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result { let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool); let compressor = create_compressor(&chunked_buffer_pool).await; @@ -137,8 +136,7 @@ where compressor, compression_estimator: CompressionEstimator::default(), uncompressed_len: 0, - metrics_written: 0, - scratch_buf_lens: Vec::new(), + encoded_metrics: Vec::new(), }) } @@ -147,7 +145,7 @@ where /// If the metric can't be encoded due to size constraints, `Ok(Some(metric))` will be returned, and the caller must /// call `flush` before attempting to encode the same metric again. Otherwise, `Ok(None)` is returned. /// - /// ## Errors + /// # Errors /// /// If the given metric is not valid for the endpoint this request builder is configured for, or if there is an /// error during compression of the encoded metric, an error will be returned. @@ -166,18 +164,12 @@ where // If not, we return the original metric, signaling to the caller that they need to flush the current request // payload before encoding additional metrics. let encoded_metric = encode_single_metric(&metric); - let previous_len = self.scratch_buf.len(); encoded_metric.write(&mut self.scratch_buf)?; - let encoded_len = self.scratch_buf.len() - previous_len; - self.scratch_buf_lens.push(encoded_len); // If the metric can't fit into the current request payload based on the uncompressed size limit, or isn't // likely to fit into the current request payload based on the estimated compressed size limit, then return it // to the caller: this indicates that a flush must happen before trying to encode the same metric again. - // - // TODO: Use of the estimated compressed size limit is a bit of a stopgap to avoid having to do full incremental - // request building. We can still improve it, but the only sure-fire way to not exceed the (un)compressed - // payload size limits is to be able to re-do the encoding/compression process in smaller chunks. + let encoded_len = self.scratch_buf.len(); let new_uncompressed_len = self.uncompressed_len + encoded_len; if new_uncompressed_len > self.endpoint.uncompressed_size_limit() || self @@ -195,13 +187,10 @@ where } // Write the scratch buffer to the compressor. - self.compressor - .write_all(&self.scratch_buf[previous_len..]) - .await - .context(Io)?; + self.compressor.write_all(&self.scratch_buf[..]).await.context(Io)?; self.compression_estimator.track_write(&self.compressor, encoded_len); self.uncompressed_len += encoded_len; - self.metrics_written += 1; + self.encoded_metrics.push(metric); trace!( encoded_len, @@ -220,7 +209,7 @@ where /// /// This attempts to split the request payload into two smaller payloads if the original request payload is too large. /// - /// ## Errors + /// # Errors /// /// If an error occurs while finalizing the compressor or creating the request, an error will be returned. pub async fn flush(&mut self) -> Vec), RequestBuilderError>> { @@ -230,9 +219,6 @@ where // Clear our internal state and finalize the compressor. We do it in this order so that if finalization fails, // somehow, the request builder is in a default state and encoding can be attempted again. - let metrics_written = self.metrics_written; - self.metrics_written = 0; - let uncompressed_len = self.uncompressed_len; self.uncompressed_len = 0; @@ -241,7 +227,14 @@ where let new_compressor = create_compressor(&self.buffer_pool).await; let mut compressor = std::mem::replace(&mut self.compressor, new_compressor); if let Err(e) = compressor.shutdown().await.context(Io) { - self.clear_scratch_buffer(); + let metrics_dropped = self.clear_encoded_metrics(); + + // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here. + error!( + metrics_dropped, + "Failed to finalize compressor while building request. Metrics have been dropped." + ); + return vec![Err(e)]; } @@ -251,7 +244,9 @@ where let compressed_limit = self.endpoint.compressed_size_limit(); if compressed_len > compressed_limit { // Single metric is unable to be split. - if self.scratch_buf_lens.len() == 1 { + if self.encoded_metrics.len() == 1 { + let _ = self.clear_encoded_metrics(); + return vec![Err(RequestBuilderError::PayloadTooLarge { compressed_size_bytes: compressed_len, compressed_limit_bytes: compressed_limit, @@ -261,53 +256,105 @@ where return self.split_request().await; } - debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, "Flushing request."); + let metrics_written = self.clear_encoded_metrics(); + debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, metrics_written, "Flushing request."); - self.clear_scratch_buffer(); vec![self.create_request(buffer).map(|req| (metrics_written, req))] } - fn clear_scratch_buffer(&mut self) { - self.scratch_buf.clear(); - self.scratch_buf_lens.clear(); + fn clear_encoded_metrics(&mut self) -> usize { + let len = self.encoded_metrics.len(); + self.encoded_metrics.clear(); + len } async fn split_request(&mut self) -> Vec), RequestBuilderError>> { + // Nothing to do if we have no encoded metrics. let mut requests = Vec::new(); - - if self.scratch_buf_lens.is_empty() { + if self.encoded_metrics.is_empty() { return requests; } - let lens_pivot = self.scratch_buf_lens.len() / 2; - let first_half_metrics_len = self.scratch_buf_lens.len() - lens_pivot; - - let scratch_buf_pivot = self.scratch_buf_lens.iter().take(first_half_metrics_len).sum(); - assert!(scratch_buf_pivot < self.scratch_buf.len()); - - let first_half_scratch_buf = &self.scratch_buf[0..scratch_buf_pivot]; - let second_half_scratch_buf = &self.scratch_buf[scratch_buf_pivot..]; + // We're going to attempt to split all of the previously-encoded metrics between two _new_ compressed payloads, + // with the goal that each payload will be under the compressed size limit. + // + // We achieve this by temporarily consuming the "encoded metrics" buffer, feeding the first half of it back to + // ourselves by re-encoding and then flushing, and then doing the same thing with the second half. If either + // half fails to properly encode, we give up entirely. + // + // We specifically manage the control flow so that we always restore the original "encoded metrics" buffer to + // the builder (albeit cleared) before returning, so that we don't waste its allocation as it's been sized up + // over time. + // + // We can do this by swapping it out with a new `Vec` since empty vectors don't allocate at all. + let mut encoded_metrics = std::mem::replace(&mut self.encoded_metrics, Vec::new()); + let encoded_metrics_pivot = encoded_metrics.len() / 2; - let mut compressor_half_one = create_compressor(&self.buffer_pool).await; + let first_half_encoded_metrics = &encoded_metrics[0..encoded_metrics_pivot]; + let second_half_encoded_metrics = &encoded_metrics[encoded_metrics_pivot..]; - if let Err(e) = compressor_half_one.write_all(first_half_scratch_buf).await.context(Io) { - requests.push(Err(e)); + // TODO: We're duplicating functionality here between `encode`/`flush`, but this makes it a lot easier to skip + // over the normal behavior that would do all the storing of encoded metrics, trying to split the payload, etc, + // since we want to avoid that and avoid any recursion in general. + // + // We should consider if there's a better way to split out some of this into common methods or something. + if let Some(request) = self.try_split_request(first_half_encoded_metrics).await { + requests.push(request); } - match self.finalize(compressor_half_one).await { - Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))), - Err(e) => requests.push(Err(e)), + + if let Some(request) = self.try_split_request(second_half_encoded_metrics).await { + requests.push(request); } - let mut compressor_half_two = create_compressor(&self.buffer_pool).await; - if let Err(e) = compressor_half_two.write_all(second_half_scratch_buf).await.context(Io) { - requests.push(Err(e)); + // Restore our original "encoded metrics" buffer before finishing up, but also clear it. + encoded_metrics.clear(); + self.encoded_metrics = encoded_metrics; + + requests + } + + async fn try_split_request( + &mut self, metrics: &[Metric], + ) -> Option), RequestBuilderError>> { + let mut uncompressed_len = 0; + let mut compressor = create_compressor(&self.buffer_pool).await; + + for metric in metrics { + // Encode each metric and write it to our compressor. + // + // We skip any of the typical payload size checks here, because we already know we at least fit these + // metrics into the previous attempted payload, so there's no reason to redo all of that here. + let encoded_metric = encode_single_metric(&metric); + + if let Err(e) = encoded_metric.write(&mut self.scratch_buf) { + return Some(Err(e)); + } + + if let Err(e) = compressor.write_all(&self.scratch_buf[..]).await.context(Io) { + return Some(Err(e)); + } + + uncompressed_len += self.scratch_buf.len(); } - match self.finalize(compressor_half_two).await { - Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))), - Err(e) => requests.push(Err(e)), + + // Make sure we haven't exceeded our uncompressed size limit. + // + // Again, this should never happen since we've already gone through this the first time but we're just being + // extra sure here since the interface allows for it to happen. :shrug: + if uncompressed_len > self.endpoint.uncompressed_size_limit() { + let metrics_dropped = metrics.len(); + + // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here. + error!(uncompressed_len, metrics_dropped, "Uncompressed size limit exceeded while splitting request. This should never occur. Metrics have been dropped."); + + return None; } - self.clear_scratch_buffer(); - requests + + Some( + self.finalize(compressor) + .await + .and_then(|buffer| self.create_request(buffer).map(|request| (metrics.len(), request))), + ) } async fn finalize( @@ -365,6 +412,7 @@ impl EncodedMetric { } fn write(&self, buf: &mut Vec) -> Result<(), RequestBuilderError> { + buf.clear(); let mut output_stream = CodedOutputStream::vec(buf); // Write the field tag.