Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
Add Sentry.
Fix logs.
Fix config.
Update Readme.
  • Loading branch information
ihippik committed Sep 10, 2022
1 parent 1e4004f commit a647180
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 95 deletions.
48 changes: 45 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ publishing events in a single transaction with a domain model change.
The service allows you to subscribe to changes in the PostgreSQL database using its logical decoding capability
and publish them to the NATS Streaming server.

### Logic of work
## Logic of work
To receive events about data changes in our PostgreSQL DB
  we use the standard logic decoding module (**pgoutput**) This module converts
changes read from the WAL into a logical replication protocol.
Expand Down Expand Up @@ -55,7 +55,16 @@ databases:
This filter means that we only process events occurring with the `users` table,
and in particular `insert` and `update` data.

### DB setting
### Topic mapping
By default, output NATS topic name consist of prefix, DB schema, and DB table name,
but if you want to send all update in one topic you should be configured the topic map:
```yaml
topicsMap:
main_users: "notifier"
main_customers: "notifier"
```
## DB setting
You must make the following settings in the db configuration (postgresql.conf)
* wal_level >= “logical”
* max_replication_slots >= 1
Expand All @@ -67,7 +76,40 @@ https://www.postgresql.org/docs/current/sql-createpublication.html
If you change the publication, do not forget to change the slot name or delete the current one.
### Docker
## Service configuration
```yaml
listener:
slotName: myslot_1
refreshConnection: 30s
heartbeatInterval: 10s
filter:
tables:
seasons:
- insert
- update
topicsMap:
schema_table_name: "notifier"
logger:
caller: false
level: info
format: json
database:
host: localhost
port: 5432
name: my_db
user: postgres
password: postgres
debug: false
nats:
address: localhost:4222
clusterID: test-cluster
clientID: wal-listener
topicPrefix: ""
monitoring:
sentryDSN: "dsn string"
```
## Docker
You can start the container from the project folder (configuration file is required)
Expand Down
51 changes: 46 additions & 5 deletions cmd/wal-listener/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"fmt"
"runtime/debug"

"github.com/evalphobia/logrus_sentry"
"github.com/jackc/pgx"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
Expand All @@ -18,6 +20,21 @@ const (
infoLoggerLevel = "info"
)

func getVersion() string {
var version = "unknown"

info, ok := debug.ReadBuildInfo()
if ok {
for _, item := range info.Settings {
if item.Key == "vcs.revision" {
version = item.Value[:4]
}
}
}

return version
}

// getConf load config from file.
func getConf(path string) (*config.Config, error) {
var cfg config.Config
Expand All @@ -36,11 +53,13 @@ func getConf(path string) (*config.Config, error) {
}

// initLogger init logrus preferences.
func initLogger(cfg config.LoggerCfg) {
logrus.SetReportCaller(cfg.Caller)
func initLogger(cfg config.LoggerCfg, version string) *logrus.Entry {
logger := logrus.New()

logger.SetReportCaller(cfg.Caller)

if !cfg.HumanReadable {
logrus.SetFormatter(&logrus.JSONFormatter{})
if cfg.Format == "json" {
logger.SetFormatter(&logrus.JSONFormatter{})
}

var level logrus.Level
Expand All @@ -58,7 +77,29 @@ func initLogger(cfg config.LoggerCfg) {
level = logrus.DebugLevel
}

logrus.SetLevel(level)
logger.SetLevel(level)

return logger.WithField("version", version)
}

func initSentry(dsn string, logger *logrus.Entry) {
if len(dsn) == 0 {
logger.Warnln("empty Sentry DSN")
return
}

hook, err := logrus_sentry.NewSentryHook(
dsn,
[]logrus.Level{
logrus.PanicLevel,
logrus.FatalLevel,
logrus.ErrorLevel,
},
)

if err == nil {
logger.Logger.AddHook(hook)
}
}

// initPgxConnections initialise db and replication connections.
Expand Down
10 changes: 6 additions & 4 deletions cmd/wal-listener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,15 @@ import (
"github.com/ihippik/wal-listener/listener"
)

// go build -ldflags "-X main.version=1.0.1" main.go
var version = "0.2.0"

func main() {
cli.VersionFlag = &cli.BoolFlag{
Name: "version",
Aliases: []string{"v"},
Usage: "print only the version",
}

version := getVersion()

app := &cli.App{
Name: "Wal-Listener",
Usage: "listen postgres events",
Expand All @@ -44,7 +43,9 @@ func main() {
return fmt.Errorf("validate config: %w", err)
}

initLogger(cfg.Logger)
logger := initLogger(cfg.Logger, version)

initSentry(cfg.Monitoring.SentryDSN, logger)

natsConn, err := stan.Connect(cfg.Nats.ClusterID, cfg.Nats.ClientID, stan.NatsURL(cfg.Nats.Address))
if err != nil {
Expand All @@ -58,6 +59,7 @@ func main() {

service := listener.NewWalListener(
cfg,
logger,
listener.NewRepository(conn),
rConn,
listener.NewNatsPublisher(natsConn),
Expand Down
25 changes: 16 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (

// Config for wal-listener/
type Config struct {
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger LoggerCfg
Listener ListenerCfg
Database DatabaseCfg
Nats NatsCfg
Logger LoggerCfg
Monitoring MonitoringCfg
}

// ListenerCfg path of the listener config.
Expand All @@ -20,21 +21,28 @@ type ListenerCfg struct {
AckTimeout time.Duration
RefreshConnection time.Duration `valid:"required"`
HeartbeatInterval time.Duration `valid:"required"`
Filter FilterStruct
TopicsMap map[string]string
}

// NatsCfg path of the NATS config.
type NatsCfg struct {
Address string `valid:"required"`
ClusterID string `valid:"required"`
ClientID string `valid:"required"`
TopicPrefix string `valid:"required"`
TopicPrefix string
}

// MonitoringCfg monitoring configuration.
type MonitoringCfg struct {
SentryDSN string
}

// LoggerCfg path of the logger config.
type LoggerCfg struct {
Caller bool
Level string
HumanReadable bool
Caller bool
Level string
Format string
}

// DatabaseCfg path of the PostgreSQL DB config.
Expand All @@ -44,7 +52,6 @@ type DatabaseCfg struct {
Name string `valid:"required"`
User string `valid:"required"`
Password string `valid:"required"`
Filter FilterStruct
}

// FilterStruct incoming WAL message filter.
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
bou.ke/monkey v1.0.2
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d
github.com/evalphobia/logrus_sentry v0.8.2
github.com/goccy/go-json v0.9.10
github.com/google/uuid v1.3.0
github.com/jackc/pgx v3.6.2+incompatible
Expand All @@ -17,10 +18,12 @@ require (
)

require (
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d // indirect
github.com/cockroachdb/apd v1.1.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/getsentry/raven-go v0.2.0 // indirect
github.com/gofrs/uuid v3.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d/go.mod h1:W
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d h1:S2NE3iHSwP0XV47EEXL8mWmRdEfGscSJ+7EgePNgt0s=
github.com/certifi/gocertifi v0.0.0-20210507211836-431795d63e8d/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
Expand All @@ -70,10 +72,14 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evalphobia/logrus_sentry v0.8.2 h1:dotxHq+YLZsT1Bb45bB5UQbfCh3gM/nFFetyN46VoDQ=
github.com/evalphobia/logrus_sentry v0.8.2/go.mod h1:pKcp+vriitUqu9KiWj/VRFbRfFNUwz95/UkgG8a6MNc=
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/getsentry/raven-go v0.2.0 h1:no+xWJRb5ZI7eE8TWgIq1jLulQiIoLG0IfYxv5JYMGs=
github.com/getsentry/raven-go v0.2.0/go.mod h1:KungGk8q33+aIAZUIVWZDr2OfAEBsO49PX4NzFV5kcQ=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down
Loading

0 comments on commit a647180

Please sign in to comment.