Skip to content

Commit

Permalink
feat: add initial support for postgresql replication (apecloud#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
fanyang01 authored Nov 14, 2024
1 parent 74ac39d commit 3e9e0e0
Show file tree
Hide file tree
Showing 23 changed files with 3,089 additions and 391 deletions.
43 changes: 43 additions & 0 deletions .github/workflows/mysql-replication.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: MySQL Binlog Replication Test

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'

- name: Install dependencies
run: |
go get .
pip3 install "sqlglot[rs]"
curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
chmod +x duckdb
sudo mv duckdb /usr/local/bin
duckdb -c 'INSTALL json from core'
duckdb -c 'SELECT extension_name, loaded, install_path FROM duckdb_extensions() where installed'
- name: Build
run: go build -v

- name: Test Binlog Replication
run: go test -v -p 1 --timeout 360s ./binlogreplication
43 changes: 43 additions & 0 deletions .github/workflows/postgres-replication.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
name: Postgres Logical Replication Test

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: '1.23'

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.10'

- name: Install dependencies
run: |
go get .
pip3 install "sqlglot[rs]"
curl -LJO https://github.com/duckdb/duckdb/releases/download/v1.1.3/duckdb_cli-linux-amd64.zip
unzip duckdb_cli-linux-amd64.zip
chmod +x duckdb
sudo mv duckdb /usr/local/bin
duckdb -c 'INSTALL json from core'
duckdb -c 'SELECT extension_name, loaded, install_path FROM duckdb_extensions() where installed'
- name: Build
run: go build -v

- name: Test Postgres Logical Replication
run: go test -v --timeout 30s ./pgserver/logrepl
1 change: 1 addition & 0 deletions binlogreplication/binlog_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ func startDuckSqlServer(dir string, persistentSystemVars map[string]string) (int
args := []string{"go", "run", ".",
fmt.Sprintf("--port=%v", duckPort),
fmt.Sprintf("--datadir=%s", dir),
fmt.Sprintf("--pg-port=-1"),
"--loglevel=6", // TRACE
}

Expand Down
9 changes: 9 additions & 0 deletions catalog/internal_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (it *InternalTable) SelectStmt() string {
var InternalTables = struct {
PersistentVariable InternalTable
BinlogPosition InternalTable
PgReplicationLSN InternalTable
GlobalStatus InternalTable
}{
PersistentVariable: InternalTable{
Expand All @@ -93,6 +94,13 @@ var InternalTables = struct {
ValueColumns: []string{"position"},
DDL: "channel TEXT PRIMARY KEY, position TEXT",
},
PgReplicationLSN: InternalTable{
Schema: "main",
Name: "pg_replication_lsn",
KeyColumns: []string{"slot_name"},
ValueColumns: []string{"lsn"},
DDL: "slot_name TEXT PRIMARY KEY, lsn TEXT",
},
GlobalStatus: InternalTable{
Schema: "performance_schema",
Name: "global_status",
Expand All @@ -108,5 +116,6 @@ var InternalTables = struct {
var internalTables = []InternalTable{
InternalTables.PersistentVariable,
InternalTables.BinlogPosition,
InternalTables.PgReplicationLSN,
InternalTables.GlobalStatus,
}
5 changes: 5 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/dolthub/go-mysql-server v0.18.2-0.20241112002228-81b13e8034f2
github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9
github.com/go-sql-driver/mysql v1.8.1
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.7.1
github.com/jmoiron/sqlx v1.4.0
github.com/lib/pq v1.10.9
Expand All @@ -28,6 +29,7 @@ require (
replace (
github.com/dolthub/go-mysql-server v0.18.2-0.20241112002228-81b13e8034f2 => github.com/apecloud/go-mysql-server v0.0.0-20241112031328-30cddba3eea7
github.com/dolthub/vitess v0.0.0-20241111235433-a20a5ab9d7c9 => github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a
github.com/marcboeker/go-duckdb v1.8.3 => github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595
)

require (
Expand Down Expand Up @@ -62,6 +64,9 @@ require (
github.com/gorilla/mux v1.8.1 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/kr/pretty v0.3.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ github.com/apache/thrift v0.21.0 h1:tdPmh/ptjE1IJnhbhrcl2++TauVjy242rkV/UzJChnE=
github.com/apache/thrift v0.21.0/go.mod h1:W1H8aR/QRtYNvrPeFXBtobyRkd0/YVhTc6i07XIAgDw=
github.com/apecloud/dolt-vitess v0.0.0-20241107081545-d894da3857d8 h1:OKsyuwps5eKiUa4GHn35O8kq8R+Tf2/iUYNo3f3SoCc=
github.com/apecloud/dolt-vitess v0.0.0-20241107081545-d894da3857d8/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595 h1:zAJgtlElXKLbo3HgZmFvfc96vSWGwTqAJphwFarz6Os=
github.com/apecloud/go-duckdb v0.0.0-20241113073916-47619770e595/go.mod h1:C9bYRE1dPYb1hhfu/SSomm78B0FXmNgRvv6YBW/Hooc=
github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a h1:2D9spsdHL5yqHqxghc7FrTfknswMbiUCCJ1Ci3WaIPY=
github.com/apecloud/dolt-vitess v0.0.0-20241112063127-f62e98a9936a/go.mod h1:uBvlRluuL+SbEWTCZ68o0xvsdYZER3CEG/35INdzfJM=
github.com/apecloud/go-mysql-server v0.0.0-20241112031328-30cddba3eea7 h1:nlBHJDxPrUaDpKkS1xj78C0o/hdU5O3RMwOlBJC+U2k=
Expand Down Expand Up @@ -318,6 +320,10 @@ github.com/iris-contrib/i18n v0.0.0-20171121225848-987a633949d0/go.mod h1:pMCz62
github.com/iris-contrib/jade v1.1.3/go.mod h1:H/geBymxJhShH5kecoiOCSssPX7QWYH7UaeZTSWddIk=
github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g=
github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9 h1:86CQbMauoZdLS0HDLcEHYo6rErjiCBjVvcxGsioIn7s=
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9/go.mod h1:SO15KF4QqfUM5UhsG9roXre5qeAQLC1rm8a8Gjpgg5k=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
Expand Down
52 changes: 39 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ import (
"github.com/apecloud/myduckserver/replica"
"github.com/apecloud/myduckserver/transpiler"
sqle "github.com/dolthub/go-mysql-server"
"github.com/dolthub/go-mysql-server/memory"
"github.com/dolthub/go-mysql-server/server"
"github.com/dolthub/go-mysql-server/sql"
"github.com/sirupsen/logrus"

"github.com/dolthub/vitess/go/mysql"
_ "github.com/marcboeker/go-duckdb"
"github.com/sirupsen/logrus"
)

// This is an example of how to implement a MySQL server.
// After running the example, you may connect to it using the following:
// After running the executable, you may connect to it using the following:
//
// > mysql --host=localhost --port=3306 --user=root
//
Expand All @@ -45,12 +45,15 @@ var (
address = "0.0.0.0"
port = 3306
socket string
postgresPort = 5432
dataDirectory = "."
dbFileName = "mysql.db"
logLevel = int(logrus.InfoLevel)

replicaOptions replica.ReplicaOptions

postgresPort = 5432
postgresPrimaryDsn string
postgresSlotName = "myduck"
)

func init() {
Expand All @@ -60,8 +63,6 @@ func init() {
flag.StringVar(&dataDirectory, "datadir", dataDirectory, "The directory to store the database.")
flag.IntVar(&logLevel, "loglevel", logLevel, "The log level to use.")

flag.IntVar(&postgresPort, "pg-port", postgresPort, "The port to bind to for PostgreSQL wire protocol.")

// The following options need to be set for MySQL Shell's utilities to work properly.

// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_host
Expand All @@ -72,6 +73,12 @@ func init() {
flag.StringVar(&replicaOptions.ReportUser, "report-user", replicaOptions.ReportUser, "The account user name of the replica to be reported to the source during replica registration.")
// https://dev.mysql.com/doc/refman/8.4/en/replication-options-replica.html#sysvar_report_password
flag.StringVar(&replicaOptions.ReportPassword, "report-password", replicaOptions.ReportPassword, "The account password of the replica to be reported to the source during replica registration.")

// The following options are used to configure the Postgres server.

flag.IntVar(&postgresPort, "pg-port", postgresPort, "The port to bind to for PostgreSQL wire protocol.")
flag.StringVar(&postgresPrimaryDsn, "pg-primary-dsn", postgresPrimaryDsn, "The DSN of the primary server for logical replication.")
flag.StringVar(&postgresSlotName, "pg-slot-name", postgresSlotName, "The name of the logical replication slot to use.")
}

func ensureSQLTranslate() {
Expand Down Expand Up @@ -119,20 +126,39 @@ func main() {
Address: fmt.Sprintf("%s:%d", address, port),
Socket: socket,
}
srv, err := server.NewServerWithHandler(config, engine, backend.NewSessionBuilder(provider, pool), nil, backend.WrapHandler(pool))
myServer, err := server.NewServerWithHandler(config, engine, backend.NewSessionBuilder(provider, pool), nil, backend.WrapHandler(pool))
if err != nil {
panic(err)
logrus.WithError(err).Fatalln("Failed to create MySQL-protocol server")
}

if postgresPort > 0 {
pgServer, err := pgserver.NewServer(srv, address, postgresPort)
// Postgres tables are created in the `public` schema by default.
// Create the `public` schema if it doesn't exist.
_, err := pool.ExecContext(context.Background(), "CREATE SCHEMA IF NOT EXISTS public")
if err != nil {
panic(err)
logrus.WithError(err).Fatalln("Failed to create the `public` schema")
}

pgServer, err := pgserver.NewServer(
address, postgresPort,
func() *sql.Context {
session := backend.NewSession(memory.NewSession(sql.NewBaseSession(), provider), provider, pool)
return sql.NewContext(context.Background(), sql.WithSession(session))
},
pgserver.WithEngine(myServer.Engine),
pgserver.WithSessionManager(myServer.SessionManager()),
pgserver.WithConnID(&myServer.Listener.(*mysql.Listener).ConnectionID), // Shared connection ID counter
)
if err != nil {
logrus.WithError(err).Fatalln("Failed to create Postgres-protocol server")
}
if postgresPrimaryDsn != "" && postgresSlotName != "" {
go pgServer.StartReplication(postgresPrimaryDsn, postgresSlotName)
}
go pgServer.Start()
}

if err = srv.Start(); err != nil {
panic(err)
if err = myServer.Start(); err != nil {
logrus.WithError(err).Fatalln("Failed to start MySQL-protocol server")
}
}
6 changes: 4 additions & 2 deletions pgserver/connection_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import (
"fmt"

"github.com/cockroachdb/cockroachdb-parser/pkg/sql/sem/tree"
"github.com/dolthub/go-mysql-server/sql"
"github.com/dolthub/vitess/go/vt/proto/query"
"github.com/jackc/pgx/v5/pgproto3"
"github.com/lib/pq/oid"
"github.com/marcboeker/go-duckdb"
)

// ErrorResponseSeverity represents the severity of an ErrorResponse message.
Expand Down Expand Up @@ -55,6 +55,7 @@ type ConvertedQuery struct {
String string
AST tree.Statement
StatementTag string
PgParsable bool
}

// copyFromStdinState tracks the metadata for an import of data into a table using a COPY FROM STDIN statement. When
Expand All @@ -79,13 +80,14 @@ type PortalData struct {
Query ConvertedQuery
IsEmptyQuery bool
Fields []pgproto3.FieldDescription
BoundPlan sql.Node
Stmt *duckdb.Stmt
}

type PreparedStatementData struct {
Query ConvertedQuery
ReturnFields []pgproto3.FieldDescription
BindVarTypes []uint32
Stmt *duckdb.Stmt
}

// VitessTypeToObjectID returns a type, as defined by Vitess, into a type as defined by Postgres.
Expand Down
Loading

0 comments on commit 3e9e0e0

Please sign in to comment.