Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network: Class-based Peer Selector #5937

Merged
merged 15 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions catchup/catchpointService.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package catchup

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand Down Expand Up @@ -69,7 +70,7 @@ type CatchpointCatchupStats struct {
type CatchpointCatchupService struct {
// stats is the statistics object, updated async while downloading the ledger
stats CatchpointCatchupStats
// statsMu synchronizes access to stats, as we could attempt to update it while querying for it's current state
// statsMu synchronizes access to stats, as we could attempt to update it while querying for its current state
statsMu deadlock.Mutex
node CatchpointCatchupNodeServices
// ctx is the node cancellation context, used when the node is being stopped.
Expand Down Expand Up @@ -98,7 +99,7 @@ type CatchpointCatchupService struct {
abortCtx context.Context
abortCtxFunc context.CancelFunc
// blocksDownloadPeerSelector is the peer selector used for downloading blocks.
blocksDownloadPeerSelector *peerSelector
blocksDownloadPeerSelector peerSelector
}

// MakeResumedCatchpointCatchupService creates a catchpoint catchup service for a node that is already in catchpoint catchup mode
Expand Down Expand Up @@ -280,51 +281,50 @@ func (cs *CatchpointCatchupService) processStageInactive() (err error) {
}

// processStageLedgerDownload is the second catchpoint catchup stage. It downloads the ledger.
func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
func (cs *CatchpointCatchupService) processStageLedgerDownload() error {
cs.statsMu.Lock()
label := cs.stats.CatchpointLabel
cs.statsMu.Unlock()
round, _, err0 := ledgercore.ParseCatchpointLabel(label)
round, _, err := ledgercore.ParseCatchpointLabel(label)

if err0 != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err0))
if err != nil {
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to parse label : %v", err))
}

// download balances file.
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
lf := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
attemptsCount := 0

for {
attemptsCount++

err = cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err != nil {
err0 := cs.ledgerAccessor.ResetStagingBalances(cs.ctx, true)
if err0 != nil {
if cs.ctx.Err() != nil {
return cs.stopOrAbort()
}
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err))
return cs.abort(fmt.Errorf("processStageLedgerDownload failed to reset staging balances : %v", err0))
}
psp, err := peerSelector.getNextPeer()
if err != nil {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err)
psp, err0 := cs.blocksDownloadPeerSelector.getNextPeer()
if err0 != nil {
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup was unable to obtain a list of peers to retrieve the catchpoint file from")
return cs.abort(err0)
}
peer := psp.Peer
start := time.Now()
err = ledgerFetcher.downloadLedger(cs.ctx, peer, round)
if err == nil {
err0 = lf.downloadLedger(cs.ctx, peer, round)
if err0 == nil {
cs.log.Infof("ledger downloaded in %d seconds", time.Since(start)/time.Second)
start = time.Now()
err = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err == nil {
err0 = cs.ledgerAccessor.BuildMerkleTrie(cs.ctx, cs.updateVerifiedCounts)
if err0 == nil {
cs.log.Infof("built merkle trie in %d seconds", time.Since(start)/time.Second)
break
}
// failed to build the merkle trie for the above catchpoint file.
peerSelector.rankPeer(psp, peerRankInvalidDownload)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankInvalidDownload)
} else {
peerSelector.rankPeer(psp, peerRankDownloadFailed)
cs.blocksDownloadPeerSelector.rankPeer(psp, peerRankDownloadFailed)
}

// instead of testing for err == cs.ctx.Err() , we'll check on the context itself.
Expand All @@ -335,10 +335,10 @@ func (cs *CatchpointCatchupService) processStageLedgerDownload() (err error) {
}

if attemptsCount >= cs.config.CatchupLedgerDownloadRetryAttempts {
err = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err)
err0 = fmt.Errorf("processStageLedgerDownload: catchpoint catchup exceeded number of attempts to retrieve ledger")
return cs.abort(err0)
}
cs.log.Warnf("unable to download ledger : %v", err)
cs.log.Warnf("unable to download ledger : %v", err0)
}

err = cs.updateStage(ledger.CatchpointCatchupStateLatestBlockDownload)
Expand Down Expand Up @@ -506,14 +506,14 @@ func lookbackForStateproofsSupport(topBlock *bookkeeping.Block) uint64 {
return uint64(topBlock.Round().SubSaturate(lowestStateProofRound))
}

// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against it's predecessor.
// processStageBlocksDownload is the fourth catchpoint catchup stage. It downloads all the reminder of the blocks, verifying each one of them against its predecessor.
func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
topBlock, err := cs.ledgerAccessor.EnsureFirstBlock(cs.ctx)
if err != nil {
return cs.abort(fmt.Errorf("processStageBlocksDownload failed, unable to ensure first block : %v", err))
}

// pick the lookback with the greater of
// pick the lookback with the greatest of
// either (MaxTxnLife+DeeperBlockHeaderHistory+CatchpointLookback) or MaxBalLookback
// Explanation:
// 1. catchpoint snapshots accounts at round X-CatchpointLookback
Expand All @@ -531,13 +531,13 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
}

// in case the effective lookback is going before our rounds count, trim it there.
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback..MaxTxnLife)
// ( a catchpoint is generated starting round MaxBalLookback, and this is a possible in any round in the range of MaxBalLookback...MaxTxnLife)
if lookback >= uint64(topBlock.Round()) {
lookback = uint64(topBlock.Round() - 1)
}

cs.statsMu.Lock()
cs.stats.TotalBlocks = uint64(lookback)
cs.stats.TotalBlocks = lookback
cs.stats.AcquiredBlocks = 0
cs.stats.VerifiedBlocks = 0
cs.statsMu.Unlock()
Expand All @@ -558,8 +558,9 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
blk = &ledgerBlock
cert = &ledgerCert
} else {
switch err0.(type) {
case ledgercore.ErrNoEntry:
var errNoEntry ledgercore.ErrNoEntry
switch {
case errors.As(err0, &errNoEntry):
// this is expected, ignore this one.
default:
cs.log.Warnf("processStageBlocksDownload encountered the following error when attempting to retrieve the block for round %d : %v", topBlock.Round()-basics.Round(blocksFetched), err0)
Expand Down Expand Up @@ -658,7 +659,7 @@ func (cs *CatchpointCatchupService) processStageBlocksDownload() (err error) {
func (cs *CatchpointCatchupService) fetchBlock(round basics.Round, retryCount uint64) (blk *bookkeeping.Block, cert *agreement.Certificate, downloadDuration time.Duration, psp *peerSelectorPeer, stop bool, err error) {
psp, err = cs.blocksDownloadPeerSelector.getNextPeer()
if err != nil {
if err == errPeerSelectorNoPeerPoolsAvailable {
if errors.Is(err, errPeerSelectorNoPeerPoolsAvailable) {
cs.log.Infof("fetchBlock: unable to obtain a list of peers to retrieve the latest block from; will retry shortly.")
// this is a possible on startup, since the network package might have yet to retrieve the list of peers.
time.Sleep(noPeersAvailableSleepInterval)
Expand Down Expand Up @@ -718,7 +719,7 @@ func (cs *CatchpointCatchupService) processStageSwitch() (err error) {
// stopOrAbort is called when any of the stage processing function sees that cs.ctx has been canceled. It can be
// due to the end user attempting to abort the current catchpoint catchup operation or due to a node shutdown.
func (cs *CatchpointCatchupService) stopOrAbort() error {
if cs.abortCtx.Err() == context.Canceled {
if errors.Is(cs.abortCtx.Err(), context.Canceled) {
return cs.abort(context.Canceled)
}
return nil
Expand Down Expand Up @@ -749,7 +750,7 @@ func (cs *CatchpointCatchupService) updateStage(newStage ledger.CatchpointCatchu
return nil
}

// updateNodeCatchupMode requests the node to change it's operational mode from
// updateNodeCatchupMode requests the node to change its operational mode from
// catchup mode to normal mode and vice versa.
func (cs *CatchpointCatchupService) updateNodeCatchupMode(catchupModeEnabled bool) {
newCtxCh := cs.node.SetCatchpointCatchupMode(catchupModeEnabled)
Expand Down Expand Up @@ -802,15 +803,7 @@ func (cs *CatchpointCatchupService) updateBlockRetrievalStatistics(acquiredBlock
}

func (cs *CatchpointCatchupService) initDownloadPeerSelector() {
cs.blocksDownloadPeerSelector = cs.makeCatchpointPeerSelector()
}

func (cs *CatchpointCatchupService) makeCatchpointPeerSelector() *peerSelector {
return makePeerSelector(
cs.net,
[]peerClass{
{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
})
cs.blocksDownloadPeerSelector = makeCatchpointPeerSelector(cs.net)
}

// checkLedgerDownload sends a HEAD request to the ledger endpoint of peers to validate the catchpoint's availability
Expand All @@ -821,10 +814,9 @@ func (cs *CatchpointCatchupService) checkLedgerDownload() error {
if err != nil {
return fmt.Errorf("failed to parse catchpoint label : %v", err)
}
peerSelector := cs.makeCatchpointPeerSelector()
ledgerFetcher := makeLedgerFetcher(cs.net, cs.ledgerAccessor, cs.log, cs, cs.config)
for i := 0; i < cs.config.CatchupLedgerDownloadRetryAttempts; i++ {
psp, peerError := peerSelector.getNextPeer()
psp, peerError := cs.blocksDownloadPeerSelector.getNextPeer()
if peerError != nil {
return err
}
Expand Down
153 changes: 153 additions & 0 deletions catchup/classBasedPeerSelector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright (C) 2019-2024 Algorand, Inc.
// This file is part of go-algorand
//
// go-algorand is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// go-algorand is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with go-algorand. If not, see <https://www.gnu.org/licenses/>.

package catchup

import (
"errors"
"github.com/algorand/go-algorand/network"
"github.com/algorand/go-deadlock"
"sort"
"time"
)

// lastCheckedDuration is the idle period after which a class gets a clean slate:
// when getNextPeer sees that more than this much time has passed since a class's
// selector was last tried, it resets that class's downloadFailures counter to zero.
const lastCheckedDuration = 10 * time.Minute

// classBasedPeerSelector is a peer selector that tracks and ranks classes of peers based on their response behavior.
// It holds one wrapped selector per peer class and consults them in order, skipping a class for a while
// once its download failures exceed that class's tolerance.
// It is used to select the most appropriate peers to download blocks from - this is most useful when catching up
// and needing to figure out whether the blocks can be retrieved from relay nodes or require archive nodes.
type classBasedPeerSelector struct {
	// mu is held for the full duration of rankPeer, peerDownloadDurationToRank and getNextPeer,
	// so the per-class counters in peerSelectors are only touched under this lock.
	mu deadlock.Mutex
	// peerSelectors is the list of per-class selectors, ordered by ascending priority value
	// (makeClassBasedPeerSelector sorts it); earlier entries are consulted first.
	peerSelectors []*wrappedPeerSelector
}

// makeClassBasedPeerSelector builds a classBasedPeerSelector from the given per-class selectors.
//
// The selectors are ordered by ascending priority value, so the class with the lowest
// priority number is consulted first; classes with equal priority keep their relative
// input order (the sort is stable).
//
// The ordering is applied to a private copy of the slice: the caller's slice is neither
// reordered nor retained, so later changes to it cannot alter the selector's internal
// order. The *wrappedPeerSelector elements themselves are still shared with the caller.
func makeClassBasedPeerSelector(peerSelectors []*wrappedPeerSelector) *classBasedPeerSelector {
	ordered := make([]*wrappedPeerSelector, len(peerSelectors))
	copy(ordered, peerSelectors)

	// Sort the copy by priority, lowest value first.
	sort.SliceStable(ordered, func(i, j int) bool {
		return ordered[i].priority < ordered[j].priority
	})

	return &classBasedPeerSelector{
		peerSelectors: ordered,
	}
}

// rankPeer records a new rank for psp with the selector responsible for its peer class.
//
// It returns the previous and the updated rank as reported by that class's selector.
// (-1, -1) is returned when psp is nil or when no selector is registered for the peer's
// class; if a selector of the matching class reports a negative rank (which this method
// treats as "peer not found"), that selector's result is returned unchanged.
//
// When the peer is found and rank is at or above peerRankNoBlockForRound, the call is
// counted as a download failure against the class. getNextPeer uses that counter to
// decide when to skip the class.
func (c *classBasedPeerSelector) rankPeer(psp *peerSelectorPeer, rank int) (int, int) {
	if psp == nil {
		// There is nothing to rank. Report it like an unknown peer instead of
		// panicking on the psp.peerClass dereference below.
		return -1, -1
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	oldRank, newRank := -1, -1
	for _, wp := range c.peerSelectors {
		// Only the selector that owns the peer's class can rank it.
		if psp.peerClass != wp.peerClass {
			continue
		}

		oldRank, newRank = wp.peerSelector.rankPeer(psp, rank)
		if oldRank < 0 || newRank < 0 {
			// Peer not found in this selector; keep looking in case another
			// selector is registered for the same class.
			continue
		}

		// Peer was in this class. If there was any kind of download issue, we increment the failure count.
		if rank >= peerRankNoBlockForRound {
			wp.downloadFailures++
		}

		break
	}

	return oldRank, newRank
}

// peerDownloadDurationToRank converts the time it took to download a block from psp into a rank.
//
// Every class's selector is asked in order; the first answer that is strictly below
// peerRankInvalidDownload is returned. If every selector answers with
// peerRankInvalidDownload or higher (or there are no selectors at all),
// peerRankInvalidDownload is returned.
func (c *classBasedPeerSelector) peerDownloadDurationToRank(psp *peerSelectorPeer, blockDownloadDuration time.Duration) (rank int) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for i := range c.peerSelectors {
		classRank := c.peerSelectors[i].peerSelector.peerDownloadDurationToRank(psp, blockDownloadDuration)
		if classRank < peerRankInvalidDownload {
			// A usable ranking from this class; no need to consult the remaining ones.
			return classRank
		}
		// Otherwise this class could not rank the peer (presumably because it does
		// not know it), so fall through to the next class.
	}

	// Every class was consulted without producing a usable ranking.
	return peerRankInvalidDownload
}

// getNextPeer returns the next peer to download from, consulting the peer classes in order.
//
// For each class:
//   - its failure counter is reset if the class's selector has not been tried for longer
//     than lastCheckedDuration;
//   - the class is skipped while its failure counter exceeds its toleranceFactor;
//   - otherwise its selector is asked for a peer, and the first peer obtained is returned.
//
// A selector that reports errPeerSelectorNoPeerPoolsAvailable is charged one failure; any
// other error just moves on to the next class. If no class yields a peer,
// errPeerSelectorNoPeerPoolsAvailable is returned.
func (c *classBasedPeerSelector) getNextPeer() (psp *peerSelectorPeer, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, class := range c.peerSelectors {
		// A class that has been left alone long enough gets another chance.
		if time.Since(class.lastCheckedTime) > lastCheckedDuration {
			class.downloadFailures = 0
		}

		// Too many recent failures: this class is disabled for now.
		if class.downloadFailures > class.toleranceFactor {
			continue
		}

		candidate, nextErr := class.peerSelector.getNextPeer()
		class.lastCheckedTime = time.Now()
		if nextErr == nil {
			return candidate, nil
		}

		// No other error is expected from getNextPeer today; the check is kept so that
		// only a "no peers" outcome is penalized, the equivalent of one download failure
		// (in case it is transient).
		if errors.Is(nextErr, errPeerSelectorNoPeerPoolsAvailable) {
			class.downloadFailures++
		}
	}

	// All classes were exhausted without obtaining a peer.
	return nil, errPeerSelectorNoPeerPoolsAvailable
}

// wrappedPeerSelector pairs the peerSelector of a single peer class with the bookkeeping
// that classBasedPeerSelector uses to decide whether that class should currently be consulted.
type wrappedPeerSelector struct {
	// The underlying peerSelector for this class
	peerSelector peerSelector
	// The class of peers the peerSelector is responsible for; rankPeer only forwards peers of this class to it
	peerClass network.PeerOption
	// The number of failures tolerated for this class: getNextPeer skips the class while downloadFailures exceeds this value
	toleranceFactor int
	// The original priority of the peerSelector, used for sorting (lower values are consulted first)
	priority int
	// The number of failures charged to this class since it was last reset: incremented by rankPeer for ranks
	// at or above peerRankNoBlockForRound, and by getNextPeer when the selector has no peer pools available
	downloadFailures int
	// The last time getNextPeer tried to use the peerSelector; once it is older than lastCheckedDuration, downloadFailures is reset
	lastCheckedTime time.Time
}

// makeCatchpointPeerSelector returns the peer selector configuration preferred by the catchpoint service.
//
// The result is a classBasedPeerSelector over two peer classes, each backed by its own
// rank-pooled selector fed from net:
//   - phonebook relays, with priority peerRankInitialFirstPriority and a tolerance of 3 failures;
//   - phonebook archival nodes, with priority peerRankInitialSecondPriority and a tolerance of 10 failures.
//
// NOTE(review): relays are presumably consulted first, which requires
// peerRankInitialFirstPriority < peerRankInitialSecondPriority - confirm against their definitions.
func makeCatchpointPeerSelector(net peersRetriever) peerSelector {
	relayClasses := []peerClass{
		{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookRelays},
	}
	relays := &wrappedPeerSelector{
		peerClass:       network.PeersPhonebookRelays,
		peerSelector:    makeRankPooledPeerSelector(net, relayClasses),
		priority:        peerRankInitialFirstPriority,
		toleranceFactor: 3,
		lastCheckedTime: time.Now(),
	}

	archivalClasses := []peerClass{
		{initialRank: peerRankInitialFirstPriority, peerClass: network.PeersPhonebookArchivalNodes},
	}
	archivalNodes := &wrappedPeerSelector{
		peerClass:       network.PeersPhonebookArchivalNodes,
		peerSelector:    makeRankPooledPeerSelector(net, archivalClasses),
		priority:        peerRankInitialSecondPriority,
		toleranceFactor: 10,
		lastCheckedTime: time.Now(),
	}

	return makeClassBasedPeerSelector([]*wrappedPeerSelector{relays, archivalNodes})
}
Loading
Loading