Skip to content

Commit

Permalink
Merge pull request #1 from ihippik/pgoutput
Browse files Browse the repository at this point in the history
Pgoutput
  • Loading branch information
ihippik authored Jan 28, 2020
2 parents ff42c97 + fdeea04 commit bab7d22
Show file tree
Hide file tree
Showing 21 changed files with 2,137 additions and 1,288 deletions.
31 changes: 18 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ publishing events in a single transaction with a domain model change.
The service allows you to subscribe to changes in the PostgreSQL database using its logical decoding capability
and publish them to the NATS Streaming server.

Inspired after watching https://github.com/hasura/pgdeltastream
### Logic of work
To receive events about data changes in our PostgreSQL DB
  we use the standard logic decoding module (**pgoutput**) This module converts
changes read from the WAL into a logical replication protocol.
  And we already consume all this information on our side.
Then we filter out only the events we need and publish them in the queue

### Event publishing

Expand All @@ -25,22 +30,17 @@ the name of the database and the name of the table `prefix + schema_table`.

```
{
Schema string
Table string
Action string
Data map[string]interface{}
ID uuid.UUID # unique ID
Schema string
Table string
Action string
Data map[string]interface{}
EventTime time.Time # commit time
}
```

Messages are published to Nats-Streaming at least once!

### Restrictions

* DB Postgres must be configured for logical replication and `wal2json` extension installed
(use for test `docker run -it -p 5432:5432 debezium/postgres:11`)
* Tables must have a primary key
* DDL, truncate and sequences are not replicated

### Filter configuration example

```yaml
Expand All @@ -55,10 +55,15 @@ databases:
This filter means that we only process events occurring with the `users` table,
and in particular `insert` and `update` data.

### DB setting
You must make the following settings in the db configuration (postgresql.conf)
* wal_level >= “logical”
* max_replication_slots >= 1

### Docker

You can start the container from the project folder (configuration file is required)

```
docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:master
docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:pgoutput
```
71 changes: 71 additions & 0 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package main

import (
"fmt"
"github.com/ihippik/wal-listener/listener"
"github.com/jackc/pgx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"

"github.com/ihippik/wal-listener/config"
)

// logger log levels.
const (
warningLoggerLevel = "warning"
errorLoggerLevel = "error"
fatalLoggerLevel = "fatal"
infoLoggerLevel = "info"
)

// initLogger init logrus preferences.
func initLogger(cfg config.LoggerCfg) {
logrus.SetReportCaller(cfg.Caller)
if !cfg.HumanReadable {
logrus.SetFormatter(&logrus.JSONFormatter{})
}
var level logrus.Level
switch cfg.Level {
case warningLoggerLevel:
level = logrus.WarnLevel
case errorLoggerLevel:
level = logrus.ErrorLevel
case fatalLoggerLevel:
level = logrus.FatalLevel
case infoLoggerLevel:
level = logrus.InfoLevel
default:
level = logrus.DebugLevel
}
logrus.SetLevel(level)
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
// TODO logger
LogLevel: pgx.LogLevelInfo,
Logger: pgxLogger{},
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Name,
User: cfg.User,
Password: cfg.Password,
}
pgConn, err := pgx.Connect(pgxConf)
if err != nil {
return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection)
}

rConnection, err := pgx.ReplicationConnect(pgxConf)
if err != nil {
return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err)
}
return pgConn, rConnection, nil
}

type pgxLogger struct{}

func (l pgxLogger) Log(level pgx.LogLevel, msg string, data map[string]interface{}) {
logrus.Debugln(msg)
}
54 changes: 3 additions & 51 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package main

import (
"encoding/binary"
"fmt"
"os"

"github.com/jackc/pgx"
"github.com/nats-io/stan.go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/urfave/cli/v2"
Expand Down Expand Up @@ -53,7 +52,8 @@ func main() {
}
repo := listener.NewRepository(conn)
natsPublisher := listener.NewNatsPublisher(sc)
service := listener.NewWalListener(cfg, repo, rConn, natsPublisher)
parser := listener.NewBinaryParser(binary.BigEndian)
service := listener.NewWalListener(cfg, repo, rConn, natsPublisher, parser)
return service.Process()
},
}
Expand All @@ -79,51 +79,3 @@ func getConf(path string) (*config.Config, error) {

return &cfg, nil
}

// initPgxConnections initialise db and replication connections.
func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) {
pgxConf := pgx.ConnConfig{
Host: cfg.Host,
Port: cfg.Port,
Database: cfg.Name,
User: cfg.User,
Password: cfg.Password,
}
pgConn, err := pgx.Connect(pgxConf)
if err != nil {
return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection)
}

rConnection, err := pgx.ReplicationConnect(pgxConf)
if err != nil {
return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err)
}
return pgConn, rConnection, nil
}

// logger log levels.
const (
warningLoggerLevel = "warning"
errorLoggerLevel = "error"
fatalLoggerLevel = "fatal"
)

// initLogger init logrus preferences.
func initLogger(cfg config.LoggerCfg) {
logrus.SetReportCaller(cfg.Caller)
if !cfg.HumanReadable {
logrus.SetFormatter(&logrus.JSONFormatter{})
}
var level logrus.Level
switch cfg.Level {
case warningLoggerLevel:
level = logrus.WarnLevel
case errorLoggerLevel:
level = logrus.ErrorLevel
case fatalLoggerLevel:
level = logrus.FatalLevel
default:
level = logrus.DebugLevel
}
logrus.SetLevel(level)
}
15 changes: 8 additions & 7 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
listener:
slotName: myslot
slotName: myslot_2
ackTimeout: 10s
refreshConnection: 30s
heartbeatInterval: 10s
logger:
caller: false
level: debug
level: info
humanReadable: true
database:
host: localhost
host: 81.90.180.219
port: 5432
name: test
user: postgres
password: postgres
name: scum_db
user: pglogrepl
debug: false
password: secret
filter:
tables:
my_table:
table_name:
- insert
- update
nats:
Expand Down
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,24 @@ go 1.13
require (
bou.ke/monkey v1.0.2
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/google/uuid v1.1.1
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgx v3.6.0+incompatible
github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect
github.com/jackc/pgconn v1.2.1
github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4
github.com/jackc/pgproto3/v2 v2.0.0
github.com/jackc/pgx v3.6.1+incompatible
github.com/kyleconroy/pgoutput v0.1.0
github.com/magiconair/properties v1.8.1
github.com/mailru/easyjson v0.7.0
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
github.com/nats-io/stan.go v0.6.0
github.com/pkg/errors v0.8.1
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
github.com/pkg/errors v0.9.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/viper v1.6.1
github.com/stretchr/testify v1.4.0
github.com/urfave/cli/v2 v2.1.1
golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 // indirect
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 // indirect
)
Loading

0 comments on commit bab7d22

Please sign in to comment.