From e0671cd686aed93f4b01f7121748823189783ffc Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Thu, 9 Jan 2025 22:41:16 -0800 Subject: [PATCH] admin: show clients --- pgdog/src/admin/backend.rs | 6 +- pgdog/src/admin/mod.rs | 1 + pgdog/src/admin/parser.rs | 24 +++- pgdog/src/admin/show_clients.rs | 59 ++++++++++ pgdog/src/backend/error.rs | 7 +- pgdog/src/backend/pool/connection.rs | 19 +++- pgdog/src/frontend/client.rs | 39 ++++--- pgdog/src/frontend/comms.rs | 36 ++++-- pgdog/src/frontend/connected_client.rs | 23 ++++ pgdog/src/frontend/listener.rs | 7 +- pgdog/src/frontend/mod.rs | 2 + pgdog/src/frontend/stats.rs | 11 +- pgdog/src/net/messages/data_row.rs | 107 ++++++++++++++++++ pgdog/src/net/messages/mod.rs | 4 + pgdog/src/net/messages/row_description.rs | 127 ++++++++++++++++++++++ pgdog/src/state.rs | 15 +++ pgdog/tests/admin.sh | 2 + pgdog/tests/pgbench.sh | 2 +- 18 files changed, 455 insertions(+), 36 deletions(-) create mode 100644 pgdog/src/admin/show_clients.rs create mode 100644 pgdog/src/frontend/connected_client.rs create mode 100644 pgdog/src/net/messages/data_row.rs create mode 100644 pgdog/src/net/messages/row_description.rs create mode 100644 pgdog/tests/admin.sh diff --git a/pgdog/src/admin/backend.rs b/pgdog/src/admin/backend.rs index 32a65b4..d8ce08b 100644 --- a/pgdog/src/admin/backend.rs +++ b/pgdog/src/admin/backend.rs @@ -43,7 +43,7 @@ impl Backend { let command = Parser::parse(&query.query.to_lowercase())?; - command.execute().await?; + self.messages.extend(command.execute().await?); self.messages.push_back( CommandComplete { @@ -66,4 +66,8 @@ impl Backend { } } } + + pub fn done(&self) -> bool { + self.messages.is_empty() + } } diff --git a/pgdog/src/admin/mod.rs b/pgdog/src/admin/mod.rs index 69081d7..a072c4d 100644 --- a/pgdog/src/admin/mod.rs +++ b/pgdog/src/admin/mod.rs @@ -10,6 +10,7 @@ pub mod parser; pub mod pause; pub mod prelude; pub mod reconnect; +pub mod show_clients; pub use error::Error; diff --git a/pgdog/src/admin/parser.rs b/pgdog/src/admin/parser.rs index 6f0189c..7812ddd 100644 --- a/pgdog/src/admin/parser.rs +++ b/pgdog/src/admin/parser.rs @@ -1,11 +1,16 @@ //! Admin command parser. -use super::{pause::Pause, prelude::Message, reconnect::Reconnect, Command, Error}; +use super::{ + pause::Pause, prelude::Message, reconnect::Reconnect, show_clients::ShowClients, Command, Error, +}; + +use tracing::debug; /// Parser result. pub enum ParseResult { Pause(Pause), Reconnect(Reconnect), + ShowClients(ShowClients), } impl ParseResult { @@ -16,6 +21,7 @@ impl ParseResult { match self { Pause(pause) => pause.execute().await, Reconnect(reconnect) => reconnect.execute().await, + ShowClients(show_clients) => show_clients.execute().await, } } @@ -26,6 +32,7 @@ impl ParseResult { match self { Pause(pause) => pause.name(), Reconnect(reconnect) => reconnect.name(), + ShowClients(show_clients) => show_clients.name(), } } } @@ -37,11 +44,22 @@ impl Parser { /// Parse the query and return a command we can execute. pub fn parse(sql: &str) -> Result { let sql = sql.trim().replace(";", "").to_lowercase(); + let mut iter = sql.split(" "); - Ok(match sql.split(" ").next().ok_or(Error::Syntax)? { + Ok(match iter.next().ok_or(Error::Syntax)?.trim() { "pause" | "resume" => ParseResult::Pause(Pause::parse(&sql)?), "reconnect" => ParseResult::Reconnect(Reconnect::parse(&sql)?), - _ => return Err(Error::Syntax), + "show" => match iter.next().ok_or(Error::Syntax)?.trim() { + "clients" => ParseResult::ShowClients(ShowClients::parse(&sql)?), + command => { + debug!("unknown admin show command: '{}'", command); + return Err(Error::Syntax); + } + }, + command => { + debug!("unknown admin command: {}", command); + return Err(Error::Syntax); + } }) } } diff --git a/pgdog/src/admin/show_clients.rs b/pgdog/src/admin/show_clients.rs new file mode 100644 index 0000000..d2c254c --- /dev/null +++ b/pgdog/src/admin/show_clients.rs @@ -0,0 +1,59 @@ +//! `SHOW CLIENTS` command implementation. + +use super::prelude::*; +use crate::frontend::comms::{comms, Comms}; +use crate::net::messages::*; + +/// Show clients command. +pub struct ShowClients; + +#[async_trait] +impl Command for ShowClients { + fn name(&self) -> String { + "SHOW CLIENTS".into() + } + + fn parse(_sql: &str) -> Result { + Ok(ShowClients) + } + + async fn execute(&self) -> Result, Error> { + let rd = RowDescription::new(&[ + Field::text("host"), + Field::numeric("port"), + Field::text("state"), + Field::numeric("queries"), + Field::numeric("transactions"), + Field::numeric("wait_time"), + Field::numeric("query_time"), + Field::numeric("transaction_time"), + Field::numeric("bytes_received"), + Field::numeric("bytes_sent"), + Field::numeric("errors"), + ]); + + let mut rows = vec![]; + let clients = comms().clients(); + + for client in clients.values() { + let mut row = DataRow::new(); + row.add(client.addr.ip().to_string()) + .add(client.addr.port().to_string()) + .add(client.stats.state.to_string()) + .add(client.stats.queries) + .add(client.stats.transactions) + .add(client.stats.wait_time().as_secs_f64() * 1000.0) + .add(client.stats.query_time.as_secs_f64() * 1000.0) + .add(client.stats.transaction_time.as_secs_f64() * 1000.0) + .add(client.stats.bytes_received) + .add(client.stats.bytes_sent) + .add(client.stats.errors); + rows.push(row.message()?); + } + + let mut messages = vec![rd.message()?]; + messages.extend(rows); + + Ok(messages) + } +} diff --git a/pgdog/src/backend/error.rs b/pgdog/src/backend/error.rs index 9d940e8..2ce044f 100644 --- a/pgdog/src/backend/error.rs +++ b/pgdog/src/backend/error.rs @@ -48,9 +48,12 @@ pub enum Error { impl Error { /// Checkout timeout. - pub fn checkout_timeout(&self) -> bool { + pub fn no_server(&self) -> bool { + use crate::backend::pool::Error as PoolError; match self { - Error::Pool(crate::backend::pool::Error::CheckoutTimeout) => true, + // These are recoverable errors. + Error::Pool(PoolError::CheckoutTimeout) => true, + Error::Pool(PoolError::AllReplicasDown) => true, _ => false, } } diff --git a/pgdog/src/backend/pool/connection.rs b/pgdog/src/backend/pool/connection.rs index a5228e5..7b173ae 100644 --- a/pgdog/src/backend/pool/connection.rs +++ b/pgdog/src/backend/pool/connection.rs @@ -24,6 +24,7 @@ pub struct Connection { server: Option, cluster: Option, admin: Option, + is_admin: bool, } impl Connection { @@ -34,7 +35,8 @@ impl Connection { cluster: None, user: user.to_owned(), database: database.to_owned(), - admin: if admin { Some(Backend::new()) } else { None }, + admin: None, + is_admin: admin, }; if !admin { @@ -51,7 +53,9 @@ impl Connection { /// Create a server connection if one doesn't exist already. pub async fn connect(&mut self, id: &BackendKeyData, route: &Route) -> Result<(), Error> { - if self.server.is_none() && self.admin.is_none() { + if self.is_admin { + self.admin = Some(Backend::new()); + } else if self.server.is_none() { match self.try_conn(id, route).await { Ok(()) => (), Err(Error::Pool(super::Error::Offline)) => { @@ -82,7 +86,7 @@ impl Connection { /// Get server parameters. pub async fn parameters(&mut self, id: &BackendKeyData) -> Result, Error> { - if self.admin.is_some() { + if self.is_admin { Ok(ParameterStatus::fake()) } else { self.connect(id, &Route::unknown()).await?; @@ -100,6 +104,7 @@ impl Connection { /// Disconnect from a server. pub fn disconnect(&mut self) { self.server = None; + self.admin = None; } /// Read a message from the server connection. @@ -128,8 +133,10 @@ impl Connection { /// Fetch the cluster from the global database store. pub fn reload(&mut self) -> Result<(), Error> { - let cluster = databases().cluster((self.user.as_str(), self.database.as_str()))?; - self.cluster = Some(cluster); + if !self.is_admin { + let cluster = databases().cluster((self.user.as_str(), self.database.as_str()))?; + self.cluster = Some(cluster); + } Ok(()) } @@ -138,6 +145,8 @@ impl Connection { pub fn done(&self) -> bool { if let Some(ref server) = self.server { server.done() + } else if let Some(ref admin) = self.admin { + admin.done() } else { true } diff --git a/pgdog/src/frontend/client.rs b/pgdog/src/frontend/client.rs index 774eae7..51b0d4a 100644 --- a/pgdog/src/frontend/client.rs +++ b/pgdog/src/frontend/client.rs @@ -20,6 +20,7 @@ pub struct Client { stream: Stream, id: BackendKeyData, params: Parameters, + comms: Comms, } impl Client { @@ -51,7 +52,7 @@ impl Client { let params = match conn.parameters(&id).await { Ok(params) => params, Err(err) => { - if err.checkout_timeout() { + if err.no_server() { error!("Connection pool is down"); stream.fatal(ErrorResponse::connection()).await?; return Ok(()); @@ -68,7 +69,7 @@ impl Client { stream.send(id).await?; stream.send_flush(ReadyForQuery::idle()).await?; - comms.connect(&id); + comms.connect(&id, addr); info!("Client connected [{}]", addr); @@ -77,15 +78,16 @@ impl Client { stream, id, params, + comms, }; if client.admin() { // Admin clients are not waited on during shutdown. spawn(async move { - client.spawn_internal(comms).await; + client.spawn_internal().await; }); } else { - client.spawn_internal(comms).await; + client.spawn_internal().await; } Ok(()) @@ -97,15 +99,15 @@ impl Client { } /// Run the client and log disconnect. - async fn spawn_internal(&mut self, comms: Comms) { - match self.run(comms).await { + async fn spawn_internal(&mut self) { + match self.run().await { Ok(_) => info!("Client disconnected [{}]", self.addr), Err(err) => error!("Client disconnected with error [{}]: {}", self.addr, err), } } /// Run the client. - async fn run(&mut self, mut comms: Comms) -> Result<(), Error> { + async fn run(&mut self) -> Result<(), Error> { let user = self.params.get_required("user")?; let database = self.params.get_default("database", user); @@ -113,6 +115,7 @@ impl Client { let mut router = Router::new(); let mut timer = Instant::now(); let mut stats = Stats::new(); + let comms = self.comms.clone(); loop { select! { @@ -141,7 +144,7 @@ impl Client { comms.stats(stats.waiting()); match backend.connect(&self.id, router.route()).await { Ok(()) => (), - Err(err) => if err.checkout_timeout() { + Err(err) => if err.no_server() { error!("Connection pool is down"); self.stream.error(ErrorResponse::connection()).await?; comms.stats(stats.error()); @@ -151,7 +154,9 @@ impl Client { } }; comms.stats(stats.connected()); - debug!("client paired with {} [{:.4}ms]", backend.addr()?, timer.elapsed().as_secs_f64() * 1000.0); + if let Ok(addr) = backend.addr() { + debug!("client paired with {} [{:.4}ms]", addr, timer.elapsed().as_secs_f64() * 1000.0); + } } // Send query to server. @@ -190,8 +195,6 @@ impl Client { .await?; } - comms.disconnect(); - Ok(()) } @@ -201,7 +204,7 @@ impl Client { /// sent a complete request. async fn buffer(&mut self) -> Buffer { let mut buffer = Buffer::new(); - let timer = Instant::now(); + let mut timer = None; while !buffer.full() { let message = match self.stream.read().await { @@ -211,6 +214,10 @@ impl Client { } }; + if timer.is_none() { + timer = Some(Instant::now()); + } + match message.code() { // Terminate (F) 'X' => return vec![].into(), @@ -220,7 +227,7 @@ impl Client { trace!( "request buffered [{:.4}ms]", - timer.elapsed().as_secs_f64() * 1000.0 + timer.unwrap().elapsed().as_secs_f64() * 1000.0 ); buffer @@ -230,3 +237,9 @@ impl Client { self.params.get_default("database", "") == "admin" } } + +impl Drop for Client { + fn drop(&mut self) { + self.comms.disconnect(); + } +} diff --git a/pgdog/src/frontend/comms.rs b/pgdog/src/frontend/comms.rs index 40d6c2a..5a7c1af 100644 --- a/pgdog/src/frontend/comms.rs +++ b/pgdog/src/frontend/comms.rs @@ -1,6 +1,8 @@ //! Communication to/from connected clients. use fnv::FnvHashMap as HashMap; +use once_cell::sync::Lazy; +use std::net::SocketAddr; use std::sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -11,13 +13,20 @@ use tokio::sync::Notify; use crate::net::messages::BackendKeyData; -use super::Stats; +use super::{ConnectedClient, Stats}; + +static COMMS: Lazy = Lazy::new(|| Comms::new()); + +/// Get global communication channel. +pub fn comms() -> Comms { + COMMS.clone() +} /// Sync primitives shared between all clients. struct Global { shutdown: Notify, offline: AtomicBool, - stats: Mutex>, + clients: Mutex>, } /// Bi-directional communications between client and internals. @@ -35,20 +44,28 @@ impl Default for Comms { impl Comms { /// Create new communications channel between a client and pgDog. - pub fn new() -> Self { + fn new() -> Self { Self { global: Arc::new(Global { shutdown: Notify::new(), offline: AtomicBool::new(false), - stats: Mutex::new(HashMap::default()), + clients: Mutex::new(HashMap::default()), }), id: None, } } + /// Get all connected clients. + pub fn clients(&self) -> HashMap { + self.global.clients.lock().clone() + } + /// New client connected. - pub fn connect(&mut self, id: &BackendKeyData) -> Self { - self.global.stats.lock().insert(*id, Stats::new()); + pub fn connect(&mut self, id: &BackendKeyData, addr: SocketAddr) -> Self { + self.global + .clients + .lock() + .insert(*id, ConnectedClient::new(addr)); self.id = Some(*id); self.clone() } @@ -56,14 +73,17 @@ impl Comms { /// Client disconected. pub fn disconnect(&mut self) { if let Some(id) = self.id.take() { - self.global.stats.lock().remove(&id); + self.global.clients.lock().remove(&id); } } /// Update stats. pub fn stats(&self, stats: Stats) { if let Some(ref id) = self.id { - self.global.stats.lock().insert(*id, stats); + let mut guard = self.global.clients.lock(); + if let Some(entry) = guard.get_mut(id) { + entry.stats = stats; + } } } diff --git a/pgdog/src/frontend/connected_client.rs b/pgdog/src/frontend/connected_client.rs new file mode 100644 index 0000000..8af315e --- /dev/null +++ b/pgdog/src/frontend/connected_client.rs @@ -0,0 +1,23 @@ +use std::net::SocketAddr; +use std::time::SystemTime; + +use super::Stats; + +/// Connected client. +#[derive(Copy, Clone, Debug)] +pub struct ConnectedClient { + pub stats: Stats, + pub addr: SocketAddr, + pub connected_at: SystemTime, +} + +impl ConnectedClient { + /// New connected client. + pub fn new(addr: SocketAddr) -> Self { + Self { + stats: Stats::new(), + addr, + connected_at: SystemTime::now(), + } + } +} diff --git a/pgdog/src/frontend/listener.rs b/pgdog/src/frontend/listener.rs index b088bbe..fa59fc5 100644 --- a/pgdog/src/frontend/listener.rs +++ b/pgdog/src/frontend/listener.rs @@ -15,7 +15,10 @@ use crate::net::Stream; use tracing::{error, info}; -use super::{Client, Comms, Error}; +use super::{ + comms::{comms, Comms}, + Client, Error, +}; /// Client connections listener and handler. #[derive(Debug)] @@ -36,7 +39,7 @@ impl Listener { /// Listen for client connections and handle them. pub async fn listen(&mut self) -> Result<(), Error> { let listener = TcpListener::bind(&self.addr).await?; - let comms = Comms::new(); + let comms = comms(); info!("🐕 pgDog listening on {}", self.addr); loop { diff --git a/pgdog/src/frontend/mod.rs b/pgdog/src/frontend/mod.rs index 8a21e1a..a1589f7 100644 --- a/pgdog/src/frontend/mod.rs +++ b/pgdog/src/frontend/mod.rs @@ -3,6 +3,7 @@ pub mod buffer; pub mod client; pub mod comms; +pub mod connected_client; pub mod error; pub mod listener; pub mod router; @@ -11,6 +12,7 @@ pub mod stats; pub use buffer::Buffer; pub use client::Client; pub use comms::Comms; +pub use connected_client::ConnectedClient; pub use error::Error; pub use router::Router; pub use stats::Stats; diff --git a/pgdog/src/frontend/stats.rs b/pgdog/src/frontend/stats.rs index 12e83ba..f8dbdd4 100644 --- a/pgdog/src/frontend/stats.rs +++ b/pgdog/src/frontend/stats.rs @@ -5,7 +5,7 @@ use std::time::{Duration, Instant}; use crate::state::State; /// Client statistics. -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug)] pub struct Stats { /// Bytes sent over network. pub bytes_sent: usize, @@ -76,6 +76,15 @@ impl Stats { *self } + /// Get wait time if waiting. + pub fn wait_time(&self) -> Duration { + if self.state == State::Waiting { + self.wait_timer.elapsed() + } else { + Duration::from_secs(0) + } + } + pub(super) fn connected(&mut self) -> Self { let now = Instant::now(); self.state = State::Active; diff --git a/pgdog/src/net/messages/data_row.rs b/pgdog/src/net/messages/data_row.rs new file mode 100644 index 0000000..6356253 --- /dev/null +++ b/pgdog/src/net/messages/data_row.rs @@ -0,0 +1,107 @@ +//! DataRow (B) message. +use super::code; +use super::prelude::*; + +use bytes::BytesMut; + +/// DataRow message. +#[derive(Debug, Clone)] +pub struct DataRow { + columns: Vec, +} + +/// Convert value to data row column +/// using text formatting. +pub trait ToDataRowColumn { + fn to_data_row_column(&self) -> Bytes; +} + +impl ToDataRowColumn for String { + fn to_data_row_column(&self) -> Bytes { + Bytes::copy_from_slice(self.as_bytes()) + } +} + +impl ToDataRowColumn for &str { + fn to_data_row_column(&self) -> Bytes { + Bytes::copy_from_slice(self.as_bytes()) + } +} + +impl ToDataRowColumn for i64 { + fn to_data_row_column(&self) -> Bytes { + Bytes::copy_from_slice(self.to_string().as_bytes()) + } +} + +impl ToDataRowColumn for usize { + fn to_data_row_column(&self) -> Bytes { + Bytes::copy_from_slice(self.to_string().as_bytes()) + } +} + +impl ToDataRowColumn for bool { + fn to_data_row_column(&self) -> Bytes { + Bytes::copy_from_slice(if *self { b"t" } else { b"f" }) + } +} + +impl ToDataRowColumn for f64 { + fn to_data_row_column(&self) -> Bytes { + let number = format!("{:.5}", self); + Bytes::copy_from_slice(number.as_bytes()) + } +} + +impl DataRow { + /// New data row. + pub fn new() -> Self { + Self { columns: vec![] } + } + + /// Add a column to the data row. + pub fn add(&mut self, value: impl ToDataRowColumn) -> &mut Self { + self.columns.push(value.to_data_row_column()); + self + } +} + +impl FromBytes for DataRow { + fn from_bytes(mut bytes: Bytes) -> Result { + code!(bytes, 'D'); + let _len = bytes.get_i32(); + let columns = (0..bytes.get_i16()) + .map(|_| { + let len = bytes.get_i32() as usize; + let mut column = BytesMut::new(); + for _ in 0..len { + column.put_u8(bytes.get_u8()); + } + + column.freeze() + }) + .collect(); + + Ok(Self { columns }) + } +} + +impl ToBytes for DataRow { + fn to_bytes(&self) -> Result { + let mut payload = Payload::named(self.code()); + payload.put_i16(self.columns.len() as i16); + + for column in &self.columns { + payload.put_i32(column.len() as i32); + payload.put(&column[..]); + } + + Ok(payload.freeze()) + } +} + +impl Protocol for DataRow { + fn code(&self) -> char { + 'D' + } +} diff --git a/pgdog/src/net/messages/mod.rs b/pgdog/src/net/messages/mod.rs index a787b4b..0ea6b83 100644 --- a/pgdog/src/net/messages/mod.rs +++ b/pgdog/src/net/messages/mod.rs @@ -3,6 +3,7 @@ pub mod auth; pub mod backend_key; pub mod bind; pub mod command_complete; +pub mod data_row; pub mod error_response; pub mod hello; pub mod parameter_status; @@ -11,17 +12,20 @@ pub mod payload; pub mod prelude; pub mod query; pub mod rfq; +pub mod row_description; pub mod terminate; pub use auth::Authentication; pub use backend_key::BackendKeyData; pub use bind::Bind; +pub use data_row::{DataRow, ToDataRowColumn}; pub use error_response::ErrorResponse; pub use hello::Startup; pub use parameter_status::ParameterStatus; pub use payload::Payload; pub use query::Query; pub use rfq::ReadyForQuery; +pub use row_description::{Field, RowDescription}; pub use terminate::Terminate; use crate::net::Error; diff --git a/pgdog/src/net/messages/row_description.rs b/pgdog/src/net/messages/row_description.rs new file mode 100644 index 0000000..a2b5bdf --- /dev/null +++ b/pgdog/src/net/messages/row_description.rs @@ -0,0 +1,127 @@ +//! RowDescription (B) message. + +use crate::net::c_string_buf; + +use super::code; +use super::prelude::*; + +/// Column field description. +#[derive(Clone, Debug)] +pub struct Field { + /// Name of the field. + pub name: String, + /// Table OID. + pub table_oid: i32, + /// Column number. + pub column: i16, + /// Type OID. + pub type_oid: i32, + /// Type size. + pub type_size: i16, + /// Type modifier. + pub type_modifier: i32, + /// Format code. + pub format: i16, +} + +impl Field { + /// Numeric field. + pub fn numeric(name: &str) -> Self { + Self { + name: name.into(), + table_oid: 0, + column: 0, + type_oid: 1700, + type_size: -1, + type_modifier: -1, + format: 0, // We always use text format. + } + } + + /// Text field. + pub fn text(name: &str) -> Self { + Self { + name: name.into(), + table_oid: 0, + column: 0, + type_oid: 25, + type_size: -1, + type_modifier: -1, + format: 0, // We always use text format. + } + } + + /// Boolean field. + pub fn bool(name: &str) -> Self { + Self { + name: name.into(), + table_oid: 0, + column: 0, + type_oid: 16, + type_size: 1, + type_modifier: -1, + format: 0, // We always use text format. + } + } +} + +/// RowDescription message. +#[derive(Debug, Clone)] +pub struct RowDescription { + /// Fields. + fields: Vec, +} + +impl RowDescription { + pub fn new(fields: &[Field]) -> Self { + Self { + fields: fields.to_vec(), + } + } +} + +impl FromBytes for RowDescription { + fn from_bytes(mut bytes: Bytes) -> Result { + code!(bytes, 'T'); + let _len = bytes.get_i32(); + + let fields = (0..bytes.get_i16()) + .map(|_| Field { + name: c_string_buf(&mut bytes), + table_oid: bytes.get_i32(), + column: bytes.get_i16(), + type_oid: bytes.get_i32(), + type_size: bytes.get_i16(), + type_modifier: bytes.get_i32(), + format: bytes.get_i16(), + }) + .collect(); + + Ok(Self { fields }) + } +} + +impl ToBytes for RowDescription { + fn to_bytes(&self) -> Result { + let mut payload = Payload::named(self.code()); + payload.put_i16(self.fields.len() as i16); + + for field in &self.fields { + payload.put_string(&field.name); + payload.put_i32(field.table_oid); + payload.put_i16(field.column); + payload.put_i32(field.type_oid); + payload.put_i16(field.type_size); + payload.put_i32(field.type_modifier); + payload.put_i16(field.format); + } + + Ok(payload.freeze()) + } +} + +impl Protocol for RowDescription { + fn code(&self) -> char { + 'T' + } +} diff --git a/pgdog/src/state.rs b/pgdog/src/state.rs index 4107aa2..cfc808b 100644 --- a/pgdog/src/state.rs +++ b/pgdog/src/state.rs @@ -19,3 +19,18 @@ pub enum State { /// An error occurered. Error, } + +impl std::fmt::Display for State { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use State::*; + match self { + Idle => write!(f, "idle"), + Active => write!(f, "active"), + IdleInTransaction => write!(f, "idle in transaction"), + TransactionError => write!(f, "transaction error"), + Waiting => write!(f, "waiting"), + Disconnected => write!(f, "disconnected"), + Error => write!(f, "error"), + } + } +} diff --git a/pgdog/tests/admin.sh b/pgdog/tests/admin.sh new file mode 100644 index 0000000..0cc4494 --- /dev/null +++ b/pgdog/tests/admin.sh @@ -0,0 +1,2 @@ +#!/bin/bash +psql -h 127.0.0.1 -p 6432 -U pgdog admin diff --git a/pgdog/tests/pgbench.sh b/pgdog/tests/pgbench.sh index 8cc361a..a12ca11 100644 --- a/pgdog/tests/pgbench.sh +++ b/pgdog/tests/pgbench.sh @@ -2,4 +2,4 @@ # # pgBench test run. # -pgbench -P 1 -h 127.0.0.1 -p 6432 -U pgdog pgdog -c 50 -t 1000 -S +pgbench -P 1 -h 127.0.0.1 -p 6432 -U pgdog pgdog -c 50 -t 100000 -S