feat: Implement Sourcer traits for serving source #2301

Merged · 18 commits · Jan 6, 2025
Changes from 1 commit
4 changes: 2 additions & 2 deletions rust/Cargo.toml
@@ -40,8 +40,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
-# [profile.release]
-# lto = "fat"
+[profile.release]
+lto = "fat"

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/config/components.rs
@@ -38,6 +38,8 @@ pub(crate) mod source {
Generator(GeneratorConfig),
UserDefined(UserDefinedConfig),
Pulsar(PulsarSourceConfig),
+// Serving source starts an Axum HTTP server in the background.
+// The settings will be used as application state which gets cloned in each handler on each request.
Serving(Arc<serving::Settings>),
Review comment (Member): why Arc, is it for easy cloning? how come this is different from the rest? please document the reason?
}
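The comment added above answers the reviewer: the settings are wrapped in `Arc` because the Axum server clones its application state into every handler on every request, and `Arc::clone` copies only a pointer rather than the whole `Settings` struct. A std-only sketch of that sharing pattern, with threads standing in for request handlers and a hypothetical `Settings` stand-in:

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for serving::Settings.
#[derive(Debug)]
struct Settings {
    tid_header: String,
    port: u16,
}

fn main() {
    let settings = Arc::new(Settings {
        tid_header: "X-Numaflow-Id".into(),
        port: 3000,
    });

    // Each "handler" (a thread here, an Axum request handler in the PR) gets
    // its own Arc clone: a reference-count bump, not a deep copy of Settings.
    let handles: Vec<_> = (0..4)
        .map(|i| {
            let state = Arc::clone(&settings);
            thread::spawn(move || format!("handler {i}: header {}", state.tid_header))
        })
        .collect();

    for h in handles {
        println!("{}", h.join().unwrap());
    }
    // All handler clones have been dropped; only the original remains.
    assert_eq!(Arc::strong_count(&settings), 1);
    println!("port still readable: {}", settings.port);
}
```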

1 change: 0 additions & 1 deletion rust/numaflow-core/src/lib.rs
@@ -55,7 +55,6 @@ mod tracker;
mod mapper;

pub async fn run() -> Result<()> {
-let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();

9 changes: 8 additions & 1 deletion rust/numaflow-core/src/shared/create_components.rs
@@ -12,6 +12,7 @@ use tonic::transport::Channel;
use crate::config::components::sink::{SinkConfig, SinkType};
use crate::config::components::source::{SourceConfig, SourceType};
use crate::config::components::transformer::TransformerConfig;
+use crate::config::get_vertex_replica;
use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
use crate::error::Error;
@@ -337,7 +338,13 @@ pub async fn create_source(
))
}
SourceType::Serving(config) => {
-let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
+let serving = ServingSource::new(
+    Arc::clone(config),
+    batch_size,
+    read_timeout,
+    *get_vertex_replica(),
+)
+.await?;
Ok((
Source::new(
batch_size,
20 changes: 16 additions & 4 deletions rust/numaflow-core/src/source/serving.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;

pub(crate) use serving::ServingSource;

+use crate::config::get_vertex_replica;
use crate::message::{MessageID, StringOffset};
use crate::Error;
use crate::Result;
@@ -12,9 +13,10 @@ impl TryFrom<serving::Message> for Message {
type Error = Error;

fn try_from(message: serving::Message) -> Result<Self> {
-let offset = Offset::String(StringOffset::new(message.id.clone(), 0));
+let offset = Offset::String(StringOffset::new(message.id.clone(), *get_vertex_replica()));

Ok(Message {
// we do not support keys from HTTP client
keys: Arc::from(vec![]),
Review comment (Member), suggested change:
-keys: Arc::from(vec![]),
+// we do not support keys from HTTP client
+keys: Arc::from(vec![]),
tags: None,
value: message.value,
@@ -50,11 +52,14 @@ impl super::SourceReader for ServingSource {
}

fn partitions(&self) -> Vec<u16> {
-vec![]
+vec![*get_vertex_replica()]
}
}
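The change above replaces the hardcoded partition index `0` with the vertex replica, so each replica of the serving vertex produces offsets like `"{id}-{replica}"` and reports itself as the sole partition; `ack` later strips the `-{replica}` suffix back off. A sketch of that roundtrip (these helper names are illustrative — the PR builds the offset inline via `StringOffset::new` and strips it in `ack`):

```rust
// Illustrative helper: mirrors StringOffset::new(id, replica) serialization.
fn make_offset(id: &str, replica: u16) -> String {
    format!("{id}-{replica}")
}

// Illustrative helper: mirrors the strip_suffix call in ack().
fn strip_replica(offset: &str, replica: u16) -> Option<&str> {
    offset.strip_suffix(&format!("-{replica}"))
}

fn main() {
    let offset = make_offset("req-123", 2);
    assert_eq!(offset, "req-123-2");
    // Ack path: recover the original request ID for the tracker lookup.
    assert_eq!(strip_replica(&offset, 2), Some("req-123"));
    // A mismatched replica suffix is now detectable instead of a panic.
    assert_eq!(strip_replica(&offset, 0), None);
    println!("offset roundtrip ok");
}
```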

impl super::SourceAcker for ServingSource {
Review comment (Member), suggested change:
-impl super::SourceAcker for ServingSource {
+// TODO: implement for async and sync, currently it is only for async
+impl super::SourceAcker for ServingSource {

+/// HTTP response is sent only once we have confirmation that the message has been written to the ISB.
+// TODO: Current implementation only works for `/v1/process/async` endpoint.
+// For `/v1/process/{sync,sync_serve}` endpoints: https://github.com/numaproj/numaflow/issues/2308
async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
Review comment (Member), suggested change:
-async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
+/// HTTP response is sent only once we have confirmation that the message has been written to the ISB.
+async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {

let mut serving_offsets = vec![];
for offset in offsets {
@@ -87,6 +92,8 @@ mod tests {
use bytes::Bytes;
use serving::{ServingSource, Settings};

+use super::get_vertex_replica;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[test]
@@ -139,8 +146,13 @@
..Default::default()
};
let settings = Arc::new(settings);
-let mut serving_source =
-    ServingSource::new(Arc::clone(&settings), 10, Duration::from_millis(1)).await?;
+let mut serving_source = ServingSource::new(
+    Arc::clone(&settings),
+    10,
+    Duration::from_millis(1),
+    *get_vertex_replica(),
+)
+.await?;

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(2))
6 changes: 5 additions & 1 deletion rust/serving/src/app/jetstream_proxy.rs
@@ -92,7 +92,11 @@ async fn sync_publish_serve<T: Send + Sync + Clone + Store>(
},
};

-proxy_state.message.send(message).await.unwrap(); // FIXME:
+proxy_state
+    .message
+    .send(message)
+    .await
+    .expect("Failed to send request payload to Serving channel");

if let Err(e) = rx.await {
// Deregister the ID in the callback proxy state if writing to Jetstream fails
3 changes: 3 additions & 0 deletions rust/serving/src/error.rs
@@ -48,6 +48,9 @@ pub enum Error {
#[error("Failed to receive message from channel. Actor task is terminated: {0:?}")]
ActorTaskTerminated(oneshot::error::RecvError),

+#[error("Serving source error - {0}")]
+Source(String),

#[error("Other Error - {0}")]
// catch-all variant for now
Other(String),
4 changes: 3 additions & 1 deletion rust/serving/src/lib.rs
@@ -23,8 +23,8 @@ mod metrics;
mod pipeline;

pub mod source;
-pub use source::{Message, ServingSource};
use crate::source::MessageWrapper;
+pub use source::{Message, ServingSource};

#[derive(Clone)]
pub(crate) struct AppState<T> {
@@ -39,7 +39,9 @@ pub(crate) async fn serve<T>(
where
T: Clone + Send + Sync + Store + 'static,
{
+// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
+let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let (cert, key) = generate_certs()?;

let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
5 changes: 2 additions & 3 deletions rust/serving/src/metrics.rs
@@ -175,9 +175,8 @@ mod tests {

#[tokio::test]
async fn test_start_metrics_server() -> Result<()> {
-rustls::crypto::aws_lc_rs::default_provider()
-    .install_default()
-    .unwrap();
+// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
+let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let (cert, key) = generate_certs()?;

let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
62 changes: 42 additions & 20 deletions rust/serving/src/source.rs
@@ -31,26 +31,36 @@ enum ActorMessage {
Read {
batch_size: usize,
timeout_at: Instant,
-reply_to: oneshot::Sender<Vec<Message>>,
+reply_to: oneshot::Sender<Result<Vec<Message>>>,
},
Ack {
offsets: Vec<String>,
-reply_to: oneshot::Sender<()>,
+reply_to: oneshot::Sender<Result<()>>,
},
}
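The `ActorMessage` change above is the heart of this commit: each request now carries a `reply_to` channel whose payload is itself a `Result`, so the actor can report failures back to the caller instead of panicking. A std-threads analogue of the tokio actor pattern (tokio's `mpsc`/`oneshot` replaced by `std::sync::mpsc` so the sketch is self-contained; names are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// Request/reply actor message: the reply payload is a Result, mirroring
// why reply_to became oneshot::Sender<Result<...>> in this diff.
enum ActorMessage {
    Read {
        batch_size: usize,
        reply_to: mpsc::Sender<Result<Vec<String>, String>>,
    },
}

// Spawn the actor loop on a background thread and hand back its mailbox.
fn spawn_actor() -> mpsc::Sender<ActorMessage> {
    let (tx, rx) = mpsc::channel::<ActorMessage>();
    thread::spawn(move || {
        for msg in rx {
            match msg {
                ActorMessage::Read { batch_size, reply_to } => {
                    let batch = (0..batch_size).map(|i| format!("msg-{i}")).collect();
                    // Errors (if any) would travel back inside this Result.
                    let _ = reply_to.send(Ok(batch));
                }
            }
        }
    });
    tx
}

fn main() {
    let actor = spawn_actor();
    let (reply_tx, reply_rx) = mpsc::channel();
    actor
        .send(ActorMessage::Read { batch_size: 3, reply_to: reply_tx })
        .unwrap();
    // Two failure layers, as in `rx.await.map_err(Error::ActorTaskTerminated)??`:
    // first the reply channel itself, then the actor's own Result.
    let batch = reply_rx.recv().expect("actor terminated").expect("actor error");
    assert_eq!(batch.len(), 3);
    println!("got batch of {}", batch.len());
}
```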

+/// Background actor that starts the Axum server for accepting HTTP requests.
struct ServingSourceActor {
+/// The HTTP handlers will put the message received from the payload to this channel
messages: mpsc::Receiver<MessageWrapper>,
+/// Channel for the actor handle to communicate with this actor
handler_rx: mpsc::Receiver<ActorMessage>,
+/// Mapping from request's ID header (usually `X-Numaflow-Id` header) to a channel.
+/// Sending a message on this channel notifies the HTTP handler function that the message
+/// has been successfully processed.
tracker: HashMap<String, oneshot::Sender<()>>,
+vertex_replica_id: u16,
}

impl ServingSourceActor {
async fn start(
settings: Arc<Settings>,
handler_rx: mpsc::Receiver<ActorMessage>,
+request_channel_buffer_size: usize,
+vertex_replica_id: u16,
) -> Result<()> {
-let (messages_tx, messages_rx) = mpsc::channel(10000);
+// Channel to which HTTP handlers will send request payload
+let (messages_tx, messages_rx) = mpsc::channel(request_channel_buffer_size);
// Create a redis store to store the callbacks and the custom responses
let redis_store = RedisConnection::new(settings.redis.clone()).await?;
// Create the message graph from the pipeline spec and the redis store
@@ -67,6 +77,7 @@
messages: messages_rx,
handler_rx,
tracker: HashMap::new(),
+vertex_replica_id,
};
serving_actor.run().await;
});
@@ -98,24 +109,32 @@
let _ = reply_to.send(messages);
}
ActorMessage::Ack { offsets, reply_to } => {
-self.ack(offsets).await;
-let _ = reply_to.send(());
+let status = self.ack(offsets).await;
+let _ = reply_to.send(status);
}
}
}

-async fn read(&mut self, count: usize, timeout_at: Instant) -> Vec<Message> {
+async fn read(&mut self, count: usize, timeout_at: Instant) -> Result<Vec<Message>> {
let mut messages = vec![];
loop {
// Stop if the read timeout has reached or if we have collected the requested number of messages
if messages.len() >= count || Instant::now() >= timeout_at {
break;
Review comment (Contributor): Can we use ReceiverStream instead of mpsc channel to store the messages? That way you can chunk the stream using chunks_timeout()?

Reply (Contributor Author, @BulkBeing, Jan 6, 2025): I can't use ReceiverStream here since chunks_timeout consumes self. This results in: cannot move out of `self.messages` which is behind a mutable reference. ReceiverStream.chunks_timeout() would have worked if it were a function that accepts the ReceiverStream and uses it in a loop {} to consume it. In this case, the Receiver end is a struct member.
}
let message = match self.messages.try_recv() {
Ok(msg) => msg,
Err(mpsc::error::TryRecvError::Empty) => break,
-Err(e) => {
-tracing::error!(?e, "Receiving messages from the serving channel"); // FIXME:
-return messages;
+Err(mpsc::error::TryRecvError::Disconnected) => {
+// If we have collected at least one message, we return those messages.
+// The error will happen on all the subsequent read attempts too.
+if messages.is_empty() {
+return Err(Error::Other(
+"Sending half of the Serving channel has disconnected".into(),
+));
+}
+tracing::error!("Sending half of the Serving channel has disconnected");
+return Ok(messages);
}
};
let MessageWrapper {
@@ -126,22 +145,24 @@
self.tracker.insert(message.id.clone(), confirm_save);
messages.push(message);
}
-messages
+Ok(messages)
}
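The `read` loop above drains the channel without blocking (`try_recv`) until it has `count` messages or the deadline passes, and treats `Disconnected` as a hard error only when nothing was collected — a partial batch is still handed back once. A std-only sketch of the same batching policy (std `mpsc` in place of tokio's; the real code also pairs each message with a `confirm_save` sender, omitted here):

```rust
use std::sync::mpsc::{self, TryRecvError};
use std::time::{Duration, Instant};

// Collect up to `count` messages until `timeout_at`; return a partial
// batch on disconnect if we already collected something, Err otherwise.
fn read_batch(
    rx: &mpsc::Receiver<String>,
    count: usize,
    timeout_at: Instant,
) -> Result<Vec<String>, String> {
    let mut messages = Vec::new();
    loop {
        // Stop once we have enough messages or the deadline has passed.
        if messages.len() >= count || Instant::now() >= timeout_at {
            break;
        }
        match rx.try_recv() {
            Ok(msg) => messages.push(msg),
            // Nothing buffered right now: return what we have.
            Err(TryRecvError::Empty) => break,
            Err(TryRecvError::Disconnected) => {
                if messages.is_empty() {
                    return Err("sending half disconnected".into());
                }
                return Ok(messages); // hand back the partial batch once
            }
        }
    }
    Ok(messages)
}

fn main() {
    let (tx, rx) = mpsc::channel();
    for i in 0..5 {
        tx.send(format!("m{i}")).unwrap();
    }
    let deadline = Instant::now() + Duration::from_millis(50);
    // Batch size caps the read even though 5 messages are buffered.
    assert_eq!(read_batch(&rx, 3, deadline).unwrap().len(), 3);
    drop(tx); // sender side goes away
    let deadline = Instant::now() + Duration::from_millis(50);
    // Remaining 2 messages still come back despite the disconnect.
    assert_eq!(read_batch(&rx, 5, deadline).unwrap().len(), 2);
    // Nothing left and the channel is closed: now it is an error.
    assert!(read_batch(&rx, 1, Instant::now() + Duration::from_millis(50)).is_err());
    println!("batch read semantics ok");
}
```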

-async fn ack(&mut self, offsets: Vec<String>) {
+async fn ack(&mut self, offsets: Vec<String>) -> Result<()> {
+let offset_suffix = format!("-{}", self.vertex_replica_id);
for offset in offsets {
-let offset = offset
-.strip_suffix("-0")
-.expect("offset does not end with '-0'"); // FIXME: we hardcode 0 as the partition index when constructing offset
+let offset = offset.strip_suffix(&offset_suffix).ok_or_else(|| {
+Error::Source(format!("offset does not end with '{}'", &offset_suffix))
+})?;
let confirm_save_tx = self
.tracker
.remove(offset)
-.expect("offset was not found in the tracker");
+.ok_or_else(|| Error::Source("offset was not found in the tracker".into()))?;
confirm_save_tx
.send(())
-.expect("Sending on confirm_save channel");
+.map_err(|e| Error::Source(format!("Sending on confirm_save channel: {e:?}")))?;
}
+Ok(())
}
}
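The reworked `ack` above strips the replica suffix, removes the entry from the `tracker`, and fires the stored sender so the HTTP handler waiting on it can finally respond — with every failure now surfaced as `Error::Source` instead of a panic. A std-only sketch of that tracker pattern (a `HashMap` of notification senders; `mpsc::Sender<()>` stands in for tokio's `oneshot::Sender<()>`, and `String` for the PR's error type):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

// Tracker sketch: request ID -> notification sender. Firing the sender
// tells the blocked HTTP handler that its message reached the ISB.
fn ack(
    tracker: &mut HashMap<String, mpsc::Sender<()>>,
    offset: &str,
    replica: u16,
) -> Result<(), String> {
    let suffix = format!("-{replica}");
    // Recover the request ID by stripping the replica suffix.
    let id = offset
        .strip_suffix(&suffix)
        .ok_or_else(|| format!("offset does not end with '{suffix}'"))?;
    // Removing the entry makes a duplicate ack an error, not a panic.
    let confirm_save = tracker
        .remove(id)
        .ok_or_else(|| "offset was not found in the tracker".to_string())?;
    confirm_save
        .send(())
        .map_err(|e| format!("sending on confirm_save channel: {e:?}"))
}

fn main() {
    let mut tracker = HashMap::new();
    let (tx, rx) = mpsc::channel();
    tracker.insert("req-1".to_string(), tx);

    assert!(ack(&mut tracker, "req-1-0", 0).is_ok());
    assert!(rx.recv().is_ok()); // the handler side has been notified
    // Acking the same offset twice now yields an Err.
    assert!(ack(&mut tracker, "req-1-0", 0).is_err());
    // Wrong replica suffix is likewise an Err.
    assert!(ack(&mut tracker, "req-1-7", 0).is_err());
    println!("tracker ack semantics ok");
}
```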

@@ -158,9 +179,10 @@ impl ServingSource {
settings: Arc<Settings>,
batch_size: usize,
timeout: Duration,
+vertex_replica_id: u16,
) -> Result<Self> {
-let (actor_tx, actor_rx) = mpsc::channel(1000);
-ServingSourceActor::start(settings, actor_rx).await?;
+let (actor_tx, actor_rx) = mpsc::channel(2 * batch_size);
+ServingSourceActor::start(settings, actor_rx, 2 * batch_size, vertex_replica_id).await?;
Ok(Self {
batch_size,
timeout,
@@ -177,7 +199,7 @@
timeout_at: Instant::now() + self.timeout,
};
let _ = self.actor_tx.send(actor_msg).await;
-let messages = rx.await.map_err(Error::ActorTaskTerminated)?;
+let messages = rx.await.map_err(Error::ActorTaskTerminated)??;
tracing::debug!(
count = messages.len(),
requested_count = self.batch_size,
@@ -194,7 +216,7 @@
reply_to: tx,
};
let _ = self.actor_tx.send(actor_msg).await;
-rx.await.map_err(Error::ActorTaskTerminated)?;
+rx.await.map_err(Error::ActorTaskTerminated)??;
Ok(())
}
}
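The `??` appearing in both `read` and `ack` above unwraps two nested failure layers: the outer `?` covers the reply channel itself dying (`Error::ActorTaskTerminated`, actor task gone), the inner `?` covers the `Result` the actor sent back. A minimal std-only sketch of that flattening (the enum here is a cut-down stand-in for the crate's `Error`):

```rust
// Cut-down stand-in for the crate's error enum.
#[derive(Debug, PartialEq)]
enum Error {
    ActorTaskTerminated,
    Source(String),
}

// Outer layer: did the reply channel survive? Inner layer: did the
// actor's own operation succeed? `??` unwraps both, as in the PR's
// `rx.await.map_err(Error::ActorTaskTerminated)??`.
fn receive(reply: Result<Result<u32, Error>, &'static str>) -> Result<u32, Error> {
    let value = reply.map_err(|_| Error::ActorTaskTerminated)??;
    Ok(value)
}

fn main() {
    assert_eq!(receive(Ok(Ok(7))), Ok(7));
    assert_eq!(
        receive(Ok(Err(Error::Source("bad offset".into())))),
        Err(Error::Source("bad offset".into()))
    );
    assert_eq!(receive(Err("channel closed")), Err(Error::ActorTaskTerminated));
    println!("nested Result flattening ok");
}
```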