Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run status servers #13

Merged
merged 8 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ gen:
protoc --proto_path=pb/ --proto_path=pbs/ --proto_path=pbc/ --proto_path=pbr/ --go_opt=module=github.com/connet-dev/connet --go_out=./ pb/*.proto pbs/*.proto pbc/*.proto pbr/*.proto

.PHONY: run-server run-client run-sws
run-server: all
run-server: build
connet server --config examples/minimal.toml

run-client: all
run-client: build
connet --config examples/minimal.toml

run-sws:
Expand Down
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,9 @@ token-file = "path/to/relay/token" # a file that contains the token, one of toke

server-addr = "localhost:19190" # the control server address to connect to
server-cas = "path/to/cert.pem" # the control server certificate
direct-addr = ":19192" # at what address this client listens for direct connections

direct-addr = ":19192" # at what address this client listens for direct connections, defaults to :19192
status-addr = ":19182" # at what address this client listens for status connections, defaults to :19182. 'disable' will disable it

[client.destinations.serviceX]
addr = "localhost:3000" # where this destination connects to, required
Expand Down Expand Up @@ -165,6 +167,7 @@ tokens-file = "path/to/client/tokens" # a file that contains a list of client to
relay-addr = ":19191" # the address at which the relay will listen for connectsion, defaults to :19191
relay-hostname = "localhost" # the public hostname (e.g. domain, ip address) which will be advertised to clients, defaults to localhost

status-addr = ":19180" # at what address the server listens for status connections, defaults to :19180. 'disable' will disable it
store-dir = "path/to/server-store" # where does this server persist runtime information, defaults to a /tmp subdirectory

[server.ip-restriction] # defines restriction applicable for all client tokens, checked before verifying the token
Expand Down Expand Up @@ -194,6 +197,7 @@ relay-tokens = ["relay-token-1", "relay-token-n"] # set of recognized relay toke
relay-tokens-file = "path/to/relay/token" # a file that contains a list of relay tokens, one token per line
# one of relay-tokens or relay-tokens-file is necessary when connecting relays

status-addr = ":19180" # at what address the control server listens for status connections, defaults to :19180. 'disable' will disable it
store-dir = "path/to/control-store" # where does this control server persist runtime information, defaults to a /tmp subdirectory

[control.client-ip-restriction] # defines restriction applicable for all client tokens, checked before verifying the token
Expand Down Expand Up @@ -228,6 +232,7 @@ hostname = "localhost" # the public hostname (e.g. domain, ip address) which wil
control-addr = "localhost:19190" # the control server address to connect to, defaults to localhost:19191
control-cas = "path/to/ca/file.pem" # the public certificate root of the control server, no default, required when using self-signed certs

status-addr = ":19181" # at what address the relay server listens for status connections, defaults to :19181. 'disable' will disable it
store-dir = "path/to/relay-store" # where does this relay persist runtime information, defaults to a /tmp subdirectory
```

Expand Down
73 changes: 70 additions & 3 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/netip"
"os"
"strings"
"sync/atomic"
"time"

"github.com/connet-dev/connet/certc"
Expand All @@ -19,6 +20,7 @@ import (
"github.com/connet-dev/connet/netc"
"github.com/connet-dev/connet/pb"
"github.com/connet-dev/connet/pbs"
"github.com/connet-dev/connet/statusc"
"github.com/klev-dev/kleverr"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
Expand All @@ -30,6 +32,8 @@ type Client struct {
rootCert *certc.Cert
dsts map[model.Forward]*client.Destination
srcs map[model.Forward]*client.Source

connStatus atomic.Value
}

func NewClient(opts ...ClientOption) (*Client, error) {
Expand Down Expand Up @@ -64,11 +68,14 @@ func NewClient(opts ...ClientOption) (*Client, error) {
}
cfg.logger.Debug("generated root cert")

return &Client{
c := &Client{
clientConfig: *cfg,

rootCert: rootCert,
}, nil
}
c.connStatus.Store(statusc.NotConnected)

return c, nil
}

func (c *Client) Run(ctx context.Context) error {
Expand Down Expand Up @@ -120,6 +127,7 @@ func (c *Client) Run(ctx context.Context) error {
}

g.Go(func() error { return c.run(ctx, transport) })
g.Go(func() error { return c.runStatus(ctx) })

return g.Wait()
}
Expand Down Expand Up @@ -232,6 +240,9 @@ func (c *Client) reconnect(ctx context.Context, transport *quic.Transport, retok
func (c *Client) runConnection(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(0, "done")

c.connStatus.Store(statusc.Connected)
defer c.connStatus.Store(statusc.Disconnected)

g, ctx := errgroup.WithContext(ctx)

for _, dstServer := range c.dsts {
Expand All @@ -245,6 +256,48 @@ func (c *Client) runConnection(ctx context.Context, conn quic.Connection) error
return g.Wait()
}

func (c *Client) runStatus(ctx context.Context) error {
if c.statusAddr == nil {
return nil
}

c.logger.Debug("running status server", "addr", c.statusAddr)
return statusc.Run(ctx, c.statusAddr.String(), c.Status)
}

func (c *Client) Status(ctx context.Context) (ClientStatus, error) {
stat := c.connStatus.Load().(statusc.Status)
var err error

dsts := map[model.Forward]client.PeerStatus{}
for fwd, dst := range c.dsts {
dsts[fwd], err = dst.Status()
if err != nil {
return ClientStatus{}, err
}
}

srcs := map[model.Forward]client.PeerStatus{}
for fwd, src := range c.srcs {
srcs[fwd], err = src.Status()
if err != nil {
return ClientStatus{}, err
}
}

return ClientStatus{
Status: stat,
Destinations: dsts,
Sources: srcs,
}, nil
}

type ClientStatus struct {
Status statusc.Status `json:"status"`
Destinations map[model.Forward]client.PeerStatus `json:"destinations"`
Sources map[model.Forward]client.PeerStatus `json:"sources"`
}

type clientConfig struct {
token string

Expand All @@ -253,6 +306,7 @@ type clientConfig struct {
controlCAs *x509.CertPool

directAddr *net.UDPAddr
statusAddr *net.TCPAddr

destinations map[model.Forward]clientForwardConfig
sources map[model.Forward]clientForwardConfig
Expand Down Expand Up @@ -326,7 +380,7 @@ func ClientDirectAddress(address string) ClientOption {
return func(cfg *clientConfig) error {
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return err
return kleverr.Newf("direct address cannot be resolved: %w", err)
}

cfg.directAddr = addr
Expand All @@ -335,6 +389,19 @@ func ClientDirectAddress(address string) ClientOption {
}
}

func ClientStatusAddress(address string) ClientOption {
return func(cfg *clientConfig) error {
addr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return kleverr.Newf("status address cannot be resolved: %w", err)
}

cfg.statusAddr = addr

return nil
}
}

func ClientDestination(name, addr string, route model.RouteOption) ClientOption {
return func(cfg *clientConfig) error {
if cfg.destinations == nil {
Expand Down
4 changes: 4 additions & 0 deletions client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (d *Destination) Run(ctx context.Context) error {
return g.Wait()
}

func (d *Destination) Status() (PeerStatus, error) {
return d.peer.status()
}

func (d *Destination) runActive(ctx context.Context) error {
return d.peer.activeConnsListen(ctx, func(active map[peerConnKey]quic.Connection) error {
d.logger.Debug("active conns", "len", len(active))
Expand Down
4 changes: 4 additions & 0 deletions client/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (s *Source) Run(ctx context.Context) error {
return g.Wait()
}

func (s *Source) Status() (PeerStatus, error) {
return s.peer.status()
}

func (s *Source) runActive(ctx context.Context) error {
return s.peer.activeConnsListen(ctx, func(active map[peerConnKey]quic.Connection) error {
s.logger.Debug("active conns", "len", len(active))
Expand Down
38 changes: 38 additions & 0 deletions client/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package client

type PeerStatus struct {
Relays []string `json:"relays"`
Connections []PeerConnection `json:"connections"`
}

type PeerConnection struct {
ID string `json:"id"`
Style string `json:"style"`
Addr string `json:"addr"`
}

func (p *peer) status() (PeerStatus, error) {
stat := PeerStatus{}

relays, err := p.relayConns.Peek()
if err != nil {
return PeerStatus{}, err
}
for k := range relays {
stat.Relays = append(stat.Relays, k.String())
}

conns, err := p.peerConns.Peek()
if err != nil {
return PeerStatus{}, err
}
for key, conn := range conns {
stat.Connections = append(stat.Connections, PeerConnection{
ID: key.id,
Style: key.style.String(),
Addr: conn.RemoteAddr().String(),
})
}

return stat, nil
}
Loading
Loading