-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.go
121 lines (109 loc) · 2.68 KB
/
util.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package postgres
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgx/v5/pgconn"
)
func ParseReplicationMode(s string) (ReplicationMode, error) {
switch strings.ToUpper(s) {
case LogicalReplication.String():
return LogicalReplication, nil
case PhysicalReplication.String():
return PhysicalReplication, nil
}
return 0, fmt.Errorf("unsupported slot type '%s'", s)
}
func SelectReplicationSlot(ctx context.Context, conn *pgconn.PgConn, slots []string) (records []ReplicationSlotSource, err error) {
if len(slots) == 0 {
return
}
var slotParam []string = make([]string, len(slots))
for i, slot := range slots {
param, err := conn.EscapeString(slot)
if err != nil {
return nil, err
}
slotParam[i] = "'" + param + "'"
}
sql := fmt.Sprintf(__SQL_SELECT_REPLICATION_SLOT, strings.Join(slotParam, ","))
reader := conn.Exec(ctx, sql)
result, err := reader.ReadAll()
if err != nil {
return
}
for _, r := range result {
if len(r.Rows) > 0 {
records = make([]ReplicationSlotSource, len(r.Rows))
for i, v := range r.Rows {
r := ReplicationSlotSource{
SlotName: string(v[0]),
Plugin: string(v[1]),
Database: string(v[3]),
}
{
t, err := ParseReplicationMode(string(v[2]))
if err != nil {
return nil, err
}
r.SlotType = t
}
{
b, err := strconv.ParseBool(string(v[4]))
if err != nil {
return nil, err
}
r.Temporary = b
}
{
b, err := strconv.ParseBool(string(v[5]))
if err != nil {
return nil, err
}
r.Active = b
}
r.RestartLSN.Scan(string(v[6]))
r.ConfirmedFlushLSN.Scan(string(v[7]))
records[i] = r
}
return
}
}
return
}
func IsDuplicateObjectError(err error) bool {
if verr, ok := err.(*pgconn.PgError); ok {
return verr.Code == __PG_ERRCODE_DUPLICATE_OBJECT
}
return false
}
func CreateReplicationSlot(ctx context.Context, conn *pgconn.PgConn, provider CreateReplicationSlotSourceProvider) error {
for _, source := range provider.sources {
_, err := pglogrepl.CreateReplicationSlot(ctx, conn,
source.SlotName,
source.Plugin,
pglogrepl.CreateReplicationSlotOptions{
Temporary: source.Temporary,
Mode: source.SlotType,
})
if err != nil {
return err
}
}
return nil
}
func NewConn(config *Config) (*pgconn.PgConn, error) {
config.init()
c, err := pgconn.ParseConfig(fmt.Sprintf("postgres://%s?replication=database", config.Host))
if err != nil {
return nil, err
}
c.Port = config.Port
c.User = config.User
c.Password = config.Password
c.Database = config.Database
c.ConnectTimeout = config.ConnectTimeout
return pgconn.ConnectConfig(context.Background(), c)
}