Skip to content

Commit

Permalink
Merge PR:Store state height dismatch (#1460)
Browse files Browse the repository at this point in the history
* use state height to start fast sync instead of store height

* close AutoFile when Group is closed

* delete block from store when commit failed during fast-sync

* call DeleteBlocksFromTop in fast-sync

* don't close batch when write sync failed

* fix bug in test cases

* print error info when delete blocks in store db failed

* close batch when write sync failed

Co-authored-by: xiangjianmeng <[email protected]>
  • Loading branch information
MinZhu09 and xiangjianmeng authored Jan 19, 2022
1 parent 7c866d4 commit a2f8671
Show file tree
Hide file tree
Showing 7 changed files with 161 additions and 25 deletions.
19 changes: 12 additions & 7 deletions libs/tendermint/blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,13 +244,6 @@ func (bcR *BlockchainReactor) poolRoutine() {
bcR.setIsSyncing(false)
}()

bcR.pool.SetHeight(bcR.store.Height() + 1)
bcR.pool.Stop()
bcR.pool.Reset()
bcR.pool.Start()

blocksSynced := uint64(0)

conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
if ok {
conState, err := conR.SwitchToFastSync()
Expand All @@ -260,6 +253,13 @@ func (bcR *BlockchainReactor) poolRoutine() {
}
chainID := bcR.curState.ChainID

bcR.pool.SetHeight(bcR.curState.LastBlockHeight + 1)
bcR.pool.Stop()
bcR.pool.Reset()
bcR.pool.Start()

blocksSynced := uint64(0)

lastHundred := time.Now()
lastRate := 0.0

Expand Down Expand Up @@ -372,6 +372,11 @@ FOR_LOOP:
bcR.curState, _, err = bcR.blockExec.ApplyBlock(bcR.curState, firstID, first) // rpc
if err != nil {
// TODO This is bad, are we zombie?
// The block can't be committed, do we need to delete it from store db?
_, errDel := bcR.store.DeleteBlocksFromTop(first.Height-1)
if errDel != nil {
bcR.Logger.Error("Failed to delete blocks from top", "height", first.Height-1, "err", errDel)
}
panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
}
blocksSynced++
Expand Down
12 changes: 12 additions & 0 deletions libs/tendermint/consensus/replay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,18 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) {
return pruned, nil
}

// DeleteBlocksFromTop removes block down to (but not including) a height. It returns number of blocks deleted.
func (bs *mockBlockStore) DeleteBlocksFromTop(height int64) (uint64, error) {
deleted := uint64(0)
top := bs.Height()
for i := top; i > height; i-- {
bs.chain[i] = nil
bs.commits[i] = nil
deleted++
}
return deleted, nil
}

//---------------------------------------
// Test handshake/init chain

Expand Down
2 changes: 1 addition & 1 deletion libs/tendermint/libs/autofile/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (g *Group) Close() {
g.FlushAndSync()

g.mtx.Lock()
_ = g.Head.closeFile()
_ = g.Head.Close()
g.mtx.Unlock()
}

Expand Down
1 change: 1 addition & 0 deletions libs/tendermint/rpc/core/blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,5 +127,6 @@ func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { retur
func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil }
func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil }
func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil }
func (mockBlockStore) DeleteBlocksFromTop(height int64) (uint64, error) { return 0, nil }
func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) {
}
1 change: 1 addition & 0 deletions libs/tendermint/state/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type BlockStore interface {
SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)

PruneBlocks(height int64) (uint64, error)
DeleteBlocksFromTop(height int64) (uint64, error)

LoadBlockByHash(hash []byte) *types.Block
LoadBlockPart(height int64, index int) *types.Part
Expand Down
65 changes: 48 additions & 17 deletions libs/tendermint/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,44 +195,59 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit {

// PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned.
func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) {
return bs.deleteBatch(height, false)
}

// DeleteBlocksFromTop removes block down to (but not including) a height. It returns number of blocks deleted.
func (bs *BlockStore) DeleteBlocksFromTop(height int64) (uint64, error) {
return bs.deleteBatch(height, true)
}

func (bs *BlockStore) deleteBatch(height int64, deleteFromTop bool) (uint64, error) {
if height <= 0 {
return 0, fmt.Errorf("height must be greater than 0")
}

bs.mtx.RLock()
if height > bs.height {
bs.mtx.RUnlock()
return 0, fmt.Errorf("cannot prune beyond the latest height %v", bs.height)
}
top := bs.height
base := bs.base
bs.mtx.RUnlock()
if height > top {
return 0, fmt.Errorf("cannot delete beyond the latest height %v, delete from top %t", top, deleteFromTop)
}
if height < base {
return 0, fmt.Errorf("cannot prune to height %v, it is lower than base height %v",
height, base)
return 0, fmt.Errorf("cannot delete to height %v, it is lower than base height %v, delete from top %t",
height, base, deleteFromTop)
}

pruned := uint64(0)
deleted := uint64(0)
batch := bs.db.NewBatch()
defer batch.Close()
flush := func(batch db.Batch, base int64) error {
flush := func(batch db.Batch, height int64) error {
// We can't trust batches to be atomic, so update base first to make sure noone
// tries to access missing blocks.
bs.mtx.Lock()
bs.base = base
if deleteFromTop {
bs.height = height
} else {
bs.base = height
}
bs.mtx.Unlock()
bs.saveState()

err := batch.WriteSync()
if err != nil {
return fmt.Errorf("failed to prune up to height %v: %w", base, err)
batch.Close()
return fmt.Errorf("failed to delete to height %v, delete from top %t: %w", height, deleteFromTop, err)
}
batch.Close()
return nil
}

for h := base; h < height; h++ {
deleteFn := func(h int64) error {
meta := bs.LoadBlockMeta(h)
if meta == nil { // assume already deleted
continue
return nil
}
batch.Delete(calcBlockMetaKey(h))
batch.Delete(calcBlockHashKey(meta.BlockID.Hash))
Expand All @@ -241,24 +256,40 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) {
for p := 0; p < meta.BlockID.PartsHeader.Total; p++ {
batch.Delete(calcBlockPartKey(h, p))
}
pruned++
deleted++

// flush every 1000 blocks to avoid batches becoming too large
if pruned%1000 == 0 && pruned > 0 {
if deleted%1000 == 0 && deleted > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
return err
}
batch = bs.db.NewBatch()
defer batch.Close()
}
return nil
}

if deleteFromTop {
for h := top; h > height; h-- {
err := deleteFn(h)
if err != nil {
return 0, err
}
}
} else {
for h := base; h < height; h++ {
err := deleteFn(h)
if err != nil {
return 0, err
}
}
}

err := flush(batch, height)
if err != nil {
return 0, err
}
return pruned, nil
return deleted, nil
}

// SaveBlock persists the given block, blockParts, and seenCommit to the underlying db.
Expand Down
86 changes: 86 additions & 0 deletions libs/tendermint/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,92 @@ func TestPruneBlocks(t *testing.T) {
assert.Nil(t, bs.LoadBlock(1501))
}

func TestDeleteBlocksFromTop(t *testing.T) {
config := cfg.ResetTestRoot("blockchain_reactor_test")
defer os.RemoveAll(config.RootDir)
state, err := sm.LoadStateFromDBOrGenesisFile(dbm.NewMemDB(), config.GenesisFile())
require.NoError(t, err)
db := dbm.NewMemDB()
bs := NewBlockStore(db)
assert.EqualValues(t, 0, bs.Base())
assert.EqualValues(t, 0, bs.Height())
assert.EqualValues(t, 0, bs.Size())

// deleting an empty store should error, even when deleting to 0
_, err = bs.DeleteBlocksFromTop(1)
require.Error(t, err)

_, err = bs.DeleteBlocksFromTop(0)
require.Error(t, err)

// make more than 1000 blocks, to test batch deletions
for h := int64(1); h <= 1500; h++ {
block := makeBlock(h, state, new(types.Commit))
partSet := block.MakePartSet(2)
seenCommit := makeTestCommit(h, tmtime.Now())
bs.SaveBlock(block, partSet, seenCommit)
}

assert.EqualValues(t, 1, bs.Base())
assert.EqualValues(t, 1500, bs.Height())
assert.EqualValues(t, 1500, bs.Size())

deletedBlock := bs.LoadBlock(1201)

// Check that basic pruning works
deleted, err := bs.DeleteBlocksFromTop(1200)
require.NoError(t, err)
assert.EqualValues(t, 300, deleted)
assert.EqualValues(t, 1, bs.Base())
assert.EqualValues(t, 1200, bs.Height())
assert.EqualValues(t, 1200, bs.Size())
assert.EqualValues(t, BlockStoreStateJSON{
Base: 1,
Height: 1200,
}, LoadBlockStoreStateJSON(db))

require.NotNil(t, bs.LoadBlock(1200))
require.Nil(t, bs.LoadBlock(1201))
require.Nil(t, bs.LoadBlockByHash(deletedBlock.Hash()))
require.Nil(t, bs.LoadBlockCommit(1201))
require.Nil(t, bs.LoadBlockMeta(1201))
require.Nil(t, bs.LoadBlockPart(1201, 1))

for i := int64(1201); i <= 1500; i++ {
require.Nil(t, bs.LoadBlock(i))
}
for i := int64(1); i <= 1200; i++ {
require.NotNil(t, bs.LoadBlock(i))
}

// Deleting up the current height should error
_, err = bs.DeleteBlocksFromTop(1201)
require.Error(t, err)

// Deleting down to the current height should work
deleted, err = bs.DeleteBlocksFromTop(1200)
require.NoError(t, err)
assert.EqualValues(t, 0, deleted)

// Deleting again should work
deleted, err = bs.DeleteBlocksFromTop(1100)
require.NoError(t, err)
assert.EqualValues(t, 100, deleted)
assert.EqualValues(t, 1100, bs.Height())

// Deleting beyond the current height should error
_, err = bs.DeleteBlocksFromTop(1101)
require.Error(t, err)

// Deleting to the current base should work
deleted, err = bs.DeleteBlocksFromTop(1)
require.NoError(t, err)
assert.EqualValues(t, 1099, deleted)
assert.Nil(t, bs.LoadBlock(2))
assert.NotNil(t, bs.LoadBlock(1))
assert.Nil(t, bs.LoadBlock(2))
}

func TestLoadBlockMeta(t *testing.T) {
bs, db := freshBlockStore()
height := int64(10)
Expand Down

0 comments on commit a2f8671

Please sign in to comment.