diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 51aed6c..1517cff 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -86,17 +86,17 @@ jobs: run: brew install fish zsh coreutils cmake - run: deno task test - test-action: - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - uses: dsherret/rust-toolchain-file@v1 - - uses: metatypedev/setup-ghjk@318209a9d215f70716a4ac89dbeb9653a2deb8bc - with: - installer-url: ./install.ts - env: - GHJKFILE: ./examples/protoc/ghjk.ts - - run: | - cd examples/tasks - . $(ghjk print share-dir-path)/env.sh - ghjk x hey + # test-action: + # runs-on: ubuntu-latest + # steps: + # - uses: actions/checkout@v4 + # - uses: dsherret/rust-toolchain-file@v1 + # - uses: metatypedev/setup-ghjk@318209a9d215f70716a4ac89dbeb9653a2deb8bc + # with: + # installer-url: ./install.ts + # env: + # GHJKFILE: ./examples/protoc/ghjk.ts + # - run: | + # cd examples/tasks + # . $(ghjk print share-dir-path)/env.sh + # ghjk x hey diff --git a/Cargo.lock b/Cargo.lock index 3bcd8a2..6b1776b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3623,7 +3623,6 @@ dependencies = [ "directories", "educe", "futures", - "futures-concurrency", "indexmap 2.6.0", "itertools 0.13.0", "json-canon", diff --git a/install/hook.fish b/install/hook.fish index 0d0d714..775063f 100644 --- a/install/hook.fish +++ b/install/hook.fish @@ -109,7 +109,7 @@ function __ghjk_preexec --on-event fish_preexec # activate script has reloaded else if set --query GHJK_LAST_ENV_DIR; - and test -e $next_env_dir/activate.fish; + and test -e $GHJK_LAST_ENV_DIR/activate.fish; and test (__ghjk_get_mtime_ts $GHJK_LAST_ENV_DIR/activate.fish) -gt $GHJK_LAST_ENV_DIR_MTIME; ghjk_hook end diff --git a/modules/envs/posix.ts b/modules/envs/posix.ts index d7466dd..323fa05 100644 --- a/modules/envs/posix.ts +++ b/modules/envs/posix.ts @@ -302,7 +302,7 @@ async function writeActivators( }), ``, `# hooks that want to invoke ghjk are made to rely`, - `# on this shim to improve to improve reliablity`, + `# on this shim to improve reliablity`, ghjk_sh(gcx, ghjkShimName), ``, `# only run the hooks in interactive mode`, diff --git a/src/deno_systems/types.ts b/src/deno_systems/types.ts index 0f887fc..85b0e17 100644 --- a/src/deno_systems/types.ts +++ b/src/deno_systems/types.ts @@ -73,8 +73,8 @@ const cliCommandBase = zod.object({ before_help: zod.string().optional(), before_long_help: zod.string().optional(), - args: zod.record(cliArg).optional().optional(), - flags: zod.record(cliFlag).optional().optional(), + args: zod.record(cliArg).optional(), + flags: zod.record(cliFlag).optional(), }); const flagsAndArgs = zod.record( diff --git a/src/ghjk/Cargo.toml b/src/ghjk/Cargo.toml index 093db3e..c6b652e 100644 --- a/src/ghjk/Cargo.toml +++ b/src/ghjk/Cargo.toml @@ -53,7 +53,6 @@ tracing-appender = "0.2" tracing-futures = "0.2" async-trait = "0.1.83" -futures-concurrency = "7.6.2" futures = { version = "=0.3.30", default-features = false, features = ["std", "async-await"] } tokio = { workspace = true, features = ["full", "parking_lot", "tracing"] } tokio-stream = "0.1" diff --git a/src/ghjk/ext/callbacks.rs b/src/ghjk/ext/callbacks.rs index e41ff24..804a28e 100644 --- a/src/ghjk/ext/callbacks.rs +++ b/src/ghjk/ext/callbacks.rs @@ -26,7 +26,7 @@ pub enum CallbackError { } struct CallbackCtx { - rx: mpsc::Receiver, + msg_rx: mpsc::Receiver, term_signal: tokio::sync::watch::Receiver, } @@ -34,7 +34,10 @@ struct CallbackCtx { /// invocations. #[derive(Default)] pub struct CallbackLine { + /// This would be None if the callback line was already + /// taken or if the callback line was not initially set cx: Option, + /// Indicates weather the callback line was initially set was_set: bool, } @@ -45,7 +48,7 @@ impl CallbackLine { Self { was_set: true, cx: Some(CallbackCtx { - rx: msg_rx, + msg_rx, term_signal: dworker.term_signal_watcher(), }), }, @@ -116,7 +119,7 @@ pub struct Callbacks { /// support callbacks. pub fn worker(config: &ExtConfig) -> Option { let CallbackCtx { - mut rx, + msg_rx: mut rx, term_signal, } = { let mut line = config.callbacks_rx.lock().expect_or_log("mutex err"); @@ -201,15 +204,10 @@ impl Callbacks { .expect_or_log("got None from callback call"); if tc_scope.has_caught() { let exception = tc_scope.exception().unwrap(); - let exception = exception.to_rust_string_lossy(tc_scope); - /* let exception = serde_v8::from_v8(tc_scope, exception).map_err(|err| { - CallbackError::ProtocolError( - ferr!(err).wrap_err("error deserializaing exception from v8"), - ) - })?; */ - return Err(CallbackError::JsError(ferr!( - "callback exception: {exception}" - ))); + return Err(CallbackError::JsError( + ferr!(js_error_message(tc_scope, exception)) + .wrap_err("callback exception"), + )); } res }; @@ -378,6 +376,7 @@ pub fn op_callbacks_set( warn!("callback set but callback feature is not enabled"); anyhow::bail!("callbacks feature is not enabled"); }; + debug!(%name, "registering callback"); callbacks.store.insert( name.into(), Callback { diff --git a/src/ghjk/host.rs b/src/ghjk/host.rs index 3303fc0..4cbcb10 100644 --- a/src/ghjk/host.rs +++ b/src/ghjk/host.rs @@ -62,12 +62,10 @@ pub async fn systems_from_ghjkfile( ); // read both files concurrently - let (hash_obj, lock_obj) = ( + let (hash_obj, lock_obj) = futures::join!( HashObj::from_file(&hashfile_path), LockObj::from_file(&lockfile_path), - ) - .join() - .await; + ); // discard corrupt files if needed let (mut hash_obj, mut lock_obj) = ( @@ -127,7 +125,7 @@ pub async fn systems_from_ghjkfile( // check if we need to discard the hashfile if let Some(obj) = &mut hash_obj { - // TODO: version migrator + // NOTE: version migrator would go here if obj.version != "0" { eyre::bail!("unsupported hashfile version: {:?}", obj.version); } @@ -154,9 +152,6 @@ pub async fn systems_from_ghjkfile( if obj.version != "0" { eyre::bail!("unsupported hashfile version: {:?}", obj.version); } - // if obj.version != "0" { - // hash_obj = None; - // } } // TODO: // if hcx.re_resolve {} diff --git a/src/ghjk/host/hashfile.rs b/src/ghjk/host/hashfile.rs index b8fc461..5ec14e0 100644 --- a/src/ghjk/host/hashfile.rs +++ b/src/ghjk/host/hashfile.rs @@ -127,41 +127,26 @@ async fn file_digests( hcx: &HostCtx, read_files: Vec<&Path>, ) -> Res>> { - // FIXME: this will exhaust memory if the number of files is large - // ConcurrentStream supports limiting concurrency but has a bug - // tracked at https://github.com/yoshuawuyts/futures-concurrency/issues/203 - let mut map = futures::future::join_all( - read_files - .into_iter() - .map(|path| { - async move { - let path = std::path::absolute(path)?; - let hash = file_digest_hash(hcx, &path).await?; - let relative_path = pathdiff::diff_paths(path, &hcx.config.cwd).unwrap(); - Ok((relative_path, hash)) - } - .boxed() - }) - .collect::>(), - ) + use futures::StreamExt; + let mut map = futures::stream::iter(read_files.into_iter().map(|path| { + async move { + let path = std::path::absolute(path)?; + let hash = file_digest_hash(hcx, &path).await?; + let relative_path = pathdiff::diff_paths(path, &hcx.config.cwd).unwrap(); + Ok((relative_path, hash)) + } + .boxed() + })) + .buffer_unordered(16) + .collect::>() .await .into_iter() .collect::>>()?; map.sort_unstable_keys(); Ok(map) - /* let out = read_files - .into_co_stream() - .map(|path| async move { - let path = tokio::fs::canonicalize(path).await?; - let hash = file_digest_hash(hcx, &path).await?; - let relative_path = pathdiff::diff_paths(path, &hcx.config.cwd).unwrap(); - Ok((relative_path, hash)) - }) - .collect::>>() - .await?; - Ok(out.into_iter().collect()) */ } +#[tracing::instrument(skip(hcx))] pub async fn file_digest_hash(hcx: &HostCtx, path: &Path) -> Res> { let path = match tokio::fs::canonicalize(path).await { Ok(val) => val, @@ -172,13 +157,23 @@ pub async fn file_digest_hash(hcx: &HostCtx, path: &Path) -> Res> }; match tokio::fs::metadata(&path).await { Ok(stat) => { + const LARGE_FILE_THRESHOLD: u64 = 100 * 1024 * 1024; // 100MB + let content_hash = if stat.file_type().is_file() || stat.file_type().is_symlink() { - Some( - file_content_digest_hash(hcx, &path) - .await? - .await - .map_err(|err| ferr!(err))?, - ) + if stat.len() > LARGE_FILE_THRESHOLD { + warn!( + len = stat.len(), + "large file detected, skippin content hash" + ); + None + } else { + Some( + file_content_digest_hash(hcx, &path) + .await? + .await + .map_err(|err| ferr!(err))?, + ) + } } else { None }; diff --git a/src/ghjk/log.rs b/src/ghjk/log.rs index 4aa0f03..5aba03c 100644 --- a/src/ghjk/log.rs +++ b/src/ghjk/log.rs @@ -35,12 +35,12 @@ Args: {args:?} let eyre_panic_hook = eyre_panic_hook.into_panic_hook(); std::panic::set_hook(Box::new(move |panic_info| { - if let Some(msg) = panic_info.payload().downcast_ref::<&str>() { - if msg.contains("A Tokio 1.x context was found, but it is being shutdown.") { - warn!("improper shutdown, make sure to terminate all workers first"); - return; - } - } else if let Some(msg) = panic_info.payload().downcast_ref::() { + if let Some(msg) = panic_info + .payload() + .downcast_ref::() + .map(|val| val.as_str()) + .or_else(|| panic_info.payload().downcast_ref::<&str>().cloned()) + { if msg.contains("A Tokio 1.x context was found, but it is being shutdown.") { warn!("improper shutdown, make sure to terminate all workers first"); return; diff --git a/src/ghjk/main.rs b/src/ghjk/main.rs index bb59ac3..95c41ad 100644 --- a/src/ghjk/main.rs +++ b/src/ghjk/main.rs @@ -18,7 +18,6 @@ mod interlude { }; pub use eyre::{format_err as ferr, Context, Result as Res, WrapErr}; pub use futures::{future::BoxFuture, FutureExt}; - pub use futures_concurrency::{future::Join, prelude::*}; pub use indexmap::IndexMap; pub use serde::{Deserialize, Serialize}; pub use serde_json::json; diff --git a/src/ghjk/systems/deno/cli.rs b/src/ghjk/systems/deno/cli.rs index 78dff72..668d47d 100644 --- a/src/ghjk/systems/deno/cli.rs +++ b/src/ghjk/systems/deno/cli.rs @@ -353,18 +353,18 @@ impl From for clap::ValueHint { use ValueHintSerde::*; match val { Unknown => clap::ValueHint::Unknown, - Other => clap::ValueHint::Unknown, - AnyPath => clap::ValueHint::Unknown, - FilePath => clap::ValueHint::Unknown, - DirPath => clap::ValueHint::Unknown, - ExecutablePath => clap::ValueHint::Unknown, - CommandName => clap::ValueHint::Unknown, - CommandString => clap::ValueHint::Unknown, - CommandWithArguments => clap::ValueHint::Unknown, - Username => clap::ValueHint::Unknown, - Hostname => clap::ValueHint::Unknown, - Url => clap::ValueHint::Unknown, - EmailAddress => clap::ValueHint::Unknown, + Other => clap::ValueHint::Other, + AnyPath => clap::ValueHint::AnyPath, + FilePath => clap::ValueHint::FilePath, + DirPath => clap::ValueHint::DirPath, + ExecutablePath => clap::ValueHint::ExecutablePath, + CommandName => clap::ValueHint::CommandName, + CommandString => clap::ValueHint::CommandString, + CommandWithArguments => clap::ValueHint::CommandWithArguments, + Username => clap::ValueHint::Username, + Hostname => clap::ValueHint::Hostname, + Url => clap::ValueHint::Url, + EmailAddress => clap::ValueHint::EmailAddress, } } } diff --git a/src/ghjk/utils.rs b/src/ghjk/utils.rs index c582840..f3d7422 100644 --- a/src/ghjk/utils.rs +++ b/src/ghjk/utils.rs @@ -4,7 +4,7 @@ use std::io::Write; #[inline] pub fn default() -> T { - std::default::Default::default() + T::default() } pub type DHashMap = dashmap::DashMap; diff --git a/tests/ports.ts b/tests/ports.ts index f1a8cae..8487bba 100644 --- a/tests/ports.ts +++ b/tests/ports.ts @@ -29,12 +29,15 @@ const cases: CustomE2eTestCase[] = [ ePoint: `jq --version`, }, { - name: "jq", + name: "asdf-jq", + ePoint: `jq --version`, installConf: ports.asdf({ pluginRepo: "https://github.com/lsanwick/asdf-jq", installType: "version", }), - ePoint: `jq --version`, + secureConf: { + enableRuntimes: true, + }, }, // 3 megs {