diff --git a/examples/sink-log/src/main.rs b/examples/sink-log/src/main.rs index f1d6da4..302fa95 100644 --- a/examples/sink-log/src/main.rs +++ b/examples/sink-log/src/main.rs @@ -21,17 +21,9 @@ impl sink::Sinker for Logger { Ok(v) => { println!("{}", v); // record the response - Response { - id: datum.id, - success: true, - err: "".to_string(), - } + Response::ok(datum.id) } - Err(e) => Response { - id: datum.id, - success: true, // there is no point setting success to false as retrying is not going to help - err: format!("Invalid UTF-8 sequence: {}", e), - }, + Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)), }; // return the responses diff --git a/proto/sink.proto b/proto/sink.proto index 32f0619..c413ea8 100644 --- a/proto/sink.proto +++ b/proto/sink.proto @@ -22,6 +22,7 @@ message SinkRequest { google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; string id = 5; + map headers = 6; } /** @@ -38,10 +39,19 @@ message SinkResponse { message Result { // id is the ID of the message, can be used to uniquely identify the message. string id = 1; - // success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed. - bool success = 2; + // status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. + Status status = 2; // err_msg is the error message, set it if success is set to false. string err_msg = 3; } repeated Result results = 1; +} + +/* + * Status is the status of the response. + */ +enum Status { + SUCCESS = 0; + FAILURE = 1; + FALLBACK = 2; } \ No newline at end of file diff --git a/src/sink.rs b/src/sink.rs index 80a2ee0..2907371 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; +use std::env; use std::future::Future; use std::path::PathBuf; @@ -11,6 +13,11 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sink.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info"; +const DEFAULT_FB_SOCK_ADDR: &str = "/var/run/numaflow/fb-sink.sock"; +const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-info"; +const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE"; +const UD_CONTAINER_FB_SINK: &str = "fb-udsink"; + /// Numaflow Sink Proto definitions. pub mod proto { @@ -56,17 +63,9 @@ pub trait Sinker { /// Ok(v) => { /// println!("{}", v); /// // record the response - /// Response { - /// id: datum.id, - /// success: true, - /// err: "".to_string(), - /// } + /// Response::ok(datum.id) /// } - /// Err(e) => Response { - /// id: datum.id, - /// success: true, // there is no point setting success to false as retrying is not going to help - /// err: format!("Invalid UTF-8 sequence: {}", e), - /// }, + /// Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)), /// }; /// /// // return the responses @@ -90,8 +89,10 @@ pub struct SinkRequest { pub watermark: DateTime, /// Time of the element as seen at source or aligned after a reduce operation. pub event_time: DateTime, - /// ID is the unique id of the message to be send to the Sink. + /// ID is the unique id of the message to be sent to the Sink. pub id: String, + /// Headers for the message. + pub headers: HashMap, } impl From for SinkRequest { @@ -102,6 +103,7 @@ impl From for SinkRequest { watermark: shared::utc_from_timestamp(sr.watermark), event_time: shared::utc_from_timestamp(sr.event_time), id: sr.id, + headers: sr.headers, } } } @@ -110,27 +112,68 @@ impl From for SinkRequest { pub struct Response { /// id is the unique ID of the message. pub id: String, - /// success indicates whether the write to the sink was successful. If set to `false`, it will be + /// success indicates whether to write to the sink was successful. If set to `false`, it will be /// retried, hence it is better to try till it is successful. pub success: bool, + /// fallback is used to indicate that the message should be forwarded to the fallback sink. + pub fallback: bool, /// err string is used to describe the error if [`Response::success`] was `false`. - pub err: String, + pub err: Option, +} + +impl Response { + /// Creates a new `Response` instance indicating a successful operation. + pub fn ok(id: String) -> Self { + Self { + id, + success: true, + fallback: false, + err: None, + } + } + + /// Creates a new `Response` instance indicating a failed operation. + pub fn failure(id: String, err: String) -> Self { + Self { + id, + success: false, + fallback: false, + err: Some(err), + } + } + + /// Creates a new `Response` instance indicating a failed operation with a fallback + /// set to 'true'. So that the message will be forwarded to the fallback sink. + pub fn fallback(id: String) -> Self { + Self { + id, + success: false, + fallback: true, + err: None, + } + } } impl From for proto::sink_response::Result { fn from(r: Response) -> Self { Self { id: r.id, - success: r.success, - err_msg: r.err.to_string(), + status: if r.fallback { + proto::Status::Fallback as i32 + } else if r.success { + proto::Status::Success as i32 + } else { + proto::Status::Failure as i32 + }, + err_msg: r.err.unwrap_or_default(), } } } #[tonic::async_trait] impl proto::sink_server::Sink for SinkService -where - T: Sinker + Send + Sync + 'static, + where + T: Sinker + Send + Sync + 'static, { async fn sink_fn( &self, @@ -185,10 +228,17 @@ pub struct Server { impl Server { pub fn new(svc: T) -> Self { + let container_type = env::var(ENV_UD_CONTAINER_TYPE).unwrap_or_default(); + let (sock_addr, server_info_file) = if container_type == UD_CONTAINER_FB_SINK { + (DEFAULT_FB_SOCK_ADDR.into(), DEFAULT_FB_SERVER_INFO_FILE.into()) + } else { + (DEFAULT_SOCK_ADDR.into(), DEFAULT_SERVER_INFO_FILE.into()) + }; + Self { - sock_addr: DEFAULT_SOCK_ADDR.into(), + sock_addr, max_message_size: DEFAULT_MAX_MESSAGE_SIZE, - server_info_file: DEFAULT_SERVER_INFO_FILE.into(), + server_info_file, svc: Some(svc), } } @@ -232,9 +282,9 @@ impl Server { &mut self, shutdown: F, ) -> Result<(), Box> - where - T: Sinker + Send + Sync + 'static, - F: Future, + where + T: Sinker + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -250,10 +300,10 @@ impl Server { .map_err(Into::into) } - /// Starts the gRPC server. Automatically registers singal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives. + /// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives. pub async fn start(&mut self) -> Result<(), Box> - where - T: Sinker + Send + Sync + 'static, + where + T: Sinker + Send + Sync + 'static, { self.start_with_shutdown(shared::shutdown_signal()).await } @@ -262,13 +312,14 @@ impl Server { #[cfg(test)] mod tests { use std::{error::Error, time::Duration}; - use tower::service_fn; - use crate::sink; - use crate::sink::proto::sink_client::SinkClient; use tempfile::TempDir; use tokio::sync::oneshot; use tonic::transport::Uri; + use tower::service_fn; + + use crate::sink; + use crate::sink::proto::sink_client::SinkClient; #[tokio::test] async fn sink_server() -> Result<(), Box> { @@ -289,17 +340,9 @@ mod tests { Ok(v) => { println!("{}", v); // record the response - sink::Response { - id: datum.id, - success: true, - err: "".to_string(), - } + sink::Response::ok(datum.id) } - Err(e) => sink::Response { - id: datum.id, - success: true, // there is no point setting success to false as retrying is not going to help - err: format!("Invalid UTF-8 sequence: {}", e), - }, + Err(e) => sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)), }; // return the responses @@ -347,6 +390,7 @@ mod tests { watermark: Some(prost_types::Timestamp::default()), event_time: Some(prost_types::Timestamp::default()), id: "1".to_string(), + headers: Default::default(), }; let resp = client.sink_fn(tokio_stream::iter(vec![request])).await?;