-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconsumer.go
125 lines (107 loc) · 2.41 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package consumer
import (
"fmt"
"time"
)
type Consumer interface {
SetMaxWait(i int) error
SetChannelCap(i int) error
GetChannel() <-chan Record
}
type Record struct {
ArrivalTimestamp time.Time
Data []byte
}
type kinesisConsumer struct {
app, stream, table string
shards []string
maxWait int
recordCh chan Record
ddbsvc *ddbClient
kinesissvc *kinesisClient
}
func New(appName, streamName, tableName string) (Consumer, error) {
kc := newKinesisClient(streamName)
ss, err := kc.getShards()
if err != nil {
return nil, err
}
dc := newDdbClient(appName, streamName, tableName)
go dc.runSave()
return &kinesisConsumer{
app: appName,
stream: streamName,
table: tableName,
shards: ss,
maxWait: 20,
recordCh: make(chan Record),
ddbsvc: dc,
kinesissvc: kc,
}, nil
}
func (c *kinesisConsumer) SetMaxWait(i int) error {
if i < 0 {
return ErrInvalidMaxWait
}
c.maxWait = i
return nil
}
func (c *kinesisConsumer) SetChannelCap(i int) error {
if i < 0 {
return ErrInvalidChannelCap
}
c.recordCh = make(chan Record, i)
return nil
}
func (c *kinesisConsumer) GetChannel() <-chan Record {
for _, v := range c.shards {
go c.readLoop(v)
}
return c.recordCh
}
func (c *kinesisConsumer) readLoop(shardID string) {
seqNum, err := c.ddbsvc.getSequenceNumber(shardID)
if err != nil {
// TODO:
fmt.Println(err)
}
iterator, err := c.kinesissvc.getShardIterator(shardID, seqNum)
if err != nil {
// TODO:
fmt.Println(err)
}
sleepTime := 0
for {
resp, err := c.kinesissvc.getRecords(iterator)
if err != nil {
// TODO:
fmt.Println(err)
seqNum, _ = c.ddbsvc.getSequenceNumber(shardID)
iterator, _ = c.kinesissvc.getShardIterator(shardID, seqNum)
continue
}
for _, v := range resp.Records {
rec := Record{
Data: v.Data,
}
if v.ApproximateArrivalTimestamp != nil {
rec.ArrivalTimestamp = *v.ApproximateArrivalTimestamp
}
c.recordCh <- rec
c.ddbsvc.setSequenceNumber(shardID, *v.SequenceNumber)
}
if resp.NextShardIterator != nil {
iterator = *resp.NextShardIterator
}
if len(resp.Records) > 0 || (resp.MillisBehindLatest != nil && *resp.MillisBehindLatest > 10000) {
sleepTime = 0
} else {
if sleepTime == 0 {
sleepTime = 1
} else if sleepTime < c.maxWait {
sleepTime = sleepTime * 2
}
}
time.Sleep(time.Duration(sleepTime) * time.Second)
}
}