spv: Implement per-peer increasing backoffs
Backoff for discovered peers begins at 5s and adds an exponentially-decaying
increase before maxing out at 90s. Persistent peers use a smaller, scaled
backoff curve starting at 1s and maxing out at 18s.

All backoffs include a small amount of random jitter to avoid reconnection
storms at syncer startup.

Backoffs are reset once a network connection is established to the peer.
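
To illustrate the curve, here is a rough, standalone Go sketch (jitter omitted; the constants mirror the diff below) showing how the discovered-peer backoff and its 1/5-scaled persistent variant progress:

package main

import (
	"fmt"
	"time"
)

// nextBackoff mirrors the curve described above: add 5s plus 95% of the
// previous backoff, capped at 90s.
func nextBackoff(d time.Duration) time.Duration {
	const maxBackoff = 90 * time.Second
	d = 5*time.Second + d*95/100
	if d > maxBackoff {
		d = maxBackoff
	}
	return d
}

func main() {
	var d, elapsed time.Duration
	for attempt := 1; ; attempt++ {
		d = nextBackoff(d)
		elapsed += d
		// Persistent peers sleep d/5, so they start near 1s and cap at 18s.
		fmt.Printf("attempt %2d: discovered %9v  persistent %8v  cumulative wait %v\n",
			attempt, d.Round(time.Millisecond), (d/5).Round(time.Millisecond),
			elapsed.Round(time.Second))
		if d == 90*time.Second {
			break // roughly 45 attempts (~45m of cumulative waiting) to hit the cap
		}
	}
}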
jrick committed Jan 28, 2025
1 parent 4cb8d60 commit 5b9bceb
Showing 1 changed file with 90 additions and 9 deletions.
spv/sync.go (90 additions, 9 deletions)
@@ -20,6 +20,7 @@ import (
"github.com/decred/dcrd/addrmgr/v2"
"github.com/decred/dcrd/blockchain/stake/v5"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/crypto/rand"
"github.com/decred/dcrd/dcrec/secp256k1/v4/schnorr"
"github.com/decred/dcrd/gcs/v4/blockcf2"
"github.com/decred/dcrd/mixing"
@@ -43,6 +44,11 @@ const (
minVersionTarget = 3
)

type backoff struct {
duration time.Duration
time time.Time
}

// Syncer implements wallet synchronization services over the Decred wire
// protocol using Simplified Payment Verification (SPV) with compact filters.
type Syncer struct {
@@ -92,6 +98,9 @@ type Syncer struct {
mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx
mempoolAdds chan *chainhash.Hash

backoffs map[string]backoff
backoffsMu sync.Mutex

teardown func()
done chan struct{}
err error
@@ -143,6 +152,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
lp: lp,
mempoolAdds: make(chan *chainhash.Hash),
initialSyncDone: make(chan struct{}),
backoffs: make(map[string]backoff),
}
}

@@ -510,7 +520,16 @@ var errBreaksMinVersionTarget = errors.New("peer uses too low version to satisif

// connectAndRunPeer connects to and runs the syncing process with the specified
// peer. It blocks until the peer disconnects and logs any errors.
func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent bool) {
func (s *Syncer) connectAndRunPeer(ctx context.Context, backoff time.Duration, raddr string, persistent bool) (connected bool) {
if backoff > 0 {
log.Tracef("connectAndRunPeer(%v): sleeping backoff %v", raddr, backoff)
select {
case <-time.After(backoff):
case <-ctx.Done():
return
}
}

// Attempt connection to peer.
rp, err := s.lp.ConnectOutbound(ctx, raddr, reqSvcs)
if err != nil {
@@ -543,6 +562,7 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent
s.remotesMu.Unlock()
log.Infof("New peer %v %v version=%d %v", raddr, rp.UA(), rp.Pver(), rp.Services())
s.peerConnected(n, raddr)
connected = true

// Alert disconnection once this peer is done.
defer func() {
@@ -575,6 +595,7 @@ func (s *Syncer) connectAndRunPeer(ctx context.Context, raddr string, persistent
} else {
log.Infof("Lost peer %v", raddr)
}
return
}

func (s *Syncer) breaksMinVersionTarget(rp *p2p.RemotePeer) bool {
@@ -597,16 +618,59 @@ func (s *Syncer) breaksMinVersionTarget(rp *p2p.RemotePeer) bool {
return true
}

// Returns the next backoff duration based on the previous value. Backoffs
// start at 5s and add an exponentially decaying increase, capped to 1m30s.
// It takes roughly 45m of repeated connection failures to reach the maximum
// backoff period.
func nextBackoff(d time.Duration) time.Duration {
const maxBackoff = 90 * time.Second
d = 5*time.Second + d*95/100
if d > maxBackoff {
d = maxBackoff
}
return d
}

// backoffJitter returns a random +-200ms of jitter to be added to every backoff.
func backoffJitter() time.Duration {
return -200*time.Millisecond + rand.Duration(400*time.Millisecond)
}

func (s *Syncer) backoff(raddr string) backoff {
s.backoffsMu.Lock()
defer s.backoffsMu.Unlock()

return s.backoffs[raddr]
}

func (s *Syncer) setBackoff(raddr string, b backoff) {
s.backoffsMu.Lock()
defer s.backoffsMu.Unlock()

if b.duration == 0 {
delete(s.backoffs, raddr)
return
}

s.backoffs[raddr] = b
}

func (s *Syncer) connectToPersistent(ctx context.Context, raddr string) error {
var d time.Duration
for {
s.connectAndRunPeer(ctx, raddr, true)

// Retry persistent peer after 5 seconds.
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(5 * time.Second):
// Persistent peers use a backoff duration scaled to 1/5th of
// discovered candidate peers (min 5s -> 1s, max 1m30s -> 18s).
jitter := backoffJitter()
connected := s.connectAndRunPeer(ctx, d/5+jitter, raddr, true)
if err := ctx.Err(); err != nil {
return err
}
if connected {
d = 0
} else {
d += jitter
}
d = nextBackoff(d)
}
}

@@ -638,7 +702,24 @@ func (s *Syncer) connectToCandidates(ctx context.Context) error {
wg.Add(1)
go func() {
raddr := na.String()
s.connectAndRunPeer(ctx, raddr, false)
b := s.backoff(raddr)
var jitter, delay time.Duration
if b.time.IsZero() {
jitter = rand.Duration(200 * time.Millisecond)
delay = jitter
} else {
jitter = backoffJitter()
delay = time.Until(b.time) + jitter
}
connected := s.connectAndRunPeer(ctx, delay, raddr, false)
if connected {
s.setBackoff(raddr, backoff{})
} else {
b.duration = nextBackoff(b.duration + jitter)
b.time = time.Now().Add(b.duration)
s.setBackoff(raddr, b)
}

wg.Done()
<-sem
}()
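
As a rough, self-contained sketch of the per-peer bookkeeping added in the last hunk: each failed attempt grows the stored backoff and schedules the next attempt around b.time, while a successful connection deletes the entry and resets the curve. math/rand stands in for dcrd's crypto/rand, and the connection attempt itself is stubbed out; the peer address is hypothetical.

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type backoff struct {
	duration time.Duration
	time     time.Time
}

type tracker struct {
	mu       sync.Mutex
	backoffs map[string]backoff
}

func nextBackoff(d time.Duration) time.Duration {
	const maxBackoff = 90 * time.Second
	d = 5*time.Second + d*95/100
	if d > maxBackoff {
		d = maxBackoff
	}
	return d
}

// backoffJitter approximates the +-200ms jitter; the real code uses
// github.com/decred/dcrd/crypto/rand.
func backoffJitter() time.Duration {
	return -200*time.Millisecond + time.Duration(rand.Int63n(int64(400*time.Millisecond)))
}

// recordAttempt mimics the bookkeeping in connectToCandidates: success deletes
// the entry (resetting the curve); failure grows the backoff and records when
// the next attempt should roughly begin.
func (t *tracker) recordAttempt(raddr string, connected bool, jitter time.Duration) backoff {
	t.mu.Lock()
	defer t.mu.Unlock()
	if connected {
		delete(t.backoffs, raddr)
		return backoff{}
	}
	b := t.backoffs[raddr]
	b.duration = nextBackoff(b.duration + jitter)
	b.time = time.Now().Add(b.duration)
	t.backoffs[raddr] = b
	return b
}

func main() {
	t := &tracker{backoffs: make(map[string]backoff)}
	const raddr = "203.0.113.7:9108" // hypothetical peer address
	for attempt := 1; attempt <= 5; attempt++ {
		b := t.recordAttempt(raddr, false, backoffJitter()) // simulate failures
		fmt.Printf("attempt %d failed; next attempt in about %v\n",
			attempt, b.duration.Round(time.Millisecond))
	}
	t.recordAttempt(raddr, true, 0) // a successful connection resets the curve
	fmt.Println("tracked peers remaining:", len(t.backoffs))
}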
