Skip to content

Commit

Permalink
p2p: fix using custom channels (tendermint#6339)
Browse files Browse the repository at this point in the history
  • Loading branch information
cmwaters authored Apr 13, 2021
1 parent bd968ab commit a9ac635
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG_PENDING.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ Friendly reminder, we have a [bug bounty program](https://hackerone.com/tendermi

### BUG FIXES

- [p2p/node] \#6339 Fix bug with using custom channels (@cmwaters)

19 changes: 17 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,21 @@ func CustomReactors(reactors map[string]p2p.Reactor) Option {
n.sw.RemoveReactor(name, existingReactor)
}
n.sw.AddReactor(name, reactor)
// register the new channels to the nodeInfo
// NOTE: This is a bit messy now with the type casting but is
// cleaned up in the following version when NodeInfo is changed from
// and interface to a concrete type
if ni, ok := n.nodeInfo.(p2p.DefaultNodeInfo); ok {
for _, chDesc := range reactor.GetChannels() {
if !ni.HasChannel(chDesc.ID) {
ni.Channels = append(ni.Channels, chDesc.ID)
n.transport.AddChannel(chDesc.ID)
}
}
n.nodeInfo = ni
} else {
n.Logger.Error("Node info is not of type DefaultNodeInfo. Custom reactor channels can not be added.")
}
}
}
}
Expand Down Expand Up @@ -1240,7 +1255,7 @@ func makeNodeInfo(
txIndexer txindex.TxIndexer,
genDoc *types.GenesisDoc,
state sm.State,
) (p2p.NodeInfo, error) {
) (p2p.DefaultNodeInfo, error) {
txIndexerStatus := "on"
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
Expand All @@ -1255,7 +1270,7 @@ func makeNodeInfo(
case "v2":
bcChannel = bcv2.BlockchainChannel
default:
return nil, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
return p2p.DefaultNodeInfo{}, fmt.Errorf("unknown fastsync version %s", config.FastSync.Version)
}

nodeInfo := p2p.DefaultNodeInfo{
Expand Down
13 changes: 13 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
tmrand "github.com/tendermint/tendermint/libs/rand"
mempl "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/p2p/conn"
p2pmock "github.com/tendermint/tendermint/p2p/mock"
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
Expand Down Expand Up @@ -379,6 +380,14 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {
defer os.RemoveAll(config.RootDir)

cr := p2pmock.NewReactor()
cr.Channels = []*conn.ChannelDescriptor{
{
ID: byte(0x31),
Priority: 5,
SendQueueCapacity: 100,
RecvMessageCapacity: 100,
},
}
customBlockchainReactor := p2pmock.NewReactor()

nodeKey, err := p2p.LoadOrGenNodeKey(config.NodeKeyFile())
Expand All @@ -405,6 +414,10 @@ func TestNodeNewNodeCustomReactors(t *testing.T) {

assert.True(t, customBlockchainReactor.IsRunning())
assert.Equal(t, customBlockchainReactor, n.Switch().Reactor("BLOCKCHAIN"))

channels := n.NodeInfo().(p2p.DefaultNodeInfo).Channels
assert.Contains(t, channels, mempl.MempoolChannel)
assert.Contains(t, channels, cr.Channels[0].ID)
}

func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
Expand Down
4 changes: 3 additions & 1 deletion p2p/mock/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

type Reactor struct {
p2p.BaseReactor

Channels []*conn.ChannelDescriptor
}

func NewReactor() *Reactor {
Expand All @@ -17,7 +19,7 @@ func NewReactor() *Reactor {
return r
}

func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return []*conn.ChannelDescriptor{} }
func (r *Reactor) GetChannels() []*conn.ChannelDescriptor { return r.Channels }
func (r *Reactor) AddPeer(peer p2p.Peer) {}
func (r *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {}
func (r *Reactor) Receive(chID byte, peer p2p.Peer, msgBytes []byte) {}
13 changes: 9 additions & 4 deletions p2p/node_info.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package p2p

import (
"bytes"
"errors"
"fmt"
"reflect"

"github.com/tendermint/tendermint/libs/bytes"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
tmstrings "github.com/tendermint/tendermint/libs/strings"
tmp2p "github.com/tendermint/tendermint/proto/tendermint/p2p"
"github.com/tendermint/tendermint/version"
Expand Down Expand Up @@ -85,9 +86,9 @@ type DefaultNodeInfo struct {

// Check compatibility.
// Channels are HexBytes so easier to read as JSON
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels bytes.HexBytes `json:"channels"` // channels this node knows about
Network string `json:"network"` // network/chain ID
Version string `json:"version"` // major.minor.revision
Channels tmbytes.HexBytes `json:"channels"` // channels this node knows about

// ASCIIText fields
Moniker string `json:"moniker"` // arbitrary moniker
Expand Down Expand Up @@ -222,6 +223,10 @@ func (info DefaultNodeInfo) NetAddress() (*NetAddress, error) {
return NewNetAddressString(idAddr)
}

func (info DefaultNodeInfo) HasChannel(chID byte) bool {
return bytes.Contains(info.Channels, []byte{chID})
}

func (info DefaultNodeInfo) ToProto() *tmp2p.DefaultNodeInfo {

dni := new(tmp2p.DefaultNodeInfo)
Expand Down
3 changes: 2 additions & 1 deletion p2p/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func TestNodeInfoCompatible(t *testing.T) {
assert.NoError(t, ni1.CompatibleWith(ni2))

// add another channel; still compatible
ni2.Channels = []byte{newTestChannel, testCh}
ni2.Channels = append(ni2.Channels, newTestChannel)
assert.True(t, ni2.HasChannel(newTestChannel))
assert.NoError(t, ni1.CompatibleWith(ni2))

// wrong NodeInfo type is not compatible
Expand Down
7 changes: 1 addition & 6 deletions p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,10 @@ func newPeer(
onPeerError func(Peer, interface{}),
options ...PeerOption,
) *peer {
var channs = make([]byte, 0, len(chDescs))
for _, desc := range chDescs {
channs = append(channs, desc.ID)
}

p := &peer{
peerConn: pc,
nodeInfo: nodeInfo,
channels: channs,
channels: nodeInfo.(DefaultNodeInfo).Channels,
Data: cmap.NewCMap(),
metricsTicker: time.NewTicker(metricsTickerDuration),
metrics: NopMetrics(),
Expand Down
13 changes: 13 additions & 0 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,19 @@ func (mt *MultiplexTransport) Listen(addr NetAddress) error {
return nil
}

// AddChannel registers a channel to nodeInfo.
// NOTE: NodeInfo must be of type DefaultNodeInfo else channels won't be updated
// This is a bit messy at the moment but is cleaned up in the following version
// when NodeInfo changes from an interface to a concrete type
func (mt *MultiplexTransport) AddChannel(chID byte) {
if ni, ok := mt.nodeInfo.(DefaultNodeInfo); ok {
if !ni.HasChannel(chID) {
ni.Channels = append(ni.Channels, chID)
}
mt.nodeInfo = ni
}
}

func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()
Expand Down
15 changes: 15 additions & 0 deletions p2p/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,21 @@ func TestTransportHandshake(t *testing.T) {
}
}

func TestTransportAddChannel(t *testing.T) {
mt := newMultiplexTransport(
emptyNodeInfo(),
NodeKey{
PrivKey: ed25519.GenPrivKey(),
},
)
testChannel := byte(0x01)

mt.AddChannel(testChannel)
if !mt.nodeInfo.(DefaultNodeInfo).HasChannel(testChannel) {
t.Errorf("missing added channel %v. Got %v", testChannel, mt.nodeInfo.(DefaultNodeInfo).Channels)
}
}

// create listener
func testSetupMultiplexTransport(t *testing.T) *MultiplexTransport {
var (
Expand Down

0 comments on commit a9ac635

Please sign in to comment.