From 9a9e6224d9ab8c28aa4038d4d65f53155e308b1e Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Fri, 21 Jun 2024 19:59:19 +0530 Subject: [PATCH] refactor Signed-off-by: Yashash H L --- src/map.rs | 10 +- src/reduce.rs | 318 +++++++++++++++++++++++------------------ src/shared.rs | 2 +- src/sideinput.rs | 7 +- src/sink.rs | 9 +- src/source.rs | 13 +- src/sourcetransform.rs | 9 +- 7 files changed, 209 insertions(+), 159 deletions(-) diff --git a/src/map.rs b/src/map.rs index 744e22e..5d66352 100644 --- a/src/map.rs +++ b/src/map.rs @@ -274,7 +274,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - shutdown_rx: Option>, + shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where T: Mapper + Send + Sync + 'static, @@ -297,7 +297,7 @@ impl Server { .add_service(map_svc) .serve_with_incoming_shutdown( listener, - shutdown_signal(internal_shutdown_rx, shutdown_rx), + shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)), ) .await .map_err(Into::into) @@ -308,7 +308,8 @@ impl Server { where T: Mapper + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } @@ -323,7 +324,6 @@ mod tests { use tonic::transport::Uri; use tower::service_fn; - #[tokio::test] async fn map_server() -> Result<(), Box> { struct Cat; @@ -352,7 +352,7 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/reduce.rs b/src/reduce.rs index 40dc3e8..92c8f26 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -3,10 +3,10 @@ use std::path::PathBuf; use std::sync::Arc; use chrono::{DateTime, Utc}; -use tokio::sync::mpsc::{channel, Sender}; use tokio::sync::{mpsc, oneshot}; -use tokio_stream::wrappers::ReceiverStream; +use tokio::sync::mpsc::{channel, Sender}; use tokio_stream::StreamExt; +use tokio_stream::wrappers::ReceiverStream; use tonic::{async_trait, Request, Response, Status}; use crate::error::Error; @@ -305,23 +305,56 @@ where &self, request: Request>, ) -> Result, Status> { - // Clone the creator and response_stream since we need to move them into the spawned task + // Clone the creator and shutdown_tx to be used in the spawned tasks. let creator = Arc::clone(&self.creator); - let (response_tx, response_rx) = channel::>(1); - - // Create a new TaskSet - let (error_tx, mut error_rx) = channel::(1); - let mut task_set = TaskSet::new(creator, response_tx.clone(), error_tx.clone()); - let shutdown_tx = self.shutdown_tx.clone(); - // Error handling logic: We have an error channel to which any user defined errors or internal - // errors are sent, we have a separate task that listens to this error channel and sends the error back to the client. + + // Create a channel to send the response back to the grpc client. + let (grpc_response_tx, grpc_response_rx) = + channel::>(1); + + // Internal response channel which will be used by the task set and tasks to send the response after + // executing the user defined function. It's a result type so in case of error, we can send the error + // back to the client. + // + // NOTE: we are using a separate channel instead of the grpc_response_tx because in case of errors, + // we have to do graceful shutdown. + let (response_tx, mut response_rx) = + channel::>(1); + + // Start a task executor to handle the incoming ReduceRequests from the client, returns a tx to send + // commands to the task executor. + let task_tx = TaskSet::start_task_executor(creator, response_tx.clone()); + + // Spawn a new task to listen to the response channel and send the response back to the grpc client. + // In case of error, it propagates the error back to the client in grpc status format and aborts all + // the active tasks and sends a shutdown signal to the grpc server. + let task_tx_clone = task_tx.clone(); tokio::spawn(async move { - while let Some(error) = error_rx.recv().await { - response_tx - .send(Err(error.clone().into())) - .await - .expect("send to response channel failed"); + while let Some(result) = response_rx.recv().await { + match result { + Ok(response) => { + let eof = response.eof; + grpc_response_tx + .send(Ok(response)) + .await + .expect("send to grpc response channel failed"); + if eof { + break; + } + } + Err(error) => { + grpc_response_tx + .send(Err(Status::internal(error.to_string()))) + .await + .expect("send to grpc response channel failed"); + task_tx_clone + .send(TaskCommand::Abort) + .await + .expect("task_tx send failed"); + shutdown_tx.send(()).await.expect("shutdown_tx send failed"); + } + } } }); @@ -331,50 +364,30 @@ where while let Some(reduce_request) = stream.next().await { match reduce_request { Ok(rr) => { - let keys = match rr.payload.as_ref() { - Some(payload) => payload.keys.clone(), - None => { - error_tx - .send(ReduceError(InternalError( - "Invalid ReduceRequest".to_string(), - ))) - .await - .expect("error_tx send failed"); - continue; - } - }; - - if task_set.tasks.contains_key(&keys.join(KEY_JOIN_DELIMITER)) { - task_set.write_to_task(keys, rr).await; - } else { - task_set.create_and_write(keys, rr).await; - } + task_tx + .send(TaskCommand::HandleReduceRequest(rr)) + .await + .expect("task_tx send failed"); } Err(e) => { - // check if the error is client cancelled error - if e.code() == tonic::Code::Cancelled { - // abort all the tasks - task_set.abort().await; - // send a shutdown signal to gracefully shut down the server - shutdown_tx.send(()).await.expect("shutdown_tx send failed"); - } else { - error_tx - .send(ReduceError(InternalError(format!( - "Failed to read ReduceRequest: {}", - e - )))) - .await - .expect("error_tx send failed"); - } + response_tx + .send(Err(ReduceError(InternalError(format!( + "Failed to receive request: {}", + e + ))))) + .await + .expect("error_tx send failed"); } } } - - task_set.close().await; + task_tx + .send(TaskCommand::Close) + .await + .expect("task_tx send failed"); }); // return the rx as the streaming endpoint - Ok(Response::new(ReceiverStream::new(response_rx))) + Ok(Response::new(ReceiverStream::new(grpc_response_rx))) } async fn is_ready(&self, _: Request<()>) -> Result, Status> { @@ -382,34 +395,32 @@ where } } -/// The `Task` struct represents a task in the reduce service. -/// It is responsible for invoking the user's reducer and sending the response back to the client. +// The `Task` struct represents a task in the reduce service. It is responsible for executing the +// user defined function. We will a separate task for each keyed window. The task will be created +// when the first message for a given key arrives and will be closed when the window is closed. struct Task { - tx: Sender, - error_tx: Sender, - finished_rx: oneshot::Receiver<()>, + udf_tx: Sender, + response_tx: Sender>, + done_rx: oneshot::Receiver<()>, handle: tokio::task::JoinHandle<()>, } impl Task { - /// Creates a new `Task` with the given reducer, keys, metadata, and response sender. - /// It starts the reducer in a new task and returns a `Task` struct that can be used to send `ReduceRequest`s to the reducer. + // Creates a new task with the given reducer, keys, metadata, and response channel. async fn new( reducer: R, keys: Vec, md: Metadata, - response_tx: Sender>, - error_tx: Sender, + response_tx: Sender>, ) -> Self { - let (tx, rx) = channel::(1); - let (finished_tx, finished_rx) = oneshot::channel(); + let (udf_tx, udf_rx) = channel::(1); + let (done_tx, done_rx) = oneshot::channel(); - let error_tx_clone = error_tx.clone(); - let udf_error_tx_clone = error_tx.clone(); - let handle = tokio::spawn(async move { - let messages = reducer.reduce(keys, rx, &md).await; + let udf_response_tx = response_tx.clone(); + let task_handle = tokio::spawn(async move { + let messages = reducer.reduce(keys, udf_rx, &md).await; for message in messages { - let send_result = response_tx + let send_result = udf_response_tx .send(Ok(proto::ReduceResponse { result: Some(proto::reduce_response::Result { keys: message.keys.unwrap_or_default(), @@ -426,99 +437,142 @@ impl Task { .await; if let Err(e) = send_result { - let _ = udf_error_tx_clone - .send(ReduceError(InternalError(format!( + let _ = udf_response_tx + .send(Err(ReduceError(InternalError(format!( "Failed to send response back: {}", e - )))) + ))))) .await; return; } } }); - // Spawn a separate task that listens to the join handle and writes to the error channel in case of errors - // we need a separate handle to do this because, we cannot wait until the window is closed to propagate the - // error back the client. - let task_handle = tokio::spawn(async move { - if let Err(e) = handle.await { - let _ = error_tx_clone - .send(ReduceError(UserDefinedError(format!(" {}", e)))) + // We spawn a separate task to await the handle so that in case of any unhandled errors in the user-defined + // code will immediately be propagated to the client. + let handler_tx = response_tx.clone(); + let handle = tokio::spawn(async move { + if let Err(e) = task_handle.await { + let _ = handler_tx + .send(Err(ReduceError(UserDefinedError(format!(" {}", e))))) .await; } // Send a message indicating that the task has finished - let _ = finished_tx.send(()); + let _ = done_tx.send(()); }); + // We store the task handle so that we can abort the task if needed, we only need the second task handle because + // if the second task is aborted, the first task's handle will be dropped and the task will be aborted. Self { - tx, - error_tx, - finished_rx, - handle: task_handle, + udf_tx, + response_tx, + done_rx, + handle, } } - /// Sends a `ReduceRequest` to the task. + // Sends the request to the user defined function's input channel. async fn send(&self, rr: ReduceRequest) { - if let Err(e) = self.tx.send(rr).await { - self.error_tx - .send(ReduceError(InternalError(format!( + if let Err(e) = self.udf_tx.send(rr).await { + self.response_tx + .send(Err(ReduceError(InternalError(format!( "Failed to send message to task: {}", e - )))) + ))))) .await .expect("failed to send message to error channel"); } } - /// Closes the task and waits for it to finish. + // Closes the task and waits for it to finish. async fn close(self) { // drop the sender to close the task - drop(self.tx); + drop(self.udf_tx); // Wait for the task to finish - let _ = self.finished_rx.await; + let _ = self.done_rx.await; } - /// Aborts the task. + // Aborts the task. async fn abort(self) { self.handle.abort(); } } -/// The `TaskSet` struct represents a set of tasks in the reduce service. -/// It stores a map of keys to tasks, and is responsible for creating, writing to, and closing tasks. -/// It also sends an EOF message to the response stream when all tasks are closed. +// The `TaskSet` struct represents a set of tasks that are executing the user defined function. It is responsible +// for creating new tasks, writing messages to the tasks, closing the tasks, and aborting the tasks. struct TaskSet { tasks: HashMap, - response_stream: Sender>, - error_stream: Sender, + response_tx: Sender>, creator: Arc, window: IntervalWindow, } +enum TaskCommand { + HandleReduceRequest(proto::ReduceRequest), + Close, + Abort, +} + impl TaskSet where C: ReducerCreator + Send + Sync + 'static, { - /// Creates a new `TaskSet` with the given `ReducerCreator` and response stream. - fn new( + // Starts a new task executor which listens to incoming commands and executes them. + // returns a tx to send commands to the task executor. + fn start_task_executor( creator: Arc, - response_stream: Sender>, - error_stream: Sender, - ) -> Self { - Self { - tasks: HashMap::new(), - response_stream, - error_stream, - creator, - window: IntervalWindow::default(), - } + response_tx: Sender>, + ) -> Sender { + let (task_tx, mut task_rx) = channel::(1); + + tokio::spawn(async move { + let mut task_set = TaskSet { + tasks: HashMap::new(), + response_tx, + creator, + window: IntervalWindow::default(), + }; + while let Some(command) = task_rx.recv().await { + match command { + TaskCommand::HandleReduceRequest(rr) => { + // Extract the keys from the ReduceRequest. + let keys = match rr.payload.as_ref() { + Some(payload) => payload.keys.clone(), + None => { + task_set + .handle_error(ReduceError(InternalError( + "Invalid ReduceRequest".to_string(), + ))) + .await; + continue; + } + }; + + // Check if the task already exists, if it does, write the ReduceRequest to the task, + // otherwise create a new task and write the ReduceRequest to the task. + if task_set.tasks.contains_key(&keys.join(KEY_JOIN_DELIMITER)) { + task_set.write_to_task(keys, rr).await; + } else { + task_set.create_and_write(keys, rr).await; + } + } + TaskCommand::Close => { + task_set.close().await; + } + TaskCommand::Abort => { + task_set.abort().await; + } + } + } + }); + + task_tx } - /// Creates a new task with the given keys and `ReduceRequest`. - /// It creates a new reducer, starts it in a new task, and adds the task to the task set. + // Creates a new task with the given keys and `ReduceRequest`. + // It creates a new reducer and assigns it to the task to execute the user defined function. async fn create_and_write(&mut self, keys: Vec, rr: proto::ReduceRequest) { let (reduce_request, interval_window) = match self.validate_and_extract(rr).await { Some(value) => value, @@ -534,14 +588,7 @@ where let md = Metadata::new(interval_window); // Create a new Task with the reducer, keys, and metadata - let task = Task::new( - reducer, - keys.clone(), - md, - self.response_stream.clone(), - self.error_stream.clone(), - ) - .await; + let task = Task::new(reducer, keys.clone(), md, self.response_tx.clone()).await; // track the task in the task set self.tasks.insert(keys.join(KEY_JOIN_DELIMITER), task); @@ -556,7 +603,7 @@ where } } - /// writes the ReduceRequest to the task with the given keys. + // Writes the ReduceRequest to the task with the given keys. async fn write_to_task(&mut self, keys: Vec, rr: proto::ReduceRequest) { let (reduce_request, _) = match self.validate_and_extract(rr).await { Some(value) => value, @@ -624,17 +671,15 @@ where Some((reduce_request, interval_window)) } - /// Closes all tasks in the task set and sends an EOF message to the response stream. + // Closes all tasks in the task set and sends an EOF message to the response stream. async fn close(&mut self) { for (_, task) in self.tasks.drain() { task.close().await; } // after all the tasks have been closed, send an EOF message to the response stream - - // instead of unwrap, send the error to error stream let send_eof = self - .response_stream + .response_tx .send(Ok(proto::ReduceResponse { result: None, window: Some(proto::Window { @@ -662,10 +707,10 @@ where } } - // Sends an error to the error stream. + // Sends an error to the response stream. async fn handle_error(&self, error: Error) { - self.error_stream - .send(error) + self.response_tx + .send(Err(error)) .await .expect("error_tx send failed"); } @@ -728,7 +773,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - user_shutdown_rx: Option>, + user_shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where C: ReducerCreator + Send + Sync + 'static, @@ -744,7 +789,7 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, user_shutdown_rx); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx)); tonic::transport::Server::builder() .add_service(reduce_svc) @@ -758,14 +803,15 @@ impl Server { where C: ReducerCreator + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } #[cfg(test)] mod tests { - use std::path::PathBuf; use std::{error::Error, time::Duration}; + use std::path::PathBuf; use prost_types::Timestamp; use tempfile::TempDir; @@ -773,8 +819,8 @@ mod tests { use tokio::time::sleep; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; - use tonic::transport::Uri; use tonic::Request; + use tonic::transport::Uri; use tower::service_fn; use crate::reduce; @@ -852,7 +898,7 @@ mod tests { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; @@ -882,7 +928,7 @@ mod tests { let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; @@ -994,7 +1040,7 @@ mod tests { let (_shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; @@ -1114,7 +1160,7 @@ mod tests { let (_shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/shared.rs b/src/shared.rs index 7a0b728..ce7666b 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -93,10 +93,10 @@ pub(crate) async fn shutdown_signal( #[cfg(test)] mod tests { + use super::*; use std::fs::File; use std::io::Read; use tempfile::NamedTempFile; - use super::*; #[test] fn test_utc_from_timestamp() { diff --git a/src/sideinput.rs b/src/sideinput.rs index 6038180..6d3ab13 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -168,7 +168,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - shutdown_rx: Option>, + shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where T: SideInputer + Send + Sync + 'static, @@ -176,7 +176,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, shutdown_rx); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); let sideinput_svc = SideInputService { handler, _shutdown_tx: internal_shutdown_tx, @@ -197,6 +197,7 @@ impl Server { where T: SideInputer + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } diff --git a/src/sink.rs b/src/sink.rs index a8383b8..59c8b1c 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -282,7 +282,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - shutdown_rx: Option>, + shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where T: Sinker + Send + Sync + 'static, @@ -290,7 +290,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, shutdown_rx); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); let svc = SinkService { handler, _shutdown_tx: internal_shutdown_tx, @@ -311,7 +311,8 @@ impl Server { where T: Sinker + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } @@ -376,7 +377,7 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/source.rs b/src/source.rs index 974d8f2..8b29465 100644 --- a/src/source.rs +++ b/src/source.rs @@ -105,8 +105,8 @@ where keys: resp.keys, }), })) - .await - .expect("receiver dropped"); + .await + .expect("receiver dropped"); } }); @@ -259,7 +259,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - shutdowm_rx: Option>, + shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where T: Sourcer + Send + Sync + 'static, @@ -267,7 +267,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, shutdowm_rx); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); let source_service = SourceService { handler: Arc::new(handler), _shutdown_tx: internal_shutdown_tx, @@ -289,7 +289,8 @@ impl Server { where T: Sourcer + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } @@ -391,7 +392,7 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await; diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index cd50e88..e60894f 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -308,7 +308,7 @@ impl Server { /// Starts the gRPC server. When message is received on the `shutdown` channel, graceful shutdown of the gRPC server will be initiated. pub async fn start_with_shutdown( &mut self, - shutdown_rx: Option>, + shutdown_rx: oneshot::Receiver<()>, ) -> Result<(), Box> where T: SourceTransformer + Send + Sync + 'static, @@ -316,7 +316,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, shutdown_rx); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); let sourcetrf_svc = SourceTransformerService { handler, _shutdown_tx: internal_shutdown_tx, @@ -338,7 +338,8 @@ impl Server { where T: SourceTransformer + Send + Sync + 'static, { - self.start_with_shutdown(None).await + let (_shutdown_tx, shutdown_rx) = oneshot::channel(); + self.start_with_shutdown(shutdown_rx).await } } @@ -386,7 +387,7 @@ mod tests { assert_eq!(server.socket_file(), sock_file); let (shutdown_tx, shutdown_rx) = oneshot::channel(); - let task = tokio::spawn(async move { server.start_with_shutdown(Some(shutdown_rx)).await }); + let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await }); tokio::time::sleep(Duration::from_millis(50)).await;