From 141c22c2eea6e15bf97668360a822956bda51fe9 Mon Sep 17 00:00:00 2001 From: Sreekanth <32409051+BulkBeing@users.noreply.github.com> Date: Fri, 5 Jan 2024 09:16:01 +0530 Subject: [PATCH] feat: add partitions RPC for source (#24) Signed-off-by: Sreekanth Co-authored-by: Vigith Maurice --- Cargo.toml | 17 ++++++----- proto/source.proto | 22 ++++++++++++-- src/map.rs | 11 ++++--- src/reduce.rs | 11 ++++--- src/shared.rs | 13 ++++---- src/sideinput.rs | 11 ++++--- src/sink.rs | 13 ++++---- src/source.rs | 69 +++++++++++++++++++++++++++--------------- src/sourcetransform.rs | 11 ++++--- 9 files changed, 110 insertions(+), 68 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0bb909d..c39ad3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,15 +8,16 @@ name = "numaflow" path = "src/lib.rs" [dependencies] -tonic = "0.9" -prost = "0.11" -prost-types = "0.11.9" +tonic = "0.10.2" +prost = "0.12.3" +prost-types = "0.12.3" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } tokio-stream = { version = "0.1.14", features = ["net"] } -serde = { version = "1.0.103", features = ["derive"] } -chrono = "0.4.26" -serde_json = "1.0.103" -futures-util = "0.3.28" +serde = { version = "1.0.194", features = ["derive"] } +chrono = "0.4.31" +serde_json = "1.0.111" +futures-util = "0.3.30" +tracing = "0.1.40" [build-dependencies] -tonic-build = "0.9" +tonic-build = "0.10.2" diff --git a/proto/source.proto b/proto/source.proto index 1cf58a7..4352028 100644 --- a/proto/source.proto +++ b/proto/source.proto @@ -21,6 +21,9 @@ service Source { // PendingFn returns the number of pending records at the user defined source. rpc PendingFn(google.protobuf.Empty) returns (PendingResponse); + // PartitionsFn returns the list of partitions for the user defined source. + rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse); + // IsReady is the heartbeat endpoint for user defined source gRPC. rpc IsReady(google.protobuf.Empty) returns (ReadyResponse); } @@ -111,7 +114,20 @@ message ReadyResponse { message PendingResponse { message Result { // Required field holding the number of pending records at the user defined source. - uint64 count = 1; + // A negative count indicates that the pending information is not available. + int64 count = 1; + } + // Required field holding the result. + Result result = 1; +} + +/* + * PartitionsResponse is the response for the partitions request. + */ +message PartitionsResponse { + message Result { + // Required field holding the list of partitions. + repeated int32 partitions = 1; } // Required field holding the result. Result result = 1; @@ -129,5 +145,5 @@ message Offset { // Optional partition_id indicates which partition of the source the datum belongs to. // It is useful for sources that have multiple partitions. e.g. Kafka. // If the partition_id is not specified, it is assumed that the source has a single partition. - string partition_id = 2; -} + int32 partition_id = 2; +} \ No newline at end of file diff --git a/src/map.rs b/src/map.rs index 9bd3bf6..99cb181 100644 --- a/src/map.rs +++ b/src/map.rs @@ -165,10 +165,12 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: Mapper + Send + Sync + 'static, { - shared::write_info_file(); + shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/map.sock"; - std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; let uds = tokio::net::UnixListener::bind(path)?; let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); @@ -178,7 +180,6 @@ where tonic::transport::Server::builder() .add_service(map_server::MapServer::new(map_svc)) .serve_with_incoming(_uds_stream) - .await?; - - Ok(()) + .await + .map_err(Into::into) } diff --git a/src/reduce.rs b/src/reduce.rs index a3beae3..7aca213 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -303,10 +303,12 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: Reducer + Send + Sync + 'static, { - shared::write_info_file(); + shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/reduce.sock"; - std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; let uds = tokio::net::UnixListener::bind(path)?; let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); @@ -318,7 +320,6 @@ where tonic::transport::Server::builder() .add_service(reduce_server::ReduceServer::new(reduce_svc)) .serve_with_incoming(_uds_stream) - .await?; - - Ok(()) + .await + .map_err(Into::into) } diff --git a/src/shared.rs b/src/shared.rs index a60b6e9..4854a31 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -1,10 +1,12 @@ -use std::collections::HashMap; use std::fs; +use std::{collections::HashMap, io}; use chrono::{DateTime, TimeZone, Timelike, Utc}; use prost_types::Timestamp; +use tracing::info; -pub(crate) fn write_info_file() { +#[tracing::instrument] +pub(crate) fn write_info_file() -> io::Result<()> { let path = if std::env::var_os("NUMAFLOW_POD").is_some() { "/var/run/numaflow/server-info" } else { @@ -21,10 +23,9 @@ pub(crate) fn write_info_file() { }); // Convert to a string of JSON and print it out - let content = info.to_string(); - let content = format!("{}U+005C__END__", content); - println!("wrote to {} {}", path, content); - fs::write(path, content).unwrap(); + let content = format!("{}U+005C__END__", info); + info!(path, content, "Writing to file"); + fs::write(path, content) } pub(crate) fn utc_from_timestamp(t: Option) -> DateTime { diff --git a/src/sideinput.rs b/src/sideinput.rs index a7e7215..174adec 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -48,10 +48,12 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: SideInputer + Send + Sync + 'static, { - crate::shared::write_info_file(); + crate::shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/sideinput.sock"; - std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; let uds = tokio::net::UnixListener::bind(path)?; let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); @@ -61,7 +63,6 @@ where tonic::transport::Server::builder() .add_service(side_input_server::SideInputServer::new(si_svc)) .serve_with_incoming(_uds_stream) - .await?; - - Ok(()) + .await + .map_err(Into::into) } diff --git a/src/sink.rs b/src/sink.rs index 215db09..ac7d6a4 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -225,16 +225,15 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: Sinker + Send + Sync + 'static, { - shared::write_info_file(); + shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/sink.sock"; - fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; - use std::fs; - use tokio::net::UnixListener; - use tokio_stream::wrappers::UnixListenerStream; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; - let uds = UnixListener::bind(path)?; - let _uds_stream = UnixListenerStream::new(uds); + let uds = tokio::net::UnixListener::bind(path)?; + let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); let sink_service = SinkService { handler: m }; diff --git a/src/source.rs b/src/source.rs index 1d97ee0..bd2a117 100644 --- a/src/source.rs +++ b/src/source.rs @@ -14,6 +14,8 @@ use tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use tonic::{async_trait, Request, Response, Status}; +use self::sourcer::{partitions_response, PartitionsResponse}; + mod sourcer { tonic::include_proto!("source.v1"); } @@ -23,14 +25,14 @@ struct SourceService { } #[async_trait] -/// Sourcer trait implements [`Sourcer::read`], [`Sourcer::ack`], and [`Sourcer::pending`] functions for implementing [user-defined source]. +/// Trait representing a [user defined source](https://numaflow.numaproj.io/user-guide/sources/overview/). /// /// ## Example /// Please refer to [simple source](https://github.com/numaproj/numaflow-rs/tree/main/examples/simple-source) for an example. /// /// ## NOTE /// The standard convention for both [`Sourcer::read`] and [`Sourcer::ack`] is that they should be mutable, -/// since they have to update some state. Unfortunately the SDK provides only a shared reference of self and thus makes it unmutable. This is because +/// since they have to update some state. Unfortunately the SDK provides only a shared reference of self and thus makes it immutable. This is because /// gRPC [tonic] provides only a shared reference for its traits. This means, the implementer for trait will have to use [SharedState] pattern to mutate /// the values as recommended in [issue-427]. This might change in future as async traits evolves. /// @@ -39,28 +41,33 @@ struct SourceService { /// [SharedState]: https://tokio.rs/tokio/tutorial/shared-state /// [issue-427]: https://github.com/hyperium/tonic/issues/427 pub trait Sourcer { - /// read reads the messages from the source and sends them to the transmitter. + /// Reads the messages from the source and sends them to the transmitter. async fn read(&self, request: SourceReadRequest, transmitter: Sender); - /// acknowledges the messages that have been processed by the user-defined source. + /// Acknowledges the messages that have been processed by the user-defined source. async fn ack(&self, offsets: Vec); - /// pending returns the number of messages that are yet to be processed by the user-defined source. + /// Returns the number of messages that are yet to be processed by the user-defined source. async fn pending(&self) -> usize; + /// Returns the partitions associated with the source. This will be used by the platform to determine + /// the partitions to which the watermark should be published. Some sources might not have the concept of partitions. + /// Kafka is an example of source where a reader can read from multiple partitions. + /// If None is returned, Numaflow replica-id will be returned as the partition. + async fn partitions(&self) -> Option>; } -/// SourceReadRequest is the request from the gRPC client (numaflow) to the user's [`Sourcer::read`]. +/// A request from the gRPC client (numaflow) to the user's [`Sourcer::read`]. pub struct SourceReadRequest { - /// count is the number of messages to be read. + /// The number of messages to be read. pub count: usize, - /// timeout is the timeout in milliseconds. + /// Request timeout in milliseconds. pub timeout: Duration, } -/// Offset is the offset of the message. When the message is acked, the offset is passed to the user's [`Sourcer::ack`]. +/// The offset of the message. pub struct Offset { - /// offset is the offset in bytes. + /// Offset value in bytes. pub offset: Vec, - /// partition_id is the partition_id of the message. - pub partition_id: String, + /// Partition ID of the message. + pub partition_id: i32, } #[async_trait] @@ -76,7 +83,7 @@ where ) -> Result, Status> { let sr = request.into_inner().request.unwrap(); - // tx.rx pair for sending data over to user-defined source + // tx,rx pair for sending data over to user-defined source let (stx, mut srx) = mpsc::channel::(1); // tx,rx pair for gRPC response let (tx, rx) = mpsc::channel::>(1); @@ -146,11 +153,27 @@ where Ok(Response::new(PendingResponse { result: Some(sourcer::pending_response::Result { - count: pending as u64, + count: pending as i64, }), })) } + async fn partitions_fn( + &self, + _request: Request<()>, + ) -> Result, Status> { + let partitions = match self.handler.partitions().await { + Some(v) => v, + None => vec![std::env::var("NUMAFLOW_REPLICA") + .unwrap_or_default() + .parse::() + .unwrap_or_default()], + }; + Ok(Response::new(PartitionsResponse { + result: Some(partitions_response::Result { partitions }), + })) + } + async fn is_ready(&self, _: Request<()>) -> Result, Status> { Ok(Response::new(ReadyResponse { ready: true })) } @@ -173,16 +196,15 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: Sourcer + Send + Sync + 'static, { - shared::write_info_file(); + shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/source.sock"; - fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; - use std::fs; - use tokio::net::UnixListener; - use tokio_stream::wrappers::UnixListenerStream; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; - let uds = UnixListener::bind(path)?; - let _uds_stream = UnixListenerStream::new(uds); + let uds = tokio::net::UnixListener::bind(path)?; + let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); let source_service = SourceService { handler: Arc::new(m), @@ -191,7 +213,6 @@ where Server::builder() .add_service(SourceServer::new(source_service)) .serve_with_incoming(_uds_stream) - .await?; - - Ok(()) + .await + .map_err(Into::into) } diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 9c14bcc..fc1058d 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -176,10 +176,12 @@ pub async fn start_uds_server(m: T) -> Result<(), Box> where T: SourceTransformer + Send + Sync + 'static, { - shared::write_info_file(); + shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?; let path = "/var/run/numaflow/sourcetransform.sock"; - std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?; + let path = std::path::Path::new(path); + let parent = path.parent().unwrap(); + std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?; let uds = tokio::net::UnixListener::bind(path)?; let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); @@ -191,7 +193,6 @@ where source_transformer_svc, )) .serve_with_incoming(_uds_stream) - .await?; - - Ok(()) + .await + .map_err(Into::into) }