Skip to content

Commit

Permalink
do not rely on healtchecks
Browse files Browse the repository at this point in the history
  • Loading branch information
ingon committed Jan 14, 2025
1 parent d12c84a commit 6b98205
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 346 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -375,3 +375,4 @@ by adding account management and it is one of the easiest way to start.
- [ ] UDP support
- [ ] Use quic-go tracer, instead of ping (and duration estimation)
- [ ] Stateless reset key for the server
- [ ] Name access restrictions for clients
38 changes: 0 additions & 38 deletions client/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,6 @@ func (d *Destination) runDestinationErr(ctx context.Context, stream quic.Stream)
switch {
case req.Connect != nil:
return d.runConnect(ctx, stream)
case req.Heartbeat != nil:
return d.heartbeat(ctx, stream, req.Heartbeat)
default:
err := pb.NewError(pb.Error_RequestUnknown, "unknown request: %v", req)
if err := pb.Write(stream, &pbc.Response{Error: err}); err != nil {
Expand Down Expand Up @@ -187,42 +185,6 @@ func (d *Destination) runConnect(ctx context.Context, stream quic.Stream) error
return nil
}

func (d *Destination) heartbeat(ctx context.Context, stream quic.Stream, hbt *pbc.Heartbeat) error {
if err := pb.Write(stream, &pbc.Response{Heartbeat: hbt}); err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
<-ctx.Done()
stream.CancelRead(0)
return nil
})

g.Go(func() error {
for {
req, err := pbc.ReadRequest(stream)
if err != nil {
return err
}
if req.Heartbeat == nil {
respErr := pb.NewError(pb.Error_RequestUnknown, "unexpected request")
if err := pb.Write(stream, &pbc.Response{Error: respErr}); err != nil {
return kleverr.Ret(err)
}
return respErr
}

if err := pb.Write(stream, &pbc.Response{Heartbeat: req.Heartbeat}); err != nil {
return err
}
}
})

return g.Wait()
}

func (d *Destination) RunControl(ctx context.Context, conn quic.Connection) error {
return (&peerControl{
local: d.peer,
Expand Down
122 changes: 27 additions & 95 deletions client/peer_direct.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@ import (
"github.com/connet-dev/connet/netc"
"github.com/connet-dev/connet/notify"
"github.com/connet-dev/connet/pb"
"github.com/connet-dev/connet/pbc"
"github.com/connet-dev/connet/pbs"
"github.com/klev-dev/kleverr"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)

type directPeer struct {
Expand Down Expand Up @@ -149,7 +146,7 @@ func newDirectPeerIncoming(ctx context.Context, parent *directPeer, clientCert *
func (p *directPeerIncoming) run(ctx context.Context) {
boff := netc.MinBackoff
for {
conn, stream, err := p.connect(ctx)
conn, err := p.connect(ctx)
if err != nil {
p.parent.logger.Debug("could not connect incoming", "err", err)
switch {
Expand All @@ -171,7 +168,7 @@ func (p *directPeerIncoming) run(ctx context.Context) {
}
boff = netc.MinBackoff

if err := p.keepalive(ctx, conn, stream); err != nil {
if err := p.keepalive(ctx, conn); err != nil {
p.parent.logger.Debug("incoming keepalive failed", "err", err)
switch {
case errors.Is(err, context.Canceled):
Expand All @@ -183,70 +180,34 @@ func (p *directPeerIncoming) run(ctx context.Context) {
}
}

func (p *directPeerIncoming) connect(ctx context.Context) (quic.Connection, quic.Stream, error) {
func (p *directPeerIncoming) connect(ctx context.Context) (quic.Connection, error) {
ch, cancel := p.parent.local.direct.expect(p.parent.local.serverCert, p.clientCert)
select {
case <-ctx.Done():
cancel()
return nil, nil, ctx.Err()
return nil, ctx.Err()
case <-p.closer:
cancel()
return nil, nil, errClosed
return nil, errClosed
case conn := <-ch:
stream, err := conn.AcceptStream(ctx)
if err != nil {
return nil, nil, err
}
if err := p.heartbeat(stream); err != nil {
return nil, nil, err
}
return conn, stream, nil
return conn, nil
}
}

func (p *directPeerIncoming) keepalive(ctx context.Context, conn quic.Connection, stream quic.Stream) error {
func (p *directPeerIncoming) keepalive(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(quic.ApplicationErrorCode(pb.Error_DirectKeepaliveClosed), "keepalive closed")
defer stream.Close()

p.parent.local.addActiveConn(p.parent.remoteID, peerIncoming, "", conn)
defer p.parent.local.removeActiveConn(p.parent.remoteID, peerIncoming, "")

g, ctx := errgroup.WithContext(ctx)

g.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.closer:
return errClosed
}
})

g.Go(func() error {
for {
if err := p.heartbeat(stream); err != nil {
return err
}
}
})

return g.Wait()
}

func (p *directPeerIncoming) heartbeat(stream quic.Stream) error {
req, err := pbc.ReadRequest(stream)
switch {
case err != nil:
return err
case req.Heartbeat == nil:
respErr := pb.NewError(pb.Error_RequestUnknown, "unexpected request")
if err := pb.Write(stream, &pbc.Response{Error: respErr}); err != nil {
return kleverr.Ret(err)
}
return respErr
select {
case <-ctx.Done():
return ctx.Err()
case <-conn.Context().Done():
return context.Cause(conn.Context())
case <-p.closer:
return errClosed
}

return pb.Write(stream, &pbc.Response{Heartbeat: req.Heartbeat})
}

type directPeerOutgoing struct {
Expand All @@ -270,7 +231,7 @@ func newDirectPeerOutgoing(ctx context.Context, parent *directPeer, serverConfg
func (p *directPeerOutgoing) run(ctx context.Context) {
boff := netc.MinBackoff
for {
conn, stream, err := p.connect(ctx)
conn, err := p.connect(ctx)
if err != nil {
p.parent.logger.Debug("could not connect direct", "err", err)
if errors.Is(err, context.Canceled) {
Expand All @@ -289,7 +250,7 @@ func (p *directPeerOutgoing) run(ctx context.Context) {
}
boff = netc.MinBackoff

if err := p.keepalive(ctx, conn, stream); err != nil {
if err := p.keepalive(ctx, conn); err != nil {
p.parent.logger.Debug("disonnected peer", "err", err)
switch {
case errors.Is(err, context.Canceled):
Expand All @@ -301,7 +262,7 @@ func (p *directPeerOutgoing) run(ctx context.Context) {
}
}

func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, quic.Stream, error) {
func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, error) {
var errs []error
for paddr := range p.addrs {
addr := net.UDPAddrFromAddrPort(paddr)
Expand All @@ -320,53 +281,24 @@ func (p *directPeerOutgoing) connect(ctx context.Context) (quic.Connection, quic
continue
}

stream, err := conn.OpenStreamSync(ctx)
if err != nil {
errs = append(errs, err)
continue
}
if err := p.heartbeat(ctx, stream); err != nil {
errs = append(errs, err)
continue
}
return conn, stream, nil
return conn, nil
}
return nil, nil, errors.Join(errs...)
return nil, errors.Join(errs...)
}

func (p *directPeerOutgoing) keepalive(ctx context.Context, conn quic.Connection, stream quic.Stream) error {
func (p *directPeerOutgoing) keepalive(ctx context.Context, conn quic.Connection) error {
defer conn.CloseWithError(quic.ApplicationErrorCode(pb.Error_DirectKeepaliveClosed), "keepalive closed")
defer stream.Close()

p.parent.local.addActiveConn(p.parent.remoteID, peerOutgoing, "", conn)
defer p.parent.local.removeActiveConn(p.parent.remoteID, peerOutgoing, "")

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-p.closer:
return errClosed
case <-time.After(10 * time.Second):
}
if err := p.heartbeat(ctx, stream); err != nil {
return err
}
}
}

func (p *directPeerOutgoing) heartbeat(_ context.Context, stream quic.Stream) error {
// TODO setDeadline as additional assurance we are not blocked
req := &pbc.Heartbeat{Time: timestamppb.Now()}
if err := pb.Write(stream, &pbc.Request{Heartbeat: req}); err != nil {
return err
}
if resp, err := pbc.ReadResponse(stream); err != nil {
return err
} else {
dur := time.Since(resp.Heartbeat.Time.AsTime())
p.parent.logger.Debug("direct heartbeat", "dur", dur)
return nil
select {
case <-ctx.Done():
return ctx.Err()
case <-conn.Context().Done():
return context.Cause(conn.Context())
case <-p.closer:
return errClosed
}
}

Expand Down
38 changes: 2 additions & 36 deletions client/peer_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ import (

"github.com/connet-dev/connet/model"
"github.com/connet-dev/connet/netc"
"github.com/connet-dev/connet/pb"
"github.com/connet-dev/connet/pbc"
"github.com/quic-go/quic-go"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/timestamppb"
)

type relayPeer struct {
Expand Down Expand Up @@ -99,42 +96,11 @@ func (r *relayPeer) connect(ctx context.Context) (quic.Connection, error) {
}

func (r *relayPeer) keepalive(ctx context.Context, conn quic.Connection) error {
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
return err
}
if err := r.heartbeat(ctx, stream); err != nil {
return err
}

r.local.addRelayConn(r.serverHostport, conn)
defer r.local.removeRelayConn(r.serverHostport)

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(10 * time.Second):
}
if err := r.heartbeat(ctx, stream); err != nil {
return err
}
}
}

func (r *relayPeer) heartbeat(_ context.Context, stream quic.Stream) error {
// TODO setDeadline as additional assurance we are not blocked
req := &pbc.Heartbeat{Time: timestamppb.Now()}
if err := pb.Write(stream, &pbc.Request{Heartbeat: req}); err != nil {
return err
}
if resp, err := pbc.ReadResponse(stream); err != nil {
return err
} else {
dur := time.Since(resp.Heartbeat.Time.AsTime())
r.logger.Debug("relay heartbeat", "dur", dur)
return nil
}
<-conn.Context().Done()
return context.Cause(conn.Context())
}

func (r *relayPeer) stop() {
Expand Down
Loading

0 comments on commit 6b98205

Please sign in to comment.