forked from flynn/flynn
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdiscoverd_cache.go
113 lines (100 loc) · 2.19 KB
/
discoverd_cache.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package main
import (
"sync"
"github.com/flynn/flynn/discoverd/client"
"github.com/flynn/flynn/pkg/stream"
)
// testMode, when true, makes the event loop started by
// discoverdServiceCache.start forward every received event to the cache's
// watchCh (if one is installed). It is false by default.
var testMode = false
// DiscoverdServiceCache exposes the set of instance addresses tracked for a
// discoverd service. The implementation in this file is discoverdServiceCache,
// created with NewDiscoverdServiceCache.
type DiscoverdServiceCache interface {
// Addrs returns the addresses currently held by the cache.
Addrs() []string
// Close shuts the cache down; the implementation in this file closes its
// discoverd watch stream and returns that call's error.
Close() error
}
// NewDiscoverdServiceCache creates a cache for the service s and starts it.
//
// The call blocks until start returns. The cache is returned together with
// whatever error start produced, so the first result is non-nil even when the
// error is non-nil.
func NewDiscoverdServiceCache(s discoverd.Service) (DiscoverdServiceCache, error) {
	cache := &discoverdServiceCache{
		addrs: make(map[string]struct{}),
	}
	err := cache.start(s)
	return cache, err
}
// discoverdServiceCache is the DiscoverdServiceCache implementation returned
// by NewDiscoverdServiceCache. It keeps a set of addresses that is updated by
// the event loop launched in start.
type discoverdServiceCache struct {
// stream is the watch stream obtained from Service.Watch in start; Close
// closes it.
stream stream.Stream
// The embedded RWMutex is taken around accesses to addrs and watchCh.
sync.RWMutex
// addrs is the set of known addresses, keyed by address; added on up/update
// events and removed on down events.
addrs map[string]struct{}
// used by the test suite: when non-nil and testMode is set, the event loop
// forwards each received event to this channel.
watchCh chan *discoverd.Event
}
// start subscribes to s and keeps d.addrs in sync with the events received.
//
// It calls s.Watch with a fresh event channel, stores the returned stream on
// d, and launches a goroutine that consumes events until the channel is
// closed. start blocks until one of the following happens:
//   - s.Watch fails: its error is returned and no goroutine is started;
//   - the first EventKindCurrent event arrives: nil is returned;
//   - the event channel is closed before any EventKindCurrent event:
//     d.stream.Err() is returned.
//
// When testMode is set, every event is additionally forwarded to d.watchCh
// (if non-nil) while holding the write lock.
func (d *discoverdServiceCache) start(s discoverd.Service) (err error) {
	events := make(chan *discoverd.Event)
	if d.stream, err = s.Watch(events); err != nil {
		return err
	}

	ready := make(chan error)
	go func() {
		// signalled records whether the blocked start call has already been
		// released, so that only the first "current" event is reported.
		signalled := false

		for ev := range events {
			switch ev.Kind {
			case discoverd.EventKindCurrent:
				if !signalled {
					ready <- nil
					signalled = true
				}
			case discoverd.EventKindDown:
				d.Lock()
				delete(d.addrs, ev.Instance.Addr)
				d.Unlock()
			case discoverd.EventKindUp, discoverd.EventKindUpdate:
				d.Lock()
				d.addrs[ev.Instance.Addr] = struct{}{}
				d.Unlock()
			}

			if testMode {
				d.Lock()
				if d.watchCh != nil {
					d.watchCh <- ev
				}
				d.Unlock()
			}
		}

		// The event channel was closed; if start is still waiting, hand it the
		// stream's error.
		if !signalled {
			ready <- d.stream.Err()
		}
		// TODO: handle discoverd disconnection
	}()

	return <-ready
}
// Close closes the watch stream obtained in start and returns the error
// reported by that call.
func (d *discoverdServiceCache) Close() error {
	err := d.stream.Close()
	return err
}
// Addrs returns a newly allocated slice holding every address currently in
// the cache. The read lock is held while the set is copied. The order of the
// result follows map iteration and is therefore unspecified.
func (d *discoverdServiceCache) Addrs() []string {
	d.RLock()
	defer d.RUnlock()

	list := make([]string, len(d.addrs))
	i := 0
	for a := range d.addrs {
		list[i] = a
		i++
	}
	return list
}
// watch installs a fresh, unbuffered event channel on d and returns it.
//
// If current is true, one synthetic EventKindUp event is sent on the channel
// for each address currently in d.addrs. The replay runs in a separate
// goroutine so that watch can return the channel to the receiver first.
//
// The write lock is acquired here and released by that goroutine once the
// replay has finished, so the event loop in start cannot modify d.addrs or
// send on the channel while the replay is in progress.
//
// The channel is kept in a local variable and that local is what gets
// returned: by the time watch returns, the goroutine may already have released
// the lock, so re-reading d.watchCh here would be an unsynchronized read.
//
// This method is only used by the test suite.
func (d *discoverdServiceCache) watch(current bool) chan *discoverd.Event {
	ch := make(chan *discoverd.Event)

	d.Lock()
	d.watchCh = ch
	go func() {
		// Releases the lock taken by the caller above.
		defer d.Unlock()
		if !current {
			return
		}
		for addr := range d.addrs {
			ch <- &discoverd.Event{
				Kind:     discoverd.EventKindUp,
				Instance: &discoverd.Instance{Addr: addr},
			}
		}
	}()
	return ch
}
// unwatch removes the channel installed by watch.
//
// A goroutine first starts draining ch, so that a sender blocked on the
// channel while holding the lock (the event loop in start, or the replay in
// watch) can make progress and release it. The write lock is then taken, and
// the installed channel is closed and cleared.
//
// If no channel is installed (unwatch called twice, or without a preceding
// watch) the call is a no-op instead of panicking on close of a nil channel.
// In that case the drain goroutine only terminates if ch is itself closed.
//
// This method is only used by the test suite.
func (d *discoverdServiceCache) unwatch(ch chan *discoverd.Event) {
	go func() {
		for range ch {
		}
	}()

	d.Lock()
	defer d.Unlock()
	if d.watchCh == nil {
		return
	}
	close(d.watchCh)
	d.watchCh = nil
}