diff --git a/Dockerfile-kimo b/Dockerfile-kimo index adda4e3..f7af808 100644 --- a/Dockerfile-kimo +++ b/Dockerfile-kimo @@ -1,18 +1,33 @@ FROM golang:1.23 -RUN apt-get update && apt-get -y install default-mysql-client +# Install system dependencies +RUN apt-get update && \ + apt-get -y install default-mysql-client && \ + rm -rf /var/lib/apt/lists/* ENV GOPATH=/go ENV PATH=$PATH:$GOPATH/bin -RUN mkdir /go/src/kimo -COPY go.mod go.sum /go/src/kimo/ -COPY ./server/static/ /go/src/kimo/server/static +WORKDIR /go/src/kimo -RUN cd /go/src/kimo && go install github.com/rakyll/statik -RUN cd /go/src/kimo && /go/bin/statik -src=./server/static -include='*.html' +# Copy dependency files first to leverage cache +COPY go.mod go.sum ./ -COPY . /go/src/kimo -RUN cd /go/src/kimo && go install +# Download dependencies separately +RUN go mod download -ADD config.toml /etc/kimo.toml +# Install statik before copying other files +RUN go install github.com/rakyll/statik + +# Copy only static files needed for statik +COPY ./server/static/ ./server/static/ +RUN /go/bin/statik -src=./server/static -include='*.html' + +# Copy remaining source code +COPY . . + +# Build the application +RUN go build -o /go/bin/kimo + +# Add config file +COPY config.yaml /etc/kimo.yaml diff --git a/Dockerfile-tcpproxy b/Dockerfile-tcpproxy deleted file mode 100644 index 3ad44d4..0000000 --- a/Dockerfile-tcpproxy +++ /dev/null @@ -1,16 +0,0 @@ -# Start from a Debian image with the latest version of Go installed -FROM kimo-agent - -ARG VERSION=1.2.12 - -RUN wget https://github.com/cenkalti/tcpproxy/releases/download/v${VERSION}/tcpproxy_${VERSION}_linux.tar.gz && \ - tar -zxvf tcpproxy_${VERSION}_linux.tar.gz && \ - rm -rf tcpproxy-${VERSION}_linux.tar.gz - -RUN mkdir /app -RUN cp tcpproxy /app/ - -ADD tcpproxy-entrypoint.sh /tmp/ -ENTRYPOINT ["bash", "/tmp/tcpproxy-entrypoint.sh"] - -EXPOSE 3307 \ No newline at end of file diff --git a/Makefile b/Makefile index 64b1299..8046353 100644 --- a/Makefile +++ b/Makefile @@ -7,15 +7,15 @@ build: go get github.com/rakyll/statik $(GOPATH)/bin/statik -src=./server/static -include='*.html' go install -up: +build-dependencies: docker-compose stop docker-compose rm -fsv docker-compose build mysql docker-compose build kimo - docker-compose build kimo-agent - docker-compose build kimo-server - docker-compose build tcpproxy - docker-compose up --scale kimo-agent=5 - +up: + docker-compose stop + docker-compose rm -fsv + docker-compose build kimo + docker-compose up --build kimo-server kimo-agent lint: golangci-lint run diff --git a/agent/agent.go b/agent/agent.go new file mode 100644 index 0000000..d86746f --- /dev/null +++ b/agent/agent.go @@ -0,0 +1,68 @@ +package agent + +import ( + "context" + "kimo/config" + "net/http" + "os" + "sync" + + "github.com/cenkalti/log" + gopsutilNet "github.com/shirou/gopsutil/v4/net" +) + +// Agent is type for handling agent operations +type Agent struct { + Config *config.AgentConfig + conns []gopsutilNet.ConnectionStat + Hostname string + mu sync.RWMutex // protects conns +} + +// NewAgent creates an returns a new Agent +func NewAgent(cfg *config.AgentConfig) *Agent { + d := new(Agent) + d.Config = cfg + d.Hostname = getHostname() + return d +} + +// SetConns sets connections with lock. +func (a *Agent) SetConns(conns []gopsutilNet.ConnectionStat) { + a.mu.Lock() + a.conns = conns + a.mu.Unlock() +} + +// GetConns gets connections with lock. +func (a *Agent) GetConns() []gopsutilNet.ConnectionStat { + a.mu.RLock() + defer a.mu.RUnlock() + return a.conns +} + +// getHostname returns hostname. +func getHostname() string { + hostname, err := os.Hostname() + if err != nil { + log.Errorf("Hostname could not found") + hostname = "UNKNOWN" + } + return hostname +} + +// Run starts the http server and begins listening for HTTP requests. +func (a *Agent) Run() error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go a.pollConns(ctx) + + http.HandleFunc("/proc", a.Process) + err := http.ListenAndServe(a.Config.ListenAddress, nil) + if err != nil { + log.Errorln(err.Error()) + return err + } + return nil +} diff --git a/agent/http.go b/agent/http.go index 3574a01..df6066a 100644 --- a/agent/http.go +++ b/agent/http.go @@ -2,42 +2,29 @@ package agent import ( "encoding/json" - "kimo/config" - "kimo/types" "net/http" - "os" "strconv" - "time" "github.com/cenkalti/log" gopsutilNet "github.com/shirou/gopsutil/v4/net" gopsutilProcess "github.com/shirou/gopsutil/v4/process" ) -// Agent is type for handling agent operations -type Agent struct { - Config *config.Agent - Conns []gopsutilNet.ConnectionStat - Hostname string +// Response contains basic process information for API responses. +type Response struct { + Status string `json:"status"` + Pid int32 `json:"pid"` + Name string `json:"name"` + CmdLine string `json:"cmdline"` } -// NewAgent is constuctor function for Agent type -func NewAgent(cfg *config.Config) *Agent { - d := new(Agent) - d.Config = &cfg.Agent - d.Hostname = getHostname() - return d -} - -func getHostname() string { - hostname, err := os.Hostname() - if err != nil { - log.Errorf("Hostname could not found") - hostname = "UNKNOWN" - } - return hostname +// NetworkProcess represents process with its network connection. +type NetworkProcess struct { + process *gopsutilProcess.Process + conn gopsutilNet.ConnectionStat } +// parsePortParam parses and returns port number from the request. func parsePortParam(w http.ResponseWriter, req *http.Request) (uint32, error) { portParam, ok := req.URL.Query()["port"] log.Debugf("Looking for process of port: %s\n", portParam) @@ -55,13 +42,9 @@ func parsePortParam(w http.ResponseWriter, req *http.Request) (uint32, error) { return uint32(p), nil } -type hostProc struct { - process *gopsutilProcess.Process - conn gopsutilNet.ConnectionStat -} - -func (a *Agent) findProc(port uint32) *hostProc { - for _, conn := range a.Conns { +// findProcess finds process from connections by given port. +func findProcess(port uint32, conns []gopsutilNet.ConnectionStat) *NetworkProcess { + for _, conn := range conns { if conn.Laddr.Port != port { continue } @@ -77,7 +60,7 @@ func (a *Agent) findProc(port uint32) *hostProc { return nil } - return &hostProc{ + return &NetworkProcess{ process: process, conn: conn, } @@ -85,85 +68,47 @@ func (a *Agent) findProc(port uint32) *hostProc { return nil } -func (a *Agent) createAgentProcess(proc *hostProc) *types.AgentProcess { - if proc == nil { +// createResponse creates Response from given NetworkProcess parameter. +func createResponse(np *NetworkProcess) *Response { + if np == nil { return nil } - name, err := proc.process.Name() + name, err := np.process.Name() if err != nil { name = "" } - cl, err := proc.process.CmdlineSlice() + cmdline, err := np.process.Cmdline() if err != nil { - log.Debugf("Cmdline could not found for %d\n", proc.process.Pid) + log.Debugf("Cmdline could not found for %d\n", np.process.Pid) } - return &types.AgentProcess{ - Laddr: types.IPPort{IP: proc.conn.Laddr.IP, Port: proc.conn.Laddr.Port}, - Status: proc.conn.Status, - Pid: proc.conn.Pid, - Name: name, - CmdLine: cl, - Hostname: a.Hostname, + return &Response{ + Status: np.conn.Status, + Pid: np.conn.Pid, + Name: name, + CmdLine: cmdline, } } // Process is handler for serving process info func (a *Agent) Process(w http.ResponseWriter, req *http.Request) { + w.Header().Set("X-Kimo-Hostname", a.Hostname) + // todo: cache result for a short period (10s? 30s?) port, err := parsePortParam(w, req) if err != nil { http.Error(w, "port param is required", http.StatusBadRequest) return } - p := a.findProc(port) - ap := a.createAgentProcess(p) - - w.Header().Set("Content-Type", "application/json") - if ap == nil { - json.NewEncoder(w).Encode(&types.AgentProcess{ - Hostname: a.Hostname, - }) + p := findProcess(port, a.GetConns()) + if p == nil { + http.Error(w, "Connection not found", http.StatusNotFound) return } - json.NewEncoder(w).Encode(&ap) - return -} -func (a *Agent) pollConns() { - // todo: run with context - log.Debugln("Polling...") - ticker := time.NewTicker(a.Config.PollDuration * time.Second) - - for { - a.getConns() // poll immediately at the initialization - select { - // todo: add return case - case <-ticker.C: - a.getConns() - } - } - -} -func (a *Agent) getConns() { - // This is an expensive operation. - // So, we need to call it infrequent to prevent high load on servers those run kimo agents. - conns, err := gopsutilNet.Connections("all") - if err != nil { - log.Errorln(err.Error()) - return - } - a.Conns = conns -} - -// Run is main function to run http server -func (a *Agent) Run() error { - go a.pollConns() - - http.HandleFunc("/proc", a.Process) - err := http.ListenAndServe(a.Config.ListenAddress, nil) + w.Header().Set("Content-Type", "application/json") + ap := createResponse(p) + err = json.NewEncoder(w).Encode(&ap) if err != nil { - log.Errorln(err.Error()) - return err + http.Error(w, "Can not encode agent process", http.StatusInternalServerError) } - return nil } diff --git a/agent/poll.go b/agent/poll.go new file mode 100644 index 0000000..20a9aad --- /dev/null +++ b/agent/poll.go @@ -0,0 +1,73 @@ +package agent + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/log" + gopsutilNet "github.com/shirou/gopsutil/v4/net" +) + +// pollConns periodically checks for connections. +func (a *Agent) pollConns(ctx context.Context) { + log.Infoln("Polling started...") + ticker := time.NewTicker(a.Config.PollInterval) + + // Initial poll + if err := a.doPoll(ctx); err != nil { + log.Errorf("Initial poll failed: %v", err) + } + + for { + select { + case <-ticker.C: + if err := a.doPoll(ctx); err != nil { + log.Errorf("Poll failed: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +// doPoll retrieves the current network connections and updates the Agent's connection state. +// It returns an error if fetching connections fails. +func (a *Agent) doPoll(ctx context.Context) error { + conns, err := getConns(ctx) + if err != nil { + return err + } + + a.SetConns(conns) + + log.Infof("Updated connections: %d", len(conns)) + return nil +} + +// getConns retrieves a list of TCP connections with a 5-second timeout. +// It runs the potentially expensive connection checking operation in a separate goroutine +// to ensure it doesn't block indefinitely. +func getConns(ctx context.Context) ([]gopsutilNet.ConnectionStat, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*5) + defer cancel() + + type result struct { + conns []gopsutilNet.ConnectionStat + err error + } + + resultChan := make(chan result, 1) + go func() { + // Expensive operation - should be called sparingly to avoid high server load + conns, err := gopsutilNet.ConnectionsWithContext(ctx, "tcp") + resultChan <- result{conns, err} + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("operation timed out: %w", ctx.Err()) + case r := <-resultChan: + return r.conns, r.err + } +} diff --git a/config.toml b/config.toml deleted file mode 100644 index a2ea802..0000000 --- a/config.toml +++ /dev/null @@ -1,16 +0,0 @@ -debug = true - -[agent] -listen_address = "0.0.0.0:3333" -poll_duration = 30 - -[server] -dsn = "kimo:123@(mysql:3306)/information_schema" -listen_address = "0.0.0.0:3322" -poll_duration = 10 -agent_port = 3333 -tcpproxy_mgmt_address = "tcpproxy:3307" -agent_connect_timeout = 1 -agent_read_timeout = 4 -tcpproxy_connect_timeout = 1 -tcpproxy_read_timeout = 2 diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..1fd4a09 --- /dev/null +++ b/config.yaml @@ -0,0 +1,20 @@ +debug: true + +agent: + listen_address: "0.0.0.0:3333" + poll_interval: "10s" + +server: + listen_address: "0.0.0.0:3322" + poll_interval: "12s" + mysql: + dsn: "kimo:123@(kimo-mysql:3306)/information_schema" + agent: + # kimo-agent listens this port. + port: 3333 + tcpproxy: + mgmt_address: "kimo-tcpproxy:3307" + metric: + # If one of these patterns match, whole cmdline will be exposed as it is, otherwise it will be truncated. + cmdline_patterns: + - "mysql*" diff --git a/config/config.go b/config/config.go index 3758020..cc17bc5 100644 --- a/config/config.go +++ b/config/config.go @@ -1,65 +1,90 @@ package config import ( + "fmt" + "os" "time" - "github.com/BurntSushi/toml" + "gopkg.in/yaml.v3" ) -// Config is used as config that contains both of agent and server configs +// Config represents the root configuration structure type Config struct { - Debug bool `toml:"debug"` - Agent Agent - Server Server + Debug bool `yaml:"debug"` + Agent AgentConfig `yaml:"agent"` + Server ServerConfig `yaml:"server"` } -// Server is used as server config -type Server struct { - DSN string `toml:"dsn"` - AgentPort uint32 `toml:"agent_port"` - PollDuration time.Duration `toml:"poll_duration"` - TCPProxyMgmtAddress string `toml:"tcpproxy_mgmt_address"` - ListenAddress string `toml:"listen_address"` - AgentConnectTimeout time.Duration `toml:"agent_connect_timeout"` - AgentReadTimeout time.Duration `toml:"agent_read_timeout"` - TCPProxyConnectTimeout time.Duration `toml:"tcpproxy_connect_timeout"` - TCPProxyReadTimeout time.Duration `toml:"tcpproxy_read_timeout"` +// AgentConfig represents the agent section configuration +type AgentConfig struct { + ListenAddress string `yaml:"listen_address"` + PollInterval time.Duration `yaml:"poll_interval"` } -// Agent is used as anget config on agent machines -type Agent struct { - ListenAddress string `toml:"listen_address"` - PollDuration time.Duration `toml:"poll_duration"` +// ServerConfig represents the server section configuration +type ServerConfig struct { + ListenAddress string `yaml:"listen_address"` + PollInterval time.Duration `yaml:"poll_interval"` + MySQL MySQLConfig `yaml:"mysql"` + Agent AgentInfo `yaml:"agent"` + TCPProxy TCPProxy `yaml:"tcpproxy"` + Metric Metric `yaml:"metric"` } -// NewConfig is constructor function for Config type +// MySQLConfig holds MySQL specific configuration +type MySQLConfig struct { + DSN string `yaml:"dsn"` +} + +// AgentInfo holds agent-related configuration within server section +type AgentInfo struct { + Port uint32 `yaml:"port"` +} + +// TCPProxy holds TCP proxy configuration +type TCPProxy struct { + MgmtAddress string `yaml:"mgmt_address"` +} + +// Metric holds metric-related configuration +type Metric struct { + CmdlinePatterns []string `yaml:"cmdline_patterns"` +} + +// NewConfig creates and returns a new Config. func NewConfig() *Config { c := new(Config) *c = defaultConfig return c } -// ReadFile parses a TOML file and returns new Config. -func (c *Config) ReadFile(name string) error { - _, err := toml.DecodeFile(name, c) - return err +// LoadFile parses a yaml config file and loads it into Config object +func (c *Config) LoadConfig(name string) error { + content, err := os.ReadFile(name) + if err != nil { + return err + } + err = yaml.Unmarshal(content, c) + if err != nil { + return fmt.Errorf("parsing YAML file %s: %w", name, err) + } + return nil } var defaultConfig = Config{ - Server: Server{ - DSN: "", - AgentPort: 3333, - PollDuration: 10, - TCPProxyMgmtAddress: "tcpproxy:3307", - ListenAddress: "0.0.0.0:3322", - AgentConnectTimeout: 2, - AgentReadTimeout: 3, - TCPProxyConnectTimeout: 1, - TCPProxyReadTimeout: 1, - }, - Agent: Agent{ + Debug: true, + Agent: AgentConfig{ ListenAddress: "0.0.0.0:3333", - PollDuration: 30, + PollInterval: 10 * time.Second, + }, + Server: ServerConfig{ + ListenAddress: "0.0.0.0:3322", + PollInterval: 12 * time.Second, + MySQL: MySQLConfig{ + DSN: "", + }, + Agent: AgentInfo{ + Port: 3333, + }, }, - Debug: true, } diff --git a/docker-compose.yml b/docker-compose.yml index 352dbc8..aeebde6 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,24 +2,26 @@ version: '3.9' services: mysql: image: kimo/mysql - container_name: mysql + container_name: kimo-mysql build: dockerfile: Dockerfile-mysql context: . ports: - "3306:3306" tcpproxy: - image: kimo/tcpproxy - container_name: tcpproxy - build: - dockerfile: Dockerfile-tcpproxy - context: . + image: cenkalti/tcpproxy + container_name: kimo-tcpproxy depends_on: - mysql ports: - "3307:3307" tty: true privileged: true + command: + - "-m" + - "0.0.0.0:3307" + - "0.0.0.0:3306" + - "mysql:3306" kimo: image: kimo container_name: kimo @@ -39,8 +41,9 @@ services: depends_on: - tcpproxy tty: true + scale: 3 kimo-server: - image: kimo-server + image: kimo container_name: kimo-server entrypoint: kimo --debug server ports: diff --git a/go.mod b/go.mod index 6f1af8f..6484b2f 100644 --- a/go.mod +++ b/go.mod @@ -3,25 +3,26 @@ module kimo go 1.23.1 require ( - github.com/BurntSushi/toml v1.4.0 github.com/cenkalti/log v1.0.0 github.com/go-sql-driver/mysql v1.8.1 github.com/prometheus/client_golang v1.20.5 github.com/rakyll/statik v0.1.7 - github.com/shirou/gopsutil/v4 v4.24.9 - github.com/urfave/cli v1.22.4 + github.com/shirou/gopsutil/v4 v4.24.10 + github.com/urfave/cli v1.22.16 + gopkg.in/yaml.v3 v3.0.1 ) require ( filippo.io/edwards25519 v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d // indirect - github.com/ebitengine/purego v0.8.0 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect + github.com/ebitengine/purego v0.8.1 // indirect github.com/go-ole/go-ole v1.2.6 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.0 // indirect github.com/klauspost/compress v1.17.9 // indirect + github.com/kr/text v0.2.0 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/mattn/go-isatty v0.0.12 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect @@ -29,11 +30,10 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.55.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - github.com/russross/blackfriday/v2 v2.0.1 // indirect - github.com/shurcooL/sanitized_anchor_name v1.0.0 // indirect + github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - golang.org/x/sys v0.25.0 // indirect + golang.org/x/sys v0.26.0 // indirect google.golang.org/protobuf v1.34.2 // indirect ) diff --git a/go.sum b/go.sum index 9f72d0f..262117b 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,5 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/BurntSushi/toml v1.4.0 h1:kuoIxZQy2WRRk1pttg9asf+WVv6tWQuBNVmK8+nqPr0= github.com/BurntSushi/toml v1.4.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -9,12 +7,14 @@ github.com/cenkalti/log v1.0.0 h1:0SITaDyovlmHFLaV+qenYmDxh8TNgxbJscMyn4W8XWk= github.com/cenkalti/log v1.0.0/go.mod h1:Kbz0XnbnTBtcJN8yeRuPW/9SNtP1tx5SwjU+357jKYM= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d h1:U+s90UTSYgptZMwQh2aRr3LuazLJIa+Pg3Kc1ylSYVY= -github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/ebitengine/purego v0.8.0 h1:JbqvnEzRvPpxhCJzJJ2y0RbiZ8nyjccVUrSM3q+GvvE= -github.com/ebitengine/purego v0.8.0/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= +github.com/ebitengine/purego v0.8.1 h1:sdRKd6plj7KYW33EH5As6YKfe8m9zbN9JMrOjNVF/BE= +github.com/ebitengine/purego v0.8.1/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= @@ -28,6 +28,10 @@ github.com/hashicorp/go-multierror v1.1.0 h1:B9UzwGQJehnUY1yNrnwREHc3fGbC2xefo8g github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA= github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= @@ -50,20 +54,27 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ= github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc= -github.com/russross/blackfriday/v2 v2.0.1 h1:lPqVAte+HuHNfhJ/0LC98ESWRz8afy9tM/0RK8m9o+Q= -github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= -github.com/shirou/gopsutil/v4 v4.24.9 h1:KIV+/HaHD5ka5f570RZq+2SaeFsb/pq+fp2DGNWYoOI= -github.com/shirou/gopsutil/v4 v4.24.9/go.mod h1:3fkaHNeYsUFCGZ8+9vZVWtbyM1k2eRnlL+bWO8Bxa/Q= -github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= -github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/shirou/gopsutil/v4 v4.24.10 h1:7VOzPtfw/5YDU+jLEoBwXwxJbQetULywoSV4RYY7HkM= +github.com/shirou/gopsutil/v4 v4.24.10/go.mod h1:s4D/wg+ag4rG0WO7AiTj2BeYCRhym0vM7DHbZRxnIT8= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= -github.com/urfave/cli v1.22.4 h1:u7tSpNPPswAFymm8IehJhy4uJMlUuU/GmqSkvJ1InXA= -github.com/urfave/cli v1.22.4/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/urfave/cli v1.22.16 h1:MH0k6uJxdwdeWQTwhSO42Pwr4YLrNLwBtg1MRgTqPdQ= +github.com/urfave/cli v1.22.16/go.mod h1:EeJR6BKodywf4zciqrdw6hpCPk68JO9z5LazXZMn5Po= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -71,12 +82,15 @@ golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kimo-agent-entrypoint.sh b/kimo-agent-entrypoint.sh index 2efa2fe..1d223f0 100644 --- a/kimo-agent-entrypoint.sh +++ b/kimo-agent-entrypoint.sh @@ -1,6 +1,26 @@ #!/bin/bash + +host="kimo-tcpproxy" +user="kimo" +password="123" + +for i in {30..0}; do + mysql -h"$host" -u"$user" -p"$password" -e "SELECT 1" > /dev/null + if [ $? -eq 0 ]; then + echo "MySQL connection successful." + break + fi + + echo 'MySQL init process in progress...' + sleep 1 +done +if [ "$i" = 0 ]; then + echo >&2 'MySQL init process failed.' + exit 1 +fi + echo "mysql sleep query..." -mysql -u kimo -p123 -h tcpproxy -e "SELECT SLEEP(100000)" & +mysql -u"$user" -p"$password" -h"$host" -e "SELECT SLEEP(100000)" & echo "running kimo agent..." kimo --debug agent diff --git a/main.go b/main.go index 7e768a0..bec48a2 100644 --- a/main.go +++ b/main.go @@ -20,30 +20,22 @@ func main() { app.Flags = []cli.Flag{ cli.StringFlag{ Name: "config, c", - Value: "/etc/kimo.toml", + Value: "/etc/kimo.yaml", Usage: "configuration file path", }, cli.BoolFlag{ Name: "debug, d", Usage: "enable debug log", }, - cli.BoolFlag{ - Name: "no-debug, D", - Usage: "disable debug log", - }, } app.Before = func(c *cli.Context) error { - err := cfg.ReadFile(c.GlobalString("config")) + err := cfg.LoadConfig(c.GlobalString("config")) if err != nil { log.Errorf("Cannot read config: %s\n", err) } if c.IsSet("debug") { cfg.Debug = true } - if c.IsSet("no-debug") { - cfg.Debug = false - } - if cfg.Debug { log.SetLevel(log.DEBUG) } else { @@ -57,7 +49,7 @@ func main() { Name: "agent", Usage: "run agent", Action: func(c *cli.Context) error { - a := agent.NewAgent(cfg) + a := agent.NewAgent(&cfg.Agent) err := a.Run() if err != nil { return err @@ -69,7 +61,7 @@ func main() { Name: "server", Usage: "run server", Action: func(c *cli.Context) error { - s := server.NewServer(cfg) + s := server.NewServer(&cfg.Server) s.Config = &cfg.Server err := s.Run() if err != nil { diff --git a/server/agent.go b/server/agent.go index d14bde2..c356999 100644 --- a/server/agent.go +++ b/server/agent.go @@ -3,55 +3,84 @@ package server import ( "context" "encoding/json" - "errors" "fmt" - "kimo/config" - "kimo/types" - "time" + "net/http" "github.com/cenkalti/log" ) -// Agent is agent client to fetch agent process from Kimo Agent -type Agent struct { - ConnectTimeout time.Duration - ReadTimeout time.Duration - Port uint32 +// AgentClient represents an agent client to fetch get process from a kimo-agent +type AgentClient struct { + Host string + Port uint32 } -// NewAgent is constructor function for creating Agent object -func NewAgent(cfg config.Server) *Agent { - a := new(Agent) - a.Port = cfg.AgentPort - a.ConnectTimeout = cfg.AgentConnectTimeout - a.ReadTimeout = cfg.AgentReadTimeout - return a +// AgentError represents an HTTP error that is retured from kimo-agent. +type AgentError struct { + Hostname string + Status string } -// Fetch is used to fetch agent process -func (a *Agent) Fetch(ctx context.Context, host string, port uint32) (*types.AgentProcess, error) { - // todo: use request with context - var httpClient = NewHTTPClient(a.ConnectTimeout*time.Second, a.ReadTimeout*time.Second) - url := fmt.Sprintf("http://%s:%d/proc?port=%d", host, a.Port, port) +// AgentResponse represents a success response from kimo-agent. +type AgentResponse struct { + Pid int + Name string + CmdLine string + Hostname string + ConnectionStatus string +} + +func (ae *AgentError) Error() string { + return fmt.Sprintf("Agent error. Host: %s - status: %s\n", ae.Hostname, ae.Status) +} + +// NewAgentClient creates and returns a new AgentClient. +func NewAgentClient(address IPPort) *AgentClient { + ac := new(AgentClient) + ac.Host = address.IP + ac.Port = address.Port + return ac +} + +// Get gets process info from kimo agent. +func (ac *AgentClient) Get(ctx context.Context, port uint32) (*AgentResponse, error) { + url := fmt.Sprintf("http://%s:%d/proc?port=%d", ac.Host, ac.Port, port) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, err + } + client := &http.Client{} log.Debugf("Requesting to %s\n", url) - response, err := httpClient.Get(url) + response, err := client.Do(req) if err != nil { return nil, err } + defer response.Body.Close() + hostname := response.Header.Get("X-Kimo-Hostname") if response.StatusCode != 200 { - log.Debugf("Error: %s -> %s\n", url, response.Status) - return nil, errors.New("status code is not 200") + return nil, &AgentError{Hostname: hostname, Status: response.Status} } - ap := types.AgentProcess{} - err = json.NewDecoder(response.Body).Decode(&ap) + type result struct { + Status string `json:"status"` + Pid int32 `json:"pid"` + Name string `json:"name"` + CmdLine string `json:"cmdline"` + } + r := result{} + err = json.NewDecoder(response.Body).Decode(&r) - // todo: consider NotFound if err != nil { log.Errorln(err.Error()) return nil, err } - return &ap, nil + return &AgentResponse{ + ConnectionStatus: r.Status, + Pid: int(r.Pid), + Name: r.Name, + CmdLine: r.CmdLine, + Hostname: hostname}, + nil } diff --git a/server/client.go b/server/client.go deleted file mode 100644 index 0e4b269..0000000 --- a/server/client.go +++ /dev/null @@ -1,192 +0,0 @@ -package server - -import ( - "context" - "kimo/config" - "kimo/types" - "net" - "strconv" - "sync" - - "github.com/cenkalti/log" -) - -// Client is used for creating process list -type Client struct { - Mysql *Mysql - TCPProxy *TCPProxy - Agent *Agent -} - -// KimoProcess is combined with processes from mysql to agent through tcpproxy -type KimoProcess struct { - AgentProcess *types.AgentProcess - MysqlProcess *MysqlProcess - TCPProxyRecord *TCPProxyRecord - Agent *Agent -} - -// SetAgentProcess is used to set agent process of a KimoProcess -func (kp *KimoProcess) SetAgentProcess(ctx context.Context, wg *sync.WaitGroup) { - // todo: get rid of this function. - defer wg.Done() - var host string - var port uint32 - - if kp.TCPProxyRecord != nil { - host = kp.TCPProxyRecord.ClientOut.IP - port = kp.TCPProxyRecord.ClientOut.Port - } else { - host = kp.MysqlProcess.Address.IP - port = kp.MysqlProcess.Address.Port - } - ap, err := kp.Agent.Fetch(ctx, host, port) - if err != nil { - log.Debugln(err.Error()) - kp.AgentProcess = &types.AgentProcess{ - Hostname: "ERROR", - } - } else { - kp.AgentProcess = ap - } -} - -// NewClient is constructor fuction for creating a Client object -func NewClient(cfg config.Server) *Client { - p := new(Client) - p.Mysql = NewMysql(cfg) - p.TCPProxy = NewTCPProxy(cfg) - p.Agent = NewAgent(cfg) - return p -} - -func findHostIP(host string) (string, error) { - ip := net.ParseIP(host) - if ip == nil { - ips, err := net.LookupIP(host) - if err != nil { - return "", err - } - return string(ips[0].String()), nil - } - return ip.String(), nil -} - -func findTCPProxyRecord(addr types.IPPort, proxyRecords []*TCPProxyRecord) *TCPProxyRecord { - ipAddr, err := findHostIP(addr.IP) - if err != nil { - log.Debugln(err.Error()) - return nil - } - - for _, pr := range proxyRecords { - if pr.ProxyOut.IP == ipAddr && pr.ProxyOut.Port == addr.Port { - return pr - } - } - return nil -} - -func (c *Client) initializeKimoProcesses(mps []*MysqlProcess, tps []*TCPProxyRecord) []*KimoProcess { - log.Infoln("Initializing Kimo processes...") - var kps []*KimoProcess - for _, mp := range mps { - tpr := findTCPProxyRecord(mp.Address, tps) - if tpr == nil { - continue - } - kps = append(kps, &KimoProcess{ - MysqlProcess: mp, - TCPProxyRecord: tpr, - Agent: c.Agent, - }) - } - log.Infof("%d processes are initialized \n", len(kps)) - return kps -} - -func (c *Client) createKimoProcesses(ctx context.Context) ([]*KimoProcess, error) { - var mps []*MysqlProcess - var tps []*TCPProxyRecord - - errC := make(chan error) - - mpsC := make(chan []*MysqlProcess) - tpsC := make(chan []*TCPProxyRecord) - - go c.Mysql.Fetch(ctx, mpsC, errC) - go c.TCPProxy.Fetch(ctx, tpsC, errC) - for { - if mps != nil && tps != nil { - kps := c.initializeKimoProcesses(mps, tps) - return kps, nil - - } - select { - case mpsResp := <-mpsC: - mps = mpsResp - case tpsResp := <-tpsC: - tps = tpsResp - case err := <-errC: - log.Errorf("Error occured: %s", err.Error()) - return nil, err - case <-ctx.Done(): - return nil, ctx.Err() - } - } - -} - -func (c *Client) setAgentProcesses(ctx context.Context, kps []*KimoProcess) { - log.Infof("Generating %d kimo processes...\n", len(kps)) - var wg sync.WaitGroup - for _, kp := range kps { - wg.Add(1) - go kp.SetAgentProcess(ctx, &wg) - } - wg.Wait() - log.Infoln("Generating process is done...") -} - -func (c *Client) createProcesses(kps []*KimoProcess) []Process { - ps := make([]Process, 0) - for _, kp := range kps { - ut, err := strconv.ParseUint(kp.MysqlProcess.Time, 10, 32) - if err != nil { - log.Errorf("time %s could not be converted to int", kp.MysqlProcess.Time) - } - ps = append(ps, Process{ - ID: kp.MysqlProcess.ID, - MysqlUser: kp.MysqlProcess.User, - DB: kp.MysqlProcess.DB.String, - Command: kp.MysqlProcess.Command, - Time: uint32(ut), - State: kp.MysqlProcess.State.String, - Info: kp.MysqlProcess.Info.String, - CmdLine: kp.AgentProcess.CmdLine, - Pid: kp.AgentProcess.Pid, - Host: kp.AgentProcess.Hostname, - }) - } - return ps -} - -// FetchAll is used to create processes from mysql to agents -func (c *Client) FetchAll(ctx context.Context) ([]Process, error) { - log.Debugf("Fetching...") - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - kps, err := c.createKimoProcesses(ctx) - if err != nil { - log.Error(err.Error()) - return nil, err - } - - c.setAgentProcesses(ctx, kps) - ps := c.createProcesses(kps) - - log.Debugf("%d processes are generated \n", len(ps)) - return ps, nil -} diff --git a/server/fetcher.go b/server/fetcher.go new file mode 100644 index 0000000..8853ff4 --- /dev/null +++ b/server/fetcher.go @@ -0,0 +1,252 @@ +package server + +import ( + "context" + "fmt" + "kimo/config" + "strings" + "sync" + "time" + + "github.com/cenkalti/log" +) + +// Fetcher fetches process info(s) from resources +type Fetcher struct { + MysqlClient *MysqlClient + TCPProxyClient *TCPProxyClient + + AgentPort uint32 +} + +// RawProcess combines resources information(mysql row, tcp proxy conn, agent process etc.) +type RawProcess struct { + MysqlRow *MysqlRow + TCPProxyConn *TCPProxyConn + AgentProcess *AgentProcess + + TCPProxyEnabled bool +} + +// AgentAddress returns agent address considering proxy usage. +func (rp *RawProcess) AgentAddress() IPPort { + if rp.TCPProxyConn != nil { + return IPPort{IP: rp.TCPProxyConn.ClientOut.IP, Port: rp.TCPProxyConn.ClientOut.Port} + } + return IPPort{IP: rp.MysqlRow.Address.IP, Port: rp.MysqlRow.Address.Port} +} + +// AgentProcess represents process info from a kimo-agent. +type AgentProcess struct { + Address IPPort + Response *AgentResponse + err error +} + +// Hostname is kimo agent's hostname. +func (ap *AgentProcess) Hostname() string { + if ap.Response != nil { + return ap.Response.Hostname + } + + if ap.err != nil { + if aErr, ok := ap.err.(*AgentError); ok { + return aErr.Hostname + } else { + return "ERROR" + } + } + + return "" +} + +// Pid returns process pid info from kimo agent +func (ap *AgentProcess) Pid() int { + if ap.Response != nil { + return ap.Response.Pid + } + return 0 +} + +// CmdLine returns process cmdline info from kimo agent +func (ap *AgentProcess) CmdLine() string { + if ap.Response != nil { + return ap.Response.CmdLine + } + return "" +} + +// ConnectionStatus returns process connections status info from kimo agent +func (ap *AgentProcess) ConnectionStatus() string { + if ap.Response != nil { + return strings.ToLower(ap.Response.ConnectionStatus) + } + return "" +} + +// Detail returns error detail of the process. +func (rp *RawProcess) Detail() string { + if rp.TCPProxyEnabled && rp.TCPProxyConn == nil { + return "No connection found on tcpproxy" + } + + if rp.AgentProcess != nil { + if rp.AgentProcess.err != nil { + return rp.AgentProcess.err.Error() + } + } + return "" +} + +// NewFetcher creates and returns a new Fetcher. +func NewFetcher(cfg config.ServerConfig) *Fetcher { + f := new(Fetcher) + f.MysqlClient = NewMysqlClient(cfg.MySQL) + if cfg.TCPProxy.MgmtAddress != "" { + f.TCPProxyClient = NewTCPProxyClient(cfg.TCPProxy) + } + f.AgentPort = cfg.Agent.Port + return f +} + +// getAgentProcess gets process info from a single kimo-agent. +func (f *Fetcher) getAgentProcess(ctx context.Context, wg *sync.WaitGroup, rp *RawProcess) { + defer wg.Done() + + ac := NewAgentClient(IPPort{IP: rp.AgentAddress().IP, Port: f.AgentPort}) + ar, err := ac.Get(ctx, rp.AgentAddress().Port) + rp.AgentProcess = &AgentProcess{Response: ar, err: err} +} + +// addProxyConns adds TCPProxy connection info if TCPProxy is enabled. +func addProxyConns(rps []*RawProcess, conns []*TCPProxyConn) { + log.Infoln("Adding tcpproxy conns...") + for _, rp := range rps { + conn := findTCPProxyConn(rp.AgentAddress(), conns) + if conn != nil { + rp.TCPProxyConn = conn + } + } +} + +// createRawProcesses creates raw process and inserts given param. +func createRawProcesses(rows []*MysqlRow) []*RawProcess { + log.Infoln("Combining mysql and tcpproxy results...") + var rps []*RawProcess + for _, row := range rows { + rp := &RawProcess{MysqlRow: row} + rps = append(rps, rp) + } + return rps +} + +// FetchAll fetches and creates processes from resources to agents +func (f *Fetcher) FetchAll(ctx context.Context) ([]*RawProcess, error) { + log.Infoln("Fetching resources...") + + log.Infoln("Fetching mysql rows...") + rows, err := f.fetchMysql(ctx) + if err != nil { + return nil, err + } + log.Infof("Got %d mysql rows \n", len(rows)) + + rps := createRawProcesses(rows) + + if f.TCPProxyClient != nil { + for _, rp := range rps { + rp.TCPProxyEnabled = true + } + + log.Infoln("Fetching tcpproxy conns...") + tps, err := f.fetchTcpProxy(ctx) + if err != nil { + return nil, err + } + log.Infof("Got %d tcpproxy conns \n", len(tps)) + addProxyConns(rps, tps) + } + + log.Infof("Fetching %d agents...\n", len(rps)) + f.fetchAgents(ctx, rps) + + log.Debugf("%d raw processes are generated \n", len(rps)) + return rps, nil +} + +// fetchMysql retrieves MySQL data with timeout. +// It performs the fetch operation in a separate goroutine to prevent blocking. +func (f *Fetcher) fetchMysql(ctx context.Context) ([]*MysqlRow, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() + + type result struct { + rows []*MysqlRow + err error + } + + resultChan := make(chan result, 1) + go func() { + rows, err := f.MysqlClient.Get(ctx) + resultChan <- result{rows, err} + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("fetch mysql operation timed out: %w", ctx.Err()) + case r := <-resultChan: + return r.rows, r.err + } +} + +// fetchTcpProxy retrieves TCP proxy connections with timeout. +// It performs the fetch operation in a separate goroutine to prevent blocking. +func (f *Fetcher) fetchTcpProxy(ctx context.Context) ([]*TCPProxyConn, error) { + ctx, cancel := context.WithTimeout(ctx, time.Second*3) + defer cancel() + + type result struct { + conns []*TCPProxyConn + err error + } + + resultChan := make(chan result, 1) + go func() { + conns, err := f.TCPProxyClient.Get(ctx) + resultChan <- result{conns, err} + }() + + select { + case <-ctx.Done(): + return nil, fmt.Errorf("fetch tcpproxy operation timed out: %w", ctx.Err()) + case r := <-resultChan: + return r.conns, r.err + } +} + +// fetchAgents concurrently retrieves process information from multiple agents with timeout. +func (f *Fetcher) fetchAgents(ctx context.Context, rps []*RawProcess) { + ctx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + done := make(chan struct{}, 1) + go func() { + // todo: limit concurrent goroutines. + var wg sync.WaitGroup + for _, rp := range rps { + wg.Add(1) + go f.getAgentProcess(ctx, &wg, rp) + } + wg.Wait() + done <- struct{}{} + }() + + select { + case <-ctx.Done(): + log.Errorf("fetch agents operation timed out: %s", ctx.Err()) + return + case <-done: + log.Infoln("All agents are visited.") + return + } +} diff --git a/server/http.go b/server/http.go index baa8c9e..dc9b431 100644 --- a/server/http.go +++ b/server/http.go @@ -3,9 +3,7 @@ package server import ( "encoding/json" "fmt" - "net" "net/http" - "time" "github.com/cenkalti/log" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -14,59 +12,29 @@ import ( _ "kimo/statik" // Auto-generated module by statik. ) -// Response is type for returning a response from kimo server +// Response contains basic process information for API responses. type Response struct { - Processes []Process `json:"processes"` -} - -// NewHTTPClient returns a http client with custom connect & read timeout -func NewHTTPClient(connectTimeout, readTimeout time.Duration) *http.Client { - return &http.Client{ - Transport: &http.Transport{ - Dial: timeoutDialer(connectTimeout, readTimeout), - }, - } -} - -func timeoutDialer(connectTimeout, readTimeout time.Duration) func(net, addr string) (c net.Conn, err error) { - return func(netw, addr string) (net.Conn, error) { - conn, err := net.DialTimeout(netw, addr, connectTimeout) - if err != nil { - return nil, err - } - conn.SetDeadline(time.Now().Add(readTimeout)) - return conn, nil - } + Processes []KimoProcess `json:"processes"` } // Procs is a handler for returning process list func (s *Server) Procs(w http.ResponseWriter, req *http.Request) { - forceParam := req.URL.Query().Get("force") - fetch := false - if forceParam == "true" || len(s.Processes) == 0 { - fetch = true - } - - if fetch { - s.FetchAll() - } - w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers") - - log.Infof("Returning response with %d kimo processes...\n", len(s.Processes)) w.Header().Set("Content-Type", "application/json") response := &Response{ - Processes: s.Processes, + Processes: s.GetProcesses(), + } + err := json.NewEncoder(w).Encode(response) + if err != nil { + http.Error(w, "Can not encode process", http.StatusInternalServerError) } - json.NewEncoder(w).Encode(response) } // Health is a dummy endpoint for load balancer health check func (s *Server) Health(w http.ResponseWriter, req *http.Request) { - // todo: real health check fmt.Fprintf(w, "OK") } @@ -82,26 +50,5 @@ func (s *Server) Static() http.Handler { // Metrics is used to expose metrics that is compatible with Prometheus exporter func (s *Server) Metrics() http.Handler { - // todo: separate prometheus and json metrics return promhttp.Handler() } - -// Run is used to run http handlers -func (s *Server) Run() error { - // todo: reconsider context usages - log.Infof("Running server on %s \n", s.Config.ListenAddress) - - go s.pollAgents() - go s.setMetrics() - - http.Handle("/", s.Static()) - http.Handle("/metrics", s.Metrics()) - http.HandleFunc("/procs", s.Procs) - http.HandleFunc("/health", s.Health) - err := http.ListenAndServe(s.Config.ListenAddress, nil) - if err != nil { - log.Errorln(err.Error()) - return err - } - return nil -} diff --git a/server/metric.go b/server/metric.go index 517e5cf..24ef5af 100644 --- a/server/metric.go +++ b/server/metric.go @@ -1,135 +1,90 @@ package server import ( + "fmt" + "regexp" "strings" - "time" "github.com/cenkalti/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) -// PrometheusMetric is the type that contains all metrics those will be exposed. +// PrometheusMetric represents the type that contains all metrics those will be exposed. type PrometheusMetric struct { - conns prometheus.Gauge - host *prometheus.GaugeVec - db *prometheus.GaugeVec - command *prometheus.GaugeVec - state *prometheus.GaugeVec - cmdline *prometheus.GaugeVec - Server *Server + conns prometheus.Gauge + conn *prometheus.GaugeVec + + commandLineRegexps []*regexp.Regexp } -// NewPrometheusMetric is the constructor function of PrometheusMetric -func NewPrometheusMetric(server *Server) *PrometheusMetric { +// NewPrometheusMetric creates and returns a new PrometheusMetric. +func NewPrometheusMetric(commandLinePatterns []string) *PrometheusMetric { return &PrometheusMetric{ - Server: server, + commandLineRegexps: convertPatternsToRegexps(commandLinePatterns), conns: promauto.NewGauge(prometheus.GaugeOpts{ - Name: "kimo_conns_total", + Name: "kimo_mysql_conns_total", Help: "Total number of db processes (conns)", }), - host: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "kimo_conns_host", - Help: "Conns per host", - }, - []string{ - "host", - }, - ), - db: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "kimo_conns_db", - Help: "Conns per db", + conn: promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "kimo_mysql_connection", + Help: "Kimo mysql connection.", }, []string{ "db", - }, - ), - command: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "kimo_conns_command", - Help: "Conns per command", - }, - []string{ + "host", "command", - }, - ), - state: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "kimo_conns_state", - Help: "Conns per state", - }, - []string{ "state", - }, - ), - cmdline: promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "kimo_conns_cmdline", - Help: "conns per cmdline", - }, - []string{ "cmdline", }, ), } } -// SetMetrics is used to set metrics periodically. -func (pm *PrometheusMetric) SetMetrics() { - // todo: configurable time - ticker := time.NewTicker(2 * time.Second) - - for { - select { - // todo: add return case - case <-ticker.C: - pm.Set() - } +// convertPatternsToRegexps converts given patterns into regexps. +func convertPatternsToRegexps(patterns []string) []*regexp.Regexp { + rps := make([]*regexp.Regexp, 0) + for _, pattern := range patterns { + rps = append(rps, regexp.MustCompile(pattern)) } + return rps + } // Set sets all metrics based on Processes -func (pm *PrometheusMetric) Set() { - ps := pm.Server.Processes - if len(ps) == 0 { - return - } - log.Debugf("Found '%d' processes. Setting metrics...\n", len(pm.Server.Processes)) - pm.conns.Set(float64(len(ps))) +func (pm *PrometheusMetric) Set(kps []KimoProcess) { + // clear previous run. + pm.conns.Set(0) + pm.conn.MetricVec.Reset() - // todo: too much duplication - var metricM = map[string]map[string]int{} - metricM["db"] = map[string]int{} - metricM["host"] = map[string]int{} - metricM["state"] = map[string]int{} - metricM["command"] = map[string]int{} - metricM["cmdline"] = map[string]int{} + log.Debugf("Found '%d' processes. Setting metrics...\n", len(kps)) - for _, p := range ps { - metricM["db"][p.DB]++ - metricM["host"][p.Host]++ - metricM["command"][p.Command]++ - metricM["state"][p.State]++ - metricM["cmdline"][strings.Join(p.CmdLine, " ")]++ - } - for k, v := range metricM { - switch k { - case "db": - setGaugeVec(k, v, pm.db) - case "host": - setGaugeVec(k, v, pm.host) - case "command": - setGaugeVec(k, v, pm.command) - case "state": - setGaugeVec(k, v, pm.state) - case "cmdline": - setGaugeVec(k, v, pm.cmdline) - } + pm.conns.Set(float64(len(kps))) + + for _, p := range kps { + pm.conn.With(prometheus.Labels{ + "db": p.DB, + "host": p.Host, + "command": p.Command, + "state": p.State, + "cmdline": pm.formatCommand(p.CmdLine), + }).Inc() } } -func setGaugeVec(name string, m map[string]int, gv *prometheus.GaugeVec) { - for i, j := range m { - if i == "" { - i = "UNKNOWN" +// formatCommand formats the command string based on configuration +func (pm *PrometheusMetric) formatCommand(cmdline string) string { + // Expose whole cmdline if pattern matches. + for _, cmdRegexp := range pm.commandLineRegexps { + result := cmdRegexp.FindString(cmdline) + if result != "" { + return cmdline } - gv.With(prometheus.Labels{name: i}).Set(float64(j)) } + // anonymize cmdline + parts := strings.Split(cmdline, " ") + if len(parts) >= 2 { + return fmt.Sprintf("%s %s ", parts[0], parts[1]) + } + return parts[0] } diff --git a/server/mysql.go b/server/mysql.go index 4c14971..fdf25f8 100644 --- a/server/mysql.go +++ b/server/mysql.go @@ -4,7 +4,6 @@ import ( "context" "database/sql" "kimo/config" - "kimo/types" "strconv" "strings" @@ -12,8 +11,8 @@ import ( _ "github.com/go-sql-driver/mysql" // imports mysql driver ) -// MysqlProcess is the process type in terms of MySQL context (a row from processlist table) -type MysqlProcess struct { +// MysqlRow represents a row from processlist table +type MysqlRow struct { ID int32 `json:"id"` User string `json:"user"` DB sql.NullString `json:"db"` @@ -21,50 +20,49 @@ type MysqlProcess struct { Time string `json:"time"` State sql.NullString `json:"state"` Info sql.NullString `json:"info"` - Address types.IPPort `json:"address"` + Address IPPort `json:"address"` } -// NewMysql is used to create a Mysql type. -func NewMysql(cfg config.Server) *Mysql { - m := new(Mysql) +// NewMysqlClient creates and returns a new *MysqlClient. +func NewMysqlClient(cfg config.MySQLConfig) *MysqlClient { + m := new(MysqlClient) m.DSN = cfg.DSN return m } -// Mysql is used to get processes from mysql. -type Mysql struct { +// MysqlClient represents a MySQL database client that manages connection details and stores query results. +type MysqlClient struct { DSN string - Processes []MysqlProcess + MysqlRows []MysqlRow } -// Fetch is used to fetch processlist table from information_schema. -func (m *Mysql) Fetch(ctx context.Context, procsC chan<- []*MysqlProcess, errC chan<- error) { - log.Infoln("Requesting to mysql...") - db, err := sql.Open("mysql", m.DSN) +// Get gets processlist table from information_schema. +func (mc *MysqlClient) Get(ctx context.Context) ([]*MysqlRow, error) { + db, err := sql.Open("mysql", mc.DSN) if err != nil { - errC <- err + return nil, err } defer db.Close() - results, err := db.Query("select * from PROCESSLIST") + results, err := db.QueryContext(ctx, "select * from PROCESSLIST") if err != nil { - errC <- err + return nil, err } if results == nil { - return + return nil, err } - mps := make([]*MysqlProcess, 0) + mps := make([]*MysqlRow, 0) for results.Next() { - var mp MysqlProcess + var mp MysqlRow var host string err = results.Scan(&mp.ID, &mp.User, &host, &mp.DB, &mp.Command, &mp.Time, &mp.State, &mp.Info) if err != nil { - errC <- err + return nil, err } s := strings.Split(host, ":") if len(s) < 2 { @@ -76,12 +74,11 @@ func (m *Mysql) Fetch(ctx context.Context, procsC chan<- []*MysqlProcess, errC c log.Errorf("error during string to int32: %s\n", err) continue } - mp.Address = types.IPPort{ + mp.Address = IPPort{ IP: s[0], Port: uint32(parsedPort), } mps = append(mps, &mp) } - log.Infof("Got %d mysql processes \n", len(mps)) - procsC <- mps + return mps, nil } diff --git a/server/poll.go b/server/poll.go new file mode 100644 index 0000000..9045600 --- /dev/null +++ b/server/poll.go @@ -0,0 +1,67 @@ +package server + +import ( + "context" + "fmt" + "time" + + "github.com/cenkalti/log" +) + +// pollAgents continuously polls for agent information at configured intervals. +// It performs an initial poll immediately, then polls based on PollInterval from config. +func (s *Server) pollAgents(ctx context.Context) { + log.Infoln("Polling started...") + ticker := time.NewTicker(s.Config.PollInterval) + + // Initial poll + if err := s.doPoll(ctx); err != nil { + log.Errorf("Initial poll failed: %v", err) + } + + for { + select { + case <-ticker.C: + if err := s.doPoll(ctx); err != nil { + log.Errorf("Poll failed: %v", err) + } + case <-ctx.Done(): + return + } + } +} + +// doPoll performs a single polling operation to fetch and update process information. +func (s *Server) doPoll(ctx context.Context) error { + type result struct { + rps []*RawProcess + err error + } + + resultChan := make(chan result) + + go func() { + rps, err := s.Fetcher.FetchAll(ctx) + select { + case resultChan <- result{rps, err}: + return + case <-ctx.Done(): + return + } + }() + + select { + case <-ctx.Done(): + return fmt.Errorf("operation timed out while fetching all: %w", ctx.Err()) + case r := <-resultChan: + if r.err != nil { + return r.err + } + kps := s.ConvertProcesses(r.rps) + s.SetProcesses(kps) + s.PrometheusMetric.Set(s.GetProcesses()) + log.Debugf("%d processes are set\n", len(s.GetProcesses())) + return nil + } + +} diff --git a/server/server.go b/server/server.go index 0710b41..62215d1 100644 --- a/server/server.go +++ b/server/server.go @@ -3,76 +3,114 @@ package server import ( "context" "kimo/config" - "time" + "net/http" + "strconv" + "sync" "github.com/cenkalti/log" ) -// Process is the final processes that is combined with AgentProcess + TCPProxyRecord + MysqlProcess -type Process struct { - ID int32 `json:"id"` - MysqlUser string `json:"mysql_user"` - DB string `json:"db"` - Command string `json:"command"` - Time uint32 `json:"time"` - State string `json:"state"` - Info string `json:"info"` - CmdLine []string `json:"cmdline"` - Pid int32 `json:"pid"` - Host string `json:"host"` +// KimoProcess is the final process that is combined with AgentProcess + TCPProxyConn + MysqlProcess +type KimoProcess struct { + ID int32 `json:"id"` + MysqlUser string `json:"mysql_user"` + DB string `json:"db"` + Command string `json:"command"` + Time uint32 `json:"time"` + State string `json:"state"` + Info string `json:"info"` + CmdLine string `json:"cmdline"` + ConnectionStatus string `json:"status"` + Pid int `json:"pid,omitempty"` + Host string `json:"host"` + Detail string `json:"detail"` } -// Server is a type for handling server side +// Server is a type for handling server side operations type Server struct { - Config *config.Server + Config *config.ServerConfig PrometheusMetric *PrometheusMetric - Processes []Process // todo: bad naming. - Client *Client + Fetcher *Fetcher + AgentPort uint32 + processes []KimoProcess + mu sync.RWMutex // proctects processes } -// NewServer is used to create a new Server type -func NewServer(cfg *config.Config) *Server { - log.Infoln("Creating a new server...") - s := new(Server) - s.Config = &cfg.Server - s.PrometheusMetric = NewPrometheusMetric(s) - s.Processes = make([]Process, 0) - s.Client = NewClient(*s.Config) - return s +// SetProcesses sets kimo processes with lock +func (s *Server) SetProcesses(kps []KimoProcess) { + s.mu.Lock() + s.processes = kps + s.mu.Unlock() } -// FetchAll fetches all processes through Client object -func (s *Server) FetchAll() { - // todo: call with lock - // todo: prevent race condition - // todo: if a fetch is in progress and a new fetch is triggered, cancel the existing one. - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() +// GetProcesses gets kimo processes with lock +func (s *Server) GetProcesses() []KimoProcess { + s.mu.RLock() + defer s.mu.RUnlock() + return s.processes +} - ps, err := s.Client.FetchAll(ctx) - if err != nil { - log.Error(err.Error()) - return +// ConvertProcesses convert raw processes to kimo processes +func (s *Server) ConvertProcesses(rps []*RawProcess) []KimoProcess { + kps := make([]KimoProcess, 0) + for _, rp := range rps { + ut, err := strconv.ParseUint(rp.MysqlRow.Time, 10, 32) + if err != nil { + log.Errorf("time %s could not be converted to int", rp.MysqlRow.Time) + } + var kp KimoProcess + + // set mysql properties + kp.ID = rp.MysqlRow.ID + kp.MysqlUser = rp.MysqlRow.User + kp.DB = rp.MysqlRow.DB.String + kp.Command = rp.MysqlRow.Command + kp.Time = uint32(ut) + kp.State = rp.MysqlRow.State.String + kp.Info = rp.MysqlRow.Info.String + + // set agent process properties + kp.CmdLine = rp.AgentProcess.CmdLine() + kp.ConnectionStatus = rp.AgentProcess.ConnectionStatus() + kp.Pid = rp.AgentProcess.Pid() + kp.Host = rp.AgentProcess.Hostname() + kp.Detail = rp.Detail() + + kps = append(kps, kp) } - s.Processes = ps - log.Debugf("%d processes are set\n", len(s.Processes)) + return kps } -func (s *Server) setMetrics() { - // todo: prevent race condition - s.PrometheusMetric.SetMetrics() +// NewServer creates an returns a new *Server +func NewServer(cfg *config.ServerConfig) *Server { + log.Infoln("Creating a new server...") + s := &Server{ + Config: cfg, + PrometheusMetric: NewPrometheusMetric(cfg.Metric.CmdlinePatterns), + processes: make([]KimoProcess, 0), + AgentPort: cfg.Agent.Port, + } + s.Fetcher = NewFetcher(*s.Config) + return s } -func (s *Server) pollAgents() { - ticker := time.NewTicker(s.Config.PollDuration * time.Second) +// Run starts the server and begins listening for HTTP requests. +func (s *Server) Run() error { + log.Infof("Running server on %s \n", s.Config.ListenAddress) - for { - s.FetchAll() // poll immediately at initialization - select { - // todo: add return case - case <-ticker.C: - s.FetchAll() - } - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go s.pollAgents(ctx) + http.Handle("/", s.Static()) + http.Handle("/metrics", s.Metrics()) + http.HandleFunc("/procs", s.Procs) + http.HandleFunc("/health", s.Health) + err := http.ListenAndServe(s.Config.ListenAddress, nil) + if err != nil { + log.Errorln(err.Error()) + return err + } + return nil } diff --git a/server/static/index.html b/server/static/index.html index 4f44de9..ea00889 100644 --- a/server/static/index.html +++ b/server/static/index.html @@ -3,8 +3,8 @@ KIMO??? - - + +