Skip to content

Commit

Permalink
spv: Implement per-peer increasing backoffs
Browse files Browse the repository at this point in the history
Backoff for discovered peers begins at 5s and adds an exponentially-decaying
increase before maxing out at 90s.  Persistent peers use a smaller scaled
backoff curve starting at 1s and maxing out at 18s).

All backoffs include a small amount of random jitter to avoid reconnection
storms at syncer startup.

Backoffs are reset if network connection is established to the peer.
  • Loading branch information
jrick committed Jan 27, 2025
1 parent 8dd3140 commit 4803538
Showing 1 changed file with 75 additions and 9 deletions.
84 changes: 75 additions & 9 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/decred/dcrd/addrmgr/v2"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/crypto/rand"
"github.com/decred/dcrd/dcrec/secp256k1/v4/schnorr"
"github.com/decred/dcrd/gcs/v4/blockcf2"
"github.com/decred/dcrd/mixing"
Expand Down Expand Up @@ -92,6 +93,9 @@ type Syncer struct {
mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx
mempoolAdds chan *chainhash.Hash

backoffs map[string]time.Duration
backoffsMu sync.Mutex

teardown func()
done chan struct{}
err error
Expand Down Expand Up @@ -143,6 +147,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
lp: lp,
mempoolAdds: make(chan *chainhash.Hash),
initialSyncDone: make(chan struct{}),
backoffs: make(map[string]time.Duration),
}
}

Expand Down Expand Up @@ -509,7 +514,15 @@ var errBreaksMinVersionTarget = errors.New("peer uses too low version to satisif

// connectAndRunPeer connects to and runs the syncing process with the specified
// peer. It blocks until the peer disconnects and logs any errors.
func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent bool) {
func (s *Syncer) connectAndRunPeer(ctx context.Context, backoff time.Duration, raddr string, persistent bool) (connected bool) {
if backoff != 0 {
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}
}

// Attempt connection to peer.
rp, err := s.lp.ConnectOutbound(ctx, raddr, reqSvcs)
if err != nil {
Expand Down Expand Up @@ -542,6 +555,7 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent
s.remotesMu.Unlock()
log.Infof("New peer %v %v version=%d %v", raddr, rp.UA(), rp.Pver(), rp.Services())
s.peerConnected(n, raddr)
connected = true

// Alert disconnection once this peer is done.
defer func() {
Expand Down Expand Up @@ -574,6 +588,7 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent
} else {
log.Infof("Lost peer %v", raddr)
}
return
}

func (s *Syncer) breaksMinVersionTarget(rp *p2p.RemotePeer) bool {
Expand All @@ -596,16 +611,59 @@ func (s *Syncer) breaksMinVersionTarget(rp *p2p.RemotePeer) bool {
return true
}

// Returns the next backoff duration based on the previous value. Backoffs
// start at 5s and add an exponentially decaying increase, capped to 1m30s.
// It takes roughly 45m of repeated connection failures to reach the maximum
// backoff period.
func backoff(d time.Duration) time.Duration {
const maxBackoff = 90 * time.Second
d = 5*time.Second + d*95/100
if d > maxBackoff {
d = maxBackoff
}
return d
}

// backoffJitter returns a random +-50ms of jitter to be added to every backoff.
func backoffJitter() time.Duration {
return -50*time.Millisecond + rand.Duration(100*time.Millisecond)
}

func (s *Syncer) backoff(raddr string) time.Duration {
s.backoffsMu.Lock()
defer s.backoffsMu.Unlock()

return s.backoffs[raddr]
}

func (s *Syncer) setBackoff(raddr string, d time.Duration) {
s.backoffsMu.Lock()
defer s.backoffsMu.Unlock()

if d == 0 {
delete(s.backoffs, raddr)
return
}

s.backoffs[raddr] = d
}

func (s *Syncer) connectToPersistent(ctx context.Context, raddr string) error {
var d time.Duration
for {
s.connectAndRunPeer(ctx, raddr, true)

// Retry persistent peer after 5 seconds.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
// Persistent peers use a backoff duration scaled to 1/5th of
// discovered candidate peers (min 5s -> 1s, max 1m30s -> 18s).
jitter := backoffJitter()
connected := s.connectAndRunPeer(ctx, d/5+jitter, raddr, true)
if err := ctx.Err(); err != nil {
return err
}
if connected {
d = 0
} else {
d += jitter
}
d = backoff(d)
}
}

Expand Down Expand Up @@ -637,9 +695,17 @@ func (s *Syncer) connectToCandidates(ctx context.Context) error {
wg.Add(1)
go func() {
raddr := na.String()
s.connectAndRunPeer(ctx, raddr, false)
d := s.backoff(raddr) + backoffJitter()
connected := s.connectAndRunPeer(ctx, d, raddr, false)
wg.Done()
<-sem

if connected {
d = 0
} else {
d = backoff(d)
}
s.setBackoff(raddr, d)
}()
}
}
Expand Down

0 comments on commit 4803538

Please sign in to comment.