From af9ed41e0bf419c53bea0baeeca1985d3e547a17 Mon Sep 17 00:00:00 2001 From: Konstantin Makarov Date: Fri, 13 Mar 2020 00:22:17 +0300 Subject: [PATCH] Static check Fixed comments linter. --- cmd/wal-listener/init.go | 1 + config/config.go | 8 +++ listener/listener.go | 21 ++++--- listener/listener_test.go | 86 ++++++++++++++++++++++------- listener/nats_publisher.go | 4 ++ listener/nats_publisher_easyjson.go | 1 + listener/parser.go | 8 ++- listener/parser_mock.go | 3 +- listener/protocol.go | 53 ++++++++++++++---- listener/repository.go | 17 ++++-- listener/wal_transaction.go | 19 +++++-- listener/wal_transaction_test.go | 5 +- 12 files changed, 174 insertions(+), 52 deletions(-) diff --git a/cmd/wal-listener/init.go b/cmd/wal-listener/init.go index f7a7a2f1..fd266f22 100644 --- a/cmd/wal-listener/init.go +++ b/cmd/wal-listener/init.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "github.com/ihippik/wal-listener/listener" "github.com/jackc/pgx" "github.com/pkg/errors" diff --git a/config/config.go b/config/config.go index 626bfe3a..7df3edc1 100644 --- a/config/config.go +++ b/config/config.go @@ -6,6 +6,7 @@ import ( "github.com/asaskevich/govalidator" ) +// Config for wal-listener/ type Config struct { Listener ListenerCfg Database DatabaseCfg @@ -13,6 +14,7 @@ type Config struct { Logger LoggerCfg } +// ListenerCfg path of the listener config. type ListenerCfg struct { SlotName string `valid:"required"` AckTimeout time.Duration `valid:"required"` @@ -20,6 +22,7 @@ type ListenerCfg struct { HeartbeatInterval time.Duration `valid:"required"` } +// NatsCfg path of the NATS config. type NatsCfg struct { Address string `valid:"required"` ClusterID string `valid:"required"` @@ -27,11 +30,14 @@ type NatsCfg struct { TopicPrefix string `valid:"required"` } +// LoggerCfg path of the logger config. type LoggerCfg struct { Caller bool Level string HumanReadable bool } + +// DatabaseCfg path of the PostgreSQL DB config. type DatabaseCfg struct { Host string `valid:"required"` Port uint16 `valid:"required"` @@ -41,10 +47,12 @@ type DatabaseCfg struct { Filter FilterStruct } +// FilterStruct incoming WAL message filter. type FilterStruct struct { Tables map[string][]string } +// Validate config data. func (c Config) Validate() error { _, err := govalidator.ValidateStruct(c) return err diff --git a/listener/listener.go b/listener/listener.go index 789b0f44..b626d35b 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -65,6 +65,7 @@ type Listener struct { errChannel chan error } +// NewWalListener create and initialize new service instance. func NewWalListener( cfg *config.Config, repo repository, @@ -88,23 +89,27 @@ func (l *Listener) Process() error { var serviceErr *serviceErr logger := logrus.WithField("slot_name", l.slotName) ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() logrus.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage) slotIsExists, err := l.slotIsExists() if err != nil { + logger.WithError(err).Errorln("slotIsExists() error") return err } if !slotIsExists { consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgoutputPlugin) if err != nil { + logger.WithError(err).Infoln("CreateReplicationSlotEx() error") return err } l.LSN, err = pgx.ParseLSN(consistentPoint) - logger.Infoln("create new slot") if err != nil { + logger.WithError(err).Errorln("slotIsExists() error") return err } + logger.Infoln("create new slot") } else { logger.Infoln("slot already exists, LSN updated") } @@ -113,11 +118,11 @@ func (l *Listener) Process() error { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) - refresh := time.Tick(l.config.Listener.RefreshConnection) + refresh := time.NewTicker(l.config.Listener.RefreshConnection) ProcessLoop: for { select { - case <-refresh: + case <-refresh.C: if !l.replicator.IsAlive() { logrus.Fatalln(errReplConnectionIsLost) } @@ -134,10 +139,9 @@ ProcessLoop: } case <-signalChan: - cancelFunc() err := l.Stop() if err != nil { - logrus.Errorln(err) + logrus.WithError(err).Errorln("l.Stop() error") } break ProcessLoop } @@ -150,6 +154,9 @@ func (l *Listener) slotIsExists() (bool, error) { restartLSNStr, err := l.repository.GetSlotLSN(l.slotName) if err != nil { if errors.Is(err, pgx.ErrNoRows) { + logrus. + WithField("slot", l.slotName). + Warningln("restart_lsn for slot not found") return false, nil } return false, err @@ -268,7 +275,7 @@ func (l *Listener) Stop() error { return nil } -// SendPeriodicHeartbeats send periodic keep alive hearbeats to the server. +// SendPeriodicHeartbeats send periodic keep alive heartbeats to the server. func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { for { select { @@ -276,7 +283,7 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { logrus.WithField("func", "SendPeriodicHeartbeats"). Infoln("context was canceled, stop sending heartbeats") return - case <-time.Tick(l.config.Listener.HeartbeatInterval): + case <-time.NewTicker(l.config.Listener.HeartbeatInterval).C: { err := l.SendStandbyStatus() if err != nil { diff --git a/listener/listener_test.go b/listener/listener_test.go index fe3533d9..6a9d4ca1 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -4,11 +4,11 @@ import ( "bytes" "context" "errors" - "github.com/google/uuid" - "github.com/sirupsen/logrus" "testing" "time" + "github.com/google/uuid" + "bou.ke/monkey" "github.com/jackc/pgx" "github.com/stretchr/testify/assert" @@ -17,7 +17,7 @@ import ( "github.com/ihippik/wal-listener/config" ) -var someErr = errors.New("some err") +var errSimple = errors.New("some err") func TestListener_slotIsExists(t *testing.T) { repo := new(repositoryMock) @@ -84,7 +84,7 @@ func TestListener_slotIsExists(t *testing.T) { { name: "repository error", setup: func() { - setGetSlotLSN("myslot", "", someErr) + setGetSlotLSN("myslot", "", errSimple) }, fields: fields{ slotName: "myslot", @@ -194,7 +194,6 @@ func TestListener_Stop(t *testing.T) { func TestListener_SendStandbyStatus(t *testing.T) { repl := new(replicatorMock) type fields struct { - status *pgx.StandbyStatus restartLSN uint64 } @@ -236,7 +235,7 @@ func TestListener_SendStandbyStatus(t *testing.T) { wantErr: false, }, { - name: "success", + name: "some err", setup: func() { setSendStandbyStatus( &pgx.StandbyStatus{ @@ -246,7 +245,7 @@ func TestListener_SendStandbyStatus(t *testing.T) { ClientTime: 18445935546232551617, ReplyRequested: 0, }, - someErr, + errSimple, ) }, fields: fields{ @@ -331,7 +330,7 @@ func TestListener_AckWalMessage(t *testing.T) { ClientTime: 18445935546232551617, ReplyRequested: 0, }, - someErr, + errSimple, ) }, fields: fields{ @@ -360,7 +359,7 @@ func TestListener_AckWalMessage(t *testing.T) { } func TestListener_Stream(t *testing.T) { - logrus.SetLevel(logrus.FatalLevel) + //logrus.SetLevel(logrus.FatalLevel) repo := new(repositoryMock) publ := new(publisherMock) repl := new(replicatorMock) @@ -431,6 +430,16 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'sport'", ) + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 0, + WalFlushPosition: 0, + WalApplyPosition: 0, + ClientTime: 18445935546232551617, + ReplyRequested: 0, + }, + nil, + ) setSendStandbyStatus( &pgx.StandbyStatus{ WalWritePosition: 10, @@ -488,7 +497,7 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 0, + HeartbeatInterval: 1, }, Database: config.DatabaseCfg{ Filter: config.FilterStruct{ @@ -510,7 +519,7 @@ func TestListener_Stream(t *testing.T) { name: "start replication err", setup: func() { setStartReplication( - someErr, + errSimple, "myslot", uint64(0), int64(-1), @@ -523,7 +532,7 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 0, + HeartbeatInterval: 1, }, Database: config.DatabaseCfg{ Filter: config.FilterStruct{ @@ -552,6 +561,16 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'sport'", ) + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 0, + WalFlushPosition: 0, + WalApplyPosition: 0, + ClientTime: 18445935546232551617, + ReplyRequested: 0, + }, + nil, + ) setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ @@ -566,7 +585,7 @@ func TestListener_Stream(t *testing.T) { ReplyRequested: 1, }, }, - someErr, + errSimple, ) }, fields: fields{ @@ -574,7 +593,7 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 0, + HeartbeatInterval: 1, }, Database: config.DatabaseCfg{ Filter: config.FilterStruct{ @@ -603,6 +622,16 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'sport'", ) + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 0, + WalFlushPosition: 0, + WalApplyPosition: 0, + ClientTime: 18445935546232551617, + ReplyRequested: 0, + }, + nil, + ) setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ @@ -628,7 +657,7 @@ func TestListener_Stream(t *testing.T) { RelationStore: make(map[int32]RelationData), Actions: nil, }, - someErr, + errSimple, ) }, fields: fields{ @@ -636,7 +665,7 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 0, + HeartbeatInterval: 1, }, Database: config.DatabaseCfg{ Filter: config.FilterStruct{ @@ -665,6 +694,26 @@ func TestListener_Stream(t *testing.T) { protoVersion, "publication_names 'sport'", ) + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 10, + WalFlushPosition: 10, + WalApplyPosition: 10, + ClientTime: 18445935546232551617, + ReplyRequested: 0, + }, + nil, + ) + setSendStandbyStatus( + &pgx.StandbyStatus{ + WalWritePosition: 0, + WalFlushPosition: 0, + WalApplyPosition: 0, + ClientTime: 18445935546232551617, + ReplyRequested: 0, + }, + nil, + ) setWaitForReplicationMessage( &pgx.ReplicationMessage{ WalMessage: &pgx.WalMessage{ @@ -703,7 +752,7 @@ func TestListener_Stream(t *testing.T) { Data: map[string]interface{}{"id": 1}, EventTime: wayback, }, - someErr, + errSimple, ) setSendStandbyStatus( &pgx.StandbyStatus{ @@ -721,7 +770,7 @@ func TestListener_Stream(t *testing.T) { Listener: config.ListenerCfg{ SlotName: "myslot", AckTimeout: 0, - HeartbeatInterval: 0, + HeartbeatInterval: 1, }, Database: config.DatabaseCfg{ Filter: config.FilterStruct{ @@ -757,7 +806,6 @@ func TestListener_Stream(t *testing.T) { go func() { <-w.errChannel cancel() - }() w.Stream(ctx) repl.AssertExpectations(t) diff --git a/listener/nats_publisher.go b/listener/nats_publisher.go index 1636b044..a8152436 100644 --- a/listener/nats_publisher.go +++ b/listener/nats_publisher.go @@ -10,10 +10,12 @@ import ( //go:generate easyjson nats_publisher.go +// NatsPublisher represent event publisher. type NatsPublisher struct { conn stan.Conn } +// Close NATS connection. func (n NatsPublisher) Close() error { return n.conn.Close() } @@ -29,6 +31,7 @@ type Event struct { EventTime time.Time `json:"commitTime"` } +// Publish serializes the event and publishes it on the bus. func (n NatsPublisher) Publish(subject string, event Event) error { msg, err := event.MarshalJSON() if err != nil { @@ -37,6 +40,7 @@ func (n NatsPublisher) Publish(subject string, event Event) error { return n.conn.Publish(subject, msg) } +// NewNatsPublisher return new NatsPublisher instance. func NewNatsPublisher(conn stan.Conn) *NatsPublisher { return &NatsPublisher{conn: conn} } diff --git a/listener/nats_publisher_easyjson.go b/listener/nats_publisher_easyjson.go index b02d4e21..663a3f6c 100644 --- a/listener/nats_publisher_easyjson.go +++ b/listener/nats_publisher_easyjson.go @@ -4,6 +4,7 @@ package listener import ( json "encoding/json" + easyjson "github.com/mailru/easyjson" jlexer "github.com/mailru/easyjson/jlexer" jwriter "github.com/mailru/easyjson/jwriter" diff --git a/listener/parser.go b/listener/parser.go index d8487f53..4ba8b0c5 100644 --- a/listener/parser.go +++ b/listener/parser.go @@ -9,18 +9,21 @@ import ( "github.com/sirupsen/logrus" ) +// BinaryParser represent binary protocol parser. type BinaryParser struct { byteOrder binary.ByteOrder msgType byte buffer *bytes.Buffer } +// NewBinaryParser create instance of binary parser. func NewBinaryParser(byteOrder binary.ByteOrder) *BinaryParser { return &BinaryParser{ byteOrder: byteOrder, } } +// ParseWalMessage parse postgres WAL message. func (p *BinaryParser) ParseWalMessage(msg []byte, tx *WalTransaction) error { if len(msg) == 0 { return errEmptyWALMessage @@ -238,10 +241,9 @@ func (p *BinaryParser) readBool() bool { func (p *BinaryParser) charIsExists(char byte) bool { if p.buffer.Next(1)[0] == char { return true - } else { - _ = p.buffer.UnreadByte() - return false } + _ = p.buffer.UnreadByte() + return false } func (p *BinaryParser) readColumns() []RelationColumn { diff --git a/listener/parser_mock.go b/listener/parser_mock.go index e4985aa6..f9a7f46d 100644 --- a/listener/parser_mock.go +++ b/listener/parser_mock.go @@ -1,8 +1,9 @@ package listener import ( - "github.com/stretchr/testify/mock" "time" + + "github.com/stretchr/testify/mock" ) type parserMock struct { diff --git a/listener/protocol.go b/listener/protocol.go index f3076c47..c1c0b5a2 100644 --- a/listener/protocol.go +++ b/listener/protocol.go @@ -5,19 +5,41 @@ import ( ) const ( - CommitMsgType byte = 'C' - BeginMsgType byte = 'B' - OriginMsgType byte = 'O' + // CommitMsgType protocol commit message type. + CommitMsgType byte = 'C' + + // BeginMsgType protocol begin message type. + BeginMsgType byte = 'B' + + // OriginMsgType protocol original message type. + OriginMsgType byte = 'O' + + // RelationMsgType protocol relation message type. RelationMsgType byte = 'R' - TypeMsgType byte = 'Y' - InsertMsgType byte = 'I' - UpdateMsgType byte = 'U' - DeleteMsgType byte = 'D' + // TypeMsgType protocol message type. + TypeMsgType byte = 'Y' + + // InsertMsgType protocol insert message type. + InsertMsgType byte = 'I' + + // UpdateMsgType protocol update message type. + UpdateMsgType byte = 'U' + + // DeleteMsgType protocol delete message type. + DeleteMsgType byte = 'D' + + // NewTupleDataType protocol new tuple data type. NewTupleDataType byte = 'N' - TextDataType byte = 't' - NullDataType byte = 'n' - ToastDataType byte = 'u' + + // TextDataType protocol test data type. + TextDataType byte = 't' + + // NullDataType protocol NULL data type. + NullDataType byte = 'n' + + // ToastDataType protocol toast data type. + ToastDataType byte = 'u' ) var postgresEpoch = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) @@ -25,6 +47,7 @@ var postgresEpoch = time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) // Logical Replication Message Formats. // https://postgrespro.ru/docs/postgrespro/10/protocol-logicalrep-message-formats# type ( + // Begin message format. Begin struct { // Identifies the message as a begin message. LSN int64 @@ -34,6 +57,7 @@ type ( XID int32 } + // Commit message format. Commit struct { // Flags; currently unused (must be 0). Flags int8 @@ -45,6 +69,7 @@ type ( Timestamp time.Time } + // Origin message format. Origin struct { // The LSN of the commit on the origin server. LSN int64 @@ -52,6 +77,7 @@ type ( Name string } + // Relation message format. Relation struct { // ID of the relation. ID int32 @@ -64,6 +90,7 @@ type ( Columns []RelationColumn } + // Insert message format. Insert struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 @@ -73,6 +100,7 @@ type ( Row []TupleData } + // Update message format. Update struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 @@ -89,6 +117,7 @@ type ( OldRow []TupleData } + // Delete message format. Delete struct { /// ID of the relation corresponding to the ID in the relation message. RelationID int32 @@ -100,6 +129,8 @@ type ( Row []TupleData } ) + +// DataType path of WAL message data. type DataType struct { // ID of the data type. ID int32 @@ -109,6 +140,7 @@ type DataType struct { Name string } +// RelationColumn path of WAL message data. type RelationColumn struct { // Flags for the column which marks the column as part of the key. Key bool @@ -120,6 +152,7 @@ type RelationColumn struct { ModifierType int32 } +// TupleData path of WAL message data. type TupleData struct { Value []byte } diff --git a/listener/repository.go b/listener/repository.go index 8bde9123..28e4958f 100644 --- a/listener/repository.go +++ b/listener/repository.go @@ -2,15 +2,18 @@ package listener import "github.com/jackc/pgx" -type repositoryImpl struct { +// RepositoryImpl service repository. +type RepositoryImpl struct { conn *pgx.Conn } -func NewRepository(conn *pgx.Conn) *repositoryImpl { - return &repositoryImpl{conn: conn} +// NewRepository returns a new instance of the repository. +func NewRepository(conn *pgx.Conn) *RepositoryImpl { + return &RepositoryImpl{conn: conn} } -func (r repositoryImpl) GetSlotLSN(slotName string) (string, error) { +// GetSlotLSN returns the value of the last offset for a specific slot. +func (r RepositoryImpl) GetSlotLSN(slotName string) (string, error) { var restartLSNStr string err := r.conn.QueryRow( "SELECT restart_lsn FROM pg_replication_slots WHERE slot_name=$1;", @@ -19,10 +22,12 @@ func (r repositoryImpl) GetSlotLSN(slotName string) (string, error) { return restartLSNStr, err } -func (r repositoryImpl) IsAlive() bool { +// IsAlive check database connection problems. +func (r RepositoryImpl) IsAlive() bool { return r.conn.IsAlive() } -func (r repositoryImpl) Close() error { +// Close database connection. +func (r RepositoryImpl) Close() error { return r.conn.Close() } diff --git a/listener/wal_transaction.go b/listener/wal_transaction.go index bf1c6a14..31cdb0f3 100644 --- a/listener/wal_transaction.go +++ b/listener/wal_transaction.go @@ -11,15 +11,17 @@ import ( "github.com/sirupsen/logrus" ) +// ActionKind kind of action on WAL message. type ActionKind string -// kind of wall message. +// kind of WAL message. const ( ActionKindInsert ActionKind = "INSERT" ActionKindUpdate ActionKind = "UPDATE" ActionKindDelete ActionKind = "DELETE" ) +// WalTransaction transaction specified WAL message. type WalTransaction struct { LSN int64 BeginTime *time.Time @@ -28,6 +30,7 @@ type WalTransaction struct { Actions []ActionData } +// NewWalTransaction create and initialize new WAL transaction. func NewWalTransaction() *WalTransaction { return &WalTransaction{ RelationStore: make(map[int32]RelationData), @@ -38,12 +41,14 @@ func (k ActionKind) string() string { return string(k) } +// RelationData kind of WAL message data. type RelationData struct { Schema string Table string Columns []Column } +// ActionData kind of WAL message data. type ActionData struct { Schema string Table string @@ -51,6 +56,7 @@ type ActionData struct { Columns []Column } +// Column of the table with which changes occur. type Column struct { name string value interface{} @@ -58,6 +64,8 @@ type Column struct { isKey bool } +// AssertValue converts bytes to a specific type depending +// on the type of this data in the database table. func (c *Column) AssertValue(src []byte) { var val interface{} strSrc := string(src) @@ -78,12 +86,14 @@ func (c *Column) AssertValue(src []byte) { c.value = val } +// Clear transaction data. func (w *WalTransaction) Clear() { w.CommitTime = nil w.BeginTime = nil w.Actions = nil } +// CreateActionData create action from WAL message data. func (w WalTransaction) CreateActionData( relationID int32, rows []TupleData, @@ -113,7 +123,8 @@ func (w WalTransaction) CreateActionData( return a, nil } -// CreateEventsWithFilter filter wal message by table, action and create events for each value. +// CreateEventsWithFilter filter WAL message by table, +// action and create events for each value. func (w *WalTransaction) CreateEventsWithFilter( tableMap map[string][]string) []Event { var events []Event @@ -149,10 +160,10 @@ func (w *WalTransaction) CreateEventsWithFilter( return events } -// inArray checks whether the value is in an array +// inArray checks whether the value is in an array. func inArray(arr []string, value string) bool { for _, v := range arr { - if strings.ToLower(v) == strings.ToLower(value) { + if strings.EqualFold(v, value) { return true } } diff --git a/listener/wal_transaction_test.go b/listener/wal_transaction_test.go index abf99c6f..97255e4f 100644 --- a/listener/wal_transaction_test.go +++ b/listener/wal_transaction_test.go @@ -1,11 +1,12 @@ package listener import ( - "github.com/jackc/pgx/pgtype" - "github.com/magiconair/properties/assert" "reflect" "testing" "time" + + "github.com/jackc/pgx/pgtype" + "github.com/magiconair/properties/assert" ) func TestWalTransaction_CreateActionData(t *testing.T) {