Skip to content

Commit

Permalink
Merge pull request #22 from Propfend/main
Browse files Browse the repository at this point in the history
adding registered agents displaying api endpoint
  • Loading branch information
mcharytoniuk authored Sep 23, 2024
2 parents 4aaeaa5 + 4e6cc08 commit 43636b4
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 13 deletions.
4 changes: 4 additions & 0 deletions cmd/Balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func (self *Balancer) Action(cliContext *cli.Context) error {
ServerEventsChannel: serverEventsChannel,
},
RespondToStatic: management.NewRespondToStatic(),
RespondToRegisteredAgents: &management.RespondToRegisteredAgents{
LoadBalancerTargetCollection: loadBalancerTargetCollection,
ServerEventsChannel: serverEventsChannel,
},
}

respondToCompletion := &loadbalancer.RespondToCompletion{
Expand Down
2 changes: 1 addition & 1 deletion llamacpp/LlamaCppConfiguration.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type LlamaCppConfiguration struct {
HttpAddress *netcfg.HttpAddressConfiguration
HttpAddress *netcfg.HttpAddressConfiguration `json:"http_address"`
}

func (self *LlamaCppConfiguration) String() string {
Expand Down
16 changes: 8 additions & 8 deletions loadbalancer/LlamaCppTarget.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
)

type LlamaCppTarget struct {
LastUpdate time.Time
LlamaCppClient *llamacpp.LlamaCppClient
LlamaCppSlotsAggregatedStatus *llamacpp.LlamaCppSlotsAggregatedStatus
LlamaCppTargetConfiguration *LlamaCppTargetConfiguration
RBMutex xsync.RBMutex
RemainingTicksUntilRemoved int
ReverseProxy *httputil.ReverseProxy
TotalUpdates int
LastUpdate time.Time `json:"last_update"`
LlamaCppClient *llamacpp.LlamaCppClient `json:"-"`
LlamaCppSlotsAggregatedStatus *llamacpp.LlamaCppSlotsAggregatedStatus `json:"llamacpp_slots_aggregated_status"`
LlamaCppTargetConfiguration *LlamaCppTargetConfiguration `json:"llamacpp_target_configuration"`
RBMutex xsync.RBMutex `json:"-"`
RemainingTicksUntilRemoved int `json:"remaining_ticks_until_removed"`
ReverseProxy *httputil.ReverseProxy `json:"-"`
TotalUpdates int `json:"total_updates"`
}

func (self *LlamaCppTarget) DecrementIdleSlots() {
Expand Down
8 changes: 4 additions & 4 deletions loadbalancer/LoadBalancerTargetCollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
)

type LoadBalancerTargetCollection struct {
LlamaCppHealthStatusAggregate *LlamaCppHealthStatusAggregate
Targets []*LlamaCppTarget
LlamaCppHealthStatusAggregate *LlamaCppHealthStatusAggregate `json:"-"`
Targets []*LlamaCppTarget `json:"targets"`

targetById *xsync.MapOf[string, *LlamaCppTarget]
RBMutex xsync.RBMutex
targetById *xsync.MapOf[string, *LlamaCppTarget] `json:"-"`
RBMutex xsync.RBMutex `json:"-"`
}

func (self *LoadBalancerTargetCollection) GetTargetById(targetId string) *LlamaCppTarget {
Expand Down
37 changes: 37 additions & 0 deletions management/RespondToRegisteredAgents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package management

import (
"encoding/json"
"net/http"

"github.com/distantmagic/paddler/goroutine"
"github.com/distantmagic/paddler/loadbalancer"
)

type RespondToRegisteredAgents struct {
LoadBalancerTargetCollection *loadbalancer.LoadBalancerTargetCollection
ServerEventsChannel chan<- goroutine.ResultMessage
}

func (self *RespondToRegisteredAgents) ServeHTTP(response http.ResponseWriter, request *http.Request) {
registeredAgentsStatusJson, err := json.Marshal(self.LoadBalancerTargetCollection)

if err != nil {
http.Error(response, err.Error(), http.StatusInternalServerError)

return
}

response.Header().Set("Content-Type", "application/json")
response.WriteHeader(http.StatusOK)

_, err = response.Write(registeredAgentsStatusJson)

if err != nil {
self.ServerEventsChannel <- goroutine.ResultMessage{
Error: err,
}

return
}
}
53 changes: 53 additions & 0 deletions management/RespondToRegisteredAgents_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package management

import (
"encoding/json"
"testing"
"time"

"github.com/distantmagic/paddler/llamacpp"
"github.com/distantmagic/paddler/loadbalancer"
"github.com/distantmagic/paddler/netcfg"
"github.com/stretchr/testify/assert"
)

func TestRespondToRegisteredAgents(t *testing.T) {
var targets []*loadbalancer.LlamaCppTarget

timeLocation, _ := time.LoadLocation("UTC")

httpAddress := &netcfg.HttpAddressConfiguration{
Host: "127.0.0.1",
Port: 8088,
Scheme: "http",
}

targets = append(targets, &loadbalancer.LlamaCppTarget{
LlamaCppSlotsAggregatedStatus: &llamacpp.LlamaCppSlotsAggregatedStatus{
SlotsIdle: 4,
SlotsProcessing: 0,
},
LlamaCppTargetConfiguration: &loadbalancer.LlamaCppTargetConfiguration{
Id: "01921f1d-e817-72c9-bb92-f6363899042e",
Name: "Agent-1",
LlamaCppConfiguration: &llamacpp.LlamaCppConfiguration{
HttpAddress: httpAddress,
},
},
LastUpdate: time.Date(2007, 9, 24, 4, 20, 20, 2000000000, timeLocation),
TotalUpdates: 3000,
RemainingTicksUntilRemoved: 2,
})

loadBalancerTargetCollection := &loadbalancer.LoadBalancerTargetCollection{
Targets: targets,
}

llamaCppLastRegisteredAgentJson, _ := json.Marshal(loadBalancerTargetCollection)

assert.Equal(
t,
`{"targets":[{"last_update":"2007-09-24T04:20:22Z","llamacpp_slots_aggregated_status":{"status":"","slots_idle":4,"slots_processing":0},"llamacpp_target_configuration":{"Id":"01921f1d-e817-72c9-bb92-f6363899042e","Name":"Agent-1","llama_cpp_configuration":{"http_address":{"host":"127.0.0.1","port":8088,"scheme":"http"}}},"remaining_ticks_until_removed":2,"total_updates":3000}]}`,
string(llamaCppLastRegisteredAgentJson),
)
}
2 changes: 2 additions & 0 deletions management/Server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Server struct {
RespondToHealth http.Handler
RespondToRegisterTarget http.Handler
RespondToStatic http.Handler
RespondToRegisteredAgents http.Handler
}

func (self *Server) Serve(serverEventsChannel chan<- goroutine.ResultMessage) {
Expand All @@ -33,6 +34,7 @@ func (self *Server) Serve(serverEventsChannel chan<- goroutine.ResultMessage) {

mux.Handle("/health", self.RespondToHealth)
mux.Handle("/register/target", self.RespondToRegisterTarget)
mux.Handle("/api/v1/agents", self.RespondToRegisteredAgents)

err := http.ListenAndServe(
self.ManagementServerConfiguration.HttpAddress.GetHostWithPort(),
Expand Down

0 comments on commit 43636b4

Please sign in to comment.