Skip to content

Commit

Permalink
Handle token cancellation in transformer
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing committed Sep 30, 2024
1 parent dae7a84 commit 8cad234
Showing 1 changed file with 41 additions and 14 deletions.
55 changes: 41 additions & 14 deletions rust/monovertex/src/transformer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use crate::sourcetransform_pb::{

use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio::time::Duration;
use tokio_stream::wrappers::ReceiverStream;
use tokio_util::sync::CancellationToken;
use tonic::transport::Channel;
use tonic::{Request, Streaming};
use tracing::warn;

const DROP: &str = "U+005C__DROP__";

Expand Down Expand Up @@ -81,21 +81,36 @@ impl SourceTransformer {
);
}

// Cancellation token is used to cancel either sending task (if an error occurs while receiving) or receiving messages (if an error occurs on sending task)
let token = CancellationToken::new();
let sender: JoinHandle<Result<()>> = tokio::spawn({

// Send transform requests to the source transformer server
let sender_task: JoinHandle<Result<()>> = tokio::spawn({
let read_tx = self.read_tx.clone();
let token = token.clone();
async move {
for msg in messages {
read_tx.send(msg.into()).await.map_err(|e| {
token.cancel();
Error::TransformerError(e.to_string())
})?;
let result = tokio::select! {
result = read_tx.send(msg.into()) => result,
_ = token.cancelled() => {
warn!("Cancellation token was cancelled while sending source transform requests");
return Ok(());
},
};

match result {
Ok(()) => continue,
Err(e) => {
token.cancel();
return Err(Error::TransformerError(e.to_string()));
}
};
}
Ok(())
}
});

// Receive transformer results
let mut messages = Vec::new();
while !tracker.is_empty() {
let resp = tokio::select! {
Expand All @@ -104,16 +119,33 @@ impl SourceTransformer {
},
resp = self.resp_stream.message() => {resp}
};
let Some(resp) = resp? else {
break;

let resp = match resp {
Ok(Some(val)) => val,
Ok(None) => {
// Logging at warning level since we don't expect this to happen
warn!("Source transformer server closed its sending end of the stream. No more messages to receive");
token.cancel();
break;
}
Err(e) => {
token.cancel();
return Err(Error::TransformerError(format!(
"gRPC error while receiving messages from source transformer server: {e:?}"
)));
}
};

let Some((msg_id, msg_info)) = tracker.remove_entry(&resp.id) else {
token.cancel();
return Err(Error::TransformerError(format!(
"Received message with unknown ID {}",
resp.id
)));
};

for (i, result) in resp.results.into_iter().enumerate() {
// TODO: Expose metrics
if result.tags.iter().any(|x| x == DROP) {
continue;
}
Expand All @@ -129,12 +161,7 @@ impl SourceTransformer {
}
}

let sender_task = tokio::time::timeout(Duration::from_millis(2), sender);
let Ok(sender_task_result) = sender_task.await else {
return Err(Error::TransformerError("Possible bug: Sender task has not finished even after receiving all messages from transformer".to_string()));
};

sender_task_result.unwrap().map_err(|e| {
sender_task.await.unwrap().map_err(|e| {
Error::TransformerError(format!(
"Sending messages to gRPC transformer failed: {e:?}",
))
Expand Down

0 comments on commit 8cad234

Please sign in to comment.