diff --git a/.cargo/config.toml b/.cargo/config.toml index 6c8d827..05dde38 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,3 +1,10 @@ [alias] xtask = "run --package xtask --" x = "run --package xtask --" + +[target.x86_64-unknown-linux-gnu] +rustflags = [ + # for use with tokio-rs/console + "--cfg", + "tokio_unstable" +] diff --git a/.ghjk/lock.json b/.ghjk/lock.json index 5da7196..ad0600a 100644 --- a/.ghjk/lock.json +++ b/.ghjk/lock.json @@ -1,7 +1,6 @@ { "version": "0", - "platform": "x86_64-linux", - "moduleEntries": { + "sys_entries": { "ports": { "version": "0", "configResolutions": { @@ -287,6 +286,7 @@ }, "ghjkEnvProvInstSet___dev": { "installs": [ + "bciqlfx3mm5hi37g75snjknph6fkniixjhnvyyfxeua7f5z4h7nnqtna", "bciqlmoqot4jk2lb2b34pldr5iiwsfm3biuipzesjkkwmc2n2o6nlw4q", "bciqikjfnbntvagpghawbzlfp2es6lnqzhba3qx5de7tdrmvhuzhsjqa", "bciqfrfun7z7soj7yxzziyvmt2jnebqvneeoozk5vynmg5pa6wqynhvi", @@ -312,7 +312,7 @@ "lock-sed": { "ty": "denoFile@v1", "key": "lock-sed", - "envKey": "bciqcjjccn4eivzzybioty5p5tde6gnafea43rypzxci54tqjfglxyyy" + "envKey": "bciqbs6giotedkicnvrztzwovfr4siq2phoa63wngljmzna6aa4kyciy" }, "cache-v8": { "ty": "denoFile@v1", @@ -339,13 +339,13 @@ } ] }, - "bciqkjh3l5pvnydjxc3gp6autcaurxvw2euj7uz472qureg3p4y5nfuq": { + "bciqonpyvifzte5gf6tl7gtk3ep5q3vst2gqncuznf3kav4mzbuvnpai": { "desc": "the default default environment.", "provides": [ { "ty": "posix.envVar", "key": "RUST_LOG", - "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info" + "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info,h2=info,rustls=info,mio=info,hyper_util=info" }, { "ty": "ghjk.ports.InstallSetRef", @@ -353,12 +353,12 @@ } ] }, - "bciqcjjccn4eivzzybioty5p5tde6gnafea43rypzxci54tqjfglxyyy": { + "bciqbs6giotedkicnvrztzwovfr4siq2phoa63wngljmzna6aa4kyciy": { "provides": [ { "ty": "posix.envVar", "key": "RUST_LOG", - "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info" + "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info,h2=info,rustls=info,mio=info,hyper_util=info" }, { "ty": "ghjk.ports.InstallSetRef", @@ -366,12 +366,12 @@ } ] }, - "bciqothoegu7lnencjdk6pjtassehqlzaldan6l67lmtd2slhmvnq5la": { + "bciqm4lt7lvjdt3qzudw24qe4brezcej3ilxxhbmgfo2gdbfmh4j3a7i": { "provides": [ { "ty": "posix.envVar", "key": "RUST_LOG", - "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info" + "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info,h2=info,rustls=info,mio=info,hyper_util=info" }, { "ty": "ghjk.ports.InstallSetRef", @@ -379,17 +379,17 @@ } ] }, - "bciqdlbved4xvo27dss37motxje63ai3mwc4g5otjre5heqto2g7zbhi": { + "bciqdngvsrwedhravpw6h4lzku6f52ljmexs6sqrtowntcqpguibfgti": { "provides": [ { "ty": "posix.envVar", "key": "RUST_LOG", - "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info" + "val": "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info,h2=info,rustls=info,mio=info,hyper_util=info" }, { "ty": "posix.envVar", "key": "RUSTY_V8_MIRROR", - "val": "/home/asdf/repos/ecma/ghjk/.dev/rusty_v8" + "val": "/home/yohe/ghjk/.dev/rusty_v8" }, { "ty": "ghjk.ports.InstallSetRef", @@ -400,9 +400,9 @@ }, "defaultEnv": "dev", "envsNamed": { - "main": "bciqkjh3l5pvnydjxc3gp6autcaurxvw2euj7uz472qureg3p4y5nfuq", - "_rust": "bciqothoegu7lnencjdk6pjtassehqlzaldan6l67lmtd2slhmvnq5la", - "dev": "bciqdlbved4xvo27dss37motxje63ai3mwc4g5otjre5heqto2g7zbhi" + "main": "bciqonpyvifzte5gf6tl7gtk3ep5q3vst2gqncuznf3kav4mzbuvnpai", + "_rust": 
"bciqm4lt7lvjdt3qzudw24qe4brezcej3ilxxhbmgfo2gdbfmh4j3a7i", + "dev": "bciqdngvsrwedhravpw6h4lzku6f52ljmexs6sqrtowntcqpguibfgti" } } } @@ -904,6 +904,43 @@ "rust-src" ] }, + "bciqlfx3mm5hi37g75snjknph6fkniixjhnvyyfxeua7f5z4h7nnqtna": { + "port": { + "ty": "denoWorker@v1", + "name": "cargobi_cratesio", + "platforms": [ + "x86_64-linux", + "aarch64-linux", + "x86_64-darwin", + "aarch64-darwin", + "x86_64-windows", + "aarch64-windows", + "x86_64-freebsd", + "aarch64-freebsd", + "x86_64-netbsd", + "aarch64-netbsd", + "x86_64-aix", + "aarch64-aix", + "x86_64-solaris", + "aarch64-solaris", + "x86_64-illumos", + "aarch64-illumos", + "x86_64-android", + "aarch64-android" + ], + "version": "0.1.0", + "buildDeps": [ + { + "name": "cargo_binstall_ghrel" + }, + { + "name": "rust_rustup" + } + ], + "moduleSpecifier": "file:///ports/cargobi.ts" + }, + "crateName": "tokio-console" + }, "bciqlmoqot4jk2lb2b34pldr5iiwsfm3biuipzesjkkwmc2n2o6nlw4q": { "version": "v2.4.0", "port": { @@ -926,4 +963,4 @@ "bciqeie3punk3gz4kcfdk2fxx5bsj5fh3j7bb7z36qmimayhwdsvp7cq": {} } } -} +} \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index a8c1ed4..997711b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -925,6 +925,58 @@ dependencies = [ "serde_json", ] +[[package]] +name = "console" +version = "0.15.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e1f83fc076bd6dd27517eacdf25fef6c4dfe5f1d7448bafaaf3a26f13b5e4eb" +dependencies = [ + "encode_unicode", + "lazy_static", + "libc", + "unicode-width", + "windows-sys 0.52.0", +] + +[[package]] +name = "console-api" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8030735ecb0d128428b64cd379809817e620a40e5001c54465b99ec5feec2857" +dependencies = [ + "futures-core", + "prost", + "prost-types", + "tonic", + "tracing-core", +] + +[[package]] +name = "console-subscriber" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6539aa9c6a4cd31f4b1c040f860a1eac9aa80e7df6b05d506a6e7179936d6a01" +dependencies = [ + "console-api", + "crossbeam-channel", + "crossbeam-utils", + "futures-task", + "hdrhistogram", + "humantime", + "hyper-util", + "prost", + "prost-types", + "serde", + "serde_json", + "thread_local", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-core", + "tracing-subscriber", +] + [[package]] name = "console_static_text" version = "0.8.1" @@ -2593,6 +2645,19 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "339544cc9e2c4dc3fc7149fd630c5f22263a4fdf18a98afd0075784968b5cf00" +[[package]] +name = "dialoguer" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658bce805d770f407bc62102fca7c2c64ceef2fbcb2b8bd19d2765ce093980de" +dependencies = [ + "console", + "shell-words", + "tempfile", + "thiserror", + "zeroize", +] + [[package]] name = "diatomic-waker" version = "0.2.3" @@ -2971,6 +3036,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encode_unicode" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -3542,10 +3613,13 @@ dependencies = [ "clap_complete", "color-eyre", "config", + "console", + "console-subscriber", "dashmap", "data-encoding", "deno_core", "denort", + "dialoguer", "directories", "educe", "futures", @@ -6907,6 +6981,12 @@ version = "0.1.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "45bb67a18fa91266cc7807181f62f9178a6873bfad7dc788c42e6430db40184f" +[[package]] +name = "shell-words" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" + [[package]] name = "shellexpand" version = "3.1.0" @@ -7895,6 +7975,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys 0.48.0", ] diff --git a/deno.jsonc b/deno.jsonc index 0bcb7ce..6ae9b9b 100644 --- a/deno.jsonc +++ b/deno.jsonc @@ -1,6 +1,6 @@ { "tasks": { - "test": "cargo build -p ghjk && deno test --parallel --unstable-worker-options --unstable-kv -A tests/*", + "test": "cargo build -p ghjk && deno test --unstable-worker-options --unstable-kv -A tests/*", "self": "deno run -A --unstable-kv --unstable-worker-options main.ts ", "cache": "deno cache deps/*", "check": "deno run -A ./scripts/check.ts", diff --git a/ghjk.ts b/ghjk.ts index c718be0..6c209e7 100644 --- a/ghjk.ts +++ b/ghjk.ts @@ -16,7 +16,7 @@ config({ env("main").vars({ RUST_LOG: - "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info", + "info,deno=info,denort=trace,swc_ecma_transforms_base=info,swc_common=info,h2=info,rustls=info,mio=info,hyper_util=info", }); env("_rust") @@ -35,6 +35,7 @@ const RUSTY_V8_MIRROR = `${import.meta.dirname}/.dev/rusty_v8`; env("dev") .inherit("_rust") + .install(ports.cargobi({ crateName: "tokio-console" })) .vars({ // V8_FORCE_DEBUG: "true", RUSTY_V8_MIRROR, @@ -65,7 +66,7 @@ task( "cache-v8", { desc: "Install the V8 builds to a local cache.", - inherit: "_rust", + inherit: false, fn: async ($) => { const tmpDirPath = await Deno.makeTempDir({}); const v8Versions = [ diff --git a/install.ts b/install.ts index c87226c..b494029 100755 --- a/install.ts +++ b/install.ts @@ -5,6 +5,12 @@ import "./setup_logger.ts"; import { defaultInstallArgs, install } from "./install/mod.ts"; +// import the main entry points so that they get cached into the deno +// store during install +import "./modules/std.ts"; +import "./port.ts"; +import "./ports/mod.ts"; + if (import.meta.main) { const shellsToHook = Deno.env.get("GHJK_INSTALL_HOOK_SHELLS") ?.split(",") diff --git a/modules/ports/worker.ts b/modules/ports/worker.ts index 383673c..97bf2d5 100644 --- a/modules/ports/worker.ts +++ b/modules/ports/worker.ts @@ -164,6 +164,8 @@ export class DenoWorkerPort extends PortBase { const worker = new Worker(import.meta.url, { name: `${this.manifest.name}@${this.manifest.version}`, type: "module", + // FIXME: catch worker errors or they bring down the whole + // program (no lockfile generated) // TODO: proper permissions }); // promise that resolves when worker replies diff --git a/src/denort/lib.rs b/src/denort/lib.rs index bfacbb7..22c7a94 100644 --- a/src/denort/lib.rs +++ b/src/denort/lib.rs @@ -3,6 +3,8 @@ pub use deno; pub mod promises; +pub mod unsync; +pub mod worker; #[allow(unused)] mod interlude { @@ -21,13 +23,10 @@ mod interlude { } use crate::interlude::*; -use deno::{ - deno_runtime::{ - deno_core::{futures::FutureExt, unsync::JoinHandle, ModuleSpecifier}, - deno_permissions, - tokio_util::create_and_run_current_thread_with_maybe_metrics, - }, - *, +use deno::deno_runtime::{ + deno_core::{futures::FutureExt, unsync::JoinHandle, ModuleSpecifier}, + deno_permissions, + tokio_util::create_and_run_current_thread_with_maybe_metrics, }; #[rustfmt::skip] @@ -49,536 +48,6 @@ pub fn init() { }; } -// thread tag used 
for basic sanity checks -pub const WORKER_THREAD_NAME: &str = "denort-worker-thread"; - -/// This starts a new thread and uses it to run all the tasks -/// that'll need to touch deno internals. Deno is single threaded. -/// -/// Returned handles will use channels internally to communicate to this worker. -pub async fn worker( - flags: deno::args::Flags, - custom_extensions_cb: Option>, -) -> Res { - let cx = WorkerContext::from_config(flags, custom_extensions_cb).await?; - - let (msg_tx, mut msg_rx) = tokio::sync::mpsc::unbounded_channel::(); - - let (term_signal_tx, mut term_signal_rx) = tokio::sync::watch::channel(false); - - let join_handle = deno_core::unsync::spawn( - async move { - trace!("starting deno worker"); - loop { - let msg = tokio::select! { - Some(msg) = msg_rx.recv() => { - msg - } - _ = term_signal_rx.changed() => break, - else => break - }; - trace!(?msg, "deno worker msg"); - match msg { - DenoWorkerMsg::PrepareModule { - response_channel, - inner, - } => { - response_channel - .send(module_worker(&cx, term_signal_rx.clone(), inner).await) - .expect_or_log("channel error"); - } - } - } - // std::mem::forget(cx); - trace!("deno worker done"); - } - .instrument(tracing::trace_span!("deno-worker")), - ); - // let term_signal_tx = Arc::new(term_signal_tx); - let join_handle = Arc::new(std::sync::Mutex::new(Some(join_handle))); - Ok(DenoWorkerHandle { - sender: msg_tx, - term_signal: term_signal_tx, - join_handle, - }) -} - -type TermSignal = tokio::sync::watch::Receiver; - -async fn module_worker( - cx: &WorkerContext, - global_term_signal: TermSignal, - msg: PrepareModuleMsg, -) -> Res { - let mut module_cx = cx - .prepare_module( - msg.main_module.clone(), - &msg.permissions, - msg.mode, - msg.stdio, - msg.custom_extensions_cb, - ) - .await?; - - let (module_tx, mut module_rx) = tokio::sync::mpsc::channel::(1); - deno_core::unsync::spawn( - async move { - trace!("starting module worker"); - while let Some(msg) = module_rx.recv().await { - trace!(?msg, "module worker msg"); - match msg { - ModuleWorkerReq::Run { response_channel } => response_channel - .send( - module_cx - .run(global_term_signal.clone()) - .await - .map_err(|err| ferr!(Box::new(err))), - ) - .expect_or_log("channel error"), - ModuleWorkerReq::DriveTillExit { - term_signal, - response_channel, - } => response_channel - .send( - module_cx - .drive_till_exit(global_term_signal.clone(), term_signal) - .await - .map_err(|err| ferr!(Box::new(err))), - ) - .expect_or_log("channel error"), - ModuleWorkerReq::Execute { response_channel } => response_channel - .send( - module_cx - .execute_main_module() - .await - .map_err(|err| ferr!(Box::new(err))), - ) - .expect_or_log("channel error"), - ModuleWorkerReq::GetLoadedModules { response_channel } => response_channel - .send(module_cx.get_loaded_modules()) - .expect_or_log("channel error"), - } - } - // std::mem::forget(module_cx); - trace!("module worker done"); - } - .instrument(tracing::trace_span!( - "deno-module-worker", - main_module = %msg.main_module - )), - ); - Ok(ModuleWorkerHandle { sender: module_tx }) -} - -#[derive(educe::Educe)] -#[educe(Debug)] -struct WorkerContext { - #[educe(Debug(ignore))] - cli_factory: deno::factory::CliFactory, - #[educe(Debug(ignore))] - worker_factory: deno::worker::CliMainWorkerFactory, - #[educe(Debug(ignore))] - graph: Arc, -} - -impl WorkerContext { - async fn from_config( - flags: deno::args::Flags, - root_custom_extensions_cb: Option>, - ) -> Res { - deno_permissions::set_prompt_callbacks( - 
Box::new(util::draw_thread::DrawThread::hide), - Box::new(util::draw_thread::DrawThread::show), - ); - - let flags = args::Flags { ..flags }; - let flags = Arc::new(flags); - let cli_factory = factory::CliFactory::from_flags(flags); - let cli_factory = if let Some(custom_extensions_cb) = &root_custom_extensions_cb { - cli_factory.with_custom_ext_cb(custom_extensions_cb.clone()) - } else { - cli_factory - }; - let worker_factory = cli_factory - .create_cli_main_worker_factory() - .await - .map_err(|err| ferr!(Box::new(err)))?; - - let graph = cli_factory - .main_module_graph_container() - .await - .map_err(|err| ferr!(Box::new(err)))? - .clone(); - Ok(Self { - cli_factory, - worker_factory, - graph, - }) - } - - async fn prepare_module( - &self, - main_module: ModuleSpecifier, - permissions: &deno_permissions::PermissionsOptions, - mode: deno_runtime::WorkerExecutionMode, - stdio: deno_runtime::deno_io::Stdio, - custom_extensions_cb: Option>, - ) -> Res { - let desc_parser = self - .cli_factory - .permission_desc_parser() - .map_err(|err| ferr!(Box::new(err)))? - .clone(); - let permissions = - deno_permissions::Permissions::from_options(desc_parser.as_ref(), permissions)?; - let permissions = deno_permissions::PermissionsContainer::new(desc_parser, permissions); - let mut worker = self - .worker_factory - .create_custom_worker( - mode, - main_module.clone(), - permissions, - custom_extensions_cb, - stdio, - ) - .await - .map_err(|err| ferr!(Box::new(err)))?; - let maybe_coverage_collector = worker - .maybe_setup_coverage_collector() - .await - .map_err(|err| ferr!(Box::new(err)))?; - - // TODO: hot module support, expose shared worker contet from deno/cli/worker - // let maybe_hmr_runner = worker - // .maybe_setup_hmr_runner() - // .await - // .map_err(|err| ferr!(Box::new(err)))?; - - let worker = worker.into_main_worker(); - - Ok(ModuleWorkerContext { - main_module, - worker, - graph: self.graph.clone(), - maybe_coverage_collector, - // maybe_hmr_runner, - }) - } -} - -#[derive(educe::Educe)] -#[educe(Debug)] -struct PrepareModuleMsg { - main_module: ModuleSpecifier, - permissions: deno_permissions::PermissionsOptions, - #[educe(Debug(ignore))] - mode: deno_runtime::WorkerExecutionMode, - #[educe(Debug(ignore))] - stdio: deno_runtime::deno_io::Stdio, - #[educe(Debug(ignore))] - custom_extensions_cb: Option>, -} - -#[derive(educe::Educe)] -#[educe(Debug)] -enum DenoWorkerMsg { - PrepareModule { - #[educe(Debug(ignore))] - response_channel: tokio::sync::oneshot::Sender>, - inner: PrepareModuleMsg, - }, -} - -#[derive(Clone, educe::Educe)] -#[educe(Debug)] -pub struct DenoWorkerHandle { - sender: tokio::sync::mpsc::UnboundedSender, - term_signal: tokio::sync::watch::Sender, - #[educe(Debug(ignore))] - join_handle: Arc>>>, -} - -impl DenoWorkerHandle { - pub async fn terminate(self) -> Res<()> { - self.term_signal.send(true)?; - let join_handle = { - let mut opt = self.join_handle.lock().expect_or_log("mutex error"); - opt.take() - }; - let Some(join_handle) = join_handle else { - return Ok(()); - }; - join_handle.await.wrap_err("tokio error") - } -} - -impl DenoWorkerHandle { - pub async fn prepare_module( - &self, - main_module: ModuleSpecifier, - permissions: deno_permissions::PermissionsOptions, - mode: deno_runtime::WorkerExecutionMode, - stdio: deno_runtime::deno_io::Stdio, - custom_extensions_cb: Option>, - ) -> Res { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(DenoWorkerMsg::PrepareModule { - response_channel: tx, - inner: PrepareModuleMsg { - main_module, 
- permissions, - mode, - stdio, - custom_extensions_cb, - }, - }) - .expect_or_log("channel error"); - rx.await.expect_or_log("channel error") - } -} - -#[derive(educe::Educe)] -#[educe(Debug)] -struct ModuleWorkerContext { - main_module: deno_core::ModuleSpecifier, - #[educe(Debug(ignore))] - worker: deno_runtime::worker::MainWorker, - #[educe(Debug(ignore))] - graph: Arc, - #[educe(Debug(ignore))] - maybe_coverage_collector: Option>, - // maybe_hmr_runner: Option>, -} - -impl ModuleWorkerContext { - fn get_loaded_modules(&self) -> Vec { - use deno::graph_container::*; - self.graph - .graph() - .walk( - [&self.main_module].into_iter(), - deno::deno_graph::WalkOptions { - kind: deno::deno_graph::GraphKind::CodeOnly, - check_js: false, - follow_dynamic: true, - prefer_fast_check_graph: false, - }, - ) - .map(|(url, _)| url.clone()) - .collect() - } - - async fn run(&mut self, global_term_signal: TermSignal) -> anyhow::Result { - debug!("main_module {}", self.main_module); - self.execute_main_module().await?; - - let (_local_signal_tx, local_signal_rx) = tokio::sync::watch::channel(false); - self.drive_till_exit(global_term_signal, local_signal_rx) - .await - } - - async fn drive_till_exit( - &mut self, - mut global_term_signal: TermSignal, - mut term_signal: TermSignal, - ) -> anyhow::Result { - self.worker.dispatch_load_event()?; - loop { - /* if let Some(hmr_runner) = self.maybe_hmr_runner.as_mut() { - let watcher_communicator = - self.shared.maybe_file_watcher_communicator.clone().unwrap(); - - let hmr_future = hmr_runner.run().boxed_local(); - let event_loop_future = self.worker.run_event_loop(false).boxed_local(); - - let result; - tokio::select! { - hmr_result = hmr_future => { - result = hmr_result; - }, - event_loop_result = event_loop_future => { - result = event_loop_result; - } - } - if let Err(e) = result { - watcher_communicator.change_restart_mode(WatcherRestartMode::Automatic); - return Err(e); - } - } else { - self.worker - .run_event_loop(self.maybe_coverage_collector.is_none()) - .await?; - } */ - - let event_loop_future = self.worker.run_event_loop(false).boxed_local(); - - tokio::select! { - _ = global_term_signal.changed() => { - trace!("global term signal lit, shutting down event loop"); - break - }, - _ = term_signal.changed() => { - trace!("worker term signal lit, shutting down event loop"); - break - }, - event_loop_result = event_loop_future => { - event_loop_result? 
- } - }; - self.worker - .run_event_loop(self.maybe_coverage_collector.is_none()) - .await?; - - let web_continue = self.worker.dispatch_beforeunload_event()?; - if !web_continue { - let node_continue = self.worker.dispatch_process_beforeexit_event()?; - if !node_continue { - trace!("beforeunload and beforeexit success, shutting down loop"); - break; - } - } - } - self.worker.dispatch_unload_event()?; - self.worker.dispatch_process_exit_event()?; - if let Some(coverage_collector) = self.maybe_coverage_collector.as_mut() { - self.worker - .js_runtime - .with_event_loop_future( - coverage_collector.stop_collecting().boxed_local(), - deno_core::PollEventLoopOptions::default(), - ) - .await?; - } - /* if let Some(hmr_runner) = self.maybe_hmr_runner.as_mut() { - self.worker - .js_runtime - .with_event_loop_future( - hmr_runner.stop().boxed_local(), - deno_core::PollEventLoopOptions::default(), - ) - .await?; - } */ - Ok(self.worker.exit_code()) - //.map_err(|err| ferr!(Box::new(err))) - } - - async fn execute_main_module(&mut self) -> anyhow::Result<()> { - let id = self.worker.preload_main_module(&self.main_module).await?; - self.worker.evaluate_module(id).await - } -} - -#[derive(educe::Educe)] -#[educe(Debug)] -enum ModuleWorkerReq { - Run { - #[educe(Debug(ignore))] - response_channel: tokio::sync::oneshot::Sender>, - }, - DriveTillExit { - term_signal: TermSignal, - #[educe(Debug(ignore))] - response_channel: tokio::sync::oneshot::Sender>, - }, - Execute { - #[educe(Debug(ignore))] - response_channel: tokio::sync::oneshot::Sender>, - }, - GetLoadedModules { - #[educe(Debug(ignore))] - response_channel: tokio::sync::oneshot::Sender>, - }, -} - -#[derive(Clone, Debug)] -pub struct ModuleWorkerHandle { - sender: tokio::sync::mpsc::Sender, -} - -#[derive(Clone, Debug)] -pub struct FinishedWorkerHandle { - sender: tokio::sync::mpsc::Sender, -} - -impl ModuleWorkerHandle { - /// Load and execute the main module - /// and drive the main loop until the program - /// exits. - pub async fn run(self) -> Res<(i32, FinishedWorkerHandle)> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(ModuleWorkerReq::Run { - response_channel: tx, - }) - .await - .expect_or_log("channel error"); - Ok(( - rx.await.expect_or_log("channel error")?, - FinishedWorkerHandle { - sender: self.sender, - }, - )) - } - - /// Load and execute the main module - /// but doesn't progress the main event - /// loop. - pub async fn execute(&mut self) -> Res<()> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(ModuleWorkerReq::Execute { - response_channel: tx, - }) - .await - .expect_or_log("channel error"); - rx.await.expect_or_log("channel error") - } - - /// Drive the event loop until exit and return - /// result in returned channel or the term signal - /// is lit. - /// Expects that [`execute`] was called first on the worker. 
- pub async fn drive_till_exit( - self, - ) -> Res<( - tokio::sync::oneshot::Receiver>, - tokio::sync::watch::Sender, - FinishedWorkerHandle, - )> { - let (term_signal_tx, term_signal_rx) = tokio::sync::watch::channel(false); - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(ModuleWorkerReq::DriveTillExit { - term_signal: term_signal_rx, - response_channel: tx, - }) - .await - .expect_or_log("channel error"); - Ok(( - rx, - term_signal_tx, - FinishedWorkerHandle { - sender: self.sender, - }, - )) - } -} - -impl FinishedWorkerHandle { - pub async fn get_loaded_modules(&mut self) -> Vec { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.sender - .send(ModuleWorkerReq::GetLoadedModules { - response_channel: tx, - }) - .await - .expect_or_log("channel error"); - // FIXME: can use sync oneshot here? - rx.await.expect_or_log("channel error") - } -} - /// Ensure that the subcommand runs in a task, rather than being directly executed. Since some of these /// futures are very large, this prevents the stack from getting blown out from passing them by value up /// the callchain (especially in debug mode when Rust doesn't have a chance to elide copies!). @@ -590,8 +59,8 @@ fn spawn_subcommand + 'static>(f: F) -> JoinHandle<()> { pub fn run_sync( main_mod: ModuleSpecifier, config_file: Option, - permissions: args::PermissionFlags, - custom_extensions: Arc, + permissions: deno::args::PermissionFlags, + custom_extensions: Arc, ) { new_thread_builder() .spawn(|| { @@ -613,14 +82,14 @@ pub fn run_sync( pub async fn run( main_module: ModuleSpecifier, config_file: Option, - permissions: args::PermissionFlags, - custom_extensions: Arc, + permissions: deno::args::PermissionFlags, + custom_extensions: Arc, ) -> anyhow::Result<()> { // NOTE: avoid using the Run subcommand // as it breaks our custom_extensions patch for some reason - let flags = args::Flags { + let flags = deno::args::Flags { permissions, - unstable_config: args::UnstableConfig { + unstable_config: deno::args::UnstableConfig { features: DEFAULT_UNSTABLE_FLAGS .iter() .copied() @@ -629,7 +98,7 @@ pub async fn run( ..Default::default() }, config_flag: if let Some(config_file) = config_file { - args::ConfigFlag::Path(config_file) + deno::args::ConfigFlag::Path(config_file) } else { Default::default() }, @@ -638,7 +107,8 @@ pub async fn run( let flags = Arc::new(flags); - let cli_factory = factory::CliFactory::from_flags(flags).with_custom_ext_cb(custom_extensions); + let cli_factory = + deno::factory::CliFactory::from_flags(flags).with_custom_ext_cb(custom_extensions); let worker_factory = cli_factory.create_cli_main_worker_factory().await?; @@ -653,12 +123,12 @@ pub async fn run( } pub fn test_sync( - files: deno_config::glob::FilePatterns, + files: deno::deno_config::glob::FilePatterns, config_file: PathBuf, - permissions: args::PermissionFlags, + permissions: deno::args::PermissionFlags, coverage_dir: Option, filter: Option, - custom_extensions: Arc, + custom_extensions: Arc, argv: Vec, ) { new_thread_builder() @@ -687,29 +157,31 @@ pub fn test_sync( } pub async fn test( - files: deno_config::glob::FilePatterns, + files: deno::deno_config::glob::FilePatterns, config_file: PathBuf, - permissions: args::PermissionFlags, + permissions: deno::args::PermissionFlags, coverage_dir: Option, filter: Option, - custom_extensions: Arc, + custom_extensions: Arc, argv: Vec, ) -> anyhow::Result<()> { use deno::tools::test::*; deno_permissions::set_prompt_callbacks( - Box::new(util::draw_thread::DrawThread::hide), - 
Box::new(util::draw_thread::DrawThread::show), + Box::new(deno::util::draw_thread::DrawThread::hide), + Box::new(deno::util::draw_thread::DrawThread::show), ); let pattern_to_str = |pattern| match pattern { - deno_config::glob::PathOrPattern::Path(path) => path.to_string_lossy().to_string(), - deno_config::glob::PathOrPattern::Pattern(pattern) => pattern.as_str().to_string(), - deno_config::glob::PathOrPattern::RemoteUrl(url) => url.as_str().to_owned(), - deno_config::glob::PathOrPattern::NegatedPath(path) => path.to_string_lossy().to_string(), + deno::deno_config::glob::PathOrPattern::Path(path) => path.to_string_lossy().to_string(), + deno::deno_config::glob::PathOrPattern::Pattern(pattern) => pattern.as_str().to_string(), + deno::deno_config::glob::PathOrPattern::RemoteUrl(url) => url.as_str().to_owned(), + deno::deno_config::glob::PathOrPattern::NegatedPath(path) => { + path.to_string_lossy().to_string() + } }; - let test_flags = args::TestFlags { - files: args::FileFlags { + let test_flags = deno::args::TestFlags { + files: deno::args::FileFlags { include: files .include .clone() @@ -732,9 +204,9 @@ pub async fn test( concurrent_jobs: std::thread::available_parallelism().ok(), ..Default::default() }; - let flags = args::Flags { + let flags = deno::args::Flags { permissions, - unstable_config: args::UnstableConfig { + unstable_config: deno::args::UnstableConfig { features: DEFAULT_UNSTABLE_FLAGS .iter() .copied() @@ -742,20 +214,21 @@ pub async fn test( .collect(), ..Default::default() }, - type_check_mode: args::TypeCheckMode::Local, - config_flag: args::ConfigFlag::Path(config_file.to_string_lossy().into()), + type_check_mode: deno::args::TypeCheckMode::Local, + config_flag: deno::args::ConfigFlag::Path(config_file.to_string_lossy().into()), argv, - subcommand: args::DenoSubcommand::Test(test_flags.clone()), + subcommand: deno::args::DenoSubcommand::Test(test_flags.clone()), ..Default::default() }; let flags = Arc::new(flags); - let cli_factory = factory::CliFactory::from_flags(flags).with_custom_ext_cb(custom_extensions); + let cli_factory = + deno::factory::CliFactory::from_flags(flags).with_custom_ext_cb(custom_extensions); let options = cli_factory.cli_options()?.clone(); - let test_options = args::WorkspaceTestOptions { + let test_options = deno::args::WorkspaceTestOptions { // files, ..options.resolve_workspace_test_options(&test_flags) }; diff --git a/src/denort/promises.rs b/src/denort/promises.rs index 7692575..ade0ae4 100644 --- a/src/denort/promises.rs +++ b/src/denort/promises.rs @@ -36,24 +36,25 @@ where .take() .unwrap() } - let on_fulfilled = v8::Function::builder( + let on_fulfilled = |scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, rv: v8::ReturnValue| { let data = v8::Local::::try_from(args.data()).unwrap(); let f = get_handler::(data); f(scope, rv, Ok(args.get(0))); - }, - ) - .data(external.into()) - .build(scope); - let on_rejected = v8::Function::builder( + }; + let on_fulfilled = v8::Function::builder(on_fulfilled) + .data(external.into()) + .build(scope); + + let on_rejected = |scope: &mut v8::HandleScope, args: v8::FunctionCallbackArguments, rv: v8::ReturnValue| { let data = v8::Local::::try_from(args.data()).unwrap(); let f = get_handler::(data); f(scope, rv, Err(args.get(0))); - }, - ) - .data(external.into()) - .build(scope); + }; + let on_rejected = v8::Function::builder(on_rejected) + .data(external.into()) + .build(scope); // function builders will return None if the runtime is shutting down let (Some(on_fulfilled), Some(on_rejected)) 
= (on_fulfilled, on_rejected) else { _ = get_handler::(external); diff --git a/src/denort/unsync.rs b/src/denort/unsync.rs new file mode 100644 index 0000000..0a3acac --- /dev/null +++ b/src/denort/unsync.rs @@ -0,0 +1,280 @@ +// Modified from https://github.com/denoland/deno_unsync/blob/503a3fcb82235a591a98b497c8d26be5772c6dc9/src/tokio/task.rs +// Copyright 2018-2024 the Deno authors. MIT license. + +use core::pin::Pin; +use core::task::Context; +use core::task::Poll; +use std::future::Future; +use std::marker::PhantomData; +use tokio::runtime::Handle; +use tokio::runtime::RuntimeFlavor; + +/// Equivalent to [`tokio::task::JoinHandle`]. +#[repr(transparent)] +pub struct JoinHandle { + handle: tokio::task::JoinHandle>, + _r: PhantomData, +} + +impl JoinHandle { + /// Equivalent to [`tokio::task::JoinHandle::abort`]. + pub fn abort(&self) { + self.handle.abort() + } + + pub fn abort_handle(&self) -> tokio::task::AbortHandle { + self.handle.abort_handle() + } +} + +impl Future for JoinHandle { + type Output = Result; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + // SAFETY: We are sure that handle is valid here + unsafe { + let me: &mut Self = Pin::into_inner_unchecked(self); + let handle = Pin::new_unchecked(&mut me.handle); + match handle.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Ok(r)) => Poll::Ready(Ok(r.into_inner())), + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } + } + } +} + +/// Equivalent to [`tokio::task::spawn`], but does not require the future to be [`Send`]. Must only be +/// used on a [`RuntimeFlavor::CurrentThread`] executor, though this is only checked when running with +/// debug assertions. +#[inline(always)] +pub fn spawn + 'static, R: 'static>(name: &str, f: F) -> JoinHandle { + debug_assert!(Handle::current().runtime_flavor() == RuntimeFlavor::CurrentThread); + // SAFETY: we know this is a current-thread executor + let future = unsafe { MaskFutureAsSend::new(f) }; + JoinHandle { + handle: tokio::task::Builder::new() + .name(name) + .spawn(future) + .expect("tokio error"), + _r: Default::default(), + } +} + +/// Equivalent to [`tokio::task::spawn_blocking`]. Currently a thin wrapper around the tokio API, but this +/// may change in the future. +#[inline(always)] +pub fn spawn_blocking R) + Send + 'static, R: Send + 'static>( + name: &str, + f: F, +) -> JoinHandle { + let handle = tokio::task::Builder::new() + .name(name) + .spawn_blocking(|| MaskResultAsSend { result: f() }) + .expect("tokio error"); + JoinHandle { + handle, + _r: Default::default(), + } +} + +#[repr(transparent)] +#[doc(hidden)] +pub struct MaskResultAsSend { + result: R, +} + +/// SAFETY: We ensure that Send bounds are only faked when tokio is running on a current-thread executor +unsafe impl Send for MaskResultAsSend {} + +impl MaskResultAsSend { + #[inline(always)] + pub fn into_inner(self) -> R { + self.result + } +} + +#[repr(transparent)] +pub struct MaskFutureAsSend { + future: F, +} + +impl MaskFutureAsSend { + /// Mark a non-`Send` future as `Send`. This is a trick to be able to use + /// `tokio::spawn()` (which requires `Send` futures) in a current thread + /// runtime. + /// + /// # Safety + /// + /// You must ensure that the future is actually used on the same + /// thread, ie. always use current thread runtime flavor from Tokio. 
+ #[inline(always)] + pub unsafe fn new(future: F) -> Self { + Self { future } + } +} + +// SAFETY: we are cheating here - this struct is NOT really Send, +// but we need to mark it Send so that we can use `spawn()` in Tokio. +unsafe impl Send for MaskFutureAsSend {} + +impl Future for MaskFutureAsSend { + type Output = MaskResultAsSend; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // SAFETY: We are sure that future is valid here + unsafe { + let me: &mut MaskFutureAsSend = Pin::into_inner_unchecked(self); + let future = Pin::new_unchecked(&mut me.future); + match future.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(result) => Poll::Ready(MaskResultAsSend { result }), + } + } + } +} + +// Copied from https://github.com/denoland/deno_unsync/blob/503a3fcb82235a591a98b497c8d26be5772c6dc9/src/tokio/joinset.rs +// Copyright 2018-2024 the Deno authors. MIT license. +// Some code and comments under MIT license where adapted from Tokio code +// Copyright (c) 2023 Tokio Contributors + +use std::task::Waker; +use tokio::task::AbortHandle; +use tokio::task::JoinError; + +/// Wraps the tokio [`JoinSet`] to make it !Send-friendly and to make it easier and safer for us to +/// poll while empty. +pub struct JoinSet { + joinset: tokio::task::JoinSet>, + /// If join_next returns Ready(None), we stash the waker + waker: Option, +} + +impl Default for JoinSet { + fn default() -> Self { + Self { + joinset: Default::default(), + waker: None, + } + } +} + +impl JoinSet { + /// Spawn the provided task on the `JoinSet`, returning an [`AbortHandle`] + /// that can be used to remotely cancel the task. + /// + /// The provided future will start running in the background immediately + /// when this method is called, even if you don't await anything on this + /// `JoinSet`. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`AbortHandle`]: tokio::task::AbortHandle + #[track_caller] + pub fn spawn(&mut self, task: F) -> AbortHandle + where + F: Future, + F: 'static, + T: 'static, + { + // SAFETY: We only use this with the single-thread executor + let handle = self.joinset.spawn(unsafe { MaskFutureAsSend::new(task) }); + + // If someone had called poll_join_next while we were empty, ask them to poll again + // so we can properly register the waker with the underlying JoinSet. + if let Some(waker) = self.waker.take() { + waker.wake(); + } + handle + } + + #[track_caller] + pub fn spawn_named(&mut self, name: &str, task: F) -> AbortHandle + where + F: Future, + F: 'static, + T: 'static, + { + // SAFETY: We only use this with the single-thread executor + let handle = self + .joinset + .build_task() + .name(name) + .spawn(unsafe { MaskFutureAsSend::new(task) }) + .expect("tokio error"); + + // If someone had called poll_join_next while we were empty, ask them to poll again + // so we can properly register the waker with the underlying JoinSet. + if let Some(waker) = self.waker.take() { + waker.wake(); + } + handle + } + + /// Returns the number of tasks currently in the `JoinSet`. + pub fn len(&self) -> usize { + self.joinset.len() + } + + /// Returns whether the `JoinSet` is empty. + pub fn is_empty(&self) -> bool { + self.joinset.is_empty() + } + + /// Waits until one of the tasks in the set completes and returns its output. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. 
If `join_next` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinSet`. + pub fn poll_join_next(&mut self, cx: &mut Context) -> Poll> { + match self.joinset.poll_join_next(cx) { + Poll::Ready(Some(res)) => Poll::Ready(res.map(|res| res.into_inner())), + Poll::Ready(None) => { + // Stash waker + self.waker = Some(cx.waker().clone()); + Poll::Pending + } + Poll::Pending => Poll::Pending, + } + } + + /// Waits until one of the tasks in the set completes and returns its output. + /// + /// Returns `None` if the set is empty. + /// + /// # Cancel Safety + /// + /// This method is cancel safe. If `join_next` is used as the event in a `tokio::select!` + /// statement and some other branch completes first, it is guaranteed that no tasks were + /// removed from this `JoinSet`. + pub async fn join_next(&mut self) -> Option> { + self.joinset + .join_next() + .await + .map(|result| result.map(|res| res.into_inner())) + } + + /// Aborts all tasks on this `JoinSet`. + /// + /// This does not remove the tasks from the `JoinSet`. To wait for the tasks to complete + /// cancellation, you should call `join_next` in a loop until the `JoinSet` is empty. + pub fn abort_all(&mut self) { + self.joinset.abort_all(); + } + + /// Removes all tasks from this `JoinSet` without aborting them. + /// + /// The tasks removed by this call will continue to run in the background even if the `JoinSet` + /// is dropped. + pub fn detach_all(&mut self) { + self.joinset.detach_all(); + } +} diff --git a/src/denort/worker.rs b/src/denort/worker.rs new file mode 100644 index 0000000..08c4b5b --- /dev/null +++ b/src/denort/worker.rs @@ -0,0 +1,562 @@ +use crate::interlude::*; +use deno::{ + deno_runtime::{ + deno_core::{futures::FutureExt, ModuleSpecifier}, + deno_permissions, + }, + *, +}; + +// thread tag used for basic sanity checks +pub const WORKER_THREAD_NAME: &str = "denort-worker-thread"; + +/// This starts a new thread and uses it to run all the tasks +/// that'll need to touch deno internals. Deno is single threaded. +/// +/// Returned handles will use channels internally to communicate to this worker. +pub async fn worker( + flags: deno::args::Flags, + custom_extensions_cb: Option>, +) -> Res { + let cx = WorkerContext::from_config(flags, custom_extensions_cb).await?; + + let (msg_tx, mut msg_rx) = tokio::sync::mpsc::channel::(32); + + let (term_signal_tx, term_signal_rx) = tokio::sync::watch::channel(false); + + let mut term_signal_rx2 = term_signal_rx.clone(); + let join_handle = crate::unsync::spawn( + "deno-worker", + async move { + let mut task_set = crate::unsync::JoinSet::default(); + trace!("starting deno worker"); + loop { + let msg = tokio::select! 
{ + Some(msg) = msg_rx.recv() => { + msg + } + _ = term_signal_rx2.changed() => break, + else => break + }; + trace!(?msg, "deno worker msg"); + match msg { + DenoWorkerMsg::PrepareModule { + response_channel, + inner, + } => { + response_channel + .send( + module_worker(&cx, term_signal_rx2.clone(), inner, &mut task_set) + .await, + ) + .expect_or_log("channel error"); + } + } + } + // std::mem::forget(cx); + trace!("deno worker done"); + } + .instrument(tracing::trace_span!("deno-worker")), + ); + // let term_signal_tx = Arc::new(term_signal_tx); + let join_handle = Arc::new(std::sync::Mutex::new(Some(join_handle))); + Ok(DenoWorkerHandle { + sender: msg_tx, + term_signal_tx, + term_signal_rx, + join_handle, + }) +} + +type TermSignal = tokio::sync::watch::Receiver; + +async fn module_worker( + cx: &WorkerContext, + global_term_signal: TermSignal, + msg: PrepareModuleMsg, + task_set: &mut crate::unsync::JoinSet<()>, +) -> Res { + let mut module_cx = cx + .prepare_module( + msg.main_module.clone(), + &msg.permissions, + msg.mode, + msg.stdio, + msg.custom_extensions_cb, + ) + .await?; + + let (module_tx, mut module_rx) = tokio::sync::mpsc::channel::(1); + task_set.spawn_named( + &format!("deno-module-worker-{}", msg.main_module), + async move { + trace!("starting module worker"); + while let Some(msg) = module_rx.recv().await { + trace!(?msg, "module worker msg"); + match msg { + ModuleWorkerReq::Run { response_channel } => response_channel + .send( + module_cx + .run(global_term_signal.clone()) + .await + .map_err(|err| ferr!(Box::new(err))), + ) + .expect_or_log("channel error"), + ModuleWorkerReq::DriveTillExit { + term_signal, + response_channel, + } => response_channel + .send( + module_cx + .drive_till_exit(global_term_signal.clone(), term_signal) + .await + .map_err(|err| ferr!(Box::new(err))), + ) + .expect_or_log("channel error"), + ModuleWorkerReq::Execute { response_channel } => response_channel + .send( + module_cx + .execute_main_module() + .await + .map_err(|err| ferr!(Box::new(err))), + ) + .expect_or_log("channel error"), + ModuleWorkerReq::GetLoadedModules { response_channel } => response_channel + .send(module_cx.get_loaded_modules()) + .expect_or_log("channel error"), + } + } + // std::mem::forget(module_cx); + trace!("module worker done"); + } + .instrument(tracing::trace_span!( + "deno-module-worker", + main_module = %msg.main_module + )), + ); + Ok(ModuleWorkerHandle { sender: module_tx }) +} + +#[derive(educe::Educe)] +#[educe(Debug)] +struct WorkerContext { + #[educe(Debug(ignore))] + cli_factory: deno::factory::CliFactory, + #[educe(Debug(ignore))] + worker_factory: deno::worker::CliMainWorkerFactory, + #[educe(Debug(ignore))] + graph: Arc, +} + +impl WorkerContext { + async fn from_config( + flags: deno::args::Flags, + root_custom_extensions_cb: Option>, + ) -> Res { + deno_permissions::set_prompt_callbacks( + Box::new(util::draw_thread::DrawThread::hide), + Box::new(util::draw_thread::DrawThread::show), + ); + + let flags = args::Flags { ..flags }; + let flags = Arc::new(flags); + let cli_factory = factory::CliFactory::from_flags(flags); + let cli_factory = if let Some(custom_extensions_cb) = &root_custom_extensions_cb { + cli_factory.with_custom_ext_cb(custom_extensions_cb.clone()) + } else { + cli_factory + }; + let worker_factory = cli_factory + .create_cli_main_worker_factory() + .await + .map_err(|err| ferr!(Box::new(err)))?; + + let graph = cli_factory + .main_module_graph_container() + .await + .map_err(|err| ferr!(Box::new(err)))? 
+ .clone(); + Ok(Self { + cli_factory, + worker_factory, + graph, + }) + } + + async fn prepare_module( + &self, + main_module: ModuleSpecifier, + permissions: &deno_permissions::PermissionsOptions, + mode: deno_runtime::WorkerExecutionMode, + stdio: deno_runtime::deno_io::Stdio, + custom_extensions_cb: Option>, + ) -> Res { + let desc_parser = self + .cli_factory + .permission_desc_parser() + .map_err(|err| ferr!(Box::new(err)))? + .clone(); + let permissions = + deno_permissions::Permissions::from_options(desc_parser.as_ref(), permissions)?; + let permissions = deno_permissions::PermissionsContainer::new(desc_parser, permissions); + let mut worker = self + .worker_factory + .create_custom_worker( + mode, + main_module.clone(), + permissions, + custom_extensions_cb, + stdio, + ) + .await + .map_err(|err| ferr!(Box::new(err)))?; + let maybe_coverage_collector = worker + .maybe_setup_coverage_collector() + .await + .map_err(|err| ferr!(Box::new(err)))?; + + // TODO: hot module support, expose shared worker contet from deno/cli/worker + // let maybe_hmr_runner = worker + // .maybe_setup_hmr_runner() + // .await + // .map_err(|err| ferr!(Box::new(err)))?; + + let worker = worker.into_main_worker(); + + Ok(ModuleWorkerContext { + main_module, + worker, + graph: self.graph.clone(), + maybe_coverage_collector, + // maybe_hmr_runner, + }) + } +} + +#[derive(educe::Educe)] +#[educe(Debug)] +struct PrepareModuleMsg { + main_module: ModuleSpecifier, + permissions: deno_permissions::PermissionsOptions, + #[educe(Debug(ignore))] + mode: deno_runtime::WorkerExecutionMode, + #[educe(Debug(ignore))] + stdio: deno_runtime::deno_io::Stdio, + #[educe(Debug(ignore))] + custom_extensions_cb: Option>, +} + +#[derive(educe::Educe)] +#[educe(Debug)] +enum DenoWorkerMsg { + PrepareModule { + #[educe(Debug(ignore))] + response_channel: tokio::sync::oneshot::Sender>, + inner: PrepareModuleMsg, + }, +} + +#[derive(Clone, educe::Educe)] +#[educe(Debug)] +pub struct DenoWorkerHandle { + sender: tokio::sync::mpsc::Sender, + term_signal_tx: tokio::sync::watch::Sender, + #[educe(Debug(ignore))] + join_handle: Arc>>>, + term_signal_rx: tokio::sync::watch::Receiver, +} + +impl DenoWorkerHandle { + pub fn term_signal_watcher(&self) -> tokio::sync::watch::Receiver { + self.term_signal_rx.clone() + } + + pub async fn terminate(self) -> Res<()> { + let join_handle = { + let mut opt = self.join_handle.lock().expect_or_log("mutex error"); + opt.take() + }; + let Some(join_handle) = join_handle else { + return Ok(()); + }; + self.term_signal_tx.send(true)?; + let abort_handle = join_handle.abort_handle(); + match tokio::time::timeout(std::time::Duration::from_millis(100), join_handle).await { + Ok(val) => val.wrap_err("tokio error"), + Err(_) => { + trace!("timeout waiting for deno worker termination, aborting"); + abort_handle.abort(); + Ok(()) + } + } + //join_handle.await.wrap_err("tokio error") + } +} + +impl DenoWorkerHandle { + pub async fn prepare_module( + &self, + main_module: ModuleSpecifier, + permissions: deno_permissions::PermissionsOptions, + mode: deno_runtime::WorkerExecutionMode, + stdio: deno_runtime::deno_io::Stdio, + custom_extensions_cb: Option>, + ) -> Res { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(DenoWorkerMsg::PrepareModule { + response_channel: tx, + inner: PrepareModuleMsg { + main_module, + permissions, + mode, + stdio, + custom_extensions_cb, + }, + }) + .await + .expect_or_log("channel error"); + rx.await.expect_or_log("channel error") + } +} + +#[derive(educe::Educe)] 
+#[educe(Debug)] +struct ModuleWorkerContext { + main_module: deno_core::ModuleSpecifier, + #[educe(Debug(ignore))] + worker: deno_runtime::worker::MainWorker, + #[educe(Debug(ignore))] + graph: Arc, + #[educe(Debug(ignore))] + maybe_coverage_collector: Option>, + // maybe_hmr_runner: Option>, +} + +impl ModuleWorkerContext { + fn get_loaded_modules(&self) -> Vec { + use deno::graph_container::*; + self.graph + .graph() + .walk( + [&self.main_module].into_iter(), + deno::deno_graph::WalkOptions { + kind: deno::deno_graph::GraphKind::CodeOnly, + check_js: false, + follow_dynamic: true, + prefer_fast_check_graph: false, + }, + ) + .map(|(url, _)| url.clone()) + .collect() + } + + async fn run(&mut self, global_term_signal: TermSignal) -> anyhow::Result { + debug!("main_module {}", self.main_module); + self.execute_main_module().await?; + + let (_local_signal_tx, local_signal_rx) = tokio::sync::watch::channel(false); + self.drive_till_exit(global_term_signal, local_signal_rx) + .await + } + + async fn drive_till_exit( + &mut self, + mut global_term_signal: TermSignal, + mut term_signal: TermSignal, + ) -> anyhow::Result { + self.worker.dispatch_load_event()?; + loop { + /* if let Some(hmr_runner) = self.maybe_hmr_runner.as_mut() { + let watcher_communicator = + self.shared.maybe_file_watcher_communicator.clone().unwrap(); + + let hmr_future = hmr_runner.run().boxed_local(); + let event_loop_future = self.worker.run_event_loop(false).boxed_local(); + + let result; + tokio::select! { + hmr_result = hmr_future => { + result = hmr_result; + }, + event_loop_result = event_loop_future => { + result = event_loop_result; + } + } + if let Err(e) = result { + watcher_communicator.change_restart_mode(WatcherRestartMode::Automatic); + return Err(e); + } + } else { + self.worker + .run_event_loop(self.maybe_coverage_collector.is_none()) + .await?; + } */ + + let event_loop_future = self.worker.run_event_loop(false).boxed_local(); + + tokio::select! { + _ = global_term_signal.wait_for(|sig| *sig) => { + trace!("global term signal lit, shutting down event loop"); + break + }, + _ = term_signal.wait_for(|sig| *sig) => { + trace!("worker term signal lit, shutting down event loop"); + break + }, + event_loop_result = event_loop_future => { + event_loop_result? 
+ } + }; + self.worker + .run_event_loop(self.maybe_coverage_collector.is_none()) + .await?; + + let web_continue = self.worker.dispatch_beforeunload_event()?; + if !web_continue { + let node_continue = self.worker.dispatch_process_beforeexit_event()?; + if !node_continue { + trace!("beforeunload and beforeexit success, shutting down loop"); + break; + } + } + } + self.worker.dispatch_unload_event()?; + self.worker.dispatch_process_exit_event()?; + if let Some(coverage_collector) = self.maybe_coverage_collector.as_mut() { + self.worker + .js_runtime + .with_event_loop_future( + coverage_collector.stop_collecting().boxed_local(), + deno_core::PollEventLoopOptions::default(), + ) + .await?; + } + /* if let Some(hmr_runner) = self.maybe_hmr_runner.as_mut() { + self.worker + .js_runtime + .with_event_loop_future( + hmr_runner.stop().boxed_local(), + deno_core::PollEventLoopOptions::default(), + ) + .await?; + } */ + Ok(self.worker.exit_code()) + //.map_err(|err| ferr!(Box::new(err))) + } + + async fn execute_main_module(&mut self) -> anyhow::Result<()> { + let id = self.worker.preload_main_module(&self.main_module).await?; + self.worker.evaluate_module(id).await + } +} + +#[derive(educe::Educe)] +#[educe(Debug)] +enum ModuleWorkerReq { + Run { + #[educe(Debug(ignore))] + response_channel: tokio::sync::oneshot::Sender>, + }, + DriveTillExit { + term_signal: TermSignal, + #[educe(Debug(ignore))] + response_channel: tokio::sync::oneshot::Sender>, + }, + Execute { + #[educe(Debug(ignore))] + response_channel: tokio::sync::oneshot::Sender>, + }, + GetLoadedModules { + #[educe(Debug(ignore))] + response_channel: tokio::sync::oneshot::Sender>, + }, +} + +#[derive(Clone, Debug)] +pub struct ModuleWorkerHandle { + sender: tokio::sync::mpsc::Sender, +} + +#[derive(Clone, Debug)] +pub struct FinishedWorkerHandle { + sender: tokio::sync::mpsc::Sender, +} + +impl ModuleWorkerHandle { + /// Load and execute the main module + /// and drive the main loop until the program + /// exits. + pub async fn run(self) -> Res<(i32, FinishedWorkerHandle)> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(ModuleWorkerReq::Run { + response_channel: tx, + }) + .await + .expect_or_log("channel error"); + Ok(( + rx.await.expect_or_log("channel error")?, + FinishedWorkerHandle { + sender: self.sender, + }, + )) + } + + /// Load and execute the main module + /// but doesn't progress the main event + /// loop. + pub async fn execute(&mut self) -> Res<()> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(ModuleWorkerReq::Execute { + response_channel: tx, + }) + .await + .expect_or_log("channel error"); + rx.await.expect_or_log("channel error") + } + + /// Drive the event loop until exit and return + /// result in returned channel or the term signal + /// is lit. + /// Expects that [`execute`] was called first on the worker. 
+ pub async fn drive_till_exit( + self, + ) -> Res<( + tokio::sync::oneshot::Receiver>, + tokio::sync::watch::Sender, + FinishedWorkerHandle, + )> { + let (term_signal_tx, term_signal_rx) = tokio::sync::watch::channel(false); + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(ModuleWorkerReq::DriveTillExit { + term_signal: term_signal_rx, + response_channel: tx, + }) + .await + .expect_or_log("channel error"); + Ok(( + rx, + term_signal_tx, + FinishedWorkerHandle { + sender: self.sender, + }, + )) + } +} + +impl FinishedWorkerHandle { + pub async fn get_loaded_modules(&mut self) -> Vec { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(ModuleWorkerReq::GetLoadedModules { + response_channel: tx, + }) + .await + .expect_or_log("channel error"); + // FIXME: can use sync oneshot here? + rx.await.expect_or_log("channel error") + } +} diff --git a/src/ghjk/Cargo.toml b/src/ghjk/Cargo.toml index 834f567..c0964a1 100644 --- a/src/ghjk/Cargo.toml +++ b/src/ghjk/Cargo.toml @@ -55,7 +55,7 @@ tracing-futures = "0.2" async-trait = "0.1.83" futures-concurrency = "7.6.2" futures = { version = "=0.3.30", default-features = false, features = ["std", "async-await"] } -tokio = { workspace = true, features = ["full", "parking_lot"] } +tokio = { workspace = true, features = ["full", "parking_lot", "tracing"] } tokio-stream = "0.1" dashmap = { version = "5.5", features = ["serde"]} @@ -73,6 +73,9 @@ sha2 = "0.10.8" pathdiff = "0.2.2" directories = "5.0.1" +dialoguer = "0.11.0" +console = "0.15.8" +console-subscriber = "0.4.1" [build-dependencies] shadow-rs.workspace = true diff --git a/src/ghjk/cli.rs b/src/ghjk/cli.rs index 3be391f..437c7e9 100644 --- a/src/ghjk/cli.rs +++ b/src/ghjk/cli.rs @@ -15,6 +15,8 @@ pub async fn cli() -> Res { let config = Config::source().await?; + debug!("config sourced: {config:?}"); + let Some(quick_err) = try_quick_cli(&config).await? else { return Ok(ExitCode::SUCCESS); }; @@ -39,9 +41,13 @@ pub async fn cli() -> Res { .deno_lockfile .as_ref() .map(|path| path.to_string_lossy().into()), + internal: deno::args::InternalFlags { + cache_path: Some(config.deno_dir.clone()), + ..default() + }, ..default() }; - denort::worker(flags, Some(Arc::new(Vec::new))).await? + denort::worker::worker(flags, Some(Arc::new(Vec::new))).await? 
}; let gcx = GhjkCtx { diff --git a/src/ghjk/config.rs b/src/ghjk/config.rs index fbf0a10..767e456 100644 --- a/src/ghjk/config.rs +++ b/src/ghjk/config.rs @@ -64,7 +64,9 @@ impl Config { // if ghjkfile var is set, set the GHJK_DIR overriding // any set by the user let (ghjkfile_path, ghjkdir_path) = if let Some(path) = ghjkfile_path { - let file_path = tokio::fs::canonicalize(path).await?; + let file_path = tokio::fs::canonicalize(&path) + .await + .wrap_err_with(|| format!("error canonicalizing ghjkfile path at {path:?}"))?; let dir_path = file_path.parent().unwrap().join(".ghjk"); (Some(file_path), Some(dir_path)) } else { diff --git a/src/ghjk/ext.rs b/src/ghjk/ext.rs index da8a612..706f1dd 100644 --- a/src/ghjk/ext.rs +++ b/src/ghjk/ext.rs @@ -34,8 +34,11 @@ impl ExtConfig { Self::default() } - pub fn callbacks_handle(&mut self) -> callbacks::CallbacksHandle { - let (line, handle) = callbacks::CallbackLine::new(); + pub fn callbacks_handle( + &mut self, + dworker: &denort::worker::DenoWorkerHandle, + ) -> callbacks::CallbacksHandle { + let (line, handle) = callbacks::CallbackLine::new(dworker); self.callbacks_rx = Arc::new(std::sync::Mutex::new(line)); handle diff --git a/src/ghjk/ext/callbacks.rs b/src/ghjk/ext/callbacks.rs index 6939b64..5a8def2 100644 --- a/src/ghjk/ext/callbacks.rs +++ b/src/ghjk/ext/callbacks.rs @@ -24,35 +24,43 @@ pub enum CallbackError { V8Error(eyre::Report), } +struct CallbackCtx { + rx: tokio::sync::mpsc::Receiver, + term_signal: tokio::sync::watch::Receiver, +} + /// Line used by the callback_worker to receive /// invocations. #[derive(Default)] pub struct CallbackLine { - line: Option>, + cx: Option, was_set: bool, } impl CallbackLine { - pub fn new() -> (Self, CallbacksHandle) { + pub fn new(dworker: &denort::worker::DenoWorkerHandle) -> (Self, CallbacksHandle) { let (tx, rx) = tokio::sync::mpsc::channel(1); ( Self { was_set: true, - line: Some(rx), + cx: Some(CallbackCtx { + rx, + term_signal: dworker.term_signal_watcher(), + }), }, CallbacksHandle { sender: tx }, ) } - fn take(&mut self) -> Option> { + fn take(&mut self) -> Option { if !self.was_set { - debug!("callback line was not set, worker callbacks will noop"); + // debug!("callback line was not set, worker callbacks will noop"); return None; } - match self.line.take() { + match self.cx.take() { Some(val) => Some(val), None => { - debug!("realm with callbacks just had a child, it won't inherit callback feature"); + // debug!("realm with callbacks just had a child, it won't inherit callback feature"); None } } @@ -107,23 +115,33 @@ pub struct Callbacks { /// Stored callbacks are not Sync so this expects to be started /// on the same thread as deno. pub fn worker(config: &ExtConfig) -> Option { - let mut line = config.callbacks_rx.lock().expect_or_log("mutex err"); - let mut line = line.take()?; + let CallbackCtx { + mut rx, + term_signal, + } = { + let mut mutex = config.callbacks_rx.lock().expect_or_log("mutex err"); + mutex.take()? 
+    };

     let callbacks = Callbacks::default();
     let callbacks_2go = callbacks.clone();
-    deno_core::unsync::spawn(
+    denort::unsync::spawn(
+        "callback-worker",
         async move {
             trace!("callback worker starting");
-            while let Some(msg) = line.recv().await {
-                trace!(?msg, "callback worker msg");
+            while let Some(msg) = rx.recv().await {
+                trace!(?msg, "msg");
                 match msg {
                     CallbacksMsg::Exec {
                         key: name,
                         args,
                         response_channel,
                     } => response_channel
-                        .send(callbacks_2go.exec_callback(name, args).await)
+                        .send(
+                            callbacks_2go
+                                .exec_callback(name, args, term_signal.clone())
+                                .await,
+                        )
                         .expect_or_log("channel error"),
                 }
             }
@@ -140,6 +158,7 @@ impl Callbacks {
         &self,
         key: CHeapStr,
         args: serde_json::Value,
+        mut term_signal: tokio::sync::watch::Receiver<bool>,
     ) -> Result<serde_json::Value, CallbackError> {
         let Some(cb) = self.store.get(&key[..]).map(|cb| cb.clone()) else {
             return Err(CallbackError::NotFound {
             });
         };
+        if *term_signal.borrow_and_update() {
+            trace!("callback invoked on terminated runtime");
+            return Err(CallbackError::V8Error(ferr!("deno is shutting down")));
+        }
+
         let (tx, rx) = oneshot::channel::<Result<serde_json::Value, CallbackError>>();

         // we use the sender to spawn work on the v8 thread
@@ -164,6 +188,7 @@ impl Callbacks {
             // and yet we're transmuting it to a Local here.
             // This is observed from the deno codebase
             // and I can't explain it
+            // SAFETY: cargo culted from deno codebase
             let func = unsafe {
                 std::mem::transmute::<v8::Global<v8::Function>, v8::Local<v8::Function>>(
                     cb.js_fn,
@@ -191,27 +216,31 @@ impl Callbacks {
             if res.is_promise() {
                 let promise = v8::Local::<v8::Promise>::try_from(res).unwrap();
-                denort::promises::watch_promise(scope, promise, move |scope, _rf, res| {
-                    let res = match res {
-                        Ok(val) => serde_v8::from_v8(scope, val).map_err(|err| {
-                            CallbackError::ProtocolError(
-                                ferr!(err)
-                                    .wrap_err("error deserializaing promise result from v8"),
-                            )
-                        }),
-                        Err(err) => Err(CallbackError::JsError(ferr!(
-                            "callback promise rejection: {}",
-                            err.to_rust_string_lossy(scope)
-                        ))), /* Err(err) => match serde_v8::from_v8(scope, err) {
-                            Ok(json) => Err(CallbackError::JsError(json)),
-                            Err(err) => Err(CallbackError::ProtocolError(
-                                ferr!(err)
-                                    .wrap_err("error deserializaing promise rejection from v8"),
-                            )),
-                        }, */
-                    };
-                    tx.send(res).expect_or_log("channel error")
-                });
+                if let None =
+                    denort::promises::watch_promise(scope, promise, move |scope, _rf, res| {
+                        let res =
+                            match res {
+                                Ok(val) => serde_v8::from_v8(scope, val).map_err(|err| {
+                                    CallbackError::ProtocolError(ferr!(err).wrap_err(
+                                        "error deserializing promise result from v8",
+                                    ))
+                                }),
+                                Err(err) => Err(CallbackError::JsError(ferr!(
+                                    "callback promise rejection: {}",
+                                    err.to_rust_string_lossy(scope)
+                                ))), /* Err(err) => match serde_v8::from_v8(scope, err) {
+                                    Ok(json) => Err(CallbackError::JsError(json)),
+                                    Err(err) => Err(CallbackError::ProtocolError(
+                                        ferr!(err)
+                                            .wrap_err("error deserializing promise rejection from v8"),
+                                    )),
+                                }, */
+                            };
+                        tx.send(res).expect_or_log("channel error")
+                    })
+                {
+                    return Err(CallbackError::V8Error(ferr!("js runtime is shutting down")));
+                };
                 Ok(None)
             } else {
                 let res = serde_v8::from_v8(scope, res).map_err(|err| {
@@ -224,11 +253,18 @@ impl Callbacks {
             })
         });

-        let res = match join_handle.await.expect_or_log("tokio error")? {
-            Some(res) => res,
-            None => {
-                trace!("waiting for callback proimse");
-                rx.await.expect_or_log("channel error")?
+        // if the callback is not async, we receive the value right away
+        if let Some(res) = join_handle.await.expect_or_log("tokio error")? {
+            return Ok(res);
+        };
+
+        let res = tokio::select! {
+            _ = term_signal.wait_for(|signal| *signal) => {
+                trace!("callback worker received term signal");
+                return Err(CallbackError::V8Error(ferr!("deno terminated waiting on callback")));
+            },
+            res = rx => {
+                res.expect_or_log("channel error")?
            }
        };
diff --git a/src/ghjk/host.rs b/src/ghjk/host.rs
index 2f9e775..73ed645 100644
--- a/src/ghjk/host.rs
+++ b/src/ghjk/host.rs
@@ -1,3 +1,5 @@
+use std::io::IsTerminal;
+
 use crate::interlude::*;

 use crate::systems::*;
@@ -55,6 +57,7 @@ pub async fn systems_from_ghjkfile(
         ghjkdir_path.join("lock.json"),
     );

+    // read both files concurrently
     let (hash_obj, lock_obj) = (
         HashObj::from_file(&hashfile_path),
         LockObj::from_file(&lockfile_path),
@@ -62,9 +65,38 @@ pub async fn systems_from_ghjkfile(
     )
         .join()
         .await;

+    // discard corrupt files if needed
     let (mut hash_obj, mut lock_obj) = (
-        hash_obj.inspect_err(|err| warn!("{err}")).ok(),
-        lock_obj.inspect_err(|err| warn!("{err}")).ok(),
+        match hash_obj {
+            Ok(val) => val,
+            Err(hashfile::HashfileError::Serialization(_)) => {
+                error!("hashfile is corrupt, discarding");
+                None
+            }
+            Err(hashfile::HashfileError::Other(err)) => return Err(err),
+        },
+        match lock_obj {
+            Ok(val) => val,
+            Err(LockfileError::Serialization(err)) => {
+                // interactive discard of lockfile if in an interactive shell
+                if std::io::stderr().is_terminal()
+                    && tokio::task::spawn_blocking(|| {
+                        dialoguer::Confirm::new()
+                            .with_prompt("lockfile is corrupt, discard?")
+                            .default(false)
+                            .interact()
+                    })
+                    .await
+                    .expect_or_log("tokio error")
+                    .wrap_err("prompt error")?
+                {
+                    None
+                } else {
+                    return Err(ferr!(err).wrap_err("corrupt lockfile"));
+                }
+            }
+            Err(LockfileError::Other(err)) => return Err(err),
+        },
     );

     if hcx.config.locked {
@@ -80,10 +112,9 @@ pub async fn systems_from_ghjkfile(
             (
                 matches!(tokio::fs::try_exists(path).await, Ok(true)),
                 Some(
-                    hashfile::file_content_digest_hash(hcx.as_ref(), path)
+                    hashfile::file_digest_hash(hcx.as_ref(), path)
                         .await?
-                        .await
-                        .map_err(|err| ferr!(err))?,
+                        .unwrap(),
                 ),
             )
         } else {
@@ -98,10 +129,17 @@ pub async fn systems_from_ghjkfile(
     }

     if !hcx.config.locked
         && (hcx.config.re_serialize
+            // no need for expensive staleness checks if the ghjkfile
+            // no longer exists
            || ghjkfile_hash.is_none()
            || obj
                .is_stale(hcx.as_ref(), ghjkfile_hash.as_ref().unwrap())
-                .await?)
+                .await
+                .inspect(|is_stale| {
+                    if *is_stale {
+                        debug!("stale hashfile, discarding")
+                    }
+                })?)
    {
        hash_obj = None;
    }
@@ -253,12 +291,12 @@ impl GhjkfileSystems {
                 warn!("locked flag set, changes to lockfile discarded");
             } else {
                 trace!(lockfile_path = ?self.lockfile_path, /* ?lock_obj, */ "writing lock.json");
-                /* tokio::fs::write(
+                tokio::fs::write(
                     &self.lockfile_path,
                     serde_json::to_vec_pretty(&lock_obj).expect_or_log("error jsonifying lockfile"),
                 )
                 .await
-                .wrap_err("error writing to lockfile")?; */
+                .wrap_err("error writing to lockfile")?;
                 self.old_lock_obj.replace(lock_obj);
             }
         }
@@ -270,13 +308,13 @@ impl GhjkfileSystems {
             unreachable!("code should have early exited");
         }
         trace!(hashfile_path = ?self.hashfile_path, /* hash_obj= ?self.hash_obj, */ "writing hash.json");
-        /* tokio::fs::write(
+        tokio::fs::write(
             &self.hashfile_path,
             serde_json::to_vec_pretty(&self.hash_obj)
                 .expect_or_log("error jsonifying hashfile"),
         )
         .await
-        .wrap_err("error writing to lockfile")?; */
+        .wrap_err("error writing to hashfile")?;
         self.hashfile_written = true;
     }
     Ok(())
@@ -318,14 +356,24 @@ pub struct LockObj {
     pub config: Arc,
 }

+#[derive(Debug, thiserror::Error)]
+pub enum LockfileError {
+    #[error("error parsing lockfile: {0}")]
+    Serialization(serde_json::Error),
+    #[error("{0}")]
+    Other(eyre::Report),
+}
+
 impl LockObj {
     /// The lock.json file stores the serialized config and some entries
     /// from systems. It's primary purpose is as a memo store to avoid
     /// re-serialization on each CLI invocation.
-    pub async fn from_file(path: &Path) -> Res<Self> {
-        let raw = tokio::fs::read(path)
-            .await
-            .wrap_err("error reading hash.json")?;
-        serde_json::from_slice(&raw).wrap_err("error parsing lock.json")
+    pub async fn from_file(path: &Path) -> Result<Option<Self>, LockfileError> {
+        let raw = match tokio::fs::read(path).await {
+            Ok(val) => val,
+            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+            Err(err) => return Err(LockfileError::Other(ferr!("error reading lockfile: {err}"))),
+        };
+        serde_json::from_slice(&raw).map_err(LockfileError::Serialization)
     }
 }
diff --git a/src/ghjk/host/hashfile.rs b/src/ghjk/host/hashfile.rs
index 47423b8..038c9e8 100644
--- a/src/ghjk/host/hashfile.rs
+++ b/src/ghjk/host/hashfile.rs
@@ -16,6 +16,14 @@ pub struct HashObj {
     pub listed_files: Vec,
 }

+#[derive(Debug, thiserror::Error)]
+pub enum HashfileError {
+    #[error("error parsing hashfile: {0}")]
+    Serialization(serde_json::Error),
+    #[error("{0}")]
+    Other(eyre::Report),
+}
+
 impl HashObj {
     #[tracing::instrument(skip(hcx, res))]
     pub async fn from_result(
@@ -52,15 +60,19 @@ impl HashObj {
     /// of a ghjkfile during serialization. The primary purpose is to
     /// do "cache invalidation" on ghjkfiles, re-serializing them if
     /// any of the digests change.
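+    ///
+    /// Returns `Ok(None)` when no hashfile exists yet.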
-    pub async fn from_file(path: &Path) -> Res<Self> {
-        let raw = tokio::fs::read(path)
-            .await
-            .wrap_err("error reading hash.json")?;
-        serde_json::from_slice(&raw).wrap_err("error parsing hash.json")
+    pub async fn from_file(path: &Path) -> Result<Option<Self>, HashfileError> {
+        let raw = match tokio::fs::read(path).await {
+            Ok(val) => val,
+            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None),
+            Err(err) => return Err(HashfileError::Other(ferr!("error reading hashfile: {err}"))),
+        };
+        serde_json::from_slice(&raw).map_err(HashfileError::Serialization)
     }

+    #[tracing::instrument(skip(hcx))]
     pub async fn is_stale(&self, hcx: &HostCtx, ghjkfile_hash: &str) -> Res<bool> {
         if self.ghjkfile_hash != ghjkfile_hash {
+            trace!("stale ghjkfile hash");
             return Ok(true);
         }
         {
@@ -69,12 +81,14 @@ impl HashObj {
                 self.env_var_hashes.keys().map(|key| &key[..]),
             );
             if self.env_var_hashes != new_digest {
+                trace!("stale env var digests");
                 return Ok(true);
             }
         }
         {
             for path in &self.listed_files {
                 if !matches!(tokio::fs::try_exists(path).await, Ok(true)) {
+                    trace!("stale listed files");
                     return Ok(true);
                 }
             }
@@ -90,6 +104,7 @@ impl HashObj {
             )
             .await?
             {
+                trace!("stale read files digest");
                 return Ok(true);
             }
         }
@@ -128,7 +143,7 @@ async fn file_digests(
     Ok(out.into_iter().collect())
 }

-async fn file_digest_hash(hcx: &HostCtx, path: &Path) -> Res> {
+pub async fn file_digest_hash(hcx: &HostCtx, path: &Path) -> Res> {
     let path = tokio::fs::canonicalize(path)
         .await
         .wrap_err("error resolving realpath")?;
@@ -158,7 +173,7 @@ async fn file_digest_hash(hcx: &HostCtx, path: &Path) -> Res> {

 pub type SharedFileContentDigestFuture = futures::future::Shared>>;

-pub async fn file_content_digest_hash(
+async fn file_content_digest_hash(
     hcx: &HostCtx,
     path: &Path,
 ) -> Res<SharedFileContentDigestFuture> {
diff --git a/src/ghjk/log.rs b/src/ghjk/log.rs
index 130b02f..bbb3b7c 100644
--- a/src/ghjk/log.rs
+++ b/src/ghjk/log.rs
@@ -84,12 +84,14 @@ Args: {args:?}
         let filter = tracing_subscriber::EnvFilter::from_default_env();

         tracing_subscriber::registry()
-            // filter on values from RUST_LOG
-            .with(filter)
-            // subscriber that emits to stderr
-            .with(fmt)
-            // instrument errors with SpanTraces, used by color-eyre
-            .with(tracing_error::ErrorLayer::default())
-            .init();
+            .with(console_subscriber::spawn())
+            // filter on values from RUST_LOG
+            .with(filter)
+            // subscriber that emits to stderr
+            .with(fmt)
+            // instrument errors with SpanTraces, used by color-eyre
+            .with(tracing_error::ErrorLayer::default())
+            .init();
+        // console_subscriber::init();
     });
 }
diff --git a/src/ghjk/main.rs b/src/ghjk/main.rs
index abbdb26..9c78c3e 100644
--- a/src/ghjk/main.rs
+++ b/src/ghjk/main.rs
@@ -69,6 +69,6 @@ shadow!(shadow);

 #[derive(Debug)]
 pub struct GhjkCtx {
-    deno: denort::DenoWorkerHandle,
+    deno: denort::worker::DenoWorkerHandle,
     config: config::Config,
 }
diff --git a/src/ghjk/systems/deno.rs b/src/ghjk/systems/deno.rs
index 738bfaa..a393c85 100644
--- a/src/ghjk/systems/deno.rs
+++ b/src/ghjk/systems/deno.rs
@@ -97,7 +97,7 @@ pub async fn systems_from_deno(
             .boxed()
         }),
     );
-    let cb_line = ext_conf.callbacks_handle();
+    let cb_line = ext_conf.callbacks_handle(&gcx.deno);

     let mut worker = gcx
         .deno
@@ -144,16 +144,12 @@ pub async fn systems_from_deno(
         let err = match exit_code_channel.await.expect_or_log("channel error") {
             Ok(0) => return Ok(()),
             Ok(exit_code) => {
-                error!(%exit_code, "deno systems died with non-zero exit code");
-                let err = ferr!("deno systems died with non-zero exit code: {exit_code}");
-                error!("{err}");
-                err
+                ferr!("deno systems died with non-zero exit code: {exit_code}")
             }
             Err(err) => err.wrap_err("error on event loop for deno systems"),
         };
-        // TODO: better exit signals
-        debug!("killing whole deno context");
-        dcx.terminate().await.unwrap();
+        error!("deno systems error: {err:?}");
+        dcx.terminate().await.expect_or_log("error terminating deno worker");
         Err(err)
     });
diff --git a/src/ghjk/utils.rs b/src/ghjk/utils.rs
index 3d7583c..937e0a3 100644
--- a/src/ghjk/utils.rs
+++ b/src/ghjk/utils.rs
@@ -19,7 +19,7 @@ mod cheapstr {
     };
     // lifted from github.com/bevyengine/bevy 's bevy_core/Name struct
     // MIT/APACHE2 licence
-    #[derive(Debug, Clone, Serialize, Deserialize)]
+    #[derive(Clone, Serialize, Deserialize)]
     #[serde(crate = "serde", from = "String", into = "String")]
     pub struct CHeapStr {
         hash: u64,
@@ -128,6 +128,12 @@ mod cheapstr {
             self.string.fmt(f)
         }
     }
+
+    impl std::fmt::Debug for CHeapStr {
+        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+            self.string.fmt(f)
+        }
+    }
 }

 const SHA2_256: u64 = 0x12;
@@ -238,18 +244,18 @@ pub async fn find_entry_recursive(from: &Path, name: &str) -> Res<Option<PathBuf>> {
+            Ok(true) => {
                 return Ok(Some(location));
             }
-            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
+            Err(err) if err.kind() != std::io::ErrorKind::NotFound => {
+                return Err(err).wrap_err("error on file stat");
+            }
+            _ => {
                 let Some(next_cur) = cur.parent() else {
                     return Ok(None);
                 };
                 cur = next_cur;
             }
-            Err(err) => {
-                return Err(err).wrap_err("error on file stat");
-            }
         }
     }
 }
diff --git a/tests/envs.ts b/tests/envs.ts
index 05106a7..c9c6b74 100644
--- a/tests/envs.ts
+++ b/tests/envs.ts
@@ -47,6 +47,7 @@ const envVarTestEnvs: EnvDefArgs[] = [
 ];
 const envVarTestsPosix = `
 set -ex
+env
 # by default, we should be in main
 [ "$SONG" = "ditto" ] || exit 1010
 [ "$GHJK_ENV" = "main" ] || exit 1011
diff --git a/tests/utils.ts b/tests/utils.ts
index b468677..87be25b 100644
--- a/tests/utils.ts
+++ b/tests/utils.ts
@@ -128,7 +128,7 @@ export async function localE2eTest(testCase: E2eTestCase) {

   const ghjkExePath = $.path(import.meta.resolve("../target/debug/ghjk"));

-  const env: Record<string, string> = {
+  const env: Record<string, string | undefined> = {
     GHJK_AUTO_HOOK: "true",
     BASH_ENV: `${ghjkDataDir.toString()}/env.bash`,
     ZDOTDIR: ghjkDataDir.toString(),
@@ -139,6 +139,8 @@ export async function localE2eTest(testCase: E2eTestCase) {
     // share the system's deno cache
     GHJK_DENO_DIR: Deno.env.get("DENO_DIR") ??
       $.path(Deno.env.get("HOME")!).join(".cache", "deno").toString(),
+    RUST_LOG: $.dbg(Deno.env.get("RUST_LOG"), "RUST_LOG"),
+    GHJK_LOG: Deno.env.get("GHJK_LOG"),
     ...testEnvs,
   };
   // install ghjk
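
The worker, callback, and host changes above all lean on one shutdown pattern: a `tokio::sync::watch` channel of `bool` acts as the termination signal, it is checked up front with `borrow_and_update()`, and pending results are raced against it with `tokio::select!`. The following standalone sketch (illustrative names only, not code from this patch) shows the pattern in isolation:

// Sketch only: demonstrates the watch-based termination signal used above.
use tokio::sync::{oneshot, watch};

#[derive(Debug)]
enum WaitError {
    Terminated,
    ChannelClosed,
}

async fn wait_or_bail(
    mut term_signal: watch::Receiver<bool>,
    result_rx: oneshot::Receiver<u32>,
) -> Result<u32, WaitError> {
    // bail early if termination was already signalled
    if *term_signal.borrow_and_update() {
        return Err(WaitError::Terminated);
    }
    tokio::select! {
        // resolves once the flag flips to true
        _ = term_signal.wait_for(|sig| *sig) => Err(WaitError::Terminated),
        res = result_rx => res.map_err(|_| WaitError::ChannelClosed),
    }
}

#[tokio::main]
async fn main() {
    let (_term_tx, term_rx) = watch::channel(false);
    let (res_tx, res_rx) = oneshot::channel();

    // stand-in for the v8 thread completing a callback
    tokio::spawn(async move {
        let _ = res_tx.send(42);
    });

    println!("{:?}", wait_or_bail(term_rx, res_rx).await);
}

A `watch` channel fits here because every clone of the receiver observes the latest value, so a single sender can fan the signal out to the module worker, the callback worker, and any in-flight `exec_callback` call.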