Skip to content

Commit

Permalink
Reorg ropsten solution (#2419)
Browse files Browse the repository at this point in the history
* Reorg ropsten solution

* Add option

* Print TD recalc

* Correct fix for eip1559

* Try to fix the unwind

* Print header progress and hash

* Not insert descendants of bad headers

* Print some more

* Print less

* Better way of marking bad headers

* Disable inSync

* Penalise peers who give incorrect chain pieces

* better fix for initial cycle

* Clean up

Co-authored-by: Alexey Sharp <[email protected]>
  • Loading branch information
AlexeyAkhunov and Alexey Sharp authored Jul 22, 2021
1 parent b957ae9 commit 026c4e9
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 22 deletions.
10 changes: 8 additions & 2 deletions cmd/sentry/download/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,11 @@ func (cs *ControlServerImpl) blockHeaders66(ctx context.Context, in *proto_sentr
if penalty == headerdownload.NoPenalty {
var canRequestMore bool
for _, segment := range segments {
requestMore := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId)))
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId)))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}

if canRequestMore {
Expand Down Expand Up @@ -491,8 +494,11 @@ func (cs *ControlServerImpl) blockHeaders65(ctx context.Context, in *proto_sentr
if penalty == headerdownload.NoPenalty {
var canRequestMore bool
for _, segment := range segments {
requestMore := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId)))
requestMore, penalties := cs.Hd.ProcessSegment(segment, false /* newBlock */, string(gointerfaces.ConvertH512ToBytes(in.PeerId)))
canRequestMore = canRequestMore || requestMore
if len(penalties) > 0 {
cs.Penalize(ctx, penalties)
}
}

if canRequestMore {
Expand Down
2 changes: 1 addition & 1 deletion core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ func (st *StateTransition) buyGas(gasBailout bool) error {
if st.gasFeeCap != nil {
balanceCheck = st.sharedBuyGasBalance.SetUint64(st.msg.Gas())
balanceCheck = balanceCheck.Mul(balanceCheck, st.gasFeeCap)
balanceCheck.Add(balanceCheck, st.value)
}
balanceCheck.Add(balanceCheck, st.value)
if have, want := st.state.GetBalance(st.msg.From()), balanceCheck; have.Cmp(want) < 0 {
if !gasBailout {
return fmt.Errorf("%w: address %v have %v want %v", ErrInsufficientFunds, st.msg.From().Hex(), have, want)
Expand Down
11 changes: 11 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
if err != nil {
return nil, err
}
if config.BadBlock != 0 {
var badHash common.Hash
if err = chainKv.View(context.Background(), func(tx ethdb.Tx) error {
var hErr error
badHash, hErr = rawdb.ReadCanonicalHash(tx, config.BadBlock)
return hErr
}); err != nil {
return nil, err
}
backend.stagedSync.UnwindTo(config.BadBlock-1, badHash)
}

go txpropagate.BroadcastPendingTxsToNetwork(backend.downloadCtx, backend.txPool, backend.txPoolP2PServer.RecentPeers, backend.downloadServer)

Expand Down
1 change: 1 addition & 0 deletions eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type Config struct {

Prune prune.Mode
BatchSize datasize.ByteSize // Batch size for execution stage
BadBlock uint64 // Block marked as bad (for forced reorg)

Snapshot Snapshot

Expand Down
35 changes: 26 additions & 9 deletions eth/stagedsync/stage_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,20 +252,34 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg)
return err
}
badBlock := u.BadBlock != (common.Hash{})
for blockHeight := headerProgress; blockHeight > u.UnwindPoint; blockHeight-- {
if badBlock {
var hash common.Hash
if hash, err = rawdb.ReadCanonicalHash(tx, blockHeight); err != nil {
if badBlock {
cfg.hd.ReportBadHeader(u.BadBlock)
// Mark all descendants of bad block as bad too
headerCursor, cErr := tx.Cursor(dbutils.HeadersBucket)
if cErr != nil {
return cErr
}
defer headerCursor.Close()
var k, v []byte
for k, v, err = headerCursor.Seek(dbutils.EncodeBlockNumber(u.UnwindPoint + 1)); err == nil && k != nil; k, v, err = headerCursor.Next() {
var h types.Header
if err = rlp.DecodeBytes(v, &h); err != nil {
return err
}
cfg.hd.ReportBadHeader(hash)
if cfg.hd.IsBadHeader(h.ParentHash) {
cfg.hd.ReportBadHeader(h.Hash())
}
}
if err != nil {
return fmt.Errorf("iterate over headers to mark bad headers: %w", err)
}
}
for blockHeight := headerProgress; blockHeight > u.UnwindPoint; blockHeight-- {
if err = rawdb.DeleteCanonicalHash(tx, blockHeight); err != nil {
return err
}
}
if u.BadBlock != (common.Hash{}) {
cfg.hd.ReportBadHeader(u.BadBlock)
if badBlock {
// Find header with biggest TD
tdCursor, cErr := tx.Cursor(dbutils.HeaderTDBucket)
if cErr != nil {
Expand Down Expand Up @@ -303,14 +317,17 @@ func HeadersUnwind(u *UnwindState, s *StageState, tx ethdb.RwTx, cfg HeadersCfg)
return err
}
if maxNum == 0 {
// Read genesis hash
if maxHash, err = rawdb.ReadCanonicalHash(tx, 0); err != nil {
maxNum = u.UnwindPoint
if maxHash, err = rawdb.ReadCanonicalHash(tx, maxNum); err != nil {
return err
}
}
if err = rawdb.WriteHeadHeaderHash(tx, maxHash); err != nil {
return err
}
if err = u.Done(tx); err != nil {
return err
}
if err = s.Update(tx, maxNum); err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions eth/stagedsync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (s *Sync) Run(db ethdb.RwKV, tx ethdb.RwTx, firstCycle bool) error {
if err := s.SetCurrentStage(s.stages[0].ID); err != nil {
return err
}
// If there were unwinds at the start, a heavier but invalid chain may be present, so
// we relax the rules for Stage1
firstCycle = false
}

stage := s.stages[s.currentStage]
Expand Down
1 change: 1 addition & 0 deletions turbo/cli/default_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var DefaultFlags = []cli.Flag{
TLSKeyFlag,
TLSCACertFlag,
SyncLoopThrottleFlag,
BadBlockFlag,
utils.ListenPortFlag,
utils.ListenPort65Flag,
utils.NATFlag,
Expand Down
7 changes: 7 additions & 0 deletions turbo/cli/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ var (
Usage: "Sets the minimum time between sync loop starts (e.g. 1h30m, default is none)",
Value: "",
}

BadBlockFlag = cli.IntFlag{
Name: "bad.block",
Usage: "Marks block with given number bad and forces initial reorg before normal staged sync",
Value: 0,
}
)

func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config) {
Expand Down Expand Up @@ -201,6 +207,7 @@ func ApplyFlagsForEthConfig(ctx *cli.Context, cfg *ethconfig.Config) {
}
cfg.SyncLoopThrottle = syncLoopThrottle
}
cfg.BadBlock = uint64(ctx.GlobalInt(BadBlockFlag.Name))
}

func ApplyFlagsForEthConfigCobra(f *pflag.FlagSet, cfg *ethconfig.Config) {
Expand Down
25 changes: 15 additions & 10 deletions turbo/stages/headerdownload/header_algos.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (hd *HeaderDownload) extendUp(segment *ChainSegment, start, end int) error
}
}

if attachmentLink.persisted {
if _, bad := hd.badHeaders[attachmentLink.hash]; !bad && attachmentLink.persisted {
link := hd.links[linkHeader.Hash()]
hd.insertList = append(hd.insertList, link)
}
Expand Down Expand Up @@ -255,21 +255,21 @@ func (hd *HeaderDownload) extendDown(segment *ChainSegment, start, end int) (boo
}

// Connect connects some working trees using anchors of some, and a link of another
func (hd *HeaderDownload) connect(segment *ChainSegment, start, end int) error {
func (hd *HeaderDownload) connect(segment *ChainSegment, start, end int) ([]PenaltyItem, error) {
// Find attachment link again
linkHeader := segment.Headers[end-1]
// Find attachement anchors again
anchorHeader := segment.Headers[start]
attachmentLink, ok1 := hd.getLink(linkHeader.ParentHash)
if !ok1 {
return fmt.Errorf("connect attachment link not found for %x", linkHeader.ParentHash)
return nil, fmt.Errorf("connect attachment link not found for %x", linkHeader.ParentHash)
}
if attachmentLink.preverified && len(attachmentLink.next) > 0 {
return fmt.Errorf("cannot connect to preverified link %d with children", attachmentLink.blockHeight)
return nil, fmt.Errorf("cannot connect to preverified link %d with children", attachmentLink.blockHeight)
}
anchor, ok2 := hd.anchors[anchorHeader.Hash()]
if !ok2 {
return fmt.Errorf("connect attachment anchors not found for %x", anchorHeader.Hash())
return nil, fmt.Errorf("connect attachment anchors not found for %x", anchorHeader.Hash())
}
anchorPreverified := false
for _, link := range anchor.links {
Expand Down Expand Up @@ -300,11 +300,15 @@ func (hd *HeaderDownload) connect(segment *ChainSegment, start, end int) error {
// Mark the entire segment as preverified
hd.markPreverified(prevLink)
}
if attachmentLink.persisted {
var penalties []PenaltyItem
if _, bad := hd.badHeaders[attachmentLink.hash]; bad {
hd.invalidateAnchor(anchor)
penalties = append(penalties, PenaltyItem{Penalty: AbandonedAnchorPenalty, PeerID: anchor.peerID})
} else if attachmentLink.persisted {
link := hd.links[linkHeader.Hash()]
hd.insertList = append(hd.insertList, link)
}
return nil
return penalties, nil
}

// if anchor will be abandoned - given peerID will get Penalty
Expand Down Expand Up @@ -826,7 +830,7 @@ func (hi *HeaderInserter) BestHeaderChanged() bool {
// it allows higher-level algo immediately request more headers without waiting all stages precessing,
// speeds up visibility of new blocks
// It remember peerID - then later - if anchors created from segments will abandoned - this peerID gonna get Penalty
func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, peerID string) (requestMore bool) {
func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, peerID string) (requestMore bool, penalties []PenaltyItem) {
log.Debug("processSegment", "from", segment.Headers[0].Number.Uint64(), "to", segment.Headers[len(segment.Headers)-1].Number.Uint64())
hd.lock.Lock()
defer hd.lock.Unlock()
Expand All @@ -849,7 +853,8 @@ func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, p
if foundAnchor {
if foundTip {
// Connect
if err := hd.connect(segment, start, end); err != nil {
var err error
if penalties, err = hd.connect(segment, start, end); err != nil {
log.Debug("Connect failed", "error", err)
return
}
Expand Down Expand Up @@ -919,7 +924,7 @@ func (hd *HeaderDownload) ProcessSegment(segment *ChainSegment, newBlock bool, p
default:
}

return hd.requestChaining && requestMore
return hd.requestChaining && requestMore, penalties
}

func (hd *HeaderDownload) TopSeenHeight() uint64 {
Expand Down

0 comments on commit 026c4e9

Please sign in to comment.