-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmessage.go
69 lines (54 loc) · 1.31 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package postgres
import (
"sync/atomic"
"time"
"github.com/Bofry/trace"
"github.com/jackc/pglogrepl"
)
var (
	// Compile-time check that *Message satisfies trace.TracerTagMarshaler.
	// A typed nil pointer is enough for the check and, unlike new(Message),
	// does not allocate a throwaway Message during package initialization.
	_ trace.TracerTagMarshaler = (*Message)(nil)
)
// Message carries one piece of XLogData (see pglogrepl.XLogData) together
// with the identifying metadata exposed by its accessor methods and emitted
// as tracer tags by MarshalTracerTag.
type Message struct {
// Slot is emitted as the "slot" tracer tag. Presumably the replication
// slot name; TODO confirm against the code that populates it.
Slot string
// Delegate is not used by any method in this file.
// NOTE(review): its role is defined wherever MessageDelegate is declared.
Delegate MessageDelegate
// consumedXLogPos is not read by any method in this file.
// NOTE(review): presumably the LSN already consumed; verify with the caller.
consumedXLogPos pglogrepl.LSN
// data backs StartLSN (WALStart), Timestamp (ServerTime) and Body (WALData).
// Those accessors dereference it without a nil check.
data *pglogrepl.XLogData
// database is returned by Database.
database string
// systemID is returned by SystemID.
systemID string
// responded is a flag accessed through sync/atomic: canAck swaps it from
// 0 to 1, and HasResponded reports whether it equals 1.
responded int32
}
// SystemID reports the value held in the message's systemID field.
func (m *Message) SystemID() string {
	id := m.systemID
	return id
}
// Database reports the value held in the message's database field.
func (m *Message) Database() string {
	name := m.database
	return name
}
// StartLSN returns the WALStart position recorded in the underlying
// XLogData. It dereferences m.data, so it panics when data is nil.
func (m *Message) StartLSN() LSN {
	xlog := m.data
	return xlog.WALStart
}
// Timestamp returns the ServerTime recorded in the underlying XLogData.
// It dereferences m.data, so it panics when data is nil.
func (m *Message) Timestamp() time.Time {
	xlog := m.data
	return xlog.ServerTime
}
// Body returns the WALData slice of the underlying XLogData. The slice is
// returned as-is (not copied). It dereferences m.data, so it panics when
// data is nil.
func (m *Message) Body() []byte {
	xlog := m.data
	return xlog.WALData
}
// HasResponded atomically reads the responded flag and reports whether it
// is set to 1.
func (m *Message) HasResponded() bool {
	flag := atomic.LoadInt32(&m.responded)
	return flag == 1
}
// Clone returns a new Message holding a shallow copy of m: every field is
// copied by value, so the clone shares the same data pointer and Delegate
// value as the original.
//
// The responded flag is mutated through sync/atomic elsewhere, so it is read
// with an atomic load here as well. A plain struct copy (*m) would read it
// non-atomically and race with a concurrent canAck.
//
// NOTE: fields are listed explicitly; a field added to Message must be added
// here too.
func (m *Message) Clone() *Message {
	responded := atomic.LoadInt32(&m.responded)

	return &Message{
		Slot:            m.Slot,
		Delegate:        m.Delegate,
		consumedXLogPos: m.consumedXLogPos,
		data:            m.data,
		database:        m.database,
		systemID:        m.systemID,
		responded:       responded,
	}
}
// MarshalTracerTag implements trace.TracerTagMarshaler. It writes the
// message's system ID, database, start LSN, UTC timestamp, slot and body to
// builder as string tags, in that order, and always returns nil.
//
// The LSN, timestamp and body are read through accessors that dereference
// m.data, so this method panics when data is nil.
func (m *Message) MarshalTracerTag(builder *trace.TracerTagBuilder) error {
	systemID := m.SystemID()
	builder.String("system_id", systemID)

	database := m.Database()
	builder.String("database", database)

	lsn := m.StartLSN()
	builder.String("lsn", lsn.String())

	ts := m.Timestamp().UTC()
	builder.String("timestamp", ts.String())

	builder.String("slot", m.Slot)

	body := m.Body()
	builder.String("body", string(body))

	return nil
}
// canAck atomically switches the responded flag from 0 to 1. It returns true
// only for the call that performs the switch; once the flag is no longer 0,
// every later call returns false.
func (m *Message) canAck() bool {
	swapped := atomic.CompareAndSwapInt32(&m.responded, 0, 1)
	return swapped
}