From 1e4004fd8f999923885daf9b0c1b93fbd4d0117f Mon Sep 17 00:00:00 2001 From: ihippik Date: Fri, 5 Aug 2022 22:43:45 +0400 Subject: [PATCH] Refactoring Remove easyjson. --- Dockerfile | 3 +- cmd/wal-listener/init.go | 7 +- cmd/wal-listener/main.go | 6 +- go.mod | 5 +- go.sum | 6 +- listener/errors.go | 10 -- listener/listener.go | 88 ++++++-------- listener/listener_test.go | 15 +-- listener/nats_publisher.go | 12 +- listener/nats_publisher_easyjson.go | 171 ---------------------------- listener/nats_publisher_test.go | 4 +- listener/parser.go | 4 +- listener/parser_mock.go | 2 + listener/repository.go | 15 ++- listener/wal_transaction.go | 31 ++--- 15 files changed, 86 insertions(+), 293 deletions(-) delete mode 100644 listener/nats_publisher_easyjson.go diff --git a/Dockerfile b/Dockerfile index 6d5717eb..aae64bf8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,5 @@ -FROM alpine:3.10 +FROM alpine:3.16 + MAINTAINER Konstantin Makarov RUN adduser -D developer WORKDIR /app diff --git a/cmd/wal-listener/init.go b/cmd/wal-listener/init.go index 14c79226..7ea33f97 100644 --- a/cmd/wal-listener/init.go +++ b/cmd/wal-listener/init.go @@ -4,12 +4,10 @@ import ( "fmt" "github.com/jackc/pgx" - "github.com/pkg/errors" "github.com/sirupsen/logrus" "github.com/spf13/viper" "github.com/ihippik/wal-listener/config" - "github.com/ihippik/wal-listener/listener" ) // logger log levels. @@ -40,6 +38,7 @@ func getConf(path string) (*config.Config, error) { // initLogger init logrus preferences. func initLogger(cfg config.LoggerCfg) { logrus.SetReportCaller(cfg.Caller) + if !cfg.HumanReadable { logrus.SetFormatter(&logrus.JSONFormatter{}) } @@ -77,12 +76,12 @@ func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn pgConn, err := pgx.Connect(pgxConf) if err != nil { - return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection) + return nil, nil, fmt.Errorf("db connection: %w", err) } rConnection, err := pgx.ReplicationConnect(pgxConf) if err != nil { - return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err) + return nil, nil, fmt.Errorf("replication connect: %w", err) } return pgConn, rConnection, nil diff --git a/cmd/wal-listener/main.go b/cmd/wal-listener/main.go index c9ffcf65..171ad987 100644 --- a/cmd/wal-listener/main.go +++ b/cmd/wal-listener/main.go @@ -13,7 +13,7 @@ import ( ) // go build -ldflags "-X main.version=1.0.1" main.go -var version = "0.1.0" +var version = "0.2.0" func main() { cli.VersionFlag = &cli.BoolFlag{ @@ -46,7 +46,7 @@ func main() { initLogger(cfg.Logger) - sc, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address)) + natsConn, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address)) if err != nil { return fmt.Errorf("nats connection: %w", err) } @@ -60,7 +60,7 @@ func main() { cfg, listener.NewRepository(conn), rConn, - listener.NewNatsPublisher(sc), + listener.NewNatsPublisher(natsConn), listener.NewBinaryParser(binary.BigEndian), ) diff --git a/go.mod b/go.mod index cd896dd3..b361b18b 100644 --- a/go.mod +++ b/go.mod @@ -5,12 +5,11 @@ go 1.19 require ( bou.ke/monkey v1.0.2 github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d + github.com/goccy/go-json v0.9.10 github.com/google/uuid v1.3.0 github.com/jackc/pgx v3.6.2+incompatible github.com/magiconair/properties v1.8.6 - github.com/mailru/easyjson v0.7.7 github.com/nats-io/stan.go v0.10.3 - github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 @@ -26,7 +25,6 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect - github.com/josharian/intern v1.0.0 // indirect github.com/lib/pq v1.2.0 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/nats-io/jwt v1.2.2 // indirect @@ -38,6 +36,7 @@ require ( github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.2 // indirect + github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect diff --git a/go.sum b/go.sum index b3cee745..966bc777 100644 --- a/go.sum +++ b/go.sum @@ -78,6 +78,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9 github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/goccy/go-json v0.9.10 h1:hCeNmprSNLB8B8vQKWl6DpuH0t60oEs+TAk9a7CScKc= +github.com/goccy/go-json v0.9.10/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= @@ -164,8 +166,6 @@ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGU github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ= github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o= github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I= -github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= -github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= @@ -181,8 +181,6 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo= github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= -github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= diff --git a/listener/errors.go b/listener/errors.go index 5d9601d0..f6fcfc22 100644 --- a/listener/errors.go +++ b/listener/errors.go @@ -2,16 +2,6 @@ package listener import "errors" -// Constants with error text message -const ( - ErrPostgresConnection = "db connection error" - ErrReplicationConnection = "replication connection error" - ErrPublishEvent = "publish message error" - ErrUnmarshalMsg = "unmarshal wal message error" - ErrAckWalMessage = "acknowledge wal message error" - ErrSendStandbyStatus = "send standby status error" -) - // Variable with connection errors. var ( errReplConnectionIsLost = errors.New("replication connection to postgres is lost") diff --git a/listener/listener.go b/listener/listener.go index 8b6da604..9656b5f8 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "sync" - "syscall" "time" "github.com/jackc/pgx" @@ -19,15 +18,7 @@ import ( const errorBufferSize = 100 // Logical decoding plugin. -const ( - pgOutputPlugin = "pgoutput" -) - -// Service info message. -const ( - StartServiceMessage = "service was started" - StopServiceMessage = "service was stopped" -) +const pgOutputPlugin = "pgoutput" type publisher interface { Publish(string, Event) error @@ -89,14 +80,14 @@ func NewWalListener( // Process is main service entry point. func (l *Listener) Process(ctx context.Context) error { - var serviceErr *serviceErr + var svcErr serviceErr logger := logrus.WithField("slot_name", l.slotName) - ctx, cancelFunc := context.WithCancel(ctx) - defer cancelFunc() + ctx, stop := signal.NotifyContext(ctx, os.Interrupt) + defer stop() - logger.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage) + logger.Infoln("service was started") if err := l.repository.CreatePublication(publicationName); err != nil { logger.WithError(err).Warnln("skip create publication") @@ -119,7 +110,6 @@ func (l *Listener) Process(ctx context.Context) error { } l.setLSN(lsn) - logger.Infoln("create new slot") } else { logger.Infoln("slot already exists, LSN updated") @@ -127,33 +117,29 @@ func (l *Listener) Process(ctx context.Context) error { go l.Stream(ctx) - signalChan := make(chan os.Signal, 1) refresh := time.NewTicker(l.config.Listener.RefreshConnection) - - signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) + defer refresh.Stop() ProcessLoop: for { select { case <-refresh.C: if !l.replicator.IsAlive() { - logrus.Fatalln(errReplConnectionIsLost) + return fmt.Errorf("replicator: %w", errReplConnectionIsLost) } if !l.repository.IsAlive() { - logrus.Fatalln(errConnectionIsLost) + return fmt.Errorf("repository: %w", errConnectionIsLost) } - case err := <-l.errChannel: - if errors.As(err, &serviceErr) { - cancelFunc() - - logrus.Fatalln(err) - } else { - logrus.Errorln(err) + if errors.As(err, svcErr) { + return err } - case <-signalChan: + logrus.WithError(err).Errorln("received error") + case <-ctx.Done(): + logrus.Debugln("context was canceled") + if err := l.Stop(); err != nil { logrus.WithError(err).Errorln("listener stop error") } @@ -169,18 +155,11 @@ ProcessLoop: func (l *Listener) slotIsExists() (bool, error) { restartLSNStr, err := l.repository.GetSlotLSN(l.slotName) if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - logrus. - WithField("slot", l.slotName). - Warningln("restart_lsn for slot not found") - - return false, nil - } - return false, err } if len(restartLSNStr) == 0 { + logrus.WithField("slot_name", l.slotName).Warningln("restart LSN not found") return false, nil } @@ -206,8 +185,13 @@ const ( // Stream receive event from PostgreSQL. // Accept message, apply filter and publish it in NATS server. func (l *Listener) Stream(ctx context.Context) { - err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames(publicationName)) - if err != nil { + if err := l.replicator.StartReplication( + l.slotName, + l.readLSN(), + -1, + protoVersion, + publicationNames(publicationName), + ); err != nil { l.errChannel <- newListenerError("StartReplication()", err) return @@ -217,7 +201,7 @@ func (l *Listener) Stream(ctx context.Context) { tx := NewWalTransaction() for { - if ctx.Err() != nil { + if err := ctx.Err(); err != nil { l.errChannel <- newListenerError("read msg", err) break } @@ -230,12 +214,11 @@ func (l *Listener) Stream(ctx context.Context) { if msg != nil { if msg.WalMessage != nil { - logrus.WithField("wal", msg.WalMessage.WalStart). - Debugln("receive wal message") + logrus.WithField("wal", msg.WalMessage.WalStart).Debugln("receive wal message") if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil { logrus.WithError(err).Errorln("msg parse failed") - l.errChannel <- fmt.Errorf("%v: %w", ErrUnmarshalMsg, err) + l.errChannel <- fmt.Errorf("unmarshal wal message: %w", err) continue } @@ -243,10 +226,10 @@ func (l *Listener) Stream(ctx context.Context) { if tx.CommitTime != nil { natsEvents := tx.CreateEventsWithFilter(l.config.Database.Filter.Tables) for _, event := range natsEvents { - subjectName := event.GetSubjectName(l.config.Nats.TopicPrefix) - if err = l.publisher.Publish(subjectName, event); err != nil { - l.errChannel <- fmt.Errorf("%v: %w", ErrPublishEvent, err) + subjectName := event.SubjectName(l.config.Nats.TopicPrefix) + if err = l.publisher.Publish(subjectName, event); err != nil { + l.errChannel <- fmt.Errorf("publish message: %w", err) continue } @@ -255,7 +238,6 @@ func (l *Listener) Stream(ctx context.Context) { WithField("action", event.Action). WithField("lsn", l.readLSN()). Infoln("event was send") - } tx.Clear() @@ -263,28 +245,26 @@ func (l *Listener) Stream(ctx context.Context) { if msg.WalMessage.WalStart > l.readLSN() { if err = l.AckWalMessage(msg.WalMessage.WalStart); err != nil { - l.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err) - + l.errChannel <- fmt.Errorf("acknowledge wal message: %w", err) continue } logrus.WithField("lsn", l.readLSN()).Debugln("ack wal msg") - } } + if msg.ServerHeartbeat != nil { //FIXME panic if there have been no messages for a long time. logrus.WithFields(logrus.Fields{ "server_wal_end": msg.ServerHeartbeat.ServerWalEnd, "server_time": msg.ServerHeartbeat.ServerTime, - }). - Debugln("received server heartbeat") + }).Debugln("received server heartbeat") if msg.ServerHeartbeat.ReplyRequested == 1 { logrus.Debugln("status requested") if err = l.SendStandbyStatus(); err != nil { - l.errChannel <- fmt.Errorf("%v: %w", ErrSendStandbyStatus, err) + l.errChannel <- fmt.Errorf("send standby status: %w", err) } } } @@ -294,7 +274,6 @@ func (l *Listener) Stream(ctx context.Context) { // Stop is a finalizer function. func (l *Listener) Stop() error { - if err := l.publisher.Close(); err != nil { return fmt.Errorf("publisher close: %w", err) } @@ -307,7 +286,7 @@ func (l *Listener) Stop() error { return fmt.Errorf("replicator close: %w", err) } - logrus.Infoln(StopServiceMessage) + logrus.Infoln("service was stopped") return nil } @@ -328,7 +307,6 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { { if err := l.SendStandbyStatus(); err != nil { logrus.WithError(err).Errorln("failed to send status heartbeat") - continue } diff --git a/listener/listener_test.go b/listener/listener_test.go index 9d6bf586..ab4db797 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -70,17 +70,6 @@ func TestListener_slotIsExists(t *testing.T) { want: false, wantErr: true, }, - { - name: "slot not exists (no rows)", - setup: func() { - setGetSlotLSN("myslot", "", pgx.ErrNoRows) - }, - fields: fields{ - slotName: "myslot", - }, - want: false, - wantErr: false, - }, { name: "repository error", setup: func() { @@ -96,18 +85,22 @@ func TestListener_slotIsExists(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.setup() + w := &Listener{ slotName: tt.fields.slotName, repository: repo, } + got, err := w.slotIsExists() if (err != nil) != tt.wantErr { t.Errorf("slotIsExists() error = %v, wantErr %v", err, tt.wantErr) return } + if got != tt.want { t.Errorf("slotIsExists() got = %v, want %v", got, tt.want) } + repo.AssertExpectations(t) }) } diff --git a/listener/nats_publisher.go b/listener/nats_publisher.go index 2c70bacb..6571abbd 100644 --- a/listener/nats_publisher.go +++ b/listener/nats_publisher.go @@ -4,12 +4,11 @@ import ( "fmt" "time" + "github.com/goccy/go-json" "github.com/google/uuid" "github.com/nats-io/stan.go" ) -//go:generate easyjson nats_publisher.go - // NatsPublisher represent event publisher. type NatsPublisher struct { conn stan.Conn @@ -20,8 +19,7 @@ func (n NatsPublisher) Close() error { return n.conn.Close() } -// Event event structure for publishing to the NATS server. -//easyjson:json +// Event structure for publishing to the NATS server. type Event struct { ID uuid.UUID `json:"id"` Schema string `json:"schema"` @@ -33,7 +31,7 @@ type Event struct { // Publish serializes the event and publishes it on the bus. func (n NatsPublisher) Publish(subject string, event Event) error { - msg, err := event.MarshalJSON() + msg, err := json.Marshal(event) if err != nil { return fmt.Errorf("marshal err: %w", err) } @@ -46,7 +44,7 @@ func NewNatsPublisher(conn stan.Conn) *NatsPublisher { return &NatsPublisher{conn: conn} } -// GetSubjectName creates subject name from the prefix, schema and table name. -func (e Event) GetSubjectName(prefix string) string { +// SubjectName creates subject name from the prefix, schema and table name. +func (e *Event) SubjectName(prefix string) string { return fmt.Sprintf("%s%s_%s", prefix, e.Schema, e.Table) } diff --git a/listener/nats_publisher_easyjson.go b/listener/nats_publisher_easyjson.go deleted file mode 100644 index 663a3f6c..00000000 --- a/listener/nats_publisher_easyjson.go +++ /dev/null @@ -1,171 +0,0 @@ -// Code generated by easyjson for marshaling/unmarshaling. DO NOT EDIT. - -package listener - -import ( - json "encoding/json" - - easyjson "github.com/mailru/easyjson" - jlexer "github.com/mailru/easyjson/jlexer" - jwriter "github.com/mailru/easyjson/jwriter" -) - -// suppress unused package warning -var ( - _ *json.RawMessage - _ *jlexer.Lexer - _ *jwriter.Writer - _ easyjson.Marshaler -) - -func easyjsonAd513449DecodeGithubComIhippikWalListenerListener(in *jlexer.Lexer, out *Event) { - isTopLevel := in.IsStart() - if in.IsNull() { - if isTopLevel { - in.Consumed() - } - in.Skip() - return - } - in.Delim('{') - for !in.IsDelim('}') { - key := in.UnsafeString() - in.WantColon() - if in.IsNull() { - in.Skip() - in.WantComma() - continue - } - switch key { - case "id": - if data := in.UnsafeBytes(); in.Ok() { - in.AddError((out.ID).UnmarshalText(data)) - } - case "schema": - out.Schema = string(in.String()) - case "table": - out.Table = string(in.String()) - case "action": - out.Action = string(in.String()) - case "data": - if in.IsNull() { - in.Skip() - } else { - in.Delim('{') - if !in.IsDelim('}') { - out.Data = make(map[string]interface{}) - } else { - out.Data = nil - } - for !in.IsDelim('}') { - key := string(in.String()) - in.WantColon() - var v1 interface{} - if m, ok := v1.(easyjson.Unmarshaler); ok { - m.UnmarshalEasyJSON(in) - } else if m, ok := v1.(json.Unmarshaler); ok { - _ = m.UnmarshalJSON(in.Raw()) - } else { - v1 = in.Interface() - } - (out.Data)[key] = v1 - in.WantComma() - } - in.Delim('}') - } - case "commitTime": - if data := in.Raw(); in.Ok() { - in.AddError((out.EventTime).UnmarshalJSON(data)) - } - default: - in.SkipRecursive() - } - in.WantComma() - } - in.Delim('}') - if isTopLevel { - in.Consumed() - } -} -func easyjsonAd513449EncodeGithubComIhippikWalListenerListener(out *jwriter.Writer, in Event) { - out.RawByte('{') - first := true - _ = first - { - const prefix string = ",\"id\":" - out.RawString(prefix[1:]) - out.RawText((in.ID).MarshalText()) - } - { - const prefix string = ",\"schema\":" - out.RawString(prefix) - out.String(string(in.Schema)) - } - { - const prefix string = ",\"table\":" - out.RawString(prefix) - out.String(string(in.Table)) - } - { - const prefix string = ",\"action\":" - out.RawString(prefix) - out.String(string(in.Action)) - } - { - const prefix string = ",\"data\":" - out.RawString(prefix) - if in.Data == nil && (out.Flags&jwriter.NilMapAsEmpty) == 0 { - out.RawString(`null`) - } else { - out.RawByte('{') - v2First := true - for v2Name, v2Value := range in.Data { - if v2First { - v2First = false - } else { - out.RawByte(',') - } - out.String(string(v2Name)) - out.RawByte(':') - if m, ok := v2Value.(easyjson.Marshaler); ok { - m.MarshalEasyJSON(out) - } else if m, ok := v2Value.(json.Marshaler); ok { - out.Raw(m.MarshalJSON()) - } else { - out.Raw(json.Marshal(v2Value)) - } - } - out.RawByte('}') - } - } - { - const prefix string = ",\"commitTime\":" - out.RawString(prefix) - out.Raw((in.EventTime).MarshalJSON()) - } - out.RawByte('}') -} - -// MarshalJSON supports json.Marshaler interface -func (v Event) MarshalJSON() ([]byte, error) { - w := jwriter.Writer{} - easyjsonAd513449EncodeGithubComIhippikWalListenerListener(&w, v) - return w.Buffer.BuildBytes(), w.Error -} - -// MarshalEasyJSON supports easyjson.Marshaler interface -func (v Event) MarshalEasyJSON(w *jwriter.Writer) { - easyjsonAd513449EncodeGithubComIhippikWalListenerListener(w, v) -} - -// UnmarshalJSON supports json.Unmarshaler interface -func (v *Event) UnmarshalJSON(data []byte) error { - r := jlexer.Lexer{Data: data} - easyjsonAd513449DecodeGithubComIhippikWalListenerListener(&r, v) - return r.Error() -} - -// UnmarshalEasyJSON supports easyjson.Unmarshaler interface -func (v *Event) UnmarshalEasyJSON(l *jlexer.Lexer) { - easyjsonAd513449DecodeGithubComIhippikWalListenerListener(l, v) -} diff --git a/listener/nats_publisher_test.go b/listener/nats_publisher_test.go index 33b0042b..656427db 100644 --- a/listener/nats_publisher_test.go +++ b/listener/nats_publisher_test.go @@ -40,8 +40,8 @@ func TestEvent_GetSubjectName(t *testing.T) { Action: tt.fields.Action, Data: tt.fields.Data, } - if got := e.GetSubjectName(tt.args.prefix); got != tt.want { - t.Errorf("GetSubjectName() = %v, want %v", got, tt.want) + if got := e.SubjectName(tt.args.prefix); got != tt.want { + t.Errorf("SubjectName() = %v, want %v", got, tt.want) } }) } diff --git a/listener/parser.go b/listener/parser.go index a9216ae5..8d7c4773 100644 --- a/listener/parser.go +++ b/listener/parser.go @@ -300,8 +300,8 @@ func (p *BinaryParser) readTupleData() []TupleData { logrus.Debugln( "tupleData: toast data type") case TextDataType: - vsize := int(p.readInt32()) - data[i] = TupleData{Value: p.buffer.Next(vsize)} + vSize := int(p.readInt32()) + data[i] = TupleData{Value: p.buffer.Next(vSize)} } } diff --git a/listener/parser_mock.go b/listener/parser_mock.go index f9a7f46d..a02cd6c9 100644 --- a/listener/parser_mock.go +++ b/listener/parser_mock.go @@ -13,6 +13,7 @@ type parserMock struct { func (p *parserMock) ParseWalMessage(msg []byte, tx *WalTransaction) error { args := p.Called(msg, tx) now := time.Now() + tx.BeginTime = &now tx.CommitTime = &now tx.Actions = []ActionData{ @@ -30,5 +31,6 @@ func (p *parserMock) ParseWalMessage(msg []byte, tx *WalTransaction) error { }, }, } + return args.Error(0) } diff --git a/listener/repository.go b/listener/repository.go index 6c7329dc..f230130a 100644 --- a/listener/repository.go +++ b/listener/repository.go @@ -1,6 +1,9 @@ package listener -import "github.com/jackc/pgx" +import ( + "errors" + "github.com/jackc/pgx" +) // RepositoryImpl service repository. type RepositoryImpl struct { @@ -16,10 +19,12 @@ func NewRepository(conn *pgx.Conn) *RepositoryImpl { func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error) { var restartLSNStr string - err := r.conn.QueryRow( - "SELECT restart_lsn FROM pg_replication_slots WHERE slot_name=$1;", - slotName, - ).Scan(&restartLSNStr) + err := r.conn.QueryRow("SELECT restart_lsn FROM pg_replication_slots WHERE slot_name=$1;", slotName). + Scan(&restartLSNStr) + + if errors.Is(err, pgx.ErrNoRows) { + return "", nil + } return restartLSNStr, err } diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go index 071b32fd..120f722c 100644 --- a/listener/wal_transaction.go +++ b/listener/wal_transaction.go @@ -98,21 +98,20 @@ func (w *WalTransaction) Clear() { } // CreateActionData create action from WAL message data. -func (w WalTransaction) CreateActionData( - relationID int32, - rows []TupleData, - kind ActionKind, -) (a ActionData, err error) { +func (w *WalTransaction) CreateActionData(relationID int32, rows []TupleData, kind ActionKind) (a ActionData, err error) { rel, ok := w.RelationStore[relationID] if !ok { return a, errors.New("relation not found") } + a = ActionData{ Schema: rel.Schema, Table: rel.Table, Kind: kind, } + var columns []Column + for num, row := range rows { column := Column{ name: rel.Columns[num].name, @@ -122,14 +121,15 @@ func (w WalTransaction) CreateActionData( column.AssertValue(row.Value) columns = append(columns, column) } + a.Columns = columns + return a, nil } // CreateEventsWithFilter filter WAL message by table, // action and create events for each value. -func (w *WalTransaction) CreateEventsWithFilter( - tableMap map[string][]string) []Event { +func (w *WalTransaction) CreateEventsWithFilter(tableMap map[string][]string) []Event { var events []Event for _, item := range w.Actions { @@ -153,15 +153,16 @@ func (w *WalTransaction) CreateEventsWithFilter( validAction := inArray(actions, item.Kind.string()) if validTable && validAction { events = append(events, event) - } else { - logrus.WithFields( - logrus.Fields{ - "schema": item.Schema, - "table": item.Table, - "action": item.Kind, - }). - Infoln("wal message skip by filter") + continue } + + logrus.WithFields( + logrus.Fields{ + "schema": item.Schema, + "table": item.Table, + "action": item.Kind, + }). + Infoln("wal message skip by filter") } return events