From 86bb0833cb24223674aa99e42cc6881d1e908636 Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Thu, 18 Jul 2024 10:50:52 -0500 Subject: [PATCH 1/5] sqlite: fix inconsistent read-after-write fetch_one/fetch_optional --- sqlx-sqlite/src/connection/executor.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index ebc2908c88..ea39b963f8 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -64,13 +64,15 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { futures_util::pin_mut!(stream); + let mut out = Ok(None); + while let Some(res) = stream.try_next().await? { if let Either::Right(row) = res { - return Ok(Some(row)); + out = Ok(Some(row)); } } - Ok(None) + out }) } From 6e0f5bed7049971cd599887f6afdd88dd6ce79d3 Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Mon, 22 Jul 2024 22:46:58 -0500 Subject: [PATCH 2/5] try pushing fetch_optional early-return into worker --- sqlx-sqlite/src/any.rs | 4 ++-- sqlx-sqlite/src/connection/executor.rs | 10 ++++------ sqlx-sqlite/src/connection/mod.rs | 5 +++++ sqlx-sqlite/src/connection/worker.rs | 25 +++++++++++++++++++++---- 4 files changed, 32 insertions(+), 12 deletions(-) diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 5e422655bf..13cc1247c7 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -83,7 +83,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin( self.worker - .execute(query, args, self.row_channel_size, persistent) + .execute(query, args, self.row_channel_size, persistent, crate::connection::Returning::Many) .map_ok(flume::Receiver::into_stream) .try_flatten_stream() .map( @@ -107,7 +107,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(query, args, self.row_channel_size, persistent) + .execute(query, args, self.row_channel_size, persistent, crate::connection::Returning::One) .map_ok(flume::Receiver::into_stream) .await?; futures_util::pin_mut!(stream); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index ea39b963f8..0532e71049 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -32,7 +32,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin( self.worker - .execute(sql, arguments, self.row_channel_size, persistent) + .execute(sql, arguments, self.row_channel_size, persistent, crate::connection::Returning::Many) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(), ) @@ -58,21 +58,19 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(sql, arguments, self.row_channel_size, persistent) + .execute(sql, arguments, self.row_channel_size, persistent, crate::connection::Returning::One) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(); futures_util::pin_mut!(stream); - let mut out = Ok(None); - while let Some(res) = stream.try_next().await? { if let Either::Right(row) = res { - out = Ok(Some(row)); + return Ok(Some(row)) } } - out + Ok(None) }) } diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index 3588b94f82..5d906c6137 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -426,3 +426,8 @@ impl Statements { self.temp = None; } } + +pub(crate) enum Returning { + Many, + One +} diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 18e34aae86..9cc8559fbf 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -21,6 +21,8 @@ use crate::connection::execute; use crate::connection::ConnectionState; use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow, SqliteStatement}; +use super::Returning; + // Each SQLite connection has a dedicated thread. // TODO: Tweak this so that we can use a thread pool per pool of SQLite3 connections to reduce @@ -52,6 +54,7 @@ enum Command { arguments: Option>, persistent: bool, tx: flume::Sender, Error>>, + returning: Returning }, Begin { tx: rendezvous_oneshot::Sender>, @@ -136,6 +139,7 @@ impl ConnectionWorker { arguments, persistent, tx, + returning } => { let iter = match execute::iter(&mut conn, &query, arguments, persistent) { @@ -146,10 +150,21 @@ impl ConnectionWorker { } }; - for res in iter { - if tx.send(res).is_err() { - break; - } + match returning { + Returning::Many => { + for res in iter { + if tx.send(res).is_err() { + break; + } + } + }, + Returning::One => { + let mut iter = iter; + if let Some(res) = iter.next() { + drop(iter); + let _ = tx.send(res); + } + }, } update_cached_statements_size(&conn, &shared.cached_statements_size); @@ -284,6 +299,7 @@ impl ConnectionWorker { args: Option>, chan_size: usize, persistent: bool, + returning: Returning ) -> Result, Error>>, Error> { let (tx, rx) = flume::bounded(chan_size); @@ -294,6 +310,7 @@ impl ConnectionWorker { arguments: args.map(SqliteArguments::into_static), persistent, tx, + returning }, Span::current(), )) From 04be563ea7495fd6da6d3e25d3609bc9b9b85f3a Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Tue, 23 Jul 2024 13:03:06 -0500 Subject: [PATCH 3/5] run cargo fmt --- sqlx-sqlite/src/any.rs | 16 ++++++++++++++-- sqlx-sqlite/src/connection/executor.rs | 18 +++++++++++++++--- sqlx-sqlite/src/connection/mod.rs | 2 +- sqlx-sqlite/src/connection/worker.rs | 6 +++--- 4 files changed, 33 insertions(+), 9 deletions(-) diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 13cc1247c7..646687702d 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -83,7 +83,13 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin( self.worker - .execute(query, args, self.row_channel_size, persistent, crate::connection::Returning::Many) + .execute( + query, + args, + self.row_channel_size, + persistent, + crate::connection::Returning::Many, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream() .map( @@ -107,7 +113,13 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(query, args, self.row_channel_size, persistent, crate::connection::Returning::One) + .execute( + query, + args, + self.row_channel_size, + persistent, + crate::connection::Returning::One, + ) .map_ok(flume::Receiver::into_stream) .await?; futures_util::pin_mut!(stream); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index 0532e71049..1dc2dd4b88 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -32,7 +32,13 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin( self.worker - .execute(sql, arguments, self.row_channel_size, persistent, crate::connection::Returning::Many) + .execute( + sql, + arguments, + self.row_channel_size, + persistent, + crate::connection::Returning::Many, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(), ) @@ -58,7 +64,13 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin(async move { let stream = self .worker - .execute(sql, arguments, self.row_channel_size, persistent, crate::connection::Returning::One) + .execute( + sql, + arguments, + self.row_channel_size, + persistent, + crate::connection::Returning::One, + ) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(); @@ -66,7 +78,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { while let Some(res) = stream.try_next().await? { if let Either::Right(row) = res { - return Ok(Some(row)) + return Ok(Some(row)); } } diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index 5d906c6137..a32d60b866 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -429,5 +429,5 @@ impl Statements { pub(crate) enum Returning { Many, - One + One, } diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 9cc8559fbf..2ccdbae9b4 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -54,7 +54,7 @@ enum Command { arguments: Option>, persistent: bool, tx: flume::Sender, Error>>, - returning: Returning + returning: Returning, }, Begin { tx: rendezvous_oneshot::Sender>, @@ -299,7 +299,7 @@ impl ConnectionWorker { args: Option>, chan_size: usize, persistent: bool, - returning: Returning + returning: Returning, ) -> Result, Error>>, Error> { let (tx, rx) = flume::bounded(chan_size); @@ -310,7 +310,7 @@ impl ConnectionWorker { arguments: args.map(SqliteArguments::into_static), persistent, tx, - returning + returning, }, Span::current(), )) From f188f70460b39461b2dcaa13e97c9e7357da4966 Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Tue, 23 Jul 2024 13:24:55 -0500 Subject: [PATCH 4/5] fix "it_can_execute_multiple_statements" test failure --- sqlx-sqlite/src/connection/worker.rs | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index 2ccdbae9b4..dc0ba96ff7 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -160,9 +160,18 @@ impl ConnectionWorker { }, Returning::One => { let mut iter = iter; - if let Some(res) = iter.next() { - drop(iter); - let _ = tx.send(res); + + while let Some(res) = iter.next() { + if let Ok(ok) = &res { + if ok.is_right() { + drop(iter); + let _ = tx.send(res); + break; + } + } + if tx.send(res).is_err() { + break; + } } }, } From b95a2568f461a8d8f4047ffc79dc177c2bd680e3 Mon Sep 17 00:00:00 2001 From: Clark Kampfe Date: Thu, 25 Jul 2024 23:23:13 -0500 Subject: [PATCH 5/5] use Option instead of bespoke enum for rows returned --- sqlx-sqlite/src/any.rs | 16 ++-------------- sqlx-sqlite/src/connection/executor.rs | 16 ++-------------- sqlx-sqlite/src/connection/mod.rs | 5 ----- sqlx-sqlite/src/connection/worker.rs | 26 ++++++++++++++------------ 4 files changed, 18 insertions(+), 45 deletions(-) diff --git a/sqlx-sqlite/src/any.rs b/sqlx-sqlite/src/any.rs index 646687702d..01600d9931 100644 --- a/sqlx-sqlite/src/any.rs +++ b/sqlx-sqlite/src/any.rs @@ -83,13 +83,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin( self.worker - .execute( - query, - args, - self.row_channel_size, - persistent, - crate::connection::Returning::Many, - ) + .execute(query, args, self.row_channel_size, persistent, None) .map_ok(flume::Receiver::into_stream) .try_flatten_stream() .map( @@ -113,13 +107,7 @@ impl AnyConnectionBackend for SqliteConnection { Box::pin(async move { let stream = self .worker - .execute( - query, - args, - self.row_channel_size, - persistent, - crate::connection::Returning::One, - ) + .execute(query, args, self.row_channel_size, persistent, Some(1)) .map_ok(flume::Receiver::into_stream) .await?; futures_util::pin_mut!(stream); diff --git a/sqlx-sqlite/src/connection/executor.rs b/sqlx-sqlite/src/connection/executor.rs index 1dc2dd4b88..541a4f7d4d 100644 --- a/sqlx-sqlite/src/connection/executor.rs +++ b/sqlx-sqlite/src/connection/executor.rs @@ -32,13 +32,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin( self.worker - .execute( - sql, - arguments, - self.row_channel_size, - persistent, - crate::connection::Returning::Many, - ) + .execute(sql, arguments, self.row_channel_size, persistent, None) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(), ) @@ -64,13 +58,7 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection { Box::pin(async move { let stream = self .worker - .execute( - sql, - arguments, - self.row_channel_size, - persistent, - crate::connection::Returning::One, - ) + .execute(sql, arguments, self.row_channel_size, persistent, Some(1)) .map_ok(flume::Receiver::into_stream) .try_flatten_stream(); diff --git a/sqlx-sqlite/src/connection/mod.rs b/sqlx-sqlite/src/connection/mod.rs index a32d60b866..3588b94f82 100644 --- a/sqlx-sqlite/src/connection/mod.rs +++ b/sqlx-sqlite/src/connection/mod.rs @@ -426,8 +426,3 @@ impl Statements { self.temp = None; } } - -pub(crate) enum Returning { - Many, - One, -} diff --git a/sqlx-sqlite/src/connection/worker.rs b/sqlx-sqlite/src/connection/worker.rs index dc0ba96ff7..a01de2419c 100644 --- a/sqlx-sqlite/src/connection/worker.rs +++ b/sqlx-sqlite/src/connection/worker.rs @@ -21,8 +21,6 @@ use crate::connection::execute; use crate::connection::ConnectionState; use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow, SqliteStatement}; -use super::Returning; - // Each SQLite connection has a dedicated thread. // TODO: Tweak this so that we can use a thread pool per pool of SQLite3 connections to reduce @@ -54,7 +52,7 @@ enum Command { arguments: Option>, persistent: bool, tx: flume::Sender, Error>>, - returning: Returning, + limit: Option, }, Begin { tx: rendezvous_oneshot::Sender>, @@ -139,7 +137,7 @@ impl ConnectionWorker { arguments, persistent, tx, - returning + limit } => { let iter = match execute::iter(&mut conn, &query, arguments, persistent) { @@ -150,23 +148,27 @@ impl ConnectionWorker { } }; - match returning { - Returning::Many => { + match limit { + None => { for res in iter { if tx.send(res).is_err() { break; } } }, - Returning::One => { + Some(limit) => { let mut iter = iter; + let mut rows_returned = 0; while let Some(res) = iter.next() { if let Ok(ok) = &res { if ok.is_right() { - drop(iter); - let _ = tx.send(res); - break; + rows_returned += 1; + if rows_returned >= limit { + drop(iter); + let _ = tx.send(res); + break; + } } } if tx.send(res).is_err() { @@ -308,7 +310,7 @@ impl ConnectionWorker { args: Option>, chan_size: usize, persistent: bool, - returning: Returning, + limit: Option, ) -> Result, Error>>, Error> { let (tx, rx) = flume::bounded(chan_size); @@ -319,7 +321,7 @@ impl ConnectionWorker { arguments: args.map(SqliteArguments::into_static), persistent, tx, - returning, + limit, }, Span::current(), ))