-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
173 lines (153 loc) · 5.39 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package main
import (
"context"
"fmt"
discovery "github.com/gkarthiks/k8s-discovery"
log "github.com/sirupsen/logrus"
"net/http"
"net/http/httputil"
"net/url"
"os"
"strconv"
"strings"
"time"
"vault-balancer/globals"
"vault-balancer/helper"
"vault-balancer/types"
)
func init() {
log.SetFormatter(&log.JSONFormatter{})
log.Infof("Vault Balancer running version: `%v`", BuildVersion)
globals.K8s, _ = discovery.NewK8s()
globals.Namespace, _ = globals.K8s.GetNamespace()
version, _ := globals.K8s.GetVersion()
log.Infof("Running in %v version of Kubernetes cluster in %s namespace", version, globals.Namespace)
labelSelector, avail = os.LookupEnv("VAULT_LABEL_SELECTOR")
if !avail {
log.Fatalf("No label selector has been provided. Please provide the label selector in `VAULT_LABEL_SELECTOR` key.")
}
balancerPortStr, avail := os.LookupEnv("BALANCER_PORT")
if !avail {
log.Warnf("Balancer port is not specified. Please provide the balancer port in `BALANCER_PORT` key. Now the default will be used. BALANCER_PORT: %v", globals.DefaultBalancerPort)
balancerPort = globals.DefaultBalancerPort
} else {
balancerPort, _ = strconv.Atoi(balancerPortStr)
}
globals.HttpTimeout, avail = os.LookupEnv("HTTP_TIMEOUT")
if !avail {
log.Warnf("No http timeout duration is specified. Please provide in `HTTP_TIMEOUT` key. Now the default time out will be used. HTTP_TIMEOUT: %v Minutes", globals.DefaultTimeOut)
globals.HttpTimeout = globals.DefaultTimeOut
}
}
var (
versionLogger = log.WithFields(log.Fields{"vlb_version": BuildVersion})
BuildVersion = "dev"
balancerPort int
vaultPool types.VaultPool
labelSelector string
avail bool
)
const (
HealthCheckPath = ":8200/v1/sys/seal-status"
ProxyPath = ":8200"
)
func main() {
go startRoutine(context.Background())
// start the balancer http service
server := http.Server{
Addr: fmt.Sprintf(":%d", balancerPort),
Handler: http.HandlerFunc(loadBalance),
}
//
versionLogger.Infof("Vault Balancer started and running at :%d", balancerPort)
if err := server.ListenAndServe(); err != nil {
versionLogger.Fatalf("error while starting the load balance, %v", err)
}
}
// startRoutine starts the routine work of collecting IPs, setting up reverse
// proxies and doing health check.
func startRoutine(context context.Context) {
versionLogger.Info("Starting the routines for discovery, proxy setup and health check")
t := time.NewTicker(time.Second * 10)
for {
select {
case <-t.C:
ipAddressMap := helper.GetVaultIPsFromLabelSelectors(labelSelector, versionLogger)
setUpProxies(ipAddressMap)
healthCheck(vaultPool)
}
}
}
// loadBalance load balances the incoming request
func loadBalance(w http.ResponseWriter, r *http.Request) {
attempts := helper.GetAttemptsFromContext(r)
if attempts > 3 {
versionLogger.Infof("%s(%s) Max attempts reached, terminating\n", r.RemoteAddr, r.URL.Path)
http.Error(w, "Service not available", http.StatusServiceUnavailable)
return
}
peer := vaultPool.GetNextPod()
if peer != nil {
peer.ReverseProxy.ServeHTTP(w, r)
return
}
http.Error(w, "Service not available", http.StatusServiceUnavailable)
}
// setUpProxies will create the reverse proxies for the identified IPs
func setUpProxies(serviceNameAndIP map[string]struct{}) {
for podIP := range serviceNameAndIP {
if !vaultPool.IsInThePool(podIP) {
sanitizedIP := strings.TrimSpace(podIP)
vaultUrl, err := url.Parse("http://" + sanitizedIP + ProxyPath)
if err != nil {
versionLogger.Errorf("error occurred while converting string to URL for proxy path. error: %v", err)
}
healthUrl, _ := url.Parse("http://" + sanitizedIP + HealthCheckPath)
proxy := httputil.NewSingleHostReverseProxy(vaultUrl)
proxy.ErrorHandler = func(writer http.ResponseWriter, request *http.Request, e error) {
versionLogger.Infof("[%s] %s\n", vaultUrl.Host, e.Error())
retries := helper.GetRetryFromContext(request)
if retries < 3 {
select {
case <-time.After(5 * time.Millisecond):
ctx := context.WithValue(request.Context(), helper.Retry, retries+1)
proxy.ServeHTTP(writer, request.WithContext(ctx))
}
return
}
// mark the ip address as not alice after 3 attempts
vaultPool.MarkVaultPodStatus(vaultUrl, false)
attempts := helper.GetAttemptsFromContext(request)
versionLogger.Infof("Retry attempt for the %s(%s): %d\n", request.RemoteAddr, request.URL.Path, attempts)
ctx := context.WithValue(request.Context(), helper.Attempts, attempts+1)
loadBalance(writer, request.WithContext(ctx))
}
vaultPool.AddBackend(&types.VaultBackend{
IP: sanitizedIP,
ProxyURL: vaultUrl,
HealthURL: healthUrl,
Alive: true,
ReverseProxy: proxy,
})
versionLogger.Infof("The service IP %s has been configured", vaultUrl)
} else {
versionLogger.Infof("Pod IP %v is already configured.", podIP)
}
}
var toBeRemoved []*types.VaultBackend
for _, b := range vaultPool.VaultBackends {
if _, ok := serviceNameAndIP[b.IP]; !ok {
toBeRemoved = append(toBeRemoved, b)
}
}
for _, b := range toBeRemoved {
versionLogger.Infof("Retiring the backed with IP %v from load balancing", b.IP)
vaultPool.RetireBackend(b)
}
}
// healthCheck runs a routine for check status of the pods every 2 mins
func healthCheck(vaultPool types.VaultPool) {
versionLogger.Info("Starting health check...")
vaultPool.HealthCheck()
versionLogger.Info("Health check completed")
}