This repository has been archived by the owner on Aug 26, 2024. It is now read-only.
forked from scylladb/gocql
-
Notifications
You must be signed in to change notification settings - Fork 4
/
connpicker.go
129 lines (106 loc) · 2.47 KB
/
connpicker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package gocql
import (
"fmt"
"sync"
"sync/atomic"
)
// ConnPicker stores connections and chooses which one serves a request.
// NOTE(review): presumably one picker belongs to one hostConnPool - confirm
// against the pool code.
type ConnPicker interface {
// Pick returns a connection for the given token. The implementations in
// this file return nil when they have no connection to offer.
Pick(token) *Conn
// Put hands a connection to the picker.
Put(*Conn)
// Remove takes conn out of the picker.
Remove(conn *Conn)
// Size returns two counts: the picker's current connection count and the
// number of connections it is still missing. A positive second value makes
// hostConnPool try to establish a connection.
Size() (int, int)
// Close shuts the picker down. defaultConnPicker closes every connection
// it still holds.
Close()
// NextShard returns the shardID to connect to.
// nrShards specifies how many shards the host has.
// If nrShards is zero, the caller shouldn't use shard-aware port.
NextShard() (shardID, nrShards int)
}
// defaultConnPicker is a ConnPicker that ignores the token and picks, among
// the connections it holds, the one reporting the most available streams.
type defaultConnPicker struct {
// conns holds the pooled connections. Remove sets a slot to nil instead of
// shrinking the slice, so entries may be nil.
conns []*Conn
// pos is a counter that Pick increments atomically to rotate the slot at
// which each scan starts.
pos uint32
// size is the pool size passed to newDefaultConnPicker; Size subtracts from
// it to report how many connections are missing.
size int
// mu is held by Put, Remove and Close while they modify conns.
mu sync.RWMutex
}
// newDefaultConnPicker builds an empty defaultConnPicker whose target pool
// size is size. It panics when size is zero or negative.
func newDefaultConnPicker(size int) *defaultConnPicker {
	if size > 0 {
		picker := new(defaultConnPicker)
		picker.size = size
		return picker
	}
	panic(fmt.Sprintf("invalid pool size %d", size))
}
// Remove drops conn from the picker. The first slot holding conn is set to
// nil rather than cut out of the slice, so the slice length and the positions
// of the other connections do not change. conn itself is not closed here.
// Removing a connection the picker does not hold is a no-op.
func (p *defaultConnPicker) Remove(conn *Conn) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for idx := 0; idx < len(p.conns); idx++ {
		if p.conns[idx] != conn {
			continue
		}
		// Only the first matching slot is cleared.
		p.conns[idx] = nil
		break
	}
}
// Close empties the picker and closes every connection it was holding.
// The lock is held for the whole operation, including the Close calls on the
// individual connections.
func (p *defaultConnPicker) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Detach the slice first so the picker is already empty while the
	// connections are being closed.
	held := p.conns
	p.conns = nil

	for i := range held {
		// Slots cleared by Remove are nil and must be skipped.
		if c := held[i]; c != nil {
			c.Close()
		}
	}
}
// Size reports how many live connections the picker holds and how many more
// are needed to reach the configured pool size.
//
// Slots that Remove has set to nil are not counted, so a dropped connection
// shows up in the second return value as missing. The second value is
// negative when the picker holds more connections than the configured size.
func (p *defaultConnPicker) Size() (int, int) {
	// Put, Remove and Close modify p.conns under p.mu, so read it under the
	// read lock instead of racing with them.
	p.mu.RLock()
	defer p.mu.RUnlock()

	live := 0
	for _, conn := range p.conns {
		if conn != nil {
			live++
		}
	}
	return live, p.size - live
}
// Pick returns the held connection that reports the most available streams,
// or nil when the picker is empty or no connection reports a free stream.
// The token argument is ignored.
//
// Every call starts its scan one slot further along than the previous call,
// so connections with equal availability take turns: on a tie, the first
// connection met from the starting slot wins.
func (p *defaultConnPicker) Pick(token) *Conn {
	// Advance the rotating cursor; it is kept as uint32 so wrap-around is
	// well defined.
	cursor := atomic.AddUint32(&p.pos, 1) - 1

	// Put, Remove and Close modify p.conns under p.mu. Holding the read lock
	// keeps the length and the indexed slice consistent; without it a
	// concurrent Close could nil the slice between the two reads and make the
	// index expression panic.
	p.mu.RLock()
	defer p.mu.RUnlock()

	size := len(p.conns)
	if size == 0 {
		return nil
	}
	// Reduce modulo size in uint32 space so the start index can never be
	// negative on platforms where int is 32 bits wide.
	start := int(cursor % uint32(size))

	var (
		leastBusyConn    *Conn
		streamsAvailable int
	)
	// Find the conn with the most available streams. The stream counts can
	// change while we scan, so the choice is best-effort.
	for i := 0; i < size; i++ {
		conn := p.conns[(start+i)%size]
		if conn == nil {
			// Slot vacated by Remove.
			continue
		}
		if streams := conn.AvailableStreams(); streams > streamsAvailable {
			leastBusyConn = conn
			streamsAvailable = streams
		}
	}
	return leastBusyConn
}
// Put adds conn to the picker. A slot that Remove previously set to nil is
// reused when one exists, so the slice does not accumulate dead entries as
// connections come and go; otherwise conn is appended.
func (p *defaultConnPicker) Put(conn *Conn) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for i, existing := range p.conns {
		if existing == nil {
			p.conns[i] = conn
			return
		}
	}
	p.conns = append(p.conns, conn)
}
// NextShard always returns 0, 0. Per the ConnPicker contract, a zero nrShards
// tells the caller not to use the shard-aware port.
func (*defaultConnPicker) NextShard() (shardID, nrShards int) {
return 0, 0
}
// nopConnPicker is a no-operation implementation of ConnPicker. It is used
// when a hostConnPool is created, so that creation of the actual ConnPicker
// can be deferred until the first connection exists.
type nopConnPicker struct{}
// Pick always returns nil: nopConnPicker holds no connections.
func (nopConnPicker) Pick(token) *Conn {
return nil
}
// Put does nothing; the connection passed in is not stored.
func (nopConnPicker) Put(*Conn) {
}
// Remove does nothing; there are no stored connections to remove.
func (nopConnPicker) Remove(conn *Conn) {
}
// Size always reports zero connections held and one connection missing.
func (nopConnPicker) Size() (int, int) {
// Report one missing connection so that hostConnPool tries to establish a
// connection. Once the first connection is established, hostConnPool
// replaces nopConnPicker with a different ConnPicker implementation.
return 0, 1
}
// Close does nothing; nopConnPicker owns no connections to close.
func (nopConnPicker) Close() {
}
// NextShard always returns 0, 0. Per the ConnPicker contract, a zero nrShards
// tells the caller not to use the shard-aware port.
func (nopConnPicker) NextShard() (shardID, nrShards int) {
return 0, 0
}