-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathurl_group.go
201 lines (188 loc) · 5.69 KB
/
url_group.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
package qstash
import (
"encoding/json"
"fmt"
"net/http"
)
// UrlGroups exposes the url group operations of QStash.
//
// A url group is a namespace that messages can be published to; each message
// is then sent to multiple endpoints. Once a url group has been created,
// several endpoints can be attached to it, each one represented by a publicly
// available URL, and every message published to the url group is distributed
// to all subscribed endpoints.
type UrlGroups struct {
	// client is the API client used to perform every request.
	client *Client
}
// Endpoint is a single destination that belongs to a url group.
type Endpoint struct {
	// Url holds the target address of the endpoint.
	Url string `json:"url"`

	// Name holds the optional name of the endpoint; it is left out of the
	// JSON encoding when empty.
	Name string `json:"name,omitempty"`
}
// UrlGroup is the description of a url group as returned by the API.
type UrlGroup struct {
	// Name identifies the url group.
	Name string `json:"name"`

	// CreatedAt is when the url group was created, in unix milliseconds.
	CreatedAt int64 `json:"createdAt"`

	// UpdatedAt is when the url group was last updated, in unix milliseconds.
	UpdatedAt int64 `json:"updatedAt"`

	// Endpoints lists the endpoints that belong to the url group.
	Endpoints []Endpoint `json:"endpoints"`
}
// Publish publishes a message to the url group named by po.UrlGroup.
//
// It issues a POST to /v2/publish/{urlGroup} carrying po.Body and the headers
// derived from po, then decodes the reply into a list of responses. The error
// is non-nil when either the request or the decoding fails.
func (u *UrlGroups) Publish(po PublishUrlGroupOptions) (result []PublishOrEnqueueResponse, err error) {
	target := fmt.Sprintf("/v2/publish/%s", po.UrlGroup)
	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodPost,
		path:   target,
		header: po.headers(),
		body:   po.Body,
	})
	if err != nil {
		return nil, err
	}
	return parse[[]PublishOrEnqueueResponse](raw)
}
// PublishJSON publishes a message to the url group named by message.UrlGroup,
// automatically serializing message.Body as a JSON string and setting the
// content type to `application/json`.
//
// It issues a POST to /v2/publish/{urlGroup} and decodes the reply into a list
// of responses. The error is non-nil when serializing the body, sending the
// request, or decoding the reply fails.
func (u *UrlGroups) PublishJSON(message PublishUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error) {
	encoded, err := json.Marshal(message.Body)
	if err != nil {
		return nil, err
	}

	target := fmt.Sprintf("/v2/publish/%s", message.UrlGroup)
	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodPost,
		path:   target,
		header: message.headers(),
		body:   string(encoded),
	})
	if err != nil {
		return nil, err
	}
	return parse[[]PublishOrEnqueueResponse](raw)
}
// Enqueue enqueues a message for the url group named by options.UrlGroup on
// the queue named by options.Queue, after creating the queue if it does not
// exist.
//
// It issues a POST to /v2/enqueue/{queue}/{urlGroup} carrying options.Body and
// the headers derived from options, then decodes the reply into a list of
// responses.
func (u *UrlGroups) Enqueue(options EnqueueUrlGroupOptions) (result []PublishOrEnqueueResponse, err error) {
	hdr := options.headers()
	target := fmt.Sprintf("/v2/enqueue/%s/%s", options.Queue, options.UrlGroup)

	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodPost,
		header: hdr,
		path:   target,
		body:   options.Body,
	})
	if err != nil {
		return nil, err
	}
	return parse[[]PublishOrEnqueueResponse](raw)
}
// EnqueueJSON enqueues a message for the url group named by message.UrlGroup
// on the queue named by message.Queue, after creating the queue if it does not
// exist.
//
// message.Body is automatically serialized as a JSON string and the content
// type is set to `application/json`. The request is a POST to
// /v2/enqueue/{queue}/{urlGroup}; the reply is decoded into a list of
// responses.
func (u *UrlGroups) EnqueueJSON(message EnqueueUrlGroupJSONOptions) (result []PublishOrEnqueueResponse, err error) {
	encoded, err := json.Marshal(message.Body)
	if err != nil {
		return nil, err
	}

	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodPost,
		path:   fmt.Sprintf("/v2/enqueue/%s/%s", message.Queue, message.UrlGroup),
		body:   string(encoded),
		header: message.headers(),
	})
	if err != nil {
		return nil, err
	}
	return parse[[]PublishOrEnqueueResponse](raw)
}
// UpsertEndpoints adds or updates one or more endpoints of the url group.
// A url group or endpoint that does not exist yet is created; an endpoint that
// already exists is updated.
//
// Every endpoint must carry a non-empty Url; otherwise an error is returned
// before any request is made. The endpoints are sent as a JSON object of the
// form {"endpoints": [...]} in a POST to /v2/topics/{urlGroup}/endpoints.
func (u *UrlGroups) UpsertEndpoints(urlGroup string, endpoints []Endpoint) (err error) {
	// Reject the whole batch up front if any endpoint lacks its address.
	for i := range endpoints {
		if endpoints[i].Url == "" {
			return fmt.Errorf("`url` of the endpoint must be provided")
		}
	}

	wrapper := map[string][]Endpoint{"endpoints": endpoints}
	encoded, err := json.Marshal(wrapper)
	if err != nil {
		return err
	}

	_, _, err = u.client.fetchWith(requestOptions{
		method: http.MethodPost,
		path:   fmt.Sprintf("/v2/topics/%s/endpoints", urlGroup),
		body:   string(encoded),
		header: contentTypeJson,
	})
	return err
}
// RemoveEndpoints removes one or more endpoints from the url group.
// Once all endpoints have been removed, the url group itself is deleted.
//
// Every endpoint must carry at least one of Url or Name; otherwise an error is
// returned before any request is made. The endpoints are sent as a JSON object
// of the form {"endpoints": [...]} in a DELETE to
// /v2/topics/{urlGroup}/endpoints.
func (u *UrlGroups) RemoveEndpoints(urlGroup string, endpoints []Endpoint) (err error) {
	// An endpoint with neither an address nor a name cannot be identified.
	for i := range endpoints {
		if ep := endpoints[i]; ep.Url == "" && ep.Name == "" {
			return fmt.Errorf("one of `url` or `name` of the endpoint must be provided")
		}
	}

	wrapper := map[string][]Endpoint{"endpoints": endpoints}
	encoded, err := json.Marshal(wrapper)
	if err != nil {
		return err
	}

	_, _, err = u.client.fetchWith(requestOptions{
		method: http.MethodDelete,
		path:   fmt.Sprintf("/v2/topics/%s/endpoints", urlGroup),
		body:   string(encoded),
		header: contentTypeJson,
	})
	return err
}
// Get retrieves the url group with the given name.
//
// It issues a GET to /v2/topics/{urlGroup} and decodes the reply into a
// UrlGroup. The error is non-nil when the request or the decoding fails.
func (u *UrlGroups) Get(urlGroup string) (result UrlGroup, err error) {
	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodGet,
		path:   fmt.Sprintf("/v2/topics/%s", urlGroup),
	})
	if err != nil {
		return result, err
	}
	return parse[UrlGroup](raw)
}
// List retrieves all the url groups.
//
// It issues a GET to /v2/topics and decodes the reply into a slice of
// UrlGroup. The error is non-nil when the request or the decoding fails.
func (u *UrlGroups) List() (result []UrlGroup, err error) {
	raw, _, err := u.client.fetchWith(requestOptions{
		method: http.MethodGet,
		path:   "/v2/topics",
	})
	if err != nil {
		return nil, err
	}
	return parse[[]UrlGroup](raw)
}
// Delete deletes the url group with the given name together with all of its
// endpoints, by issuing a DELETE to /v2/topics/{urlGroup}.
// The response body is discarded; only the request error is reported.
func (u *UrlGroups) Delete(urlGroup string) (err error) {
	req := requestOptions{
		method: http.MethodDelete,
		path:   fmt.Sprintf("/v2/topics/%s", urlGroup),
	}
	_, _, err = u.client.fetchWith(req)
	return err
}