diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 1c5f3d9cf2f..9d71da216bd 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -25,4 +25,4 @@ - [config] \#9483 Calling `tendermint init` would incorrectly leave out the new `[storage]` section delimiter in the generated configuration file - this has now been fixed - +- [p2p] \#9500 prevent peers who have errored being added to the peer_set (@jmalicevic) diff --git a/blockchain/v2/reactor_test.go b/blockchain/v2/reactor_test.go index a135d68fd86..57a4a00b41d 100644 --- a/blockchain/v2/reactor_test.go +++ b/blockchain/v2/reactor_test.go @@ -59,6 +59,9 @@ func (mp mockPeer) TrySend(byte, []byte) bool { return true } func (mp mockPeer) Set(string, interface{}) {} func (mp mockPeer) Get(string) interface{} { return struct{}{} } +func (mp mockPeer) SetRemovalFailed() {} +func (mp mockPeer) GetRemovalFailed() bool { return false } + type mockBlockStore struct { blocks map[int64]*types.Block } diff --git a/p2p/errors.go b/p2p/errors.go index 3650a7a0a8e..4fc915292fb 100644 --- a/p2p/errors.go +++ b/p2p/errors.go @@ -145,6 +145,13 @@ func (e ErrTransportClosed) Error() string { return "transport has been closed" } +// ErrPeerRemoval is raised when attempting to remove a peer results in an error. 
+type ErrPeerRemoval struct{} + +func (e ErrPeerRemoval) Error() string { + return "peer removal failed" +} + //------------------------------------------------------------------- type ErrNetAddressNoID struct { diff --git a/p2p/mock/peer.go b/p2p/mock/peer.go index 59f6e0f4aa4..10254c3437a 100644 --- a/p2p/mock/peer.go +++ b/p2p/mock/peer.go @@ -68,3 +68,5 @@ func (mp *Peer) RemoteIP() net.IP { return mp.ip } func (mp *Peer) SocketAddr() *p2p.NetAddress { return mp.addr } func (mp *Peer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } func (mp *Peer) CloseConn() error { return nil } +func (mp *Peer) SetRemovalFailed() {} +func (mp *Peer) GetRemovalFailed() bool { return false } diff --git a/p2p/mocks/peer.go b/p2p/mocks/peer.go index f739a0b2126..c4d89375cb5 100644 --- a/p2p/mocks/peer.go +++ b/p2p/mocks/peer.go @@ -53,6 +53,20 @@ func (_m *Peer) Get(_a0 string) interface{} { return r0 } +// GetRemovalFailed provides a mock function with given fields: +func (_m *Peer) GetRemovalFailed() bool { + ret := _m.Called() + + var r0 bool + if rf, ok := ret.Get(0).(func() bool); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + // ID provides a mock function with given fields: func (_m *Peer) ID() p2p.ID { ret := _m.Called() @@ -244,6 +258,11 @@ func (_m *Peer) SetLogger(_a0 log.Logger) { _m.Called(_a0) } +// SetRemovalFailed provides a mock function with given fields: +func (_m *Peer) SetRemovalFailed() { + _m.Called() +} + // SocketAddr provides a mock function with given fields: func (_m *Peer) SocketAddr() *p2p.NetAddress { ret := _m.Called() diff --git a/p2p/peer.go b/p2p/peer.go index 751ca3cf286..d8d61a7a00b 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -39,6 +39,9 @@ type Peer interface { Set(string, interface{}) Get(string) interface{} + + SetRemovalFailed() + GetRemovalFailed() bool } //---------------------------------------------------------- @@ -117,6 +120,9 @@ type peer struct { metrics *Metrics metricsTicker 
*time.Ticker + + // When removal of a peer fails, we set this flag + removalAttemptFailed bool } type PeerOption func(*peer) @@ -316,6 +322,14 @@ func (p *peer) CloseConn() error { return p.peerConn.conn.Close() } +func (p *peer) SetRemovalFailed() { + p.removalAttemptFailed = true +} + +func (p *peer) GetRemovalFailed() bool { + return p.removalAttemptFailed +} + //--------------------------------------------------- // methods only used for testing // TODO: can we remove these? diff --git a/p2p/peer_set.go b/p2p/peer_set.go index 38dff7a9fb5..30bcc4d32f5 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -47,6 +47,9 @@ func (ps *PeerSet) Add(peer Peer) error { if ps.lookup[peer.ID()] != nil { return ErrSwitchDuplicatePeerID{peer.ID()} } + if peer.GetRemovalFailed() { + return ErrPeerRemoval{} + } index := len(ps.list) // Appending is safe even with other goroutines @@ -107,6 +110,12 @@ func (ps *PeerSet) Remove(peer Peer) bool { item := ps.lookup[peer.ID()] if item == nil { + // Removing the peer has failed so we set a flag to mark that a removal was attempted. + // This can happen when the peer add routine from the switch is running in + // parallel to the receive routine of MConn. + // There is an error within MConn but the switch has not actually added the peer to the peer set yet. + // Setting this flag will prevent a peer from being added to a node's peer set afterwards. 
+ peer.SetRemovalFailed() return false } diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index b61b43f10af..db3d9261e27 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -32,6 +32,8 @@ func (mp *mockPeer) RemoteIP() net.IP { return mp.ip } func (mp *mockPeer) SocketAddr() *NetAddress { return nil } func (mp *mockPeer) RemoteAddr() net.Addr { return &net.TCPAddr{IP: mp.ip, Port: 8800} } func (mp *mockPeer) CloseConn() error { return nil } +func (mp *mockPeer) SetRemovalFailed() {} +func (mp *mockPeer) GetRemovalFailed() bool { return false } // Returns a mock peer func newMockPeer(ip net.IP) *mockPeer { diff --git a/p2p/switch.go b/p2p/switch.go index 3214de22305..884fd883e6f 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -370,6 +370,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { // https://github.com/tendermint/tendermint/issues/3338 if sw.peers.Remove(peer) { sw.metrics.Peers.Add(float64(-1)) + } else { + // Removal of the peer has failed. The function above sets a flag within the peer to mark this. + // We keep this message here as information to the developer. + sw.Logger.Debug("error on peer removal", "peer", peer.ID()) } } @@ -824,6 +828,12 @@ func (sw *Switch) addPeer(p Peer) error { // so that if Receive errors, we will find the peer and remove it. // Add should not err since we already checked peers.Has(). 
if err := sw.peers.Add(p); err != nil { + switch err.(type) { + case ErrPeerRemoval: + sw.Logger.Error("Error starting peer", + "err", "Peer has already errored and removal was attempted.", + "peer", p.ID()) + } return err } sw.metrics.Peers.Add(float64(1)) diff --git a/p2p/switch_test.go b/p2p/switch_test.go index 36420d333c3..11de0a4426f 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -836,3 +836,16 @@ func BenchmarkSwitchBroadcast(b *testing.B) { b.Logf("success: %v, failure: %v", numSuccess, numFailure) } + +func TestSwitchRemovalErr(t *testing.T) { + + sw1, sw2 := MakeSwitchPair(t, func(i int, sw *Switch) *Switch { + return initSwitchFunc(i, sw) + }) + assert.Equal(t, len(sw1.Peers().List()), 1) + p := sw1.Peers().List()[0] + + sw2.StopPeerForError(p, fmt.Errorf("peer should error")) + + assert.Equal(t, sw2.peers.Add(p).Error(), ErrPeerRemoval{}.Error()) +} diff --git a/test/fuzz/p2p/pex/reactor_receive.go b/test/fuzz/p2p/pex/reactor_receive.go index 4ac06c89273..4d0ef65257c 100644 --- a/test/fuzz/p2p/pex/reactor_receive.go +++ b/test/fuzz/p2p/pex/reactor_receive.go @@ -84,3 +84,5 @@ func (fp *fuzzPeer) Send(byte, []byte) bool { return true } func (fp *fuzzPeer) TrySend(byte, []byte) bool { return true } func (fp *fuzzPeer) Set(key string, value interface{}) { fp.m[key] = value } func (fp *fuzzPeer) Get(key string) interface{} { return fp.m[key] } +func (fp *fuzzPeer) SetRemovalFailed() {} +func (fp *fuzzPeer) GetRemovalFailed() bool { return false }