Skip to content

Commit

Permalink
feat: use future for async storage
Browse files Browse the repository at this point in the history
  • Loading branch information
LeuisKen committed Dec 29, 2024
1 parent 26c4e9c commit 71718b0
Show file tree
Hide file tree
Showing 14 changed files with 214 additions and 26 deletions.
8 changes: 8 additions & 0 deletions bridge/core/api/executing_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,12 @@ void ExecutingContextWebFMethods::ClearInterval(ExecutingContext* context,
WindowOrWorkerGlobalScope::clearInterval(context, interval_id, shared_exception_state->exception_state);
}

void ExecutingContextWebFMethods::SetRunRustFutureTasks(ExecutingContext* context,
WebFNativeFunctionContext* callback_context,
SharedExceptionState* shared_exception_state) {
auto callback_impl = WebFNativeFunction::Create(callback_context, shared_exception_state);

context->SetRunRustFutureTasks(callback_impl);
}

} // namespace webf
8 changes: 8 additions & 0 deletions bridge/core/executing_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,14 @@ void ExecutingContext::EnqueueMicrotask(MicrotaskCallback callback, void* data)
JS_FreeValue(ctx(), proxy_data);
}

void ExecutingContext::SetRunRustFutureTasks(const std::shared_ptr<WebFNativeFunction>& run_future_task) {
run_rust_future_tasks_ = run_future_task;
}

void ExecutingContext::RunRustFutureTasks() {
run_rust_future_tasks_->Invoke(this, 0, nullptr);
}

void ExecutingContext::DrainPendingPromiseJobs() {
// should executing pending promise jobs.
JSContext* pctx;
Expand Down
5 changes: 5 additions & 0 deletions bridge/core/executing_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class ExecutingContext {
void ReportError(JSValueConst error, char** rust_errmsg, uint32_t* rust_errmsg_length);
void DrainMicrotasks();
void EnqueueMicrotask(MicrotaskCallback callback, void* data = nullptr);
void SetRunRustFutureTasks(const std::shared_ptr<WebFNativeFunction>& run_rust_future_tasks);
void RunRustFutureTasks();
void DefineGlobalProperty(const char* prop, JSValueConst value);
ExecutionContextData* contextData();
uint8_t* DumpByteCode(const char* code, uint32_t codeLength, const char* sourceURL, uint64_t* bytecodeLength);
Expand Down Expand Up @@ -225,6 +227,9 @@ class ExecutingContext {

// Rust methods ptr should keep alive when ExecutingContext is disposing.
const std::unique_ptr<ExecutingContextWebFMethods> public_method_ptr_ = nullptr;

// Rust future task queue run trigger
std::shared_ptr<WebFNativeFunction> run_rust_future_tasks_;
};

class ObjectProperty {
Expand Down
1 change: 1 addition & 0 deletions bridge/core/frame/dom_timer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void DOMTimer::Fire() {
}
} else if (auto* callback = DynamicTo<WebFNativeFunction>(callback_.get())) {
callback->Invoke(context_, 0, nullptr);
context_->RunRustFutureTasks();
}
}

Expand Down
1 change: 1 addition & 0 deletions bridge/core/frame/module_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ NativeValue* handleInvokeModuleTransientCallback(void* ptr,
}
context->dartIsolateContext()->profiler()->FinishTrackSteps();
context->dartIsolateContext()->profiler()->FinishTrackAsyncEvaluation();
context->RunRustFutureTasks();
return return_value;
}
}
Expand Down
1 change: 1 addition & 0 deletions bridge/core/native/native_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ static void ExecuteNativeLibrary(PluginLibraryEntryPoint entry_point,
native_library_load_context->context, native_library_load_context->context->publicMethodPtr(),
native_library_load_context->context->status()};
void* result = entry_point(entry_data);
native_library_load_context->context->RunRustFutureTasks();
}

delete native_library_load_context;
Expand Down
1 change: 1 addition & 0 deletions bridge/core/page.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ NativeValue* WebFPage::invokeModuleEvent(SharedNativeString* native_module_name,
NativeValue tmp = callback->Invoke(context_, 2, params);
auto* return_value = static_cast<NativeValue*>(dart_malloc(sizeof(NativeValue)));
memcpy(return_value, &tmp, sizeof(NativeValue));
context_->RunRustFutureTasks();
return return_value;
}
}
Expand Down
7 changes: 7 additions & 0 deletions bridge/include/plugin_api/executing_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ using PublicContextSetInterval = int32_t (*)(ExecutingContext*,
SharedExceptionState*);
using PublicContextClearTimeout = void (*)(ExecutingContext*, int32_t, SharedExceptionState*);
using PublicContextClearInterval = void (*)(ExecutingContext*, int32_t, SharedExceptionState*);
using PublicContextSetRunRustFutureTasks = void (*)(ExecutingContext*,
WebFNativeFunctionContext*,
SharedExceptionState*);

// Memory aligned and readable from WebF side.
// Only C type member can be included in this class, any C++ type and classes can is not allowed to use here.
Expand Down Expand Up @@ -77,6 +80,9 @@ struct ExecutingContextWebFMethods {
static void ClearInterval(ExecutingContext* context,
int32_t interval_id,
SharedExceptionState* shared_exception_state);
static void SetRunRustFutureTasks(ExecutingContext* context,
WebFNativeFunctionContext* callback_context,
SharedExceptionState* shared_exception_state);

double version{1.0};
PublicContextGetDocument context_get_document{document};
Expand All @@ -92,6 +98,7 @@ struct ExecutingContextWebFMethods {
PublicContextSetInterval context_set_interval{SetInterval};
PublicContextClearTimeout context_clear_timeout{ClearTimeout};
PublicContextClearInterval context_clear_interval{ClearInterval};
PublicContextSetRunRustFutureTasks context_set_run_rust_future_tasks{SetRunRustFutureTasks};
};

} // namespace webf
Expand Down
2 changes: 1 addition & 1 deletion bridge/rusty_webf_sys/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ license = "Apache-2.0"

[dependencies]
libc = "0.2.0"

futures = "0.3"

[dependencies.windows]
version = "0.58.0"
Expand Down
49 changes: 49 additions & 0 deletions bridge/rusty_webf_sys/src/executing_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ pub struct ExecutingContextRustMethods {
pub set_interval: extern "C" fn(*const OpaquePtr, *const WebFNativeFunctionContext, c_int, *const OpaquePtr) -> c_int,
pub clear_timeout: extern "C" fn(*const OpaquePtr, c_int, *const OpaquePtr),
pub clear_interval: extern "C" fn(*const OpaquePtr, c_int, *const OpaquePtr),
pub set_run_rust_future_tasks: extern "C" fn(*const OpaquePtr, *const WebFNativeFunctionContext, *const OpaquePtr) -> c_void,
}

pub type TimeoutCallback = Box<dyn Fn()>;
pub type IntervalCallback = Box<dyn Fn()>;
pub type RunRustFutureTasksCallback = Box<dyn Fn()>;

/// An environment contains all the necessary running states of a web page.
///
Expand Down Expand Up @@ -259,6 +261,43 @@ impl ExecutingContext {
}
}

pub fn set_run_rust_future_tasks(&self, callback: RunRustFutureTasksCallback, exception_state: &ExceptionState) -> Result<(), String> {
let general_callback: WebFNativeFunction = Box::new(move |argc, argv| {
if argc != 0 {
println!("Invalid argument count for run rust future tasks callback");
return NativeValue::new_null();
}
callback();
NativeValue::new_null()
});

let callback_data = Box::new(WebFNativeFunctionContextData {
func: general_callback,
});
let callback_context_data_ptr = Box::into_raw(callback_data);
let callback_context = Box::new(WebFNativeFunctionContext {
callback: invoke_webf_native_function,
free_ptr: release_webf_native_function,
ptr: callback_context_data_ptr,
});
let callback_context_ptr = Box::into_raw(callback_context);

unsafe {
((*self.method_pointer).set_run_rust_future_tasks)(self.ptr, callback_context_ptr, exception_state.ptr);
}

if exception_state.has_exception() {
unsafe {
let _ = Box::from_raw(callback_context_ptr);
let _ = Box::from_raw(callback_context_data_ptr);
}
return Err(exception_state.stringify(self));
}

Ok(())

}

}

impl Drop for ExecutingContext {
Expand All @@ -271,3 +310,13 @@ impl Drop for ExecutingContext {
}
}
}

impl Clone for ExecutingContext {
fn clone(&self) -> Self {
ExecutingContext {
ptr: self.ptr,
method_pointer: self.method_pointer,
status: self.status,
}
}
}
20 changes: 13 additions & 7 deletions bridge/rusty_webf_sys/src/frame/async_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,52 +23,58 @@ impl AsyncStorage {
unsafe { &*self.context }
}

pub fn get_item(&self, key: &str, callback: GetItemCallback, exception_state: &ExceptionState) {
pub fn get_item(&self, key: &str, exception_state: &ExceptionState) -> WebFNativeFuture<String> {
let key_string = NativeValue::new_string(key);
let future_for_return = WebFNativeFuture::<String>::new();
let future_in_callback = future_for_return.clone();
let general_callback: WebFNativeFunction = Box::new(move |argc, argv| {
if argc == 1 {
let error_string = unsafe { (*argv).clone() };
let error_string = error_string.to_string();
callback(Err(error_string));
future_in_callback.set_result(Err(error_string));
return NativeValue::new_null();
}
if argc == 2 {
let item_string = unsafe { (*argv.wrapping_add(1)).clone() };
if item_string.is_null() {
callback(Ok(None));
future_in_callback.set_result(Ok(None));
return NativeValue::new_null();
}
let item_string = item_string.to_string();
callback(Ok(Some(item_string)));
future_in_callback.set_result(Ok(Some(item_string)));
return NativeValue::new_null();
}
println!("Invalid argument count for async storage callback");
NativeValue::new_null()
});
self.context().webf_invoke_module_with_params_and_callback("AsyncStorage", "getItem", &key_string, general_callback, exception_state).unwrap();
future_for_return
}

pub fn set_item(&self, key: &str, value: &str, callback: SetItemCallback, exception_state: &ExceptionState) {
pub fn set_item(&self, key: &str, value: &str, exception_state: &ExceptionState) -> WebFNativeFuture<String> {
let key_string = NativeValue::new_string(key);
let value_string = NativeValue::new_string(value);
let future_for_return = WebFNativeFuture::<String>::new();
let future_in_callback = future_for_return.clone();
let params_vec = vec![key_string, value_string];
let params = NativeValue::new_list(params_vec);
let general_callback: WebFNativeFunction = Box::new(move |argc, argv| {
if argc == 1 {
let error_string = unsafe { (*argv).clone() };
let error_string = error_string.to_string();
callback(Err(error_string));
future_in_callback.set_result(Err(error_string));
return NativeValue::new_null();
}
if argc == 2 {
let result = unsafe { (*argv.wrapping_add(1)).clone() };
let result = result.to_string();
callback(Ok(Some(result)));
future_in_callback.set_result(Ok(Some(result)));
return NativeValue::new_null();
}
println!("Invalid argument count for async storage callback");
NativeValue::new_null()
});
self.context().webf_invoke_module_with_params_and_callback("AsyncStorage", "setItem", &params, general_callback, exception_state).unwrap();
future_for_return
}
}
2 changes: 2 additions & 0 deletions bridge/rusty_webf_sys/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub mod native_value;
pub mod script_value_ref;
pub mod webf_event_listener;
pub mod webf_function;
pub mod webf_future;

pub use dom::*;
pub use events::*;
Expand All @@ -29,6 +30,7 @@ pub use native_value::*;
pub use script_value_ref::*;
pub use webf_event_listener::*;
pub use webf_function::*;
pub use webf_future::*;

#[repr(C)]
pub struct OpaquePtr;
Expand Down
92 changes: 92 additions & 0 deletions bridge/rusty_webf_sys/src/webf_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::cell::RefCell;
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};

use futures::task;

type Task = Pin<Box<dyn Future<Output = ()>>>;

pub struct FutureRuntime {
tasks: VecDeque<Task>,
}

impl FutureRuntime {
pub fn new() -> FutureRuntime {
FutureRuntime {
tasks: VecDeque::new(),
}
}

/// Spawn a future onto the mini-tokio instance.
pub fn spawn<F>(&mut self, future: F)
where
F: Future<Output = ()> + 'static,
{
self.tasks.push_back(Box::pin(future));
}

pub fn run(&mut self) {
let waker = task::noop_waker();
let mut cx = Context::from_waker(&waker);
let mut unfinished_tasks = VecDeque::new();

while let Some(mut task) = self.tasks.pop_front() {
if task.as_mut().poll(&mut cx).is_pending() {
unfinished_tasks.push_back(task);
}
}

self.tasks.append(&mut unfinished_tasks);
}
}

pub struct WebFNativeFuture<T> {
inner: Rc<RefCell<Inner<T>>>,
}

struct Inner<T> {
result: Option<Result<Option<T>, String>>,
}

impl<T> WebFNativeFuture<T> {
pub fn new() -> WebFNativeFuture<T> {
WebFNativeFuture {
inner: Rc::new(RefCell::new(Inner {
result: None,
})),
}
}

pub fn set_result(&self, result: Result<Option<T>, String>) {
let mut inner = self.inner.borrow_mut();
inner.result = Some(result);
}
}

impl<T> Future for WebFNativeFuture<T>
where
T: 'static,
{
type Output = Result<Option<T>, String>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let mut inner = self.inner.borrow_mut();

if let Some(result) = inner.result.take() {
Poll::Ready(result)
} else {
Poll::Pending
}
}
}

impl Clone for WebFNativeFuture<String> {
fn clone(&self) -> Self {
WebFNativeFuture {
inner: self.inner.clone(),
}
}
}
Loading

0 comments on commit 71718b0

Please sign in to comment.