From eaaca07e9a109ba372d9180a61795703b1c35e91 Mon Sep 17 00:00:00 2001
From: Matheus Degiovani
Date: Mon, 23 Oct 2023 12:50:15 -0300
Subject: [PATCH 1/2] spv: Fetch headers in syncer instead of peer startup

This moves fetching of headers and cfilters from the startup of each
individual peer to the startup of the syncer itself, which avoids
duplicating that work for every connected peer.

In the future, this will also allow headers and cfilters to be fetched
from multiple peers, as well as allow asynchronous fetching.
---
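Reviewer note (not part of the commit): the sketch below summarizes the
intended startup ordering after this change. It only uses calls that appear
in the diff; the wrapper name startupHeadersSketch is hypothetical and is
shown for illustration only.

	// startupHeadersSketch outlines the new syncer-level startup flow:
	// headers and cfilters are fetched once, by the syncer, and only then
	// is headersFetched closed so that every peer's startupSync may proceed.
	func (s *Syncer) startupHeadersSketch(ctx context.Context) error {
		s.fetchHeadersStart()
		// getHeaders loops until no connected peer advertises a height
		// above the wallet's main chain tip.
		if err := s.getHeaders(ctx); err != nil {
			return err
		}
		s.fetchHeadersFinished()
		// Unblocks startupSync for every connected peer.
		close(s.headersFetched)
		return nil
	}

Each per-peer startupSync then only waits on headersFetched (or on the new
rp.Done() channel) instead of fetching headers itself.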
 p2p/peering.go |   5 +++
 spv/backend.go |  11 +++++
 spv/sync.go    | 118 +++++++++++++++++++++++++++++++------------------
 3 files changed, 92 insertions(+), 42 deletions(-)

diff --git a/p2p/peering.go b/p2p/peering.go
index 7ff622e98..20a7cc63a 100644
--- a/p2p/peering.go
+++ b/p2p/peering.go
@@ -708,6 +708,11 @@ func (rp *RemotePeer) Err() error {
 	return rp.err
 }
 
+// Done returns a channel that is closed once the peer disconnects.
+func (rp *RemotePeer) Done() <-chan struct{} {
+	return rp.errc
+}
+
 // RemoteAddr returns the remote address of the peer's TCP connection.
 func (rp *RemotePeer) RemoteAddr() net.Addr {
 	return rp.c.RemoteAddr()
diff --git a/spv/backend.go b/spv/backend.go
index fce221d1b..d7a05afc1 100644
--- a/spv/backend.go
+++ b/spv/backend.go
@@ -29,6 +29,17 @@ var _ wallet.NetworkBackend = (*Syncer)(nil)
 
 func pickAny(*p2p.RemotePeer) bool { return true }
 
+// pickForGetHeaders returns a function to use in waitForRemote which selects
+// peers that may have headers more recent than the passed tipHeight.
+func pickForGetHeaders(tipHeight int32) func(rp *p2p.RemotePeer) bool {
+	return func(rp *p2p.RemotePeer) bool {
+		// We are interested in this peer's headers if they announced a
+		// height greater than the current tip height and if we haven't
+		// yet fetched all the headers that it announced.
+		return rp.InitialHeight() > tipHeight && rp.LastHeight() < rp.InitialHeight()
+	}
+}
+
 // Blocks implements the Blocks method of the wallet.Peer interface.
 func (s *Syncer) Blocks(ctx context.Context, blockHashes []*chainhash.Hash) ([]*wire.MsgBlock, error) {
 	for {
diff --git a/spv/sync.go b/spv/sync.go
index 6329f6930..0a166f64f 100644
--- a/spv/sync.go
+++ b/spv/sync.go
@@ -53,9 +53,9 @@ type Syncer struct {
 	remoteAvailable chan struct{}
 	remotesMu       sync.Mutex
 
-	// missingCfiltersFetched is closed when the wallet's missing cfilters
-	// have been fetched.
-	missingCfiltersFetched chan struct{}
+	// headersFetched is closed when the initial getheaders loop has
+	// finished fetching headers.
+	headersFetched chan struct{}
 
 	// Data filters
 	//
@@ -131,7 +131,7 @@ func NewSyncer(w *wallet.Wallet, lp *p2p.LocalPeer) *Syncer {
 		lp:          lp,
 		mempoolAdds: make(chan *chainhash.Hash),
 
-		missingCfiltersFetched: make(chan struct{}),
+		headersFetched: make(chan struct{}),
 	}
 }
 
@@ -389,7 +389,19 @@ func (s *Syncer) Run(ctx context.Context) error {
 		}
 		s.fetchMissingCfiltersFinished()
 		log.Debugf("Fetched all missing cfilters")
-		close(s.missingCfiltersFetched)
+
+		// Next: fetch headers and cfilters up to the mainchain tip.
+		s.fetchHeadersStart()
+		log.Debugf("Fetching headers and CFilters...")
+		err = s.getHeaders(ctx)
+		if err != nil {
+			return err
+		}
+		s.fetchHeadersFinished()
+
+		// Signal that startup fetching of headers has completed.
+		close(s.headersFetched)
+
 		return nil
 	})
 
@@ -1343,10 +1355,9 @@ func (s *Syncer) disconnectStragglers(height int32) {
 // locators.
 var hashStop chainhash.Hash
 
-// getHeaders iteratively fetches headers from rp using the latest locators.
-// Returns when no more headers are available. A sendheaders message is pushed
-// to the peer when there are no more headers to fetch.
-func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
+// getHeaders fetches headers from peers until the wallet is up to date with
+// all connected peers. This is part of the startup sync process.
+func (s *Syncer) getHeaders(ctx context.Context) error {
 	var locators []*chainhash.Hash
 	var generation uint
 	var err error
@@ -1364,34 +1375,54 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 	}
 	s.locatorMu.Unlock()
 
-	var lastHeight int32
 	cnet := s.wallet.ChainParams().Net
-	for {
-		headers, err := rp.Headers(ctx, locators, &hashStop)
+	startTime := time.Now()
+
+nextbatch:
+	for ctx.Err() == nil {
+		tipHash, tipHeight := s.wallet.MainChainTip(ctx)
+
+		// Determine if there are any peers from which to request newer
+		// headers.
+		rp, err := s.waitForRemote(ctx, pickForGetHeaders(tipHeight), false)
 		if err != nil {
 			return err
 		}
+		if rp == nil {
+			// All done.
+			log.Infof("Initial sync to block %s at height %d completed in %s",
+				tipHash, tipHeight, time.Since(startTime).Round(time.Second))
+			return nil
+		}
+
+		headers, err := rp.Headers(ctx, locators, &hashStop)
+		if err != nil {
+			continue nextbatch
+		}
 
 		if len(headers) == 0 {
 			// Ensure that the peer provided headers through the height
 			// advertised during handshake.
-			if lastHeight < rp.InitialHeight() {
+			if rp.LastHeight() < rp.InitialHeight() {
 				// Peer may not have provided any headers if our own locators
 				// were up to date. Compare the best locator hash with the
 				// advertised height.
 				h, err := s.wallet.BlockHeader(ctx, locators[0])
 				if err == nil && int32(h.Height) < rp.InitialHeight() {
-					return errors.E(errors.Protocol, "peer did not provide "+
+					err := errors.E(errors.Protocol, "peer did not provide "+
 						"headers through advertised height")
+					rp.Disconnect(err)
+					continue nextbatch
 				}
 			}
 
-			return rp.SendHeaders(ctx)
+			// Try to pick a different peer with a higher advertised
+			// height, or determine that no such peers remain (in which
+			// case the initial header sync is done).
+			continue nextbatch
 		}
 
-		lastHeight = int32(headers[len(headers)-1].Height)
-
 		nodes := make([]*wallet.BlockNode, len(headers))
 		for i := range headers {
 			// Determine the hash of the header. It is safe to use
@@ -1423,7 +1454,11 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 		if err != nil {
 			s.sidechainMu.Unlock()
 			rp.Disconnect(err)
-			return err
+			if !errors.Is(err, context.Canceled) {
+				log.Warnf("Disconnecting from %v due to header "+
+					"validation error: %v", rp, err)
+			}
+			continue nextbatch
 		}
 
 		s.sidechainMu.Unlock()
@@ -1449,7 +1484,8 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 		}
 		err = g.Wait()
 		if err != nil {
-			return err
+			rp.Disconnect(err)
+			continue nextbatch
 		}
 
 		var added int
@@ -1481,7 +1517,7 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 				generation = s.locatorGeneration
 			}
 			s.locatorMu.Unlock()
-			continue
+			continue nextbatch
 		}
 		s.fetchHeadersProgress(headers[len(headers)-1])
 		log.Debugf("Fetched %d new header(s) ending at height %d from %v",
@@ -1490,7 +1526,8 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 		bestChain, err := s.wallet.EvaluateBestChain(ctx, &s.sidechains)
 		if err != nil {
 			s.sidechainMu.Unlock()
-			return err
+			rp.Disconnect(err)
+			continue nextbatch
 		}
 		if len(bestChain) == 0 {
 			s.sidechainMu.Unlock()
@@ -1500,7 +1537,8 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 		prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
 		if err != nil {
 			s.sidechainMu.Unlock()
-			return err
+			rp.Disconnect(err)
+			continue nextbatch
 		}
 
 		if len(prevChain) != 0 {
@@ -1535,26 +1573,21 @@ func (s *Syncer) getHeaders(ctx context.Context, rp *p2p.RemotePeer) error {
 		s.locatorGeneration++
 		s.locatorMu.Unlock()
 	}
+
+	return ctx.Err()
 }
 
 func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {
-	// Continue with fetching headers only after missing cfilters have
-	// been fetched.
+
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case <-s.missingCfiltersFetched:
-	}
-
-	// Fetch any unseen headers from the peer.
-	s.fetchHeadersStart()
-	log.Debugf("Fetching headers from %v", rp.RemoteAddr())
-	err := s.getHeaders(ctx, rp)
-	if err != nil {
-		return err
+	case <-s.headersFetched:
+	case <-rp.Done():
+		return rp.Err()
 	}
-	s.fetchHeadersFinished()
 
+	var err error
 	if atomic.CompareAndSwapUint32(&s.atomicCatchUpTryLock, 0, 1) {
 		err = func() error {
 			rescanPoint, err := s.wallet.RescanPoint(ctx)
@@ -1631,15 +1664,16 @@ func (s *Syncer) startupSync(ctx context.Context, rp *p2p.RemotePeer) error {
 		log.Errorf("Cannot load unmined transactions for resending: %v", err)
 		return nil
 	}
-	if len(unminedTxs) == 0 {
-		return nil
-	}
-	err = rp.PublishTransactions(ctx, unminedTxs...)
-	if err != nil {
-		// TODO: Transactions should be removed if this is a double spend.
-		log.Errorf("Failed to resent one or more unmined transactions: %v", err)
+	if len(unminedTxs) > 0 {
+		err = rp.PublishTransactions(ctx, unminedTxs...)
+		if err != nil {
+			// TODO: Transactions should be removed if this is a double spend.
+			log.Errorf("Failed to resend one or more unmined transactions: %v", err)
+		}
 	}
-	return nil
+
+	// Ask the peer to send any new headers.
+	return rp.SendHeaders(ctx)
 }
 
 // handleMempool handles eviction from the local mempool of non-wallet-backed

From 0203f769658c25b332f8c32724b54c971645fb8c Mon Sep 17 00:00:00 2001
From: Matheus Degiovani
Date: Tue, 31 Oct 2023 10:02:15 -0300
Subject: [PATCH 2/2] spv: Drop syncer-global block locators

The locators are now needed only during the initial getHeaders sync
stage, so they no longer need to be stored in the global syncer struct.

Since they are now accessed from only a single goroutine, the mutex
that guarded them is also unnecessary.
---
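Reviewer note (not part of the commit): with the syncer-global locators gone,
each getheaders batch derives its locators directly from the wallet, as in
this minimal sketch of the per-batch flow (taken from the hunk below):

	// locators is now a plain local variable inside getHeaders; since the
	// initial sync loop runs in a single goroutine, no mutex is required.
	locators, err := s.wallet.BlockLocators(ctx, nil)
	if err != nil {
		return err
	}
	headers, err := rp.Headers(ctx, locators, &hashStop)

Because the locators are recomputed for every batch, handleBlockAnnouncements
no longer needs to invalidate a cached copy when the main chain advances.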
 spv/sync.go | 66 +++++------------------------------------------------
 1 file changed, 6 insertions(+), 60 deletions(-)

diff --git a/spv/sync.go b/spv/sync.go
index 0a166f64f..781ea071c 100644
--- a/spv/sync.go
+++ b/spv/sync.go
@@ -75,10 +75,6 @@ type Syncer struct {
 	sidechains  wallet.SidechainForest
 	sidechainMu sync.Mutex
 
-	currentLocators   []*chainhash.Hash
-	locatorGeneration uint
-	locatorMu         sync.Mutex
-
 	// Holds all potential callbacks used to notify clients
 	notifications *Notifications
 
@@ -333,12 +329,6 @@ func (s *Syncer) Run(ctx context.Context) error {
 		log.Infof("Transactions synced through block %v height %d", &tipHash, tipHeight)
 	}
 
-	locators, err := s.wallet.BlockLocators(ctx, nil)
-	if err != nil {
-		return err
-	}
-	s.currentLocators = locators
-
 	s.lp.AddrManager().Start()
 	defer func() {
 		err := s.lp.AddrManager().Stop()
@@ -1299,13 +1289,6 @@ func (s *Syncer) handleBlockAnnouncements(ctx context.Context, rp *p2p.RemotePee
 		return err
 	}
 
-	if len(bestChain) != 0 {
-		s.locatorMu.Lock()
-		s.currentLocators = nil
-		s.locatorGeneration++
-		s.locatorMu.Unlock()
-	}
-
 	// Log connected blocks.
 	for _, n := range bestChain {
 		log.Infof("Connected block %v, height %d, %d wallet transaction(s)",
@@ -1358,22 +1341,6 @@ var hashStop chainhash.Hash
 // getHeaders fetches headers from peers until the wallet is up to date with
 // all connected peers. This is part of the startup sync process.
 func (s *Syncer) getHeaders(ctx context.Context) error {
-	var locators []*chainhash.Hash
-	var generation uint
-	var err error
-	s.locatorMu.Lock()
-	locators = s.currentLocators
-	generation = s.locatorGeneration
-	if len(locators) == 0 {
-		locators, err = s.wallet.BlockLocators(ctx, nil)
-		if err != nil {
-			s.locatorMu.Unlock()
-			return err
-		}
-		s.currentLocators = locators
-		s.locatorGeneration++
-	}
-	s.locatorMu.Unlock()
 
 	cnet := s.wallet.ChainParams().Net
 
@@ -1396,6 +1363,11 @@ nextbatch:
 			return nil
 		}
 
+		// Request headers from the selected peer.
+		locators, err := s.wallet.BlockLocators(ctx, nil)
+		if err != nil {
+			return err
+		}
 		headers, err := rp.Headers(ctx, locators, &hashStop)
 		if err != nil {
 			continue nextbatch
 		}
@@ -1502,21 +1474,6 @@ nextbatch:
 
 		if added == 0 {
 			s.sidechainMu.Unlock()
-			s.locatorMu.Lock()
-			if s.locatorGeneration > generation {
-				locators = s.currentLocators
-			}
-			if len(locators) == 0 {
-				locators, err = s.wallet.BlockLocators(ctx, nil)
-				if err != nil {
-					s.locatorMu.Unlock()
-					return err
-				}
-				s.currentLocators = locators
-				s.locatorGeneration++
-				generation = s.locatorGeneration
-			}
-			s.locatorMu.Unlock()
 			continue nextbatch
 		}
 		s.fetchHeadersProgress(headers[len(headers)-1])
@@ -1531,7 +1488,7 @@ nextbatch:
 		}
 		if len(bestChain) == 0 {
 			s.sidechainMu.Unlock()
-			continue
+			continue nextbatch
 		}
 
 		prevChain, err := s.wallet.ChainSwitch(ctx, &s.sidechains, bestChain, nil)
@@ -1561,17 +1518,6 @@ nextbatch:
 		// Peers should not be significantly behind the new tip.
 		s.setRequiredHeight(int32(tip.Header.Height))
 		s.disconnectStragglers(int32(tip.Header.Height))
-
-		// Generate new locators
-		s.locatorMu.Lock()
-		locators, err = s.wallet.BlockLocators(ctx, nil)
-		if err != nil {
-			s.locatorMu.Unlock()
-			return err
-		}
-		s.currentLocators = locators
-		s.locatorGeneration++
-		s.locatorMu.Unlock()
 	}
 
 	return ctx.Err()