Skip to content

Commit

Permalink
RSDK-6917: Have ClientConn objects expose access to underlying PeerCo…
Browse files Browse the repository at this point in the history
…nnection, if applicable. (#261)
  • Loading branch information
dgottlieb authored Mar 13, 2024
1 parent 82bcef8 commit 0e62b10
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 3 deletions.
23 changes: 21 additions & 2 deletions rpc/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/edaniels/golog"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
"github.com/pion/webrtc/v3"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -59,6 +60,10 @@ type Dialer interface {
// there is a way to close the connection.
type ClientConn interface {
grpc.ClientConnInterface

// PeerConn returns the backing PeerConnection object, or nil if the underlying transport is not
// a PeerConnection.
PeerConn() *webrtc.PeerConnection
Close() error
}

Expand All @@ -68,6 +73,16 @@ type ClientConnAuthenticator interface {
Authenticate(ctx context.Context) (string, error)
}

// A GrpcOverHTTPClientConn is grpc connection that is not backed by a `webrtc.PeerConnection`.
type GrpcOverHTTPClientConn struct {
*grpc.ClientConn
}

// PeerConn returns nil as this is a native gRPC connection.
func (cc GrpcOverHTTPClientConn) PeerConn() *webrtc.PeerConnection {
return nil
}

type cachedDialer struct {
mu sync.Mutex // Note(erd): not suitable for highly concurrent usage
conns map[string]*refCountedConnWrapper
Expand All @@ -92,7 +107,7 @@ func (cd *cachedDialer) DialDirect(
if err != nil {
return nil, nil, err
}
return conn, onClose, nil
return GrpcOverHTTPClientConn{conn}, onClose, nil
})
}

Expand Down Expand Up @@ -328,7 +343,11 @@ func dialDirectGRPC(ctx context.Context, address string, dOpts dialOptions, logg
if ctxDialer := contextDialer(ctx); ctxDialer != nil {
conn, cached, err = ctxDialer.DialDirect(ctx, address, dOpts.cacheKey(), closeCredsFunc, dialOpts...)
} else {
conn, err = grpc.DialContext(ctx, address, dialOpts...)
var grpcConn *grpc.ClientConn
grpcConn, err = grpc.DialContext(ctx, address, dialOpts...)
if err == nil {
conn = GrpcOverHTTPClientConn{grpcConn}
}
if err == nil && closeCredsFunc != nil {
conn = wrapClientConnWithCloseFunc(conn, closeCredsFunc)
}
Expand Down
3 changes: 2 additions & 1 deletion rpc/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,9 @@ func TestServerWithExternalListenerAddress(t *testing.T) {
WithInsecure(),
WithDialDebug(),
)

test.That(t, err, test.ShouldBeNil)
test.That(t, conn.(*grpc.ClientConn).Target(), test.ShouldEqual, listener.Addr().String())
test.That(t, conn.(GrpcOverHTTPClientConn).Target(), test.ShouldEqual, listener.Addr().String())
test.That(t, conn.Close(), test.ShouldBeNil)

test.That(t, rpcServer.Stop(), test.ShouldBeNil)
Expand Down
4 changes: 4 additions & 0 deletions rpc/wrtc_client_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ func newWebRTCClientChannel(
return ch
}

func (ch *webrtcClientChannel) PeerConn() *webrtc.PeerConnection {
return ch.webrtcBaseChannel.peerConn
}

// Close closes all streams and the underlying channel.
func (ch *webrtcClientChannel) Close() error {
ch.mu.Lock()
Expand Down

0 comments on commit 0e62b10

Please sign in to comment.