Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: box MySqlConnection to reduce sizes of futures #3265

Merged
merged 1 commit into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions sqlx-mysql/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use futures_core::future::BoxFuture;

use crate::collation::{CharSet, Collation};
use crate::common::StatementCache;
use crate::connection::{tls, MySqlStream, MAX_PACKET_SIZE};
use crate::connection::{tls, MySqlConnectionInner, MySqlStream, MAX_PACKET_SIZE};
use crate::error::Error;
use crate::net::{Socket, WithSocket};
use crate::protocol::connect::{
Expand All @@ -25,10 +25,12 @@ impl MySqlConnection {
let stream = handshake.await?;

Ok(Self {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
inner: Box::new(MySqlConnectionInner {
stream,
transaction_depth: 0,
cache_statement: StatementCache::new(options.statement_cache_capacity),
log_settings: options.log_settings.clone(),
}),
})
}
}
Expand Down
76 changes: 46 additions & 30 deletions sqlx-mysql/src/connection/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,22 @@ impl MySqlConnection {
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare.html
// https://dev.mysql.com/doc/internals/en/com-stmt-prepare-response.html#packet-COM_STMT_PREPARE_OK

self.stream.send_packet(Prepare { query: sql }).await?;
self.inner
.stream
.send_packet(Prepare { query: sql })
.await?;

let ok: PrepareOk = self.stream.recv().await?;
let ok: PrepareOk = self.inner.stream.recv().await?;

// the parameter definitions are very unreliable so we skip over them
// as we have little use

if ok.params > 0 {
for _ in 0..ok.params {
let _def: ColumnDefinition = self.stream.recv().await?;
let _def: ColumnDefinition = self.inner.stream.recv().await?;
}

self.stream.maybe_recv_eof().await?;
self.inner.stream.maybe_recv_eof().await?;
}

// the column definitions are berefit the type information from the
Expand All @@ -54,7 +57,7 @@ impl MySqlConnection {
let mut columns = Vec::new();

let column_names = if ok.columns > 0 {
recv_result_metadata(&mut self.stream, ok.columns as usize, &mut columns).await?
recv_result_metadata(&mut self.inner.stream, ok.columns as usize, &mut columns).await?
} else {
Default::default()
};
Expand All @@ -73,16 +76,23 @@ impl MySqlConnection {
&mut self,
sql: &str,
) -> Result<(u32, MySqlStatementMetadata), Error> {
if let Some(statement) = self.cache_statement.get_mut(sql) {
if let Some(statement) = self.inner.cache_statement.get_mut(sql) {
// <MySqlStatementMetadata> is internally reference-counted
return Ok((*statement).clone());
}

let (id, metadata) = self.prepare_statement(sql).await?;

// in case of the cache being full, close the least recently used statement
if let Some((id, _)) = self.cache_statement.insert(sql, (id, metadata.clone())) {
self.stream.send_packet(StmtClose { statement: id }).await?;
if let Some((id, _)) = self
.inner
.cache_statement
.insert(sql, (id, metadata.clone()))
{
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;
}

Ok((id, metadata))
Expand All @@ -96,10 +106,10 @@ impl MySqlConnection {
persistent: bool,
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
{
let mut logger = QueryLogger::new(sql, self.log_settings.clone());
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());

self.stream.wait_until_ready().await?;
self.stream.waiting.push_back(Waiting::Result);
self.inner.stream.wait_until_ready().await?;
self.inner.stream.waiting.push_back(Waiting::Result);

Ok(Box::pin(try_stream! {
// make a slot for the shared column data
Expand All @@ -108,13 +118,13 @@ impl MySqlConnection {
let mut columns = Arc::new(Vec::new());

let (mut column_names, format, mut needs_metadata) = if let Some(arguments) = arguments {
if persistent && self.cache_statement.is_enabled() {
if persistent && self.inner.cache_statement.is_enabled() {
let (id, metadata) = self
.get_or_prepare_statement(sql)
.await?;

// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
Expand All @@ -128,28 +138,28 @@ impl MySqlConnection {
.await?;

// https://dev.mysql.com/doc/internals/en/com-stmt-execute.html
self.stream
self.inner.stream
.send_packet(StatementExecute {
statement: id,
arguments: &arguments,
})
.await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner.stream.send_packet(StmtClose { statement: id }).await?;

(metadata.column_names, MySqlValueFormat::Binary, false)
}
} else {
// https://dev.mysql.com/doc/internals/en/com-query.html
self.stream.send_packet(Query(sql)).await?;
self.inner.stream.send_packet(Query(sql)).await?;

(Arc::default(), MySqlValueFormat::Text, true)
};

loop {
// query response is a meta-packet which may be one of:
// Ok, Err, ResultSet, or (unhandled) LocalInfileRequest
let mut packet = self.stream.recv_packet().await?;
let mut packet = self.inner.stream.recv_packet().await?;

if packet[0] == 0x00 || packet[0] == 0xff {
// first packet in a query response is OK or ERR
Expand All @@ -170,31 +180,31 @@ impl MySqlConnection {
continue;
}

self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}

// otherwise, this first packet is the start of the result-set metadata,
*self.stream.waiting.front_mut().unwrap() = Waiting::Row;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Row;

let num_columns = packet.get_uint_lenenc() as usize; // column count

if needs_metadata {
column_names = Arc::new(recv_result_metadata(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?);
column_names = Arc::new(recv_result_metadata(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?);
} else {
// next time we hit here, it'll be a new result set and we'll need the
// full metadata
needs_metadata = true;

recv_result_columns(&mut self.stream, num_columns, Arc::make_mut(&mut columns)).await?;
recv_result_columns(&mut self.inner.stream, num_columns, Arc::make_mut(&mut columns)).await?;
}

// finally, there will be none or many result-rows
loop {
let packet = self.stream.recv_packet().await?;
let packet = self.inner.stream.recv_packet().await?;

if packet[0] == 0xfe && packet.len() < 9 {
let eof = packet.eof(self.stream.capabilities)?;
let eof = packet.eof(self.inner.stream.capabilities)?;

r#yield!(Either::Left(MySqlQueryResult {
rows_affected: 0,
Expand All @@ -203,11 +213,11 @@ impl MySqlConnection {

if eof.status.contains(Status::SERVER_MORE_RESULTS_EXISTS) {
// more result sets exist, continue to the next one
*self.stream.waiting.front_mut().unwrap() = Waiting::Result;
*self.inner.stream.waiting.front_mut().unwrap() = Waiting::Result;
break;
}

self.stream.waiting.pop_front();
self.inner.stream.waiting.pop_front();
return Ok(());
}

Expand Down Expand Up @@ -290,14 +300,17 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;

let metadata = if self.cache_statement.is_enabled() {
let metadata = if self.inner.cache_statement.is_enabled() {
self.get_or_prepare_statement(sql).await?.1
} else {
let (id, metadata) = self.prepare_statement(sql).await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;

metadata
};
Expand All @@ -316,11 +329,14 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
'c: 'e,
{
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.inner.stream.wait_until_ready().await?;

let (id, metadata) = self.prepare_statement(sql).await?;

self.stream.send_packet(StmtClose { statement: id }).await?;
self.inner
.stream
.send_packet(StmtClose { statement: id })
.await?;

let columns = (&*metadata.columns).clone();

Expand Down
29 changes: 17 additions & 12 deletions sqlx-mysql/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ const MAX_PACKET_SIZE: u32 = 1024;

/// A connection to a MySQL database.
pub struct MySqlConnection {
pub(crate) inner: Box<MySqlConnectionInner>,
}

pub(crate) struct MySqlConnectionInner {
// underlying TCP stream,
// wrapped in a potentially TLS stream,
// wrapped in a buffered stream
Expand Down Expand Up @@ -50,43 +54,44 @@ impl Connection for MySqlConnection {

fn close(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.send_packet(Quit).await?;
self.stream.shutdown().await?;
self.inner.stream.send_packet(Quit).await?;
self.inner.stream.shutdown().await?;

Ok(())
})
}

fn close_hard(mut self) -> BoxFuture<'static, Result<(), Error>> {
Box::pin(async move {
self.stream.shutdown().await?;
self.inner.stream.shutdown().await?;
Ok(())
})
}

fn ping(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
self.stream.wait_until_ready().await?;
self.stream.send_packet(Ping).await?;
self.stream.recv_ok().await?;
self.inner.stream.wait_until_ready().await?;
self.inner.stream.send_packet(Ping).await?;
self.inner.stream.recv_ok().await?;

Ok(())
})
}

#[doc(hidden)]
fn flush(&mut self) -> BoxFuture<'_, Result<(), Error>> {
self.stream.wait_until_ready().boxed()
self.inner.stream.wait_until_ready().boxed()
}

fn cached_statements_size(&self) -> usize {
self.cache_statement.len()
self.inner.cache_statement.len()
}

fn clear_cached_statements(&mut self) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
while let Some((statement_id, _)) = self.cache_statement.remove_lru() {
self.stream
while let Some((statement_id, _)) = self.inner.cache_statement.remove_lru() {
self.inner
.stream
.send_packet(StmtClose {
statement: statement_id,
})
Expand All @@ -99,7 +104,7 @@ impl Connection for MySqlConnection {

#[doc(hidden)]
fn should_flush(&self) -> bool {
!self.stream.write_buffer().is_empty()
!self.inner.stream.write_buffer().is_empty()
}

fn begin(&mut self) -> BoxFuture<'_, Result<Transaction<'_, Self::Database>, Error>>
Expand All @@ -110,6 +115,6 @@ impl Connection for MySqlConnection {
}

fn shrink_buffers(&mut self) {
self.stream.shrink_buffers();
self.inner.stream.shrink_buffers();
}
}
4 changes: 2 additions & 2 deletions sqlx-mysql/src/options/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ impl ConnectOptions for MySqlConnectOptions {
if self.set_names {
options.push(format!(
r#"NAMES {} COLLATE {}"#,
conn.stream.charset.as_str(),
conn.stream.collation.as_str()
conn.inner.stream.charset.as_str(),
conn.inner.stream.collation.as_str()
))
}

Expand Down
23 changes: 12 additions & 11 deletions sqlx-mysql/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@ impl TransactionManager for MySqlTransactionManager {

fn begin(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

conn.execute(&*begin_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth + 1;
conn.inner.transaction_depth = depth + 1;

Ok(())
})
}

fn commit(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.execute(&*commit_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}

Ok(())
Expand All @@ -40,27 +40,28 @@ impl TransactionManager for MySqlTransactionManager {

fn rollback(conn: &mut MySqlConnection) -> BoxFuture<'_, Result<(), Error>> {
Box::pin(async move {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.execute(&*rollback_ansi_transaction_sql(depth)).await?;
conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}

Ok(())
})
}

fn start_rollback(conn: &mut MySqlConnection) {
let depth = conn.transaction_depth;
let depth = conn.inner.transaction_depth;

if depth > 0 {
conn.stream.waiting.push_back(Waiting::Result);
conn.stream.sequence_id = 0;
conn.stream
conn.inner.stream.waiting.push_back(Waiting::Result);
conn.inner.stream.sequence_id = 0;
conn.inner
.stream
.write_packet(Query(&*rollback_ansi_transaction_sql(depth)));

conn.transaction_depth = depth - 1;
conn.inner.transaction_depth = depth - 1;
}
}
}
Loading