diff --git a/listener/listener.go b/listener/listener.go index b626d35b..c68fb5f2 100644 --- a/listener/listener.go +++ b/listener/listener.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/signal" + "sync" "syscall" "time" @@ -19,7 +20,7 @@ const errorBufferSize = 100 // Logical decoding plugin. const ( - pgoutputPlugin = "pgoutput" + pgOutputPlugin = "pgoutput" ) // Service info message. @@ -55,13 +56,14 @@ type repository interface { // Listener main service struct. type Listener struct { + mu sync.RWMutex config config.Config slotName string publisher publisher replicator replication repository repository parser parser - LSN uint64 + lsn uint64 errChannel chan error } @@ -84,6 +86,18 @@ func NewWalListener( } } +func (l *Listener) readLSN() uint64 { + l.mu.RLock() + defer l.mu.RUnlock() + return l.lsn +} + +func (l *Listener) setLSN(lsn uint64) { + l.mu.Lock() + l.lsn = lsn + defer l.mu.Unlock() +} + // Process is main service entry point. func (l *Listener) Process() error { var serviceErr *serviceErr @@ -99,16 +113,17 @@ func (l *Listener) Process() error { } if !slotIsExists { - consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgoutputPlugin) + consistentPoint, _, err := l.replicator.CreateReplicationSlotEx(l.slotName, pgOutputPlugin) if err != nil { logger.WithError(err).Infoln("CreateReplicationSlotEx() error") return err } - l.LSN, err = pgx.ParseLSN(consistentPoint) + lsn, err := pgx.ParseLSN(consistentPoint) if err != nil { logger.WithError(err).Errorln("slotIsExists() error") return err } + l.setLSN(lsn) logger.Infoln("create new slot") } else { logger.Infoln("slot already exists, LSN updated") @@ -164,10 +179,11 @@ func (l *Listener) slotIsExists() (bool, error) { if len(restartLSNStr) == 0 { return false, nil } - l.LSN, err = pgx.ParseLSN(restartLSNStr) + lsn, err := pgx.ParseLSN(restartLSNStr) if err != nil { return false, err } + l.setLSN(lsn) return true, nil } @@ -180,7 +196,7 @@ const protoVersion = "proto_version '1'" // Stream receive event from PostgreSQL. // Accept message, apply filter and publish it in NATS server. func (l *Listener) Stream(ctx context.Context) { - err := l.replicator.StartReplication(l.slotName, l.LSN, -1, protoVersion, publicationNames("sport")) + err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames("sport")) if err != nil { l.errChannel <- newListenerError("StartReplication()", err) return @@ -220,20 +236,20 @@ func (l *Listener) Stream(ctx context.Context) { logrus. WithField("subject", subjectName). WithField("action", event.Action). - WithField("lsn", l.LSN). + WithField("lsn", l.readLSN()). Infoln("event was send") } } tx.Clear() } - if msg.WalMessage.WalStart > l.LSN { + if msg.WalMessage.WalStart > l.readLSN() { err = l.AckWalMessage(msg.WalMessage.WalStart) if err != nil { l.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err) continue } else { - logrus.WithField("lsn", l.LSN).Debugln("ack wal msg") + logrus.WithField("lsn", l.readLSN()).Debugln("ack wal msg") } } } @@ -298,7 +314,7 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) { // SendStandbyStatus sends a `StandbyStatus` object with the current RestartLSN value to the server. func (l *Listener) SendStandbyStatus() error { - standbyStatus, err := pgx.NewStandbyStatus(l.LSN) + standbyStatus, err := pgx.NewStandbyStatus(l.readLSN()) if err != nil { return fmt.Errorf("unable to create StandbyStatus object: %w", err) } @@ -312,7 +328,7 @@ func (l *Listener) SendStandbyStatus() error { // AckWalMessage acknowledge received wal message. func (l *Listener) AckWalMessage(lsn uint64) error { - l.LSN = lsn + l.setLSN(lsn) err := l.SendStandbyStatus() if err != nil { return err diff --git a/listener/listener_test.go b/listener/listener_test.go index 6a9d4ca1..c19d1adc 100644 --- a/listener/listener_test.go +++ b/listener/listener_test.go @@ -259,7 +259,7 @@ func TestListener_SendStandbyStatus(t *testing.T) { tt.setup() w := &Listener{ replicator: repl, - LSN: tt.fields.restartLSN, + lsn: tt.fields.restartLSN, } if err := w.SendStandbyStatus(); (err != nil) != tt.wantErr { t.Errorf("SendStandbyStatus() error = %v, wantErr %v", err, tt.wantErr) @@ -347,7 +347,7 @@ func TestListener_AckWalMessage(t *testing.T) { tt.setup() w := &Listener{ replicator: repl, - LSN: tt.fields.restartLSN, + lsn: tt.fields.restartLSN, } if err := w.AckWalMessage(tt.args.LSN); (err != nil) != tt.wantErr { t.Errorf("AckWalMessage() error = %v, wantErr %v", err, tt.wantErr) @@ -800,7 +800,7 @@ func TestListener_Stream(t *testing.T) { replicator: repl, repository: repo, parser: prs, - LSN: tt.fields.restartLSN, + lsn: tt.fields.restartLSN, errChannel: make(chan error, errorBufferSize), } go func() {