Skip to content

Commit

Permalink
Add session mode
Browse files Browse the repository at this point in the history
  • Loading branch information
levkk committed Jan 11, 2025
1 parent 74bbbf0 commit f28ab2d
Show file tree
Hide file tree
Showing 17 changed files with 215 additions and 75 deletions.
4 changes: 4 additions & 0 deletions pgdog-plugin/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,7 @@ This crate is a C (and Rust) library that should be linked at compile time again
## Writing plugins

Examples of plugins written in C and Rust are available [here](https://github.com/levkk/pgdog/tree/main/examples).

## License

This library is distributed under the MIT license. See [LICENSE](LICENSE) for details.
7 changes: 7 additions & 0 deletions pgdog/src/backend/databases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub fn reconnect() {
replace_databases(databases().duplicate());
}

/// Initialize the databases for the first time.
pub fn init() {
    let initial = from_config(&config());
    replace_databases(initial);
}

/// Re-create pools from config.
///
/// TODO: Avoid creating new pools if they haven't changed at all
Expand Down Expand Up @@ -191,6 +197,7 @@ pub fn from_config(config: &ConfigAndUsers) -> Databases {
&[(primary, &replicas)],
general.load_balancing_strategy,
&user.password,
user.pooler_mode.unwrap_or(general.pooler_mode),
),
);
}
Expand Down
11 changes: 10 additions & 1 deletion pgdog/src/backend/pool/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! A collection of replicas and a primary.
use crate::net::messages::BackendKeyData;
use crate::{config::PoolerMode, net::messages::BackendKeyData};

use super::{Address, Config, Error, Guard, Shard};
use crate::config::LoadBalancingStrategy;
Expand All @@ -23,6 +23,7 @@ pub struct Cluster {
name: String,
shards: Vec<Shard>,
password: String,
pooler_mode: PoolerMode,
}

impl Cluster {
Expand All @@ -32,6 +33,7 @@ impl Cluster {
shards: &[(Option<PoolConfig>, &[PoolConfig])],
lb_strategy: LoadBalancingStrategy,
password: &str,
pooler_mode: PoolerMode,
) -> Self {
Self {
shards: shards
Expand All @@ -40,6 +42,7 @@ impl Cluster {
.collect(),
name: name.to_owned(),
password: password.to_owned(),
pooler_mode,
}
}

Expand All @@ -64,6 +67,7 @@ impl Cluster {
shards: self.shards.iter().map(|s| s.duplicate()).collect(),
name: self.name.clone(),
password: self.password.clone(),
pooler_mode: self.pooler_mode,
}
}

Expand Down Expand Up @@ -128,4 +132,9 @@ impl Cluster {
pub fn password(&self) -> &str {
&self.password
}

/// Get pooler mode.
///
/// Returns the mode stored on this cluster at construction time —
/// the per-user setting falling back to the general config setting
/// (see `from_config` in databases.rs).
pub fn pooler_mode(&self) -> PoolerMode {
self.pooler_mode
}
}
23 changes: 22 additions & 1 deletion pgdog/src/backend/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use tokio::time::sleep;
use crate::{
admin::backend::Backend,
backend::databases::databases,
config::PoolerMode,
net::messages::{BackendKeyData, Message, ParameterStatus, Protocol},
};

Expand Down Expand Up @@ -73,12 +74,18 @@ impl Connection {
async fn try_conn(&mut self, id: &BackendKeyData, route: &Route) -> Result<(), Error> {
let shard = route.shard().unwrap_or(0);

let server = if route.is_read() {
let mut server = if route.is_read() {
self.cluster()?.replica(shard, id).await?
} else {
self.cluster()?.primary(shard, id).await?
};

// Cleanup session mode connections when
// they are done.
if self.session_mode() {
server.reset = true;
}

self.server = Some(server);

Ok(())
Expand Down Expand Up @@ -179,4 +186,18 @@ impl Connection {
Err(Error::NotConnected)
}
}

/// Transaction mode pooling.
///
/// Defaults to transaction mode when no cluster is attached
/// to this connection.
#[inline]
pub fn transaction_mode(&self) -> bool {
    match self.cluster() {
        Ok(cluster) => cluster.pooler_mode() == PoolerMode::Transaction,
        Err(_) => true,
    }
}

/// Pooler is in session mode — the logical opposite of
/// `transaction_mode()`.
#[inline]
pub fn session_mode(&self) -> bool {
!self.transaction_mode()
}
}
21 changes: 17 additions & 4 deletions pgdog/src/backend/pool/guard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use super::Pool;
pub struct Guard {
// Underlying server connection, if still held by this guard.
server: Option<Server>,
pub(super) pool: Pool,
// When true, the server's session state is reset before checkin —
// set for session-mode pooling (see Connection::try_conn).
pub(super) reset: bool,
}

impl std::fmt::Debug for Guard {
Expand All @@ -37,6 +38,7 @@ impl Guard {
Self {
server: Some(server),
pool,
reset: false,
}
}

Expand All @@ -47,13 +49,24 @@ impl Guard {
let pool = self.pool.clone();

if let Some(mut server) = server {
if server.in_transaction() {
let rollback = server.in_transaction();
let reset = self.reset;

if rollback || reset {
let rollback_timeout = pool.lock().config.rollback_timeout();
spawn(async move {
// Rollback any unfinished transactions,
// but only if the server is in sync (protocol-wise).
let rollback_timeout = pool.lock().config.rollback_timeout();
if let Err(_) = timeout(rollback_timeout, server.rollback()).await {
error!("rollback timeout [{}]", server.addr());
if rollback {
if let Err(_) = timeout(rollback_timeout, server.rollback()).await {
error!("rollback timeout [{}]", server.addr());
}
}

if reset {
if let Err(_) = timeout(rollback_timeout, server.reset()).await {
error!("reset timeout [{}]", server.addr());
}
}

pool.checkin(server);
Expand Down
7 changes: 5 additions & 2 deletions pgdog/src/backend/pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ impl Pool {

/// Launch the maintenance loop, bringing the pool online.
///
/// Idempotent: a pool that is already online is left untouched,
/// so a repeated call cannot spawn a duplicate maintenance Monitor.
pub fn launch(&self) {
    let mut guard = self.lock();
    if !guard.online {
        guard.online = true;
        // NOTE(review): Monitor::new runs while the pool lock is held —
        // confirm it does not attempt to lock the pool again.
        Monitor::new(self);
    }
}

/// Get a connection from the pool.
Expand Down
45 changes: 37 additions & 8 deletions pgdog/src/backend/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,23 +266,38 @@ impl Server {
&self.params
}

/// Execute a query on the server and return the result.
pub async fn execute(&mut self, query: &str) -> Result<Vec<Message>, Error> {
/// Execute a batch of queries and return all results.
pub async fn execute_batch(&mut self, queries: &[&str]) -> Result<Vec<Message>, Error> {
if !self.in_sync() {
return Err(Error::NotInSync);
}

self.send(vec![Query::new(query)]).await?;

let mut messages = vec![];

while !self.in_sync() {
messages.push(self.read().await?);
let queries = queries
.iter()
.map(|query| Query::new(query))
.collect::<Vec<Query>>();
let expected = queries.len();

self.send(queries).await?;

let mut zs = 0;
while zs < expected {
let message = self.read().await?;
if message.code() == 'Z' {
zs += 1;
}
messages.push(message);
}

Ok(messages)
}

/// Execute a query on the server and return the result.
///
/// Thin wrapper over `execute_batch` with a single query.
pub async fn execute(&mut self, query: &str) -> Result<Vec<Message>, Error> {
self.execute_batch(&[query]).await
}

/// Perform a healthcheck on this connection using the provided query.
pub async fn healthcheck(&mut self, query: &str) -> Result<(), Error> {
debug!("running healthcheck \"{}\" [{}]", query, self.addr);
Expand All @@ -306,6 +321,16 @@ impl Server {
}
}

/// Reset all server parameters and session state.
pub async fn reset(&mut self) {
if self.done() {
if let Err(_err) = self.execute_batch(&["RESET ALL", "DISCARD ALL"]).await {
self.state = State::Error;
}
debug!("connection reset [{}]", self.addr());
}
}

/// Server connection unique identifier.
#[inline]
pub fn id(&self) -> &BackendKeyData {
Expand Down Expand Up @@ -349,7 +374,11 @@ impl Server {
impl Drop for Server {
fn drop(&mut self) {
if let Some(mut stream) = self.stream.take() {
info!("closing server connection [{}]", self.addr,);
// If you see a lot of these, tell your clients
// to not send queries unless they are willing to stick
// around for results.
let out_of_sync = if self.done() { " " } else { " out of sync " };
info!("closing{}server connection [{}]", out_of_sync, self.addr,);

spawn(async move {
stream.write_all(&Terminate.to_bytes()?).await?;
Expand Down
17 changes: 16 additions & 1 deletion pgdog/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ pub struct General {
/// Load balancing strategy.
#[serde(default = "General::load_balancing_strategy")]
pub load_balancing_strategy: LoadBalancingStrategy,
/// TLS certificate.
pub tls_certificate: Option<PathBuf>,
/// TLS private key.
pub tls_private_key: Option<PathBuf>,
}

impl General {
Expand Down Expand Up @@ -186,12 +190,23 @@ impl General {
fn load_balancing_strategy() -> LoadBalancingStrategy {
LoadBalancingStrategy::Random
}

/// Get TLS config, if any.
///
/// Returns the `(certificate, private_key)` paths only when both are
/// configured; if either is missing, TLS is treated as disabled.
pub fn tls(&self) -> Option<(&PathBuf, &PathBuf)> {
    // Option::zip yields Some only when both options are Some,
    // replacing the nested if-let chain.
    self.tls_certificate
        .as_ref()
        .zip(self.tls_private_key.as_ref())
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct Stats {}

#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PoolerMode {
#[default]
Expand Down
73 changes: 37 additions & 36 deletions pgdog/src/frontend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use tracing::{debug, error, info, trace};
use super::{Buffer, Comms, Error, Router, Stats};
use crate::auth::scram::Server;
use crate::backend::pool::Connection;
use crate::config::PoolerMode;
use crate::net::messages::{
Authentication, BackendKeyData, ErrorResponse, Protocol, ReadyForQuery,
};
Expand Down Expand Up @@ -42,47 +43,45 @@ impl Client {
let id = BackendKeyData::new();

// Get server parameters and send them to the client.
{
let mut conn = match Connection::new(user, database, admin) {
Ok(conn) => conn,
Err(_) => {
stream.fatal(ErrorResponse::auth(user, database)).await?;
return Ok(());
}
};
let mut conn = match Connection::new(user, database, admin) {
Ok(conn) => conn,
Err(_) => {
stream.fatal(ErrorResponse::auth(user, database)).await?;
return Ok(());
}
};

let params = match conn.parameters(&id).await {
Ok(params) => params,
Err(err) => {
if err.no_server() {
error!("Connection pool is down");
stream.fatal(ErrorResponse::connection()).await?;
return Ok(());
} else {
return Err(err.into());
}
let server_params = match conn.parameters(&id).await {
Ok(params) => params,
Err(err) => {
if err.no_server() {
error!("Connection pool is down");
stream.fatal(ErrorResponse::connection()).await?;
return Ok(());
} else {
return Err(err.into());
}
};
}
};

let password = if admin {
admin_password
} else {
conn.cluster()?.password()
};
let password = if admin {
admin_password
} else {
conn.cluster()?.password()
};

stream.send_flush(Authentication::scram()).await?;
stream.send_flush(Authentication::scram()).await?;

let scram = Server::new(password);
if let Ok(true) = scram.handle(&mut stream).await {
stream.send(Authentication::Ok).await?;
} else {
stream.fatal(ErrorResponse::auth(user, database)).await?;
return Ok(());
}
let scram = Server::new(password);
if let Ok(true) = scram.handle(&mut stream).await {
stream.send(Authentication::Ok).await?;
} else {
stream.fatal(ErrorResponse::auth(user, database)).await?;
return Ok(());
}

for param in params {
stream.send(param).await?;
}
for param in server_params {
stream.send(param).await?;
}

stream.send(id).await?;
Expand Down Expand Up @@ -195,7 +194,9 @@ impl Client {
}

if backend.done() {
backend.disconnect();
if backend.transaction_mode() {
backend.disconnect();
}
comms.stats(stats.transaction());
trace!("transaction finished [{}ms]", stats.transaction_time.as_secs_f64() * 1000.0);
if comms.offline() {
Expand Down
Loading

0 comments on commit f28ab2d

Please sign in to comment.