diff --git a/dcrwallet.go b/dcrwallet.go index a0a656cfe..afce6ad1a 100644 --- a/dcrwallet.go +++ b/dcrwallet.go @@ -523,21 +523,26 @@ func spvLoop(ctx context.Context, w *wallet.Wallet) { addr := &net.TCPAddr{IP: net.ParseIP("::1"), Port: 0} amgrDir := filepath.Join(cfg.AppDataDir.Value, w.ChainParams().Name) amgr := addrmgr.New(amgrDir, cfg.lookup) - lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr) - lp.SetDialFunc(cfg.dial) - lp.SetDisableRelayTx(cfg.SPVDisableRelayTx) - syncer := spv.NewSyncer(w, lp) - if len(cfg.SPVConnect) > 0 { - syncer.SetPersistentPeers(cfg.SPVConnect) - } - w.SetNetworkBackend(syncer) for { + lp := p2p.NewLocalPeer(w.ChainParams(), addr, amgr) + lp.SetDialFunc(cfg.dial) + lp.SetDisableRelayTx(cfg.SPVDisableRelayTx) + syncer := spv.NewSyncer(w, lp) + if len(cfg.SPVConnect) > 0 { + syncer.SetPersistentPeers(cfg.SPVConnect) + } + w.SetNetworkBackend(syncer) err := syncer.Run(ctx) - if done(ctx) { + if err == nil || done(ctx) { loggers.SyncLog.Infof("SPV synchronization stopped") return } loggers.SyncLog.Errorf("SPV synchronization stopped: %v", err) + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + } } } diff --git a/spv/sync.go b/spv/sync.go index 10bca0ba7..4456c6b7a 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -92,9 +92,10 @@ type Syncer struct { mempool sync.Map // k=chainhash.Hash v=*wire.MsgTx mempoolAdds chan *chainhash.Hash - done chan struct{} - err error - doneMu sync.Mutex + teardown func() + done chan struct{} + err error + doneMu sync.Mutex } // Notifications struct to contain all of the upcoming callbacks that will @@ -176,10 +177,12 @@ func (s *Syncer) synced() { // unsynced checks the atomic that controls wallet syncness and if previously // synced, updates to unsynced and notifies the callback, if set. func (s *Syncer) unsynced() { - if s.atomicWalletSynced.CompareAndSwap(1, 0) && - s.notifications != nil && - s.notifications.Synced != nil { - s.notifications.Synced(false) + if s.atomicWalletSynced.CompareAndSwap(1, 0) { + if s.notifications != nil && + s.notifications.Synced != nil { + s.notifications.Synced(false) + } + s.teardown() } } @@ -368,6 +371,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) { // Start background handlers to read received messages from remote peers g, gctx := errgroup.WithContext(context.Background()) + gctx, cancel := context.WithCancel(gctx) + s.teardown = func() { + err = errors.E(errors.NoPeers) + cancel() + } g.Go(func() error { return s.receiveGetData(gctx) }) g.Go(func() error { return s.receiveInv(gctx) }) g.Go(func() error { return s.receiveHeadersAnnouncements(gctx) }) @@ -450,7 +458,10 @@ func (s *Syncer) Run(ctx context.Context) (err error) { }) // Wait until cancellation or a handler errors. - return g.Wait() + if e := g.Wait(); err == nil { + err = e + } + return } // peerCandidate returns a peer address that we shall attempt to connect to.