-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_confluent.go
119 lines (107 loc) · 2.94 KB
/
client_confluent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package main
import (
"time"
"github.com/apex/log"
"github.com/confluentinc/confluent-kafka-go/kafka"
)
// ==== confluentinc/confluent-kafka-go ====

// consumeConfluentKafkaGo benchmarks consuming numMessages messages from
// `topic` with the confluent-kafka-go client and logs the resulting
// throughput in messages per second.
func consumeConfluentKafkaGo() {
	logger := log.WithFields(log.Fields{
		"client": "confluent-kafka-go",
		"mode":   "consumer",
	})

	// Use a fresh random group id per run so the benchmark always starts
	// from the earliest offset instead of resuming a prior run's position.
	// BUG FIX: the error from newUUID was previously discarded with `_`;
	// a failure would have silently produced an unusable group id.
	group, err := newUUID()
	if err != nil {
		logger.WithError(err).Fatal("generate group id")
	}

	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":       brokers,
		"group.id":                group,
		"session.timeout.ms":      int(SessionTimeout / time.Millisecond),
		"auto.commit.interval.ms": int(CommitInterval / time.Millisecond),
		"auto.offset.reset":       "earliest",
		"fetch.wait.max.ms":       int(MaxWait / time.Millisecond),
		"fetch.min.bytes":         MinBytes,
		"fetch.max.bytes":         MaxBytes,
	})
	if err != nil {
		logger.WithError(err).Fatal("new consumer")
	}
	// NOTE: Close is intentionally skipped so the process exits quickly
	// after the benchmark; a graceful shutdown would look like this:
	//defer func() {
	//	if err := consumer.Close(); err != nil {
	//		logger.WithError(err).Error("close consumer")
	//	}
	//}()

	if err := consumer.SubscribeTopics([]string{topic}, nil); err != nil {
		logger.WithError(err).Fatal("subscribe topics")
	}

	start := time.Now()
	msgCount := 0
	for msgCount < numMessages {
		// A timeout of -1 blocks until a message or an error arrives.
		_, err := consumer.ReadMessage(-1)
		if err != nil {
			if e, ok := err.(kafka.Error); ok {
				logger.WithError(err).WithField("code", e.Code()).Error("consume loop")
				// No point retrying when the whole cluster is unreachable.
				if e.Code() == kafka.ErrAllBrokersDown {
					break
				}
			}
			continue
		}
		msgCount++
	}
	elapsed := time.Since(start)
	logger.Infof("msg/s: %.2f", float64(msgCount)/elapsed.Seconds())
}
// produceConfluentKafkaGo benchmarks producing numMessages messages to
// `topic` with the confluent-kafka-go client and logs the resulting
// throughput in messages per second.
func produceConfluentKafkaGo() {
	logger := log.WithFields(log.Fields{
		"client": "confluent-kafka-go",
		"mode":   "producer",
	})

	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":                     brokers,
		"linger.ms":                             int(BatchTimeout / time.Millisecond),
		"max.in.flight.requests.per.connection": BatchSize,
		"batch.size":                            BatchBytes,
		"acks":                                  "0",
	})
	if err != nil {
		logger.WithError(err).Fatal("new producer")
	}
	defer producer.Close()

	// BUG FIX: route delivery reports to a private channel instead of
	// counting events on producer.Events() in a goroutine.  The previous
	// version waited for exactly numMessages reports, which deadlocks on
	// `<-done` whenever a Produce call fails (a failed enqueue never
	// emits a delivery report), and shared msgCount between goroutines
	// without synchronization.  With a per-produce delivery channel we
	// wait for exactly the messages that were actually enqueued, all in
	// one goroutine.
	deliveries := make(chan kafka.Event, numMessages)

	start := time.Now()
	enqueued := 0
	for j := 0; j < numMessages; j++ {
		msg := &kafka.Message{
			TopicPartition: kafka.TopicPartition{
				Topic:     &topic,
				Partition: kafka.PartitionAny,
			},
			Value: value,
		}
		if err := producer.Produce(msg, deliveries); err != nil {
			logger.WithError(err).Error("produce")
			continue
		}
		enqueued++
	}

	// Drain one delivery report per successfully enqueued message.
	var msgCount int
	for i := 0; i < enqueued; i++ {
		switch ev := (<-deliveries).(type) {
		case *kafka.Message:
			if err := ev.TopicPartition.Error; err != nil {
				logger.WithError(err).Error("delivery")
			}
			// Match the original accounting: failed deliveries are
			// logged but still advance the counter.
			msgCount++
		default:
			logger.Debugf("ignore: %v", ev)
		}
	}

	elapsed := time.Since(start)
	logger.Infof("msg/s: %.2f", float64(msgCount)/elapsed.Seconds())
}