-
Notifications
You must be signed in to change notification settings - Fork 0
/
ahrw.go
211 lines (183 loc) · 5.48 KB
/
ahrw.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// Package ahrw implements aggregated rendezvous (highest random weight)
// hashing with O(1) amortized lookup time.
package ahrw
import (
"bytes"
"encoding/binary"
"errors"
"slices"
"sync/atomic"
"github.com/zeebo/xxh3"
)
// Server is a ready-made implementation of the [Node] interface, provided
// for convenience. It pairs a unique identifier with an arbitrary handle
// that callers can use to get back to the real server object.
type Server struct {
	// id is the identifier returned by NodeID.
	id []byte
	// handle is an opaque caller-supplied value returned by Handle.
	handle any
}
// NewServer returns a Server whose NodeID is the bytes of name and which
// carries handle as an opaque reference to the actual server object.
//
// name should be unique among the nodes handed to one AHRW instance, since
// it becomes the node's identity for hashing. Wrapping arbitrary values this
// way lets any type participate as a [Node].
func NewServer(name string, handle any) *Server {
	s := new(Server)
	s.id = []byte(name)
	s.handle = handle
	return s
}
// NodeID implements the [Node] interface. It returns the name passed to
// NewServer as a byte slice.
//
// The returned slice is the Server's internal storage, not a copy. Callers
// must not modify it: AHRW hashes node IDs when ranking nodes, so IDs must
// stay constant.
func (s *Server) NodeID() []byte {
	return s.id
}
// Handle returns the handle passed to NewServer. Use it to reach the actual
// server object once this Server has been selected.
func (s *Server) Handle() any {
	return s.handle
}
// Node is the interface implemented by load-balancing targets (servers,
// shards, etc.) of the rendezvous hashing algorithm.
type Node interface {
	// NodeID returns a byte slice that is unique within the set of all
	// targets provided to an AHRW instance. It is hashed together with
	// each slot number to rank nodes, so it should not change over the
	// lifetime of the instance.
	NodeID() []byte
}
// Compile-time assertion that *Server satisfies Node; a typed nil avoids
// constructing a throwaway value.
var _ Node = (*Server)(nil)
// noCopy is a zero-size marker embedded in structs that should not be
// copied. Its Lock and Unlock methods do nothing; they appear intended to
// make `go vet`'s copylocks analysis flag accidental copies (same pattern as
// the standard library's internal sync.noCopy).
type noCopy struct{}

func (*noCopy) Lock()   {}
func (*noCopy) Unlock() {}
// AHRW stands for Aggregated Highest Random Weight.
//
// It implements rendezvous hashing, mapping objects to nodes with minimal
// remapping when the set of nodes changes.
//
// AHRW first aggregates input objects into a fixed number of slots and
// only then distributes those slots across the provided nodes. Because the
// node chosen for each slot is memoized, rendezvous hashing stays practical
// for large numbers of nodes (or capacity shares, in the weighted case)
// and/or high request rates.
//
// AHRW is safe for concurrent use by multiple goroutines. For efficiency it
// should be created once and reused; to change the set of active nodes,
// create a new instance.
type AHRW struct {
	_ noCopy // discourages copying an AHRW value
	// nodes holds the nodes passed to New, sorted by NodeID with duplicate
	// IDs removed.
	nodes []Node
	// m memoizes the selected node per slot: m[i] is nil until slot i is
	// first looked up, after which it points into nodes.
	m []atomic.Pointer[Node]
}
// uniqNodes returns a fresh slice containing nodes sorted by NodeID, with
// duplicate IDs collapsed. Among nodes sharing an ID, the one that appeared
// first in the input is kept (stable sort, then keep-first compaction).
// The caller's slice is never modified.
func uniqNodes(nodes []Node) []Node {
	byID := func(a, b Node) int { return bytes.Compare(a.NodeID(), b.NodeID()) }
	sameID := func(a, b Node) bool { return bytes.Equal(a.NodeID(), b.NodeID()) }

	out := append(make([]Node, 0, len(nodes)), nodes...)
	slices.SortStableFunc(out, byID)
	return slices.CompactFunc(out, sameID)
}
// Errors returned by this package.
var (
	// ErrZeroSlots is returned by New when it is asked for zero slots.
	ErrZeroSlots = errors.New("number of slots can't be zero")

	// ErrZeroNodes is returned by New when the nodes slice is empty.
	ErrZeroNodes = errors.New("number of nodes can't be zero")

	// ErrSlotOutOfRange is returned by NodeForSlot when the requested slot
	// is not below the instance's slot count.
	ErrSlotOutOfRange = errors.New("slot out of range")
)
// New returns an AHRW instance with nslots slots distributed across nodes.
//
// Nodes are deduplicated by NodeID (the first occurrence wins); the
// caller's slice is not modified. New returns ErrZeroSlots if nslots is
// zero and ErrZeroNodes if nodes is empty.
//
// A reasonable nslots is about two orders of magnitude above the largest
// number of nodes you expect; powers of two are slightly faster. For
// example, for up to 100 nodes, 16384 is a reasonable choice.
//
// Use the same nslots for every instance built over different node sets;
// otherwise AHRW cannot keep redistribution minimal when nodes change.
// A hard-coded value that covers foreseeable needs is fine.
func New(nslots uint64, nodes []Node) (*AHRW, error) {
	switch {
	case nslots == 0:
		return nil, ErrZeroSlots
	case len(nodes) == 0:
		return nil, ErrZeroNodes
	}

	h := &AHRW{
		nodes: uniqNodes(nodes),
		m:     make([]atomic.Pointer[Node], nslots),
	}
	return h, nil
}
// calculateNode performs highest-random-weight selection for slot.
//
// Each node is scored with xxh3 over the slot number (8 bytes, big-endian)
// followed by the node's ID. The first node with the strictly greatest
// score wins; if every score is zero, the first node is returned. The result
// points into h.nodes, so repeated calls for the same slot yield the same
// pointer.
func (h *AHRW) calculateNode(slot uint64) *Node {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], slot)

	hasher := xxh3.New()
	var best int
	var bestWeight uint64
	for i, n := range h.nodes {
		hasher.Reset()
		hasher.Write(key[:])
		hasher.Write(n.NodeID())
		if w := hasher.Sum64(); w > bestWeight {
			best, bestWeight = i, w
		}
	}
	return &h.nodes[best]
}
// lookupSlot returns the node for slot, computing and memoizing it on first
// use. slot must be within range; callers check this.
//
// Concurrent first lookups of the same slot may each run calculateNode, but
// they all compute the same pointer into h.nodes, so the duplicate Store is
// harmless.
func (h *AHRW) lookupSlot(slot uint64) Node {
	cell := &h.m[slot]
	if p := cell.Load(); p != nil {
		return *p
	}
	p := h.calculateNode(slot)
	cell.Store(p)
	return *p
}
// NSlots returns the number of slots in this AHRW instance, i.e. the nslots
// value it was created with by New.
func (h *AHRW) NSlots() uint64 {
	return uint64(len(h.m))
}
// NodeForSlot returns the node mapped to the given slot. It is useful when
// you hash objects to slots yourself instead of using NodeForBytes or
// NodeForString.
//
// It returns ErrSlotOutOfRange if slot is not less than NSlots.
func (h *AHRW) NodeForSlot(slot uint64) (Node, error) {
	if slot < h.NSlots() {
		return h.lookupSlot(slot), nil
	}
	return nil, ErrSlotOutOfRange
}
// NodeForString maps a string identifying some object to one of the nodes
// provided to this AHRW instance. It is equivalent to NodeForBytes applied
// to the string's bytes.
func (h *AHRW) NodeForString(s string) Node {
	slot := SlotForBytes(h.NSlots(), []byte(s))
	return h.lookupSlot(slot)
}
// NodeForBytes maps a byte slice identifying some object to one of the
// nodes provided to this AHRW instance.
//
// The object is first hashed to a slot with SlotForBytes. The slot's node
// is then looked up, being computed and memoized on first use. It panics if
// the instance has zero slots, which New never produces.
func (h *AHRW) NodeForBytes(s []byte) Node {
	slot := SlotForBytes(h.NSlots(), s)
	return h.lookupSlot(slot)
}
// SlotForBytes uniformly maps a byte slice identifying some object to a
// slot in the range [0, nslots).
//
// When nslots is a power of two, the xxh3 hash of a one-byte zero prefix
// followed by s is masked to the slot range. Otherwise rejection sampling
// removes modulo bias: hash values at or above the largest multiple of
// nslots representable in a uint64 are discarded, and the object is
// rehashed with the prefix byte incremented. For a given nslots the result
// depends only on s.
//
// SlotForBytes panics if nslots is zero.
func SlotForBytes(nslots uint64, s []byte) uint64 {
	if nslots == 0 {
		panic("number of slots can't be zero")
	}
	hash := xxh3.New()
	iv := []byte{0}
	if nslots&(nslots-1) == 0 {
		hash.Write(iv)
		hash.Write(s)
		return hash.Sum64() & (nslots - 1)
	}
	// limit == 2**64 - (2**64 % nslots), computed in wrapping uint64
	// arithmetic: (-nslots) % nslots == 2**64 % nslots, which is non-zero
	// here because nslots is not a power of two. Hash values below limit
	// are uniformly distributed modulo nslots.
	limit := -((-nslots) % nslots)
	for {
		hash.Write(iv)
		hash.Write(s)
		if hv := hash.Sum64(); hv < limit {
			return hv % nslots
		}
		// A rejection happens with probability below nslots/2**64, so this
		// loop almost always runs once. The one-byte prefix would cycle only
		// after 256 consecutive rejections, which is not a practical concern.
		iv[0]++
		hash.Reset()
	}
}