Skip to content

Commit

Permalink
WIP: Support Postgresql Protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
holicc committed Jun 5, 2024
1 parent 83e34d1 commit d2f108e
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 94 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ edition = "2021"
arrow = "51.0.0"
sqlparser = { git = "https://github.com/holicc/sqlparser.git" }
url = "2.5.0"
adbc_core = { git = "https://github.com/alexandreyc/adbc-rs", branch = "main", features = [
"driver_manager",
] }
parquet = "51.0.0"
pgwire = "0.22.0"
tokio = { version = "1.37.0", features = ["macros","time"] }
async-trait = "0.1.80"
log = "0.4.21"

[dev-dependencies]
arrow = { version = "51.0.0", features = ["prettyprint", "test_utils"] }
83 changes: 0 additions & 83 deletions src/datasource/adbc.rs

This file was deleted.

1 change: 0 additions & 1 deletion src/datasource/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod adbc;
pub mod file;
pub mod memory;

Expand Down
7 changes: 0 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,6 @@ impl From<parquet::errors::ParquetError> for Error {
}
}

impl From<adbc_core::error::Error> for Error{
fn from(e: adbc_core::error::Error) -> Self {
Error::InternalError(e.to_string())
}

}

impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod optimizer;
pub mod physical;
pub mod planner;
pub mod utils;
pub mod server;

#[cfg(test)]
pub mod test_utils;
2 changes: 2 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod postgresql;
pub mod server;
17 changes: 17 additions & 0 deletions src/server/postgresql/handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use async_trait::async_trait;
use pgwire::{
api::{query::SimpleQueryHandler, results::Response, ClientInfo},
error::PgWireResult,
};

pub struct PostgresqlHandler;

#[async_trait]
impl SimpleQueryHandler for PostgresqlHandler {
async fn do_query<'a, C>(&self, _client: &mut C, sql: &'a str) -> PgWireResult<Vec<Response<'a>>>
where
C: ClientInfo + Unpin + Send + Sync,
{
todo!("Implement PostgresqlHandler::do_query()")
}
}
5 changes: 5 additions & 0 deletions src/server/postgresql/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod handler;
mod server;

pub use server::PostgresqlServer;
pub use handler::PostgresqlHandler;
77 changes: 77 additions & 0 deletions src/server/postgresql/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::error::Result;
use log::error;
use pgwire::api::MakeHandler;
use pgwire::api::{auth::noop::NoopStartupHandler, query::PlaceholderExtendedQueryHandler, StatelessMakeHandler};
use std::{net::SocketAddr, sync::Arc};

use super::PostgresqlHandler;

pub struct PostgresqlServer {
addr: SocketAddr,
}

impl PostgresqlServer {
pub fn new(addr: SocketAddr) -> Self {
PostgresqlServer { addr }
}
}

impl PostgresqlServer {
pub async fn start(&self) -> Result<()> {
tokio::spawn(Self::listen(self.addr));

Ok(())
}

pub fn shutdown(&self) -> Result<()> {
todo!("Implement PostgresqlServer::shutdown()")
}

async fn listen(addr: SocketAddr) {
let listener = tokio::net::TcpListener::bind(addr)
.await
.unwrap_or_else(|e| panic!("PostgreSQL Server bind fail. err: {}", e));

let authenticator = Arc::new(StatelessMakeHandler::new(Arc::new(NoopStartupHandler)));
let processor = Arc::new(StatelessMakeHandler::new(Arc::new(PostgresqlHandler)));
let placeholder = Arc::new(StatelessMakeHandler::new(Arc::new(PlaceholderExtendedQueryHandler)));

loop {
tokio::select! {
peer = listener.accept() => {
match peer {
Ok((socket, _)) => {
tokio::spawn(pgwire::tokio::process_socket(
socket,
None,
authenticator.make(),
processor.make(),
placeholder.make(),
));
}
Err(e) => {
error!("PostgreSQL Server accept new connection fail. err: {}", e);
}
}
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::net::SocketAddr;
use std::str::FromStr;

#[tokio::test]
async fn test_postgresql_server() {
let addr = SocketAddr::from_str("127.0.0.1:5434").unwrap();
let server = PostgresqlServer::new(addr);
server.start().await.unwrap();

// wait
tokio::time::sleep(tokio::time::Duration::from_secs(10000000)).await;
}
}
5 changes: 5 additions & 0 deletions src/server/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use crate::server::postgresql;

pub struct Server {
postgres: postgresql::PostgresqlServer,
}

0 comments on commit d2f108e

Please sign in to comment.