Skip to content

Commit

Permalink
Create db publication
Browse files Browse the repository at this point in the history
  • Loading branch information
ihippik committed Jan 8, 2021
1 parent d7a8276 commit a6b1fd4
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 30 deletions.
16 changes: 9 additions & 7 deletions config.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
listener:
slotName: myslot_2
ackTimeout: 10s
slotName: myslot_1
refreshConnection: 30s
heartbeatInterval: 10s
logger:
caller: false
level: info
humanReadable: true
database:
host: 81.90.180.219
host: localhost
port: 5432
name: scum_db
user: pglogrepl
name: test_db
user: back
debug: false
password: secret
password: pass
filter:
tables:
table_name:
users:
- insert
- update
city:
- insert
- update
nats:
Expand Down
16 changes: 10 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@ go 1.13
require (
bou.ke/monkey v1.0.2
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a
github.com/davecgh/go-spew v1.1.1
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/google/uuid v1.1.1
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/jackc/pgconn v1.2.1
github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4
github.com/jackc/pgproto3/v2 v2.0.0
github.com/jackc/pgconn v1.2.1 // indirect
github.com/jackc/pgx v3.6.1+incompatible
github.com/kyleconroy/pgoutput v0.1.0
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.1
github.com/mailru/easyjson v0.7.0
github.com/nats-io/nats-server/v2 v2.1.2 // indirect
github.com/nats-io/nats-streaming-server v0.16.2 // indirect
github.com/nats-io/stan.go v0.6.0
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pkg/errors v0.9.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.6.1
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/urfave/cli/v2 v2.1.1
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 // indirect
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e // indirect
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
gopkg.in/yaml.v2 v2.2.8 // indirect
)
30 changes: 21 additions & 9 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -94,13 +95,10 @@ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80s
github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA=
github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE=
github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s=
github.com/jackc/pgconn v1.1.0/go.mod h1:GgY/Lbj1VonNaVdNUHs9AwWom3yP2eymFQ1C8z9r/Lk=
github.com/jackc/pgconn v1.2.1 h1:+73KD6pbtv6Dbs6/rqlSRUa8XffPlW6YBd1hyFLpwuA=
github.com/jackc/pgconn v1.2.1/go.mod h1:GgY/Lbj1VonNaVdNUHs9AwWom3yP2eymFQ1C8z9r/Lk=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4 h1:UTV6LMahlWdHowugk7JMoAcnVqgS2/ZW5mpAgn3fk4Q=
github.com/jackc/pglogrepl v0.0.0-20200108151620-37117db5ead4/go.mod h1:3Nwqi/QTsjaYxFh6Enw980AgQ16IypmAnsXyDoWHtY4=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
Expand All @@ -115,12 +113,13 @@ github.com/jackc/pgproto3/v2 v2.0.0 h1:FApgMJ/GtaXfI0s8Lvd0kaLaRwMOhs4VH92pwkwQQ
github.com/jackc/pgproto3/v2 v2.0.0/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59 h1:xOamcCJ9MFJTxR5bvw3ZXmiP8evQMohdt2VJ57C0W8Q=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
github.com/jackc/pgx v3.2.0+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx v3.6.1+incompatible h1:qKBkTh0LCxZL+wCqXDqPpqlqbSUWFKtyirpYC/1nXqI=
github.com/jackc/pgx v3.6.1+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y=
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186 h1:ZQM8qLT/E/CGD6XX0E6q9FAwxJYmWpJufzmLMaFuzgQ=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
Expand All @@ -129,6 +128,7 @@ github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2 h1:DB17ag19krx9CFsz4o3enTrPXyIXCl+2iCXH/aMAp9s=
Expand All @@ -140,8 +140,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kyleconroy/pgoutput v0.1.0 h1:nY5Bq4A8CqAdqW6LawaNOCWleziBzbd5ouoBtoNDElo=
github.com/kyleconroy/pgoutput v0.1.0/go.mod h1:xj1JLOlXvWLJ1CSJKlrKoWMxkDt9unagoStd2zrZ8IA=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
Expand Down Expand Up @@ -178,6 +178,8 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS
github.com/nats-io/stan.go v0.5.0/go.mod h1:dYqB+vMN3C2F9pT1FRQpg9eHbjPj6mP0yYuyBNuXHZE=
github.com/nats-io/stan.go v0.6.0 h1:26IJPeykh88d8KVLT4jJCIxCyUBOC5/IQup8oWD/QYY=
github.com/nats-io/stan.go v0.6.0/go.mod h1:eIcD5bi3pqbHT/xIIvXMwvzXYElgouBvaVRftaE+eac=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
Expand Down Expand Up @@ -209,6 +211,7 @@ github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OK
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
Expand All @@ -232,6 +235,8 @@ github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.6.1 h1:VPZzIkznI1YhVMRi6vNFLHSwhnhReBfgTxIPccpfdZk=
github.com/spf13/viper v1.6.1/go.mod h1:t3iDnF5Jlj76alVNuyFBk5oUMCvsrkbvZK0WQdfDi5k=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -243,6 +248,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand All @@ -268,7 +275,6 @@ golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaE
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191117063200-497ca9f6d64f/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17 h1:nVJ3guKA9qdkEQ3TUdXI9QSINo2CUPM/cySEvw2w8I0=
golang.org/x/crypto v0.0.0-20200109152110-61a87790db17/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down Expand Up @@ -303,6 +309,8 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456 h1:ng0gs1AKnRRuEMZoTLLlbOd+C17zUDepwGQBb/n+JVg=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e h1:N7DeIrjYszNmSW409R3frPPwglRwMkXSBzwVbkOjLLA=
golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
Expand All @@ -318,8 +326,8 @@ golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I=
google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0=
Expand All @@ -330,6 +338,8 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f h1:BLraFXnmrev5lT+xlilqcH8XK9/i0At2xKjWk4p6zsU=
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
Expand All @@ -339,4 +349,6 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
14 changes: 11 additions & 3 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type replication interface {
}

type repository interface {
CreatePublication(name string) error
GetSlotLSN(slotName string) (string, error)
IsAlive() bool
Close() error
Expand Down Expand Up @@ -104,7 +105,11 @@ func (l *Listener) Process() error {
logger := logrus.WithField("slot_name", l.slotName)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logrus.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage)
logger.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage)

if err := l.repository.CreatePublication(publicationName); err != nil {
logger.WithError(err).Warnln("skip create publication")
}

slotIsExists, err := l.slotIsExists()
if err != nil {
Expand Down Expand Up @@ -191,12 +196,15 @@ func publicationNames(publication string) string {
return fmt.Sprintf(`publication_names '%s'`, publication)
}

const protoVersion = "proto_version '1'"
const (
protoVersion = "proto_version '1'"
publicationName = "wal-listener"
)

// Stream receive event from PostgreSQL.
// Accept message, apply filter and publish it in NATS server.
func (l *Listener) Stream(ctx context.Context) {
err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames("sport"))
err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames(publicationName))
if err != nil {
l.errChannel <- newListenerError("StartReplication()", err)
return
Expand Down
10 changes: 5 additions & 5 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestListener_Stream(t *testing.T) {
uint64(0),
int64(-1),
protoVersion,
"publication_names 'sport'",
"publication_names 'wal-listener'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand Down Expand Up @@ -524,7 +524,7 @@ func TestListener_Stream(t *testing.T) {
uint64(0),
int64(-1),
protoVersion,
"publication_names 'sport'",
"publication_names 'wal-listener'",
)
},
fields: fields{
Expand Down Expand Up @@ -559,7 +559,7 @@ func TestListener_Stream(t *testing.T) {
uint64(0),
int64(-1),
protoVersion,
"publication_names 'sport'",
"publication_names 'wal-listener'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand Down Expand Up @@ -620,7 +620,7 @@ func TestListener_Stream(t *testing.T) {
uint64(0),
int64(-1),
protoVersion,
"publication_names 'sport'",
"publication_names 'wal-listener'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand Down Expand Up @@ -692,7 +692,7 @@ func TestListener_Stream(t *testing.T) {
uint64(0),
int64(-1),
protoVersion,
"publication_names 'sport'",
"publication_names 'wal-listener'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand Down
6 changes: 6 additions & 0 deletions listener/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error) {
return restartLSNStr, err
}

// CreatePublication create publication fo all.
func (r RepositoryImpl) CreatePublication(name string) error {
_, err := r.conn.Exec(`CREATE PUBLICATION "` + name + `" FOR ALL TABLES`)
return err
}

// IsAlive check database connection problems.
func (r RepositoryImpl) IsAlive() bool {
return r.conn.IsAlive()
Expand Down
4 changes: 4 additions & 0 deletions listener/repository_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,7 @@ func (r *repositoryMock) IsAlive() bool {
func (r *repositoryMock) Close() error {
return r.Called().Error(0)
}

func (r *repositoryMock) CreatePublication(name string) (err error) {
panic("implement me")
}

0 comments on commit a6b1fd4

Please sign in to comment.