From 2459c5e5cde5dbdadeb6ed505edea0689faa9125 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:23:01 +0800 Subject: [PATCH 1/5] feat(bin): abort connections before dropping temp databases in parallel run Signed-off-by: Bugen Zhao --- sqllogictest-bin/src/main.rs | 38 +++++++++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/sqllogictest-bin/src/main.rs b/sqllogictest-bin/src/main.rs index f6e48e2..97e3d10 100644 --- a/sqllogictest-bin/src/main.rs +++ b/sqllogictest-bin/src/main.rs @@ -20,6 +20,7 @@ use sqllogictest::{ default_column_validator, default_normalizer, default_validator, update_record_with_output, AsyncDB, Injected, MakeConnection, Record, Runner, }; +use utils::AbortOnDropHandle; #[derive(Default, Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] #[must_use] @@ -314,7 +315,7 @@ async fn run_parallel( } } - let mut stream = futures::stream::iter(create_databases.into_iter()) + let mut stream = futures::stream::iter(create_databases) .map(|(db_name, filename)| { let mut config = config.clone(); config.db.clone_from(&db_name); @@ -322,13 +323,13 @@ async fn run_parallel( let engine = engine.clone(); let labels = labels.to_vec(); async move { - let (buf, res) = tokio::spawn(async move { + let (buf, res) = AbortOnDropHandle(tokio::spawn(async move { let mut buf = vec![]; let res = connect_and_run_test_file(&mut buf, filename, &engine, config, &labels) .await; (buf, res) - }) + })) .await .unwrap(); (db_name, file, res, buf) @@ -396,6 +397,10 @@ async fn run_parallel( start.elapsed().as_millis() ); + // Abort running cases and drop the session connections to the DB server + // before dropping temporary databases. + drop(stream); + for db_name in db_names { if keep_db_on_failure && failed_db.contains(&db_name) { eprintln!( @@ -830,3 +835,30 @@ async fn update_record( Ok(()) } + +mod utils { + use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + }; + + use tokio::task::{JoinError, JoinHandle}; + + /// A wrapper around a [`tokio::task::JoinHandle`], which aborts the task when it is dropped. + pub struct AbortOnDropHandle(pub JoinHandle); + + impl Drop for AbortOnDropHandle { + fn drop(&mut self) { + self.0.abort(); + } + } + + impl Future for AbortOnDropHandle { + type Output = Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.0).poll(cx) + } + } +} From 0fa2c562c3d9b131311a6bdfda4baded92fc0df3 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:23:55 +0800 Subject: [PATCH 2/5] fmt Signed-off-by: Bugen Zhao --- sqllogictest-bin/src/main.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sqllogictest-bin/src/main.rs b/sqllogictest-bin/src/main.rs index 97e3d10..cf92cd2 100644 --- a/sqllogictest-bin/src/main.rs +++ b/sqllogictest-bin/src/main.rs @@ -837,11 +837,9 @@ async fn update_record( } mod utils { - use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, - }; + use std::future::Future; + use std::pin::Pin; + use std::task::{Context, Poll}; use tokio::task::{JoinError, JoinHandle}; From ea9f62450bd432a419be4404d214fcec217bb602 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:26:42 +0800 Subject: [PATCH 3/5] refine comments Signed-off-by: Bugen Zhao --- sqllogictest-bin/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sqllogictest-bin/src/main.rs b/sqllogictest-bin/src/main.rs index cf92cd2..50ec5d1 100644 --- a/sqllogictest-bin/src/main.rs +++ b/sqllogictest-bin/src/main.rs @@ -397,8 +397,8 @@ async fn run_parallel( start.elapsed().as_millis() ); - // Abort running cases and drop the session connections to the DB server - // before dropping temporary databases. + // If `fail_fast`, there could be some ongoing cases (then active connections) + // in the stream. Abort them before dropping temporary databases. drop(stream); for db_name in db_names { From 140e3bfe5eab074948f41c5c14af514e4dc14b5c Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:31:33 +0800 Subject: [PATCH 4/5] use one from `tokio-util` Signed-off-by: Bugen Zhao --- Cargo.lock | 9 +++++++++ sqllogictest-bin/Cargo.toml | 1 + sqllogictest-bin/src/main.rs | 29 ++--------------------------- 3 files changed, 12 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61011e5..221f501 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -779,6 +779,12 @@ dependencies = [ "ahash", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.2" @@ -1923,6 +1929,7 @@ dependencies = [ "sqllogictest", "sqllogictest-engines", "tokio", + "tokio-util", "tracing", "tracing-subscriber", ] @@ -2212,6 +2219,8 @@ dependencies = [ "bytes", "futures-core", "futures-sink", + "futures-util", + "hashbrown 0.14.5", "pin-project-lite", "tokio", ] diff --git a/sqllogictest-bin/Cargo.toml b/sqllogictest-bin/Cargo.toml index f8aa4b4..6587463 100644 --- a/sqllogictest-bin/Cargo.toml +++ b/sqllogictest-bin/Cargo.toml @@ -33,6 +33,7 @@ tokio = { version = "1", features = [ "fs", "process", ] } +tokio-util = { version = "0.7.12", features = ["rt"] } fs-err = "3.0.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } tracing = "0.1" diff --git a/sqllogictest-bin/src/main.rs b/sqllogictest-bin/src/main.rs index 50ec5d1..2c6dd46 100644 --- a/sqllogictest-bin/src/main.rs +++ b/sqllogictest-bin/src/main.rs @@ -20,7 +20,7 @@ use sqllogictest::{ default_column_validator, default_normalizer, default_validator, update_record_with_output, AsyncDB, Injected, MakeConnection, Record, Runner, }; -use utils::AbortOnDropHandle; +use tokio_util::task::AbortOnDropHandle; #[derive(Default, Copy, Clone, Debug, PartialEq, Eq, ValueEnum)] #[must_use] @@ -323,7 +323,7 @@ async fn run_parallel( let engine = engine.clone(); let labels = labels.to_vec(); async move { - let (buf, res) = AbortOnDropHandle(tokio::spawn(async move { + let (buf, res) = AbortOnDropHandle::new(tokio::spawn(async move { let mut buf = vec![]; let res = connect_and_run_test_file(&mut buf, filename, &engine, config, &labels) @@ -835,28 +835,3 @@ async fn update_record( Ok(()) } - -mod utils { - use std::future::Future; - use std::pin::Pin; - use std::task::{Context, Poll}; - - use tokio::task::{JoinError, JoinHandle}; - - /// A wrapper around a [`tokio::task::JoinHandle`], which aborts the task when it is dropped. - pub struct AbortOnDropHandle(pub JoinHandle); - - impl Drop for AbortOnDropHandle { - fn drop(&mut self) { - self.0.abort(); - } - } - - impl Future for AbortOnDropHandle { - type Output = Result; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - Pin::new(&mut self.0).poll(cx) - } - } -} From 76027ba009cbb105034f80128bcd486c484ec012 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 14 Jan 2025 14:44:19 +0800 Subject: [PATCH 5/5] bump version and add release notes Signed-off-by: Bugen Zhao --- CHANGELOG.md | 4 ++++ Cargo.lock | 6 +++--- Cargo.toml | 2 +- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 78b14ed..62c0a4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## Unreleased +## [0.26.3] - 2025-01-14 + +* bin: when `--fail-fast` is enabled, abort all remaining connections before dropping temporary databases. + ## [0.26.2] - 2025-01-08 * bin: support `--fail-fast`, and add env vars `SLT_FAIL_FAST` and `SLT_KEEP_DB_ON_FAILURE` diff --git a/Cargo.lock b/Cargo.lock index 221f501..7d60aca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1890,7 +1890,7 @@ dependencies = [ [[package]] name = "sqllogictest" -version = "0.26.2" +version = "0.26.3" dependencies = [ "async-trait", "educe", @@ -1913,7 +1913,7 @@ dependencies = [ [[package]] name = "sqllogictest-bin" -version = "0.26.2" +version = "0.26.3" dependencies = [ "anyhow", "async-trait", @@ -1936,7 +1936,7 @@ dependencies = [ [[package]] name = "sqllogictest-engines" -version = "0.26.2" +version = "0.26.3" dependencies = [ "async-trait", "bytes", diff --git a/Cargo.toml b/Cargo.toml index b8c8356..52e6c96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ resolver = "2" members = ["sqllogictest", "sqllogictest-bin", "sqllogictest-engines", "tests"] [workspace.package] -version = "0.26.2" +version = "0.26.3" edition = "2021" homepage = "https://github.com/risinglightdb/sqllogictest-rs" keywords = ["sql", "database", "parser", "cli"]