Skip to content

Commit

Permalink
Fixes based on code review
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Jan 3, 2025
1 parent 54c2cce commit 8f0389b
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 33 deletions.
4 changes: 2 additions & 2 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ verbose_file_reads = "warn"
# This profile optimizes for runtime performance and small binary size at the expense of longer build times.
# Compared to default release profile, this profile reduced binary size from 29MB to 21MB
# and increased build time (with only one line change in code) from 12 seconds to 133 seconds (tested on Mac M2 Max).
# [profile.release]
# lto = "fat"
[profile.release]
lto = "fat"

# This profile optimizes for short build times at the expense of larger binary size and slower runtime performance.
# If you have to rebuild image often, in Dockerfile you may replace `--release` passed to cargo command with `--profile quick-release`
Expand Down
2 changes: 2 additions & 0 deletions rust/numaflow-core/src/config/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ pub(crate) mod source {
Generator(GeneratorConfig),
UserDefined(UserDefinedConfig),
Pulsar(PulsarSourceConfig),
// Serving source starts an Axum HTTP server in the background.
// The settings will be used as application state which gets cloned in each handler on each request.
Serving(Arc<serving::Settings>),
}

Expand Down
1 change: 0 additions & 1 deletion rust/numaflow-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ mod tracker;
mod mapper;

pub async fn run() -> Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let cln_token = CancellationToken::new();
let shutdown_cln_token = cln_token.clone();

Expand Down
9 changes: 8 additions & 1 deletion rust/numaflow-core/src/shared/create_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use tonic::transport::Channel;
use crate::config::components::sink::{SinkConfig, SinkType};
use crate::config::components::source::{SourceConfig, SourceType};
use crate::config::components::transformer::TransformerConfig;
use crate::config::get_vertex_replica;
use crate::config::pipeline::map::{MapMode, MapType, MapVtxConfig};
use crate::config::pipeline::{DEFAULT_BATCH_MAP_SOCKET, DEFAULT_STREAM_MAP_SOCKET};
use crate::error::Error;
Expand Down Expand Up @@ -337,7 +338,13 @@ pub async fn create_source(
))
}
SourceType::Serving(config) => {
let serving = ServingSource::new(Arc::clone(config), batch_size, read_timeout).await?;
let serving = ServingSource::new(
Arc::clone(config),
batch_size,
read_timeout,
*get_vertex_replica(),
)
.await?;
Ok((
Source::new(
batch_size,
Expand Down
20 changes: 16 additions & 4 deletions rust/numaflow-core/src/source/serving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

pub(crate) use serving::ServingSource;

use crate::config::get_vertex_replica;
use crate::message::{MessageID, StringOffset};
use crate::Error;
use crate::Result;
Expand All @@ -12,9 +13,10 @@ impl TryFrom<serving::Message> for Message {
type Error = Error;

fn try_from(message: serving::Message) -> Result<Self> {
let offset = Offset::String(StringOffset::new(message.id.clone(), 0));
let offset = Offset::String(StringOffset::new(message.id.clone(), *get_vertex_replica()));

Ok(Message {
// we do not support keys from HTTP client
keys: Arc::from(vec![]),
tags: None,
value: message.value,
Expand Down Expand Up @@ -50,11 +52,14 @@ impl super::SourceReader for ServingSource {
}

fn partitions(&self) -> Vec<u16> {
vec![]
vec![*get_vertex_replica()]
}
}

impl super::SourceAcker for ServingSource {
/// HTTP response is sent only once we have confirmation that the message has been written to the ISB.
// TODO: Current implementation only works for `/v1/process/async` endpoint.
// For `/v1/process/{sync,sync_serve}` endpoints: https://github.com/numaproj/numaflow/issues/2308
async fn ack(&mut self, offsets: Vec<Offset>) -> Result<()> {
let mut serving_offsets = vec![];
for offset in offsets {
Expand Down Expand Up @@ -87,6 +92,8 @@ mod tests {
use bytes::Bytes;
use serving::{ServingSource, Settings};

use super::get_vertex_replica;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[test]
Expand Down Expand Up @@ -139,8 +146,13 @@ mod tests {
..Default::default()
};
let settings = Arc::new(settings);
let mut serving_source =
ServingSource::new(Arc::clone(&settings), 10, Duration::from_millis(1)).await?;
let mut serving_source = ServingSource::new(
Arc::clone(&settings),
10,
Duration::from_millis(1),
*get_vertex_replica(),
)
.await?;

let client = reqwest::Client::builder()
.timeout(Duration::from_secs(2))
Expand Down
6 changes: 5 additions & 1 deletion rust/serving/src/app/jetstream_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,11 @@ async fn sync_publish_serve<T: Send + Sync + Clone + Store>(
},
};

proxy_state.message.send(message).await.unwrap(); // FIXME:
proxy_state
.message
.send(message)
.await
.expect("Failed to send request payload to Serving channel");

if let Err(e) = rx.await {
// Deregister the ID in the callback proxy state if writing to Jetstream fails
Expand Down
3 changes: 3 additions & 0 deletions rust/serving/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ pub enum Error {
#[error("Failed to receive message from channel. Actor task is terminated: {0:?}")]
ActorTaskTerminated(oneshot::error::RecvError),

#[error("Serving source error - {0}")]
Source(String),

#[error("Other Error - {0}")]
// catch-all variant for now
Other(String),
Expand Down
4 changes: 3 additions & 1 deletion rust/serving/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ mod metrics;
mod pipeline;

pub mod source;
pub use source::{Message, ServingSource};
use crate::source::MessageWrapper;
pub use source::{Message, ServingSource};

#[derive(Clone)]
pub(crate) struct AppState<T> {
Expand All @@ -39,7 +39,9 @@ pub(crate) async fn serve<T>(
where
T: Clone + Send + Sync + Store + 'static,
{
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();

let (cert, key) = generate_certs()?;

let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
Expand Down
5 changes: 2 additions & 3 deletions rust/serving/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,8 @@ mod tests {

#[tokio::test]
async fn test_start_metrics_server() -> Result<()> {
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.unwrap();
// Setup the CryptoProvider (controls core cryptography used by rustls) for the process
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let (cert, key) = generate_certs()?;

let tls_config = RustlsConfig::from_pem(cert.pem().into(), key.serialize_pem().into())
Expand Down
62 changes: 42 additions & 20 deletions rust/serving/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,26 +31,36 @@ enum ActorMessage {
Read {
batch_size: usize,
timeout_at: Instant,
reply_to: oneshot::Sender<Vec<Message>>,
reply_to: oneshot::Sender<Result<Vec<Message>>>,
},
Ack {
offsets: Vec<String>,
reply_to: oneshot::Sender<()>,
reply_to: oneshot::Sender<Result<()>>,
},
}

/// Background actor that starts Axum server for accepting HTTP requests.
struct ServingSourceActor {
/// The HTTP handlers will put the message received from the payload to this channel
messages: mpsc::Receiver<MessageWrapper>,
/// Channel for the actor handle to communicate with this actor
handler_rx: mpsc::Receiver<ActorMessage>,
/// Mapping from request's ID header (usually `X-Numaflow-Id` header) to a channel.
/// This sending a message on this channel notifies the HTTP handler function that the message
/// has been successfully processed.
tracker: HashMap<String, oneshot::Sender<()>>,
vertex_replica_id: u16,
}

impl ServingSourceActor {
async fn start(
settings: Arc<Settings>,
handler_rx: mpsc::Receiver<ActorMessage>,
request_channel_buffer_size: usize,
vertex_replica_id: u16,
) -> Result<()> {
let (messages_tx, messages_rx) = mpsc::channel(10000);
// Channel to which HTTP handlers will send request payload
let (messages_tx, messages_rx) = mpsc::channel(request_channel_buffer_size);
// Create a redis store to store the callbacks and the custom responses
let redis_store = RedisConnection::new(settings.redis.clone()).await?;
// Create the message graph from the pipeline spec and the redis store
Expand All @@ -67,6 +77,7 @@ impl ServingSourceActor {
messages: messages_rx,
handler_rx,
tracker: HashMap::new(),
vertex_replica_id,
};
serving_actor.run().await;
});
Expand Down Expand Up @@ -98,24 +109,32 @@ impl ServingSourceActor {
let _ = reply_to.send(messages);
}
ActorMessage::Ack { offsets, reply_to } => {
self.ack(offsets).await;
let _ = reply_to.send(());
let status = self.ack(offsets).await;
let _ = reply_to.send(status);
}
}
}

async fn read(&mut self, count: usize, timeout_at: Instant) -> Vec<Message> {
async fn read(&mut self, count: usize, timeout_at: Instant) -> Result<Vec<Message>> {
let mut messages = vec![];
loop {
// Stop if the read timeout has reached or if we have collected the requested number of messages
if messages.len() >= count || Instant::now() >= timeout_at {
break;
}
let message = match self.messages.try_recv() {
Ok(msg) => msg,
Err(mpsc::error::TryRecvError::Empty) => break,
Err(e) => {
tracing::error!(?e, "Receiving messages from the serving channel"); // FIXME:
return messages;
Err(mpsc::error::TryRecvError::Disconnected) => {
// If we have collected at-least one message, we return those messages.
// The error will happen on all the subsequent read attempts too.
if messages.is_empty() {
return Err(Error::Other(
"Sending half of the Serving channel has disconnected".into(),
));
}
tracing::error!("Sending half of the Serving channel has disconnected");
return Ok(messages);
}
};
let MessageWrapper {
Expand All @@ -126,22 +145,24 @@ impl ServingSourceActor {
self.tracker.insert(message.id.clone(), confirm_save);
messages.push(message);
}
messages
Ok(messages)
}

async fn ack(&mut self, offsets: Vec<String>) {
async fn ack(&mut self, offsets: Vec<String>) -> Result<()> {
let offset_suffix = format!("-{}", self.vertex_replica_id);
for offset in offsets {
let offset = offset
.strip_suffix("-0")
.expect("offset does not end with '-0'"); // FIXME: we hardcode 0 as the partition index when constructing offset
let offset = offset.strip_suffix(&offset_suffix).ok_or_else(|| {
Error::Source(format!("offset does not end with '{}'", &offset_suffix))
})?;
let confirm_save_tx = self
.tracker
.remove(offset)
.expect("offset was not found in the tracker");
.ok_or_else(|| Error::Source("offset was not found in the tracker".into()))?;
confirm_save_tx
.send(())
.expect("Sending on confirm_save channel");
.map_err(|e| Error::Source(format!("Sending on confirm_save channel: {e:?}")))?;
}
Ok(())
}
}

Expand All @@ -158,9 +179,10 @@ impl ServingSource {
settings: Arc<Settings>,
batch_size: usize,
timeout: Duration,
vertex_replica_id: u16,
) -> Result<Self> {
let (actor_tx, actor_rx) = mpsc::channel(1000);
ServingSourceActor::start(settings, actor_rx).await?;
let (actor_tx, actor_rx) = mpsc::channel(2 * batch_size);
ServingSourceActor::start(settings, actor_rx, 2 * batch_size, vertex_replica_id).await?;
Ok(Self {
batch_size,
timeout,
Expand All @@ -177,7 +199,7 @@ impl ServingSource {
timeout_at: Instant::now() + self.timeout,
};
let _ = self.actor_tx.send(actor_msg).await;
let messages = rx.await.map_err(Error::ActorTaskTerminated)?;
let messages = rx.await.map_err(Error::ActorTaskTerminated)??;
tracing::debug!(
count = messages.len(),
requested_count = self.batch_size,
Expand All @@ -194,7 +216,7 @@ impl ServingSource {
reply_to: tx,
};
let _ = self.actor_tx.send(actor_msg).await;
rx.await.map_err(Error::ActorTaskTerminated)?;
rx.await.map_err(Error::ActorTaskTerminated)??;
Ok(())
}
}
Expand Down

0 comments on commit 8f0389b

Please sign in to comment.