diff --git a/README.md b/README.md index 354020cd..b80ad28f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,12 @@ publishing events in a single transaction with a domain model change. The service allows you to subscribe to changes in the PostgreSQL database using its logical decoding capability and publish them to the NATS Streaming server. -Inspired after watching https://github.com/hasura/pgdeltastream +### Logic of work +To receive events about data changes in our PostgreSQL DB +  we use the standard logic decoding module (**pgoutput**) This module converts + changes read from the WAL into a logical replication protocol. +  And we already consume all this information on our side. +Then we filter out only the events we need and publish them in the queue ### Event publishing @@ -25,22 +30,17 @@ the name of the database and the name of the table `prefix + schema_table`. ``` { - Schema string - Table string - Action string - Data map[string]interface{} + ID uuid.UUID # unique ID + Schema string + Table string + Action string + Data map[string]interface{} + EventTime time.Time # commit time } ``` Messages are published to Nats-Streaming at least once! -### Restrictions - -* DB Postgres must be configured for logical replication and `wal2json` extension installed -(use for test `docker run -it -p 5432:5432 debezium/postgres:11`) -* Tables must have a primary key -* DDL, truncate and sequences are not replicated - ### Filter configuration example ```yaml @@ -55,10 +55,15 @@ databases: This filter means that we only process events occurring with the `users` table, and in particular `insert` and `update` data. +### DB setting +You must make the following settings in the db configuration (postgresql.conf) +* wal_level >= “logical” +* max_replication_slots >= 1 + ### Docker You can start the container from the project folder (configuration file is required) ``` -docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:master +docker run -v $(pwd)/config.yml:/app/config.yml ihippik/wal-listener:pgoutput ``` \ No newline at end of file diff --git a/cmd/wal-listener/init.go b/cmd/wal-listener/init.go new file mode 100644 index 00000000..f7a7a2f1 --- /dev/null +++ b/cmd/wal-listener/init.go @@ -0,0 +1,71 @@ +package main + +import ( + "fmt" + "github.com/ihippik/wal-listener/listener" + "github.com/jackc/pgx" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" + + "github.com/ihippik/wal-listener/config" +) + +// logger log levels. +const ( + warningLoggerLevel = "warning" + errorLoggerLevel = "error" + fatalLoggerLevel = "fatal" + infoLoggerLevel = "info" +) + +// initLogger init logrus preferences. +func initLogger(cfg config.LoggerCfg) { + logrus.SetReportCaller(cfg.Caller) + if !cfg.HumanReadable { + logrus.SetFormatter(&logrus.JSONFormatter{}) + } + var level logrus.Level + switch cfg.Level { + case warningLoggerLevel: + level = logrus.WarnLevel + case errorLoggerLevel: + level = logrus.ErrorLevel + case fatalLoggerLevel: + level = logrus.FatalLevel + case infoLoggerLevel: + level = logrus.InfoLevel + default: + level = logrus.DebugLevel + } + logrus.SetLevel(level) +} + +// initPgxConnections initialise db and replication connections. +func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) { + pgxConf := pgx.ConnConfig{ + // TODO logger + LogLevel: pgx.LogLevelInfo, + Logger: pgxLogger{}, + Host: cfg.Host, + Port: cfg.Port, + Database: cfg.Name, + User: cfg.User, + Password: cfg.Password, + } + pgConn, err := pgx.Connect(pgxConf) + if err != nil { + return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection) + } + + rConnection, err := pgx.ReplicationConnect(pgxConf) + if err != nil { + return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err) + } + return pgConn, rConnection, nil +} + +type pgxLogger struct{} + +func (l pgxLogger) Log(level pgx.LogLevel, msg string, data map[string]interface{}) { + logrus.Debugln(msg) +} diff --git a/cmd/wal-listener/main.go b/cmd/wal-listener/main.go index 46eb1311..da15ddaf 100644 --- a/cmd/wal-listener/main.go +++ b/cmd/wal-listener/main.go @@ -1,12 +1,11 @@ package main import ( + "encoding/binary" "fmt" "os" - "github.com/jackc/pgx" "github.com/nats-io/stan.go" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/urfave/cli/v2" @@ -53,7 +52,8 @@ func main() { } repo := listener.NewRepository(conn) natsPublisher := listener.NewNatsPublisher(sc) - service := listener.NewWalListener(cfg, repo, rConn, natsPublisher) + parser := listener.NewBinaryParser(binary.BigEndian) + service := listener.NewWalListener(cfg, repo, rConn, natsPublisher, parser) return service.Process() }, } @@ -79,51 +79,3 @@ func getConf(path string) (*config.Config, error) { return &cfg, nil } - -// initPgxConnections initialise db and replication connections. -func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn, error) { - pgxConf := pgx.ConnConfig{ - Host: cfg.Host, - Port: cfg.Port, - Database: cfg.Name, - User: cfg.User, - Password: cfg.Password, - } - pgConn, err := pgx.Connect(pgxConf) - if err != nil { - return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection) - } - - rConnection, err := pgx.ReplicationConnect(pgxConf) - if err != nil { - return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err) - } - return pgConn, rConnection, nil -} - -// logger log levels. -const ( - warningLoggerLevel = "warning" - errorLoggerLevel = "error" - fatalLoggerLevel = "fatal" -) - -// initLogger init logrus preferences. -func initLogger(cfg config.LoggerCfg) { - logrus.SetReportCaller(cfg.Caller) - if !cfg.HumanReadable { - logrus.SetFormatter(&logrus.JSONFormatter{}) - } - var level logrus.Level - switch cfg.Level { - case warningLoggerLevel: - level = logrus.WarnLevel - case errorLoggerLevel: - level = logrus.ErrorLevel - case fatalLoggerLevel: - level = logrus.FatalLevel - default: - level = logrus.DebugLevel - } - logrus.SetLevel(level) -} diff --git a/config.yml b/config.yml index 4f27600d..fcf97c2a 100644 --- a/config.yml +++ b/config.yml @@ -1,21 +1,22 @@ listener: - slotName: myslot + slotName: myslot_2 ackTimeout: 10s refreshConnection: 30s heartbeatInterval: 10s logger: caller: false - level: debug + level: info humanReadable: true database: - host: localhost + host: 81.90.180.219 port: 5432 - name: test - user: postgres - password: postgres + name: scum_db + user: pglogrepl + debug: false + password: secret filter: tables: - my_table: + table_name: - insert - update nats: diff --git a/go.mod b/go.mod index 4f516fad..74ad2bd8 100644 --- a/go.mod +++ b/go.mod @@ -5,20 +5,24 @@ go 1.13 require ( bou.ke/monkey v1.0.2 github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a - github.com/cockroachdb/apd v1.1.0 // indirect + github.com/davecgh/go-spew v1.1.1 github.com/gofrs/uuid v3.2.0+incompatible // indirect + github.com/google/uuid v1.1.1 github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect - github.com/jackc/pgx v3.6.0+incompatible - github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect + github.com/jackc/pgconn v1.2.1 + github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4 + github.com/jackc/pgproto3/v2 v2.0.0 + github.com/jackc/pgx v3.6.1+incompatible + github.com/kyleconroy/pgoutput v0.1.0 + github.com/magiconair/properties v1.8.1 github.com/mailru/easyjson v0.7.0 github.com/nats-io/nats-server/v2 v2.1.2 // indirect github.com/nats-io/nats-streaming-server v0.16.2 // indirect github.com/nats-io/stan.go v0.6.0 - github.com/pkg/errors v0.8.1 - github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect + github.com/pkg/errors v0.9.0 github.com/sirupsen/logrus v1.4.2 github.com/spf13/viper v1.6.1 github.com/stretchr/testify v1.4.0 github.com/urfave/cli/v2 v2.1.1 - golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 // indirect + golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 // indirect ) diff --git a/go.sum b/go.sum index 71829cb7..53e263fe 100644 --- a/go.sum +++ b/go.sum @@ -25,9 +25,11 @@ github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkE github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -51,13 +53,15 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= +github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= @@ -81,17 +85,51 @@ github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T github.com/hashicorp/raft v1.1.1 h1:HJr7UE1x/JrJSc9Oy6aDBHtNHUUBHjcQjTgvUVihoZs= github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8= github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk= +github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0 h1:DUwgMQuuPnS0rhMXenUtZpqZqrR/30NWY+qQvTpSvEs= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGUK1IDqRa5lAAvEkZG1LKaCRc= github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= -github.com/jackc/pgx v3.6.0+incompatible h1:bJeo4JdVbDAW8KB2m8XkFeo8CPipREoG37BwEoKGz+Q= -github.com/jackc/pgx v3.6.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.1.0/go.mod h1:GgY/Lbj1VonNaVdNUHs9AwWom3yP2eymFQ1C8z9r/Lk= +github.com/jackc/pgconn v1.2.1 h1:+73KD6pbtv6Dbs6/rqlSRUa8XffPlW6YBd1hyFLpwuA= +github.com/jackc/pgconn v1.2.1/go.mod h1:GgY/Lbj1VonNaVdNUHs9AwWom3yP2eymFQ1C8z9r/Lk= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4 h1:UTV6LMahlWdHowugk7JMoAcnVqgS2/ZW5mpAgn3fk4Q= +github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4/go.mod h1:3Nwqi/QTsjaYxFh6Enw980AgQ16IypmAnsXyDoWHtY4= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0 h1:FApgMJ/GtaXfI0s8Lvd0kaLaRwMOhs4VH92pwkwQQvU= +github.com/jackc/pgproto3/v2 v2.0.0/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgx v3.6.1+incompatible h1:qKBkTh0LCxZL+wCqXDqPpqlqbSUWFKtyirpYC/1nXqI= +github.com/jackc/pgx v3.6.1+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -99,20 +137,27 @@ github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFB github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kyleconroy/pgoutput v0.1.0 h1:nY5Bq4A8CqAdqW6LawaNOCWleziBzbd5ouoBtoNDElo= +github.com/kyleconroy/pgoutput v0.1.0/go.mod h1:xj1JLOlXvWLJ1CSJKlrKoWMxkDt9unagoStd2zrZ8IA= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.7.0 h1:aizVhC/NAAcKWb+5QsU1iNOZb4Yws5UO2I+aIprQITM= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/nats-io/jwt v0.2.14/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= @@ -125,7 +170,6 @@ github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= github.com/nats-io/nkeys v0.0.2/go.mod h1:dab7URMsZm6Z/jp9Z5UGa87Uutgc2mVpXLC4B7TDb/4= -github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3 h1:6JrEfig+HzTH85yxzhSVbjHRJv9cn0p6n3IngIcM5/k= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= @@ -140,8 +184,9 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.0 h1:J8lpUdobwIeCI7OiSxHqEwJUKvJwicL5+3v1oe2Yb4k= +github.com/pkg/errors v0.9.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -159,14 +204,18 @@ github.com/prometheus/procfs v0.0.3 h1:CTwfnzjQ+8dS6MhHHu4YswVAD99sL2wjPqP+VkURm github.com/prometheus/procfs v0.0.3/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE= github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= -github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM= @@ -188,9 +237,9 @@ github.com/spf13/viper v1.6.1/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfD github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= @@ -203,20 +252,25 @@ github.com/urfave/cli/v2 v2.1.1 h1:Qt8FeAtxE/vfdrLmR3rxR6JRE0RoVmbXu8+6kZtYU4k= github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7 h1:0hQKqeLdqlt5iIwVOBErRisrHJAN57yOiPRQItI20fU= -golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20191117063200-497ca9f6d64f/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 h1:nVJ3guKA9qdkEQ3TUdXI9QSINo2CUPM/cySEvw2w8I0= +golang.org/x/crypto v0.0.0-20200109152110-61a87790db17/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -227,6 +281,8 @@ golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -236,17 +292,17 @@ golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg= golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= @@ -256,7 +312,14 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= @@ -267,6 +330,7 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno= gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= diff --git a/listener/errors.go b/listener/errors.go index aac13538..1366630c 100644 --- a/listener/errors.go +++ b/listener/errors.go @@ -7,10 +7,9 @@ const ( ErrPostgresConnection = "db connection error" ErrReplicationConnection = "replication connection error" ErrNatsConnection = "nats connection error" - ErrMarshalMsg = "marshal wal message error" + ErrPublishEvent = "publish message error" ErrUnmarshalMsg = "unmarshal wal message error" ErrAckWalMessage = "acknowledge wal message error" - ErrValidateMessage = "message validate error" ErrSendStandbyStatus = "send standby status error" ) @@ -18,6 +17,9 @@ const ( var ( errReplConnectionIsLost = errors.New("replication connection to postgres is lost") errConnectionIsLost = errors.New("db connection to postgres is lost") + errMessageLost = errors.New("messages are lost") + errEmptyWALMessage = errors.New("empty WAL message") + errUnknownMessageType = errors.New("unknown message type") ) type serviceErr struct { diff --git a/listener/listener.go b/listener/listener.go index 5e772f46..789b0f44 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -19,8 +19,7 @@ const errorBufferSize = 100 // Logical decoding plugin. const ( - wal2JsonPlugin = "wal2json" - pluginArgIncludeLSN = `"include-lsn" 'on'` + pgoutputPlugin = "pgoutput" ) // Service info message. @@ -30,10 +29,14 @@ const ( ) type publisher interface { - Publish(subject string, msg []byte) error + Publish(string, Event) error Close() error } +type parser interface { + ParseWalMessage([]byte, *WalTransaction) error +} + type replication interface { CreateReplicationSlotEx(slotName, outputPlugin string) (consistentPoint string, snapshotName string, err error) DropReplicationSlot(slotName string) (err error) @@ -57,64 +60,72 @@ type Listener struct { publisher publisher replicator replication repository repository - restartLSN uint64 + parser parser + LSN uint64 errChannel chan error } -func NewWalListener(cfg *config.Config, repo repository, repl replication, publ publisher) *Listener { +func NewWalListener( + cfg *config.Config, + repo repository, + repl replication, + publ publisher, + parser parser, +) *Listener { return &Listener{ slotName: fmt.Sprintf("%s_%s", cfg.Listener.SlotName, cfg.Database.Name), config: *cfg, publisher: publ, repository: repo, replicator: repl, + parser: parser, errChannel: make(chan error, errorBufferSize), } } // Process is main service entry point. -func (w *Listener) Process() error { +func (l *Listener) Process() error { var serviceErr *serviceErr - logger := logrus.WithField("slot_name", w.slotName) + logger := logrus.WithField("slot_name", l.slotName) ctx, cancelFunc := context.WithCancel(context.Background()) - logrus.WithField("logger_level", w.config.Logger.Level).Infoln(StartServiceMessage) + logrus.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage) - slotIsExists, err := w.slotIsExists() + slotIsExists, err := l.slotIsExists() if err != nil { return err } if !slotIsExists { - consistentPoint, _, err := w.replicator.CreateReplicationSlotEx(w.slotName, wal2JsonPlugin) + consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgoutputPlugin) if err != nil { return err } - w.restartLSN, err = pgx.ParseLSN(consistentPoint) + l.LSN, err = pgx.ParseLSN(consistentPoint) logger.Infoln("create new slot") if err != nil { return err } } else { - logger.Infoln("slot already exists, restartLSN updated") + logger.Infoln("slot already exists, LSN updated") } - go w.Stream(ctx) + go l.Stream(ctx) signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - refresh := time.Tick(w.config.Listener.RefreshConnection) + refresh := time.Tick(l.config.Listener.RefreshConnection) ProcessLoop: for { select { case <-refresh: - if !w.replicator.IsAlive() { + if !l.replicator.IsAlive() { logrus.Fatalln(errReplConnectionIsLost) } - if !w.repository.IsAlive() { + if !l.repository.IsAlive() { logrus.Fatalln(errConnectionIsLost) - w.errChannel <- errConnectionIsLost + l.errChannel <- errConnectionIsLost } - case err := <-w.errChannel: + case err := <-l.errChannel: if errors.As(err, &serviceErr) { cancelFunc() logrus.Fatalln(err) @@ -124,7 +135,7 @@ ProcessLoop: case <-signalChan: cancelFunc() - err := w.Stop() + err := l.Stop() if err != nil { logrus.Errorln(err) } @@ -135,8 +146,8 @@ ProcessLoop: } // slotIsExists checks whether a slot has already been created and if it has been created uses it. -func (w *Listener) slotIsExists() (bool, error) { - restartLSNStr, err := w.repository.GetSlotLSN(w.slotName) +func (l *Listener) slotIsExists() (bool, error) { + restartLSNStr, err := l.repository.GetSlotLSN(l.slotName) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return false, nil @@ -146,97 +157,91 @@ func (w *Listener) slotIsExists() (bool, error) { if len(restartLSNStr) == 0 { return false, nil } - w.restartLSN, err = pgx.ParseLSN(restartLSNStr) + l.LSN, err = pgx.ParseLSN(restartLSNStr) if err != nil { return false, err } return true, nil } +func publicationNames(publication string) string { + return fmt.Sprintf(`publication_names '%s'`, publication) +} + +const protoVersion = "proto_version '1'" + // Stream receive event from PostgreSQL. // Accept message, apply filter and publish it in NATS server. -func (w *Listener) Stream(ctx context.Context) { - err := w.replicator.StartReplication(w.slotName, w.restartLSN, -1, pluginArgIncludeLSN) +func (l *Listener) Stream(ctx context.Context) { + err := l.replicator.StartReplication(l.slotName, l.LSN, -1, protoVersion, publicationNames("sport")) if err != nil { - w.errChannel <- newListenerError("StartReplication()", err) + l.errChannel <- newListenerError("StartReplication()", err) return } - go w.SendPeriodicHeartbeats(ctx) - + go l.SendPeriodicHeartbeats(ctx) + tx := NewWalTransaction() for { if ctx.Err() != nil { - w.errChannel <- newListenerError("read message", err) + l.errChannel <- newListenerError("read msg", err) break } - message, err := w.replicator.WaitForReplicationMessage(ctx) + msg, err := l.replicator.WaitForReplicationMessage(ctx) if err != nil { - w.errChannel <- newListenerError("WaitForReplicationMessage()", err) + l.errChannel <- newListenerError("WaitForReplicationMessage()", err) continue } - if message != nil { - if message.WalMessage != nil { - var m WalEvent - walData := message.WalMessage.WalData - err = m.UnmarshalJSON(walData) - if err != nil { - w.errChannel <- fmt.Errorf("%v: %w", ErrUnmarshalMsg, err) - continue - } - if len(m.Change) == 0 { - logrus.WithField("next_lsn", m.NextLSN).Infoln("skip empty WAL message") - continue - } - err = m.Validate() + if msg != nil { + if msg.WalMessage != nil { + logrus.WithField("wal", msg.WalMessage.WalStart). + Debugln("receive wal message") + err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx) if err != nil { - logrus.WithError(err).Warningln(ErrValidateMessage) + logrus.WithError(err).Errorln("msg parse failed") + l.errChannel <- fmt.Errorf("%v: %w", ErrUnmarshalMsg, err) continue } - logrus.WithFields(logrus.Fields{"next_lsn": m.NextLSN, "count_events": len(m.Change)}). - Infoln("receive wal message") - for _, event := range m.CreateEventsWithFilter(w.config.Database.Filter.Tables) { - logrus.WithFields(logrus.Fields{"action": event.Action, "table": event.Table}). - Debugln("receive events") - msg, err := event.MarshalJSON() - if err != nil { - w.errChannel <- fmt.Errorf("%v: %w", ErrMarshalMsg, err) - continue + if tx.CommitTime != nil { + natsEvents := tx.CreateEventsWithFilter(l.config.Database.Filter.Tables) + for _, event := range natsEvents { + subjectName := event.GetSubjectName(l.config.Nats.TopicPrefix) + if err = l.publisher.Publish(subjectName, event); err != nil { + l.errChannel <- fmt.Errorf("%v: %w", ErrPublishEvent, err) + continue + } else { + logrus. + WithField("subject", subjectName). + WithField("action", event.Action). + WithField("lsn", l.LSN). + Infoln("event was send") + } } + tx.Clear() + } - subjectName := event.GetSubjectName(w.config.Nats.TopicPrefix) - - // TODO tracer?! - // TODO retry func for publish?! - err = w.publisher.Publish(subjectName, msg) + if msg.WalMessage.WalStart > l.LSN { + err = l.AckWalMessage(msg.WalMessage.WalStart) if err != nil { - w.errChannel <- newListenerError("Publish()", err) + l.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err) continue } else { - logrus.WithField("nxt_lsn", m.NextLSN).Infoln("event publish to NATS") + logrus.WithField("lsn", l.LSN).Debugln("ack wal msg") } } - - err = w.AckWalMessage(m.NextLSN) - if err != nil { - w.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err) - continue - } else { - logrus.WithField("lsn", m.NextLSN).Debugln("ack wal message") - } } - if message.ServerHeartbeat != nil { + if msg.ServerHeartbeat != nil { //FIXME panic if there have been no messages for a long time. logrus.WithFields(logrus.Fields{ - "server_wal_end": message.ServerHeartbeat.ServerWalEnd, - "server_time": message.ServerHeartbeat.ServerTime, + "server_wal_end": msg.ServerHeartbeat.ServerWalEnd, + "server_time": msg.ServerHeartbeat.ServerTime, }). Debugln("received server heartbeat") - if message.ServerHeartbeat.ReplyRequested == 1 { + if msg.ServerHeartbeat.ReplyRequested == 1 { logrus.Debugln("status requested") - err = w.SendStandbyStatus() + err = l.SendStandbyStatus() if err != nil { - w.errChannel <- fmt.Errorf("%v: %w", ErrSendStandbyStatus, err) + l.errChannel <- fmt.Errorf("%v: %w", ErrSendStandbyStatus, err) } } } @@ -245,17 +250,17 @@ func (w *Listener) Stream(ctx context.Context) { } // Stop is a finalizer function. -func (w *Listener) Stop() error { +func (l *Listener) Stop() error { var err error - err = w.publisher.Close() + err = l.publisher.Close() if err != nil { return err } - err = w.repository.Close() + err = l.repository.Close() if err != nil { return err } - err = w.replicator.Close() + err = l.replicator.Close() if err != nil { return err } @@ -264,16 +269,16 @@ func (w *Listener) Stop() error { } // SendPeriodicHeartbeats send periodic keep alive hearbeats to the server. -func (w *Listener) SendPeriodicHeartbeats(ctx context.Context) { +func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { for { select { case <-ctx.Done(): logrus.WithField("func", "SendPeriodicHeartbeats"). Infoln("context was canceled, stop sending heartbeats") return - case <-time.Tick(w.config.Listener.HeartbeatInterval): + case <-time.Tick(l.config.Listener.HeartbeatInterval): { - err := w.SendStandbyStatus() + err := l.SendStandbyStatus() if err != nil { logrus.WithError(err).Errorln("failed to send status heartbeat") continue @@ -285,13 +290,13 @@ func (w *Listener) SendPeriodicHeartbeats(ctx context.Context) { } // SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server. -func (w *Listener) SendStandbyStatus() error { - standbyStatus, err := pgx.NewStandbyStatus(w.restartLSN) +func (l *Listener) SendStandbyStatus() error { + standbyStatus, err := pgx.NewStandbyStatus(l.LSN) if err != nil { return fmt.Errorf("unable to create StandbyStatus object: %w", err) } standbyStatus.ReplyRequested = 0 - err = w.replicator.SendStandbyStatus(standbyStatus) + err = l.replicator.SendStandbyStatus(standbyStatus) if err != nil { return fmt.Errorf("unable to send StandbyStatus object: %w", err) } @@ -299,13 +304,9 @@ func (w *Listener) SendStandbyStatus() error { } // AckWalMessage acknowledge received wal message. -func (w *Listener) AckWalMessage(restartLSNStr string) error { - restartLSN, err := pgx.ParseLSN(restartLSNStr) - if err != nil { - return err - } - w.restartLSN = restartLSN - err = w.SendStandbyStatus() +func (l *Listener) AckWalMessage(lsn uint64) error { + l.LSN = lsn + err := l.SendStandbyStatus() if err != nil { return err } diff --git a/listener/listener_test.go b/listener/listener_test.go index 16a35229..fe3533d9 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -1,15 +1,18 @@ package listener import ( + "bytes" "context" "errors" - "github.com/stretchr/testify/mock" + "github.com/google/uuid" + "github.com/sirupsen/logrus" "testing" "time" "bou.ke/monkey" "github.com/jackc/pgx" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/ihippik/wal-listener/config" ) @@ -257,7 +260,7 @@ func TestListener_SendStandbyStatus(t *testing.T) { tt.setup() w := &Listener{ replicator: repl, - restartLSN: tt.fields.restartLSN, + LSN: tt.fields.restartLSN, } if err := w.SendStandbyStatus(); (err != nil) != tt.wantErr { t.Errorf("SendStandbyStatus() error = %v, wantErr %v", err, tt.wantErr) @@ -273,7 +276,7 @@ func TestListener_AckWalMessage(t *testing.T) { restartLSN uint64 } type args struct { - restartLSNStr string + LSN uint64 } setSendStandbyStatus := func(status *pgx.StandbyStatus, err error) { @@ -313,7 +316,7 @@ func TestListener_AckWalMessage(t *testing.T) { restartLSN: 0, }, args: args{ - restartLSNStr: "0/17843B8", + LSN: 24658872, }, wantErr: false, }, @@ -335,18 +338,7 @@ func TestListener_AckWalMessage(t *testing.T) { restartLSN: 0, }, args: args{ - restartLSNStr: "0/17843B8", - }, - wantErr: true, - }, - { - name: "invalid lsn", - setup: func() {}, - fields: fields{ - restartLSN: 0, - }, - args: args{ - restartLSNStr: "invalid", + LSN: 24658872, }, wantErr: true, }, @@ -356,9 +348,9 @@ func TestListener_AckWalMessage(t *testing.T) { tt.setup() w := &Listener{ replicator: repl, - restartLSN: tt.fields.restartLSN, + LSN: tt.fields.restartLSN, } - if err := w.AckWalMessage(tt.args.restartLSNStr); (err != nil) != tt.wantErr { + if err := w.AckWalMessage(tt.args.LSN); (err != nil) != tt.wantErr { t.Errorf("AckWalMessage() error = %v, wantErr %v", err, tt.wantErr) } @@ -368,9 +360,11 @@ func TestListener_AckWalMessage(t *testing.T) { } func TestListener_Stream(t *testing.T) { + logrus.SetLevel(logrus.FatalLevel) repo := new(repositoryMock) publ := new(publisherMock) repl := new(replicatorMock) + prs := new(parserMock) type fields struct { config config.Config slotName string @@ -384,12 +378,9 @@ func TestListener_Stream(t *testing.T) { patch := monkey.Patch(time.Now, func() time.Time { return wayback }) defer patch.Unpatch() - setSendStandbyStatus := func(status *pgx.StandbyStatus, err error) { - repl.On( - "SendStandbyStatus", - status, - ). - Return(err) + setParseWalMessageOnce := func(msg []byte, tx *WalTransaction, err error) { + prs.On("ParseWalMessage", msg, tx).Return(err).Once(). + After(10 * time.Millisecond) } setStartReplication := func(err error, slotName string, startLsn uint64, timeline int64, pluginArguments ...string) { @@ -399,20 +390,30 @@ func TestListener_Stream(t *testing.T) { startLsn, timeline, pluginArguments, - ).Return(err) + ).Return(err).Once().After(10 * time.Millisecond) } setWaitForReplicationMessage := func(msg *pgx.ReplicationMessage, err error) { repl.On( "WaitForReplicationMessage", mock.Anything, - ).Return(msg, err) + ).Return(msg, err).Once().After(10 * time.Millisecond) } - setPublish := func(subject string, msg []byte, err error) { - publ.On("Publish", subject, msg).Return(err) + setSendStandbyStatus := func(status *pgx.StandbyStatus, err error) { + repl.On( + "SendStandbyStatus", + status, + ). + Return(err).After(10 * time.Millisecond) } + setPublish := func(subject string, event Event, err error) { + publ.On("Publish", subject, event).Return(err). + Once(). + After(10 * time.Millisecond) + } + uuid.SetRand(bytes.NewReader(make([]byte, 512))) tests := []struct { name string setup func() @@ -427,36 +428,56 @@ func TestListener_Stream(t *testing.T) { "myslot", uint64(0), int64(-1), - pluginArgIncludeLSN, - ) - setPublish( - "pre_user_service_users", - []byte(`{"schema":"user_service","table":"users","action":"insert","data":{"k1":"v1"}}`), - nil, + protoVersion, + "publication_names 'sport'", ) setSendStandbyStatus( &pgx.StandbyStatus{ - WalWritePosition: 24658872, - WalFlushPosition: 24658872, - WalApplyPosition: 24658872, + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, ClientTime: 18445935546232551617, ReplyRequested: 0, }, nil, ) + setParseWalMessageOnce( + []byte(`some bytes`), + &WalTransaction{ + LSN: 0, + BeginTime: nil, + CommitTime: nil, + RelationStore: make(map[int32]RelationData), + Actions: nil, + }, + nil, + ) + + setPublish( + "pre_public_users", + Event{ + ID: uuid.MustParse("00000000-0000-4000-8000-000000000000"), + Schema: "public", + Table: "users", + Action: "INSERT", + Data: map[string]interface{}{"id": 1}, + EventTime: wayback, + }, + nil, + ) setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ - WalStart: 0, + WalStart: 10, ServerWalEnd: 0, ServerTime: 0, - WalData: []byte(`{"nextlsn":"0/17843B8","change":[{"kind":"insert","schema":"user_service","table":"users","columnnames":["k1"],"columnvalues":["v1"]}]}`), + WalData: []byte(`some bytes`), }, ServerHeartbeat: &pgx.ServerHeartbeat{ ServerWalEnd: 0, ServerTime: 0, - ReplyRequested: 0, + ReplyRequested: 1, }, }, nil, @@ -482,40 +503,19 @@ func TestListener_Stream(t *testing.T) { restartLSN: 0, }, args: args{ - timeout: 10 * time.Millisecond, + timeout: 40 * time.Millisecond, }, }, { - name: "publish err", + name: "start replication err", setup: func() { setStartReplication( - nil, + someErr, "myslot", uint64(0), int64(-1), - pluginArgIncludeLSN, - ) - setPublish( - "pre_users", - []byte(`{"tableName":"users","action":"insert","data":{"k1":"v1"}}`), - someErr, - ) - - setWaitForReplicationMessage( - &pgx.ReplicationMessage{ - WalMessage: &pgx.WalMessage{ - WalStart: 0, - ServerWalEnd: 0, - ServerTime: 0, - WalData: []byte(`{"nextLsn":"0","change":[{"kind":"insert","schema":"user_service","table":"users","columnnames":["k1"],"columnvalues":["v1"]}]}`), - }, - ServerHeartbeat: &pgx.ServerHeartbeat{ - ServerWalEnd: 0, - ServerTime: 0, - ReplyRequested: 0, - }, - }, - nil, + protoVersion, + "publication_names 'sport'", ) }, fields: fields{ @@ -538,35 +538,35 @@ func TestListener_Stream(t *testing.T) { restartLSN: 0, }, args: args{ - timeout: 10 * time.Millisecond, + timeout: 100 * time.Microsecond, }, }, { - name: "validate err", + name: "wait replication err", setup: func() { setStartReplication( nil, "myslot", uint64(0), int64(-1), - pluginArgIncludeLSN, + protoVersion, + "publication_names 'sport'", ) - setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ - WalStart: 0, + WalStart: 10, ServerWalEnd: 0, ServerTime: 0, - WalData: []byte(`{"nextLsn":"0","change":[{"columnnames":["v1","v2"]}]}`), + WalData: []byte(`some bytes`), }, ServerHeartbeat: &pgx.ServerHeartbeat{ ServerWalEnd: 0, ServerTime: 0, - ReplyRequested: 0, + ReplyRequested: 1, }, }, - nil, + someErr, ) }, fields: fields{ @@ -574,145 +574,146 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - RefreshConnection: 0, HeartbeatInterval: 0, }, + Database: config.DatabaseCfg{ + Filter: config.FilterStruct{ + Tables: map[string][]string{"users": {"insert"}}, + }, + }, + Nats: config.NatsCfg{ + TopicPrefix: "pre_", + }, }, slotName: "myslot", restartLSN: 0, }, args: args{ - timeout: 10 * time.Millisecond, + timeout: 20 * time.Millisecond, }, }, { - name: "skip empty WAL", + name: "parse err", setup: func() { setStartReplication( nil, "myslot", uint64(0), int64(-1), - pluginArgIncludeLSN, + protoVersion, + "publication_names 'sport'", ) - setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ - WalStart: 0, + WalStart: 10, ServerWalEnd: 0, ServerTime: 0, - WalData: []byte(`{"nextLsn":"0","change":[]}`), + WalData: []byte(`some bytes`), }, ServerHeartbeat: &pgx.ServerHeartbeat{ ServerWalEnd: 0, ServerTime: 0, - ReplyRequested: 0, + ReplyRequested: 1, }, }, nil, ) + setParseWalMessageOnce( + []byte(`some bytes`), + &WalTransaction{ + LSN: 0, + BeginTime: nil, + CommitTime: nil, + RelationStore: make(map[int32]RelationData), + Actions: nil, + }, + someErr, + ) }, fields: fields{ config: config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - RefreshConnection: 0, HeartbeatInterval: 0, }, + Database: config.DatabaseCfg{ + Filter: config.FilterStruct{ + Tables: map[string][]string{"users": {"insert"}}, + }, + }, + Nats: config.NatsCfg{ + TopicPrefix: "pre_", + }, }, slotName: "myslot", restartLSN: 0, }, args: args{ - timeout: 10 * time.Millisecond, + timeout: 30 * time.Millisecond, }, }, { - name: "message unmarshal err", + name: "publish err", setup: func() { setStartReplication( nil, "myslot", uint64(0), int64(-1), - pluginArgIncludeLSN, + protoVersion, + "publication_names 'sport'", ) - setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ - WalStart: 0, + WalStart: 10, ServerWalEnd: 0, ServerTime: 0, - WalData: nil, + WalData: []byte(`some bytes`), }, ServerHeartbeat: &pgx.ServerHeartbeat{ ServerWalEnd: 0, ServerTime: 0, - ReplyRequested: 0, + ReplyRequested: 1, }, }, nil, ) - }, - fields: fields{ - config: config.Config{ - Listener: config.ListenerCfg{ - SlotName: "myslot", - AckTimeout: 0, - RefreshConnection: 0, - HeartbeatInterval: 0, + setParseWalMessageOnce( + []byte(`some bytes`), + &WalTransaction{ + LSN: 0, + BeginTime: nil, + CommitTime: nil, + RelationStore: make(map[int32]RelationData), + Actions: nil, + }, + nil, + ) + + setPublish( + "pre_public_users", + Event{ + ID: uuid.MustParse("00000000-0000-4000-8000-000000000000"), + Schema: "public", + Table: "users", + Action: "INSERT", + Data: map[string]interface{}{"id": 1}, + EventTime: wayback, }, - }, - slotName: "myslot", - restartLSN: 0, - }, - args: args{ - timeout: 10 * time.Millisecond, - }, - }, - { - name: "start replication err", - setup: func() { - setStartReplication( someErr, - "myslot", - uint64(0), - int64(-1), - pluginArgIncludeLSN, ) - }, - fields: fields{ - config: config.Config{ - Listener: config.ListenerCfg{ - SlotName: "myslot", - AckTimeout: 0, - RefreshConnection: 0, - HeartbeatInterval: 0, + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: 18445935546232551617, + ReplyRequested: 0, }, - }, - slotName: "myslot", - restartLSN: 0, - }, - args: args{ - timeout: 1 * time.Second, - }, - }, - { - name: "wait message err", - setup: func() { - setStartReplication( nil, - "myslot", - uint64(0), - int64(-1), - pluginArgIncludeLSN, - ) - setWaitForReplicationMessage( - &pgx.ReplicationMessage{}, - someErr, ) }, fields: fields{ @@ -720,15 +721,22 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - RefreshConnection: 0, HeartbeatInterval: 0, }, + Database: config.DatabaseCfg{ + Filter: config.FilterStruct{ + Tables: map[string][]string{"users": {"insert"}}, + }, + }, + Nats: config.NatsCfg{ + TopicPrefix: "pre_", + }, }, slotName: "myslot", restartLSN: 0, }, args: args{ - timeout: 1 * time.Second, + timeout: 50 * time.Millisecond, }, }, } @@ -742,7 +750,8 @@ func TestListener_Stream(t *testing.T) { publisher: publ, replicator: repl, repository: repo, - restartLSN: tt.fields.restartLSN, + parser: prs, + LSN: tt.fields.restartLSN, errChannel: make(chan error, errorBufferSize), } go func() { diff --git a/listener/nats_publisher.go b/listener/nats_publisher.go index c2ee7a5e..1636b044 100644 --- a/listener/nats_publisher.go +++ b/listener/nats_publisher.go @@ -2,7 +2,9 @@ package listener import ( "fmt" + "time" + "github.com/google/uuid" "github.com/nats-io/stan.go" ) @@ -19,13 +21,19 @@ func (n NatsPublisher) Close() error { // Event event structure for publishing to the NATS server. //easyjson:json type Event struct { - Schema string `json:"schema"` - Table string `json:"table"` - Action string `json:"action"` - Data map[string]interface{} `json:"data"` + ID uuid.UUID `json:"id"` + Schema string `json:"schema"` + Table string `json:"table"` + Action string `json:"action"` + Data map[string]interface{} `json:"data"` + EventTime time.Time `json:"commitTime"` } -func (n NatsPublisher) Publish(subject string, msg []byte) error { +func (n NatsPublisher) Publish(subject string, event Event) error { + msg, err := event.MarshalJSON() + if err != nil { + return fmt.Errorf("marshal err: %w", err) + } return n.conn.Publish(subject, msg) } diff --git a/listener/nats_publisher_easyjson.go b/listener/nats_publisher_easyjson.go index 49562a2e..b02d4e21 100644 --- a/listener/nats_publisher_easyjson.go +++ b/listener/nats_publisher_easyjson.go @@ -36,6 +36,10 @@ func easyjsonAd513449DecodeGithubComIhippikWalListenerListener(in *jlexer.Lexer, continue } switch key { + case "id": + if data := in.UnsafeBytes(); in.Ok() { + in.AddError((out.ID).UnmarshalText(data)) + } case "schema": out.Schema = string(in.String()) case "table": @@ -68,6 +72,10 @@ func easyjsonAd513449DecodeGithubComIhippikWalListenerListener(in *jlexer.Lexer, } in.Delim('}') } + case "commitTime": + if data := in.Raw(); in.Ok() { + in.AddError((out.EventTime).UnmarshalJSON(data)) + } default: in.SkipRecursive() } @@ -83,8 +91,13 @@ func easyjsonAd513449EncodeGithubComIhippikWalListenerListener(out *jwriter.Writ first := true _ = first { - const prefix string = ",\"schema\":" + const prefix string = ",\"id\":" out.RawString(prefix[1:]) + out.RawText((in.ID).MarshalText()) + } + { + const prefix string = ",\"schema\":" + out.RawString(prefix) out.String(string(in.Schema)) } { @@ -124,6 +137,11 @@ func easyjsonAd513449EncodeGithubComIhippikWalListenerListener(out *jwriter.Writ out.RawByte('}') } } + { + const prefix string = ",\"commitTime\":" + out.RawString(prefix) + out.Raw((in.EventTime).MarshalJSON()) + } out.RawByte('}') } diff --git a/listener/nats_publisher_mock.go b/listener/nats_publisher_mock.go index bb91d9b9..fd145d8e 100644 --- a/listener/nats_publisher_mock.go +++ b/listener/nats_publisher_mock.go @@ -6,8 +6,8 @@ type publisherMock struct { mock.Mock } -func (p *publisherMock) Publish(subject string, msg []byte) error { - args := p.Called(subject, msg) +func (p *publisherMock) Publish(subject string, event Event) error { + args := p.Called(subject, event) return args.Error(0) } diff --git a/listener/parser.go b/listener/parser.go new file mode 100644 index 00000000..d8487f53 --- /dev/null +++ b/listener/parser.go @@ -0,0 +1,278 @@ +package listener + +import ( + "bytes" + "encoding/binary" + "fmt" + "time" + + "github.com/sirupsen/logrus" +) + +type BinaryParser struct { + byteOrder binary.ByteOrder + msgType byte + buffer *bytes.Buffer +} + +func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser { + return &BinaryParser{ + byteOrder: byteOrder, + } +} + +func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { + if len(msg) == 0 { + return errEmptyWALMessage + } + p.msgType = msg[0] + p.buffer = bytes.NewBuffer(msg[1:]) + switch p.msgType { + case BeginMsgType: + begin := p.getBeginMsg() + logrus. + WithFields( + logrus.Fields{ + "lsn": begin.LSN, + "xid": begin.XID, + }). + Infoln("receive begin message") + tx.LSN = begin.LSN + tx.BeginTime = &begin.Timestamp + case CommitMsgType: + commit := p.getCommitMsg() + logrus. + WithFields( + logrus.Fields{ + "lsn": commit.LSN, + "transaction_lsn": commit.TransactionLSN, + }). + Infoln("receive commit message") + if tx.LSN > 0 && tx.LSN != commit.LSN { + return fmt.Errorf("commit: %w", errMessageLost) + } + tx.CommitTime = &commit.Timestamp + case OriginMsgType: + logrus.Infoln("receive origin message") + case RelationMsgType: + relation := p.getRelationMsg() + logrus. + WithFields( + logrus.Fields{ + "relation_id": relation.ID, + "replica": relation.Replica, + }). + Infoln("receive relation message") + if tx.LSN == 0 { + return fmt.Errorf("commit: %w", errMessageLost) + } + rd := RelationData{ + Schema: relation.Namespace, + Table: relation.Name, + } + for _, rf := range relation.Columns { + c := Column{ + name: rf.Name, + valueType: int(rf.TypeID), + isKey: rf.Key, + } + rd.Columns = append(rd.Columns, c) + } + tx.RelationStore[relation.ID] = rd + + case TypeMsgType: + logrus.Infoln("type") + case InsertMsgType: + insert := p.getInsertMsg() + logrus. + WithFields( + logrus.Fields{ + "relation_id": insert.RelationID, + }). + Infoln("receive insert message") + action, err := tx.CreateActionData( + insert.RelationID, + insert.Row, + ActionKindInsert, + ) + if err != nil { + return fmt.Errorf("create action data: %w", err) + } + tx.Actions = append(tx.Actions, action) + case UpdateMsgType: + upd := p.getUpdateMsg() + logrus. + WithFields( + logrus.Fields{ + "relation_id": upd.RelationID, + }). + Infoln("receive update message") + action, err := tx.CreateActionData( + upd.RelationID, + upd.Row, + ActionKindUpdate, + ) + if err != nil { + return fmt.Errorf("create action data: %w", err) + } + tx.Actions = append(tx.Actions, action) + case DeleteMsgType: + del := p.getDeleteMsg() + logrus. + WithFields( + logrus.Fields{ + "relation_id": del.RelationID, + }). + Infoln("receive delete message") + action, err := tx.CreateActionData( + del.RelationID, + del.Row, + ActionKindDelete, + ) + if err != nil { + return fmt.Errorf("create action data: %w", err) + } + tx.Actions = append(tx.Actions, action) + default: + return fmt.Errorf("%w : %s", errUnknownMessageType, []byte{p.msgType}) + } + return nil +} + +func (p *BinaryParser) getBeginMsg() Begin { + return Begin{ + LSN: p.readInt64(), + Timestamp: p.readTimestamp(), + XID: p.readInt32(), + } +} + +func (p *BinaryParser) getCommitMsg() Commit { + return Commit{ + Flags: p.readInt8(), + LSN: p.readInt64(), + TransactionLSN: p.readInt64(), + Timestamp: p.readTimestamp(), + } +} + +func (p *BinaryParser) getInsertMsg() Insert { + return Insert{ + RelationID: p.readInt32(), + NewTuple: p.buffer.Next(1)[0] == NewTupleDataType, + Row: p.readTupleData(), + } +} + +func (p *BinaryParser) getDeleteMsg() Delete { + return Delete{ + RelationID: p.readInt32(), + KeyTuple: p.charIsExists('K'), + OldTuple: p.charIsExists('O'), + Row: p.readTupleData(), + } +} + +func (p *BinaryParser) getUpdateMsg() Update { + u := Update{} + u.RelationID = p.readInt32() + u.KeyTuple = p.charIsExists('K') + u.OldTuple = p.charIsExists('O') + if u.KeyTuple || u.OldTuple { + u.OldRow = p.readTupleData() + } + u.OldTuple = p.charIsExists('N') + u.Row = p.readTupleData() + return u +} + +func (p *BinaryParser) getRelationMsg() Relation { + return Relation{ + ID: p.readInt32(), + Namespace: p.readString(), + Name: p.readString(), + Replica: p.readInt8(), + Columns: p.readColumns(), + } +} + +func (p *BinaryParser) readInt32() (val int32) { + r := bytes.NewReader(p.buffer.Next(4)) + _ = binary.Read(r, p.byteOrder, &val) + return +} + +func (p *BinaryParser) readInt64() (val int64) { + r := bytes.NewReader(p.buffer.Next(8)) + _ = binary.Read(r, p.byteOrder, &val) + return +} + +func (p *BinaryParser) readInt8() (val int8) { + r := bytes.NewReader(p.buffer.Next(1)) + _ = binary.Read(r, p.byteOrder, &val) + return +} + +func (p *BinaryParser) readInt16() (val int16) { + r := bytes.NewReader(p.buffer.Next(2)) + _ = binary.Read(r, p.byteOrder, &val) + return +} + +func (p *BinaryParser) readTimestamp() time.Time { + ns := p.readInt64() + return postgresEpoch.Add(time.Duration(ns) * time.Microsecond) +} + +func (p *BinaryParser) readString() (str string) { + stringBytes, _ := p.buffer.ReadBytes(0) + return string(bytes.Trim(stringBytes, "\x00")) +} + +func (p *BinaryParser) readBool() bool { + x := p.buffer.Next(1)[0] + return x != 0 +} + +func (p *BinaryParser) charIsExists(char byte) bool { + if p.buffer.Next(1)[0] == char { + return true + } else { + _ = p.buffer.UnreadByte() + return false + } +} + +func (p *BinaryParser) readColumns() []RelationColumn { + size := int(p.readInt16()) + data := make([]RelationColumn, size) + for i := 0; i < size; i++ { + data[i] = RelationColumn{ + Key: p.readBool(), + Name: p.readString(), + TypeID: p.readInt32(), + ModifierType: p.readInt32(), + } + } + return data +} + +func (p *BinaryParser) readTupleData() []TupleData { + size := int(p.readInt16()) + data := make([]TupleData, size) + for i := 0; i < size; i++ { + sl := p.buffer.Next(1) + switch sl[0] { + case NullDataType: + logrus.Debugln("tupleData: null data type") + case ToastDataType: + logrus.Debugln( + "tupleData: toast data type") + case TextDataType: + vsize := int(p.readInt32()) + data[i] = TupleData{Value: p.buffer.Next(vsize)} + } + } + return data +} diff --git a/listener/parser_mock.go b/listener/parser_mock.go new file mode 100644 index 00000000..e4985aa6 --- /dev/null +++ b/listener/parser_mock.go @@ -0,0 +1,33 @@ +package listener + +import ( + "github.com/stretchr/testify/mock" + "time" +) + +type parserMock struct { + mock.Mock +} + +func (p *parserMock) ParseWalMessage(msg []byte, tx *WalTransaction) error { + args := p.Called(msg, tx) + now := time.Now() + tx.BeginTime = &now + tx.CommitTime = &now + tx.Actions = []ActionData{ + { + Schema: "public", + Table: "users", + Kind: "INSERT", + Columns: []Column{ + { + name: "id", + value: 1, + valueType: 23, + isKey: true, + }, + }, + }, + } + return args.Error(0) +} diff --git a/listener/parser_test.go b/listener/parser_test.go new file mode 100644 index 00000000..535b3d81 --- /dev/null +++ b/listener/parser_test.go @@ -0,0 +1,841 @@ +package listener + +import ( + "bytes" + "encoding/binary" + "reflect" + "testing" + + "github.com/jackc/pgx/pgtype" + "github.com/stretchr/testify/assert" +) + +func TestBinaryParser_readTupleData(t *testing.T) { + type fields struct { + buffer *bytes.Buffer + } + tests := []struct { + name string + fields fields + want []TupleData + }{ + { + name: "success", + fields: fields{ + // 0,1 - 1(int16) BigEndian + // 116 - t(type, text) + // 0,0,0,1 - 1(int32) BigEndian + // 116 - t(value, text) + buffer: bytes.NewBuffer([]byte{ + 0, 1, + 116, + 0, 0, 0, 1, + 116, + }), + }, + want: []TupleData{ + { + Value: []byte{116}, + }, + }, + }, + { + name: "null value", + fields: fields{ + buffer: bytes.NewBuffer([]byte{0, 1, 110, 0, 0, 0, 1, 116}), + }, + want: []TupleData{ + {}, + }, + }, + { + name: "toast value", + fields: fields{ + buffer: bytes.NewBuffer([]byte{0, 1, 117, 0, 0, 0, 1, 116}), + }, + want: []TupleData{ + {}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: tt.fields.buffer, + } + if got := p.readTupleData(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("readTupleData() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_readColumns(t *testing.T) { + type fields struct { + buffer *bytes.Buffer + } + tests := []struct { + name string + fields fields + want []RelationColumn + }{ + { + name: "success", + fields: fields{ + // 0,1 - 1(count rows, int16) + // 1 - 0 (isKey bool,int8) + // 105,100 - id(field name, text) + // 0 - end of string + // 0,0,0,25 - 25(pgtype text, int32) + // 0,0,0,1 - 1 (modifier, int32) + buffer: bytes.NewBuffer([]byte{ + 0, 1, + 1, + 105, 100, 0, + 0, 0, 0, 25, + 0, 0, 0, 1, + }), + }, + want: []RelationColumn{ + { + Key: true, + Name: "id", + TypeID: pgtype.TextOID, + ModifierType: 1, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: tt.fields.buffer, + } + if got := p.readColumns(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("readColumns() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_getRelationMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Relation + }{ + { + name: "get relation", + // 0,0,0,1 = 1 (relation id int32) + // 112, 117, 98, 108, 105, 99 = public (namespace, text) + // 0 = end of string + // 117, 115, 101, 114, 115 = users (table name, text) + // 0 = end of string + // 1 = int8, replica + // 0 = zero columns + fields: fields{ + src: []byte{ + 0, 0, 0, 1, + 112, 117, 98, 108, 105, 99, 0, + 117, 115, 101, 114, 115, 0, + 1, + 0, + }, + }, + want: Relation{ + ID: 1, + Namespace: "public", + Name: "users", + Replica: 1, + Columns: []RelationColumn{}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + if got := p.getRelationMsg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getRelationMsg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_getUpdateMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Update + }{ + { + name: "get message", + fields: fields{ + // 0,0,0,5 = 5 int32, relation id + // 79 = O flag - old tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,5 = 5 int32 size (data bytes) + // 104, 101, 108, 108, 111 = hello + // 78 = N flag - new tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,6 = 6 int32 size (data bytes) + // 104, 101, 108, 108, 111, 50 = hello2 + src: []byte{ + 0, 0, 0, 5, + 79, + 0, 1, + 116, + 0, 0, 0, 5, + 104, 101, 108, 108, 111, + 78, + 0, 1, + 116, + 0, 0, 0, 6, + 104, 101, 108, 108, 111, 50, + }, + }, + want: Update{ + RelationID: 5, + KeyTuple: false, + OldTuple: true, + NewTuple: false, + Row: []TupleData{ + { + Value: []byte{104, 101, 108, 108, 111, 50}, + }, + }, + OldRow: []TupleData{ + { + Value: []byte{104, 101, 108, 108, 111}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + got := p.getUpdateMsg() + assert.Equal(t, got, tt.want) + }) + } +} + +func TestBinaryParser_getDeleteMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Delete + }{ + { + name: "parse delete message", + fields: fields{ + // 0,0,0,5 = 5 int32, relation id + // 79 = O flag - old tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,5 = 5 int32 size (data bytes) + // 105, 100 = id + src: []byte{ + 0, 0, 0, 5, + 79, + 0, 1, + 116, + 0, 0, 0, 5, + 105, 100, + }, + }, + want: Delete{ + RelationID: 5, + KeyTuple: false, + OldTuple: true, + Row: []TupleData{ + { + Value: []byte{105, 100}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + if got := p.getDeleteMsg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getDeleteMsg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_getInsertMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Insert + }{ + { + name: "parse insert message", + fields: fields{ + // 0,0,0,5 = 5 int32, relation id + // 78 = N flag - new tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,6 = 6 int32 size (data bytes) + // 104, 101, 108, 108, 111 = hello + src: []byte{ + 0, 0, 0, 5, + 78, + 0, 1, + 116, + 0, 0, 0, 6, + 104, 101, 108, 108, 111, + }, + }, + want: Insert{ + RelationID: 5, + NewTuple: true, + Row: []TupleData{ + { + Value: []byte{104, 101, 108, 108, 111}, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + if got := p.getInsertMsg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getInsertMsg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_getCommitMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Commit + }{ + { + name: "parse commit message", + fields: fields{ + // 0 int8, flag + // 7 int64, lsn start + // 8 int64, lsn stop + // 0 int64, timestamp (start postgres epoch) + src: []byte{ + 0, + 0, 0, 0, 0, 0, 0, 0, 7, + 0, 0, 0, 0, 0, 0, 0, 8, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + }, + want: Commit{ + Flags: 0, + LSN: 7, + TransactionLSN: 8, + Timestamp: postgresEpoch, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + if got := p.getCommitMsg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getCommitMsg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_getBeginMsg(t *testing.T) { + type fields struct { + src []byte + } + tests := []struct { + name string + fields fields + want Begin + }{ + { + name: "parse begin message", + fields: fields{ + // int64 lsn + // int64 timestamp + // int32 transaction id + src: []byte{ + 0, 0, 0, 0, 0, 0, 0, 7, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 5, + }, + }, + want: Begin{ + LSN: 7, + Timestamp: postgresEpoch, + XID: 5, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + buffer: bytes.NewBuffer(tt.fields.src), + } + if got := p.getBeginMsg(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("getBeginMsg() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestBinaryParser_ParseWalMessage(t *testing.T) { + type args struct { + msg []byte + tx *WalTransaction + } + tests := []struct { + name string + args args + wantErr bool + want *WalTransaction + }{ + { + name: "empty data", + args: args{}, + wantErr: true, + }, + { + name: "begin message", + args: args{ + msg: []byte{ + 66, + 0, 0, 0, 0, 0, 0, 0, 7, + 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 5, + }, + tx: NewWalTransaction(), + }, + want: &WalTransaction{ + LSN: 7, + BeginTime: &postgresEpoch, + RelationStore: make(map[int32]RelationData), + }, + wantErr: false, + }, + { + name: "commit message", + args: args{ + msg: []byte{ + 67, + 0, + 0, 0, 0, 0, 0, 0, 0, 7, + 0, 0, 0, 0, 0, 0, 0, 8, + 0, 0, 0, 0, 0, 0, 0, 0, + }, + tx: &WalTransaction{ + LSN: 7, + BeginTime: &postgresEpoch, + RelationStore: make(map[int32]RelationData), + }, + }, + want: &WalTransaction{ + LSN: 7, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: make(map[int32]RelationData), + }, + wantErr: false, + }, + { + name: "relation message", + args: args{ + // 82 - R + // 3 - int32 relation id + // public + // users + // int8 replica ? + // int16 rows count + // int8 isKey bool + // field name + // filed type pgtype + // modificator? + msg: []byte{ + 82, + 0, 0, 0, 3, + 112, 117, 98, 108, 105, 99, 0, + 117, 115, 101, 114, 115, 0, + 1, + 0, 1, + 1, + 105, 100, 0, + 0, 0, 0, 23, + 0, 0, 0, 1, + }, + tx: &WalTransaction{ + LSN: 3, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: make(map[int32]RelationData), + }, + }, + want: &WalTransaction{ + LSN: 3, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 3: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "insert message", + args: args{ + // 73 - I + // 3 - int32 relation id + // public + // users + // int8 replica ? + // int16 rows count + // int8 isKey bool + // field name + // filed type pgtype + // modificator? + msg: []byte{ + 73, + 0, 0, 0, 2, + 78, + 0, 1, + 116, + 0, 0, 0, 6, + 49, 48, + }, + tx: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 2: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + }, + want: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 2: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + Actions: []ActionData{ + { + Schema: "public", + Table: "users", + Kind: ActionKindInsert, + Columns: []Column{ + { + name: "id", + value: 10, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "parse update message", + args: args{ + // 85 - U + // 0,0,0,5 = 5 int32, relation id + // 79 = O flag - old tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,2 = 2 int32 size (data bytes) + // 55, 55 = 77 + // 78 = N flag - new tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,2 = 2 int32 size (data bytes) + // 56, 48 = 80 + msg: []byte{ + 85, + 0, 0, 0, 5, + 79, + 0, 1, + 116, + 0, 0, 0, 2, + 55, 55, + 78, + 0, 1, + 116, + 0, 0, 0, 2, + 56, 48, + }, + tx: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + }, + want: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + Actions: []ActionData{ + { + Schema: "public", + Table: "users", + Kind: ActionKindUpdate, + Columns: []Column{ + { + name: "id", + value: 80, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "parse delete message", + args: args{ + // 68 - D, + // 0,0,0,5 = 5 int32, relation id + // 79 = O flag - old tuple data + // 0,1 = 1 int16 - count of rows + // 116 = t data type text + // 0,0,0,2 = 2 int32 size (data bytes) + // 55,55 = 77 + msg: []byte{ + 68, + 0, 0, 0, 5, + 79, + 0, 1, + 116, + 0, 0, 0, 2, + 55, 55, + }, + tx: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + }, + want: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + Actions: []ActionData{ + { + Schema: "public", + Table: "users", + Kind: ActionKindDelete, + Columns: []Column{ + { + name: "id", + value: 77, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + wantErr: false, + }, + { + name: "unknown message type", + args: args{ + msg: []byte{ + 11, + 0, 0, 0, 5, + 79, + 0, 1, + 116, + 0, 0, 0, 2, + 55, 55, + }, + tx: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + }, + want: &WalTransaction{ + LSN: 4, + BeginTime: &postgresEpoch, + CommitTime: &postgresEpoch, + RelationStore: map[int32]RelationData{ + 5: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: nil, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &BinaryParser{ + byteOrder: binary.BigEndian, + } + if err := p.ParseWalMessage(tt.args.msg, tt.args.tx); (err != nil) != tt.wantErr { + t.Errorf("ParseWalMessage() error = %v, wantErr %v", err, tt.wantErr) + } + assert.Equal(t, tt.want, tt.args.tx) + }) + } +} diff --git a/listener/protocol.go b/listener/protocol.go new file mode 100644 index 00000000..f3076c47 --- /dev/null +++ b/listener/protocol.go @@ -0,0 +1,125 @@ +package listener + +import ( + "time" +) + +const ( + CommitMsgType byte = 'C' + BeginMsgType byte = 'B' + OriginMsgType byte = 'O' + RelationMsgType byte = 'R' + TypeMsgType byte = 'Y' + InsertMsgType byte = 'I' + UpdateMsgType byte = 'U' + DeleteMsgType byte = 'D' + + NewTupleDataType byte = 'N' + TextDataType byte = 't' + NullDataType byte = 'n' + ToastDataType byte = 'u' +) + +var postgresEpoch = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) + +// Logical Replication Message Formats. +// https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats# +type ( + Begin struct { + // Identifies the message as a begin message. + LSN int64 + // Commit timestamp of the transaction. + Timestamp time.Time + // Xid of the transaction. + XID int32 + } + + Commit struct { + // Flags; currently unused (must be 0). + Flags int8 + // The LSN of the commit. + LSN int64 + // The end LSN of the transaction. + TransactionLSN int64 + // Commit timestamp of the transaction. + Timestamp time.Time + } + + Origin struct { + // The LSN of the commit on the origin server. + LSN int64 + // name of the origin. + Name string + } + + Relation struct { + // ID of the relation. + ID int32 + // Namespace (empty string for pg_catalog). + Namespace string + // Relation name. + Name string + // Replica identity setting for the relation (same as relreplident in pg_class). + Replica int8 + Columns []RelationColumn + } + + Insert struct { + /// ID of the relation corresponding to the ID in the relation message. + RelationID int32 + // Identifies the following TupleData message as a new tuple. + NewTuple bool + // TupleData message part representing the contents of new tuple. + Row []TupleData + } + + Update struct { + /// ID of the relation corresponding to the ID in the relation message. + RelationID int32 + // Identifies the following TupleData submessage as a key. + KeyTuple bool + // Identifies the following TupleData message as a old tuple. + OldTuple bool + // Identifies the following TupleData message as a new tuple. + NewTuple bool + // TupleData message part representing the contents of new tuple. + Row []TupleData + // TupleData message part representing the contents of the old tuple or primary key. + //Only present if the previous 'O' or 'K' part is present. + OldRow []TupleData + } + + Delete struct { + /// ID of the relation corresponding to the ID in the relation message. + RelationID int32 + // Identifies the following TupleData submessage as a key. + KeyTuple bool + // Identifies the following TupleData message as a old tuple. + OldTuple bool + // TupleData message part representing the contents of new tuple. + Row []TupleData + } +) +type DataType struct { + // ID of the data type. + ID int32 + // Namespace (empty string for pg_catalog). + Namespace string + // name of the data type. + Name string +} + +type RelationColumn struct { + // Flags for the column which marks the column as part of the key. + Key bool + // name of the column. + Name string + // ID of the column's data type. + TypeID int32 + // valueType modifier of the column (atttypmod). + ModifierType int32 +} + +type TupleData struct { + Value []byte +} diff --git a/listener/wal_event.go b/listener/wal_event.go deleted file mode 100644 index 132ca82c..00000000 --- a/listener/wal_event.go +++ /dev/null @@ -1,98 +0,0 @@ -package listener - -import ( - "errors" - - "github.com/sirupsen/logrus" -) - -//go:generate easyjson wal_event.go - -// Constant with kind fo wal message. -const ActionDelete = "delete" - -// Error message. -const NotValidMessage = "not valid WAL message" - -// WalEvent incoming message structure. -//easyjson:json -type WalEvent struct { - NextLSN string `json:"nextlsn"` - Change []ChangeItem `json:"change"` -} - -type ChangeItem struct { - Kind string `json:"kind"` - Schema string `json:"schema"` - Table string `json:"table"` - ColumnNames []string `json:"columnnames"` - ColumnTypes []string `json:"columntypes"` - ColumnValues []interface{} `json:"columnvalues"` - OldKeys struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - } `json:"oldkeys"` -} - -// Validate simple message checking for integrity. -func (w *WalEvent) Validate() error { - for _, val := range w.Change { - if len(val.ColumnValues) != len(val.ColumnNames) { - return errors.New(NotValidMessage) - } - } - return nil -} - -// CreateEventsWithFilter filter wal message by table, action and create events for each value. -func (w *WalEvent) CreateEventsWithFilter(tableMap map[string][]string) []Event { - var events []Event - - for _, item := range w.Change { - data := make(map[string]interface{}) - switch item.Kind { - case ActionDelete: - for i, val := range item.OldKeys.KeyNames { - data[val] = item.OldKeys.KeyValues[i] - } - default: - for i, val := range item.ColumnNames { - data[val] = item.ColumnValues[i] - } - - } - - event := Event{ - Schema: item.Schema, - Table: item.Table, - Action: item.Kind, - Data: data, - } - - actions, validTable := tableMap[item.Table] - validAction := inArray(actions, item.Kind) - if validTable && validAction { - events = append(events, event) - } else { - logrus.WithFields( - logrus.Fields{ - "schema": item.Schema, - "table": item.Table, - "action": item.Kind, - }). - Infoln("wal message skip by filter") - } - } - return events -} - -// inArray checks whether the value is in an array -func inArray(arr []string, value string) bool { - for _, v := range arr { - if v == value { - return true - } - } - return false -} diff --git a/listener/wal_event_easyjson.go b/listener/wal_event_easyjson.go deleted file mode 100644 index b44488b9..00000000 --- a/listener/wal_event_easyjson.go +++ /dev/null @@ -1,488 +0,0 @@ -// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. - -package listener - -import ( - json "encoding/json" - easyjson "github.com/mailru/easyjson" - jlexer "github.com/mailru/easyjson/jlexer" - jwriter "github.com/mailru/easyjson/jwriter" -) - -// suppress unused package warning -var ( - _ *json.RawMessage - _ *jlexer.Lexer - _ *jwriter.Writer - _ easyjson.Marshaler -) - -func easyjson39bd694bDecodeGithubComIhippikWalListenerListener(in *jlexer.Lexer, out *WalEvent) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "nextlsn": - out.NextLSN = string(in.String()) - case "change": - if in.IsNull() { - in.Skip() - out.Change = nil - } else { - in.Delim('[') - if out.Change == nil { - if !in.IsDelim(']') { - out.Change = make([]ChangeItem, 0, 1) - } else { - out.Change = []ChangeItem{} - } - } else { - out.Change = (out.Change)[:0] - } - for !in.IsDelim(']') { - var v1 ChangeItem - easyjson39bd694bDecodeGithubComIhippikWalListenerListener1(in, &v1) - out.Change = append(out.Change, v1) - in.WantComma() - } - in.Delim(']') - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson39bd694bEncodeGithubComIhippikWalListenerListener(out *jwriter.Writer, in WalEvent) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"nextlsn\":" - out.RawString(prefix[1:]) - out.String(string(in.NextLSN)) - } - { - const prefix string = ",\"change\":" - out.RawString(prefix) - if in.Change == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v2, v3 := range in.Change { - if v2 > 0 { - out.RawByte(',') - } - easyjson39bd694bEncodeGithubComIhippikWalListenerListener1(out, v3) - } - out.RawByte(']') - } - } - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v WalEvent) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjson39bd694bEncodeGithubComIhippikWalListenerListener(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v WalEvent) MarshalEasyJSON(w *jwriter.Writer) { - easyjson39bd694bEncodeGithubComIhippikWalListenerListener(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *WalEvent) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjson39bd694bDecodeGithubComIhippikWalListenerListener(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *WalEvent) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjson39bd694bDecodeGithubComIhippikWalListenerListener(l, v) -} -func easyjson39bd694bDecodeGithubComIhippikWalListenerListener1(in *jlexer.Lexer, out *ChangeItem) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "kind": - out.Kind = string(in.String()) - case "schema": - out.Schema = string(in.String()) - case "table": - out.Table = string(in.String()) - case "columnnames": - if in.IsNull() { - in.Skip() - out.ColumnNames = nil - } else { - in.Delim('[') - if out.ColumnNames == nil { - if !in.IsDelim(']') { - out.ColumnNames = make([]string, 0, 4) - } else { - out.ColumnNames = []string{} - } - } else { - out.ColumnNames = (out.ColumnNames)[:0] - } - for !in.IsDelim(']') { - var v4 string - v4 = string(in.String()) - out.ColumnNames = append(out.ColumnNames, v4) - in.WantComma() - } - in.Delim(']') - } - case "columntypes": - if in.IsNull() { - in.Skip() - out.ColumnTypes = nil - } else { - in.Delim('[') - if out.ColumnTypes == nil { - if !in.IsDelim(']') { - out.ColumnTypes = make([]string, 0, 4) - } else { - out.ColumnTypes = []string{} - } - } else { - out.ColumnTypes = (out.ColumnTypes)[:0] - } - for !in.IsDelim(']') { - var v5 string - v5 = string(in.String()) - out.ColumnTypes = append(out.ColumnTypes, v5) - in.WantComma() - } - in.Delim(']') - } - case "columnvalues": - if in.IsNull() { - in.Skip() - out.ColumnValues = nil - } else { - in.Delim('[') - if out.ColumnValues == nil { - if !in.IsDelim(']') { - out.ColumnValues = make([]interface{}, 0, 4) - } else { - out.ColumnValues = []interface{}{} - } - } else { - out.ColumnValues = (out.ColumnValues)[:0] - } - for !in.IsDelim(']') { - var v6 interface{} - if m, ok := v6.(easyjson.Unmarshaler); ok { - m.UnmarshalEasyJSON(in) - } else if m, ok := v6.(json.Unmarshaler); ok { - _ = m.UnmarshalJSON(in.Raw()) - } else { - v6 = in.Interface() - } - out.ColumnValues = append(out.ColumnValues, v6) - in.WantComma() - } - in.Delim(']') - } - case "oldkeys": - easyjson39bd694bDecode(in, &out.OldKeys) - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson39bd694bEncodeGithubComIhippikWalListenerListener1(out *jwriter.Writer, in ChangeItem) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"kind\":" - out.RawString(prefix[1:]) - out.String(string(in.Kind)) - } - { - const prefix string = ",\"schema\":" - out.RawString(prefix) - out.String(string(in.Schema)) - } - { - const prefix string = ",\"table\":" - out.RawString(prefix) - out.String(string(in.Table)) - } - { - const prefix string = ",\"columnnames\":" - out.RawString(prefix) - if in.ColumnNames == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v7, v8 := range in.ColumnNames { - if v7 > 0 { - out.RawByte(',') - } - out.String(string(v8)) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"columntypes\":" - out.RawString(prefix) - if in.ColumnTypes == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v9, v10 := range in.ColumnTypes { - if v9 > 0 { - out.RawByte(',') - } - out.String(string(v10)) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"columnvalues\":" - out.RawString(prefix) - if in.ColumnValues == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v11, v12 := range in.ColumnValues { - if v11 > 0 { - out.RawByte(',') - } - if m, ok := v12.(easyjson.Marshaler); ok { - m.MarshalEasyJSON(out) - } else if m, ok := v12.(json.Marshaler); ok { - out.Raw(m.MarshalJSON()) - } else { - out.Raw(json.Marshal(v12)) - } - } - out.RawByte(']') - } - } - { - const prefix string = ",\"oldkeys\":" - out.RawString(prefix) - easyjson39bd694bEncode(out, in.OldKeys) - } - out.RawByte('}') -} -func easyjson39bd694bDecode(in *jlexer.Lexer, out *struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` -}) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "keynames": - if in.IsNull() { - in.Skip() - out.KeyNames = nil - } else { - in.Delim('[') - if out.KeyNames == nil { - if !in.IsDelim(']') { - out.KeyNames = make([]string, 0, 4) - } else { - out.KeyNames = []string{} - } - } else { - out.KeyNames = (out.KeyNames)[:0] - } - for !in.IsDelim(']') { - var v13 string - v13 = string(in.String()) - out.KeyNames = append(out.KeyNames, v13) - in.WantComma() - } - in.Delim(']') - } - case "keytypes": - if in.IsNull() { - in.Skip() - out.KeyTypes = nil - } else { - in.Delim('[') - if out.KeyTypes == nil { - if !in.IsDelim(']') { - out.KeyTypes = make([]string, 0, 4) - } else { - out.KeyTypes = []string{} - } - } else { - out.KeyTypes = (out.KeyTypes)[:0] - } - for !in.IsDelim(']') { - var v14 string - v14 = string(in.String()) - out.KeyTypes = append(out.KeyTypes, v14) - in.WantComma() - } - in.Delim(']') - } - case "keyvalues": - if in.IsNull() { - in.Skip() - out.KeyValues = nil - } else { - in.Delim('[') - if out.KeyValues == nil { - if !in.IsDelim(']') { - out.KeyValues = make([]interface{}, 0, 4) - } else { - out.KeyValues = []interface{}{} - } - } else { - out.KeyValues = (out.KeyValues)[:0] - } - for !in.IsDelim(']') { - var v15 interface{} - if m, ok := v15.(easyjson.Unmarshaler); ok { - m.UnmarshalEasyJSON(in) - } else if m, ok := v15.(json.Unmarshaler); ok { - _ = m.UnmarshalJSON(in.Raw()) - } else { - v15 = in.Interface() - } - out.KeyValues = append(out.KeyValues, v15) - in.WantComma() - } - in.Delim(']') - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjson39bd694bEncode(out *jwriter.Writer, in struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` -}) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"keynames\":" - out.RawString(prefix[1:]) - if in.KeyNames == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v16, v17 := range in.KeyNames { - if v16 > 0 { - out.RawByte(',') - } - out.String(string(v17)) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"keytypes\":" - out.RawString(prefix) - if in.KeyTypes == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v18, v19 := range in.KeyTypes { - if v18 > 0 { - out.RawByte(',') - } - out.String(string(v19)) - } - out.RawByte(']') - } - } - { - const prefix string = ",\"keyvalues\":" - out.RawString(prefix) - if in.KeyValues == nil && (out.Flags&jwriter.NilSliceAsEmpty) == 0 { - out.RawString("null") - } else { - out.RawByte('[') - for v20, v21 := range in.KeyValues { - if v20 > 0 { - out.RawByte(',') - } - if m, ok := v21.(easyjson.Marshaler); ok { - m.MarshalEasyJSON(out) - } else if m, ok := v21.(json.Marshaler); ok { - out.Raw(m.MarshalJSON()) - } else { - out.Raw(json.Marshal(v21)) - } - } - out.RawByte(']') - } - } - out.RawByte('}') -} diff --git a/listener/wal_event_test.go b/listener/wal_event_test.go deleted file mode 100644 index f54af5d0..00000000 --- a/listener/wal_event_test.go +++ /dev/null @@ -1,362 +0,0 @@ -package listener - -import ( - "errors" - "github.com/stretchr/testify/assert" - "reflect" - "testing" -) - -func TestWalEvent_Validate(t *testing.T) { - type fields struct { - NextLSN string - Change []ChangeItem - } - tests := []struct { - name string - fields fields - wantErr error - }{ - { - name: "success", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"first", "second"}, - }, - }, - }, - wantErr: nil, - }, - { - name: "invalid count", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - ColumnNames: []string{"first"}, - ColumnValues: []interface{}{"first", "second"}, - }, - }, - }, - wantErr: errors.New("not valid WAL message"), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - w := &WalEvent{ - NextLSN: tt.fields.NextLSN, - Change: tt.fields.Change, - } - err := w.Validate() - if err == nil { - assert.Nil(t, tt.wantErr) - } else { - assert.EqualError(t, tt.wantErr, err.Error()) - } - }) - } -} - -func TestWalEvent_CreateEventsWithFilter(t *testing.T) { - type fields struct { - NextLSN string - Change []ChangeItem - } - type args struct { - tableMap map[string][]string - } - tests := []struct { - name string - fields fields - args args - want []Event - }{ - { - name: "success", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "insert", - Schema: "public", - Table: "users", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{"users": {"insert", "update"}}, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "insert", - Data: map[string]interface{}{"first": "v1", "second": "v2"}, - }, - }, - }, - { - name: "success with delete", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "delete", - Schema: "public", - Table: "users", - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: []string{"id"}, - KeyTypes: nil, - KeyValues: []interface{}{1}, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{"users": {"insert", "update", "delete"}}, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "delete", - Data: map[string]interface{}{"id": 1}, - }, - }, - }, - { - name: "success", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "insert", - Schema: "public", - Table: "users", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{"users": {"insert", "update"}}, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "insert", - Data: map[string]interface{}{"first": "v1", "second": "v2"}, - }, - }, - }, - { - name: "success with two event", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "insert", - Schema: "public", - Table: "users", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - { - Kind: "update", - Schema: "public", - Table: "pets", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"k1", "k2"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{ - "users": {"insert", "update"}, - "pets": {"insert", "update"}, - }, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "insert", - Data: map[string]interface{}{"first": "v1", "second": "v2"}, - }, - { - Schema: "public", - Table: "pets", - Action: "update", - Data: map[string]interface{}{"k1": "v1", "k2": "v2"}, - }, - }, - }, - { - name: "filtered unknown table", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "insert", - Schema: "public", - Table: "users", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - { - Kind: "insert", - Schema: "public", - Table: "pets", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{"users": {"insert", "update"}}, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "insert", - Data: map[string]interface{}{"first": "v1", "second": "v2"}, - }, - }, - }, - { - name: "success filtered unknown action", - fields: fields{ - NextLSN: "12345", - Change: []ChangeItem{ - { - Kind: "insert", - Schema: "public", - Table: "users", - ColumnTypes: []string{"string", "string"}, - ColumnNames: []string{"first", "second"}, - ColumnValues: []interface{}{"v1", "v2"}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - { - Kind: "delete", - Schema: "public", - Table: "users", - ColumnTypes: []string{"integer"}, - ColumnNames: []string{"id"}, - ColumnValues: []interface{}{1}, - OldKeys: struct { - KeyNames []string `json:"keynames"` - KeyTypes []string `json:"keytypes"` - KeyValues []interface{} `json:"keyvalues"` - }{ - KeyNames: nil, - KeyTypes: nil, - KeyValues: nil, - }, - }, - }, - }, - args: args{ - tableMap: map[string][]string{"users": {"insert", "update"}}, - }, - want: []Event{ - { - Schema: "public", - Table: "users", - Action: "insert", - Data: map[string]interface{}{"first": "v1", "second": "v2"}, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - w := &WalEvent{ - NextLSN: tt.fields.NextLSN, - Change: tt.fields.Change, - } - if got := w.CreateEventsWithFilter(tt.args.tableMap); !reflect.DeepEqual(got, tt.want) { - t.Errorf("CreateEventsWithFilter() = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go new file mode 100644 index 00000000..bf1c6a14 --- /dev/null +++ b/listener/wal_transaction.go @@ -0,0 +1,160 @@ +package listener + +import ( + "errors" + "strconv" + "strings" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/pgtype" + "github.com/sirupsen/logrus" +) + +type ActionKind string + +// kind of wall message. +const ( + ActionKindInsert ActionKind = "INSERT" + ActionKindUpdate ActionKind = "UPDATE" + ActionKindDelete ActionKind = "DELETE" +) + +type WalTransaction struct { + LSN int64 + BeginTime *time.Time + CommitTime *time.Time + RelationStore map[int32]RelationData + Actions []ActionData +} + +func NewWalTransaction() *WalTransaction { + return &WalTransaction{ + RelationStore: make(map[int32]RelationData), + } +} + +func (k ActionKind) string() string { + return string(k) +} + +type RelationData struct { + Schema string + Table string + Columns []Column +} + +type ActionData struct { + Schema string + Table string + Kind ActionKind + Columns []Column +} + +type Column struct { + name string + value interface{} + valueType int + isKey bool +} + +func (c *Column) AssertValue(src []byte) { + var val interface{} + strSrc := string(src) + switch c.valueType { + case pgtype.BoolOID: + val, _ = strconv.ParseBool(strSrc) + case pgtype.Int4OID: + val, _ = strconv.Atoi(strSrc) + case pgtype.TextOID: + val = strSrc + case pgtype.TimestampOID, pgtype.TimestamptzOID: + val = strSrc + default: + logrus.WithField("pgtype", c.valueType). + Warnln("unknown oid type") + val = strSrc + } + c.value = val +} + +func (w *WalTransaction) Clear() { + w.CommitTime = nil + w.BeginTime = nil + w.Actions = nil +} + +func (w WalTransaction) CreateActionData( + relationID int32, + rows []TupleData, + kind ActionKind, +) (a ActionData, err error) { + rel, ok := w.RelationStore[relationID] + if !ok { + return a, errors.New("relation not found") + } + a = ActionData{ + Schema: rel.Schema, + Table: rel.Table, + Kind: kind, + } + var columns []Column + for num, row := range rows { + column := Column{ + name: rel.Columns[num].name, + valueType: rel.Columns[num].valueType, + isKey: rel.Columns[num].isKey, + } + column.AssertValue(row.Value) + + columns = append(columns, column) + } + a.Columns = columns + return a, nil +} + +// CreateEventsWithFilter filter wal message by table, action and create events for each value. +func (w *WalTransaction) CreateEventsWithFilter( + tableMap map[string][]string) []Event { + var events []Event + + for _, item := range w.Actions { + data := make(map[string]interface{}) + for _, val := range item.Columns { + data[val.name] = val.value + } + event := Event{ + ID: uuid.New(), + Schema: item.Schema, + Table: item.Table, + Action: item.Kind.string(), + Data: data, + EventTime: *w.CommitTime, + } + + actions, validTable := tableMap[item.Table] + validAction := inArray(actions, item.Kind.string()) + if validTable && validAction { + events = append(events, event) + } else { + logrus.WithFields( + logrus.Fields{ + "schema": item.Schema, + "table": item.Table, + "action": item.Kind, + }). + Infoln("wal message skip by filter") + } + } + return events +} + +// inArray checks whether the value is in an array +func inArray(arr []string, value string) bool { + for _, v := range arr { + if strings.ToLower(v) == strings.ToLower(value) { + return true + } + } + return false +} diff --git a/listener/wal_transaction_test.go b/listener/wal_transaction_test.go new file mode 100644 index 00000000..abf99c6f --- /dev/null +++ b/listener/wal_transaction_test.go @@ -0,0 +1,225 @@ +package listener + +import ( + "github.com/jackc/pgx/pgtype" + "github.com/magiconair/properties/assert" + "reflect" + "testing" + "time" +) + +func TestWalTransaction_CreateActionData(t *testing.T) { + type fields struct { + LSN int64 + BeginTime *time.Time + CommitTime *time.Time + RelationStore map[int32]RelationData + Actions []ActionData + } + type args struct { + relationID int32 + rows []TupleData + kind ActionKind + } + now := time.Now() + tests := []struct { + name string + fields fields + args args + wantA ActionData + wantErr bool + }{ + { + name: "success", + fields: fields{ + LSN: 10, + BeginTime: &now, + CommitTime: &now, + RelationStore: map[int32]RelationData{ + 10: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: 5, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + Actions: nil, + }, + args: args{ + relationID: 10, + rows: []TupleData{ + { + Value: []byte{49, 49}, + }, + }, + kind: ActionKindUpdate, + }, + wantA: ActionData{ + Schema: "public", + Table: "users", + Kind: ActionKindUpdate, + Columns: []Column{ + { + name: "id", + value: 11, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + wantErr: false, + }, + { + name: "relation not exists", + fields: fields{ + LSN: 10, + BeginTime: &now, + CommitTime: &now, + RelationStore: map[int32]RelationData{ + 11: { + Schema: "public", + Table: "users", + Columns: []Column{ + { + name: "id", + value: 5, + valueType: pgtype.Int4OID, + isKey: true, + }, + }, + }, + }, + Actions: nil, + }, + args: args{ + relationID: 10, + rows: nil, + kind: ActionKindUpdate, + }, + wantA: ActionData{}, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + w := WalTransaction{ + LSN: tt.fields.LSN, + BeginTime: tt.fields.BeginTime, + CommitTime: tt.fields.CommitTime, + RelationStore: tt.fields.RelationStore, + Actions: tt.fields.Actions, + } + gotA, err := w.CreateActionData(tt.args.relationID, tt.args.rows, tt.args.kind) + if (err != nil) != tt.wantErr { + t.Errorf("CreateActionData() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotA, tt.wantA) { + t.Errorf("CreateActionData() gotA = %v, want %v", gotA, tt.wantA) + } + }) + } +} + +func TestColumn_AssertValue(t *testing.T) { + type fields struct { + name string + valueType int + isKey bool + } + type args struct { + src []byte + } + tests := []struct { + name string + fields fields + args args + want *Column + }{ + { + name: "bool", + fields: fields{ + name: "isBool", + valueType: pgtype.BoolOID, + isKey: false, + }, + args: args{ + src: []byte{116}, + }, + want: &Column{ + name: "isBool", + value: true, + valueType: 16, + isKey: false, + }, + }, + { + name: "text", + fields: fields{ + name: "name", + valueType: pgtype.TextOID, + isKey: false, + }, + args: args{ + src: []byte{104, 101, 108, 108, 111}, + }, + want: &Column{ + name: "name", + value: "hello", + valueType: 25, + isKey: false, + }, + }, + { + name: "timestamp", + fields: fields{ + name: "created", + valueType: pgtype.TimestampOID, + isKey: false, + }, + args: args{ + src: []byte{50, 48, 50, 48, 45, 49, 48, 45, 49, 50}, + }, + want: &Column{ + name: "created", + value: "2020-10-12", + valueType: 1114, + isKey: false, + }, + }, + { + name: "unknown", + fields: fields{ + name: "created", + valueType: pgtype.Float4ArrayOID, + isKey: false, + }, + args: args{ + src: []byte{50, 48, 50, 48, 45, 49, 48, 45, 49, 50}, + }, + want: &Column{ + name: "created", + value: "2020-10-12", + valueType: 1021, + isKey: false, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Column{ + name: tt.fields.name, + valueType: tt.fields.valueType, + isKey: tt.fields.isKey, + } + c.AssertValue(tt.args.src) + assert.Equal(t, c, tt.want) + }) + } +}