From a05cc61398b5ff3bba542f4f1163d023dd07de57 Mon Sep 17 00:00:00 2001 From: Stepan Tubanov Date: Thu, 6 Jun 2024 00:34:43 +0400 Subject: [PATCH] perf: box `MySqlConnection` to reduce sizes of futures --- sqlx-mysql/src/connection/establish.rs | 12 ++-- sqlx-mysql/src/connection/executor.rs | 76 ++++++++++++++++---------- sqlx-mysql/src/connection/mod.rs | 29 ++++++---- sqlx-mysql/src/options/connect.rs | 4 +- sqlx-mysql/src/transaction.rs | 23 ++++---- 5 files changed, 84 insertions(+), 60 deletions(-) diff --git a/sqlx-mysql/src/connection/establish.rs b/sqlx-mysql/src/connection/establish.rs index d7ffc048f7..72590324f7 100644 --- a/sqlx-mysql/src/connection/establish.rs +++ b/sqlx-mysql/src/connection/establish.rs @@ -4,7 +4,7 @@ use futures_core::future::BoxFuture; use crate::collation::{CharSet, Collation}; use crate::common::StatementCache; -use crate::connection::{tls, MySqlStream, MAX_PACKET_SIZE}; +use crate::connection::{tls, MySqlConnectionInner, MySqlStream, MAX_PACKET_SIZE}; use crate::error::Error; use crate::net::{Socket, WithSocket}; use crate::protocol::connect::{ @@ -25,10 +25,12 @@ impl MySqlConnection { let stream = handshake.await?; Ok(Self { - stream, - transaction_depth: 0, - cache_statement: StatementCache::new(options.statement_cache_capacity), - log_settings: options.log_settings.clone(), + inner: Box::new(MySqlConnectionInner { + stream, + transaction_depth: 0, + cache_statement: StatementCache::new(options.statement_cache_capacity), + log_settings: options.log_settings.clone(), + }), }) } } diff --git a/sqlx-mysql/src/connection/executor.rs b/sqlx-mysql/src/connection/executor.rs index 474337cd6f..babe7a6e78 100644 --- a/sqlx-mysql/src/connection/executor.rs +++ b/sqlx-mysql/src/connection/executor.rs @@ -32,19 +32,22 @@ impl MySqlConnection { // https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html // https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK - self.stream.send_packet(Prepare { query: sql }).await?; + self.inner + .stream + .send_packet(Prepare { query: sql }) + .await?; - let ok: PrepareOk = self.stream.recv().await?; + let ok: PrepareOk = self.inner.stream.recv().await?; // the parameter definitions are very unreliable so we skip over them // as we have little use if ok.params > 0 { for _ in 0..ok.params { - let _def: ColumnDefinition = self.stream.recv().await?; + let _def: ColumnDefinition = self.inner.stream.recv().await?; } - self.stream.maybe_recv_eof().await?; + self.inner.stream.maybe_recv_eof().await?; } // the column definitions are berefit the type information from the @@ -54,7 +57,7 @@ impl MySqlConnection { let mut columns = Vec::new(); let column_names = if ok.columns > 0 { - recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await? + recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await? } else { Default::default() }; @@ -73,7 +76,7 @@ impl MySqlConnection { &mut self, sql: &str, ) -> Result<(u32, MySqlStatementMetadata), Error> { - if let Some(statement) = self.cache_statement.get_mut(sql) { + if let Some(statement) = self.inner.cache_statement.get_mut(sql) { // is internally reference-counted return Ok((*statement).clone()); } @@ -81,8 +84,15 @@ impl MySqlConnection { let (id, metadata) = self.prepare_statement(sql).await?; // in case of the cache being full, close the least recently used statement - if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) { - self.stream.send_packet(StmtClose { statement: id }).await?; + if let Some((id, _)) = self + .inner + .cache_statement + .insert(sql, (id, metadata.clone())) + { + self.inner + .stream + .send_packet(StmtClose { statement: id }) + .await?; } Ok((id, metadata)) @@ -96,10 +106,10 @@ impl MySqlConnection { persistent: bool, ) -> Result, Error>> + 'e, Error> { - let mut logger = QueryLogger::new(sql, self.log_settings.clone()); + let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone()); - self.stream.wait_until_ready().await?; - self.stream.waiting.push_back(Waiting::Result); + self.inner.stream.wait_until_ready().await?; + self.inner.stream.waiting.push_back(Waiting::Result); Ok(Box::pin(try_stream! { // make a slot for the shared column data @@ -108,13 +118,13 @@ impl MySqlConnection { let mut columns = Arc::new(Vec::new()); let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments { - if persistent && self.cache_statement.is_enabled() { + if persistent && self.inner.cache_statement.is_enabled() { let (id, metadata) = self .get_or_prepare_statement(sql) .await?; // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html - self.stream + self.inner.stream .send_packet(StatementExecute { statement: id, arguments: &arguments, @@ -128,20 +138,20 @@ impl MySqlConnection { .await?; // https://dev.mysql.com/doc/internals/en/com-stmt-execute.html - self.stream + self.inner.stream .send_packet(StatementExecute { statement: id, arguments: &arguments, }) .await?; - self.stream.send_packet(StmtClose { statement: id }).await?; + self.inner.stream.send_packet(StmtClose { statement: id }).await?; (metadata.column_names, MySqlValueFormat::Binary, false) } } else { // https://dev.mysql.com/doc/internals/en/com-query.html - self.stream.send_packet(Query(sql)).await?; + self.inner.stream.send_packet(Query(sql)).await?; (Arc::default(), MySqlValueFormat::Text, true) }; @@ -149,7 +159,7 @@ impl MySqlConnection { loop { // query response is a meta-packet which may be one of: // Ok, Err, ResultSet, or (unhandled) LocalInfileRequest - let mut packet = self.stream.recv_packet().await?; + let mut packet = self.inner.stream.recv_packet().await?; if packet[0] == 0x00 || packet[0] == 0xff { // first packet in a query response is OK or ERR @@ -170,31 +180,31 @@ impl MySqlConnection { continue; } - self.stream.waiting.pop_front(); + self.inner.stream.waiting.pop_front(); return Ok(()); } // otherwise, this first packet is the start of the result-set metadata, - *self.stream.waiting.front_mut().unwrap() = Waiting::Row; + *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row; let num_columns = packet.get_uint_lenenc() as usize; // column count if needs_metadata { - column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?); + column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?); } else { // next time we hit here, it'll be a new result set and we'll need the // full metadata needs_metadata = true; - recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?; + recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?; } // finally, there will be none or many result-rows loop { - let packet = self.stream.recv_packet().await?; + let packet = self.inner.stream.recv_packet().await?; if packet[0] == 0xfe && packet.len() < 9 { - let eof = packet.eof(self.stream.capabilities)?; + let eof = packet.eof(self.inner.stream.capabilities)?; r#yield!(Either::Left(MySqlQueryResult { rows_affected: 0, @@ -203,11 +213,11 @@ impl MySqlConnection { if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) { // more result sets exist, continue to the next one - *self.stream.waiting.front_mut().unwrap() = Waiting::Result; + *self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result; break; } - self.stream.waiting.pop_front(); + self.inner.stream.waiting.pop_front(); return Ok(()); } @@ -290,14 +300,17 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection { 'c: 'e, { Box::pin(async move { - self.stream.wait_until_ready().await?; + self.inner.stream.wait_until_ready().await?; - let metadata = if self.cache_statement.is_enabled() { + let metadata = if self.inner.cache_statement.is_enabled() { self.get_or_prepare_statement(sql).await?.1 } else { let (id, metadata) = self.prepare_statement(sql).await?; - self.stream.send_packet(StmtClose { statement: id }).await?; + self.inner + .stream + .send_packet(StmtClose { statement: id }) + .await?; metadata }; @@ -316,11 +329,14 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection { 'c: 'e, { Box::pin(async move { - self.stream.wait_until_ready().await?; + self.inner.stream.wait_until_ready().await?; let (id, metadata) = self.prepare_statement(sql).await?; - self.stream.send_packet(StmtClose { statement: id }).await?; + self.inner + .stream + .send_packet(StmtClose { statement: id }) + .await?; let columns = (&*metadata.columns).clone(); diff --git a/sqlx-mysql/src/connection/mod.rs b/sqlx-mysql/src/connection/mod.rs index c7b3543d23..c4978a7701 100644 --- a/sqlx-mysql/src/connection/mod.rs +++ b/sqlx-mysql/src/connection/mod.rs @@ -23,6 +23,10 @@ const MAX_PACKET_SIZE: u32 = 1024; /// A connection to a MySQL database. pub struct MySqlConnection { + pub(crate) inner: Box, +} + +pub(crate) struct MySqlConnectionInner { // underlying TCP stream, // wrapped in a potentially TLS stream, // wrapped in a buffered stream @@ -50,8 +54,8 @@ impl Connection for MySqlConnection { fn close(mut self) -> BoxFuture<'static, Result<(), Error>> { Box::pin(async move { - self.stream.send_packet(Quit).await?; - self.stream.shutdown().await?; + self.inner.stream.send_packet(Quit).await?; + self.inner.stream.shutdown().await?; Ok(()) }) @@ -59,16 +63,16 @@ impl Connection for MySqlConnection { fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> { Box::pin(async move { - self.stream.shutdown().await?; + self.inner.stream.shutdown().await?; Ok(()) }) } fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - self.stream.wait_until_ready().await?; - self.stream.send_packet(Ping).await?; - self.stream.recv_ok().await?; + self.inner.stream.wait_until_ready().await?; + self.inner.stream.send_packet(Ping).await?; + self.inner.stream.recv_ok().await?; Ok(()) }) @@ -76,17 +80,18 @@ impl Connection for MySqlConnection { #[doc(hidden)] fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> { - self.stream.wait_until_ready().boxed() + self.inner.stream.wait_until_ready().boxed() } fn cached_statements_size(&self) -> usize { - self.cache_statement.len() + self.inner.cache_statement.len() } fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - while let Some((statement_id, _)) = self.cache_statement.remove_lru() { - self.stream + while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() { + self.inner + .stream .send_packet(StmtClose { statement: statement_id, }) @@ -99,7 +104,7 @@ impl Connection for MySqlConnection { #[doc(hidden)] fn should_flush(&self) -> bool { - !self.stream.write_buffer().is_empty() + !self.inner.stream.write_buffer().is_empty() } fn begin(&mut self) -> BoxFuture<'_, Result, Error>> @@ -110,6 +115,6 @@ impl Connection for MySqlConnection { } fn shrink_buffers(&mut self) { - self.stream.shrink_buffers(); + self.inner.stream.shrink_buffers(); } } diff --git a/sqlx-mysql/src/options/connect.rs b/sqlx-mysql/src/options/connect.rs index 4c89b43921..151146be7e 100644 --- a/sqlx-mysql/src/options/connect.rs +++ b/sqlx-mysql/src/options/connect.rs @@ -71,8 +71,8 @@ impl ConnectOptions for MySqlConnectOptions { if self.set_names { options.push(format!( r#"NAMES {} COLLATE {}"#, - conn.stream.charset.as_str(), - conn.stream.collation.as_str() + conn.inner.stream.charset.as_str(), + conn.inner.stream.collation.as_str() )) } diff --git a/sqlx-mysql/src/transaction.rs b/sqlx-mysql/src/transaction.rs index 731bdb5750..2d5dcec82f 100644 --- a/sqlx-mysql/src/transaction.rs +++ b/sqlx-mysql/src/transaction.rs @@ -16,10 +16,10 @@ impl TransactionManager for MySqlTransactionManager { fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - let depth = conn.transaction_depth; + let depth = conn.inner.transaction_depth; conn.execute(&*begin_ansi_transaction_sql(depth)).await?; - conn.transaction_depth = depth + 1; + conn.inner.transaction_depth = depth + 1; Ok(()) }) @@ -27,11 +27,11 @@ impl TransactionManager for MySqlTransactionManager { fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - let depth = conn.transaction_depth; + let depth = conn.inner.transaction_depth; if depth > 0 { conn.execute(&*commit_ansi_transaction_sql(depth)).await?; - conn.transaction_depth = depth - 1; + conn.inner.transaction_depth = depth - 1; } Ok(()) @@ -40,11 +40,11 @@ impl TransactionManager for MySqlTransactionManager { fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> { Box::pin(async move { - let depth = conn.transaction_depth; + let depth = conn.inner.transaction_depth; if depth > 0 { conn.execute(&*rollback_ansi_transaction_sql(depth)).await?; - conn.transaction_depth = depth - 1; + conn.inner.transaction_depth = depth - 1; } Ok(()) @@ -52,15 +52,16 @@ impl TransactionManager for MySqlTransactionManager { } fn start_rollback(conn: &mut MySqlConnection) { - let depth = conn.transaction_depth; + let depth = conn.inner.transaction_depth; if depth > 0 { - conn.stream.waiting.push_back(Waiting::Result); - conn.stream.sequence_id = 0; - conn.stream + conn.inner.stream.waiting.push_back(Waiting::Result); + conn.inner.stream.sequence_id = 0; + conn.inner + .stream .write_packet(Query(&*rollback_ansi_transaction_sql(depth))); - conn.transaction_depth = depth - 1; + conn.inner.transaction_depth = depth - 1; } } }