Skip to content

Commit

Permalink
Static check
Browse files Browse the repository at this point in the history
Fixed comments linter.
  • Loading branch information
ihippik committed Mar 12, 2020
1 parent bab7d22 commit af9ed41
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 52 deletions.
1 change: 1 addition & 0 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"fmt"

"github.com/ihippik/wal-listener/listener"
"github.com/jackc/pgx"
"github.com/pkg/errors"
Expand Down
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,38 @@ import (
"github.com/asaskevich/govalidator"
)

// Config for wal-listener/
type Config struct {
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger LoggerCfg
}

// ListenerCfg path of the listener config.
type ListenerCfg struct {
SlotName string `valid:"required"`
AckTimeout time.Duration `valid:"required"`
RefreshConnection time.Duration `valid:"required"`
HeartbeatInterval time.Duration `valid:"required"`
}

// NatsCfg path of the NATS config.
type NatsCfg struct {
Address string `valid:"required"`
ClusterID string `valid:"required"`
ClientID string `valid:"required"`
TopicPrefix string `valid:"required"`
}

// LoggerCfg path of the logger config.
type LoggerCfg struct {
Caller bool
Level string
HumanReadable bool
}

// DatabaseCfg path of the PostgreSQL DB config.
type DatabaseCfg struct {
Host string `valid:"required"`
Port uint16 `valid:"required"`
Expand All @@ -41,10 +47,12 @@ type DatabaseCfg struct {
Filter FilterStruct
}

// FilterStruct incoming WAL message filter.
type FilterStruct struct {
Tables map[string][]string
}

// Validate config data.
func (c Config) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
Expand Down
21 changes: 14 additions & 7 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Listener struct {
errChannel chan error
}

// NewWalListener create and initialize new service instance.
func NewWalListener(
cfg *config.Config,
repo repository,
Expand All @@ -88,23 +89,27 @@ func (l *Listener) Process() error {
var serviceErr *serviceErr
logger := logrus.WithField("slot_name", l.slotName)
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
logrus.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage)

slotIsExists, err := l.slotIsExists()
if err != nil {
logger.WithError(err).Errorln("slotIsExists() error")
return err
}

if !slotIsExists {
consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgoutputPlugin)
if err != nil {
logger.WithError(err).Infoln("CreateReplicationSlotEx() error")
return err
}
l.LSN, err = pgx.ParseLSN(consistentPoint)
logger.Infoln("create new slot")
if err != nil {
logger.WithError(err).Errorln("slotIsExists() error")
return err
}
logger.Infoln("create new slot")
} else {
logger.Infoln("slot already exists, LSN updated")
}
Expand All @@ -113,11 +118,11 @@ func (l *Listener) Process() error {

signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
refresh := time.Tick(l.config.Listener.RefreshConnection)
refresh := time.NewTicker(l.config.Listener.RefreshConnection)
ProcessLoop:
for {
select {
case <-refresh:
case <-refresh.C:
if !l.replicator.IsAlive() {
logrus.Fatalln(errReplConnectionIsLost)
}
Expand All @@ -134,10 +139,9 @@ ProcessLoop:
}

case <-signalChan:
cancelFunc()
err := l.Stop()
if err != nil {
logrus.Errorln(err)
logrus.WithError(err).Errorln("l.Stop() error")
}
break ProcessLoop
}
Expand All @@ -150,6 +154,9 @@ func (l *Listener) slotIsExists() (bool, error) {
restartLSNStr, err := l.repository.GetSlotLSN(l.slotName)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
logrus.
WithField("slot", l.slotName).
Warningln("restart_lsn for slot not found")
return false, nil
}
return false, err
Expand Down Expand Up @@ -268,15 +275,15 @@ func (l *Listener) Stop() error {
return nil
}

// SendPeriodicHeartbeats send periodic keep alive hearbeats to the server.
// SendPeriodicHeartbeats send periodic keep alive heartbeats to the server.
func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) {
for {
select {
case <-ctx.Done():
logrus.WithField("func", "SendPeriodicHeartbeats").
Infoln("context was canceled, stop sending heartbeats")
return
case <-time.Tick(l.config.Listener.HeartbeatInterval):
case <-time.NewTicker(l.config.Listener.HeartbeatInterval).C:
{
err := l.SendStandbyStatus()
if err != nil {
Expand Down
86 changes: 67 additions & 19 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"bytes"
"context"
"errors"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"testing"
"time"

"github.com/google/uuid"

"bou.ke/monkey"
"github.com/jackc/pgx"
"github.com/stretchr/testify/assert"
Expand All @@ -17,7 +17,7 @@ import (
"github.com/ihippik/wal-listener/config"
)

var someErr = errors.New("some err")
var errSimple = errors.New("some err")

func TestListener_slotIsExists(t *testing.T) {
repo := new(repositoryMock)
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestListener_slotIsExists(t *testing.T) {
{
name: "repository error",
setup: func() {
setGetSlotLSN("myslot", "", someErr)
setGetSlotLSN("myslot", "", errSimple)
},
fields: fields{
slotName: "myslot",
Expand Down Expand Up @@ -194,7 +194,6 @@ func TestListener_Stop(t *testing.T) {
func TestListener_SendStandbyStatus(t *testing.T) {
repl := new(replicatorMock)
type fields struct {
status *pgx.StandbyStatus
restartLSN uint64
}

Expand Down Expand Up @@ -236,7 +235,7 @@ func TestListener_SendStandbyStatus(t *testing.T) {
wantErr: false,
},
{
name: "success",
name: "some err",
setup: func() {
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand All @@ -246,7 +245,7 @@ func TestListener_SendStandbyStatus(t *testing.T) {
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
someErr,
errSimple,
)
},
fields: fields{
Expand Down Expand Up @@ -331,7 +330,7 @@ func TestListener_AckWalMessage(t *testing.T) {
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
someErr,
errSimple,
)
},
fields: fields{
Expand Down Expand Up @@ -360,7 +359,7 @@ func TestListener_AckWalMessage(t *testing.T) {
}

func TestListener_Stream(t *testing.T) {
logrus.SetLevel(logrus.FatalLevel)
//logrus.SetLevel(logrus.FatalLevel)
repo := new(repositoryMock)
publ := new(publisherMock)
repl := new(replicatorMock)
Expand Down Expand Up @@ -431,6 +430,16 @@ func TestListener_Stream(t *testing.T) {
protoVersion,
"publication_names 'sport'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 0,
WalFlushPosition: 0,
WalApplyPosition: 0,
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
nil,
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 10,
Expand Down Expand Up @@ -488,7 +497,7 @@ func TestListener_Stream(t *testing.T) {
Listener: config.ListenerCfg{
SlotName: "myslot",
AckTimeout: 0,
HeartbeatInterval: 0,
HeartbeatInterval: 1,
},
Database: config.DatabaseCfg{
Filter: config.FilterStruct{
Expand All @@ -510,7 +519,7 @@ func TestListener_Stream(t *testing.T) {
name: "start replication err",
setup: func() {
setStartReplication(
someErr,
errSimple,
"myslot",
uint64(0),
int64(-1),
Expand All @@ -523,7 +532,7 @@ func TestListener_Stream(t *testing.T) {
Listener: config.ListenerCfg{
SlotName: "myslot",
AckTimeout: 0,
HeartbeatInterval: 0,
HeartbeatInterval: 1,
},
Database: config.DatabaseCfg{
Filter: config.FilterStruct{
Expand Down Expand Up @@ -552,6 +561,16 @@ func TestListener_Stream(t *testing.T) {
protoVersion,
"publication_names 'sport'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 0,
WalFlushPosition: 0,
WalApplyPosition: 0,
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
nil,
)
setWaitForReplicationMessage(
&pgx.ReplicationMessage{
WalMessage: &pgx.WalMessage{
Expand All @@ -566,15 +585,15 @@ func TestListener_Stream(t *testing.T) {
ReplyRequested: 1,
},
},
someErr,
errSimple,
)
},
fields: fields{
config: config.Config{
Listener: config.ListenerCfg{
SlotName: "myslot",
AckTimeout: 0,
HeartbeatInterval: 0,
HeartbeatInterval: 1,
},
Database: config.DatabaseCfg{
Filter: config.FilterStruct{
Expand Down Expand Up @@ -603,6 +622,16 @@ func TestListener_Stream(t *testing.T) {
protoVersion,
"publication_names 'sport'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 0,
WalFlushPosition: 0,
WalApplyPosition: 0,
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
nil,
)
setWaitForReplicationMessage(
&pgx.ReplicationMessage{
WalMessage: &pgx.WalMessage{
Expand All @@ -628,15 +657,15 @@ func TestListener_Stream(t *testing.T) {
RelationStore: make(map[int32]RelationData),
Actions: nil,
},
someErr,
errSimple,
)
},
fields: fields{
config: config.Config{
Listener: config.ListenerCfg{
SlotName: "myslot",
AckTimeout: 0,
HeartbeatInterval: 0,
HeartbeatInterval: 1,
},
Database: config.DatabaseCfg{
Filter: config.FilterStruct{
Expand Down Expand Up @@ -665,6 +694,26 @@ func TestListener_Stream(t *testing.T) {
protoVersion,
"publication_names 'sport'",
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 10,
WalFlushPosition: 10,
WalApplyPosition: 10,
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
nil,
)
setSendStandbyStatus(
&pgx.StandbyStatus{
WalWritePosition: 0,
WalFlushPosition: 0,
WalApplyPosition: 0,
ClientTime: 18445935546232551617,
ReplyRequested: 0,
},
nil,
)
setWaitForReplicationMessage(
&pgx.ReplicationMessage{
WalMessage: &pgx.WalMessage{
Expand Down Expand Up @@ -703,7 +752,7 @@ func TestListener_Stream(t *testing.T) {
Data: map[string]interface{}{"id": 1},
EventTime: wayback,
},
someErr,
errSimple,
)
setSendStandbyStatus(
&pgx.StandbyStatus{
Expand All @@ -721,7 +770,7 @@ func TestListener_Stream(t *testing.T) {
Listener: config.ListenerCfg{
SlotName: "myslot",
AckTimeout: 0,
HeartbeatInterval: 0,
HeartbeatInterval: 1,
},
Database: config.DatabaseCfg{
Filter: config.FilterStruct{
Expand Down Expand Up @@ -757,7 +806,6 @@ func TestListener_Stream(t *testing.T) {
go func() {
<-w.errChannel
cancel()

}()
w.Stream(ctx)
repl.AssertExpectations(t)
Expand Down
Loading

0 comments on commit af9ed41

Please sign in to comment.