Skip to content

Commit

Permalink
reward: Prealod StakingInfo during state regen
Browse files Browse the repository at this point in the history
  • Loading branch information
blukat29 committed Jul 8, 2024
1 parent 4f15b2c commit 293b303
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 10 deletions.
6 changes: 1 addition & 5 deletions blockchain/system/multicall.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,7 @@ func (caller *ContractCallerForMultiCall) CallContract(ctx context.Context, call
}

// NewMultiCallContractCaller creates a new instance of ContractCaller for MultiCall contract.
func NewMultiCallContractCaller(chain backends.BlockChainForCaller, header *types.Header) (*multicall.MultiCallContractCaller, error) {
state, err := chain.StateAt(header.Root)
if err != nil {
return nil, err
}
func NewMultiCallContractCaller(state *state.StateDB, chain backends.BlockChainForCaller, header *types.Header) (*multicall.MultiCallContractCaller, error) {
c := &ContractCallerForMultiCall{state, chain, header}
return multicall.NewMultiCallContractCaller(MultiCallAddr, c)
}
3 changes: 2 additions & 1 deletion blockchain/system/multicall_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TestContractCallerForMultiCall(t *testing.T) {
header := backend.BlockChain().CurrentHeader()
chain := backend.BlockChain()

caller, _ := NewMultiCallContractCaller(chain, header)
tempState, _ := backend.BlockChain().StateAt(header.Root)
caller, _ := NewMultiCallContractCaller(tempState, chain, header)
ret, err := caller.MultiCallStakingInfo(&bind.CallOpts{BlockNumber: header.Number})
assert.Nil(t, err)

Expand Down
59 changes: 59 additions & 0 deletions common/refcntmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package common

import (
"sync"
)

// RefCntMap is a map with reference counting.
type RefCountingMap struct {
m sync.Map
}

type refCountedElem struct {
value interface{}
count int
}

func NewRefCountingMap() *RefCountingMap {
return &RefCountingMap{}
}

// Inc adds an element and increments its reference count.
// If the key already exists, it only increments the reference count.
func (r *RefCountingMap) Inc(key interface{}, value interface{}) {
// Could be loaded or stored. Increment the count in either case.
mVal, _ := r.m.LoadOrStore(key, &refCountedElem{value: value, count: 0})
elem := mVal.(*refCountedElem)
elem.count++
}

// Dec decrements the reference count of the element with the given key.
// If the reference count reaches 0, the element is removed from the map.
func (r *RefCountingMap) Dec(key interface{}) {
if mVal, ok := r.m.Load(key); ok {
elem := mVal.(*refCountedElem)
elem.count--
if elem.count == 0 {
r.m.Delete(key)
}
}
}

// Get returns the value of the element with the given key. It does not incre
func (r *RefCountingMap) Get(key interface{}) (interface{}, bool) {
if mVal, ok := r.m.Load(key); ok {
return mVal.(*refCountedElem).value, true
}
return nil, false
}

// sync.Map does not have a method to get the length of the map.
// This function is a slow way to get the length; so use it only for testing.
func (r *RefCountingMap) Len() int {
size := 0
r.m.Range(func(key, value interface{}) bool {
size++
return true
})
return size
}
13 changes: 13 additions & 0 deletions node/cn/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/kaiachain/kaia/blockchain/types"
"github.com/kaiachain/kaia/blockchain/vm"
"github.com/kaiachain/kaia/common"
"github.com/kaiachain/kaia/reward"
statedb2 "github.com/kaiachain/kaia/storage/statedb"
)

Expand Down Expand Up @@ -112,7 +113,14 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD
start = time.Now()
logged time.Time
parent common.Hash

preloadedStakingBlockNums = make([]uint64, 0, origin-current.NumberU64())
)
defer func() {
for _, num := range preloadedStakingBlockNums {
reward.UnloadStakingInfo(num)
}
}()
for current.NumberU64() < origin {
// Print progress logs if long enough time elapsed
if report && time.Since(logged) > 8*time.Second {
Expand All @@ -123,6 +131,11 @@ func (cn *CN) stateAtBlock(block *types.Block, reexec uint64, base *state.StateD
if cn.config.DisableUnsafeDebug && time.Since(start) > cn.config.StateRegenerationTimeLimit {
return nil, fmt.Errorf("this request has queried old states too long since it exceeds the state regeneration time limit(%s)", cn.config.StateRegenerationTimeLimit.String())
}
// Preload StakingInfo for the current block based on the previous state. Needed for post-Kaia engine.Finalize().
preloadedStakingBlockNums = append(preloadedStakingBlockNums, current.NumberU64())
if err := reward.PreloadStakingInfoWithState(current.Header(), statedb); err != nil {
return nil, fmt.Errorf("processing block %d failed: %v", current.NumberU64()+1, err) // this is part of processing next block.
}
// Retrieve the next block to regenerate and process it
next := current.NumberU64() + 1
if current = cn.blockchain.GetBlockByNumber(next); current == nil {
Expand Down
74 changes: 73 additions & 1 deletion reward/staking_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ type StakingManager struct {
blockchain blockChain
chainHeadChan chan blockchain.ChainHeadEvent
chainHeadSub event.Subscription

// Preloaded stakingInfos are fetched before the GetStakingInfo request.
// This is used when the state is available when preloaded, but not available
// when GetStakingInfo is called. e.g. reexec loop in stateAtBlock.
// Therefore preloaded staking infos must not be evicted,
// and it should be only used temporarily, hence a separate mapping.
preloadedInfo *common.RefCountingMap
}

var (
Expand Down Expand Up @@ -104,6 +111,7 @@ func NewStakingManager(bc blockChain, gh governanceHelper, db stakingInfoDB) *St
governanceHelper: gh,
blockchain: bc,
chainHeadChan: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
preloadedInfo: common.NewRefCountingMap(),
}

// Before migration, staking information of current and before should be stored in DB.
Expand Down Expand Up @@ -234,6 +242,45 @@ func GetStakingInfoOnStakingBlock(stakingBlockNumber uint64) *StakingInfo {
return calcStakingInfo
}

// PreloadStakingInfoWithState fetches the stakingInfo based on the given state trie
// and then stores it in the preloaded map. Because preloaded map does not have eviction policy,
// it should be removed manually after use. Note that preloaded info is for the next block of the given header.
func PreloadStakingInfoWithState(header *types.Header, statedb *state.StateDB) error {
if stakingManager == nil {
return ErrStakingManagerNotSet
}

if !isKaiaForkEnabled(header.Number.Uint64() + 1) {
return nil // no need to preload staking info before kaia fork because we have it in the database.
}

if header.Root != statedb.IntermediateRoot(false) { // Sanity check
return fmt.Errorf("must supply the statedb for the given header") // this should not happen
}

num := header.Number.Uint64()
info, err := getStakingInfoFromMultiCallAtState(num, statedb.Copy(), header)
if err != nil {
return fmt.Errorf("staking info preload failed. root err: %v", err)
}
if info != nil {
stakingManager.preloadedInfo.Inc(num, info)
}
logger.Trace("preloaded staking info", "staking block number", num)
return nil
}

// UnloadStakingInfo removes a stakingInfo from the preloaded map.
// Must be executed after PreloadStakingInfoWithState(Header{num}).
func UnloadStakingInfo(num uint64) {
if stakingManager == nil {
logger.Error("unable to GetStakingInfo", "err", ErrStakingManagerNotSet)
return
}

stakingManager.preloadedInfo.Dec(num)
}

// updateKaiaStakingInfo updates kaia staking info in cache created from given block number.
// From Kaia fork, not only the staking block number but also the calculation of staking amounts is changed,
// so we need separate update function for kaia staking info.
Expand Down Expand Up @@ -287,8 +334,17 @@ func getStakingInfoFromMultiCall(blockNum uint64) (*StakingInfo, error) {
return nil, fmt.Errorf("failed to get header by number %d", blockNum)
}

tempState, err := stakingManager.blockchain.StateAt(header.Root)
if err != nil {
return nil, fmt.Errorf("failed to get state at number %d. root err: %s", blockNum, err)
}

return getStakingInfoFromMultiCallAtState(blockNum, tempState, header)
}

func getStakingInfoFromMultiCallAtState(blockNum uint64, tempState *state.StateDB, header *types.Header) (*StakingInfo, error) {
// Get staking info from multicall contract
caller, err := system.NewMultiCallContractCaller(stakingManager.blockchain, header)
caller, err := system.NewMultiCallContractCaller(tempState, stakingManager.blockchain, header)
if err != nil {
return nil, fmt.Errorf("failed to create multicall contract caller. root err: %s", err)
}
Expand Down Expand Up @@ -374,6 +430,15 @@ func getStakingInfoFromCache(blockNum uint64) *StakingInfo {
return cachedStakingInfo.(*StakingInfo)
}

if info, ok := stakingManager.preloadedInfo.Get(blockNum); ok {
info := info.(*StakingInfo)
// Fill in Gini coeff if not set. Modifies the cached object.
if err := fillMissingGiniCoefficient(info, blockNum); err != nil {
logger.Warn("Cannot fill in gini coefficient", "staking block number", blockNum, "err", err)
}
return info
}

return nil
}

Expand Down Expand Up @@ -517,6 +582,7 @@ func SetTestStakingManagerWithChain(bc blockChain, gh governanceHelper, db staki
governanceHelper: gh,
blockchain: bc,
chainHeadChan: make(chan blockchain.ChainHeadEvent, chainHeadChanSize),
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -526,6 +592,7 @@ func SetTestStakingManagerWithDB(testDB stakingInfoDB) {
SetTestStakingManager(&StakingManager{
blockchain: &blockchain.BlockChain{},
stakingInfoDB: testDB,
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -537,6 +604,7 @@ func SetTestStakingManagerWithStakingInfoCache(testInfo *StakingInfo) {
SetTestStakingManager(&StakingManager{
blockchain: &blockchain.BlockChain{},
stakingInfoCache: cache,
preloadedInfo: common.NewRefCountingMap(),
})
}

Expand All @@ -563,3 +631,7 @@ func SetTestAddressBookAddress(addr common.Address) {
func TestGetStakingCacheSize() int {
return GetStakingManager().stakingInfoCache.Len()
}

func TestGetStakingPreloadSize() int {
return GetStakingManager().preloadedInfo.Len()
}
7 changes: 4 additions & 3 deletions tests/state_reexec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func TestStateReexec(t *testing.T) {

testStateReexec_prune(t, ctx.nodes[0], []uint64{2, 3, 4, 5})
testStateReexec_run(t, ctx.nodes[0], 5) // post-kaia

assert.Equal(t, 0, reward.TestGetStakingPreloadSize()) // Ensure cleanup of preloaded staking info
}

func testStateReexec_config(forkNum *big.Int) *params.ChainConfig {
Expand Down Expand Up @@ -153,11 +155,10 @@ func testStateReexec_prune(t *testing.T, node *blockchainTestNode, nums []uint64
func testStateReexec_run(t *testing.T, node *blockchainTestNode, num uint64) {
block := node.cn.BlockChain().GetBlockByNumber(num)

t.Logf("Regenerating state at block %d", num)
state, err := node.cn.APIBackend.StateAtBlock(context.Background(), block, 10, nil, false, false)
require.Nil(t, err)
root, err := state.Commit(false)
require.Nil(t, err)

// Regenerated state must match the stored block's stateRoot
assert.Equal(t, block.Header().Root, root)
assert.Equal(t, block.Header().Root, state.IntermediateRoot(false))
}

0 comments on commit 293b303

Please sign in to comment.