forked from moby/libnetwork
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhostdiscovery.go
121 lines (106 loc) · 2.84 KB
/
hostdiscovery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package hostdiscovery
import (
"net"
"sync"
"github.com/sirupsen/logrus"
mapset "github.com/deckarep/golang-set"
"github.com/docker/docker/pkg/discovery"
// Including KV
_ "github.com/docker/docker/pkg/discovery/kv"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/types"
)
// hostDiscovery tracks the set of hosts reported by a discovery watcher.
type hostDiscovery struct {
// watcher is the discovery source used by Watch; StopDiscovery sets it to nil.
watcher discovery.Watcher
// nodes holds the hosts from the most recently processed discovery entries.
nodes mapset.Set
// stopChan is handed to the watcher and closed by StopDiscovery to end monitoring.
stopChan chan struct{}
// Mutex guards watcher and nodes.
sync.Mutex
}
// init calls Register on the consul, etcd and zookeeper libkv store packages,
// in that order, when this package is loaded.
func init() {
	registrations := []func(){
		consul.Register,
		etcd.Register,
		zookeeper.Register,
	}
	for _, register := range registrations {
		register()
	}
}
// NewHostDiscovery returns a HostDiscovery backed by the given watcher. The
// new object starts with an empty node set and an open stop channel.
func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
	hd := &hostDiscovery{
		watcher:  watcher,
		nodes:    mapset.NewSet(),
		stopChan: make(chan struct{}),
	}
	return hd
}
// Watch subscribes to the configured discovery watcher and spawns a goroutine
// that feeds its updates to the supplied callbacks.
//
// It returns a bad-request error when no watcher is set, which is the case if
// the object was built with a nil watcher or StopDiscovery has already cleared
// it. Otherwise it returns nil.
func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
	// Take a snapshot of the watcher under the lock, since StopDiscovery
	// resets the field while holding the same lock.
	h.Lock()
	w := h.watcher
	h.Unlock()

	if w == nil {
		return types.BadRequestErrorf("invalid discovery watcher")
	}

	entriesCh, errorsCh := w.Watch(h.stopChan)
	go h.monitorDiscovery(entriesCh, errorsCh, activeCallback, joinCallback, leaveCallback)

	return nil
}
// monitorDiscovery is the body of the goroutine started by Watch. It forwards
// every batch of entries received on ch to processCallback, logs non-nil
// errors received on errCh, and returns once h.stopChan is closed.
//
// It also returns when ch is closed. A receive from a closed channel succeeds
// immediately with a nil value, so without the check the loop would spin and
// processCallback would see an empty host set, reporting every known node as
// having left.
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
	activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
	for {
		select {
		case entries, ok := <-ch:
			if !ok {
				// The watcher closed the entries channel: no more updates
				// will arrive, so stop monitoring.
				return
			}
			h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
		case err, ok := <-errCh:
			if !ok {
				// A closed error channel is always ready; setting it to nil
				// removes this case from the select instead of busy-looping.
				errCh = nil
				continue
			}
			if err != nil {
				logrus.Errorf("discovery error: %v", err)
			}
		case <-h.stopChan:
			return
		}
	}
}
// StopDiscovery clears the watcher, so later Watch calls fail, and closes the
// stop channel, which ends the monitoring goroutine. It always returns nil.
//
// The call is idempotent: repeated calls do not close the channel again, which
// would otherwise panic.
func (h *hostDiscovery) StopDiscovery() error {
	h.Lock()
	defer h.Unlock()

	h.watcher = nil

	// This file never sends on stopChan, so a successful receive can only
	// mean that an earlier call already closed it.
	select {
	case <-h.stopChan:
		// Already closed; nothing left to do.
	default:
		close(h.stopChan)
	}
	return nil
}
// processCallback replaces the stored node set with the hosts found in
// entries and notifies the callbacks. activeCallback is always invoked;
// joinCallback and leaveCallback are invoked only when at least one host was
// added or removed respectively. All callbacks run after the lock is released.
func (h *hostDiscovery) processCallback(entries discovery.Entries,
	activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
	current := hosts(entries)

	// Compute the delta and swap in the new set under the lock.
	h.Lock()
	joined, left := diff(h.nodes, current)
	h.nodes = current
	h.Unlock()

	activeCallback()
	if len(joined) != 0 {
		joinCallback(joined)
	}
	if len(left) != 0 {
		leaveCallback(left)
	}
}
// diff compares two host sets whose members are strings. added lists the
// members present only in updated and removed lists the members present only
// in existing, each parsed with net.ParseIP. A result is nil when the
// corresponding difference is empty.
func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) {
	// parse converts every member of a set into a net.IP.
	parse := func(members mapset.Set) []net.IP {
		var ips []net.IP
		for _, member := range members.ToSlice() {
			ips = append(ips, net.ParseIP(member.(string)))
		}
		return ips
	}

	added = parse(updated.Difference(existing))
	removed = parse(existing.Difference(updated))
	return added, removed
}
// Fetch returns the currently known hosts, each parsed with net.ParseIP. The
// result is an empty, non-nil slice when no hosts are known.
func (h *hostDiscovery) Fetch() []net.IP {
	h.Lock()
	defer h.Unlock()

	members := h.nodes.ToSlice()
	result := make([]net.IP, 0, len(members))
	for _, member := range members {
		result = append(result, net.ParseIP(member.(string)))
	}
	return result
}
// hosts collects the Host field of every discovery entry into a new set.
func hosts(entries discovery.Entries) mapset.Set {
	set := mapset.NewSet()
	for _, e := range entries {
		set.Add(e.Host)
	}
	return set
}