Skip to content

Commit

Permalink
fix: solve some compatibility issues (apecloud#203)
Browse files Browse the repository at this point in the history
* fix: 'DISCARD ALL' will close the backend connection to reset all states on DuckDB (apecloud#196)

* fix: switch to schema 'public' instead of 'main' for each PostgreSQL protocol connection (apecloud#187)

* test: change the default dbname from main to postgres in compatibilites test
  • Loading branch information
VWagen1989 authored Nov 22, 2024
1 parent c563289 commit 771eb68
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 24 deletions.
5 changes: 5 additions & 0 deletions adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,17 @@ type ConnectionHolder interface {
GetCatalogTxn(ctx context.Context, options *stdsql.TxOptions) (*stdsql.Tx, error)
TryGetTxn() *stdsql.Tx
CloseTxn()
CloseBackendConn()
}

func GetConn(ctx *sql.Context) (*stdsql.Conn, error) {
return ctx.Session.(ConnectionHolder).GetConn(ctx)
}

func CloseBackendConn(ctx *sql.Context) {
ctx.Session.(ConnectionHolder).CloseBackendConn()
}

func GetTxn(ctx *sql.Context, options *stdsql.TxOptions) (*stdsql.Tx, error) {
return ctx.Session.(ConnectionHolder).GetTxn(ctx, options)
}
Expand Down
5 changes: 5 additions & 0 deletions backend/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ func (sess *Session) CloseTxn() {
sess.pool.CloseTxn(sess.ID())
}

// CloseBackendConn implements adapter.ConnectionHolder.
func (sess *Session) CloseBackendConn() {
sess.pool.CloseConn(sess.ID())
}

func (sess *Session) ExecContext(ctx context.Context, query string, args ...any) (stdsql.Result, error) {
conn, err := sess.GetCatalogConn(ctx)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions compatibility/pg/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
*.class
2 changes: 1 addition & 1 deletion compatibility/pg/c/pg_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ typedef struct {

void connectDB(PGTest *pgTest, const char *ip, int port, const char *user, const char *password) {
char conninfo[256];
snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=main user=%s password=%s", ip, port, user, password);
snprintf(conninfo, sizeof(conninfo), "host=%s port=%d dbname=postgres user=%s password=%s", ip, port, user, password);
pgTest->conn = PQconnectdb(conninfo);
if (PQstatus(pgTest->conn) != CONNECTION_OK) {
printf("Connection to database failed: %s", PQerrorMessage(pgTest->conn));
Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/java/PGTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public static class Tests {

public void connect(String ip, int port, String user, String password) {
try {
String url = "jdbc:postgresql://" + ip + ":" + port + "/main";
String url = "jdbc:postgresql://" + ip + ":" + port + "/postgres";
conn = DriverManager.getConnection(url, user, password);
st = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
} catch (SQLException e) {
Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/node/pg_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class Tests {
port: port,
user: user,
password: password,
database: 'main'
database: 'postgres'
});
await this.client.connect();
}
Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/perl/pg_test.pl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ sub new {

sub connect {
my ($self, $ip, $port, $user, $password) = @_;
my $dsn = "dbi:Pg:dbname=main;host=$ip;port=$port";
my $dsn = "dbi:Pg:dbname=postgres;host=$ip;port=$port";
$self->{conn} = DBI->connect($dsn, $user, $password, { RaiseError => 1, AutoCommit => 1 });
}

Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/php/pg_test.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class Tests {

public function connect($ip, $port, $user, $password) {
try {
$url = "pgsql:host=$ip;port=$port;dbname=main";
$url = "pgsql:host=$ip;port=$port;dbname=postgres";
$this->conn = new PDO($url, $user, $password);
} catch (PDOException $e) {
throw new RuntimeException($e->getMessage());
Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/python/pg_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def connect(self, ip, port, user, password):
self.conn = psycopg2.connect(
host=ip,
port=port,
dbname="main",
dbname="postgres",
user=user,
password=password
)
Expand Down
2 changes: 1 addition & 1 deletion compatibility/pg/ruby/pg_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize

def connect(ip, port, user, password)
begin
@conn = PG.connect(host: ip, port: port, user: user, password: password, dbname: 'main')
@conn = PG.connect(host: ip, port: port, user: user, password: password, dbname: 'postgres')
rescue PG::Error => e
raise "Connection failed: #{e.message}"
end
Expand Down
18 changes: 9 additions & 9 deletions compatibility/pg/test.bats
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env bats

setup() {
psql -h 127.0.0.1 -p 5432 -U main -c "DROP SCHEMA IF EXISTS test CASCADE;"
psql -h 127.0.0.1 -p 5432 -U postgres -c "DROP SCHEMA IF EXISTS test CASCADE;"
}

@test "pg-c" {
gcc -o $BATS_TEST_DIRNAME/c/pg_test $BATS_TEST_DIRNAME/c/pg_test.c -I/usr/include/postgresql -lpq
run $BATS_TEST_DIRNAME/c/pg_test 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run $BATS_TEST_DIRNAME/c/pg_test 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
fi
Expand All @@ -15,7 +15,7 @@ setup() {

@test "pg-java" {
javac $BATS_TEST_DIRNAME/java/PGTest.java
run java -cp $BATS_TEST_DIRNAME/java:$BATS_TEST_DIRNAME/java/postgresql-42.7.4.jar PGTest 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run java -cp $BATS_TEST_DIRNAME/java:$BATS_TEST_DIRNAME/java/postgresql-42.7.4.jar PGTest 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
Expand All @@ -24,7 +24,7 @@ setup() {
}

@test "pg-node" {
run node $BATS_TEST_DIRNAME/node/pg_test.js 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run node $BATS_TEST_DIRNAME/node/pg_test.js 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
Expand All @@ -33,7 +33,7 @@ setup() {
}

@test "pg-perl" {
run perl $BATS_TEST_DIRNAME/perl/pg_test.pl 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run perl $BATS_TEST_DIRNAME/perl/pg_test.pl 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
Expand All @@ -42,7 +42,7 @@ setup() {
}

@test "pg-php" {
run php $BATS_TEST_DIRNAME/php/pg_test.php 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run php $BATS_TEST_DIRNAME/php/pg_test.php 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
Expand All @@ -51,7 +51,7 @@ setup() {
}

@test "pg-python" {
run python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run python3 $BATS_TEST_DIRNAME/python/pg_test.py 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
Expand All @@ -60,10 +60,10 @@ setup() {
}

@test "pg-ruby" {
run ruby $BATS_TEST_DIRNAME/ruby/pg_test.rb 127.0.0.1 5432 root "" $BATS_TEST_DIRNAME/test.data
run ruby $BATS_TEST_DIRNAME/ruby/pg_test.rb 127.0.0.1 5432 postgres "" $BATS_TEST_DIRNAME/test.data
if [ "$status" -ne 0 ]; then
echo "$output"
echo "$stderr"
fi
[ "$status" -eq 0 ]
}
}
26 changes: 23 additions & 3 deletions docs/tutorial/pg-htap-pgpool-setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,26 @@ The HTAP service can be accessed by
docker exec -ti htap-pgpool bash -c "PGPASSWORD=postgres psql -h localhost -U postgres -d postgres"
```

And a table `test` with one row data is replicated from the primary PostreSQL to MyDuck server.

```sql
psql (17.1)
Type "help" for help.

postgres=# \d
List of relations
Schema | Name | Type | Owner
--------+------+-------+----------
public | test | table | postgres
(1 row)

postgres=# select * from test;
id | name
----+------
1 | test
(1 row)
```

# Monitor status

* The status of proxy `PGPool-II` can be retrieved by the built-in statement [SHOW POOL_NODES](https://www.pgpool.net/docs/latest/en/html/sql-show-pool-nodes.html). e.g. You can get the status of the servers by executing the statement `SHOW POOL_NODES;` on the connection to PGPool server.
Expand All @@ -53,7 +73,7 @@ postgres=# show pool_nodes;

after executing the `READ` statement. As you can see, the `select_cnt` has been increased by 1, indicating that the read statement has been routed to MyDuck server.
```sql
postgres=# select * from public.test;
postgres=# select * from test;
id | name
----+------
1 | test
Expand All @@ -70,9 +90,9 @@ postgres=# show pool_nodes;
Let's insert a new row and then query the table. Without surprise, the data has been replicated to our MyDuck server.
```sql
postgres=#
postgres=# insert into public.test values (2, 'test again');
postgres=# insert into test values (2, 'test again');
INSERT 0 1
postgres=# select * from public.test;
postgres=# select * from test;
id | name
----+------------
1 | test
Expand Down
16 changes: 11 additions & 5 deletions pgserver/connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ func NewConnectionHandler(conn net.Conn, handler mysql.Handler, engine *gms.Engi
}
}

func (h *ConnectionHandler) closeBackendConn() {
ctx, err := h.duckHandler.NewContext(context.Background(), h.mysqlConn, "")
if err != nil {
fmt.Println(err.Error())
}
adapter.CloseBackendConn(ctx)
}

// HandleConnection handles a connection's session, reading messages, executing queries, and sending responses.
// Expected to run in a goroutine per connection.
func (h *ConnectionHandler) HandleConnection() {
Expand Down Expand Up @@ -157,6 +165,7 @@ func (h *ConnectionHandler) HandleConnection() {
}

h.duckHandler.ConnectionClosed(h.mysqlConn)
h.closeBackendConn()
if err := h.Conn().Close(); err != nil {
fmt.Printf("Failed to properly close connection:\n%v\n", err)
}
Expand Down Expand Up @@ -307,7 +316,7 @@ func (h *ConnectionHandler) chooseInitialDatabase(startupMessage *pgproto3.Start
}
}

useStmt := fmt.Sprintf("USE %s;", db)
useStmt := fmt.Sprintf("USE %s.public;", db)
setStmt := fmt.Sprintf("SET database TO %s;", db)
parsed, err := parser.ParseOne(setStmt)
if err != nil {
Expand Down Expand Up @@ -1235,10 +1244,7 @@ func (h *ConnectionHandler) convertQuery(query string, modifiers ...QueryModifie

// discardAll handles the DISCARD ALL command
func (h *ConnectionHandler) discardAll(query ConvertedQuery) error {
err := h.duckHandler.ComResetConnection(h.mysqlConn)
if err != nil {
return err
}
h.closeBackendConn()

return h.send(&pgproto3.CommandComplete{
CommandTag: []byte(query.StatementTag),
Expand Down

0 comments on commit 771eb68

Please sign in to comment.