diff --git a/p2p/peering.go b/p2p/peering.go index 7ff622e98..20a7cc63a 100644 --- a/p2p/peering.go +++ b/p2p/peering.go @@ -708,6 +708,11 @@ func (rp *RemotePeer) Err() error { return rp.err } +// Done returns a channel that is closed once the peer disconnects. +func (rp *RemotePeer) Done() <-chan struct{} { + return rp.errc +} + // RemoteAddr returns the remote address of the peer's TCP connection. func (rp *RemotePeer) RemoteAddr() net.Addr { return rp.c.RemoteAddr() diff --git a/spv/backend.go b/spv/backend.go index fce221d1b..d7a05afc1 100644 --- a/spv/backend.go +++ b/spv/backend.go @@ -29,6 +29,17 @@ var _ wallet.NetworkBackend = (*Syncer)(nil) func pickAny(*p2p.RemotePeer) bool { return true } +// pickForGetHeaders returns a function to use in waitForRemotes which selects +// peers that may have headers that are more recent than the passed tipHeight. +func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool { + return func(rp *p2p.RemotePeer) bool { + // We are interested in this peer's headers if they announced a + // height greater than the current tip height and if we haven't + // yet fetched all the headers that it announced. + return rp.InitialHeight() > tipHeight && rp.LastHeight() < rp.InitialHeight() + } +} + // Blocks implements the Blocks method of the wallet.Peer interface. func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) { for { diff --git a/spv/sync.go b/spv/sync.go index 6329f6930..781ea071c 100644 --- a/spv/sync.go +++ b/spv/sync.go @@ -53,9 +53,9 @@ type Syncer struct { remoteAvailable chan struct{} remotesMu sync.Mutex - // missingCfiltersFetched is closed when the wallet's missing cfilters - // have been fetched. - missingCfiltersFetched chan struct{} + // headersFetched is closed when the initial getheaders loop has + // finished fetching headers. + headersFetched chan struct{} // Data filters // @@ -75,10 +75,6 @@ type Syncer struct { sidechains wallet.SidechainForest sidechainMu sync.Mutex - currentLocators []*chainhash.Hash - locatorGeneration uint - locatorMu sync.Mutex - // Holds all potential callbacks used to notify clients notifications *Notifications @@ -131,7 +127,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer { lp: lp, mempoolAdds: make(chan *chainhash.Hash), - missingCfiltersFetched: make(chan struct{}), + headersFetched: make(chan struct{}), } } @@ -333,12 +329,6 @@ func (s *Syncer) Run(ctx context.Context) error { log.Infof("Transactions synced through block %v height %d", &tipHash, tipHeight) } - locators, err := s.wallet.BlockLocators(ctx, nil) - if err != nil { - return err - } - s.currentLocators = locators - s.lp.AddrManager().Start() defer func() { err := s.lp.AddrManager().Stop() @@ -389,7 +379,19 @@ func (s *Syncer) Run(ctx context.Context) error { } s.fetchMissingCfiltersFinished() log.Debugf("Fetched all missing cfilters") - close(s.missingCfiltersFetched) + + // Next: fetch headers and cfilters up to mainchain tip. + s.fetchHeadersStart() + log.Debugf("Fetching headers and CFilters...") + err = s.getHeaders(ctx) + if err != nil { + return err + } + s.fetchHeadersFinished() + + // Signal that startup fetching of headers has completed. + close(s.headersFetched) + return nil }) @@ -1287,13 +1289,6 @@ func (s *Syncer) handleBlockAnnouncements(ctx context.Context, rp *p2p.RemotePee return err } - if len(bestChain) != 0 { - s.locatorMu.Lock() - s.currentLocators = nil - s.locatorGeneration++ - s.locatorMu.Unlock() - } - // Log connected blocks. for _, n := range bestChain { log.Infof("Connected block %v, height %d, %d wallet transaction(s)", @@ -1343,55 +1338,63 @@ func (s *Syncer) disconnectStragglers(height int32) { // locators. var hashStop chainhash.Hash -// getHeaders iteratively fetches headers from rp using the latest locators. -// Returns when no more headers are available. A sendheaders message is pushed -// to the peer when there are no more headers to fetch. -func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { - var locators []*chainhash.Hash - var generation uint - var err error - s.locatorMu.Lock() - locators = s.currentLocators - generation = s.locatorGeneration - if len(locators) == 0 { - locators, err = s.wallet.BlockLocators(ctx, nil) +// getHeaders fetches headers from peers until the wallet is up to date with +// all connected peers. This is part of the startup sync process. +func (s *Syncer) getHeaders(ctx context.Context) error { + + cnet := s.wallet.ChainParams().Net + + startTime := time.Now() + +nextbatch: + for ctx.Err() == nil { + tipHash, tipHeight := s.wallet.MainChainTip(ctx) + + // Determine if there are any peers from which to request newer + // headers. + rp, err := s.waitForRemote(ctx, pickForGetHeaders(tipHeight), false) if err != nil { - s.locatorMu.Unlock() return err } - s.currentLocators = locators - s.locatorGeneration++ - } - s.locatorMu.Unlock() - - var lastHeight int32 - cnet := s.wallet.ChainParams().Net + if rp == nil { + // All done. + log.Infof("Initial sync to block %s at height %d completed in %s", + tipHash, tipHeight, time.Since(startTime).Round(time.Second)) + return nil + } - for { - headers, err := rp.Headers(ctx, locators, &hashStop) + // Request headers from the selected peer. + locators, err := s.wallet.BlockLocators(ctx, nil) if err != nil { return err } + headers, err := rp.Headers(ctx, locators, &hashStop) + if err != nil { + continue nextbatch + } if len(headers) == 0 { // Ensure that the peer provided headers through the height // advertised during handshake. - if lastHeight < rp.InitialHeight() { + if rp.LastHeight() < rp.InitialHeight() { // Peer may not have provided any headers if our own locators // were up to date. Compare the best locator hash with the // advertised height. h, err := s.wallet.BlockHeader(ctx, locators[0]) if err == nil && int32(h.Height) < rp.InitialHeight() { - return errors.E(errors.Protocol, "peer did not provide "+ + err := errors.E(errors.Protocol, "peer did not provide "+ "headers through advertised height") + rp.Disconnect(err) + continue nextbatch } } - return rp.SendHeaders(ctx) + // Try to pick a different peer with a higher advertised + // height or check there are no such peers (thus we're + // done with fetching headers for initial sync). + continue nextbatch } - lastHeight = int32(headers[len(headers)-1].Height) - nodes := make([]*wallet.BlockNode, len(headers)) for i := range headers { // Determine the hash of the header. It is safe to use @@ -1423,7 +1426,11 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { if err != nil { s.sidechainMu.Unlock() rp.Disconnect(err) - return err + if !errors.Is(err, context.Canceled) { + log.Warnf("Disconnecting from %v due to header "+ + "validation error: %v", rp, err) + } + continue nextbatch } s.sidechainMu.Unlock() @@ -1449,7 +1456,8 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { } err = g.Wait() if err != nil { - return err + rp.Disconnect(err) + continue nextbatch } var added int @@ -1466,22 +1474,7 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { if added == 0 { s.sidechainMu.Unlock() - s.locatorMu.Lock() - if s.locatorGeneration > generation { - locators = s.currentLocators - } - if len(locators) == 0 { - locators, err = s.wallet.BlockLocators(ctx, nil) - if err != nil { - s.locatorMu.Unlock() - return err - } - s.currentLocators = locators - s.locatorGeneration++ - generation = s.locatorGeneration - } - s.locatorMu.Unlock() - continue + continue nextbatch } s.fetchHeadersProgress(headers[len(headers)-1]) log.Debugf("Fetched %d new header(s) ending at height %d from %v", @@ -1490,17 +1483,19 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { bestChain, err := s.wallet.EvaluateBestChain(ctx, &s.sidechains) if err != nil { s.sidechainMu.Unlock() - return err + rp.Disconnect(err) + continue nextbatch } if len(bestChain) == 0 { s.sidechainMu.Unlock() - continue + continue nextbatch } prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil) if err != nil { s.sidechainMu.Unlock() - return err + rp.Disconnect(err) + continue nextbatch } if len(prevChain) != 0 { @@ -1523,38 +1518,22 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error { // Peers should not be significantly behind the new tip. s.setRequiredHeight(int32(tip.Header.Height)) s.disconnectStragglers(int32(tip.Header.Height)) - - // Generate new locators - s.locatorMu.Lock() - locators, err = s.wallet.BlockLocators(ctx, nil) - if err != nil { - s.locatorMu.Unlock() - return err - } - s.currentLocators = locators - s.locatorGeneration++ - s.locatorMu.Unlock() } + + return ctx.Err() } func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error { - // Continue with fetching headers only after missing cfilters have - // been fetched. + select { case <-ctx.Done(): return ctx.Err() - case <-s.missingCfiltersFetched: - } - - // Fetch any unseen headers from the peer. - s.fetchHeadersStart() - log.Debugf("Fetching headers from %v", rp.RemoteAddr()) - err := s.getHeaders(ctx, rp) - if err != nil { - return err + case <-s.headersFetched: + case <-rp.Done(): + return rp.Err() } - s.fetchHeadersFinished() + var err error if atomic.CompareAndSwapUint32(&s.atomicCatchUpTryLock, 0, 1) { err = func() error { rescanPoint, err := s.wallet.RescanPoint(ctx) @@ -1631,15 +1610,16 @@ func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error { log.Errorf("Cannot load unmined transactions for resending: %v", err) return nil } - if len(unminedTxs) == 0 { - return nil - } - err = rp.PublishTransactions(ctx, unminedTxs...) - if err != nil { - // TODO: Transactions should be removed if this is a double spend. - log.Errorf("Failed to resent one or more unmined transactions: %v", err) + if len(unminedTxs) > 0 { + err = rp.PublishTransactions(ctx, unminedTxs...) + if err != nil { + // TODO: Transactions should be removed if this is a double spend. + log.Errorf("Failed to resent one or more unmined transactions: %v", err) + } } - return nil + + // Ask peer to send any new headers. + return rp.SendHeaders(ctx) } // handleMempool handles eviction from the local mempool of non-wallet-backed