forked from ryandotsmith/l2met
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
95 lines (86 loc) · 2.55 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"fmt"
"io/ioutil"
"l2met/outlet"
"l2met/receiver"
"l2met/store"
"l2met/utils"
"log"
"net/http"
"os"
"runtime"
"time"
)
func init() {
runtime.GOMAXPROCS(runtime.NumCPU())
}
func main() {
// The number of partitions that our backends support.
numPartitions := utils.EnvUint64("NUM_OUTLET_PARTITIONS", 1)
// The bucket.Store struct will initialize a redis pool for us.
maxRedisConn := utils.EnvInt("OUTLET_C", 2) + 100
// We use the store to Put buckets into redis.
server, pass, err := utils.ParseRedisUrl()
if err != nil {
log.Fatal(err)
}
rs := store.NewRedisStore(server, pass, numPartitions, maxRedisConn)
reqBuf := utils.EnvInt("REQUEST_BUFFER", 1000)
recv := receiver.NewReceiver(reqBuf, reqBuf)
recv.FlushInterval = time.Millisecond * 200
recv.NumOutlets = utils.EnvInt("OUTLET_C", 100)
recv.NumAcceptors = utils.EnvInt("ACCEPT_C", 100)
recv.Store = rs
recv.Start()
go recv.Report()
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
healthCheck(w, r, rs)
})
http.HandleFunc("/logs", func(w http.ResponseWriter, r *http.Request) {
recvLogs(w, r, recv)
})
httpOutlet := new(outlet.HttpOutlet)
httpOutlet.Store = rs
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
httpOutlet.ServeReadBucket(w, r)
})
port := utils.EnvString("PORT", "8000")
err = http.ListenAndServe(":"+port, nil)
if err != nil {
fmt.Printf("error=%s msg=%q\n", err, "Unable to start http server.")
os.Exit(1)
}
fmt.Printf("at=l2met-initialized port=%s\n", port)
}
func healthCheck(w http.ResponseWriter, r *http.Request, s store.Store) {
ok := s.Health()
if !ok {
msg := "Redis is unavailable."
fmt.Printf("error=%q\n", msg)
http.Error(w, msg, 500)
}
}
// Pull data from the http request, stick it in a channel and close the request.
// We don't do any validation on the data. Always respond with 200.
func recvLogs(w http.ResponseWriter, r *http.Request, recv *receiver.Receiver) {
defer utils.MeasureT("http-receiver", time.Now())
if r.Method != "POST" {
http.Error(w, "Invalid Request", 400)
return
}
user, pass, err := utils.ParseAuth(r)
if err != nil {
fmt.Printf("measure.failed-auth erro=%s user=%s pass=%s user-agent=%s token=%s client=%s\n",
err, user, pass, r.Header.Get("User-Agent"), r.Header.Get("Logplex-Drain-Token"), r.Header.Get("X-Forwarded-For"))
http.Error(w, "Invalid Request", 400)
return
}
b, err := ioutil.ReadAll(r.Body)
r.Body.Close()
if err != nil {
http.Error(w, "Invalid Request", 400)
return
}
recv.Receive(user, pass, b, r.URL.Query())
}