Skip to content

Commit

Permalink
review comments, panic handling for side input
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
  • Loading branch information
yhl25 committed Jul 23, 2024
1 parent ced9f52 commit fb73fca
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 69 deletions.
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ pub enum Error {

#[error("Source Transformer Error: {0}")]
SourceTransformerError(ErrorKind),

#[error("SideInput Error: {0}")]
SideInputError(ErrorKind),
}
6 changes: 5 additions & 1 deletion src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,11 @@ impl<T> Server<T> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token);
let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(map_svc)
Expand Down
7 changes: 5 additions & 2 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,8 +832,11 @@ impl<C> Server<C> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown =
shared::shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx), cln_token);
let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(reduce_svc)
Expand Down
17 changes: 3 additions & 14 deletions src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@ use tokio::net::UnixListener;
use tokio::signal;
use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::UnixListenerStream;
use tokio_util::sync::CancellationToken;
use tracing::info;

// #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))]
#[tracing::instrument(fields(path = ? path.as_ref()))]
fn write_info_file(path: impl AsRef<Path>) -> io::Result<()> {
let parent = path.as_ref().parent().unwrap();
std::fs::create_dir_all(parent)?;
fs::create_dir_all(parent)?;

// TODO: make port-number and CPU meta-data configurable, e.g., ("CPU_LIMIT", "1")
let metadata: HashMap<String, String> = HashMap::new();
Expand Down Expand Up @@ -57,18 +56,13 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime<Utc>) -> Option<Timestamp> {

/// shuts downs the gRPC server. This happens in 2 cases
/// 1. there has been an internal error (one of the tasks failed) and we need to shutdown
/// 2. user is explictly asking us to shutdown
/// 2. user is explicitly asking us to shutdown
/// Once the request for shutdown has be invoked, server will broadcast shutdown to all tasks
/// through the cancellation-token.
pub(crate) async fn shutdown_signal(
mut shutdown_on_err: mpsc::Receiver<()>,
shutdown_from_user: Option<oneshot::Receiver<()>>,
cancel_token: CancellationToken,
) {
// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cancel_token.drop_guard();

let ctrl_c = async {
signal::ctrl_c()
.await
Expand Down Expand Up @@ -180,12 +174,7 @@ mod tests {

// Spawn a new task to call shutdown_signal
let shutdown_signal_task = tokio::spawn(async move {
shutdown_signal(
internal_shutdown_rx,
Some(user_shutdown_rx),
CancellationToken::new(),
)
.await;
shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx)).await;
});

// Send a shutdown signal
Expand Down
70 changes: 45 additions & 25 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use crate::error::Error::SideInputError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::shutdown_signal;
use std::fs;
use std::path::PathBuf;

use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status};

use crate::shared;
use crate::shared::shutdown_signal;

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sideinput.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sideinput-server-info";
Expand All @@ -17,8 +18,9 @@ mod proto {
}

struct SideInputService<T> {
handler: T,
_shutdown_tx: mpsc::Sender<()>,
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
cancellation_token: CancellationToken,
}

/// The `SideInputer` trait defines a method for retrieving side input data.
Expand Down Expand Up @@ -94,19 +96,35 @@ where
&self,
_: Request<()>,
) -> Result<Response<proto::SideInputResponse>, Status> {
let msg = self.handler.retrieve_sideinput().await;
let si = match msg {
Some(value) => proto::SideInputResponse {
value,
no_broadcast: false,
let handler = Arc::clone(&self.handler);
let shutdown_tx = self.shutdown_tx.clone();
let handle = tokio::spawn(async move { handler.retrieve_sideinput().await });

tokio::select! {
msg = handle => {
match msg {
Ok(Some(value)) => {
Ok(Response::new(proto::SideInputResponse {
value,
no_broadcast: false,
}))
}
Ok(None) => {
Ok(Response::new(proto::SideInputResponse {
value: Vec::new(),
no_broadcast: true,
}))
}
Err(e) => {
shutdown_tx.send(()).await.expect("Failed to send shutdown signal");
Err(Status::internal(SideInputError(UserDefinedError(e.to_string())).to_string()))
}
}
}
_ = self.cancellation_token.cancelled() => {
Err(Status::internal(SideInputError(InternalError("Server is shutting down".to_string())).to_string()))
},
None => proto::SideInputResponse {
value: Vec::new(),
no_broadcast: true,
},
};

Ok(Response::new(si))
}
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Expand Down Expand Up @@ -179,20 +197,22 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let cln_token = CancellationToken::new();

let sideinput_svc = SideInputService {
handler,
_shutdown_tx: internal_shutdown_tx,
handler: Arc::new(handler),
shutdown_tx: internal_shutdown_tx,
cancellation_token: cln_token.clone(),
};
let sideinput_svc = proto::side_input_server::SideInputServer::new(sideinput_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shutdown_signal(
internal_shutdown_rx,
Some(shutdown_rx),
CancellationToken::new(),
);
let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(sideinput_svc)
Expand Down
44 changes: 25 additions & 19 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,28 +188,28 @@ where
// TODO: what should be the idle buffer size?
let (tx, rx) = mpsc::channel::<SinkRequest>(1);

let writer_cln_token = cancellation_token.clone();

let reader_shutdown_tx = shutdown_tx.clone();
// spawn a task to read messages from the stream and send them to the user's sink handle
tokio::spawn(async move {
let reader_handle = tokio::spawn(async move {
loop {
tokio::select! {
next_message = stream.message() => {
match next_message {
Ok(Some(message)) => {
// If sending fails, it means the receiver is dropped, and we should stop the task.
if tx.send(message.into()).await.is_err() {
break;
}
},
// If there's an error or the stream ends, break the loop to stop the task.
Ok(None) | Err(_) => break,
match stream.message().await {
Ok(Some(message)) => {
// If sending fails, it means the receiver is dropped, and we should stop the task.
if let Err(e) = tx.send(message.into()).await {
tracing::error!("Failed to send message: {}", e);
break;
}
},
// Listen for cancellation. If triggered, break the loop to stop reading new messages.
_ = writer_cln_token.cancelled() => {
}
// If there's an error or the stream ends, break the loop to stop the task.
Ok(None) => break,
Err(e) => {
tracing::error!("Error reading message from stream: {}", e);
reader_shutdown_tx
.send(())
.await
.expect("Sending shutdown signal to gRPC server");
break;
},
}
}
}
});
Expand Down Expand Up @@ -237,6 +237,8 @@ where
},

_ = cancellation_token.cancelled() => {
// abort the reader task to stop reading messages from the stream
reader_handle.abort();
Err(Status::cancelled(SinkError(InternalError("Server is shutting down".to_string())).to_string()))
}
}
Expand Down Expand Up @@ -336,7 +338,11 @@ impl<T> Server<T> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token);
let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(svc)
Expand Down
16 changes: 9 additions & 7 deletions src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use crate::shared::{self, prost_timestamp_from_utc};
use chrono::{DateTime, Utc};
use tokio::sync::mpsc::{self, Sender};
use tokio::sync::oneshot;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status};

use crate::shared::{self, prost_timestamp_from_utc};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/source.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info";
Expand All @@ -25,6 +24,7 @@ pub mod proto {
struct SourceService<T> {
handler: Arc<T>,
_shutdown_tx: Sender<()>,
_cancellation_token: CancellationToken,
}

#[async_trait]
Expand Down Expand Up @@ -267,21 +267,23 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let cln_token = CancellationToken::new();

let source_service = SourceService {
handler: Arc::new(handler),
_shutdown_tx: internal_shutdown_tx,
_cancellation_token: cln_token.clone(),
};

let source_svc = proto::source_server::SourceServer::new(source_service)
.max_decoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shared::shutdown_signal(
internal_shutdown_rx,
Some(shutdown_rx),
CancellationToken::new(),
);
let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(source_svc)
Expand Down
6 changes: 5 additions & 1 deletion src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@ impl<T> Server<T> {
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token);
let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() when the function exits
// because of abort request, ctrl-c, or SIGTERM signal
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(sourcetrf_svc)
Expand Down

0 comments on commit fb73fca

Please sign in to comment.