-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
182 lines (154 loc) · 5.86 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
// main.go
package main
import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"fortio.org/log"
"fortio.org/scli"
"github.com/prometheus/client_golang/prometheus"
)
// Metrics bundles the Prometheus collectors shared across the application.
//
// NOTE(review): the per-field descriptions below are inferred from the field
// names only; confirm them against the code that registers and updates each
// collector.
type Metrics struct {
	// RequestsReceivedTotal presumably counts every request accepted by the receiver.
	RequestsReceivedTotal *prometheus.CounterVec
	// RequestsFailedTotal presumably counts requests that ultimately failed.
	RequestsFailedTotal *prometheus.CounterVec
	// RequestsRetriedTotal presumably counts retry attempts.
	RequestsRetriedTotal *prometheus.CounterVec
	// RequestsSucceededTotal presumably counts requests delivered successfully.
	RequestsSucceededTotal *prometheus.CounterVec
	// RequestsNotProcessed presumably counts requests dropped before processing.
	RequestsNotProcessed *prometheus.CounterVec
	// QueueSize presumably reports the current number of queued messages.
	QueueSize *prometheus.GaugeVec
}
// SlackResponse models the JSON status envelope carrying the "ok" and "error"
// keys (presumably the body returned by the Slack API; confirm against the
// code that decodes it).
type SlackResponse struct {
	// Ok is encoded/decoded under the "ok" key.
	Ok bool `json:"ok"`
	// Error is encoded/decoded under the "error" key and is left out of the
	// encoded output when empty.
	Error string `json:"error,omitempty"`
}
// SlackPostMessageRequest is the JSON payload for a single message post.
//
// Token, Channel and Text are always encoded; every other field is dropped
// from the encoded output when it holds its zero value. The field meanings are
// assumed to follow Slack's chat.postMessage arguments of the same name;
// confirm against the Slack API documentation.
type SlackPostMessageRequest struct {
	// Always-encoded fields.
	Token   string `json:"token"`
	Channel string `json:"channel"`
	Text    string `json:"text"`

	// Optional fields, omitted when empty/false.
	AsUser    bool   `json:"as_user,omitempty"`
	Username  string `json:"username,omitempty"`
	IconURL   string `json:"icon_url,omitempty"`
	IconEmoji string `json:"icon_emoji,omitempty"`
	ThreadTS  string `json:"thread_ts,omitempty"`
	Parse     string `json:"parse,omitempty"`
	LinkNames bool   `json:"link_names,omitempty"`

	// Blocks is a JSON serialized array of blocks, passed through verbatim.
	Blocks json.RawMessage `json:"blocks,omitempty"`
	// Attachments is a JSON serialized array of attachments, passed through verbatim.
	Attachments json.RawMessage `json:"attachments,omitempty"`
}
// App carries the state of the Slack proxy application: the message queue,
// the Slack endpoint and credentials, and the metrics collectors.
//
// NOTE(review): how each field is used is not visible in this part of the
// file; the notes below are limited to what the types and names show.
type App struct {
	// slackQueue is the channel of SlackPostMessageRequest values handled by the app.
	slackQueue chan SlackPostMessageRequest
	// wg is a wait group; presumably used to wait for in-flight work on shutdown.
	wg sync.WaitGroup
	// messenger is the SlackMessenger used by the app (declared elsewhere).
	messenger SlackMessenger
	// SlackPostMessageURL is the URL of the Slack post-message endpoint.
	SlackPostMessageURL string
	// SlackToken is the Slack token used by this instance.
	SlackToken string
	// metrics points at the shared Metrics collectors.
	metrics *Metrics
	// channelOverride, when set, presumably replaces the channel of every message.
	channelOverride string
}
// podIndex extracts the numeric index from a pod name of the form
// <name>-<index>, where <index> is the run of characters after the last dash.
//
// The name is taken from the podName argument; this function does not read
// any environment variable itself.
//
// It returns the index and a nil error on success. It returns 0 and a non-nil
// error when podName contains no dash, ends with a dash, or when the suffix
// is not an unsigned decimal integer that fits in an int. In the latter case
// the underlying strconv error is wrapped and can be inspected with
// errors.As / errors.Is.
func podIndex(podName string) (int, error) {
	lastDash := strings.LastIndex(podName, "-")
	if lastDash == -1 || lastDash == len(podName)-1 {
		return 0, fmt.Errorf("invalid pod name %s. Expected <name>-<index>", podName)
	}

	indexStr := podName[lastDash+1:]
	// ParseUint (unlike Atoi) rejects a sign prefix such as "+1", which is not
	// a valid <index>. Limiting the bit size to IntSize-1 guarantees that the
	// result fits in a non-negative int.
	index, err := strconv.ParseUint(indexStr, 10, strconv.IntSize-1)
	if err != nil {
		return 0, fmt.Errorf("invalid pod name format. Expected <name>-<index>, got %s: %w", podName, err)
	}
	return int(index), nil
}
// getSlackTokens reads the SLACK_TOKENS environment variable and returns its
// comma separated entries with surrounding whitespace removed.
//
// An unset or empty variable yields an empty, non-nil slice. Entries are kept
// in their original positions, so an empty entry (for example from a trailing
// comma) is returned as an empty string rather than being dropped.
func getSlackTokens() []string {
	raw := os.Getenv("SLACK_TOKENS")
	if raw == "" {
		return []string{}
	}

	parts := strings.Split(raw, ",")
	trimmed := make([]string, 0, len(parts))
	for _, part := range parts {
		trimmed = append(trimmed, strings.TrimSpace(part))
	}
	return trimmed
}
// main wires up and runs the service. In order, it:
//  1. registers the command-line flags and hands control to scli.ServerMain,
//  2. picks this pod's Slack token from SLACK_TOKENS using the numeric suffix
//     of the HOSTNAME environment variable,
//  3. creates the metrics registry and the App, and starts the metrics server,
//  4. starts the queue processor and the receiver server in goroutines,
//  5. blocks in scli.UntilInterrupted, then cancels the server context and
//     calls app.Shutdown.
func main() {
// Default values for the flags registered below.
var (
maxRetries = 2
slackPostMessageURL = "https://slack.com/api/chat.postMessage"
maxQueueSize = 100
burst = 3
metricsPort = ":9090"
applicationPort = ":8080"
channelOverride string
)
// NOTE(review): these are flag.Duration flags, so values are given as Go
// duration strings (e.g. "500ms", "2s") rather than a bare number of
// milliseconds, despite the wording of the help text.
initialBackoff := flag.Duration("initialBackoff", 1000*time.Millisecond, "Initial backoff in milliseconds for retries")
slackRequestRate := flag.Duration("slackRequestRate", 1000*time.Millisecond, "Rate limit for slack requests in milliseconds")
// Define the flags with the default values.
// TODO: move the ones that can change to dflag.
flag.IntVar(&maxRetries, "maxRetries", maxRetries, "Maximum number of retries for posting a message")
flag.StringVar(&slackPostMessageURL, "slackURL", slackPostMessageURL, "Slack Post Message API URL")
flag.IntVar(&maxQueueSize, "queueSize", maxQueueSize, "Maximum number of messages in the queue")
flag.IntVar(&burst, "burst", burst, "Maximum number of burst to allow")
flag.StringVar(&metricsPort, "metricsPort", metricsPort, "Port for the metrics server")
flag.StringVar(&applicationPort, "applicationPort", applicationPort, "Port for the application server")
flag.StringVar(&channelOverride, "channelOverride", "", "Override the channel for all messages - Be careful with this one!")
// NOTE(review): presumably this parses the flags registered above (their
// values are only read after this call) — confirm against fortio.org/scli.
scli.ServerMain()
// Get the list of comma separated tokens from the environment variable SLACK_TOKENS.
tokens := getSlackTokens()
// Hack to get the pod index: parse it from the pod name found in HOSTNAME.
// TODO: remove this by using the pod-index label, see
// https://github.com/kubernetes/kubernetes/pull/119232
podName := os.Getenv("HOSTNAME")
if podName == "" {
log.Fatalf("HOSTNAME environment variable not set")
}
index, err := podIndex(podName)
if err != nil {
log.Fatalf("Failed to get pod index: %v", err)
}
// Get the token for the current pod: pod N uses the N-th entry of SLACK_TOKENS.
// If the index is out of range, we fail.
log.S(log.Info, "Pod", log.Any("index", index), log.Any("num-tokens", len(tokens)))
if index >= len(tokens) {
log.Fatalf("Pod index %d is out of range for the list of %d tokens", index, len(tokens))
}
token := tokens[index]
// Initialize metrics on a dedicated registry.
r := prometheus.NewRegistry()
metrics := NewMetrics(r)
// Initialize the app, metrics are passed along so they are accessible.
// The HTTP client handed to the app has a 10 second timeout.
app := NewApp(maxQueueSize, &http.Client{
Timeout: 10 * time.Second,
}, metrics, channelOverride, slackPostMessageURL, token)
log.Infof("Starting metrics server.")
StartMetricServer(r, metricsPort)
// Main ctx, passed to the queue processor.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Server ctx, needed to cancel the server and not every other context.
serverCtx, serverCancel := context.WithCancel(context.Background())
defer serverCancel()
log.Infof("Starting main app logic")
go app.processQueue(ctx, maxRetries, *initialBackoff, burst, *slackRequestRate)
log.Infof("Starting receiver server")
// Run the receiver server in its own goroutine; an error returned by
// app.StartServer is reported through log.Fatalf.
go func() {
err := app.StartServer(serverCtx, applicationPort)
if err != nil {
log.Fatalf("Error starting server: %v", err)
}
}()
log.Infof("Up and running!")
// Shutdown is handled by scli: wait here until it reports an interrupt.
scli.UntilInterrupted()
log.Infof("Shutting down server...")
serverCancel()
log.Infof("Shutting down queue...")
// NOTE(review): ctx (given to processQueue) is only cancelled by the
// deferred cancel when main returns, i.e. after app.Shutdown() has returned —
// confirm that Shutdown does not depend on that cancellation.
app.Shutdown()
log.Infof("Shutdown complete.")
}