Skip to content

Commit

Permalink
run status servers
Browse files Browse the repository at this point in the history
  • Loading branch information
ingon committed Jan 10, 2025
1 parent a5db1e8 commit 964e714
Show file tree
Hide file tree
Showing 7 changed files with 292 additions and 3 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

default: all

all: build test lint
all: build test

build:
go install -v github.com/connet-dev/connet/cmd/...
Expand Down
41 changes: 41 additions & 0 deletions client/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package client

import (
"context"
"encoding/json"
"fmt"
"net/http"
)

type statusServer struct {
}

func (s *statusServer) run(ctx context.Context) error {
srv := &http.Server{
Addr: ":19182",
Handler: http.HandlerFunc(s.serve),
}

go func() {
<-ctx.Done()
srv.Close()
}()

return srv.ListenAndServe()
}

func (s *statusServer) serve(w http.ResponseWriter, r *http.Request) {
if err := s.serveErr(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "server error: %v", err.Error())
}
}

func (s *statusServer) serveErr(w http.ResponseWriter, _ *http.Request) error {
w.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(w)
return enc.Encode(status{})
}

type status struct {
}
12 changes: 10 additions & 2 deletions control/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,17 @@ func NewServer(cfg Config) (*Server, error) {
}
s.relays = relays

clSrv, err := newClientServer(cfg.ClientAuth, cfg.ClientRestr, s.relays, config, cfg.Stores, cfg.Logger)
clients, err := newClientServer(cfg.ClientAuth, cfg.ClientRestr, s.relays, config, cfg.Stores, cfg.Logger)
if err != nil {
return nil, err
}
s.clients = clSrv
s.clients = clients

s.status = &statusServer{
clients: clients,
relays: relays,
logger: cfg.Logger.With("control", "status"),
}

return s, nil
}
Expand All @@ -61,13 +67,15 @@ type Server struct {

clients *clientServer
relays *relayServer
status *statusServer
}

func (s *Server) Run(ctx context.Context) error {
g, ctx := errgroup.WithContext(ctx)

g.Go(func() error { return s.relays.run(ctx) })
g.Go(func() error { return s.clients.run(ctx) })
g.Go(func() error { return s.status.run(ctx) })
g.Go(func() error { return s.runListener(ctx) })

return g.Wait()
Expand Down
141 changes: 141 additions & 0 deletions control/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package control

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"

"github.com/connet-dev/connet/model"
"github.com/segmentio/ksuid"
)

type statusServer struct {
clients *clientServer
relays *relayServer
logger *slog.Logger
}

func (s *statusServer) run(ctx context.Context) error {
srv := &http.Server{
Addr: ":19180",
Handler: http.HandlerFunc(s.serve),
}

go func() {
<-ctx.Done()
srv.Close()
}()

s.logger.Debug("start http listener", "addr", srv.Addr)
return srv.ListenAndServe()
}

func (s *statusServer) serve(w http.ResponseWriter, r *http.Request) {
if err := s.serveErr(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "server error: %v", err.Error())
}
}

func (s *statusServer) serveErr(w http.ResponseWriter, _ *http.Request) error {
clients, err := s.getClients()
if err != nil {
return err
}

peers, err := s.getPeers()
if err != nil {
return err
}

relays, err := s.getRelays()
if err != nil {
return err
}

w.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(w)
return enc.Encode(status{
ServerID: s.relays.id,
Clients: clients,
Peers: peers,
Relays: relays,
})
}

func (s *statusServer) getClients() ([]statusClient, error) {
clientMsgs, _, err := s.clients.conns.Snapshot()
if err != nil {
return nil, err
}

var clients []statusClient
for _, msg := range clientMsgs {
clients = append(clients, statusClient{
ID: msg.Key.ID,
Addr: msg.Value.Addr,
})
}

return clients, nil
}

func (s *statusServer) getPeers() ([]statusPeer, error) {
peerMsgs, _, err := s.clients.peers.Snapshot()
if err != nil {
return nil, err
}

var peers []statusPeer
for _, msg := range peerMsgs {
peers = append(peers, statusPeer{
ID: msg.Key.ID,
Role: msg.Key.Role,
Forward: msg.Key.Forward,
})
}

return peers, nil
}

func (s *statusServer) getRelays() ([]statusRelay, error) {
msgs, _, err := s.relays.conns.Snapshot()
if err != nil {
return nil, err
}

var relays []statusRelay
for _, msg := range msgs {
relays = append(relays, statusRelay{
ID: msg.Key.ID,
Hostport: msg.Value.Hostport.String(),
})
}

return relays, nil
}

type status struct {
ServerID string `json:"server_id"`
Clients []statusClient `json:"clients"`
Peers []statusPeer `json:"peers"`
Relays []statusRelay `json:"relays"`
}

type statusClient struct {
ID ksuid.KSUID `json:"id"`
Addr string `json:"addr"`
}

type statusPeer struct {
ID ksuid.KSUID `json:"id"`
Role model.Role `json:"role"`
Forward model.Forward `json:"forward"`
}

type statusRelay struct {
ID ksuid.KSUID `json:"id"`
Hostport string `json:"hostport"`
}
5 changes: 5 additions & 0 deletions relay/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type controlClient struct {
clientsStreamOffset int64
clientsLogOffset int64

connStatus atomic.Bool

logger *slog.Logger
}

Expand Down Expand Up @@ -269,6 +271,9 @@ func (s *controlClient) reconnect(ctx context.Context, transport *quic.Transport
func (s *controlClient) runConnection(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(0, "done")

s.connStatus.Store(true)
defer s.connStatus.Store(false)

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error { return s.runClientsStream(ctx, conn) })
Expand Down
7 changes: 7 additions & 0 deletions relay/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ func NewServer(cfg Config) (*Server, error) {

control: control,
clients: clients,
status: &statusServer{
control: control,
clients: clients,
logger: cfg.Logger.With("relay", "status"),
},

logger: cfg.Logger.With("relay", cfg.Hostport),
}
Expand All @@ -55,6 +60,7 @@ type Server struct {

control *controlClient
clients *clientsServer
status *statusServer

logger *slog.Logger
}
Expand All @@ -78,6 +84,7 @@ func (s *Server) Run(ctx context.Context) error {

g.Go(func() error { return s.control.run(ctx, transport) })
g.Go(func() error { return s.clients.run(ctx, transport) })
g.Go(func() error { return s.status.run(ctx) })

return g.Wait()
}
87 changes: 87 additions & 0 deletions relay/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package relay

import (
"context"
"encoding/json"
"fmt"
"log/slog"
"maps"
"net/http"
"slices"

"github.com/connet-dev/connet/model"
)

type statusServer struct {
control *controlClient
clients *clientsServer
logger *slog.Logger
}

func (s *statusServer) run(ctx context.Context) error {
srv := &http.Server{
Addr: ":19181",
Handler: http.HandlerFunc(s.serve),
}

go func() {
<-ctx.Done()
srv.Close()
}()

s.logger.Debug("start http listener", "addr", srv.Addr)
return srv.ListenAndServe()
}

func (s *statusServer) serve(w http.ResponseWriter, r *http.Request) {
if err := s.serveErr(w, r); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(w, "server error: %v", err.Error())
}
}

func (s *statusServer) serveErr(w http.ResponseWriter, _ *http.Request) error {
stat := "disconnected"
if s.control.connStatus.Load() {
stat = "online"
}
controlID, err := s.getControlID()
if err != nil {
return err
}

fwds := s.getForwards()

w.Header().Add("Content-Type", "application/json")
enc := json.NewEncoder(w)
return enc.Encode(status{
Status: stat,
Hostport: s.control.hostport.String(),
ControlServerAddr: s.control.controlAddr.String(),
ControlServerID: controlID,
Forwards: fwds,
})
}

func (s *statusServer) getControlID() (string, error) {
controlIDConfig, err := s.control.config.GetOrDefault(configControlID, ConfigValue{})
if err != nil {
return "", err
}
return controlIDConfig.String, nil
}

func (s *statusServer) getForwards() []model.Forward {
s.clients.forwardMu.RLock()
defer s.clients.forwardMu.RUnlock()

return slices.Collect(maps.Keys(s.clients.forwards))
}

type status struct {
Status string `json:"status"`
Hostport string `json:"hostport"`
ControlServerAddr string `json:"control_server_addr"`
ControlServerID string `json:"control_server_id"`
Forwards []model.Forward `json:"forwards"`
}

0 comments on commit 964e714

Please sign in to comment.