
feat: implement Source trait and use it for user-defined source #2114

Merged
merged 13 commits into from
Oct 3, 2024
1 change: 0 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion rust/numaflow-core/src/config.rs
@@ -645,4 +645,4 @@ mod tests {
let drop = OnFailureStrategy::Drop;
assert_eq!(drop.to_string(), "drop");
}
}
}
3 changes: 3 additions & 0 deletions rust/numaflow-core/src/lib.rs
@@ -34,3 +34,6 @@ mod source;
///
/// [Transformer]: https://numaflow.numaproj.io/user-guide/sources/transformer/overview/
mod transformer;

/// Reads from a stream.
mod reader;
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/message.rs
@@ -7,9 +7,9 @@ use chrono::{DateTime, Utc};
use crate::error::Error;
use crate::monovertex::sink_pb::sink_request::Request;
use crate::monovertex::sink_pb::SinkRequest;
use crate::monovertex::{source_pb, sourcetransform_pb};
use crate::monovertex::source_pb::{read_response, AckRequest};
use crate::monovertex::sourcetransform_pb::SourceTransformRequest;
use crate::monovertex::{source_pb, sourcetransform_pb};
Member commented:

Just curious: I saw a lot of formatting changes in this PR; how did you generate those? Asking because I ran `cargo fmt` in my last PR and it didn't produce these changes. Is it because we are using different formatting schemes?

Member Author replied:

I run `cargo fmt` right before committing.

use crate::shared::utils::{prost_timestamp_from_utc, utc_from_timestamp};

/// A message that is sent from the source to the sink.
8 changes: 5 additions & 3 deletions rust/numaflow-core/src/monovertex.rs
@@ -3,7 +3,7 @@ use crate::error;
use crate::shared::utils;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::Source;
use crate::source::user_defined::new_source;
use crate::transformer::user_defined::SourceTransformer;
use forwarder::ForwarderBuilder;
use metrics::MetricsState;
@@ -145,6 +145,8 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) ->
)
.await?;

let (source_reader, lag_reader) = new_source(source_grpc_client.clone(), 500, 100).await?;

Contributor commented:

Can you add a comment explaining the numbers 500 and 100? Use constants.
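The reviewer's suggestion could be sketched like this. This is a hypothetical illustration: the constant names, and the reading of 500/100 as a batch size and a timeout in milliseconds, are assumptions rather than something stated in the PR.

```rust
// Hypothetical constants for the `new_source(...)` call site; the names and
// the interpretation of 500/100 (batch size, timeout in ms) are assumptions.
const DEFAULT_BATCH_SIZE: usize = 500;
const DEFAULT_TIMEOUT_IN_MS: u64 = 100;

fn main() {
    // The call site would then read, e.g.:
    // new_source(source_grpc_client.clone(), DEFAULT_BATCH_SIZE, DEFAULT_TIMEOUT_IN_MS)
    println!("{} {}", DEFAULT_BATCH_SIZE, DEFAULT_TIMEOUT_IN_MS);
}
```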

// Start the metrics server in a separate background async spawn,
// This should be running throughout the lifetime of the application, hence the handle is not
// joined.
@@ -160,11 +162,11 @@ async fn start_forwarder(cln_token: CancellationToken, sdk_config: SDKConfig) ->
utils::start_metrics_server(metrics_state).await;

// start the lag reader to publish lag metrics
let mut lag_reader = utils::create_lag_reader(source_grpc_client.clone()).await;
let mut lag_reader = utils::create_lag_reader(lag_reader).await;
lag_reader.start().await;
Contributor commented:

We pass `lag_reader` into the `create_lag_reader` method, which is confusing.


// build the forwarder
let source_reader = Source::new(source_grpc_client.clone()).await?;

let sink_writer = SinkWriter::new(sink_grpc_client.clone()).await?;

let mut forwarder_builder = ForwarderBuilder::new(source_reader, sink_writer, cln_token);
62 changes: 31 additions & 31 deletions rust/numaflow-core/src/monovertex/forwarder.rs
@@ -6,21 +6,20 @@ use tokio_util::sync::CancellationToken;
use tracing::{debug, info};

use crate::config::{config, OnFailureStrategy};
use crate::error;
use crate::error::Error;
use crate::message::{Message, Offset};
use crate::monovertex::metrics;
use crate::monovertex::metrics::forward_metrics;
use crate::monovertex::sink_pb::Status::{Failure, Fallback, Success};
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::Source;
use crate::transformer::user_defined::SourceTransformer;
use crate::{error, source};

/// Forwarder is responsible for reading messages from the source, applying transformation if
/// transformer is present, writing the messages to the sink, and then acknowledging the messages
/// back to the source.
pub(crate) struct Forwarder {
source: Source,
pub(crate) struct Forwarder<T> {
source: T,
sink_writer: SinkWriter,
source_transformer: Option<SourceTransformer>,
fb_sink_writer: Option<SinkWriter>,
@@ -29,21 +28,17 @@ pub(crate) struct Forwarder {
}

/// ForwarderBuilder is used to build a Forwarder instance with optional fields.
pub(crate) struct ForwarderBuilder {
source: Source,
pub(crate) struct ForwarderBuilder<T> {
source: T,
sink_writer: SinkWriter,
cln_token: CancellationToken,
source_transformer: Option<SourceTransformer>,
fb_sink_writer: Option<SinkWriter>,
}

impl ForwarderBuilder {
impl<T> ForwarderBuilder<T> {
/// Create a new builder with mandatory fields
pub(crate) fn new(
source: Source,
sink_writer: SinkWriter,
cln_token: CancellationToken,
) -> Self {
pub(crate) fn new(source: T, sink_writer: SinkWriter, cln_token: CancellationToken) -> Self {
Self {
source,
sink_writer,
@@ -67,7 +62,7 @@ impl ForwarderBuilder {

/// Build the Forwarder instance
#[must_use]
pub(crate) fn build(self) -> Forwarder {
pub(crate) fn build(self) -> Forwarder<T> {
let common_labels = metrics::forward_metrics_labels().clone();
Forwarder {
source: self.source,
@@ -80,7 +75,10 @@ }
}
}

impl Forwarder {
impl<T> Forwarder<T>
where
T: source::Source,
{
/// start starts the forward-a-chunk loop and exits only after a chunk has been forwarded and ack'ed.
/// this means that, in the happy path scenario a block is always completely processed.
/// this function will return on any error, causing a non-0 exit code.
@@ -121,13 +119,9 @@
/// and then acknowledge the messages back to the source.
async fn read_and_process_messages(&mut self) -> error::Result<usize> {
let start_time = tokio::time::Instant::now();
let messages = self
.source
.read(config().batch_size, config().timeout_in_ms)
.await
.map_err(|e| {
Error::ForwarderError(format!("Failed to read messages from source {:?}", e))
})?;
let messages = self.source.read().await.map_err(|e| {
Error::ForwarderError(format!("Failed to read messages from source {:?}", e))
})?;

debug!(
"Read batch size: {} and latency - {}ms",
@@ -555,7 +549,7 @@ mod tests {
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient;
use crate::shared::utils::create_rpc_channel;
use crate::sink::user_defined::SinkWriter;
use crate::source::user_defined::Source;
use crate::source::user_defined::UserDefinedSource;
use crate::transformer::user_defined::SourceTransformer;

struct SimpleSource {
@@ -735,9 +729,11 @@

let cln_token = CancellationToken::new();

let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
let source = UserDefinedSource::new(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
500,
100,
)
.await
.expect("failed to connect to source server");

@@ -857,9 +853,11 @@

let cln_token = CancellationToken::new();

let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
let source = UserDefinedSource::new(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
500,
100,
Contributor commented on lines +858 to +859:

Better to make it a constant, since it's used in multiple places.

Contributor commented:

Can we fetch it from the config?

)
.await
.expect("failed to connect to source server");

@@ -971,9 +969,11 @@

let cln_token = CancellationToken::new();

let source = Source::new(SourceClient::new(
create_rpc_channel(source_sock_file.clone()).await.unwrap(),
))
let source = UserDefinedSource::new(
SourceClient::new(create_rpc_channel(source_sock_file.clone()).await.unwrap()),
500,
100,
)
.await
.expect("failed to connect to source server");

53 changes: 23 additions & 30 deletions rust/numaflow-core/src/monovertex/metrics.rs
@@ -21,6 +21,7 @@ use crate::error::Error;
use crate::monovertex::sink_pb::sink_client::SinkClient;
use crate::monovertex::source_pb::source_client::SourceClient;
use crate::monovertex::sourcetransform_pb::source_transform_client::SourceTransformClient;
use crate::reader;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
@@ -362,29 +363,29 @@ struct TimestampedPending {
timestamp: std::time::Instant,
}

/// `LagReader` is responsible for periodically checking the lag of the source client
/// PendingReader is responsible for periodically checking the lag of the reader
/// and exposing the metrics. It maintains a list of pending stats and ensures that
/// only the most recent entries are kept.
pub(crate) struct LagReader {
source_client: SourceClient<Channel>,
pub(crate) struct PendingReader<T> {
lag_reader: T,
Contributor commented on lines +369 to +370:

There is a `lag_reader` field inside `PendingReader`, which is confusing.

Member Author replied:

`lag_reader` is the trait. Any recommendation?

lag_checking_interval: Duration,
refresh_interval: Duration,
buildup_handle: Option<JoinHandle<()>>,
expose_handle: Option<JoinHandle<()>>,
pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
}

/// LagReaderBuilder is used to build a `LagReader` instance.
pub(crate) struct LagReaderBuilder {
source_client: SourceClient<Channel>,
/// PendingReaderBuilder is used to build a [PendingReader] instance.
pub(crate) struct PendingReaderBuilder<T> {
lag_reader: T,
lag_checking_interval: Option<Duration>,
refresh_interval: Option<Duration>,
}

impl LagReaderBuilder {
pub(crate) fn new(source_client: SourceClient<Channel>) -> Self {
impl<T: reader::LagReader> PendingReaderBuilder<T> {
pub(crate) fn new(lag_reader: T) -> Self {
Self {
source_client,
lag_reader,
lag_checking_interval: None,
refresh_interval: None,
}
@@ -400,9 +401,9 @@ impl LagReaderBuilder {
self
}

pub(crate) fn build(self) -> LagReader {
LagReader {
source_client: self.source_client,
pub(crate) fn build(self) -> PendingReader<T> {
PendingReader {
lag_reader: self.lag_reader,
lag_checking_interval: self
.lag_checking_interval
.unwrap_or_else(|| Duration::from_secs(3)),
@@ -416,20 +417,20 @@ }
}
}

impl LagReader {
impl<T: reader::LagReader + Clone + Send + 'static> PendingReader<T> {
/// Starts the lag reader by spawning tasks to build up pending info and expose pending metrics.
///
/// This method spawns two asynchronous tasks:
/// - One to periodically check the lag and update the pending stats.
/// - Another to periodically expose the pending metrics.
pub async fn start(&mut self) {
let source_client = self.source_client.clone();
let pending_reader = self.lag_reader.clone();
let lag_checking_interval = self.lag_checking_interval;
let refresh_interval = self.refresh_interval;
let pending_stats = self.pending_stats.clone();

self.buildup_handle = Some(tokio::spawn(async move {
build_pending_info(source_client, lag_checking_interval, pending_stats).await;
build_pending_info(pending_reader, lag_checking_interval, pending_stats).await;
}));

let pending_stats = self.pending_stats.clone();
@@ -439,8 +440,8 @@ impl LagReader {
}
}

/// When lag-reader is dropped, we need to clean up the pending exposer and the pending builder tasks.
impl Drop for LagReader {
/// When the PendingReader is dropped, we need to clean up the pending exposer and the pending builder tasks.
impl<T> Drop for PendingReader<T> {
fn drop(&mut self) {
if let Some(handle) = self.expose_handle.take() {
handle.abort();
@@ -454,15 +455,15 @@ impl Drop for LagReader {
}

/// Periodically checks the pending messages from the source client and build the pending stats.
async fn build_pending_info(
mut source_client: SourceClient<Channel>,
async fn build_pending_info<T: reader::LagReader>(
mut lag_reader: T,
lag_checking_interval: Duration,
pending_stats: Arc<Mutex<Vec<TimestampedPending>>>,
) {
let mut ticker = time::interval(lag_checking_interval);
loop {
ticker.tick().await;
match fetch_pending(&mut source_client).await {
match fetch_pending(&mut lag_reader).await {
Ok(pending) => {
if pending != -1 {
let mut stats = pending_stats.lock().await;
@@ -484,14 +485,8 @@
}
}

async fn fetch_pending(source_client: &mut SourceClient<Channel>) -> crate::error::Result<i64> {
let request = Request::new(());
let response = source_client
.pending_fn(request)
.await?
.into_inner()
.result
.map_or(-1, |r| r.count); // default to -1(unavailable)
async fn fetch_pending<T: reader::LagReader>(lag_reader: &mut T) -> crate::error::Result<i64> {
let response: i64 = lag_reader.pending().await?.map_or(-1, |p| p as i64); // default to -1(unavailable)
Ok(response)
}
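The rewritten `fetch_pending` collapses the trait's `Option<usize>` into the `i64` the metrics pipeline expects, keeping -1 as the "unavailable" sentinel the old gRPC path used. The conversion in isolation (the helper name here is mine, not from the PR):

```rust
/// Collapse an optional pending count into the published metric value:
/// `None` (pending information unavailable) becomes -1.
fn normalize_pending(pending: Option<usize>) -> i64 {
    pending.map_or(-1, |p| p as i64)
}

fn main() {
    assert_eq!(normalize_pending(Some(42)), 42);
    assert_eq!(normalize_pending(None), -1);
    println!("ok");
}
```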

@@ -556,8 +551,6 @@ async fn calculate_pending(
result
}

// TODO add tests

#[cfg(test)]
mod tests {
use super::*;
9 changes: 9 additions & 0 deletions rust/numaflow-core/src/reader.rs
@@ -0,0 +1,9 @@
/// Lag reader reports the pending information at Reader (source, ISBs), this information is used by
/// the auto-scaler.
#[trait_variant::make(LagReader: Send)]
#[allow(dead_code)]
pub(crate) trait LocalLagReader {
/// Pending elements yet to be read from the stream. The stream could be the [crate::source], or ISBs
/// It may or may not include unacknowledged messages.
async fn pending(&mut self) -> crate::error::Result<Option<usize>>;
}
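A minimal sketch of what an implementor of this trait could look like. To keep the sketch self-contained it uses plain async-fn-in-trait (stable since Rust 1.75) instead of the `trait_variant` Send-bound variant, and a hand-rolled executor in place of tokio; everything besides the `pending` signature is my own naming, not from the PR.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Stand-in for crate::error::Result; the real trait also gains a Send bound
// via #[trait_variant::make(LagReader: Send)].
type Result<T> = std::result::Result<T, String>;

// Simplified LagReader trait, mirroring the one added in reader.rs.
trait LagReader {
    /// Pending elements yet to be read; `None` means "unavailable".
    async fn pending(&mut self) -> Result<Option<usize>>;
}

/// Toy in-memory source: pending is simply the queue length.
struct InMemorySource {
    queue: Vec<u64>,
}

impl LagReader for InMemorySource {
    async fn pending(&mut self) -> Result<Option<usize>> {
        Ok(Some(self.queue.len()))
    }
}

// Minimal executor, sufficient for futures that resolve without yielding.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let mut src = InMemorySource { queue: vec![1, 2, 3] };
    let pending = block_on(src.pending()).unwrap();
    assert_eq!(pending, Some(3));
    println!("pending = {:?}", pending);
}
```

This is the shape `PendingReader` in metrics.rs relies on: it only needs `pending()` from its generic parameter, so any source (gRPC-backed or in-memory) that implements the trait can feed the lag metrics.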