Skip to content

Commit

Permalink
admin: show clients
Browse files Browse the repository at this point in the history
  • Loading branch information
levkk committed Jan 10, 2025
1 parent fceb774 commit e0671cd
Show file tree
Hide file tree
Showing 18 changed files with 455 additions and 36 deletions.
6 changes: 5 additions & 1 deletion pgdog/src/admin/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl Backend {

let command = Parser::parse(&query.query.to_lowercase())?;

command.execute().await?;
self.messages.extend(command.execute().await?);

self.messages.push_back(
CommandComplete {
Expand All @@ -66,4 +66,8 @@ impl Backend {
}
}
}

pub fn done(&self) -> bool {
self.messages.is_empty()
}
}
1 change: 1 addition & 0 deletions pgdog/src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub mod parser;
pub mod pause;
pub mod prelude;
pub mod reconnect;
pub mod show_clients;

pub use error::Error;

Expand Down
24 changes: 21 additions & 3 deletions pgdog/src/admin/parser.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
//! Admin command parser.
use super::{pause::Pause, prelude::Message, reconnect::Reconnect, Command, Error};
use super::{
pause::Pause, prelude::Message, reconnect::Reconnect, show_clients::ShowClients, Command, Error,
};

use tracing::debug;

/// Parser result.
pub enum ParseResult {
Pause(Pause),
Reconnect(Reconnect),
ShowClients(ShowClients),
}

impl ParseResult {
Expand All @@ -16,6 +21,7 @@ impl ParseResult {
match self {
Pause(pause) => pause.execute().await,
Reconnect(reconnect) => reconnect.execute().await,
ShowClients(show_clients) => show_clients.execute().await,
}
}

Expand All @@ -26,6 +32,7 @@ impl ParseResult {
match self {
Pause(pause) => pause.name(),
Reconnect(reconnect) => reconnect.name(),
ShowClients(show_clients) => show_clients.name(),
}
}
}
Expand All @@ -37,11 +44,22 @@ impl Parser {
/// Parse the query and return a command we can execute.
pub fn parse(sql: &str) -> Result<ParseResult, Error> {
let sql = sql.trim().replace(";", "").to_lowercase();
let mut iter = sql.split(" ");

Ok(match sql.split(" ").next().ok_or(Error::Syntax)? {
Ok(match iter.next().ok_or(Error::Syntax)?.trim() {
"pause" | "resume" => ParseResult::Pause(Pause::parse(&sql)?),
"reconnect" => ParseResult::Reconnect(Reconnect::parse(&sql)?),
_ => return Err(Error::Syntax),
"show" => match iter.next().ok_or(Error::Syntax)?.trim() {
"clients" => ParseResult::ShowClients(ShowClients::parse(&sql)?),
command => {
debug!("unknown admin show command: '{}'", command);
return Err(Error::Syntax);
}
},
command => {
debug!("unknown admin command: {}", command);
return Err(Error::Syntax);
}
})
}
}
59 changes: 59 additions & 0 deletions pgdog/src/admin/show_clients.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! `SHOW CLIENTS` command implementation.
use super::prelude::*;
use crate::frontend::comms::{comms, Comms};
use crate::net::messages::*;

/// Show clients command.
pub struct ShowClients;

#[async_trait]
impl Command for ShowClients {
fn name(&self) -> String {
"SHOW CLIENTS".into()
}

fn parse(_sql: &str) -> Result<Self, Error> {
Ok(ShowClients)
}

async fn execute(&self) -> Result<Vec<Message>, Error> {
let rd = RowDescription::new(&[
Field::text("host"),
Field::numeric("port"),
Field::text("state"),
Field::numeric("queries"),
Field::numeric("transactions"),
Field::numeric("wait_time"),
Field::numeric("query_time"),
Field::numeric("transaction_time"),
Field::numeric("bytes_received"),
Field::numeric("bytes_sent"),
Field::numeric("errors"),
]);

let mut rows = vec![];
let clients = comms().clients();

for client in clients.values() {
let mut row = DataRow::new();
row.add(client.addr.ip().to_string())
.add(client.addr.port().to_string())
.add(client.stats.state.to_string())
.add(client.stats.queries)
.add(client.stats.transactions)
.add(client.stats.wait_time().as_secs_f64() * 1000.0)
.add(client.stats.query_time.as_secs_f64() * 1000.0)
.add(client.stats.transaction_time.as_secs_f64() * 1000.0)
.add(client.stats.bytes_received)
.add(client.stats.bytes_sent)
.add(client.stats.errors);
rows.push(row.message()?);
}

let mut messages = vec![rd.message()?];
messages.extend(rows);

Ok(messages)
}
}
7 changes: 5 additions & 2 deletions pgdog/src/backend/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,12 @@ pub enum Error {

impl Error {
/// Checkout timeout.
pub fn checkout_timeout(&self) -> bool {
pub fn no_server(&self) -> bool {
use crate::backend::pool::Error as PoolError;
match self {
Error::Pool(crate::backend::pool::Error::CheckoutTimeout) => true,
// These are recoverable errors.
Error::Pool(PoolError::CheckoutTimeout) => true,
Error::Pool(PoolError::AllReplicasDown) => true,
_ => false,
}
}
Expand Down
19 changes: 14 additions & 5 deletions pgdog/src/backend/pool/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct Connection {
server: Option<Guard>,
cluster: Option<Cluster>,
admin: Option<Backend>,
is_admin: bool,
}

impl Connection {
Expand All @@ -34,7 +35,8 @@ impl Connection {
cluster: None,
user: user.to_owned(),
database: database.to_owned(),
admin: if admin { Some(Backend::new()) } else { None },
admin: None,
is_admin: admin,
};

if !admin {
Expand All @@ -51,7 +53,9 @@ impl Connection {

/// Create a server connection if one doesn't exist already.
pub async fn connect(&mut self, id: &BackendKeyData, route: &Route) -> Result<(), Error> {
if self.server.is_none() && self.admin.is_none() {
if self.is_admin {
self.admin = Some(Backend::new());
} else if self.server.is_none() {
match self.try_conn(id, route).await {
Ok(()) => (),
Err(Error::Pool(super::Error::Offline)) => {
Expand Down Expand Up @@ -82,7 +86,7 @@ impl Connection {

/// Get server parameters.
pub async fn parameters(&mut self, id: &BackendKeyData) -> Result<Vec<ParameterStatus>, Error> {
if self.admin.is_some() {
if self.is_admin {
Ok(ParameterStatus::fake())
} else {
self.connect(id, &Route::unknown()).await?;
Expand All @@ -100,6 +104,7 @@ impl Connection {
/// Disconnect from a server.
pub fn disconnect(&mut self) {
self.server = None;
self.admin = None;
}

/// Read a message from the server connection.
Expand Down Expand Up @@ -128,8 +133,10 @@ impl Connection {

/// Fetch the cluster from the global database store.
pub fn reload(&mut self) -> Result<(), Error> {
let cluster = databases().cluster((self.user.as_str(), self.database.as_str()))?;
self.cluster = Some(cluster);
if !self.is_admin {
let cluster = databases().cluster((self.user.as_str(), self.database.as_str()))?;
self.cluster = Some(cluster);
}

Ok(())
}
Expand All @@ -138,6 +145,8 @@ impl Connection {
pub fn done(&self) -> bool {
if let Some(ref server) = self.server {
server.done()
} else if let Some(ref admin) = self.admin {
admin.done()
} else {
true
}
Expand Down
39 changes: 26 additions & 13 deletions pgdog/src/frontend/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub struct Client {
stream: Stream,
id: BackendKeyData,
params: Parameters,
comms: Comms,
}

impl Client {
Expand Down Expand Up @@ -51,7 +52,7 @@ impl Client {
let params = match conn.parameters(&id).await {
Ok(params) => params,
Err(err) => {
if err.checkout_timeout() {
if err.no_server() {
error!("Connection pool is down");
stream.fatal(ErrorResponse::connection()).await?;
return Ok(());
Expand All @@ -68,7 +69,7 @@ impl Client {

stream.send(id).await?;
stream.send_flush(ReadyForQuery::idle()).await?;
comms.connect(&id);
comms.connect(&id, addr);

info!("Client connected [{}]", addr);

Expand All @@ -77,15 +78,16 @@ impl Client {
stream,
id,
params,
comms,
};

if client.admin() {
// Admin clients are not waited on during shutdown.
spawn(async move {
client.spawn_internal(comms).await;
client.spawn_internal().await;
});
} else {
client.spawn_internal(comms).await;
client.spawn_internal().await;
}

Ok(())
Expand All @@ -97,22 +99,23 @@ impl Client {
}

/// Run the client and log disconnect.
async fn spawn_internal(&mut self, comms: Comms) {
match self.run(comms).await {
async fn spawn_internal(&mut self) {
match self.run().await {
Ok(_) => info!("Client disconnected [{}]", self.addr),
Err(err) => error!("Client disconnected with error [{}]: {}", self.addr, err),
}
}

/// Run the client.
async fn run(&mut self, mut comms: Comms) -> Result<(), Error> {
async fn run(&mut self) -> Result<(), Error> {
let user = self.params.get_required("user")?;
let database = self.params.get_default("database", user);

let mut backend = Connection::new(user, database, self.admin())?;
let mut router = Router::new();
let mut timer = Instant::now();
let mut stats = Stats::new();
let comms = self.comms.clone();

loop {
select! {
Expand Down Expand Up @@ -141,7 +144,7 @@ impl Client {
comms.stats(stats.waiting());
match backend.connect(&self.id, router.route()).await {
Ok(()) => (),
Err(err) => if err.checkout_timeout() {
Err(err) => if err.no_server() {
error!("Connection pool is down");
self.stream.error(ErrorResponse::connection()).await?;
comms.stats(stats.error());
Expand All @@ -151,7 +154,9 @@ impl Client {
}
};
comms.stats(stats.connected());
debug!("client paired with {} [{:.4}ms]", backend.addr()?, timer.elapsed().as_secs_f64() * 1000.0);
if let Ok(addr) = backend.addr() {
debug!("client paired with {} [{:.4}ms]", addr, timer.elapsed().as_secs_f64() * 1000.0);
}
}

// Send query to server.
Expand Down Expand Up @@ -190,8 +195,6 @@ impl Client {
.await?;
}

comms.disconnect();

Ok(())
}

Expand All @@ -201,7 +204,7 @@ impl Client {
/// sent a complete request.
async fn buffer(&mut self) -> Buffer {
let mut buffer = Buffer::new();
let timer = Instant::now();
let mut timer = None;

while !buffer.full() {
let message = match self.stream.read().await {
Expand All @@ -211,6 +214,10 @@ impl Client {
}
};

if timer.is_none() {
timer = Some(Instant::now());
}

match message.code() {
// Terminate (F)
'X' => return vec![].into(),
Expand All @@ -220,7 +227,7 @@ impl Client {

trace!(
"request buffered [{:.4}ms]",
timer.elapsed().as_secs_f64() * 1000.0
timer.unwrap().elapsed().as_secs_f64() * 1000.0
);

buffer
Expand All @@ -230,3 +237,9 @@ impl Client {
self.params.get_default("database", "") == "admin"
}
}

impl Drop for Client {
fn drop(&mut self) {
self.comms.disconnect();
}
}
Loading

0 comments on commit e0671cd

Please sign in to comment.