-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnector.go
121 lines (98 loc) · 2.94 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package grocery
import (
"context"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
const (
// Redis channel to send a test message on during initialization. Init
// publishes empty messages here until the listener goroutine reports that it
// has received one; that first message is not passed to any handler.
firstMessageChannel = "grocery_hello_world"
)
var (
// The underlying Redis client powering grocery. Use this field to run your
// own Redis commands. It is nil until Init has been called.
C *redis.Client
// Context passed to every Redis call made in this file (Ping, Publish and
// PSubscribe). It is a plain background context, so it carries no deadline
// and is never cancelled.
ctx = context.Background()
// Callback functions that listen for events published to Redis, keyed by
// channel name. Access is guarded by handlersMux.
handlers = make(map[string][]func(string, []byte))
// Handler synchronization: Subscribe and Unsubscribe take the write lock,
// the listener takes the read lock while looking up a channel's handlers.
handlersMux sync.RWMutex
// Persistent pubsub connection that waits for published events. Assigned by
// listenForUpdates when it pattern-subscribes to "*".
psc *redis.PubSub
)
// Init creates the package-level Redis client C from config, checks the
// connection with a PING, and starts the background pub/sub listener. It
// returns the PING error if that check fails; otherwise it blocks until the
// listener reports that it has received a message, then returns nil.
func Init(config *redis.Options) error {
	C = redis.NewClient(config)
	if err := C.Ping(ctx).Err(); err != nil {
		return err
	}

	// The listener goroutine sends on this channel once its subscription has
	// delivered the first test message.
	ready := make(chan bool)
	go listenForUpdates(ready)

	probe := time.NewTicker(10 * time.Millisecond)
	defer probe.Stop()

	for {
		select {
		case <-probe.C:
			// The listener may not be subscribed yet, so keep publishing test
			// messages on every tick. The publish result is not checked; the
			// next tick simply tries again.
			C.Publish(ctx, firstMessageChannel, "")
		case <-ready:
			// The listener has confirmed it is receiving messages.
			close(ready)
			return nil
		}
	}
}
// listenForUpdates pattern-subscribes to every Redis channel ("*"), stores the
// subscription in psc, and passes each incoming message to the handlers
// registered for that message's channel. The first message seen on
// firstMessageChannel is not dispatched; instead a value is sent on
// firstMessageSignal to confirm the subscription is receiving. The function
// returns only when the subscription's message channel is closed.
func listenForUpdates(firstMessageSignal chan bool) {
	psc = C.PSubscribe(ctx, "*")

	signaled := false
	for msg := range psc.Channel() {
		if msg.Channel == firstMessageChannel && !signaled {
			// First test message: report readiness once and skip dispatch.
			firstMessageSignal <- true
			signaled = true
			continue
		}

		// Look up the channel's handlers under the read lock; the lock is
		// released before any handler runs.
		handlersMux.RLock()
		subscribers, found := handlers[msg.Channel]
		handlersMux.RUnlock()

		if !found {
			// Nobody is subscribed to this channel.
			continue
		}

		payload := msg.Payload
		for i := range subscribers {
			// Each handler gets its own byte-slice copy of the payload.
			subscribers[i](msg.Channel, []byte(payload))
		}
	}
}
// Subscribe registers handler as a listener on every channel in channels.
// When a message arrives on one of those channels, the pub/sub listener calls
// handler with the channel name and the message payload. Handlers for the
// same channel are called in the order they were registered. A nil handler is
// ignored, because calling it later would panic inside the listener
// goroutine.
//
// For example, to listen to events on the 'reset' channel and then publish a
// test event:
//
//	grocery.Subscribe([]string{"reset"}, func(channel string, payload []byte) {
//		fmt.Println("receiving data from " + channel)
//	})
//
//	grocery.C.Publish(context.Background(), "reset", "payload")
func Subscribe(channels []string, handler func(string, []byte)) {
	if handler == nil {
		return
	}

	handlersMux.Lock()
	defer handlersMux.Unlock()

	for _, channel := range channels {
		// append on a missing (nil) entry allocates the slice, so a channel
		// seen for the first time needs no separate initialization.
		handlers[channel] = append(handlers[channel], handler)
	}
}
// Unsubscribe drops every handler registered for each channel named in
// channels. Channels that have no registered handlers are skipped silently,
// since deleting a missing map key is a no-op.
func Unsubscribe(channels []string) {
	handlersMux.Lock()
	for i := range channels {
		delete(handlers, channels[i])
	}
	handlersMux.Unlock()
}