Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve sink SDK experience #29

Merged
merged 8 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/sink-log/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
88 changes: 35 additions & 53 deletions examples/sink-log/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,43 @@
use numaflow::sink::start_uds_server;
use numaflow::sink::{self, Response, SinkRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let sink_handler = log_sink::Logger::new();

start_uds_server(sink_handler).await?;

Ok(())
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
sink::Server::new(Logger).start().await
}

mod log_sink {
use numaflow::sink;
use numaflow::sink::{Datum, Response};
use tonic::async_trait;

pub(crate) struct Logger {}

impl Logger {
pub(crate) fn new() -> Self {
Self {}
}
}

#[async_trait]
impl sink::Sinker for Logger {
async fn sink<T: Datum + Send + Sync + 'static>(
&self,
mut input: tokio::sync::mpsc::Receiver<T>,
) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();

while let Some(datum) = input.recv().await {
// do something better, but for now let's just log it.
// please note that `from_utf8` is working because the input in this
// example uses utf-8 data.
let response = match std::str::from_utf8(datum.value()) {
Ok(v) => {
println!("{}", v);
// record the response
Response {
id: datum.id().to_string(),
success: true,
err: "".to_string(),
}
struct Logger;

#[tonic::async_trait]
impl sink::Sinker for Logger {
async fn sink(&self, mut input: tokio::sync::mpsc::Receiver<SinkRequest>) -> Vec<Response> {
let mut responses: Vec<Response> = Vec::new();

while let Some(datum) = input.recv().await {
// do something better, but for now let's just log it.
// please note that `from_utf8` is working because the input in this
// example uses utf-8 data.
let response = match std::str::from_utf8(&datum.value) {
Ok(v) => {
println!("{}", v);
// record the response
Response {
id: datum.id,
success: true,
err: "".to_string(),
}
Err(e) => Response {
id: datum.id().to_string(),
success: true, // there is no point setting success to false as retrying is not going to help
err: format!("Invalid UTF-8 sequence: {}", e),
},
};

// return the responses
responses.push(response);
}

responses
}
Err(e) => Response {
id: datum.id,
success: true, // there is no point setting success to false as retrying is not going to help
err: format!("Invalid UTF-8 sequence: {}", e),
},
};

// return the responses
responses.push(response);
}

responses
}
}
24 changes: 4 additions & 20 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl From<Message> for map_response::Result {
}
}

/// Incoming request into the map handles of [`Mapper`].
/// Incoming request into the map handler of [`Mapper`].
pub struct MapRequest {
/// Set of keys in the (key, value) terminology of map/reduce paradigm.
pub keys: Vec<String>,
Expand Down Expand Up @@ -142,7 +142,8 @@ impl<T> Server<T> {
}
}

/// Set the unix domain socket file path used by the gRPC server to listen for incoming connections. Defaults value is `/var/run/numaflow/map.sock`
/// Set the unix domain socket file path used by the gRPC server to listen for incoming connections.
/// Default value is `/var/run/numaflow/map.sock`
pub fn with_socket_file(mut self, file: impl Into<PathBuf>) -> Self {
self.sock_addr = file.into();
self
Expand Down Expand Up @@ -208,28 +209,11 @@ impl<T> Server<T> {
T: Mapper + Send + Sync + 'static,
{
let (tx, rx) = oneshot::channel::<()>();
tokio::spawn(wait_for_signal(tx));
tokio::spawn(shared::wait_for_signal(tx));
self.start_with_shutdown(rx).await
}
}

async fn wait_for_signal(tx: oneshot::Sender<()>) {
use tokio::signal::unix::{signal, SignalKind};
let mut interrupt =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
let mut termination =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
tokio::select! {
_ = interrupt.recv() => {
tracing::info!("Received SIGINT. Stopping gRPC server")
}
_ = termination.recv() => {
tracing::info!("Received SIGTERM. Stopping gRPC server")
}
}
tx.send(()).expect("Sending shutdown signal to gRPC server");
}

#[cfg(test)]
mod tests {
use std::{error::Error, time::Duration};
Expand Down
18 changes: 18 additions & 0 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{collections::HashMap, io};

use chrono::{DateTime, TimeZone, Timelike, Utc};
use prost_types::Timestamp;
use tokio::sync::oneshot;
use tokio_stream::wrappers::UnixListenerStream;
use tracing::info;

Expand Down Expand Up @@ -55,3 +56,20 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {
nanos: t.nanosecond() as i32,
})
}

pub(crate) async fn wait_for_signal(tx: oneshot::Sender<()>) {
use tokio::signal::unix::{signal, SignalKind};
let mut interrupt =
signal(SignalKind::interrupt()).expect("Failed to register SIGINT interrupt handler");
let mut termination =
signal(SignalKind::terminate()).expect("Failed to register SIGTERM interrupt handler");
tokio::select! {
_ = interrupt.recv() => {
tracing::info!("Received SIGINT. Stopping gRPC server")
}
_ = termination.recv() => {
tracing::info!("Received SIGTERM. Stopping gRPC server")
}
}
tx.send(()).expect("Sending shutdown signal to gRPC server");
}
Loading
Loading