Skip to content

Commit

Permalink
feat: implement inproc testing transport
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Stewart <[email protected]>
  • Loading branch information
paralin committed Aug 15, 2019
1 parent f52ccc3 commit b44f5fa
Show file tree
Hide file tree
Showing 8 changed files with 560 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/core_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/aperturerobotics/bifrost/link/hold-open"
nctr "github.com/aperturerobotics/bifrost/peer/controller"
"github.com/aperturerobotics/bifrost/pubsub/floodsub/controller"
iproctpt "github.com/aperturerobotics/bifrost/transport/inproc"
udptpt "github.com/aperturerobotics/bifrost/transport/udp"
wtpt "github.com/aperturerobotics/bifrost/transport/websocket"
"github.com/aperturerobotics/controllerbus/bus"
Expand Down Expand Up @@ -36,6 +37,7 @@ func NewCoreBus(
func AddFactories(b bus.Bus, sr *static.Resolver) {
sr.AddFactory(wtpt.NewFactory(b))
sr.AddFactory(udptpt.NewFactory(b))
sr.AddFactory(iproctpt.NewFactory(b))
sr.AddFactory(nctr.NewFactory())
sr.AddFactory(egc.NewFactory(b))
sr.AddFactory(bifrosteg.NewFactory(b))
Expand Down
46 changes: 46 additions & 0 deletions transport/inproc/addr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package inproc

import (
"net"
"strings"

"github.com/aperturerobotics/bifrost/peer"
"github.com/pkg/errors"
)

var scheme = "inproc://"

type addr struct {
peerID peer.ID
str string
}

// newAddr builds a new addr
func newAddr(peerID peer.ID) *addr {
return &addr{
peerID: peerID,
str: scheme + peerID.Pretty(),
}
}

func parseAddr(addr string) (net.Addr, error) {
if !strings.HasPrefix(addr, scheme) {
return nil, errors.Errorf("expected inproc prefix: %s", addr)
}
pid, err := peer.IDB58Decode(addr[len(scheme):])
if err != nil {
return nil, err
}
return newAddr(pid), nil
}

func (a *addr) Network() string {
return "inproc"
}

func (a *addr) String() string {
return a.str
}

// _ is a type assertion
var _ net.Addr = ((*addr)(nil))
115 changes: 115 additions & 0 deletions transport/inproc/inproc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package inproc

import (
"context"
"net"
"sync"
"time"

"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/transport"
"github.com/aperturerobotics/bifrost/transport/common/pconn"
"github.com/aperturerobotics/bifrost/util/scrc"
"github.com/blang/semver"
"github.com/libp2p/go-libp2p-crypto"
"github.com/sirupsen/logrus"
)

// TransportID is the transport identifier
const TransportID = "inproc"

// ControllerID is the controller identifier.
const ControllerID = "bifrost/inproc/1"

// Version is the version of the inproc implementation.
var Version = semver.MustParse("0.0.1")

// handshakeTimeout is the time after which a handshake expires
var handshakeTimeout = time.Second * 8

// Inproc implements a Inproc transport.
type Inproc struct {
// Transport is the packet transport
*pconn.Transport

// le is the logger
le *logrus.Entry
// packetConn is the packet conn
packetConn *packetConn

// mtx guards below
mtx sync.Mutex
// remotes are the currently known remotes
// map is from string (net.addr.String()) to *packetConn
remotes map[string]*packetConn
}

// NewInproc builds a new Inproc transport.
// Yields Links to other Inproc transports.
func NewInproc(
ctx context.Context,
le *logrus.Entry,
opts *Config,
pKey crypto.PrivKey,
c transport.TransportHandler,
) (transport.Transport, error) {
peerID, err := peer.IDFromPrivateKey(pKey)
if err != nil {
return nil, err
}

localAddr := newAddr(peerID)
uuid := scrc.Crc64([]byte("inproc/" + peerID.Pretty()))
ip := &Inproc{le: le, remotes: make(map[string]*packetConn)}
npc := newPacketConn(
ctx,
localAddr,
ip.writeToAddr,
)
ip.Transport = pconn.New(
le,
uuid,
npc,
pKey,
parseAddr,
c,
opts.GetPacketOpts(),
)
ip.packetConn = npc
return ip, nil
}

// ConnectToInproc connects the inproc to a remote inproc.
// Will overwrite any existing connection
func (t *Inproc) ConnectToInproc(ctx context.Context, other *Inproc) {
t.mtx.Lock()
oa := other.LocalAddr().String()
t.remotes[oa] = other.packetConn
t.mtx.Unlock()
}

// DisconnectInproc disconnects a previously connected inproc.
func (t *Inproc) DisconnectInproc(ctx context.Context, other *Inproc) {
t.mtx.Lock()
oa := other.LocalAddr().String()
delete(t.remotes, oa)
t.mtx.Unlock()
}

// writeToAddr routes outgoing packets.
func (t *Inproc) writeToAddr(ctx context.Context, p []byte, addr net.Addr) (int, error) {
oa := addr.String()
t.mtx.Lock()
out, outOk := t.remotes[oa]
t.mtx.Unlock()
if !outOk {
return 0, &net.AddrError{
Addr: oa,
Err: "remote transport not connected",
}
}
return out.HandlePacket(ctx, p, t.LocalAddr())
}

// _ is a type assertion.
var _ transport.Transport = ((*Inproc)(nil))
97 changes: 97 additions & 0 deletions transport/inproc/inproc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions transport/inproc/inproc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package inproc;

import "github.com/aperturerobotics/bifrost/transport/common/pconn/pconn.proto";

// Config is the configuration for the inproc testing transport.
message Config {
// NodePeerID constrains the node peer ID.
// If empty, attaches to whatever node is running.
string node_peer_id = 1;
// PacketOpts are options to set on the packet connection.
pconn.Opts packet_opts = 2;
}

32 changes: 32 additions & 0 deletions transport/inproc/inproc_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package inproc

import (
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/util/confparse"
"github.com/aperturerobotics/controllerbus/config"
"github.com/golang/protobuf/proto"
)

// ConfigID is the string used to identify this config object.
const ConfigID = ControllerID

// Validate validates the configuration.
// This is a cursory validation to see if the values "look correct."
func (c *Config) Validate() error { return nil }

// ParseNodePeerID parses the node peer ID if it is not empty.
func (c *Config) ParseNodePeerID() (peer.ID, error) {
return confparse.ParsePeerID(c.GetNodePeerId())
}

// GetConfigID returns the unique string for this configuration type.
// This string is stored with the encoded config.
// Example: bifrost/transport/xbee/1
func (c *Config) GetConfigID() string {
return ConfigID
}

// EqualsConfig checks if the other config is equal.
func (c *Config) EqualsConfig(other config.Config) bool {
return proto.Equal(c, other)
}
Loading

0 comments on commit b44f5fa

Please sign in to comment.