-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcommitmgr.go
206 lines (187 loc) · 5.68 KB
/
commitmgr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
package zkafka
import (
"context"
"sync"
"sync/atomic"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)
// topicCommitMgr holds one commitMgr per topic name. Entries are created
// lazily by get, and mtx guards all access to topicToCommitMgr.
type topicCommitMgr struct {
// mtx guards reads and writes of topicToCommitMgr.
mtx *sync.RWMutex
// topicToCommitMgr maps a topic name to that topic's commitMgr.
topicToCommitMgr map[string]*commitMgr
}
// newTopicCommitMgr returns an empty topicCommitMgr with its lock and
// topic map initialized, ready for concurrent use via get.
func newTopicCommitMgr() *topicCommitMgr {
	mgr := &topicCommitMgr{}
	mgr.mtx = &sync.RWMutex{}
	mgr.topicToCommitMgr = make(map[string]*commitMgr)
	return mgr
}
// get returns the commitMgr for topicName, creating it on first use.
//
// It is safe for concurrent use. The common case (the manager already exists)
// only takes the read lock. On a miss, the map is re-checked under the write
// lock, because another goroutine may have inserted the entry between
// RUnlock and Lock. This guarantees a single commitMgr per topic.
func (t *topicCommitMgr) get(topicName string) *commitMgr {
	t.mtx.RLock()
	mgr, ok := t.topicToCommitMgr[topicName]
	t.mtx.RUnlock()
	if ok {
		return mgr
	}

	t.mtx.Lock()
	defer t.mtx.Unlock()
	if mgr, ok = t.topicToCommitMgr[topicName]; ok {
		return mgr
	}
	mgr = newCommitMgr()
	t.topicToCommitMgr[topicName] = mgr
	return mgr
}
// commitMgr tracks, per partition, which offsets are in work and which have
// completed. Its main purpose is to let TryPop find the largest completed
// offset that is safe to commit, meaning no lower in-work offset is still
// outstanding.
//
// Locking: mtx guards the per-partition maps themselves (lookup and lazy
// insertion). The per-partition mutex stored in partitionToMtx serializes
// operations on that partition's heaps. inWorkCount is accessed only through
// sync/atomic.
type commitMgr struct {
// mtx synchronizes access to the per-partition maps (partitionToMtx,
// partitionToInWork, partitionToCompleted). It is held only while looking up
// or inserting map entries, not while the heaps themselves are mutated.
mtx *sync.RWMutex
// partitionToMtx maps a partition index to the mutex that serializes
// mutations of that partition's offset heaps. Entries are created lazily.
partitionToMtx map[int32]*sync.Mutex
// partitionToInWork maps a partition index to a heap of offsets pushed as in
// work. The heap allows quick determination of the smallest in-work offset.
partitionToInWork map[int32]*offsetHeap
// partitionToCompleted maps a partition index to a heap of completed
// offsets. The heap allows quick determination of the smallest completed
// offset.
partitionToCompleted map[int32]*offsetHeap
// inWorkCount counts in-flight work across all partitions. PushInWork
// increments it. PushCompleted decrements it, and so does RemoveInWork when
// the entry is found. Access it only via sync/atomic.
// NOTE(review): 64-bit atomic operations require 8-byte alignment on 32-bit
// platforms (see the sync/atomic docs). Keep that in mind if fields are
// reordered, or consider switching to atomic.Int64.
inWorkCount int64
}
// newCommitMgr returns a commitMgr with all of its per-partition maps
// allocated (empty) and a zero in-work count.
func newCommitMgr() *commitMgr {
	mgr := &commitMgr{}
	mgr.mtx = &sync.RWMutex{}
	mgr.partitionToMtx = make(map[int32]*sync.Mutex)
	mgr.partitionToInWork = make(map[int32]*offsetHeap)
	mgr.partitionToCompleted = make(map[int32]*offsetHeap)
	return mgr
}
// PushInWork records tp as in flight. It pushes tp onto its partition's
// in-work heap and increments the in-work count.
//
// The partition mutex is released with defer. Otherwise a panic raised by the
// heap operation (and recovered further up the stack) would leave the
// partition permanently locked and deadlock every later caller.
func (c *commitMgr) PushInWork(tp kafka.TopicPartition) {
	m := c.mutex(tp.Partition)
	m.Lock()
	defer m.Unlock()

	c.getInWorkHeap(tp.Partition).Push(tp)
	atomic.AddInt64(&c.inWorkCount, 1)
}
// RemoveInWork removes tp's entry from its partition's in-work heap. The
// removed entry need not be the heap minimum. The in-work count is decremented
// only when SeekPop returns a non-nil result, i.e. the entry was found.
// Otherwise the count is left unchanged.
//
// The partition mutex is released with defer, so it cannot stay held if the
// heap operation panics and the panic is later recovered.
func (c *commitMgr) RemoveInWork(tp kafka.TopicPartition) {
	m := c.mutex(tp.Partition)
	m.Lock()
	defer m.Unlock()

	if c.getInWorkHeap(tp.Partition).SeekPop(tp) != nil {
		atomic.AddInt64(&c.inWorkCount, -1)
	}
}
// PushCompleted records tp as completed. It pushes tp onto its partition's
// completed heap and decrements the in-work count. The matching in-work entry
// is not removed here. TryPop later pops matching heads of the two heaps
// together.
//
// The partition mutex is released with defer, so it cannot stay held if the
// heap operation panics and the panic is later recovered.
//
// NOTE(review): the count is decremented unconditionally. An offset that was
// also passed to RemoveInWork (and found there) would be decremented twice.
// Confirm callers never do both for the same offset.
func (c *commitMgr) PushCompleted(tp kafka.TopicPartition) {
	m := c.mutex(tp.Partition)
	m.Lock()
	defer m.Unlock()

	c.getCompletedHeap(tp.Partition).Push(tp)
	atomic.AddInt64(&c.inWorkCount, -1)
}
// InWorkCount reports the current value of the shared in-work counter, which
// PushInWork increments and PushCompleted/RemoveInWork decrement. The value is
// read atomically, so the method is safe to call concurrently.
func (c *commitMgr) InWorkCount() int64 {
	count := atomic.LoadInt64(&c.inWorkCount)
	return count
}
// TryPop repeatedly pops the heads of partition's in-work and completed heaps
// for as long as both heaps are non-empty and their heads have the same
// offset. It returns the last completed entry popped this way, or nil if the
// heads did not match (or either heap was empty) on the first check.
// Assuming both heaps are min-ordered, the returned offset is the highest one
// for which every lower in-work offset has also completed.
//
// The context argument is currently unused. The whole operation runs under the
// partition mutex, so TryPop is safe for concurrent use.
//
// NOTE(review): if the completed heap's minimum is ever below the in-work
// minimum (e.g. a completion for an offset never pushed as in work, or one
// removed via RemoveInWork), the heads never match again. That entry then
// blocks further progress. Confirm callers cannot produce this state.
func (c *commitMgr) TryPop(_ context.Context, partition int32) *kafka.TopicPartition {
	lock := c.mutex(partition)
	lock.Lock()
	defer lock.Unlock()

	var last *kafka.TopicPartition
	for {
		head, err := c.peekInWork(partition)
		if err != nil {
			return last
		}
		done, err := c.peekCompleted(partition)
		if err != nil {
			return last
		}
		if done.Offset != head.Offset {
			return last
		}
		// Heads agree: this offset is both tracked and finished. Drop it from
		// both heaps and remember it as the newest committable candidate.
		_ = c.popCompleted(partition)
		_ = c.popInWork(partition)
		last = &done
	}
}
// mutex returns the mutex that serializes heap operations for partition,
// creating it on first use. Lookups take c.mtx's read lock. On a miss, the
// map is re-checked under the write lock so that exactly one mutex is ever
// stored per partition.
func (c *commitMgr) mutex(partition int32) *sync.Mutex {
	c.mtx.RLock()
	existing, ok := c.partitionToMtx[partition]
	c.mtx.RUnlock()
	if ok {
		return existing
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()
	// Another goroutine may have created it between RUnlock and Lock.
	if existing, ok = c.partitionToMtx[partition]; ok {
		return existing
	}
	created := &sync.Mutex{}
	c.partitionToMtx[partition] = created
	return created
}
// getInWorkHeap returns partition's in-work offsetHeap, creating an empty one
// on first use. Map access is guarded by c.mtx (read lock for the lookup,
// write lock plus re-check for creation). The heap's contents are not guarded
// here; callers hold the partition mutex for that.
func (c *commitMgr) getInWorkHeap(partition int32) *offsetHeap {
	c.mtx.RLock()
	heap, ok := c.partitionToInWork[partition]
	c.mtx.RUnlock()
	if ok {
		return heap
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()
	if heap, ok = c.partitionToInWork[partition]; ok {
		return heap
	}
	heap = &offsetHeap{}
	c.partitionToInWork[partition] = heap
	return heap
}
// getCompletedHeap returns partition's completed offsetHeap, creating an empty
// one on first use. Map access is guarded by c.mtx (read lock for the lookup,
// write lock plus re-check for creation). The heap's contents are not guarded
// here; callers hold the partition mutex for that.
func (c *commitMgr) getCompletedHeap(partition int32) *offsetHeap {
	c.mtx.RLock()
	heap, ok := c.partitionToCompleted[partition]
	c.mtx.RUnlock()
	if ok {
		return heap
	}

	c.mtx.Lock()
	defer c.mtx.Unlock()
	if heap, ok = c.partitionToCompleted[partition]; ok {
		return heap
	}
	heap = &offsetHeap{}
	c.partitionToCompleted[partition] = heap
	return heap
}
// popInWork removes and returns the head of partition's in-work heap (the
// smallest offset, assuming offsetHeap is min-ordered). It does no locking of
// its own; TryPop calls it with the partition mutex held and only after a
// successful peek.
func (c *commitMgr) popInWork(partition int32) kafka.TopicPartition {
	return c.getInWorkHeap(partition).Pop()
}
// popCompleted removes and returns the head of partition's completed heap (the
// smallest offset, assuming offsetHeap is min-ordered). It does no locking of
// its own; TryPop calls it with the partition mutex held and only after a
// successful peek.
func (c *commitMgr) popCompleted(partition int32) kafka.TopicPartition {
	return c.getCompletedHeap(partition).Pop()
}
// peekInWork returns the head of partition's in-work heap without removing it.
// The error comes straight from offsetHeap.Peek (presumably set when the heap
// is empty). TryPop treats any error as "nothing to pop".
func (c *commitMgr) peekInWork(partition int32) (kafka.TopicPartition, error) {
	return c.getInWorkHeap(partition).Peek()
}
// peekCompleted returns the head of partition's completed heap without
// removing it. The error comes straight from offsetHeap.Peek (presumably set
// when the heap is empty). TryPop treats any error as "nothing to pop".
func (c *commitMgr) peekCompleted(partition int32) (kafka.TopicPartition, error) {
	return c.getCompletedHeap(partition).Peek()
}