Skip to content

Commit

Permalink
spv: Teardown syncer after all peers are lost
Browse files Browse the repository at this point in the history
Recreate a new syncer in the SPV reconnect loop in main.  This will allow
initial sync logic to be reperformed if peers can be connected later.

This also adds a forced 5s backoff before peers connection attempts are made
again, matching the backoff behavior for RPC syncing.
  • Loading branch information
jrick committed Jan 27, 2025
1 parent 8b18295 commit ef2b8b3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
23 changes: 14 additions & 9 deletions dcrwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,21 +523,26 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) {
addr := &net.TCPAddr{IP: net.ParseIP("::1"), Port: 0}
amgrDir := filepath.Join(cfg.AppDataDir.Value, w.ChainParams().Name)
amgr := addrmgr.New(amgrDir, cfg.lookup)
lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr)
lp.SetDialFunc(cfg.dial)
lp.SetDisableRelayTx(cfg.SPVDisableRelayTx)
syncer := spv.NewSyncer(w, lp)
if len(cfg.SPVConnect) > 0 {
syncer.SetPersistentPeers(cfg.SPVConnect)
}
w.SetNetworkBackend(syncer)
for {
lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr)
lp.SetDialFunc(cfg.dial)
lp.SetDisableRelayTx(cfg.SPVDisableRelayTx)
syncer := spv.NewSyncer(w, lp)
if len(cfg.SPVConnect) > 0 {
syncer.SetPersistentPeers(cfg.SPVConnect)
}
w.SetNetworkBackend(syncer)
err := syncer.Run(ctx)
if done(ctx) {
if err == nil || done(ctx) {
loggers.SyncLog.Infof("SPV synchronization stopped")
return
}
loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err)
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
}
}

Expand Down
27 changes: 19 additions & 8 deletions spv/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ type Syncer struct {
mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx
mempoolAdds chan *chainhash.Hash

done chan struct{}
err error
doneMu sync.Mutex
teardown func()
done chan struct{}
err error
doneMu sync.Mutex
}

// Notifications struct to contain all of the upcoming callbacks that will
Expand Down Expand Up @@ -176,10 +177,12 @@ func (s *Syncer) synced() {
// unsynced checks the atomic that controls wallet syncness and if previously
// synced, updates to unsynced and notifies the callback, if set.
func (s *Syncer) unsynced() {
if s.atomicWalletSynced.CompareAndSwap(1, 0) &&
s.notifications != nil &&
s.notifications.Synced != nil {
s.notifications.Synced(false)
if s.atomicWalletSynced.CompareAndSwap(1, 0) {
if s.notifications != nil &&
s.notifications.Synced != nil {
s.notifications.Synced(false)
}
s.teardown()
}
}

Expand Down Expand Up @@ -368,6 +371,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

// Start background handlers to read received messages from remote peers
g, gctx := errgroup.WithContext(context.Background())
gctx, cancel := context.WithCancel(gctx)
s.teardown = func() {
err = errors.E(errors.NoPeers)
cancel()
}
g.Go(func() error { return s.receiveGetData(gctx) })
g.Go(func() error { return s.receiveInv(gctx) })
g.Go(func() error { return s.receiveHeadersAnnouncements(gctx) })
Expand Down Expand Up @@ -450,7 +458,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
})

// Wait until cancellation or a handler errors.
return g.Wait()
if e := g.Wait(); err == nil {
err = e
}
return
}

// peerCandidate returns a peer address that we shall attempt to connect to.
Expand Down

0 comments on commit ef2b8b3

Please sign in to comment.