Skip to content

Commit

Permalink
Remove race condition
Browse files Browse the repository at this point in the history
Add rwMutex.
  • Loading branch information
ihippik committed Apr 29, 2020
1 parent af9ed41 commit d7a8276
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 14 deletions.
38 changes: 27 additions & 11 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"

Expand All @@ -19,7 +20,7 @@ const errorBufferSize = 100

// Logical decoding plugin.
const (
pgoutputPlugin = "pgoutput"
pgOutputPlugin = "pgoutput"
)

// Service info message.
Expand Down Expand Up @@ -55,13 +56,14 @@ type repository interface {

// Listener main service struct.
type Listener struct {
mu sync.RWMutex
config config.Config
slotName string
publisher publisher
replicator replication
repository repository
parser parser
LSN uint64
lsn uint64
errChannel chan error
}

Expand All @@ -84,6 +86,18 @@ func NewWalListener(
}
}

func (l *Listener) readLSN() uint64 {
l.mu.RLock()
defer l.mu.RUnlock()
return l.lsn
}

func (l *Listener) setLSN(lsn uint64) {
l.mu.Lock()
l.lsn = lsn
defer l.mu.Unlock()
}

// Process is main service entry point.
func (l *Listener) Process() error {
var serviceErr *serviceErr
Expand All @@ -99,16 +113,17 @@ func (l *Listener) Process() error {
}

if !slotIsExists {
consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgoutputPlugin)
consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgOutputPlugin)
if err != nil {
logger.WithError(err).Infoln("CreateReplicationSlotEx() error")
return err
}
l.LSN, err = pgx.ParseLSN(consistentPoint)
lsn, err := pgx.ParseLSN(consistentPoint)
if err != nil {
logger.WithError(err).Errorln("slotIsExists() error")
return err
}
l.setLSN(lsn)
logger.Infoln("create new slot")
} else {
logger.Infoln("slot already exists, LSN updated")
Expand Down Expand Up @@ -164,10 +179,11 @@ func (l *Listener) slotIsExists() (bool, error) {
if len(restartLSNStr) == 0 {
return false, nil
}
l.LSN, err = pgx.ParseLSN(restartLSNStr)
lsn, err := pgx.ParseLSN(restartLSNStr)
if err != nil {
return false, err
}
l.setLSN(lsn)
return true, nil
}

Expand All @@ -180,7 +196,7 @@ const protoVersion = "proto_version '1'"
// Stream receive event from PostgreSQL.
// Accept message, apply filter and publish it in NATS server.
func (l *Listener) Stream(ctx context.Context) {
err := l.replicator.StartReplication(l.slotName, l.LSN, -1, protoVersion, publicationNames("sport"))
err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames("sport"))
if err != nil {
l.errChannel <- newListenerError("StartReplication()", err)
return
Expand Down Expand Up @@ -220,20 +236,20 @@ func (l *Listener) Stream(ctx context.Context) {
logrus.
WithField("subject", subjectName).
WithField("action", event.Action).
WithField("lsn", l.LSN).
WithField("lsn", l.readLSN()).
Infoln("event was send")
}
}
tx.Clear()
}

if msg.WalMessage.WalStart > l.LSN {
if msg.WalMessage.WalStart > l.readLSN() {
err = l.AckWalMessage(msg.WalMessage.WalStart)
if err != nil {
l.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err)
continue
} else {
logrus.WithField("lsn", l.LSN).Debugln("ack wal msg")
logrus.WithField("lsn", l.readLSN()).Debugln("ack wal msg")
}
}
}
Expand Down Expand Up @@ -298,7 +314,7 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) {

// SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server.
func (l *Listener) SendStandbyStatus() error {
standbyStatus, err := pgx.NewStandbyStatus(l.LSN)
standbyStatus, err := pgx.NewStandbyStatus(l.readLSN())
if err != nil {
return fmt.Errorf("unable to create StandbyStatus object: %w", err)
}
Expand All @@ -312,7 +328,7 @@ func (l *Listener) SendStandbyStatus() error {

// AckWalMessage acknowledge received wal message.
func (l *Listener) AckWalMessage(lsn uint64) error {
l.LSN = lsn
l.setLSN(lsn)
err := l.SendStandbyStatus()
if err != nil {
return err
Expand Down
6 changes: 3 additions & 3 deletions listener/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func TestListener_SendStandbyStatus(t *testing.T) {
tt.setup()
w := &Listener{
replicator: repl,
LSN: tt.fields.restartLSN,
lsn: tt.fields.restartLSN,
}
if err := w.SendStandbyStatus(); (err != nil) != tt.wantErr {
t.Errorf("SendStandbyStatus() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -347,7 +347,7 @@ func TestListener_AckWalMessage(t *testing.T) {
tt.setup()
w := &Listener{
replicator: repl,
LSN: tt.fields.restartLSN,
lsn: tt.fields.restartLSN,
}
if err := w.AckWalMessage(tt.args.LSN); (err != nil) != tt.wantErr {
t.Errorf("AckWalMessage() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -800,7 +800,7 @@ func TestListener_Stream(t *testing.T) {
replicator: repl,
repository: repo,
parser: prs,
LSN: tt.fields.restartLSN,
lsn: tt.fields.restartLSN,
errChannel: make(chan error, errorBufferSize),
}
go func() {
Expand Down

0 comments on commit d7a8276

Please sign in to comment.