Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Remove easyjson.
  • Loading branch information
ihippik committed Aug 5, 2022
1 parent e776d7b commit 1e4004f
Show file tree
Hide file tree
Showing 15 changed files with 86 additions and 293 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
FROM alpine:3.10
FROM alpine:3.16

MAINTAINER Konstantin Makarov <[email protected]>
RUN adduser -D developer
WORKDIR /app
Expand Down
7 changes: 3 additions & 4 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import (
"fmt"

"github.com/jackc/pgx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"

"github.com/ihippik/wal-listener/config"
"github.com/ihippik/wal-listener/listener"
)

// logger log levels.
Expand Down Expand Up @@ -40,6 +38,7 @@ func getConf(path string) (*config.Config, error) {
// initLogger init logrus preferences.
func initLogger(cfg config.LoggerCfg) {
logrus.SetReportCaller(cfg.Caller)

if !cfg.HumanReadable {
logrus.SetFormatter(&logrus.JSONFormatter{})
}
Expand Down Expand Up @@ -77,12 +76,12 @@ func initPgxConnections(cfg config.DatabaseCfg) (*pgx.Conn, *pgx.ReplicationConn

pgConn, err := pgx.Connect(pgxConf)
if err != nil {
return nil, nil, errors.Wrap(err, listener.ErrPostgresConnection)
return nil, nil, fmt.Errorf("db connection: %w", err)
}

rConnection, err := pgx.ReplicationConnect(pgxConf)
if err != nil {
return nil, nil, fmt.Errorf("%v: %w", listener.ErrReplicationConnection, err)
return nil, nil, fmt.Errorf("replication connect: %w", err)
}

return pgConn, rConnection, nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

// go build -ldflags "-X main.version=1.0.1" main.go
var version = "0.1.0"
var version = "0.2.0"

func main() {
cli.VersionFlag = &cli.BoolFlag{
Expand Down Expand Up @@ -46,7 +46,7 @@ func main() {

initLogger(cfg.Logger)

sc, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address))
natsConn, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address))
if err != nil {
return fmt.Errorf("nats connection: %w", err)
}
Expand All @@ -60,7 +60,7 @@ func main() {
cfg,
listener.NewRepository(conn),
rConn,
listener.NewNatsPublisher(sc),
listener.NewNatsPublisher(natsConn),
listener.NewBinaryParser(binary.BigEndian),
)

Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ go 1.19
require (
bou.ke/monkey v1.0.2
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/goccy/go-json v0.9.10
github.com/google/uuid v1.3.0
github.com/jackc/pgx v3.6.2+incompatible
github.com/magiconair/properties v1.8.6
github.com/mailru/easyjson v0.7.7
github.com/nats-io/stan.go v0.10.3
github.com/pkg/errors v0.9.1
github.com/sirupsen/logrus v1.9.0
github.com/spf13/viper v1.12.0
github.com/stretchr/testify v1.8.0
Expand All @@ -26,7 +25,6 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/lib/pq v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/nats-io/jwt v1.2.2 // indirect
Expand All @@ -38,6 +36,7 @@ require (
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 // indirect
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/goccy/go-json v0.9.10 h1:hCeNmprSNLB8B8vQKWl6DpuH0t60oEs+TAk9a7CScKc=
github.com/goccy/go-json v0.9.10/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
Expand Down Expand Up @@ -164,8 +166,6 @@ github.com/jackc/fake v0.0.0-20150926172116-812a484cc733 h1:vr3AYkKovP8uR8AvSGGU
github.com/jackc/fake v0.0.0-20150926172116-812a484cc733/go.mod h1:WrMFNQdiFJ80sQsxDoMokWK1W5TQtxBFNpzWTD84ibQ=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
Expand All @@ -181,8 +181,6 @@ github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamhfG/Qzo=
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.1.12 h1:jF+Du6AlPIjs2BiUiQlKOX0rt3SujHxPnksPKZbaA40=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down
10 changes: 0 additions & 10 deletions listener/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,6 @@ package listener

import "errors"

// Constants with error text message
const (
ErrPostgresConnection = "db connection error"
ErrReplicationConnection = "replication connection error"
ErrPublishEvent = "publish message error"
ErrUnmarshalMsg = "unmarshal wal message error"
ErrAckWalMessage = "acknowledge wal message error"
ErrSendStandbyStatus = "send standby status error"
)

// Variable with connection errors.
var (
errReplConnectionIsLost = errors.New("replication connection to postgres is lost")
Expand Down
88 changes: 33 additions & 55 deletions listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/jackc/pgx"
Expand All @@ -19,15 +18,7 @@ import (
const errorBufferSize = 100

// Logical decoding plugin.
const (
pgOutputPlugin = "pgoutput"
)

// Service info message.
const (
StartServiceMessage = "service was started"
StopServiceMessage = "service was stopped"
)
const pgOutputPlugin = "pgoutput"

type publisher interface {
Publish(string, Event) error
Expand Down Expand Up @@ -89,14 +80,14 @@ func NewWalListener(

// Process is main service entry point.
func (l *Listener) Process(ctx context.Context) error {
var serviceErr *serviceErr
var svcErr serviceErr

logger := logrus.WithField("slot_name", l.slotName)

ctx, cancelFunc := context.WithCancel(ctx)
defer cancelFunc()
ctx, stop := signal.NotifyContext(ctx, os.Interrupt)
defer stop()

logger.WithField("logger_level", l.config.Logger.Level).Infoln(StartServiceMessage)
logger.Infoln("service was started")

if err := l.repository.CreatePublication(publicationName); err != nil {
logger.WithError(err).Warnln("skip create publication")
Expand All @@ -119,41 +110,36 @@ func (l *Listener) Process(ctx context.Context) error {
}

l.setLSN(lsn)

logger.Infoln("create new slot")
} else {
logger.Infoln("slot already exists, LSN updated")
}

go l.Stream(ctx)

signalChan := make(chan os.Signal, 1)
refresh := time.NewTicker(l.config.Listener.RefreshConnection)

signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
defer refresh.Stop()

ProcessLoop:
for {
select {
case <-refresh.C:
if !l.replicator.IsAlive() {
logrus.Fatalln(errReplConnectionIsLost)
return fmt.Errorf("replicator: %w", errReplConnectionIsLost)
}

if !l.repository.IsAlive() {
logrus.Fatalln(errConnectionIsLost)
return fmt.Errorf("repository: %w", errConnectionIsLost)
}

case err := <-l.errChannel:
if errors.As(err, &serviceErr) {
cancelFunc()

logrus.Fatalln(err)
} else {
logrus.Errorln(err)
if errors.As(err, svcErr) {
return err
}

case <-signalChan:
logrus.WithError(err).Errorln("received error")
case <-ctx.Done():
logrus.Debugln("context was canceled")

if err := l.Stop(); err != nil {
logrus.WithError(err).Errorln("listener stop error")
}
Expand All @@ -169,18 +155,11 @@ ProcessLoop:
func (l *Listener) slotIsExists() (bool, error) {
restartLSNStr, err := l.repository.GetSlotLSN(l.slotName)
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
logrus.
WithField("slot", l.slotName).
Warningln("restart_lsn for slot not found")

return false, nil
}

return false, err
}

if len(restartLSNStr) == 0 {
logrus.WithField("slot_name", l.slotName).Warningln("restart LSN not found")
return false, nil
}

Expand All @@ -206,8 +185,13 @@ const (
// Stream receive event from PostgreSQL.
// Accept message, apply filter and publish it in NATS server.
func (l *Listener) Stream(ctx context.Context) {
err := l.replicator.StartReplication(l.slotName, l.readLSN(), -1, protoVersion, publicationNames(publicationName))
if err != nil {
if err := l.replicator.StartReplication(
l.slotName,
l.readLSN(),
-1,
protoVersion,
publicationNames(publicationName),
); err != nil {
l.errChannel <- newListenerError("StartReplication()", err)

return
Expand All @@ -217,7 +201,7 @@ func (l *Listener) Stream(ctx context.Context) {

tx := NewWalTransaction()
for {
if ctx.Err() != nil {
if err := ctx.Err(); err != nil {
l.errChannel <- newListenerError("read msg", err)
break
}
Expand All @@ -230,23 +214,22 @@ func (l *Listener) Stream(ctx context.Context) {

if msg != nil {
if msg.WalMessage != nil {
logrus.WithField("wal", msg.WalMessage.WalStart).
Debugln("receive wal message")
logrus.WithField("wal", msg.WalMessage.WalStart).Debugln("receive wal message")

if err := l.parser.ParseWalMessage(msg.WalMessage.WalData, tx); err != nil {
logrus.WithError(err).Errorln("msg parse failed")
l.errChannel <- fmt.Errorf("%v: %w", ErrUnmarshalMsg, err)
l.errChannel <- fmt.Errorf("unmarshal wal message: %w", err)

continue
}

if tx.CommitTime != nil {
natsEvents := tx.CreateEventsWithFilter(l.config.Database.Filter.Tables)
for _, event := range natsEvents {
subjectName := event.GetSubjectName(l.config.Nats.TopicPrefix)
if err = l.publisher.Publish(subjectName, event); err != nil {
l.errChannel <- fmt.Errorf("%v: %w", ErrPublishEvent, err)
subjectName := event.SubjectName(l.config.Nats.TopicPrefix)

if err = l.publisher.Publish(subjectName, event); err != nil {
l.errChannel <- fmt.Errorf("publish message: %w", err)
continue
}

Expand All @@ -255,36 +238,33 @@ func (l *Listener) Stream(ctx context.Context) {
WithField("action", event.Action).
WithField("lsn", l.readLSN()).
Infoln("event was send")

}

tx.Clear()
}

if msg.WalMessage.WalStart > l.readLSN() {
if err = l.AckWalMessage(msg.WalMessage.WalStart); err != nil {
l.errChannel <- fmt.Errorf("%v: %w", ErrAckWalMessage, err)

l.errChannel <- fmt.Errorf("acknowledge wal message: %w", err)
continue
}

logrus.WithField("lsn", l.readLSN()).Debugln("ack wal msg")

}
}

if msg.ServerHeartbeat != nil {
//FIXME panic if there have been no messages for a long time.
logrus.WithFields(logrus.Fields{
"server_wal_end": msg.ServerHeartbeat.ServerWalEnd,
"server_time": msg.ServerHeartbeat.ServerTime,
}).
Debugln("received server heartbeat")
}).Debugln("received server heartbeat")

if msg.ServerHeartbeat.ReplyRequested == 1 {
logrus.Debugln("status requested")

if err = l.SendStandbyStatus(); err != nil {
l.errChannel <- fmt.Errorf("%v: %w", ErrSendStandbyStatus, err)
l.errChannel <- fmt.Errorf("send standby status: %w", err)
}
}
}
Expand All @@ -294,7 +274,6 @@ func (l *Listener) Stream(ctx context.Context) {

// Stop is a finalizer function.
func (l *Listener) Stop() error {

if err := l.publisher.Close(); err != nil {
return fmt.Errorf("publisher close: %w", err)
}
Expand All @@ -307,7 +286,7 @@ func (l *Listener) Stop() error {
return fmt.Errorf("replicator close: %w", err)
}

logrus.Infoln(StopServiceMessage)
logrus.Infoln("service was stopped")

return nil
}
Expand All @@ -328,7 +307,6 @@ func (l *Listener) SendPeriodicHeartbeats(ctx context.Context) {
{
if err := l.SendStandbyStatus(); err != nil {
logrus.WithError(err).Errorln("failed to send status heartbeat")

continue
}

Expand Down
Loading

0 comments on commit 1e4004f

Please sign in to comment.