Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

network: handle empty wsPeer supplied to transaction handler #6195

Merged
merged 6 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agreement/fuzzer/networkFacade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,11 @@ type facadePeer struct {
}

func (p *facadePeer) GetNetwork() network.GossipNode { return p.net }
func (p *facadePeer) RoutingAddr() []byte {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(p.id))
return buf
}

// MakeNetworkFacade creates a facade with a given nodeID.
func MakeNetworkFacade(fuzzer *Fuzzer, nodeID int) *NetworkFacade {
Expand Down
4 changes: 4 additions & 0 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ func (p *testUnicastPeer) GetAddress() string {

func (p *testUnicastPeer) GetNetwork() network.GossipNode { return p.gn }

func (p *testUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

func (p *testUnicastPeer) Request(ctx context.Context, tag protocol.Tag, topics network.Topics) (resp *network.Response, e error) {

responseChannel := make(chan *network.Response, 1)
Expand Down
6 changes: 3 additions & 3 deletions data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func (handler *TxHandler) incomingMsgDupCheck(data []byte) (*crypto.Digest, bool
// Returns:
// - the capacity guard returned by the elastic rate limiter
// - a boolean indicating if the sender is rate limited
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectablePeer) (*util.ErlCapacityGuard, bool) {
func (handler *TxHandler) incomingMsgErlCheck(sender network.DisconnectableAddressablePeer) (*util.ErlCapacityGuard, bool) {
var capguard *util.ErlCapacityGuard
var isCMEnabled bool
var err error
Expand Down Expand Up @@ -715,11 +715,11 @@ func (handler *TxHandler) incomingTxGroupCanonicalDedup(unverifiedTxGroup []tran
}

// incomingTxGroupAppRateLimit checks if the sender is rate limited by the per-application rate limiter.
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectablePeer) bool {
func (handler *TxHandler) incomingTxGroupAppRateLimit(unverifiedTxGroup []transactions.SignedTxn, sender network.DisconnectableAddressablePeer) bool {
// rate limit per application in a group. Limiting any app in a group drops the entire message.
if handler.appLimiter != nil {
congestedARL := len(handler.backlogQueue) > handler.appLimiterBacklogThreshold
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.(network.IPAddressable).RoutingAddr()) {
if congestedARL && handler.appLimiter.shouldDrop(unverifiedTxGroup, sender.RoutingAddr()) {
transactionMessagesAppLimiterDrop.Inc(nil)
return true
}
Expand Down
2 changes: 1 addition & 1 deletion network/connPerfMon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func makeMsgPool(N int, peers []Peer) (out []IncomingMessage) {

addMsg := func(msgCount int) {
for i := 0; i < msgCount; i++ {
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectablePeer)
msg.Sender = peers[(int(msgIndex)+i)%len(peers)].(DisconnectableAddressablePeer)
timer += int64(7 * time.Nanosecond)
msg.Received = timer
out = append(out, msg)
Expand Down
13 changes: 12 additions & 1 deletion network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type DisconnectablePeer interface {
GetNetwork() GossipNode
}

// DisconnectableAddressablePeer is a Peer with a long-living connection to a network that can be disconnected and has an IP address
type DisconnectableAddressablePeer interface {
DisconnectablePeer
IPAddressable
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// PeerOption allows users to specify a subset of peers to query
//
//msgp:ignore PeerOption
Expand Down Expand Up @@ -118,7 +129,7 @@ var outgoingMessagesBufferSize = int(

// IncomingMessage represents a message arriving from some peer in our p2p network
type IncomingMessage struct {
Sender DisconnectablePeer
Sender DisconnectableAddressablePeer
Tag Tag
Data []byte
Err error
Expand Down
32 changes: 24 additions & 8 deletions network/p2pNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -943,23 +943,39 @@
}
}

type gsPeer struct {
peerID peer.ID
net *P2PNetwork
routingAddr [8]byte
}

func (p *gsPeer) GetNetwork() GossipNode {
return p.net

Check warning on line 953 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L952-L953

Added lines #L952 - L953 were not covered by tests
}

func (p *gsPeer) RoutingAddr() []byte {
return p.routingAddr[:]

Check warning on line 957 in network/p2pNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/p2pNetwork.go#L956-L957

Added lines #L956 - L957 were not covered by tests
}

// txTopicValidator calls txHandler to validate and process incoming transactions.
func (n *P2PNetwork) txTopicValidator(ctx context.Context, peerID peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
var routingAddr [8]byte
n.wsPeersLock.Lock()
var wsp *wsPeer
var ok bool
if wsp, ok = n.wsPeers[peerID]; ok {
copy(routingAddr[:], wsp.RoutingAddr())
var sender DisconnectableAddressablePeer
if wsp, ok := n.wsPeers[peerID]; ok {
sender = wsp
} else {
// well, otherwise use last 8 bytes of peerID
// otherwise use last 8 bytes of peerID
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// handle the case where the peer is not in the wsPeers map yet
// this can happen when pubsub receives new peer notifications before the wsStreamHandler is called:
// create a fake peer that is good enough for tx handler to work with.
var routingAddr [8]byte
copy(routingAddr[:], peerID[len(peerID)-8:])
sender = &gsPeer{peerID: peerID, net: n, routingAddr: routingAddr}
}
n.wsPeersLock.Unlock()

inmsg := IncomingMessage{
// Sender: gossipSubPeer{peerID: msg.ReceivedFrom, net: n, routingAddr: routingAddr},
Sender: wsp,
Sender: sender,
Tag: protocol.TxnTag,
Data: msg.Data,
Net: n,
Expand Down
31 changes: 31 additions & 0 deletions network/p2pNetwork_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/algorand/go-algorand/test/partitiontest"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -1374,3 +1375,33 @@ func TestP2PEnableGossipService_BothDisable(t *testing.T) {
require.False(t, netA.hasPeers())
require.False(t, netB.hasPeers())
}

// TestP2PTxTopicValidator_NoWsPeer checks txTopicValidator does not call tx handler with empty Sender
func TestP2PTxTopicValidator_NoWsPeer(t *testing.T) {
partitiontest.PartitionTest(t)

log := logging.TestingLog(t)

// prepare configs
cfg := config.GetDefaultLocal()
cfg.DNSBootstrapID = "" // disable DNS lookups since the test uses phonebook addresses

net, err := NewP2PNetwork(log, cfg, "", nil, genesisID, config.Devtestnet, &nopeNodeInfo{}, nil)
require.NoError(t, err)

peerID := peer.ID("12345678") // must be 8+ in size
msg := pubsub.Message{Message: &pb.Message{}, ID: string(peerID)}
validateIncomingTxMessage := func(rawmsg IncomingMessage) OutgoingMessage {
require.NotEmpty(t, rawmsg.Sender)
require.Implements(t, (*DisconnectableAddressablePeer)(nil), rawmsg.Sender)
return OutgoingMessage{Action: Accept}
}
net.handler.RegisterValidatorHandlers([]TaggedMessageValidatorHandler{
{Tag: protocol.TxnTag, MessageHandler: ValidateHandleFunc(validateIncomingTxMessage)},
})

ctx := context.Background()
require.NotContains(t, net.wsPeers, peerID)
res := net.txTopicValidator(ctx, peerID, &msg)
require.Equal(t, pubsub.ValidationAccept, res)
}
6 changes: 3 additions & 3 deletions network/wsNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ const (
type broadcastRequest struct {
tags []Tag
data [][]byte
except *wsPeer
except Peer
done chan struct{}
enqueueTime time.Time
ctx context.Context
Expand Down Expand Up @@ -381,7 +381,7 @@ func (wn *msgBroadcaster) BroadcastArray(ctx context.Context, tags []protocol.Ta

request := broadcastRequest{tags: tags, data: data, enqueueTime: time.Now(), ctx: ctx}
if except != nil {
request.except = except.(*wsPeer)
request.except = except
}

broadcastQueue := wn.broadcastQueueBulk
Expand Down Expand Up @@ -1401,7 +1401,7 @@ func (wn *msgBroadcaster) innerBroadcast(request broadcastRequest, prio bool, pe
if wn.config.BroadcastConnectionsLimit >= 0 && sentMessageCount >= wn.config.BroadcastConnectionsLimit {
break
}
if peer == request.except {
if Peer(peer) == request.except {
cce marked this conversation as resolved.
Show resolved Hide resolved
continue
}
ok := peer.writeNonBlockMsgs(request.ctx, data, prio, digests, request.enqueueTime)
Expand Down
12 changes: 7 additions & 5 deletions network/wsPeer.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@

// closers is a slice of functions to run when the peer is closed
closers []func()
// closersMu synchronizes access to closers
closersMu deadlock.RWMutex
cce marked this conversation as resolved.
Show resolved Hide resolved

// peerType defines the peer's underlying connection type
// used for separate p2p vs ws metrics
Expand All @@ -295,11 +297,6 @@
GetHTTPClient() *http.Client
}

// IPAddressable is addressable with either IPv4 or IPv6 address
type IPAddressable interface {
RoutingAddr() []byte
}

// UnicastPeer is another possible interface for the opaque Peer.
// It is possible that we can only initiate a connection to a peer over websockets.
type UnicastPeer interface {
Expand Down Expand Up @@ -979,6 +976,8 @@
}

}
wp.closersMu.RLock()
defer wp.closersMu.RUnlock()
algorandskiy marked this conversation as resolved.
Show resolved Hide resolved
// now call all registered closers
for _, f := range wp.closers {
f()
Expand Down Expand Up @@ -1115,6 +1114,9 @@
}

func (wp *wsPeer) OnClose(f func()) {
wp.closersMu.Lock()
gmalouf marked this conversation as resolved.
Show resolved Hide resolved
defer wp.closersMu.Unlock()

Check warning on line 1118 in network/wsPeer.go

View check run for this annotation

Codecov / codecov/patch

network/wsPeer.go#L1117-L1118

Added lines #L1117 - L1118 were not covered by tests

if wp.closers == nil {
wp.closers = []func(){}
}
Expand Down
4 changes: 4 additions & 0 deletions rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ func (mup *mockUnicastPeer) GetNetwork() network.GossipNode {
panic("not implemented")
}

func (mup *mockUnicastPeer) RoutingAddr() []byte {
panic("not implemented")
}

// TestHandleCatchupReqNegative covers the error reporting in handleCatchupReq
func TestHandleCatchupReqNegative(t *testing.T) {
partitiontest.PartitionTest(t)
Expand Down
Loading