Commit

wip
tobz committed Jan 28, 2025
1 parent a396c00 commit eb2dc43
Showing 1 changed file with 102 additions and 54 deletions.
@@ -13,7 +13,7 @@ use saluki_io::{
 };
 use snafu::{ResultExt, Snafu};
 use tokio::io::AsyncWriteExt as _;
-use tracing::{debug, trace};
+use tracing::{debug, error, trace};
 
 pub(super) const SCRATCH_BUF_CAPACITY: usize = 8192;
 
@@ -117,15 +117,14 @@ where
     compressor: Compressor<ChunkedBytesBuffer<O>>,
     compression_estimator: CompressionEstimator,
     uncompressed_len: usize,
-    metrics_written: usize,
-    scratch_buf_lens: Vec<usize>,
+    encoded_metrics: Vec<Metric>,
 }
 
 impl<O> RequestBuilder<O>
 where
     O: ObjectPool<Item = BytesBuffer> + 'static,
 {
-    /// Creates a new `RequestBuilder` for the given endpoint, using the specified API key and base URI.
+    /// Creates a new `RequestBuilder` for the given endpoint.
     pub async fn new(endpoint: MetricsEndpoint, buffer_pool: O) -> Result<Self, RequestBuilderError> {
         let chunked_buffer_pool = ChunkedBytesBufferObjectPool::new(buffer_pool);
         let compressor = create_compressor(&chunked_buffer_pool).await;
@@ -137,8 +136,7 @@ where
             compressor,
             compression_estimator: CompressionEstimator::default(),
             uncompressed_len: 0,
-            metrics_written: 0,
-            scratch_buf_lens: Vec::new(),
+            encoded_metrics: Vec::new(),
         })
     }
 
@@ -147,7 +145,7 @@
     /// If the metric can't be encoded due to size constraints, `Ok(Some(metric))` will be returned, and the caller must
     /// call `flush` before attempting to encode the same metric again. Otherwise, `Ok(None)` is returned.
     ///
-    /// ## Errors
+    /// # Errors
     ///
     /// If the given metric is not valid for the endpoint this request builder is configured for, or if there is an
     /// error during compression of the encoded metric, an error will be returned.
@@ -166,18 +164,12 @@ where
         // If not, we return the original metric, signaling to the caller that they need to flush the current request
         // payload before encoding additional metrics.
         let encoded_metric = encode_single_metric(&metric);
-        let previous_len = self.scratch_buf.len();
         encoded_metric.write(&mut self.scratch_buf)?;
-        let encoded_len = self.scratch_buf.len() - previous_len;
-        self.scratch_buf_lens.push(encoded_len);
 
         // If the metric can't fit into the current request payload based on the uncompressed size limit, or isn't
         // likely to fit into the current request payload based on the estimated compressed size limit, then return it
         // to the caller: this indicates that a flush must happen before trying to encode the same metric again.
-        //
-        // TODO: Use of the estimated compressed size limit is a bit of a stopgap to avoid having to do full incremental
-        //       request building. We can still improve it, but the only sure-fire way to not exceed the (un)compressed
-        //       payload size limits is to be able to re-do the encoding/compression process in smaller chunks.
+        let encoded_len = self.scratch_buf.len();
         let new_uncompressed_len = self.uncompressed_len + encoded_len;
         if new_uncompressed_len > self.endpoint.uncompressed_size_limit()
             || self
@@ -195,13 +187,10 @@ where
         }
 
         // Write the scratch buffer to the compressor.
-        self.compressor
-            .write_all(&self.scratch_buf[previous_len..])
-            .await
-            .context(Io)?;
+        self.compressor.write_all(&self.scratch_buf[..]).await.context(Io)?;
         self.compression_estimator.track_write(&self.compressor, encoded_len);
         self.uncompressed_len += encoded_len;
-        self.metrics_written += 1;
+        self.encoded_metrics.push(metric);
 
         trace!(
             encoded_len,
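For context, here is how the encode/flush contract described in the doc comments plays out from a caller's perspective. This is an illustrative sketch, not code from this commit: the exact signature of `encode` is inferred from the doc comments above, and the error handling is deliberately simplified.

```rust
// Hypothetical driver loop (not part of this commit). `encode` hands the
// metric back when the in-flight payload is full, and the caller must
// `flush` before retrying that same metric.
async fn drain_metrics<O>(
    builder: &mut RequestBuilder<O>,
    metrics: Vec<Metric>,
) -> Result<Vec<(usize, Request<FrozenChunkedBytesBuffer>)>, RequestBuilderError>
where
    O: ObjectPool<Item = BytesBuffer> + 'static,
{
    let mut requests = Vec::new();
    for metric in metrics {
        if let Some(returned) = builder.encode(metric).await? {
            // Payload was full: flush the pending request(s) before retrying.
            for result in builder.flush().await {
                requests.push(result?);
            }
            if builder.encode(returned).await?.is_some() {
                // A single metric that can never fit; drop it or surface an error here.
            }
        }
    }
    // Flush whatever is left at the end.
    for result in builder.flush().await {
        requests.push(result?);
    }
    Ok(requests)
}
```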
@@ -220,7 +209,7 @@ where
     ///
     /// This attempts to split the request payload into two smaller payloads if the original request payload is too large.
     ///
-    /// ## Errors
+    /// # Errors
     ///
     /// If an error occurs while finalizing the compressor or creating the request, an error will be returned.
     pub async fn flush(&mut self) -> Vec<Result<(usize, Request<FrozenChunkedBytesBuffer>), RequestBuilderError>> {
@@ -230,9 +219,6 @@ where
 
         // Clear our internal state and finalize the compressor. We do it in this order so that if finalization fails,
         // somehow, the request builder is in a default state and encoding can be attempted again.
-        let metrics_written = self.metrics_written;
-        self.metrics_written = 0;
-
         let uncompressed_len = self.uncompressed_len;
         self.uncompressed_len = 0;
 
@@ -241,7 +227,14 @@ where
         let new_compressor = create_compressor(&self.buffer_pool).await;
         let mut compressor = std::mem::replace(&mut self.compressor, new_compressor);
         if let Err(e) = compressor.shutdown().await.context(Io) {
-            self.clear_scratch_buffer();
+            let metrics_dropped = self.clear_encoded_metrics();
+
+            // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here.
+            error!(
+                metrics_dropped,
+                "Failed to finalize compressor while building request. Metrics have been dropped."
+            );
+
             return vec![Err(e)];
         }
 
@@ -251,7 +244,9 @@ where
         let compressed_limit = self.endpoint.compressed_size_limit();
         if compressed_len > compressed_limit {
             // Single metric is unable to be split.
-            if self.scratch_buf_lens.len() == 1 {
+            if self.encoded_metrics.len() == 1 {
+                let _ = self.clear_encoded_metrics();
+
                 return vec![Err(RequestBuilderError::PayloadTooLarge {
                     compressed_size_bytes: compressed_len,
                     compressed_limit_bytes: compressed_limit,
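The `.context(Io)` calls and the `PayloadTooLarge` construction above both rely on snafu's derive machinery. Below is a minimal, self-contained sketch of what the error-definition side could look like; it is an assumed shape, not this file's actual definition, and it assumes a snafu setup where the context selector for the `Io` variant is named plainly `Io`, as the diff's usage implies (newer snafu versions name it `IoSnafu` by default).

```rust
use snafu::{ResultExt, Snafu};

// Assumed shape for illustration; the real RequestBuilderError has more variants.
#[derive(Debug, Snafu)]
enum RequestBuilderError {
    #[snafu(display("I/O error during request building: {source}"))]
    Io { source: std::io::Error },

    #[snafu(display("payload too large: {compressed_size_bytes} > {compressed_limit_bytes}"))]
    PayloadTooLarge {
        compressed_size_bytes: usize,
        compressed_limit_bytes: usize,
    },
}

fn write_payload(data: &[u8]) -> Result<(), RequestBuilderError> {
    // `.context(Io)` converts the underlying std::io::Error into our enum variant.
    std::io::Write::write_all(&mut std::io::sink(), data).context(Io)
}
```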
@@ -261,53 +256,105 @@ where
             return self.split_request().await;
         }
 
-        debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, "Flushing request.");
+        let metrics_written = self.clear_encoded_metrics();
+        debug!(endpoint = ?self.endpoint, uncompressed_len, compressed_len, metrics_written, "Flushing request.");
 
-        self.clear_scratch_buffer();
         vec![self.create_request(buffer).map(|req| (metrics_written, req))]
     }
 
-    fn clear_scratch_buffer(&mut self) {
-        self.scratch_buf.clear();
-        self.scratch_buf_lens.clear();
+    fn clear_encoded_metrics(&mut self) -> usize {
+        let len = self.encoded_metrics.len();
+        self.encoded_metrics.clear();
+        len
     }
 
     async fn split_request(&mut self) -> Vec<Result<(usize, Request<FrozenChunkedBytesBuffer>), RequestBuilderError>> {
         // Nothing to do if we have no encoded metrics.
         let mut requests = Vec::new();
 
-        if self.scratch_buf_lens.is_empty() {
+        if self.encoded_metrics.is_empty() {
             return requests;
         }
 
-        let lens_pivot = self.scratch_buf_lens.len() / 2;
-        let first_half_metrics_len = self.scratch_buf_lens.len() - lens_pivot;
-
-        let scratch_buf_pivot = self.scratch_buf_lens.iter().take(first_half_metrics_len).sum();
-        assert!(scratch_buf_pivot < self.scratch_buf.len());
-
-        let first_half_scratch_buf = &self.scratch_buf[0..scratch_buf_pivot];
-        let second_half_scratch_buf = &self.scratch_buf[scratch_buf_pivot..];
-
-        let mut compressor_half_one = create_compressor(&self.buffer_pool).await;
-
-        if let Err(e) = compressor_half_one.write_all(first_half_scratch_buf).await.context(Io) {
-            requests.push(Err(e));
-        }
-        match self.finalize(compressor_half_one).await {
-            Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))),
-            Err(e) => requests.push(Err(e)),
-        }
-
-        let mut compressor_half_two = create_compressor(&self.buffer_pool).await;
-        if let Err(e) = compressor_half_two.write_all(second_half_scratch_buf).await.context(Io) {
-            requests.push(Err(e));
-        }
-        match self.finalize(compressor_half_two).await {
-            Ok(buffer) => requests.push(self.create_request(buffer).map(|req| (1, req))),
-            Err(e) => requests.push(Err(e)),
-        }
-        self.clear_scratch_buffer();
-        requests
+        // We're going to attempt to split all of the previously-encoded metrics between two _new_ compressed payloads,
+        // with the goal that each payload will be under the compressed size limit.
+        //
+        // We achieve this by temporarily consuming the "encoded metrics" buffer, feeding the first half of it back to
+        // ourselves by re-encoding and then flushing, and then doing the same thing with the second half. If either
+        // half fails to properly encode, we give up entirely.
+        //
+        // We specifically manage the control flow so that we always restore the original "encoded metrics" buffer to
+        // the builder (albeit cleared) before returning, so that we don't waste its allocation as it's been sized up
+        // over time.
+        //
+        // We can do this by swapping it out with a new `Vec<Metric>` since empty vectors don't allocate at all.
+        let mut encoded_metrics = std::mem::replace(&mut self.encoded_metrics, Vec::new());
+        let encoded_metrics_pivot = encoded_metrics.len() / 2;
+
+        let first_half_encoded_metrics = &encoded_metrics[0..encoded_metrics_pivot];
+        let second_half_encoded_metrics = &encoded_metrics[encoded_metrics_pivot..];
+
+        // TODO: We're duplicating functionality here between `encode`/`flush`, but this makes it a lot easier to skip
+        //       over the normal behavior that would do all the storing of encoded metrics, trying to split the payload,
+        //       etc, since we want to avoid that and avoid any recursion in general.
+        //
+        //       We should consider if there's a better way to split out some of this into common methods or something.
+        if let Some(request) = self.try_split_request(first_half_encoded_metrics).await {
+            requests.push(request);
+        }
+
+        if let Some(request) = self.try_split_request(second_half_encoded_metrics).await {
+            requests.push(request);
+        }
+
+        // Restore our original "encoded metrics" buffer before finishing up, but also clear it.
+        encoded_metrics.clear();
+        self.encoded_metrics = encoded_metrics;
+
+        requests
     }
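The buffer swap in `split_request` leans on the fact that `Vec::new()` performs no allocation, so taking the vector and handing it back afterwards preserves the capacity it has grown over time. A standalone sketch of the same take-use-restore pattern (illustration only, not from this commit):

```rust
struct Holder {
    items: Vec<u64>,
}

impl Holder {
    fn process_in_halves(&mut self) {
        // `Vec::new()` doesn't allocate, so this swap is effectively free.
        let mut items = std::mem::replace(&mut self.items, Vec::new());

        let pivot = items.len() / 2;
        let (first_half, second_half) = items.split_at(pivot);
        // ... process each half ...
        let _ = (first_half, second_half);

        // Restore the original buffer (cleared) so its capacity is reused.
        items.clear();
        self.items = items;
    }
}
```

`std::mem::take(&mut self.items)` is the idiomatic shorthand for the same swap against a `Default` value.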

+    async fn try_split_request(
+        &mut self, metrics: &[Metric],
+    ) -> Option<Result<(usize, Request<FrozenChunkedBytesBuffer>), RequestBuilderError>> {
+        let mut uncompressed_len = 0;
+        let mut compressor = create_compressor(&self.buffer_pool).await;
+
+        for metric in metrics {
+            // Encode each metric and write it to our compressor.
+            //
+            // We skip any of the typical payload size checks here, because we already know we at least fit these
+            // metrics into the previous attempted payload, so there's no reason to redo all of that here.
+            let encoded_metric = encode_single_metric(&metric);
+
+            if let Err(e) = encoded_metric.write(&mut self.scratch_buf) {
+                return Some(Err(e));
+            }
+
+            if let Err(e) = compressor.write_all(&self.scratch_buf[..]).await.context(Io) {
+                return Some(Err(e));
+            }
+
+            uncompressed_len += self.scratch_buf.len();
+        }
+
+        // Make sure we haven't exceeded our uncompressed size limit.
+        //
+        // Again, this should never happen since we've already gone through this the first time, but we're just being
+        // extra sure here since the interface allows for it to happen. :shrug:
+        if uncompressed_len > self.endpoint.uncompressed_size_limit() {
+            let metrics_dropped = metrics.len();
+
+            // TODO: Propagate the number of metrics dropped in the returned error itself rather than logging here.
+            error!(uncompressed_len, metrics_dropped, "Uncompressed size limit exceeded while splitting request. This should never occur. Metrics have been dropped.");
+
+            return None;
+        }
+
+        Some(
+            self.finalize(compressor)
+                .await
+                .and_then(|buffer| self.create_request(buffer).map(|request| (metrics.len(), request))),
+        )
+    }
 
     async fn finalize(
@@ -365,6 +412,7 @@ impl EncodedMetric {
     }
 
     fn write(&self, buf: &mut Vec<u8>) -> Result<(), RequestBuilderError> {
+        buf.clear();
         let mut output_stream = CodedOutputStream::vec(buf);
 
         // Write the field tag.
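The new `buf.clear()` in `EncodedMetric::write` is what allows `encode` and `try_split_request` to treat the whole scratch buffer as exactly one encoded metric (`&self.scratch_buf[..]` and `self.scratch_buf.len()` above), replacing the old `previous_len`/`scratch_buf_lens` offset bookkeeping. A tiny standalone sketch of that invariant, with a stand-in payload instead of the real protobuf encoding:

```rust
// Illustration only: after `write_one` returns, the buffer holds exactly one
// record, so `buf.len()` is that record's encoded length and no offsets need
// to be tracked across calls.
fn write_one(buf: &mut Vec<u8>, encoded_payload: &[u8]) {
    buf.clear(); // discard the previous record's bytes, keep the capacity
    buf.extend_from_slice(encoded_payload);
}

fn main() {
    let mut scratch = Vec::with_capacity(8192);
    write_one(&mut scratch, b"first");
    assert_eq!(scratch.len(), 5);
    write_one(&mut scratch, b"second-longer-record");
    assert_eq!(scratch.len(), 20);
}
```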
