Skip to content

Commit

Permalink
move handlers under http file
Browse files Browse the repository at this point in the history
  • Loading branch information
muraty committed Oct 12, 2020
1 parent 99f9143 commit a5bfe8c
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 122 deletions.
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (c *Config) ReadFile(name string) error {

var defaultConfig = Config{
Server: Server{
DSN: "kimo:123@(mysql:3306)/information_schema",
DSN: "",
AgentPort: 3333,
PollDuration: 30,
TCPProxyMgmtAddress: "tcpproxy:3307",
Expand Down
10 changes: 5 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func main() {
Name: "agent",
Usage: "run agent",
Action: func(c *cli.Context) error {
kimoAgent := agent.NewAgent(cfg)
err := kimoAgent.Run()
a := agent.NewAgent(cfg)
err := a.Run()
if err != nil {
return err
}
Expand All @@ -69,9 +69,9 @@ func main() {
Name: "server",
Usage: "run server",
Action: func(c *cli.Context) error {
kimoServer := server.NewServer(cfg)
kimoServer.Config = &cfg.Server
err := kimoServer.Run()
s := server.NewServer(cfg)
s.Config = &cfg.Server
err := s.Run()
if err != nil {
return err
}
Expand Down
85 changes: 82 additions & 3 deletions server/http.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,34 @@
package server

import (
"encoding/json"
"fmt"
"net"
"net/http"
"time"

"github.com/cenkalti/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rakyll/statik/fs"

_ "kimo/statik" // Auto-generated module by statik.
)

// Response is type for returning a response from kimo server
type Response struct {
Processes []Process `json:"processes"`
}

// NewHTTPClient returns a http client with custom connect & read timeout
func NewHTTPClient(connectTimeout, readTimeout time.Duration) *http.Client {
return &http.Client{
Transport: &http.Transport{
Dial: TimeoutDialer(connectTimeout, readTimeout),
Dial: timeoutDialer(connectTimeout, readTimeout),
},
}
}

// TimeoutDialer is used to set connect & read timeouts for the client
func TimeoutDialer(connectTimeout, readTimeout time.Duration) func(net, addr string) (c net.Conn, err error) {
func timeoutDialer(connectTimeout, readTimeout time.Duration) func(net, addr string) (c net.Conn, err error) {
return func(netw, addr string) (net.Conn, error) {
conn, err := net.DialTimeout(netw, addr, connectTimeout)
if err != nil {
Expand All @@ -26,3 +38,70 @@ func TimeoutDialer(connectTimeout, readTimeout time.Duration) func(net, addr str
return conn, nil
}
}

// Procs is a handler for returning process list
func (s *Server) Procs(w http.ResponseWriter, req *http.Request) {
forceParam := req.URL.Query().Get("force")
fetch := false
if forceParam == "true" || len(s.Processes) == 0 {
fetch = true
}

if fetch {
s.FetchAll()
}

w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")

log.Infof("Returning response with %d kimo processes...\n", len(s.Processes))
w.Header().Set("Content-Type", "application/json")

response := &Response{
Processes: s.Processes,
}
json.NewEncoder(w).Encode(response)

}

// Health is a dummy endpoint for load balancer health check
func (s *Server) Health(w http.ResponseWriter, req *http.Request) {
// todo: real health check
fmt.Fprintf(w, "OK")
}

// Static serves static files (web components).
func (s *Server) Static() http.Handler {
statikFS, err := fs.New()
if err != nil {
log.Errorln(err)
}
return http.FileServer(statikFS)

}

// Metrics is used to expose metrics that is compatible with Prometheus exporter
func (s *Server) Metrics() http.Handler {
// todo: separate prometheus and json metrics
return promhttp.Handler()
}

// Run is used to run http handlers
func (s *Server) Run() error {
// todo: reconsider context usages
log.Infof("Running server on %s \n", s.Config.ListenAddress)

go s.pollAgents()
go s.setMetrics()

http.Handle("/", s.Static())
http.Handle("/metrics", s.Metrics())
http.HandleFunc("/procs", s.Procs)
http.HandleFunc("/health", s.Health)
err := http.ListenAndServe(s.Config.ListenAddress, nil)
if err != nil {
log.Errorln(err.Error())
return err
}
return nil
}
25 changes: 9 additions & 16 deletions server/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,36 @@ func NewPrometheusMetric(server *Server) *PrometheusMetric {
}
}

// PollMetrics is used to set metrics periodically.
func (pm *PrometheusMetric) PollMetrics() {
// todo: separate prometheus and json metrics
// SetMetrics is used to set metrics periodically.
func (pm *PrometheusMetric) SetMetrics() {
// todo: configurable time
ticker := time.NewTicker(10 * time.Second)
ticker := time.NewTicker(2 * time.Second)

for {
select {
// todo: add return case
case <-ticker.C:
pm.SetMetrics()
pm.Set()
}
}
}

// SetMetrics sets metrics based on Processes
func (pm *PrometheusMetric) SetMetrics() {
// Set sets all metrics based on Processes
func (pm *PrometheusMetric) Set() {
if len(pm.Server.Processes) == 0 {
pm.Server.FetchAll()
log.Debugln("0 kimo processes. Continue...")
return
}
pm.Set(pm.Server.Processes)
}

// Set sets all types (gauge, counter etc.) of metrics based on process list.
func (pm *PrometheusMetric) Set(ps []Process) {
log.Debugf("Found '%d' processes. Setting metrics...\n", len(ps))
log.Debugf("Found '%d' processes. Setting metrics...\n", len(pm.Server.Processes))

pm.connCount.Set(float64(len(ps)))
pm.connCount.Set(float64(len(pm.Server.Processes)))

var metricM = map[string]map[string]int{}
// todo: keys should be constant at somewhere else and we should iterate through them
metricM["db"] = map[string]int{}
metricM["host"] = map[string]int{}

for _, p := range ps {
for _, p := range pm.Server.Processes {
// todo: keys should be constant at somewhere else and we should iterate through them
metricM["db"][p.DB]++
metricM["host"][p.Host]++
Expand Down
100 changes: 3 additions & 97 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,12 @@ package server

import (
"context"
"encoding/json"
"fmt"
"kimo/config"
"net/http"
"time"

"github.com/cenkalti/log"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rakyll/statik/fs"

_ "kimo/statik" // Auto-generated module by statik.
)

// Response is type for returning a response from kimo server
type Response struct {
Processes []Process `json:"processes"`
}

// Process is the final processes that is combined with AgentProcess + TCPProxyRecord + MysqlProcess
type Process struct {
ID int32 `json:"id"`
Expand Down Expand Up @@ -53,18 +41,6 @@ func NewServer(cfg *config.Config) *Server {
return s
}

// ReturnResponse is used to return a response from server
func (s *Server) ReturnResponse(ctx context.Context, w http.ResponseWriter) {
// todo: bad naming.
log.Infof("Returning response with %d kimo processes...\n", len(s.Processes))
w.Header().Set("Content-Type", "application/json")

response := &Response{
Processes: s.Processes,
}
json.NewEncoder(w).Encode(response)
}

// FetchAll fetches all processes through Client object
func (s *Server) FetchAll() {
// todo: call with lock
Expand All @@ -82,38 +58,9 @@ func (s *Server) FetchAll() {
log.Debugf("%d processes are set\n", len(s.Processes))
}

// Procs is a handler for returning process list
func (s *Server) Procs(w http.ResponseWriter, req *http.Request) {
forceParam := req.URL.Query().Get("force")
fetch := false
if forceParam == "true" || len(s.Processes) == 0 {
fetch = true
}

if fetch {
s.FetchAll()
}

w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.ReturnResponse(ctx, w)

}

// Health is a dummy endpoint for load balancer health check
func (s *Server) Health(w http.ResponseWriter, req *http.Request) {
// todo: real health check
fmt.Fprintf(w, "OK")
}

// todo: bad naming.
func (s *Server) pollMetrics() {
// todo: bad naming.
s.PrometheusMetric.PollMetrics()
func (s *Server) setMetrics() {
// todo: prevent race condition
s.PrometheusMetric.SetMetrics()
}

func (s *Server) pollAgents() {
Expand All @@ -129,44 +76,3 @@ func (s *Server) pollAgents() {
}

}

// Static serves static files (web components).
func (s *Server) Static() http.Handler {
statikFS, err := fs.New()
if err != nil {
log.Errorln(err)
}
return http.FileServer(statikFS)

}

// Metrics is used to expose metrics that is compatible with Prometheus exporter
func (s *Server) Metrics() http.Handler {
if len(s.Processes) == 0 {
log.Debugln("Processes are not initialized. Polling...")
s.PrometheusMetric.SetMetrics()
}

return promhttp.Handler()
}

// Run is used to run http handlers
func (s *Server) Run() error {
// todo: move background jobs to another file. Keep only http related ones, here.
// todo: reconsider context usages
log.Infof("Running server on %s \n", s.Config.ListenAddress)

go s.pollAgents()
go s.pollMetrics()

http.Handle("/", s.Static())
http.Handle("/metrics", s.Metrics())
http.HandleFunc("/procs", s.Procs)
http.HandleFunc("/health", s.Health)
err := http.ListenAndServe(s.Config.ListenAddress, nil)
if err != nil {
log.Errorln(err.Error())
return err
}
return nil
}

0 comments on commit a5bfe8c

Please sign in to comment.