Skip to content

Commit

Permalink
fix(native): Handle error without panic (drop unwrap)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Feb 5, 2024
1 parent cb9698e commit f0a94c3
Showing 1 changed file with 60 additions and 27 deletions.
87 changes: 60 additions & 27 deletions packages/cubejs-backend-native/src/channel.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::cell::RefCell;
use std::collections::HashMap;
#[cfg(build = "debug")]
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

use crate::transport::MapCubeErrExt;
Expand All @@ -13,11 +15,19 @@ use tokio::sync::oneshot;

use crate::utils::bind_method;

type JsAsyncStringChannelCallback = Box<dyn FnOnce(Result<String, CubeError>) + Send>;
type JsAsyncChannelCallback =
Box<dyn FnOnce(&mut FunctionContext, Result<Handle<JsValue>, CubeError>) + Send>;
type JsAsyncStringChannelCallback =
Box<dyn FnOnce(Result<String, CubeError>) -> Result<(), CubeError> + Send>;
type JsAsyncChannelCallback = Box<
dyn FnOnce(&mut FunctionContext, Result<Handle<JsValue>, CubeError>) -> Result<(), CubeError>
+ Send,
>;

#[cfg(build = "debug")]
static JS_ASYNC_CHANNEL_DEBUG_ID_SEQ: AtomicU64 = AtomicU64::new(0);

pub struct JsAsyncChannel {
#[cfg(build = "debug")]
_id: u64,
callback: Option<JsAsyncChannelCallback>,
}

Expand All @@ -26,32 +36,39 @@ type BoxedChannel = JsBox<RefCell<JsAsyncChannel>>;
impl Finalize for JsAsyncChannel {}

fn js_async_channel_resolve(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;

#[cfg(build = "debug")]
trace!("JsAsyncChannel.resolved");
trace!("JsAsyncChannel.resolved {}", this.id);

let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;
let result = cx.argument::<JsValue>(0)?;

if this.borrow_mut().resolve(&mut cx, result) {
Ok(cx.undefined())
let tricky_rust_scope_hack = if let Err(err) = this.borrow_mut().resolve(&mut cx, result) {
cx.throw_error(format!("JsAsyncChannel resolving error: {}", err))
} else {
cx.throw_error("Resolve was called on AsyncChannel that was already used")
}
Ok(cx.undefined())
};

tricky_rust_scope_hack
}

fn js_async_channel_reject(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;

#[cfg(build = "debug")]
trace!("JsAsyncChannel.reject");
trace!("JsAsyncChannel.reject {}", this.id);

let this = cx.this().downcast_or_throw::<BoxedChannel, _>(&mut cx)?;
let error = cx.argument::<JsString>(0)?;

let error_str = error.value(&mut cx);
if this.borrow_mut().reject(&mut cx, error_str) {
Ok(cx.undefined())

let tricky_rust_scope_hack = if let Err(err) = this.borrow_mut().reject(&mut cx, error_str) {
cx.throw_error(format!("JsAsyncChannel rejecting error: {}", err))
} else {
cx.throw_error("Reject was called on AsyncChannel that was already used")
}
Ok(cx.undefined())
};

tricky_rust_scope_hack
}

impl JsAsyncChannel {
Expand All @@ -70,6 +87,8 @@ impl JsAsyncChannel {

pub fn new_raw(callback: JsAsyncChannelCallback) -> Self {
Self {
#[cfg(build = "debug")]
_id: JS_ASYNC_CHANNEL_DEBUG_ID_SEQ.fetch_add(1, Ordering::SeqCst),
callback: Some(callback),
}
}
Expand All @@ -91,23 +110,27 @@ impl JsAsyncChannel {
Ok(obj)
}

fn resolve(&mut self, cx: &mut FunctionContext, result: Handle<JsValue>) -> bool {
fn resolve(
&mut self,
cx: &mut FunctionContext,
result: Handle<JsValue>,
) -> Result<(), CubeError> {
if let Some(callback) = self.callback.take() {
callback(cx, Ok(result));

true
callback(cx, Ok(result))
} else {
false
Err(CubeError::internal(
"Resolve was called on AsyncChannel that was already used".to_string(),
))
}
}

fn reject(&mut self, cx: &mut FunctionContext, error: String) -> bool {
fn reject(&mut self, cx: &mut FunctionContext, error: String) -> Result<(), CubeError> {
if let Some(callback) = self.callback.take() {
callback(cx, Err(CubeError::internal(error)));

true
callback(cx, Err(CubeError::internal(error)))
} else {
false
Err(CubeError::internal(
"Reject was called on AsyncChannel that was already used".to_string(),
))
}
}
}
Expand All @@ -132,7 +155,12 @@ where
Err(err) => Err(CubeError::internal(err.to_string())),
};

tx.send(to_channel).unwrap();
tx.send(to_channel).map_err(|_| {
CubeError::internal(
"AsyncChannel: Unable to send result from JS back to Rust, channel closed"
.to_string(),
)
})
}));

channel
Expand Down Expand Up @@ -185,7 +213,12 @@ where
let async_channel = JsAsyncChannel::new_raw(Box::new(move |cx, result| {
let to_channel = result.and_then(|res| result_from_js_value(cx, res));

tx.send(to_channel).unwrap();
tx.send(to_channel).map_err(|_| {
CubeError::internal(
"AsyncChannel: Unable to send result from JS back to Rust, channel closed"
.to_string(),
)
})
}));

channel.send(move |mut cx| {
Expand Down

0 comments on commit f0a94c3

Please sign in to comment.