diff --git a/libs/tendermint/blockchain/v0/reactor.go b/libs/tendermint/blockchain/v0/reactor.go index 67c9a2a4b6..0b6a67c4c9 100644 --- a/libs/tendermint/blockchain/v0/reactor.go +++ b/libs/tendermint/blockchain/v0/reactor.go @@ -244,13 +244,6 @@ func (bcR *BlockchainReactor) poolRoutine() { bcR.setIsSyncing(false) }() - bcR.pool.SetHeight(bcR.store.Height() + 1) - bcR.pool.Stop() - bcR.pool.Reset() - bcR.pool.Start() - - blocksSynced := uint64(0) - conR, ok := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) if ok { conState, err := conR.SwitchToFastSync() @@ -260,6 +253,13 @@ func (bcR *BlockchainReactor) poolRoutine() { } chainID := bcR.curState.ChainID + bcR.pool.SetHeight(bcR.curState.LastBlockHeight + 1) + bcR.pool.Stop() + bcR.pool.Reset() + bcR.pool.Start() + + blocksSynced := uint64(0) + lastHundred := time.Now() lastRate := 0.0 @@ -372,6 +372,11 @@ FOR_LOOP: bcR.curState, _, err = bcR.blockExec.ApplyBlock(bcR.curState, firstID, first) // rpc if err != nil { // TODO This is bad, are we zombie? + // The block can't be committed, do we need to delete it from store db? + _, errDel := bcR.store.DeleteBlocksFromTop(first.Height-1) + if errDel != nil { + bcR.Logger.Error("Failed to delete blocks from top", "height", first.Height-1, "err", errDel) + } panic(fmt.Sprintf("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } blocksSynced++ diff --git a/libs/tendermint/consensus/replay_test.go b/libs/tendermint/consensus/replay_test.go index f5fe463278..5691a355a8 100644 --- a/libs/tendermint/consensus/replay_test.go +++ b/libs/tendermint/consensus/replay_test.go @@ -1145,6 +1145,18 @@ func (bs *mockBlockStore) PruneBlocks(height int64) (uint64, error) { return pruned, nil } +// DeleteBlocksFromTop removes block down to (but not including) a height. It returns number of blocks deleted. 
+func (bs *mockBlockStore) DeleteBlocksFromTop(height int64) (uint64, error) { + deleted := uint64(0) + top := bs.Height() + for i := top; i > height; i-- { + bs.chain[i] = nil + bs.commits[i] = nil + deleted++ + } + return deleted, nil +} + //--------------------------------------- // Test handshake/init chain diff --git a/libs/tendermint/libs/autofile/group.go b/libs/tendermint/libs/autofile/group.go index d8c717399d..f02cb8a214 100644 --- a/libs/tendermint/libs/autofile/group.go +++ b/libs/tendermint/libs/autofile/group.go @@ -160,7 +160,7 @@ func (g *Group) Close() { g.FlushAndSync() g.mtx.Lock() - _ = g.Head.closeFile() + _ = g.Head.Close() g.mtx.Unlock() } diff --git a/libs/tendermint/rpc/core/blocks_test.go b/libs/tendermint/rpc/core/blocks_test.go index c2039827e3..afa3508c49 100644 --- a/libs/tendermint/rpc/core/blocks_test.go +++ b/libs/tendermint/rpc/core/blocks_test.go @@ -127,5 +127,6 @@ func (mockBlockStore) LoadBlockPart(height int64, index int) *types.Part { retur func (mockBlockStore) LoadBlockCommit(height int64) *types.Commit { return nil } func (mockBlockStore) LoadSeenCommit(height int64) *types.Commit { return nil } func (mockBlockStore) PruneBlocks(height int64) (uint64, error) { return 0, nil } +func (mockBlockStore) DeleteBlocksFromTop(height int64) (uint64, error) { return 0, nil } func (mockBlockStore) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) { } diff --git a/libs/tendermint/state/services.go b/libs/tendermint/state/services.go index 93c72c8d8b..fbd1629337 100644 --- a/libs/tendermint/state/services.go +++ b/libs/tendermint/state/services.go @@ -24,6 +24,7 @@ type BlockStore interface { SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit) PruneBlocks(height int64) (uint64, error) + DeleteBlocksFromTop(height int64) (uint64, error) LoadBlockByHash(hash []byte) *types.Block LoadBlockPart(height int64, index int) *types.Part diff --git a/libs/tendermint/store/store.go 
b/libs/tendermint/store/store.go index 56a70c648d..048d2703c9 100644 --- a/libs/tendermint/store/store.go +++ b/libs/tendermint/store/store.go @@ -195,44 +195,59 @@ func (bs *BlockStore) LoadSeenCommit(height int64) *types.Commit { // PruneBlocks removes block up to (but not including) a height. It returns number of blocks pruned. func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { + return bs.deleteBatch(height, false) +} + +// DeleteBlocksFromTop removes block down to (but not including) a height. It returns number of blocks deleted. +func (bs *BlockStore) DeleteBlocksFromTop(height int64) (uint64, error) { + return bs.deleteBatch(height, true) +} + +func (bs *BlockStore) deleteBatch(height int64, deleteFromTop bool) (uint64, error) { if height <= 0 { return 0, fmt.Errorf("height must be greater than 0") } + bs.mtx.RLock() - if height > bs.height { - bs.mtx.RUnlock() - return 0, fmt.Errorf("cannot prune beyond the latest height %v", bs.height) - } + top := bs.height base := bs.base bs.mtx.RUnlock() + if height > top { + return 0, fmt.Errorf("cannot delete beyond the latest height %v, delete from top %t", top, deleteFromTop) + } if height < base { - return 0, fmt.Errorf("cannot prune to height %v, it is lower than base height %v", - height, base) + return 0, fmt.Errorf("cannot delete to height %v, it is lower than base height %v, delete from top %t", + height, base, deleteFromTop) } - pruned := uint64(0) + deleted := uint64(0) batch := bs.db.NewBatch() defer batch.Close() - flush := func(batch db.Batch, base int64) error { + flush := func(batch db.Batch, height int64) error { // We can't trust batches to be atomic, so update base first to make sure noone // tries to access missing blocks. 
bs.mtx.Lock() - bs.base = base + if deleteFromTop { + bs.height = height + } else { + bs.base = height + } bs.mtx.Unlock() bs.saveState() err := batch.WriteSync() if err != nil { - return fmt.Errorf("failed to prune up to height %v: %w", base, err) + batch.Close() + return fmt.Errorf("failed to delete to height %v, delete from top %t: %w", height, deleteFromTop, err) } batch.Close() return nil } - for h := base; h < height; h++ { + deleteFn := func(h int64) error { meta := bs.LoadBlockMeta(h) if meta == nil { // assume already deleted - continue + return nil } batch.Delete(calcBlockMetaKey(h)) batch.Delete(calcBlockHashKey(meta.BlockID.Hash)) @@ -241,16 +256,32 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { for p := 0; p < meta.BlockID.PartsHeader.Total; p++ { batch.Delete(calcBlockPartKey(h, p)) } - pruned++ + deleted++ // flush every 1000 blocks to avoid batches becoming too large - if pruned%1000 == 0 && pruned > 0 { + if deleted%1000 == 0 && deleted > 0 { err := flush(batch, h) if err != nil { - return 0, err + return err } batch = bs.db.NewBatch() - defer batch.Close() + } + return nil + } + + if deleteFromTop { + for h := top; h > height; h-- { + err := deleteFn(h) + if err != nil { + return 0, err + } + } + } else { + for h := base; h < height; h++ { + err := deleteFn(h) + if err != nil { + return 0, err + } } } @@ -258,7 +289,7 @@ func (bs *BlockStore) PruneBlocks(height int64) (uint64, error) { if err != nil { return 0, err } - return pruned, nil + return deleted, nil } // SaveBlock persists the given block, blockParts, and seenCommit to the underlying db. 
diff --git a/libs/tendermint/store/store_test.go b/libs/tendermint/store/store_test.go index f1f1e9bf00..6c8209d3ad 100644 --- a/libs/tendermint/store/store_test.go +++ b/libs/tendermint/store/store_test.go @@ -472,6 +472,92 @@ func TestPruneBlocks(t *testing.T) { assert.Nil(t, bs.LoadBlock(1501)) } +func TestDeleteBlocksFromTop(t *testing.T) { + config := cfg.ResetTestRoot("blockchain_reactor_test") + defer os.RemoveAll(config.RootDir) + state, err := sm.LoadStateFromDBOrGenesisFile(dbm.NewMemDB(), config.GenesisFile()) + require.NoError(t, err) + db := dbm.NewMemDB() + bs := NewBlockStore(db) + assert.EqualValues(t, 0, bs.Base()) + assert.EqualValues(t, 0, bs.Height()) + assert.EqualValues(t, 0, bs.Size()) + + // deleting an empty store should error, even when deleting to 0 + _, err = bs.DeleteBlocksFromTop(1) + require.Error(t, err) + + _, err = bs.DeleteBlocksFromTop(0) + require.Error(t, err) + + // make more than 1000 blocks, to test batch deletions + for h := int64(1); h <= 1500; h++ { + block := makeBlock(h, state, new(types.Commit)) + partSet := block.MakePartSet(2) + seenCommit := makeTestCommit(h, tmtime.Now()) + bs.SaveBlock(block, partSet, seenCommit) + } + + assert.EqualValues(t, 1, bs.Base()) + assert.EqualValues(t, 1500, bs.Height()) + assert.EqualValues(t, 1500, bs.Size()) + + deletedBlock := bs.LoadBlock(1201) + + // Check that basic deletion works + deleted, err := bs.DeleteBlocksFromTop(1200) + require.NoError(t, err) + assert.EqualValues(t, 300, deleted) + assert.EqualValues(t, 1, bs.Base()) + assert.EqualValues(t, 1200, bs.Height()) + assert.EqualValues(t, 1200, bs.Size()) + assert.EqualValues(t, BlockStoreStateJSON{ + Base: 1, + Height: 1200, + }, LoadBlockStoreStateJSON(db)) + + require.NotNil(t, bs.LoadBlock(1200)) + require.Nil(t, bs.LoadBlock(1201)) + require.Nil(t, bs.LoadBlockByHash(deletedBlock.Hash())) + require.Nil(t, bs.LoadBlockCommit(1201)) + require.Nil(t, bs.LoadBlockMeta(1201)) + require.Nil(t, bs.LoadBlockPart(1201, 1)) + + for 
i := int64(1201); i <= 1500; i++ { + require.Nil(t, bs.LoadBlock(i)) + } + for i := int64(1); i <= 1200; i++ { + require.NotNil(t, bs.LoadBlock(i)) + } + + // Deleting beyond the current height should error + _, err = bs.DeleteBlocksFromTop(1201) + require.Error(t, err) + + // Deleting down to the current height should work + deleted, err = bs.DeleteBlocksFromTop(1200) + require.NoError(t, err) + assert.EqualValues(t, 0, deleted) + + // Deleting again should work + deleted, err = bs.DeleteBlocksFromTop(1100) + require.NoError(t, err) + assert.EqualValues(t, 100, deleted) + assert.EqualValues(t, 1100, bs.Height()) + + // Deleting beyond the current height should error + _, err = bs.DeleteBlocksFromTop(1101) + require.Error(t, err) + + // Deleting to the current base should work + deleted, err = bs.DeleteBlocksFromTop(1) + require.NoError(t, err) + assert.EqualValues(t, 1099, deleted) + assert.Nil(t, bs.LoadBlock(2)) + assert.NotNil(t, bs.LoadBlock(1)) + assert.Nil(t, bs.LoadBlock(2)) +} + func TestLoadBlockMeta(t *testing.T) { bs, db := freshBlockStore() height := int64(10)