diff --git a/chain/sync.go b/chain/sync.go index 8b9af670e..00240a721 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -9,6 +9,7 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" + "fmt" "net" "runtime/trace" "sync" @@ -392,6 +393,60 @@ func (s *Syncer) getHeaders(ctx context.Context) error { return nil } +// getMissingHeaders fetches missing headers one by one from dcrd. This assumes +// that the initial header sync and rescan have been performed and that dcrd's +// data filter (for block rescans) has been loaded. +// +// This falls back to doing a full getHeaders process when a large number of +// missing headers is detected. +func (s *Syncer) getMissingHeaders(ctx context.Context) error { + locators, _, err := s.wallet.BlockLocators(ctx, nil) + if err != nil { + return err + } + headers, err := s.rpc.Headers(ctx, locators, &hashStop) + if err != nil { + return err + } + if len(headers) == 0 { + return nil + } + if len(headers) == wire.MaxBlockHeadersPerMsg { + // There are too many block headers to go through them one by + // one. Perform a full getHeaders sync. + log.Warnf("Too many (%d) missing headers. Performing full header sync.", + len(headers)) + return s.getHeaders(ctx) + } + + // Perform a rescan of this block on dcrd's side, to have the same data + // as if it had been received from a blockConnected notification. + // + // This must be done one by one because a new block might have + // transactions that change the set of active addresses for the wallet, + // causing yet another set of transactions to be found in a subsequent + // block. + rescanHashes := make([]chainhash.Hash, 1) + for _, header := range headers { + rescanHashes[0] = header.BlockHash() + var relevantTxs []*wire.MsgTx + err := s.rpc.Rescan(ctx, rescanHashes, func(block *chainhash.Hash, txs []*wire.MsgTx) error { + relevantTxs = append(relevantTxs, txs...) + return nil + }) + if err != nil { + return err + } + + err = s.handleBlockConnected(ctx, header, relevantTxs, true) + if err != nil { + return err + } + } + + return nil +} + func normalizeAddress(addr string, defaultPort string) (hostport string, err error) { host, port, origErr := net.SplitHostPort(addr) if origErr == nil { @@ -664,6 +719,10 @@ func (s *Syncer) blockConnected(ctx context.Context, params json.RawMessage) err return err } + return s.handleBlockConnected(ctx, header, relevant, false) +} + +func (s *Syncer) handleBlockConnected(ctx context.Context, header *wire.BlockHeader, relevantTxs []*wire.MsgTx, isProcessingMissingBlock bool) error { // Ensure the ancestor is known to be in the main or in a side chain. // If it is not, this means we missed some blocks and should perform a // new round of initial header sync. @@ -672,11 +731,15 @@ func (s *Syncer) blockConnected(ctx context.Context, params json.RawMessage) err prevInSideChain := s.sidechains.HasSideChainBlock(&header.PrevBlock) s.sidechainsMu.Unlock() if !(prevInMainChain || prevInSideChain) { + if isProcessingMissingBlock { + return fmt.Errorf("broken assumption: received missing block " + + " without its parent in mainchain or sidechain") + } log.Infof("Received header for block %s (height %d) when "+ "parent %s not in main or side chain. Re-requesting "+ "missing headers.", header.BlockHash(), header.Height, header.PrevBlock) - return s.getHeaders(ctx) + return s.getMissingHeaders(ctx) } blockHash := header.BlockHash() @@ -699,7 +762,7 @@ func (s *Syncer) blockConnected(ctx context.Context, params json.RawMessage) err blockNode.BadCheckpoint() } s.sidechains.AddBlockNode(blockNode) - s.relevantTxs[blockHash] = relevant + s.relevantTxs[blockHash] = relevantTxs bestChain, err := s.wallet.EvaluateBestChain(ctx, &s.sidechains) if err != nil {