Skip to content

Commit

Permalink
spv: Add watchdog timer to cfilter fetching
Browse files Browse the repository at this point in the history
This commits adds a watchdog timer to the initial cfilter fetching,
triggered when a batch takes too long to be successfully fetched.

In some situations, specially close to the tip, the sidechain forest may
be populated with one (or more) headers for which no cfilter is
available in the network.  In this situation, the syncer could end up in
a state where it does not advance due to waiting for cfilters that will
never be sent by any peers.

To alleviate this issue, a watchdog timer is started for every batch of
cfilters. If that timer is triggered, then the entire sidechain forest
is pruned, ensuring the next batch of headers that will be fetched will
be using fresh block locators and that the best chain selected for
fetching cfilters will be for these fresh headers.

The timer interval is set for 2 minutes, which given the stall timer of
30 seconds per peer means at least 4 peers will be searched for the
cfilters, before giving up and resetting the forest.
  • Loading branch information
matheusd committed Nov 29, 2023
1 parent adba0d1 commit 66a3e69
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 3 deletions.
42 changes: 39 additions & 3 deletions spv/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"runtime"
"sync"
"time"

"decred.org/dcrwallet/v4/errors"
"decred.org/dcrwallet/v4/p2p"
Expand Down Expand Up @@ -104,6 +105,10 @@ func (s *Syncer) CFiltersV2(ctx context.Context, blockHashes []*chainhash.Hash)
}
}

// errCfilterWatchdogTriggered is an internal error generated when a batch
// of cfilters takes too long to be fetched.
var errCfilterWatchdogTriggered = errors.New("getCFilters watchdog triggered")

// cfiltersV2FromNodes fetches cfilters for all the specified nodes from a
// remote peer.
func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockNode) error {
Expand Down Expand Up @@ -132,14 +137,43 @@ func (s *Syncer) cfiltersV2FromNodes(ctx context.Context, nodes []*wallet.BlockN
}
lastHeight := nodes[len(nodes)-1].Header.Height

// Specially once we get close to the tip, we may have a header in the
// best sidechain that has been reorged out and thus no peer will have
// its corresponding CFilters. To recover from this case in a timely
// manner, we setup a special watchdog context that, if triggered, will
// make us clean up the sidechain forest, forcing a request of fresh
// headers from all remote peers.
//
// Peers have a 30s stall timeout protection, therefore a 2 minute
// watchdog interval means we'll try at least 4 different peers before
// resetting.
const watchdogTimeoutInterval = 2 * time.Minute
watchdogCtx, cancelWatchdog := context.WithTimeout(ctx, time.Minute)
defer cancelWatchdog()

nextTry:
for ctx.Err() == nil {
// Select a peer that should have these cfilters.
rp, err := s.waitForRemote(ctx, pickForGetCfilters(int32(lastHeight)), true)
rp, err := s.waitForRemote(watchdogCtx, pickForGetCfilters(int32(lastHeight)), true)
if watchdogCtx.Err() != nil && ctx.Err() == nil {
// Watchdog timer triggered. Reset sidechain forest.
lastNode := nodes[len(nodes)-1]
log.Warnf("Batch of CFilters ending on block %s at "+
"height %d not received within %s. Clearing "+
"sidechain forest to retry with different "+
"headers", lastNode.Hash, lastNode.Header.Height,
watchdogTimeoutInterval)
s.sidechainMu.Lock()
s.sidechains.PruneAll()
s.sidechainMu.Unlock()
return errCfilterWatchdogTriggered
}
if err != nil {
return err
}

startTime := time.Now()

// TODO: Fetch using getcfsv2 if peer supports batched cfilter fetching.

filters, err := rp.CFiltersV2(ctx, nodeHashes)
Expand Down Expand Up @@ -169,8 +203,10 @@ nextTry:
nodes[i].FilterV2 = filters[i].Filter
}
s.sidechainMu.Unlock()
log.Tracef("Fetched %d new cfilters(s) ending at height %d from %v",
len(nodes), nodes[len(nodes)-1].Header.Height, rp)
log.Tracef("Fetched %d new cfilters(s) ending at height %d "+
"from %v (request took %s)",
len(nodes), nodes[len(nodes)-1].Header.Height, rp,
time.Since(startTime).Truncate(time.Millisecond))
return nil
}

Expand Down
5 changes: 5 additions & 0 deletions wallet/sidechains.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ func (f *SidechainForest) AddBlockNode(n *BlockNode) bool {
return true
}

// PruneAll removes all sidechains.
func (f *SidechainForest) PruneAll() {
f.trees = nil
}

// Prune removes any sidechain trees which contain a root that is significantly
// behind the current main chain tip block.
func (f *SidechainForest) Prune(mainChainHeight int32, params *chaincfg.Params) {
Expand Down

0 comments on commit 66a3e69

Please sign in to comment.