Skip to content

Commit

Permalink
gazette: Implement append_stream and friends
Browse files Browse the repository at this point in the history
This implements low-level appends to Gazette. It takes an `AsyncRead + AsyncSeek` and streams append requests to Gazette. It allows you to retry by re-polling after receiving an Error. Also implemented is the helper `append_once` for when you don't have a stream of data to append, and instead just have a fixed set of data.
  • Loading branch information
jshearer committed Jan 7, 2025
1 parent 3e35bad commit aa21266
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 4 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ tokio = { version = "1", features = [
"time",
] }
tokio-util = { version = "0.7", features = ["io", "compat", "rt"] }
tokio-stream = { version = "0.1.17" }
tonic = { version = "0.12", features = ["tls", "tls-roots"] }
hyper-util = "0.1"
tower = { version = "0.5", features = ["util"] }
Expand Down
8 changes: 6 additions & 2 deletions crates/gazette/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ license.workspace = true
coroutines = { path = "../coroutines" }
ops = { path = "../ops" }
proto-gazette = { path = "../proto-gazette" }
proto-grpc = { path = "../proto-grpc", features = ["broker_client", "consumer_client"] }
proto-grpc = { path = "../proto-grpc", features = [
"broker_client",
"consumer_client",
] }

async-compression = { workspace = true }
bytes = { workspace = true }
Expand All @@ -25,6 +28,7 @@ reqwest = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tokio-stream = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
Expand All @@ -44,4 +48,4 @@ hexdump = { workspace = true }
# anyhow = { workspace = true }
# serde_json = { workspace = true }
# memchr = { workspace = true }
# pin-project-lite = { workspace = true }
# pin-project-lite = { workspace = true }
184 changes: 184 additions & 0 deletions crates/gazette/src/journal/append.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
use super::Client;
use crate::{journal::check_ok, Error};
use futures::{Stream, StreamExt};
use proto_gazette::broker;
use tokio::{
io::{AsyncBufReadExt, AsyncRead, AsyncSeek, AsyncSeekExt, BufReader},
pin,
};

// TODO: Tune this?
const CHUNK_SIZE: usize = 1 << 14;

impl Client {
pub async fn append_once(
&self,
journal: String,
source: Vec<u8>,
) -> crate::Result<broker::AppendResponse> {
let mapped_source = std::io::Cursor::new(source);

let appender = self.append_stream(journal, mapped_source);
tokio::pin!(appender);

match appender.next().await {
Some(Ok(resp)) => {
if let None = appender.next().await {
tracing::warn!(?resp, "Got append response!");
Ok(resp)
} else {
Err(Error::Append("Didn't get EOF after Ok".to_string()))
}
}
Some(err) => err,
None => Err(Error::UnexpectedEof),
}
}

/// Append the contents of an `AsyncRead + AsyncSeek` to the specified journal.
/// Returns a Stream of results which will yield either:
/// - An AppendResponse after all data is successfully appended
/// - Errors for any failures encountered.
/// If polled after an error, restarts the request from the beginning.
pub fn append_stream<R>(
&self,
journal: String,
source: R,
) -> impl Stream<Item = crate::Result<broker::AppendResponse>> + '_
where
R: AsyncRead + AsyncSeek + Send + Unpin + 'static,
{
coroutines::coroutine(move |mut co| async move {
let mut reader = BufReader::with_capacity(CHUNK_SIZE, source);
loop {
match self.append_all(&journal, &mut reader).await {
Ok(resp) => {
() = co.yield_(Ok(resp)).await;
return;
}
Err(err) => {
() = co.yield_(Err(err)).await;
// Seek to start for retry if we're polled after yielding error
// Seeking to start shouldn't error unless there's a bug
reader.seek(std::io::SeekFrom::Start(0)).await.unwrap();
}
}
}
})
}

// Handles the complete append process from start to finish
async fn append_all<R>(
&self,
journal: &str,
source: &mut R,
) -> crate::Result<broker::AppendResponse>
where
R: AsyncBufReadExt + Send + Unpin,
{
// Transforms `source` into a stream of `Result<AppendRequest, gazette::Error>`. This deals with
// the append RPC's semantics that require an initial "metadata" request, followed by a stream of
// "chunk" requests, followed by an empty request to indicate we're done. Potential errors ultimately
// originate from reading the input AsyncRead.
let request_generator = coroutines::coroutine(move |mut co| async move {
// Send initial request
() = co
.yield_(Ok(broker::AppendRequest {
journal: journal.to_string(),
..Default::default()
}))
.await;

loop {
// Process chunks until EOF
let bytes_read = match source.fill_buf().await {
// An empty buffer indicates EOF, as otherwise fill_buf() will wait until data is available
Ok(chunk) if chunk.len() == 0 => break,
Ok(chunk) => {
() = co
.yield_(Ok(broker::AppendRequest {
content: chunk.to_vec(),
..Default::default()
}))
.await;
chunk.len()
}
Err(e) => {
() = co.yield_(Err(Error::Append(e.to_string()))).await;
return;
}
};

source.consume(bytes_read);
}
// Send final empty chunk
() = co
.yield_(Ok(broker::AppendRequest {
..Default::default()
}))
.await;
});

// Since reading from `source` can error, we need this whole song and dance to
// handle those errors. We could just `.collect()` all of the requests and catch
// any errors there, but since this is supposed to handle significant volumes of data
// over an undefined period of time, that won't work. So instead we need to pass
// `JournalClient::append()` a stream of _just_ the `AppendRequest`s that come out
// of the above `request_generator`, while also promptly returning any errors if they
// crop up, and cancelling the append request.

let (req_tx, req_rx) = tokio::sync::mpsc::channel(100);

let mut client = self.into_sub(self.router.route(None, false, &self.default)?);

// Run `JournalClient::append` in a separate Tokio task, and feed it a steady diet of `AppendRequest`s
// while also giving us a convenient handle to `.abort()` if we encounter an error.
let mut append_handle = tokio::spawn(async move {
let resp = client
.append(tokio_stream::wrappers::ReceiverStream::new(req_rx))
.await
.map_err(crate::Error::Grpc)?
.into_inner();

check_ok(resp.status(), resp)
});

pin!(request_generator);

loop {
tokio::select! {
maybe_item = request_generator.next() => {
match maybe_item {
Some(Ok(req)) => {
req_tx.send(req).await.map_err(|e|Error::Append(e.to_string()))?;
},
Some(Err(e)) => {
// If `request_generator` errors, i.e we failed to read incoming data,
// cancel the `append` RPC and propagate the error
drop(req_tx);
append_handle.abort();
return Err(e);
},
None => {
// We hit EOF, drop the request channel sender which will close the
// `ReceiverStream` and signal `JournalClient::append` to finish up.
drop(req_tx);
break;
},
}
},
res = &mut append_handle => {
// Handle `JournalClient::append` finishing first. This will probably only happen
// if there's an error, as EOF breaks out and relies on the final `.await` to
// get the `AppendResponse` out.
return res.map_err(|e|Error::Append(e.to_string()))?;
},
}
}

// We hit EOF and now have to wait for `JournalClient::append` to finish
append_handle
.await
.map_err(|e| Error::Append(e.to_string()))?
}
}
1 change: 1 addition & 0 deletions crates/gazette/src/journal/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use proto_gazette::broker;
use tonic::transport::Channel;

mod append;
mod list;
mod read;

Expand Down
3 changes: 3 additions & 0 deletions crates/gazette/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ pub enum Error {
},
#[error("{0}")]
Protocol(&'static str),
#[error("{0}")]
Append(String),
#[error(transparent)]
UUID(#[from] uuid::Error),
#[error("unexpected server EOF")]
Expand Down Expand Up @@ -71,6 +73,7 @@ impl Error {
Error::Protocol(_) => false,
Error::UUID(_) => false,
Error::JWT(_) => false,
Error::Append(_) => false,
}
}
}
Expand Down

0 comments on commit aa21266

Please sign in to comment.