Skip to content

Commit

Permalink
Introduce data transformers.
Browse files Browse the repository at this point in the history
Resolves #775.
  • Loading branch information
SergioBenitez committed Dec 28, 2023
1 parent a285625 commit c8461a2
Show file tree
Hide file tree
Showing 6 changed files with 578 additions and 147 deletions.
132 changes: 81 additions & 51 deletions core/lib/src/data/data.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::tokio::io::AsyncReadExt;
use crate::data::data_stream::DataStream;
use crate::data::{ByteUnit, StreamReader};
use std::io;
use std::pin::Pin;

/// The number of bytes to read into the "peek" buffer.
pub const PEEK_BYTES: usize = 512;
use crate::data::ByteUnit;
use crate::data::data_stream::{DataStream, RawReader, RawStream};
use crate::data::peekable::Peekable;
use crate::data::transform::{Transform, TransformBuf, Inspect, InPlaceMap};

/// Type representing the body data of a request.
///
Expand Down Expand Up @@ -38,31 +39,27 @@ pub const PEEK_BYTES: usize = 512;
/// body data. This enables partially or fully reading from a `Data` object
/// without consuming the `Data` object.
pub struct Data<'r> {
buffer: Vec<u8>,
is_complete: bool,
stream: StreamReader<'r>,
stream: Peekable<512, RawReader<'r>>,
transforms: Vec<Pin<Box<dyn Transform + Send + Sync + 'r>>>,
}

// TODO: Before `async`, we had a read timeout of 5s. Such a short read timeout
// is likely no longer necessary, but an idle timeout should be implemented.
impl<'r> Data<'r> {
/// Create a `Data` from a recognized `stream`.
pub(crate) fn from<S: Into<StreamReader<'r>>>(stream: S) -> Data<'r> {
// TODO.async: This used to also set the read timeout to 5 seconds.
// Such a short read timeout is likely no longer necessary, but some
// kind of idle timeout should be implemented.
/// Creates a `Data` that reads from `stream` and starts out with no
/// transforms chained.
#[inline]
pub(crate) fn new(stream: Peekable<512, RawReader<'r>>) -> Self {
    let transforms = Vec::new();
    Self { stream, transforms }
}

let stream = stream.into();
let buffer = Vec::with_capacity(PEEK_BYTES / 8);
Data { buffer, stream, is_complete: false }
#[inline]
pub(crate) fn from<S: Into<RawStream<'r>>>(stream: S) -> Data<'r> {
Data::new(Peekable::new(RawReader::new(stream.into())))
}

/// Creates a `Data` object from a local data source `data`.
#[inline]
pub(crate) fn local(data: Vec<u8>) -> Data<'r> {
Data {
buffer: data,
stream: StreamReader::empty(),
is_complete: true,
}
Data::new(Peekable::with_buffer(data, true, RawReader::new(RawStream::Empty)))
}

/// Returns the raw data stream, limited to `limit` bytes.
Expand All @@ -82,18 +79,31 @@ impl<'r> Data<'r> {
/// let stream = data.open(2.mebibytes());
/// }
/// ```
#[inline(always)]
pub fn open(self, limit: ByteUnit) -> DataStream<'r> {
DataStream::new(self.buffer, self.stream, limit.into())
DataStream::new(self.transforms, self.stream, limit.into())
}

/// Retrieve at most `num` bytes from the `peek` buffer without consuming
/// `self`.
///
/// The peek buffer contains at most 512 bytes of the body of the request.
/// The actual size of the returned buffer is the `min` of the request's
/// body, `num` and `512`. The [`peek_complete`](#method.peek_complete)
/// method can be used to determine if this buffer contains _all_ of the
/// data in the body of the request.
/// Fills the peek buffer with body data until it contains at least `num`
/// bytes or the complete body data, whichever is less, and returns it. If
/// the buffer already contains either at least `num` bytes or all of the
/// body data, no I/O is performed and the buffer is simply returned. If
/// `num` is greater than `512`, it is capped to `512`.
///
/// No guarantees are made about the actual size of the returned buffer
/// except that it will not exceed the length of the body data. It may be:
///
/// * Less than `num` if `num > 512` or the complete body data is `< 512`
/// or an error occurred while reading the body.
/// * Equal to `num` if `num` is `<= 512` and exactly `num` bytes of the
/// body data were successfully read.
/// * Greater than `num` if `> num` bytes of the body data have
/// successfully been read, either by this request, a previous request,
/// or opportunistically.
///
/// [`Data::peek_complete()`] can be used to determine if this buffer
/// contains the complete body data.
///
/// # Examples
///
Expand Down Expand Up @@ -147,30 +157,13 @@ impl<'r> Data<'r> {
/// }
/// }
/// ```
#[inline(always)]
pub async fn peek(&mut self, num: usize) -> &[u8] {
let num = std::cmp::min(PEEK_BYTES, num);
let mut len = self.buffer.len();
if len >= num {
return &self.buffer[..num];
}

while len < num {
match self.stream.read_buf(&mut self.buffer).await {
Ok(0) => { self.is_complete = true; break },
Ok(n) => len += n,
Err(e) => {
error_!("Failed to read into peek buffer: {:?}.", e);
break;
}
}
}

&self.buffer[..std::cmp::min(len, num)]
self.stream.peek(num).await
}

/// Returns true if the `peek` buffer contains all of the data in the body
/// of the request. Returns `false` if it does not or if it is not known if
/// it does.
/// of the request. Returns `false` if it does not or it is not known.
///
/// # Example
///
Expand All @@ -185,6 +178,43 @@ impl<'r> Data<'r> {
/// ```
#[inline(always)]
pub fn peek_complete(&self) -> bool {
self.is_complete
self.stream.complete
}

/// Appends the [`Transform`] `transform` to the transforms chained on `self`.
///
/// The transform is pinned on the heap and pushed onto the end of the list;
/// nothing is run here. Transforms do nothing until the data is
/// [`open()`ed](Data::open()) and read.
///
/// Returns `self` so that further calls can be chained.
#[inline(always)]
pub fn chain_transform<T>(&mut self, transform: T) -> &mut Self
    where T: Transform + Send + Sync + 'static
{
    let pinned = Box::pin(transform);
    self.transforms.push(pinned);
    self
}

/// Chains a [`Transform`] that lets `f` inspect the data as it streams.
///
/// `f` is boxed, wrapped in `Inspect`, and registered through
/// [`Data::chain_transform()`]. Returns `self` for further chaining.
pub fn chain_inspect<F>(&mut self, f: F) -> &mut Self
    where F: FnMut(&[u8]) + Send + Sync + 'static
{
    let inspector = Inspect(Box::new(f));
    self.chain_transform(inspector)
}

/// Chains a [`Transform`] that maps the data in place as it streams.
///
/// The mapper `f` is assumed to be infallible: it is wrapped in a closure
/// that calls it and then always reports success. For a mapper that can
/// fail, see [`Data::chain_try_inplace_map()`].
pub fn chain_inplace_map<F>(&mut self, mut f: F) -> &mut Self
    where F: FnMut(&mut TransformBuf<'_, '_>) + Send + Sync + 'static
{
    self.chain_transform(InPlaceMap(Box::new(move |buf| {
        f(buf);
        Ok(())
    })))
}

/// Chains a [`Transform`] that maps the data in place as it streams.
///
/// Unlike [`Data::chain_inplace_map()`], this version allows the mapper to
/// be fallible: `f` returns an [`io::Result`], and is boxed and wrapped in
/// `InPlaceMap` as-is.
pub fn chain_try_inplace_map<F>(&mut self, f: F) -> &mut Self
    where F: FnMut(&mut TransformBuf<'_, '_>) -> io::Result<()> + Send + Sync + 'static
{
    let mapper = InPlaceMap(Box::new(f));
    self.chain_transform(mapper)
}
}
Loading

0 comments on commit c8461a2

Please sign in to comment.