From 622c7a78b5ed34eb5767debbad275faaba1ccc7d Mon Sep 17 00:00:00 2001 From: Mateusz Charytoniuk Date: Mon, 13 May 2024 21:12:30 +0200 Subject: [PATCH] feat(agent): register target --- agent/AgentConfiguration.go | 11 +++ agent/LlamaCppObserver.go | 42 +++++++++++ cmd/Agent.go | 37 +++++++++- cmd/Balancer.go | 7 +- llamacpp/LlamaCppClient.go | 2 - llamacpp/LlamaCppClient_test.go | 2 + loadbalancer/LlamaCppTargetConfiguration.go | 2 +- loadbalancer/LoadBalancerStatus.go | 2 +- loadbalancer/ReverseProxyServer.go | 56 +++++++++++++++ main.go | 62 ++++++++++++---- management/Client.go | 79 +++++++++++++++++++++ management/RegisterTargetRequest.go | 11 +++ management/RespondToRegisterTarget.go | 41 +++++++---- netcfg/HttpAddressConfiguration.go | 6 +- reverseproxy/ReverseProxyController.go | 18 ----- reverseproxy/Server.go | 56 --------------- 16 files changed, 319 insertions(+), 115 deletions(-) create mode 100644 agent/AgentConfiguration.go create mode 100644 loadbalancer/ReverseProxyServer.go create mode 100644 management/Client.go create mode 100644 management/RegisterTargetRequest.go delete mode 100644 reverseproxy/ReverseProxyController.go delete mode 100644 reverseproxy/Server.go diff --git a/agent/AgentConfiguration.go b/agent/AgentConfiguration.go new file mode 100644 index 0000000..59e81de --- /dev/null +++ b/agent/AgentConfiguration.go @@ -0,0 +1,11 @@ +package agent + +import "time" + +type AgentConfiguration struct { + ReportingIntervalMiliseconds uint +} + +func (self *AgentConfiguration) GetReportingIntervalDuration() time.Duration { + return time.Duration(self.ReportingIntervalMiliseconds) * time.Millisecond +} diff --git a/agent/LlamaCppObserver.go b/agent/LlamaCppObserver.go index 3b04b15..84635d6 100644 --- a/agent/LlamaCppObserver.go +++ b/agent/LlamaCppObserver.go @@ -1,4 +1,46 @@ package agent +import ( + "time" + + "github.com/distantmagic/paddler/goroutine" + "github.com/distantmagic/paddler/llamacpp" + "github.com/distantmagic/paddler/management" + "github.com/hashicorp/go-hclog" +) + type LlamaCppObserver struct { + AgentConfiguration *AgentConfiguration + LlamaCppClient *llamacpp.LlamaCppClient + Logger hclog.Logger + ManagementClient *management.Client +} + +func (self *LlamaCppObserver) ObserveAndReport( + serverEventsChannel chan goroutine.ResultMessage, +) { + llamaCppHealthStatusChannel := make(chan llamacpp.LlamaCppHealthStatus) + + defer close(llamaCppHealthStatusChannel) + + ticker := time.NewTicker(self.AgentConfiguration.GetReportingIntervalDuration()) + + go self.RunTickerInterval(llamaCppHealthStatusChannel, ticker) + + for llamaCppHealthStatus := range llamaCppHealthStatusChannel { + go self.ManagementClient.ReportLlamaCppHealthStatus( + serverEventsChannel, + self.LlamaCppClient.LlamaCppConfiguration, + &llamaCppHealthStatus, + ) + } +} + +func (self *LlamaCppObserver) RunTickerInterval( + llamaCppHealthStatusChannel chan llamacpp.LlamaCppHealthStatus, + ticker *time.Ticker, +) { + for range ticker.C { + go self.LlamaCppClient.GetHealth(llamaCppHealthStatusChannel) + } } diff --git a/cmd/Agent.go b/cmd/Agent.go index aa3fcd8..e824db9 100644 --- a/cmd/Agent.go +++ b/cmd/Agent.go @@ -1,18 +1,49 @@ package cmd import ( + "net/http" + + "github.com/distantmagic/paddler/agent" + "github.com/distantmagic/paddler/goroutine" "github.com/distantmagic/paddler/llamacpp" + "github.com/distantmagic/paddler/management" "github.com/distantmagic/paddler/reverseproxy" "github.com/hashicorp/go-hclog" "github.com/urfave/cli/v2" ) type Agent struct { - Logger hclog.Logger - LlamaCppConfiguration *llamacpp.LlamaCppConfiguration - ReverseProxyConfiguration *reverseproxy.ReverseProxyConfiguration + AgentConfiguration *agent.AgentConfiguration + Logger hclog.Logger + LlamaCppConfiguration *llamacpp.LlamaCppConfiguration + ManagementServerConfiguration *management.ManagementServerConfiguration + ReverseProxyConfiguration *reverseproxy.ReverseProxyConfiguration } func (self *Agent) Action(cliContext *cli.Context) error { + serverEventsChannel := make(chan goroutine.ResultMessage) + + llamaCppObserver := &agent.LlamaCppObserver{ + AgentConfiguration: self.AgentConfiguration, + LlamaCppClient: &llamacpp.LlamaCppClient{ + HttpClient: http.DefaultClient, + LlamaCppConfiguration: self.LlamaCppConfiguration, + }, + Logger: self.Logger.Named("LlamaCppObserver"), + ManagementClient: &management.Client{ + HttpClient: http.DefaultClient, + ManagementServerConfiguration: self.ManagementServerConfiguration, + }, + } + + go llamaCppObserver.ObserveAndReport(serverEventsChannel) + + for serverEvent := range serverEventsChannel { + self.Logger.Info( + "server event", + "event", serverEvent, + ) + } + return nil } diff --git a/cmd/Balancer.go b/cmd/Balancer.go index b8507d0..3689d5a 100644 --- a/cmd/Balancer.go +++ b/cmd/Balancer.go @@ -37,7 +37,7 @@ func (self *Balancer) Action(cliContext *cli.Context) error { }, } - reverseProxyServer := &reverseproxy.Server{ + reverseProxyServer := &loadbalancer.ReverseProxyServer{ LoadBalancer: loadBalancer, Logger: self.Logger.Named("reverseproxy"), ReverseProxyConfiguration: self.ReverseProxyConfiguration, @@ -47,8 +47,9 @@ func (self *Balancer) Action(cliContext *cli.Context) error { go reverseProxyServer.Serve(serverEventsChannel) for serverEvent := range serverEventsChannel { - self.Logger.Info( - "server", + self.Logger.Log( + hclog.Info, + "server event", "event", serverEvent, ) } diff --git a/llamacpp/LlamaCppClient.go b/llamacpp/LlamaCppClient.go index 3374cf4..9f224c7 100644 --- a/llamacpp/LlamaCppClient.go +++ b/llamacpp/LlamaCppClient.go @@ -25,8 +25,6 @@ type LlamaCppClient struct { func (self *LlamaCppClient) GetHealth( responseChannel chan LlamaCppHealthStatus, ) { - defer close(responseChannel) - request, err := http.NewRequest( "GET", self.LlamaCppConfiguration.HttpAddress.BuildUrlWithPath("health").String(), diff --git a/llamacpp/LlamaCppClient_test.go b/llamacpp/LlamaCppClient_test.go index d1064dc..c77278c 100644 --- a/llamacpp/LlamaCppClient_test.go +++ b/llamacpp/LlamaCppClient_test.go @@ -22,6 +22,8 @@ var llamaCppClient *LlamaCppClient = &LlamaCppClient{ func TestHealthIsObtained(t *testing.T) { responseChannel := make(chan LlamaCppHealthStatus) + defer close(responseChannel) + go llamaCppClient.GetHealth(responseChannel) healthStatus := <-responseChannel diff --git a/loadbalancer/LlamaCppTargetConfiguration.go b/loadbalancer/LlamaCppTargetConfiguration.go index 32a49e3..9a66ce5 100644 --- a/loadbalancer/LlamaCppTargetConfiguration.go +++ b/loadbalancer/LlamaCppTargetConfiguration.go @@ -3,5 +3,5 @@ package loadbalancer import "github.com/distantmagic/paddler/llamacpp" type LlamaCppTargetConfiguration struct { - LlamaCppConfiguration *llamacpp.LlamaCppConfiguration + LlamaCppConfiguration *llamacpp.LlamaCppConfiguration `json:"llama_cpp_configuration"` } diff --git a/loadbalancer/LoadBalancerStatus.go b/loadbalancer/LoadBalancerStatus.go index b6de5bf..5913cae 100644 --- a/loadbalancer/LoadBalancerStatus.go +++ b/loadbalancer/LoadBalancerStatus.go @@ -1,5 +1,5 @@ package loadbalancer type LoadBalancerStatus struct { - RegisteredTargets int + RegisteredTargets int `json:"registred_targets"` } diff --git a/loadbalancer/ReverseProxyServer.go b/loadbalancer/ReverseProxyServer.go new file mode 100644 index 0000000..e504c90 --- /dev/null +++ b/loadbalancer/ReverseProxyServer.go @@ -0,0 +1,56 @@ +package loadbalancer + +import ( + "net/http" + "net/http/httputil" + + "github.com/distantmagic/paddler/goroutine" + "github.com/distantmagic/paddler/reverseproxy" + "github.com/hashicorp/go-hclog" +) + +type ReverseProxyServer struct { + LoadBalancer *LoadBalancer + Logger hclog.Logger + ReverseProxyConfiguration *reverseproxy.ReverseProxyConfiguration +} + +func (self *ReverseProxyServer) Serve(serverEventsChannel chan goroutine.ResultMessage) { + self.Logger.Debug( + "listen", + "host", self.ReverseProxyConfiguration.HttpAddress.GetHostWithPort(), + ) + + reverseProxy := &httputil.ReverseProxy{ + ErrorLog: self.Logger.Named("ReverseProxy").StandardLogger(&hclog.StandardLoggerOptions{ + InferLevels: true, + }), + Rewrite: func(request *httputil.ProxyRequest) { + targetUrl, err := self.LoadBalancer.Balance(request.In) + + if err != nil { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to balance request", + Error: err, + } + + return + } + + request.SetURL(targetUrl) + request.SetXForwarded() + }, + } + + err := http.ListenAndServe( + self.ReverseProxyConfiguration.HttpAddress.GetHostWithPort(), + reverseProxy, + ) + + if err != nil { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to listen", + Error: err, + } + } +} diff --git a/main.go b/main.go index b4ec1da..79bce14 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "os" + "github.com/distantmagic/paddler/agent" "github.com/distantmagic/paddler/cmd" "github.com/distantmagic/paddler/llamacpp" "github.com/distantmagic/paddler/management" @@ -12,17 +13,28 @@ import ( "github.com/urfave/cli/v2" ) +const ( + DefaultManagementHost = "127.0.0.1" + DefaultManagementPort = 8085 + DefaultManagementScheme = "http" +) + func main() { logger := hclog.New(&hclog.LoggerOptions{ - Name: "paddler", + Name: "paddler", + // JSONFormat: true, Level: hclog.Debug, }) agent := &cmd.Agent{ - Logger: logger.Named("Agent"), + AgentConfiguration: &agent.AgentConfiguration{}, LlamaCppConfiguration: &llamacpp.LlamaCppConfiguration{ HttpAddress: &netcfg.HttpAddressConfiguration{}, }, + Logger: logger.Named("Agent"), + ManagementServerConfiguration: &management.ManagementServerConfiguration{ + HttpAddress: &netcfg.HttpAddressConfiguration{}, + }, ReverseProxyConfiguration: &reverseproxy.ReverseProxyConfiguration{ HttpAddress: &netcfg.HttpAddressConfiguration{}, }, @@ -47,20 +59,25 @@ func main() { Usage: "start llama.cpp observer agent", Action: agent.Action, Flags: []cli.Flag{ + &cli.UintFlag{ + Name: "agent-reporting-interval-miliseconds", + Value: 1000, + Destination: &agent.AgentConfiguration.ReportingIntervalMiliseconds, + }, &cli.StringFlag{ - Name: "balancer-host", - Value: "127.0.0.1", - Destination: &agent.ReverseProxyConfiguration.HttpAddress.Host, + Name: "management-host", + Value: DefaultManagementHost, + Destination: &agent.ManagementServerConfiguration.HttpAddress.Host, }, &cli.UintFlag{ - Name: "balancer-port", - Value: 8085, - Destination: &agent.ReverseProxyConfiguration.HttpAddress.Port, + Name: "management-port", + Value: DefaultManagementPort, + Destination: &agent.ManagementServerConfiguration.HttpAddress.Port, }, &cli.StringFlag{ - Name: "balancer-scheme", - Value: "http", - Destination: &agent.ReverseProxyConfiguration.HttpAddress.Scheme, + Name: "management-scheme", + Value: DefaultManagementScheme, + Destination: &agent.ManagementServerConfiguration.HttpAddress.Scheme, }, &cli.StringFlag{ Name: "llamacpp-host", @@ -69,7 +86,7 @@ func main() { }, &cli.UintFlag{ Name: "llamacpp-port", - Value: 8080, + Value: 8088, Destination: &agent.LlamaCppConfiguration.HttpAddress.Port, }, &cli.StringFlag{ @@ -77,6 +94,21 @@ func main() { Value: "http", Destination: &agent.LlamaCppConfiguration.HttpAddress.Scheme, }, + &cli.StringFlag{ + Name: "reverseproxy-host", + Value: "127.0.0.1", + Destination: &balancer.ReverseProxyConfiguration.HttpAddress.Host, + }, + &cli.UintFlag{ + Name: "reverseproxy-port", + Value: 8086, + Destination: &balancer.ReverseProxyConfiguration.HttpAddress.Port, + }, + &cli.StringFlag{ + Name: "reverseproxy-scheme", + Value: "http", + Destination: &balancer.ReverseProxyConfiguration.HttpAddress.Scheme, + }, }, }, { @@ -86,17 +118,17 @@ func main() { Flags: []cli.Flag{ &cli.StringFlag{ Name: "management-host", - Value: "127.0.0.1", + Value: DefaultManagementHost, Destination: &balancer.ManagementServerConfiguration.HttpAddress.Host, }, &cli.UintFlag{ Name: "management-port", - Value: 8085, + Value: DefaultManagementPort, Destination: &balancer.ManagementServerConfiguration.HttpAddress.Port, }, &cli.StringFlag{ Name: "management-scheme", - Value: "http", + Value: DefaultManagementScheme, Destination: &balancer.ManagementServerConfiguration.HttpAddress.Scheme, }, &cli.StringFlag{ diff --git a/management/Client.go b/management/Client.go new file mode 100644 index 0000000..a8370fc --- /dev/null +++ b/management/Client.go @@ -0,0 +1,79 @@ +package management + +import ( + "bytes" + "encoding/json" + "net/http" + + "github.com/distantmagic/paddler/goroutine" + "github.com/distantmagic/paddler/llamacpp" + "github.com/distantmagic/paddler/loadbalancer" +) + +type Client struct { + HttpClient *http.Client + ManagementServerConfiguration *ManagementServerConfiguration +} + +func (self *Client) ReportLlamaCppHealthStatus( + serverEventsChannel chan goroutine.ResultMessage, + llamaCppConfiguration *llamacpp.LlamaCppConfiguration, + llamaCppHealthStatus *llamacpp.LlamaCppHealthStatus, +) { + jsonData, err := json.Marshal(&RegisterTargetRequest{ + LlamaCppHealthStatus: llamaCppHealthStatus, + LlamaCppTargetConfiguration: &loadbalancer.LlamaCppTargetConfiguration{ + LlamaCppConfiguration: llamaCppConfiguration, + }, + }) + + if err != nil { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to marshal JSON data", + Error: err, + } + + return + } + + request, err := http.NewRequest( + "POST", + self. + ManagementServerConfiguration. + HttpAddress. + BuildUrlWithPath("/register/target"). + String(), + bytes.NewBuffer(jsonData), + ) + + if err != nil { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to create HTTP request", + Error: err, + } + + return + } + + request.Header.Set("Content-Type", "application/json") + + response, err := self.HttpClient.Do(request) + + if err != nil { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to send HTTP request", + Error: err, + } + + return + } + + if response.StatusCode != http.StatusOK { + serverEventsChannel <- goroutine.ResultMessage{ + Comment: "unexpected HTTP status code", + Error: err, + } + + return + } +} diff --git a/management/RegisterTargetRequest.go b/management/RegisterTargetRequest.go new file mode 100644 index 0000000..ecc361c --- /dev/null +++ b/management/RegisterTargetRequest.go @@ -0,0 +1,11 @@ +package management + +import ( + "github.com/distantmagic/paddler/llamacpp" + "github.com/distantmagic/paddler/loadbalancer" +) + +type RegisterTargetRequest struct { + LlamaCppHealthStatus *llamacpp.LlamaCppHealthStatus `json:"llama_cpp_health_status"` + LlamaCppTargetConfiguration *loadbalancer.LlamaCppTargetConfiguration `json:"llama_cpp_target_configuration"` +} diff --git a/management/RespondToRegisterTarget.go b/management/RespondToRegisterTarget.go index 45f608d..9ddc10d 100644 --- a/management/RespondToRegisterTarget.go +++ b/management/RespondToRegisterTarget.go @@ -1,14 +1,12 @@ package management import ( + "encoding/json" "fmt" - "html" "net/http" "github.com/distantmagic/paddler/goroutine" - "github.com/distantmagic/paddler/llamacpp" "github.com/distantmagic/paddler/loadbalancer" - "github.com/distantmagic/paddler/netcfg" ) type RespondToRegisterTarget struct { @@ -17,18 +15,35 @@ type RespondToRegisterTarget struct { } func (self *RespondToRegisterTarget) ServeHTTP(response http.ResponseWriter, request *http.Request) { + if request.Method != http.MethodPost { + http.Error(response, "Only POST method is allowed", http.StatusMethodNotAllowed) + + return + } + + var registerTargetRequest RegisterTargetRequest + + decoder := json.NewDecoder(request.Body) + + err := decoder.Decode(®isterTargetRequest) + + if err != nil { + http.Error(response, err.Error(), http.StatusBadRequest) + + self.ServerEventsChannel <- goroutine.ResultMessage{ + Comment: "failed to decode request body", + Error: err, + } + + return + } + go self.LoadBalancer.RegisterTarget( self.ServerEventsChannel, - &loadbalancer.LlamaCppTargetConfiguration{ - LlamaCppConfiguration: &llamacpp.LlamaCppConfiguration{ - HttpAddress: &netcfg.HttpAddressConfiguration{ - Host: "127.0.0.1", - Port: 8088, - Scheme: "http", - }, - }, - }, + registerTargetRequest.LlamaCppTargetConfiguration, ) - fmt.Fprintf(response, "Hello, %q", html.EscapeString(request.URL.Path)) + response.Header().Set("Content-Type", "application/json") + response.WriteHeader(http.StatusOK) + fmt.Fprintf(response, `{"status":"ok"}`) } diff --git a/netcfg/HttpAddressConfiguration.go b/netcfg/HttpAddressConfiguration.go index bd53f1b..4f65f50 100644 --- a/netcfg/HttpAddressConfiguration.go +++ b/netcfg/HttpAddressConfiguration.go @@ -6,9 +6,9 @@ import ( ) type HttpAddressConfiguration struct { - Host string - Port uint - Scheme string + Host string `json:"host"` + Port uint `json:"port"` + Scheme string `json:"scheme"` } func (self *HttpAddressConfiguration) BuildUrlWithPath(path string) *url.URL { diff --git a/reverseproxy/ReverseProxyController.go b/reverseproxy/ReverseProxyController.go deleted file mode 100644 index 428fede..0000000 --- a/reverseproxy/ReverseProxyController.go +++ /dev/null @@ -1,18 +0,0 @@ -package reverseproxy - -import ( - "net/http" - "net/http/httputil" - - "github.com/hashicorp/go-hclog" -) - -type ReverseProxyController struct { - Logger hclog.Logger - ReverseProxy *httputil.ReverseProxy -} - -func (self *ReverseProxyController) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - self.Logger.Debug("forwarding", "path", request.URL.Path) - self.ReverseProxy.ServeHTTP(writer, request) -} diff --git a/reverseproxy/Server.go b/reverseproxy/Server.go deleted file mode 100644 index 0f9d07b..0000000 --- a/reverseproxy/Server.go +++ /dev/null @@ -1,56 +0,0 @@ -package reverseproxy - -import ( - "net/http" - "net/http/httputil" - - "github.com/distantmagic/paddler/goroutine" - "github.com/distantmagic/paddler/loadbalancer" - "github.com/hashicorp/go-hclog" -) - -type Server struct { - LoadBalancer *loadbalancer.LoadBalancer - Logger hclog.Logger - ReverseProxyConfiguration *ReverseProxyConfiguration -} - -func (self *Server) Serve(serverEventsChannel chan goroutine.ResultMessage) { - self.Logger.Debug( - "listen", - "host", self.ReverseProxyConfiguration.HttpAddress.GetHostWithPort(), - ) - - reverseProxyController := &ReverseProxyController{ - Logger: self.Logger.Named("ReverseProxyController"), - ReverseProxy: &httputil.ReverseProxy{ - Rewrite: func(request *httputil.ProxyRequest) { - targetUrl, err := self.LoadBalancer.Balance(request.In) - - if err != nil { - serverEventsChannel <- goroutine.ResultMessage{ - Comment: "failed to balance request", - Error: err, - } - - return - } - - request.SetURL(targetUrl) - request.SetXForwarded() - }, - }, - } - - err := http.ListenAndServe( - self.ReverseProxyConfiguration.HttpAddress.GetHostWithPort(), - reverseProxyController, - ) - - if err != nil { - serverEventsChannel <- goroutine.ResultMessage{ - Comment: "failed to listen", - Error: err, - } - } -}