diff --git a/rust/monovertex/src/transformer.rs b/rust/monovertex/src/transformer.rs index 8f7cc0509..f6fa044fc 100644 --- a/rust/monovertex/src/transformer.rs +++ b/rust/monovertex/src/transformer.rs @@ -11,11 +11,11 @@ use crate::sourcetransform_pb::{ use tokio::sync::mpsc; use tokio::task::JoinHandle; -use tokio::time::Duration; use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::transport::Channel; use tonic::{Request, Streaming}; +use tracing::warn; const DROP: &str = "U+005C__DROP__"; @@ -81,21 +81,36 @@ impl SourceTransformer { ); } + // Cancellation token is used to cancel either sending task (if an error occurs while receiving) or receiving messages (if an error occurs on sending task) let token = CancellationToken::new(); - let sender: JoinHandle> = tokio::spawn({ + + // Send transform requests to the source transformer server + let sender_task: JoinHandle> = tokio::spawn({ let read_tx = self.read_tx.clone(); let token = token.clone(); async move { for msg in messages { - read_tx.send(msg.into()).await.map_err(|e| { - token.cancel(); - Error::TransformerError(e.to_string()) - })?; + let result = tokio::select! { + result = read_tx.send(msg.into()) => result, + _ = token.cancelled() => { + warn!("Cancellation token was cancelled while sending source transform requests"); + return Ok(()); + }, + }; + + match result { + Ok(()) => continue, + Err(e) => { + token.cancel(); + return Err(Error::TransformerError(e.to_string())); + } + }; } Ok(()) } }); + // Receive transformer results let mut messages = Vec::new(); while !tracker.is_empty() { let resp = tokio::select! { @@ -104,16 +119,33 @@ impl SourceTransformer { }, resp = self.resp_stream.message() => {resp} }; - let Some(resp) = resp? else { - break; + + let resp = match resp { + Ok(Some(val)) => val, + Ok(None) => { + // Logging at warning level since we don't expect this to happen + warn!("Source transformer server closed its sending end of the stream. No more messages to receive"); + token.cancel(); + break; + } + Err(e) => { + token.cancel(); + return Err(Error::TransformerError(format!( + "gRPC error while receiving messages from source transformer server: {e:?}" + ))); + } }; + let Some((msg_id, msg_info)) = tracker.remove_entry(&resp.id) else { + token.cancel(); return Err(Error::TransformerError(format!( "Received message with unknown ID {}", resp.id ))); }; + for (i, result) in resp.results.into_iter().enumerate() { + // TODO: Expose metrics if result.tags.iter().any(|x| x == DROP) { continue; } @@ -129,12 +161,7 @@ impl SourceTransformer { } } - let sender_task = tokio::time::timeout(Duration::from_millis(2), sender); - let Ok(sender_task_result) = sender_task.await else { - return Err(Error::TransformerError("Possible bug: Sender task has not finished even after receiving all messages from transformer".to_string())); - }; - - sender_task_result.unwrap().map_err(|e| { + sender_task.await.unwrap().map_err(|e| { Error::TransformerError(format!( "Sending messages to gRPC transformer failed: {e:?}", ))