Skip to content

Commit

Permalink
feat: add partitions RPC for source (#24)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
Co-authored-by: Vigith Maurice <[email protected]>
  • Loading branch information
BulkBeing and vigith authored Jan 5, 2024
1 parent 82fb19e commit 141c22c
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 68 deletions.
17 changes: 9 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ name = "numaflow"
path = "src/lib.rs"

[dependencies]
tonic = "0.9"
prost = "0.11"
prost-types = "0.11.9"
tonic = "0.10.2"
prost = "0.12.3"
prost-types = "0.12.3"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1.14", features = ["net"] }
serde = { version = "1.0.103", features = ["derive"] }
chrono = "0.4.26"
serde_json = "1.0.103"
futures-util = "0.3.28"
serde = { version = "1.0.194", features = ["derive"] }
chrono = "0.4.31"
serde_json = "1.0.111"
futures-util = "0.3.30"
tracing = "0.1.40"

[build-dependencies]
tonic-build = "0.9"
tonic-build = "0.10.2"
22 changes: 19 additions & 3 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ service Source {
// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);

// PartitionsFn returns the list of partitions for the user defined source.
rpc PartitionsFn(google.protobuf.Empty) returns (PartitionsResponse);

// IsReady is the heartbeat endpoint for user defined source gRPC.
rpc IsReady(google.protobuf.Empty) returns (ReadyResponse);
}
Expand Down Expand Up @@ -111,7 +114,20 @@ message ReadyResponse {
message PendingResponse {
message Result {
// Required field holding the number of pending records at the user defined source.
uint64 count = 1;
// A negative count indicates that the pending information is not available.
int64 count = 1;
}
// Required field holding the result.
Result result = 1;
}

/*
* PartitionsResponse is the response for the partitions request.
*/
message PartitionsResponse {
message Result {
// Required field holding the list of partitions.
repeated int32 partitions = 1;
}
// Required field holding the result.
Result result = 1;
Expand All @@ -129,5 +145,5 @@ message Offset {
// Optional partition_id indicates which partition of the source the datum belongs to.
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
string partition_id = 2;
}
int32 partition_id = 2;
}
11 changes: 6 additions & 5 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,12 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: Mapper + Send + Sync + 'static,
{
shared::write_info_file();
shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/map.sock";
std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
Expand All @@ -178,7 +180,6 @@ where
tonic::transport::Server::builder()
.add_service(map_server::MapServer::new(map_svc))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
.await
.map_err(Into::into)
}
11 changes: 6 additions & 5 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,10 +303,12 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: Reducer + Send + Sync + 'static,
{
shared::write_info_file();
shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/reduce.sock";
std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
Expand All @@ -318,7 +320,6 @@ where
tonic::transport::Server::builder()
.add_service(reduce_server::ReduceServer::new(reduce_svc))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
.await
.map_err(Into::into)
}
13 changes: 7 additions & 6 deletions src/shared.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::collections::HashMap;
use std::fs;
use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use tracing::info;

pub(crate) fn write_info_file() {
#[tracing::instrument]
pub(crate) fn write_info_file() -> io::Result<()> {
let path = if std::env::var_os("NUMAFLOW_POD").is_some() {
"/var/run/numaflow/server-info"
} else {
Expand All @@ -21,10 +23,9 @@ pub(crate) fn write_info_file() {
});

// Convert to a string of JSON and print it out
let content = info.to_string();
let content = format!("{}U+005C__END__", content);
println!("wrote to {} {}", path, content);
fs::write(path, content).unwrap();
let content = format!("{}U+005C__END__", info);
info!(path, content, "Writing to file");
fs::write(path, content)
}

pub(crate) fn utc_from_timestamp(t: Option<Timestamp>) -> DateTime<Utc> {
Expand Down
11 changes: 6 additions & 5 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,12 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: SideInputer + Send + Sync + 'static,
{
crate::shared::write_info_file();
crate::shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/sideinput.sock";
std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
Expand All @@ -61,7 +63,6 @@ where
tonic::transport::Server::builder()
.add_service(side_input_server::SideInputServer::new(si_svc))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
.await
.map_err(Into::into)
}
13 changes: 6 additions & 7 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,16 +225,15 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: Sinker + Send + Sync + 'static,
{
shared::write_info_file();
shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/sink.sock";
fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
use std::fs;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = UnixListener::bind(path)?;
let _uds_stream = UnixListenerStream::new(uds);
let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);

let sink_service = SinkService { handler: m };

Expand Down
69 changes: 45 additions & 24 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use tonic::{async_trait, Request, Response, Status};

use self::sourcer::{partitions_response, PartitionsResponse};

/// Private module holding the gRPC/protobuf items for the `source.v1` package,
/// pulled in through tonic's `include_proto!` macro (the crate lists `tonic-build`
/// as a build dependency, so the code is presumably generated from `proto/source.proto`).
mod sourcer {
tonic::include_proto!("source.v1");
}
Expand All @@ -23,14 +25,14 @@ struct SourceService<T> {
}

#[async_trait]
/// Sourcer trait implements [`Sourcer::read`], [`Sourcer::ack`], and [`Sourcer::pending`] functions for implementing [user-defined source].
/// Trait representing a [user defined source](https://numaflow.numaproj.io/user-guide/sources/overview/).
///
/// ## Example
/// Please refer to [simple source](https://github.com/numaproj/numaflow-rs/tree/main/examples/simple-source) for an example.
///
/// ## NOTE
/// The standard convention for both [`Sourcer::read`] and [`Sourcer::ack`] is that they should be mutable,
/// since they have to update some state. Unfortunately the SDK provides only a shared reference of self and thus makes it unmutable. This is because
/// since they have to update some state. Unfortunately the SDK provides only a shared reference of self and thus makes it immutable. This is because
/// gRPC [tonic] provides only a shared reference for its traits. This means the implementer of the trait will have to use the [SharedState] pattern to mutate
/// the values as recommended in [issue-427]. This might change in the future as async traits evolve.
///
Expand All @@ -39,28 +41,33 @@ struct SourceService<T> {
/// [SharedState]: https://tokio.rs/tokio/tutorial/shared-state
/// [issue-427]: https://github.com/hyperium/tonic/issues/427
pub trait Sourcer {
/// read reads the messages from the source and sends them to the transmitter.
/// Reads the messages from the source and sends them to the transmitter.
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>);
/// acknowledges the messages that have been processed by the user-defined source.
/// Acknowledges the messages that have been processed by the user-defined source.
async fn ack(&self, offsets: Vec<Offset>);
/// pending returns the number of messages that are yet to be processed by the user-defined source.
/// Returns the number of messages that are yet to be processed by the user-defined source.
async fn pending(&self) -> usize;
/// Returns the partitions associated with the source. This will be used by the platform to determine
/// the partitions to which the watermark should be published. Some sources might not have the concept of partitions.
/// Kafka is an example of a source where a reader can read from multiple partitions.
/// If `None` is returned, the Numaflow replica ID will be used as the only partition.
async fn partitions(&self) -> Option<Vec<i32>>;
}

/// SourceReadRequest is the request from the gRPC client (numaflow) to the user's [`Sourcer::read`].
/// A request from the gRPC client (numaflow) to the user's [`Sourcer::read`].
pub struct SourceReadRequest {
/// count is the number of messages to be read.
/// The number of messages to be read.
pub count: usize,
/// timeout is the timeout in milliseconds.
/// Request timeout in milliseconds.
pub timeout: Duration,
}

/// Offset is the offset of the message. When the message is acked, the offset is passed to the user's [`Sourcer::ack`].
/// The offset of the message.
pub struct Offset {
/// offset is the offset in bytes.
/// Offset value in bytes.
pub offset: Vec<u8>,
/// partition_id is the partition_id of the message.
pub partition_id: String,
/// Partition ID of the message.
pub partition_id: i32,
}

#[async_trait]
Expand All @@ -76,7 +83,7 @@ where
) -> Result<Response<Self::ReadFnStream>, Status> {
let sr = request.into_inner().request.unwrap();

// tx.rx pair for sending data over to user-defined source
// tx,rx pair for sending data over to user-defined source
let (stx, mut srx) = mpsc::channel::<Message>(1);
// tx,rx pair for gRPC response
let (tx, rx) = mpsc::channel::<Result<ReadResponse, Status>>(1);
Expand Down Expand Up @@ -146,11 +153,27 @@ where

Ok(Response::new(PendingResponse {
result: Some(sourcer::pending_response::Result {
count: pending as u64,
count: pending as i64,
}),
}))
}

/// Handles the `PartitionsFn` RPC.
///
/// Asks the user-defined handler for its partitions. When the handler returns
/// `None`, a single partition is reported instead: the value of the
/// `NUMAFLOW_REPLICA` environment variable parsed as an `i32`, or `0` when the
/// variable is unset or does not parse.
async fn partitions_fn(
    &self,
    _request: Request<()>,
) -> Result<Response<PartitionsResponse>, Status> {
    let ids = self.handler.partitions().await.unwrap_or_else(|| {
        // Fallback is evaluated only when the handler has no partitions of its own.
        let replica = std::env::var("NUMAFLOW_REPLICA")
            .unwrap_or_default()
            .parse::<i32>()
            .unwrap_or_default();
        vec![replica]
    });

    let result = partitions_response::Result { partitions: ids };
    let reply = PartitionsResponse {
        result: Some(result),
    };
    Ok(Response::new(reply))
}

/// Handles the `IsReady` RPC; always replies with `ready: true`.
async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
    let reply = ReadyResponse { ready: true };
    Ok(Response::new(reply))
}
Expand All @@ -173,16 +196,15 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: Sourcer + Send + Sync + 'static,
{
shared::write_info_file();
shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/source.sock";
fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
use std::fs;
use tokio::net::UnixListener;
use tokio_stream::wrappers::UnixListenerStream;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = UnixListener::bind(path)?;
let _uds_stream = UnixListenerStream::new(uds);
let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);

let source_service = SourceService {
handler: Arc::new(m),
Expand All @@ -191,7 +213,6 @@ where
Server::builder()
.add_service(SourceServer::new(source_service))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
.await
.map_err(Into::into)
}
11 changes: 6 additions & 5 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,12 @@ pub async fn start_uds_server<T>(m: T) -> Result<(), Box<dyn std::error::Error>>
where
T: SourceTransformer + Send + Sync + 'static,
{
shared::write_info_file();
shared::write_info_file().map_err(|e| format!("writing info file: {e:?}"))?;

let path = "/var/run/numaflow/sourcetransform.sock";
std::fs::create_dir_all(std::path::Path::new(path).parent().unwrap())?;
let path = std::path::Path::new(path);
let parent = path.parent().unwrap();
std::fs::create_dir_all(parent).map_err(|e| format!("creating directory {parent:?}: {e:?}"))?;

let uds = tokio::net::UnixListener::bind(path)?;
let _uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
Expand All @@ -191,7 +193,6 @@ where
source_transformer_svc,
))
.serve_with_incoming(_uds_stream)
.await?;

Ok(())
.await
.map_err(Into::into)
}

0 comments on commit 141c22c

Please sign in to comment.