From a647180b2c48201c180c20e82b6d8fe52619f725 Mon Sep 17 00:00:00 2001 From: ihippik Date: Sat, 10 Sep 2022 23:07:40 +0400 Subject: [PATCH] Refactoring Add Sentry. Fix logs. Fix config. Update Readme. --- README.md | 48 +++++++++++++++++++++++-- cmd/wal-listener/init.go | 51 +++++++++++++++++++++++--- cmd/wal-listener/main.go | 10 +++--- config/config.go | 25 ++++++++----- go.mod | 3 ++ go.sum | 6 ++++ listener/listener.go | 63 ++++++++++++++++++--------------- listener/listener_test.go | 57 +++++++++++++++-------------- listener/nats_publisher.go | 18 ++++++++-- listener/nats_publisher_test.go | 17 ++++++--- listener/parser.go | 27 ++++++++------ listener/wal_transaction.go | 2 +- 12 files changed, 232 insertions(+), 95 deletions(-) diff --git a/README.md b/README.md index 4046bc38..173b43ac 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ publishing events in a single transaction with a domain model change. The service allows you to subscribe to changes in the PostgreSQL database using its logical decoding capability and publish them to the NATS Streaming server. -### Logic of work +## Logic of work To receive events about data changes in our PostgreSQL DB   we use the standard logic decoding module (**pgoutput**) This module converts changes read from the WAL into a logical replication protocol. @@ -55,7 +55,16 @@ databases: This filter means that we only process events occurring with the `users` table, and in particular `insert` and `update` data. -### DB setting +### Topic mapping +By default, output NATS topic name consist of prefix, DB schema, and DB table name, +but if you want to send all update in one topic you should be configured the topic map: +```yaml + topicsMap: + main_users: "notifier" + main_customers: "notifier" +``` + +## DB setting You must make the following settings in the db configuration (postgresql.conf) * wal_level >= “logical” * max_replication_slots >= 1 @@ -67,7 +76,40 @@ https://www.postgresql.org/docs/current/sql-createpublication.html If you change the publication, do not forget to change the slot name or delete the current one. -### Docker +## Service configuration +```yaml +listener: + slotName: myslot_1 + refreshConnection: 30s + heartbeatInterval: 10s + filter: + tables: + seasons: + - insert + - update + topicsMap: + schema_table_name: "notifier" +logger: + caller: false + level: info + format: json +database: + host: localhost + port: 5432 + name: my_db + user: postgres + password: postgres + debug: false +nats: + address: localhost:4222 + clusterID: test-cluster + clientID: wal-listener + topicPrefix: "" +monitoring: + sentryDSN: "dsn string" +``` + +## Docker You can start the container from the project folder (configuration file is required) diff --git a/cmd/wal-listener/init.go b/cmd/wal-listener/init.go index 7ea33f97..b8c4be70 100644 --- a/cmd/wal-listener/init.go +++ b/cmd/wal-listener/init.go @@ -2,7 +2,9 @@ package main import ( "fmt" + "runtime/debug" + "github.com/evalphobia/logrus_sentry" "github.com/jackc/pgx" "github.com/sirupsen/logrus" "github.com/spf13/viper" @@ -18,6 +20,21 @@ const ( infoLoggerLevel = "info" ) +func getVersion() string { + var version = "unknown" + + info, ok := debug.ReadBuildInfo() + if ok { + for _, item := range info.Settings { + if item.Key == "vcs.revision" { + version = item.Value[:4] + } + } + } + + return version +} + // getConf load config from file. func getConf(path string) (*config.Config, error) { var cfg config.Config @@ -36,11 +53,13 @@ func getConf(path string) (*config.Config, error) { } // initLogger init logrus preferences. -func initLogger(cfg config.LoggerCfg) { - logrus.SetReportCaller(cfg.Caller) +func initLogger(cfg config.LoggerCfg, version string) *logrus.Entry { + logger := logrus.New() + + logger.SetReportCaller(cfg.Caller) - if !cfg.HumanReadable { - logrus.SetFormatter(&logrus.JSONFormatter{}) + if cfg.Format == "json" { + logger.SetFormatter(&logrus.JSONFormatter{}) } var level logrus.Level @@ -58,7 +77,29 @@ func initLogger(cfg config.LoggerCfg) { level = logrus.DebugLevel } - logrus.SetLevel(level) + logger.SetLevel(level) + + return logger.WithField("version", version) +} + +func initSentry(dsn string, logger *logrus.Entry) { + if len(dsn) == 0 { + logger.Warnln("empty Sentry DSN") + return + } + + hook, err := logrus_sentry.NewSentryHook( + dsn, + []logrus.Level{ + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + }, + ) + + if err == nil { + logger.Logger.AddHook(hook) + } } // initPgxConnections initialise db and replication connections. diff --git a/cmd/wal-listener/main.go b/cmd/wal-listener/main.go index 171ad987..340c44a1 100644 --- a/cmd/wal-listener/main.go +++ b/cmd/wal-listener/main.go @@ -12,9 +12,6 @@ import ( "github.com/ihippik/wal-listener/listener" ) -// go build -ldflags "-X main.version=1.0.1" main.go -var version = "0.2.0" - func main() { cli.VersionFlag = &cli.BoolFlag{ Name: "version", @@ -22,6 +19,8 @@ func main() { Usage: "print only the version", } + version := getVersion() + app := &cli.App{ Name: "Wal-Listener", Usage: "listen postgres events", @@ -44,7 +43,9 @@ func main() { return fmt.Errorf("validate config: %w", err) } - initLogger(cfg.Logger) + logger := initLogger(cfg.Logger, version) + + initSentry(cfg.Monitoring.SentryDSN, logger) natsConn, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address)) if err != nil { @@ -58,6 +59,7 @@ func main() { service := listener.NewWalListener( cfg, + logger, listener.NewRepository(conn), rConn, listener.NewNatsPublisher(natsConn), diff --git a/config/config.go b/config/config.go index 9161990a..bd5688a6 100644 --- a/config/config.go +++ b/config/config.go @@ -8,10 +8,11 @@ import ( // Config for wal-listener/ type Config struct { - Listener ListenerCfg - Database DatabaseCfg - Nats NatsCfg - Logger LoggerCfg + Listener ListenerCfg + Database DatabaseCfg + Nats NatsCfg + Logger LoggerCfg + Monitoring MonitoringCfg } // ListenerCfg path of the listener config. @@ -20,6 +21,8 @@ type ListenerCfg struct { AckTimeout time.Duration RefreshConnection time.Duration `valid:"required"` HeartbeatInterval time.Duration `valid:"required"` + Filter FilterStruct + TopicsMap map[string]string } // NatsCfg path of the NATS config. @@ -27,14 +30,19 @@ type NatsCfg struct { Address string `valid:"required"` ClusterID string `valid:"required"` ClientID string `valid:"required"` - TopicPrefix string `valid:"required"` + TopicPrefix string +} + +// MonitoringCfg monitoring configuration. +type MonitoringCfg struct { + SentryDSN string } // LoggerCfg path of the logger config. type LoggerCfg struct { - Caller bool - Level string - HumanReadable bool + Caller bool + Level string + Format string } // DatabaseCfg path of the PostgreSQL DB config. @@ -44,7 +52,6 @@ type DatabaseCfg struct { Name string `valid:"required"` User string `valid:"required"` Password string `valid:"required"` - Filter FilterStruct } // FilterStruct incoming WAL message filter. diff --git a/go.mod b/go.mod index b361b18b..a556ced2 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.19 require ( bou.ke/monkey v1.0.2 github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d + github.com/evalphobia/logrus_sentry v0.8.2 github.com/goccy/go-json v0.9.10 github.com/google/uuid v1.3.0 github.com/jackc/pgx v3.6.2+incompatible @@ -17,10 +18,12 @@ require ( ) require ( + github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect github.com/cockroachdb/apd v1.1.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect + github.com/getsentry/raven-go v0.2.0 // indirect github.com/gofrs/uuid v3.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect diff --git a/go.sum b/go.sum index 966bc777..1dabaa57 100644 --- a/go.sum +++ b/go.sum @@ -48,6 +48,8 @@ github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:W github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s= +github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= @@ -70,10 +72,14 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ= +github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc= github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU= +github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs= +github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= diff --git a/listener/listener.go b/listener/listener.go index 9656b5f8..28796792 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -48,8 +48,9 @@ type repository interface { // Listener main service struct. type Listener struct { + cfg *config.Config + log *logrus.Entry mu sync.RWMutex - config config.Config slotName string publisher publisher replicator replication @@ -62,14 +63,16 @@ type Listener struct { // NewWalListener create and initialize new service instance. func NewWalListener( cfg *config.Config, + log *logrus.Entry, repo repository, repl replication, publ publisher, parser parser, ) *Listener { return &Listener{ + log: log, slotName: fmt.Sprintf("%s_%s", cfg.Listener.SlotName, cfg.Database.Name), - config: *cfg, + cfg: cfg, publisher: publ, repository: repo, replicator: repl, @@ -80,9 +83,7 @@ func NewWalListener( // Process is main service entry point. func (l *Listener) Process(ctx context.Context) error { - var svcErr serviceErr - - logger := logrus.WithField("slot_name", l.slotName) + logger := l.log.WithField("slot_name", l.slotName) ctx, stop := signal.NotifyContext(ctx, os.Interrupt) defer stop() @@ -110,16 +111,18 @@ func (l *Listener) Process(ctx context.Context) error { } l.setLSN(lsn) - logger.Infoln("create new slot") + logger.Infoln("new slot was created") } else { logger.Infoln("slot already exists, LSN updated") } go l.Stream(ctx) - refresh := time.NewTicker(l.config.Listener.RefreshConnection) + refresh := time.NewTicker(l.cfg.Listener.RefreshConnection) defer refresh.Stop() + var svcErr *serviceErr + ProcessLoop: for { select { @@ -132,16 +135,16 @@ ProcessLoop: return fmt.Errorf("repository: %w", errConnectionIsLost) } case err := <-l.errChannel: - if errors.As(err, svcErr) { + if errors.As(err, &svcErr) { return err } - logrus.WithError(err).Errorln("received error") + l.log.WithError(err).Errorln("received error") case <-ctx.Done(): - logrus.Debugln("context was canceled") + logger.Debugln("context was canceled") if err := l.Stop(); err != nil { - logrus.WithError(err).Errorln("listener stop error") + logger.WithError(err).Errorln("listener stop error") } break ProcessLoop @@ -159,7 +162,7 @@ func (l *Listener) slotIsExists() (bool, error) { } if len(restartLSNStr) == 0 { - logrus.WithField("slot_name", l.slotName).Warningln("restart LSN not found") + l.log.WithField("slot_name", l.slotName).Warningln("restart LSN not found") return false, nil } @@ -200,6 +203,7 @@ func (l *Listener) Stream(ctx context.Context) { go l.SendPeriodicHeartbeats(ctx) tx := NewWalTransaction() + for { if err := ctx.Err(); err != nil { l.errChannel <- newListenerError("read msg", err) @@ -214,30 +218,31 @@ func (l *Listener) Stream(ctx context.Context) { if msg != nil { if msg.WalMessage != nil { - logrus.WithField("wal", msg.WalMessage.WalStart).Debugln("receive wal message") + l.log.WithField("wal", msg.WalMessage.WalStart).Debugln("receive wal message") if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil { - logrus.WithError(err).Errorln("msg parse failed") + l.log.WithError(err).Errorln("msg parse failed") l.errChannel <- fmt.Errorf("unmarshal wal message: %w", err) continue } if tx.CommitTime != nil { - natsEvents := tx.CreateEventsWithFilter(l.config.Database.Filter.Tables) + natsEvents := tx.CreateEventsWithFilter(l.cfg.Listener.Filter.Tables) for _, event := range natsEvents { - subjectName := event.SubjectName(l.config.Nats.TopicPrefix) + subjectName := event.SubjectName(l.cfg) if err = l.publisher.Publish(subjectName, event); err != nil { l.errChannel <- fmt.Errorf("publish message: %w", err) continue } - logrus. - WithField("subject", subjectName). - WithField("action", event.Action). - WithField("lsn", l.readLSN()). - Infoln("event was send") + l.log.WithFields(logrus.Fields{ + "subject": subjectName, + "action": event.Action, + "table": event.Table, + "lsn": l.readLSN(), + }).Infoln("event was sent") } tx.Clear() @@ -249,19 +254,19 @@ func (l *Listener) Stream(ctx context.Context) { continue } - logrus.WithField("lsn", l.readLSN()).Debugln("ack wal msg") + l.log.WithField("lsn", l.readLSN()).Debugln("ack wal msg") } } if msg.ServerHeartbeat != nil { //FIXME panic if there have been no messages for a long time. - logrus.WithFields(logrus.Fields{ + l.log.WithFields(logrus.Fields{ "server_wal_end": msg.ServerHeartbeat.ServerWalEnd, "server_time": msg.ServerHeartbeat.ServerTime, }).Debugln("received server heartbeat") if msg.ServerHeartbeat.ReplyRequested == 1 { - logrus.Debugln("status requested") + l.log.Debugln("status requested") if err = l.SendStandbyStatus(); err != nil { l.errChannel <- fmt.Errorf("send standby status: %w", err) @@ -286,31 +291,31 @@ func (l *Listener) Stop() error { return fmt.Errorf("replicator close: %w", err) } - logrus.Infoln("service was stopped") + l.log.Infoln("service was stopped") return nil } // SendPeriodicHeartbeats send periodic keep alive heartbeats to the server. func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { - heart := time.NewTicker(l.config.Listener.HeartbeatInterval) + heart := time.NewTicker(l.cfg.Listener.HeartbeatInterval) defer heart.Stop() for { select { case <-ctx.Done(): - logrus.WithField("func", "SendPeriodicHeartbeats"). + l.log.WithField("func", "SendPeriodicHeartbeats"). Infoln("context was canceled, stop sending heartbeats") return case <-heart.C: { if err := l.SendStandbyStatus(); err != nil { - logrus.WithError(err).Errorln("failed to send status heartbeat") + l.log.WithError(err).Errorln("failed to send status heartbeat") continue } - logrus.Debugln("sending periodic status heartbeat") + l.log.Debugln("sending periodic status heartbeat") } } } diff --git a/listener/listener_test.go b/listener/listener_test.go index ab4db797..c7ba99d6 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -4,12 +4,13 @@ import ( "bytes" "context" "errors" + "github.com/sirupsen/logrus" + "io" "testing" "time" - "github.com/google/uuid" - "bou.ke/monkey" + "github.com/google/uuid" "github.com/jackc/pgx" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -82,11 +83,16 @@ func TestListener_slotIsExists(t *testing.T) { wantErr: true, }, } + + logger := logrus.New() + logger.Out = io.Discard + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.setup() w := &Listener{ + log: logrus.NewEntry(logger), slotName: tt.fields.slotName, repository: repo, } @@ -127,6 +133,9 @@ func TestListener_Stop(t *testing.T) { Once() } + logger := logrus.New() + logger.Out = io.Discard + tests := []struct { name string setup func() @@ -170,6 +179,7 @@ func TestListener_Stop(t *testing.T) { t.Run(tt.name, func(t *testing.T) { tt.setup() w := &Listener{ + log: logrus.NewEntry(logger), publisher: publ, replicator: repl, repository: repo, @@ -360,7 +370,7 @@ func TestListener_Stream(t *testing.T) { prs := new(parserMock) type fields struct { - config config.Config + config *config.Config slotName string restartLSN uint64 } @@ -408,7 +418,9 @@ func TestListener_Stream(t *testing.T) { Once(). After(10 * time.Millisecond) } + uuid.SetRand(bytes.NewReader(make([]byte, 512))) + tests := []struct { name string setup func() @@ -489,13 +501,11 @@ func TestListener_Stream(t *testing.T) { ) }, fields: fields{ - config: config.Config{ + config: &config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, HeartbeatInterval: 1, - }, - Database: config.DatabaseCfg{ Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, @@ -524,14 +534,11 @@ func TestListener_Stream(t *testing.T) { ) }, fields: fields{ - config: config.Config{ + config: &config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 1, - }, - Database: config.DatabaseCfg{ - Filter: config.FilterStruct{ + HeartbeatInterval: 1, Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, }, @@ -585,14 +592,11 @@ func TestListener_Stream(t *testing.T) { ) }, fields: fields{ - config: config.Config{ + config: &config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 1, - }, - Database: config.DatabaseCfg{ - Filter: config.FilterStruct{ + HeartbeatInterval: 1, Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, }, @@ -657,14 +661,11 @@ func TestListener_Stream(t *testing.T) { ) }, fields: fields{ - config: config.Config{ + config: &config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 1, - }, - Database: config.DatabaseCfg{ - Filter: config.FilterStruct{ + HeartbeatInterval: 1, Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, }, @@ -762,14 +763,11 @@ func TestListener_Stream(t *testing.T) { ) }, fields: fields{ - config: config.Config{ + config: &config.Config{ Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 1, - }, - Database: config.DatabaseCfg{ - Filter: config.FilterStruct{ + HeartbeatInterval: 1, Filter: config.FilterStruct{ Tables: map[string][]string{"users": {"insert"}}, }, }, @@ -785,13 +783,18 @@ func TestListener_Stream(t *testing.T) { }, }, } + + logger := logrus.New() + logger.Out = io.Discard + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.setup() ctx, cancel := context.WithTimeout(context.Background(), tt.args.timeout) w := &Listener{ - config: tt.fields.config, + log: logrus.NewEntry(logger), + cfg: tt.fields.config, slotName: tt.fields.slotName, publisher: publ, replicator: repl, diff --git a/listener/nats_publisher.go b/listener/nats_publisher.go index 6571abbd..b84ed56b 100644 --- a/listener/nats_publisher.go +++ b/listener/nats_publisher.go @@ -7,6 +7,8 @@ import ( "github.com/goccy/go-json" "github.com/google/uuid" "github.com/nats-io/stan.go" + + "github.com/ihippik/wal-listener/config" ) // NatsPublisher represent event publisher. @@ -44,7 +46,17 @@ func NewNatsPublisher(conn stan.Conn) *NatsPublisher { return &NatsPublisher{conn: conn} } -// SubjectName creates subject name from the prefix, schema and table name. -func (e *Event) SubjectName(prefix string) string { - return fmt.Sprintf("%s%s_%s", prefix, e.Schema, e.Table) +// SubjectName creates subject name from the prefix, schema and table name. Also using topic map from cfg. +func (e *Event) SubjectName(cfg *config.Config) string { + topic := fmt.Sprintf("%s_%s", e.Schema, e.Table) + + if cfg.Listener.TopicsMap != nil { + if t, ok := cfg.Listener.TopicsMap[topic]; ok { + topic = t + } + } + + topic = cfg.Nats.TopicPrefix + topic + + return topic } diff --git a/listener/nats_publisher_test.go b/listener/nats_publisher_test.go index 656427db..d24a51a7 100644 --- a/listener/nats_publisher_test.go +++ b/listener/nats_publisher_test.go @@ -1,6 +1,10 @@ package listener -import "testing" +import ( + "testing" + + "github.com/ihippik/wal-listener/config" +) func TestEvent_GetSubjectName(t *testing.T) { type fields struct { @@ -10,7 +14,7 @@ func TestEvent_GetSubjectName(t *testing.T) { Data map[string]interface{} } type args struct { - prefix string + cfg *config.Config } tests := []struct { name string @@ -27,7 +31,12 @@ func TestEvent_GetSubjectName(t *testing.T) { Data: nil, }, args: args{ - prefix: "prefix_", + cfg: &config.Config{ + Listener: config.ListenerCfg{ + TopicsMap: nil, + }, + Nats: config.NatsCfg{TopicPrefix: "prefix_"}, + }, }, want: "prefix_public_users", }, @@ -40,7 +49,7 @@ func TestEvent_GetSubjectName(t *testing.T) { Action: tt.fields.Action, Data: tt.fields.Data, } - if got := e.SubjectName(tt.args.prefix); got != tt.want { + if got := e.SubjectName(tt.args.cfg); got != tt.want { t.Errorf("SubjectName() = %v, want %v", got, tt.want) } }) diff --git a/listener/parser.go b/listener/parser.go index 8d7c4773..b6717a41 100644 --- a/listener/parser.go +++ b/listener/parser.go @@ -35,24 +35,27 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { switch p.msgType { case BeginMsgType: begin := p.getBeginMsg() + logrus. WithFields( logrus.Fields{ "lsn": begin.LSN, "xid": begin.XID, }). - Infoln("receive begin message") + Debugln("begin type message was received") + tx.LSN = begin.LSN tx.BeginTime = &begin.Timestamp case CommitMsgType: commit := p.getCommitMsg() + logrus. WithFields( logrus.Fields{ "lsn": commit.LSN, "transaction_lsn": commit.TransactionLSN, }). - Infoln("receive commit message") + Debugln("commit message was received") if tx.LSN > 0 && tx.LSN != commit.LSN { return fmt.Errorf("commit: %w", errMessageLost) @@ -60,16 +63,17 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { tx.CommitTime = &commit.Timestamp case OriginMsgType: - logrus.Infoln("receive origin message") + logrus.Debugln("origin type message was received") case RelationMsgType: relation := p.getRelationMsg() + logrus. WithFields( logrus.Fields{ "relation_id": relation.ID, "replica": relation.Replica, }). - Infoln("receive relation message") + Debugln("relation type message was received") if tx.LSN == 0 { return fmt.Errorf("commit: %w", errMessageLost) @@ -92,21 +96,23 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { tx.RelationStore[relation.ID] = rd case TypeMsgType: - logrus.Infoln("type") + logrus.Debugln("type message was received") case InsertMsgType: insert := p.getInsertMsg() + logrus. WithFields( logrus.Fields{ "relation_id": insert.RelationID, }). - Infoln("receive insert message") + Debugln("insert type message was received") action, err := tx.CreateActionData( insert.RelationID, insert.Row, ActionKindInsert, ) + if err != nil { return fmt.Errorf("create action data: %w", err) } @@ -114,12 +120,13 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { tx.Actions = append(tx.Actions, action) case UpdateMsgType: upd := p.getUpdateMsg() + logrus. WithFields( logrus.Fields{ "relation_id": upd.RelationID, }). - Infoln("receive update message") + Debugln("update type message was received") action, err := tx.CreateActionData( upd.RelationID, @@ -133,12 +140,13 @@ func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { tx.Actions = append(tx.Actions, action) case DeleteMsgType: del := p.getDeleteMsg() + logrus. WithFields( logrus.Fields{ "relation_id": del.RelationID, }). - Infoln("receive delete message") + Debugln("delete type message was received") action, err := tx.CreateActionData( del.RelationID, @@ -297,8 +305,7 @@ func (p *BinaryParser) readTupleData() []TupleData { case NullDataType: logrus.Debugln("tupleData: null data type") case ToastDataType: - logrus.Debugln( - "tupleData: toast data type") + logrus.Debugln("tupleData: toast data type") case TextDataType: vSize := int(p.readInt32()) data[i] = TupleData{Value: p.buffer.Next(vSize)} diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go index 120f722c..f4ce0f8a 100644 --- a/listener/wal_transaction.go +++ b/listener/wal_transaction.go @@ -162,7 +162,7 @@ func (w *WalTransaction) CreateEventsWithFilter(tableMap map[string][]string) [] "table": item.Table, "action": item.Kind, }). - Infoln("wal message skip by filter") + Infoln("wal-message was skipped by filter") } return events