Skip to content

Commit

Permalink
Merge PR: refactor orphan map (#1969)
Browse files Browse the repository at this point in the history
* refactor orphan map (#1960)

* refactor orphan map

* func (oi *OrphanInfo) enqueueResult(res int64)

* rename

* rename

* rm 	oi.orphanItemCacheQueue

* rm 	oi.orphanItemCacheQueue

* type NodeCache struct

* simplify names

* simplify names
zhongqiuwood authored May 4, 2022
1 parent 5076a1f commit c1d1ef0
Showing 10 changed files with 303 additions and 169 deletions.
5 changes: 3 additions & 2 deletions dev/start.sh
Original file line number Diff line number Diff line change
@@ -23,7 +23,8 @@ killbyname() {
run() {
LOG_LEVEL=main:debug,iavl:info,*:error,state:info,provider:info

exchaind start --pruning=nothing --rpc.unsafe \
# exchaind start --pruning=nothing --rpc.unsafe \
exchaind start --rpc.unsafe \
--local-rpc-port 26657 \
--log_level $LOG_LEVEL \
--log_file json \
@@ -33,7 +34,7 @@ run() {
--iavl-enable-async-commit \
--enable-gid \
--append-pid=true \
--iavl-commit-interval-height 10 \
--iavl-commit-interval-height 5 \
--iavl-output-modules evm=0,acc=0 \
--trace --home $HOME_SERVER --chain-id $CHAINID \
--elapsed Round=1,CommitRound=1,Produce=1 \
5 changes: 2 additions & 3 deletions libs/iavl/mutable_tree.go
Original file line number Diff line number Diff line change
@@ -121,9 +121,8 @@ func (tree *MutableTree) IsEmpty() bool {

// VersionExists returns whether or not a version exists.
func (tree *MutableTree) VersionExists(version int64) bool {
tree.ndb.mtx.Lock()
defer tree.ndb.mtx.Unlock()
if tree.ndb.heightOrphansMap[version] != nil {
_, ok := tree.ndb.findRootHash(version)
if ok {
return true
}
return tree.versions.Get(version)
56 changes: 28 additions & 28 deletions libs/iavl/mutable_tree_oec.go
Original file line number Diff line number Diff line change
@@ -41,9 +41,12 @@ type commitEvent struct {
iavlHeight int
}


func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte, int64, error) {
moduleName := tree.GetModuleName()
oldRoot, saved := tree.hasSaved(version)

tree.ndb.sanityCheckHandleOrphansResult(version)

oldRoot, saved := tree.ndb.findRootHash(version)
if saved {
return nil, version, fmt.Errorf("existing version: %d, root: %X", version, oldRoot)
}
@@ -65,34 +68,34 @@ func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte
}
}

tree.ndb.SaveOrphansAsync(version, tree.orphans)

shouldPersist := (version-tree.lastPersistHeight >= CommitIntervalHeight) ||
(treeMap.totalPreCommitCacheSize >= MinCommitItemCount)
shouldPersist := (version-tree.lastPersistHeight >= CommitIntervalHeight) || (treeMap.totalPreCommitCacheSize >= MinCommitItemCount)

newOrphans := tree.orphans
if shouldPersist {
batch := tree.NewBatch()
if err := tree.persist(batch, version); err != nil {
return nil, 0, err
}
tree.ndb.saveNewOrphans(version, newOrphans, true)
tree.persist(version)
newOrphans = nil
}

return tree.setNewWorkingTree(version, newOrphans, shouldPersist)
}

func (tree *MutableTree) setNewWorkingTree(version int64, newOrphans []*Node, persisted bool) ([]byte, int64, error) {
// set new working tree
tree.ImmutableTree = tree.ImmutableTree.clone()
tree.lastSaved = tree.ImmutableTree.clone()
tree.orphans = make([]*Node, 0, len(tree.orphans))
for k := range tree.savedNodes {
delete(tree.savedNodes, k)
}

rootHash := tree.lastSaved.Hash()
tree.setHeightOrphansItem(version, rootHash)

tree.ndb.enqueueOrphanTask(version, rootHash, newOrphans)
tree.version = version
if shouldPersist {
if persisted {
tree.versions.Set(version, true)
}
treeMap.updateMutableTreeMap(moduleName)
treeMap.updateMutableTreeMap(tree.GetModuleName())

tree.removedVersions.Range(func(k, v interface{}) bool {
tree.log(IavlDebug, "remove version from tree version map", "Height", k.(int64))
@@ -109,31 +112,34 @@ func (tree *MutableTree) removeVersion(version int64) {
tree.versions.Delete(version)
}

func (tree *MutableTree) persist(batch dbm.Batch, version int64) error {
func (tree *MutableTree) persist(version int64) {
var err error
batch := tree.NewBatch()
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0}
var tpp map[string]*Node = nil
if EnablePruningHistoryState {
tree.ndb.saveCommitOrphans(batch, version, tree.commitOrphans)
}
if tree.root == nil {
// There can still be orphans, for example if the root is the node being removed.
if err := tree.ndb.SaveEmptyRoot(batch, version); err != nil {
return err
}
err = tree.ndb.SaveEmptyRoot(batch, version)
} else {
if err := tree.ndb.SaveRoot(batch, tree.root, version); err != nil {
return err
}
err = tree.ndb.SaveRoot(batch, tree.root, version)
tpp = tree.ndb.asyncPersistTppStart(version)
}

if err != nil {
// never going to happen in case of AC enabled
panic(err)
}

for k := range tree.commitOrphans {
delete(tree.commitOrphans, k)
}
versions := tree.deepCopyVersions()
tree.commitCh <- commitEvent{version, versions, batch,
tpp, nil, int(tree.Height())}
tree.lastPersistHeight = version
return nil
}

func (tree *MutableTree) commitSchedule() {
@@ -217,9 +223,6 @@ func (tree *MutableTree) log(level int, msg string, kvs ...interface{}) {
iavlLog(tree.GetModuleName(), level, msg, kvs...)
}

func (tree *MutableTree) setHeightOrphansItem(version int64, rootHash []byte) {
tree.ndb.setHeightOrphansItem(version, rootHash)
}

func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool) {
queue := tree.committedHeightQueue
@@ -291,9 +294,6 @@ func (tree *MutableTree) addOrphansOptimized(orphans []*Node) {
}
}

func (tree *MutableTree) hasSaved(version int64) ([]byte, bool) {
return tree.ndb.inVersionCacheMap(version)
}

func (tree *MutableTree) deepCopyVersions() map[int64]bool {
if !EnablePruningHistoryState {
11 changes: 7 additions & 4 deletions libs/iavl/mutable_tree_oec_test.go
Original file line number Diff line number Diff line change
@@ -110,8 +110,11 @@ func TestSaveVersionCommitIntervalHeight(t *testing.T) {
tree.Set([]byte(k2), []byte("k22"))
_, _, _, err = tree.SaveVersion(false)

require.Equal(t, 5, len(tree.ndb.prePersistNodeCache)+len(tree.ndb.nodeCache))
require.Equal(t, 3, len(tree.ndb.orphanNodeCache))
tree.ndb.sanityCheckHandleOrphansResult(tree.version+1)
tree.ndb.oi.enqueueResult(tree.version)

require.Equal(t, 5, len(tree.ndb.prePersistNodeCache)+tree.ndb.nc.nodeCacheLen())
require.Equal(t, 3, tree.ndb.oi.orphanNodeCacheLen())

_, _, _, err = tree.SaveVersion(false)
require.NoError(t, err)
@@ -124,7 +127,7 @@ func TestSaveVersionCommitIntervalHeight(t *testing.T) {
_, _, _, err = tree.SaveVersion(false)
require.NoError(t, err)
require.Equal(t, 0, len(tree.ndb.prePersistNodeCache))
require.Equal(t, 0, len(tree.ndb.orphanNodeCache))
require.Equal(t, 0, tree.ndb.oi.orphanNodeCacheLen())

//require.Equal(t, 5, len(tree.ndb.nodeCache)+len(tree.ndb.tempPrePersistNodeCache))
tree.Set([]byte("k5"), []byte("5555555555"))
@@ -467,7 +470,7 @@ func TestStopTree(t *testing.T) {
_, _, _, err := tree.SaveVersion(false)
require.NoError(t, err)
tree.StopTree()
require.Equal(t, 5, len(tree.ndb.nodeCache))
require.Equal(t, 5, tree.ndb.nc.nodeCacheLen())
}

func TestLog(t *testing.T) {
30 changes: 6 additions & 24 deletions libs/iavl/nodedb.go
Original file line number Diff line number Diff line change
@@ -47,18 +47,6 @@ type nodeDB struct {

latestVersion int64

//lruNodeCache *lru.Cache

nodeCache map[string]*list.Element // Node cache.
nodeCacheSize int // Node cache size limit in elements.
nodeCacheQueue *syncList // LRU queue of cache elements. Used for deletion.
nodeCacheMutex sync.RWMutex // Mutex for node cache.

orphanNodeCache map[string]*Node
heightOrphansCacheQueue *list.List
heightOrphansCacheSize int
heightOrphansMap map[int64]*heightOrphansItem

prePersistNodeCache map[string]*Node
tppMap map[int64]*tppItem
tppVersionList *list.List
@@ -76,6 +64,9 @@ type nodeDB struct {
name string

preWriteNodeCache cmap.ConcurrentMap

oi *OrphanInfo
nc *NodeCache
}

func makeNodeCacheMap(cacheSize int, initRatio float64) map[string]*list.Element {
@@ -97,25 +88,16 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options) *nodeDB {
ndb := &nodeDB{
db: db,
opts: *opts,
latestVersion: 0, // initially invalid
nodeCache: makeNodeCacheMap(cacheSize, IavlCacheInitRatio),
nodeCacheSize: cacheSize,
nodeCacheQueue: newSyncList(),
versionReaders: make(map[int64]uint32, 8),
orphanNodeCache: make(map[string]*Node),
heightOrphansCacheQueue: list.New(),
heightOrphansCacheSize: HeightOrphansCacheSize,
heightOrphansMap: make(map[int64]*heightOrphansItem),
prePersistNodeCache: make(map[string]*Node),
tppMap: make(map[int64]*tppItem),
tppVersionList: list.New(),
dbReadCount: 0,
dbReadTime: 0,
dbWriteCount: 0,
name: ParseDBName(db),
preWriteNodeCache: cmap.New(),
}

ndb.oi = newOrphanInfo(ndb)
ndb.nc = newNodeCache(cacheSize)
return ndb
}

@@ -138,7 +120,7 @@ func (ndb *nodeDB) getNodeFromMemory(hash []byte, promoteRecentNode bool) *Node
return elem
}

if elem, ok := ndb.orphanNodeCache[string(hash)]; ok {
if elem := ndb.oi.getNodeFromOrphanCache(hash); elem != nil {
return elem
}

48 changes: 19 additions & 29 deletions libs/iavl/nodedb_cache.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,32 @@
package iavl

import (
cmap "github.com/orcaman/concurrent-map"
"github.com/tendermint/go-amino"
"container/list"
"github.com/okex/exchain/libs/iavl/config"

"github.com/tendermint/go-amino"
"sync"
)

func (ndb *nodeDB) uncacheNodeRontine(n []*Node) {
for _, node := range n {
ndb.uncacheNode(node.hash)
}
type NodeCache struct {
nodeCache map[string]*list.Element // Node cache.
nodeCacheSize int // Node cache size limit in elements.
nodeCacheQueue *syncList // LRU queue of cache elements. Used for deletion.
nodeCacheMutex sync.RWMutex // Mutex for node cache.
}

func (ndb *nodeDB) initPreWriteCache() {
if ndb.preWriteNodeCache == nil {
ndb.preWriteNodeCache = cmap.New()
func newNodeCache(cacheSize int) *NodeCache {
return &NodeCache{
nodeCache: makeNodeCacheMap(cacheSize, IavlCacheInitRatio),
nodeCacheSize: cacheSize,
nodeCacheQueue: newSyncList(),
}
}

func (ndb *nodeDB) cacheNodeToPreWriteCache(n *Node) {
ndb.preWriteNodeCache.Set(string(n.hash), n)
}

func (ndb *nodeDB) finishPreWriteCache() {
ndb.preWriteNodeCache.IterCb(func(key string, v interface{}) {
ndb.cacheNode(v.(*Node))
})
ndb.preWriteNodeCache = nil
}


// ===================================================
// ======= map[string]*list.Element implementation
// ===================================================

func (ndb *nodeDB) uncacheNode(hash []byte) {
func (ndb *NodeCache) uncache(hash []byte) {
ndb.nodeCacheMutex.Lock()
if elem, ok := ndb.nodeCache[string(hash)]; ok {
ndb.nodeCacheQueue.Remove(elem)
@@ -46,7 +37,7 @@ func (ndb *nodeDB) uncacheNode(hash []byte) {

// Add a node to the cache and pop the least recently used node if we've
// reached the cache size limit.
func (ndb *nodeDB) cacheNode(node *Node) {
func (ndb *NodeCache) cache(node *Node) {
ndb.nodeCacheMutex.Lock()
elem := ndb.nodeCacheQueue.PushBack(node)
ndb.nodeCache[string(node.hash)] = elem
@@ -59,17 +50,16 @@ func (ndb *nodeDB) cacheNode(node *Node) {
ndb.nodeCacheMutex.Unlock()
}

func (ndb *nodeDB) cacheNodeByCheck(node *Node) {
func (ndb *NodeCache) cacheByCheck(node *Node) {
ndb.nodeCacheMutex.RLock()
_, ok := ndb.nodeCache[string(node.hash)]
ndb.nodeCacheMutex.RUnlock()
if !ok {
ndb.cacheNode(node)
ndb.cache(node)
}
}


func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Node) {
func (ndb *NodeCache) get(hash []byte, promoteRecentNode bool) (n *Node) {
// Check the cache.
ndb.nodeCacheMutex.RLock()
elem, ok := ndb.nodeCache[string(hash)]
@@ -84,7 +74,7 @@ func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Nod
return
}

func (ndb *nodeDB) nodeCacheLen() int {
func (ndb *NodeCache) nodeCacheLen() int {
return len(ndb.nodeCache)
}

117 changes: 44 additions & 73 deletions libs/iavl/nodedb_oec.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"container/list"
"encoding/binary"
"fmt"
cmap "github.com/orcaman/concurrent-map"

"github.com/tendermint/go-amino"

@@ -28,12 +29,6 @@ var (
IavlCacheInitRatio float64 = 0
)

type heightOrphansItem struct {
version int64
rootHash []byte
orphans []*Node
}

type tppItem struct {
nodeMap map[string]*Node
listItem *list.Element
@@ -50,50 +45,6 @@ func (ndb *nodeDB) SaveOrphans(batch dbm.Batch, version int64, orphans []*Node)
}
}

func (ndb *nodeDB) SaveOrphansAsync(version int64, orphans []*Node) {
ndb.log(IavlDebug, "saving orphan node to OrphanCache", "size", len(orphans))
version--
atomic.AddInt64(&ndb.totalOrphanCount, int64(len(orphans)))

ndb.mtx.Lock()
defer ndb.mtx.Unlock()

orphansObj := ndb.heightOrphansMap[version]
if orphansObj != nil {
orphansObj.orphans = orphans
}
for _, node := range orphans {
ndb.orphanNodeCache[string(node.hash)] = node
delete(ndb.prePersistNodeCache, amino.BytesToStr(node.hash))
node.leftNode = nil
node.rightNode = nil
}
go ndb.uncacheNodeRontine(orphans)
}

func (ndb *nodeDB) setHeightOrphansItem(version int64, rootHash []byte) {
if rootHash == nil {
rootHash = []byte{}
}
orphanObj := &heightOrphansItem{
version: version,
rootHash: rootHash,
}
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
ndb.heightOrphansCacheQueue.PushBack(orphanObj)
ndb.heightOrphansMap[version] = orphanObj

for ndb.heightOrphansCacheQueue.Len() > ndb.heightOrphansCacheSize {
orphans := ndb.heightOrphansCacheQueue.Front()
oldHeightOrphanItem := ndb.heightOrphansCacheQueue.Remove(orphans).(*heightOrphansItem)
for _, node := range oldHeightOrphanItem.orphans {
delete(ndb.orphanNodeCache, amino.BytesToStr(node.hash))
}
delete(ndb.heightOrphansMap, oldHeightOrphanItem.version)
}
}

func (ndb *nodeDB) dbGet(k []byte) ([]byte, error) {
ts := time.Now()
defer func() {
@@ -307,8 +258,8 @@ func (ndb *nodeDB) sprintCacheLog(version int64) string {
printLog := fmt.Sprintf("Save Version<%d>: Tree<%s>", version, ndb.name)

printLog += fmt.Sprintf(", TotalPreCommitCacheSize:%d", treeMap.totalPreCommitCacheSize)
printLog += fmt.Sprintf(", nodeCCnt:%d", ndb.nodeCacheLen())
printLog += fmt.Sprintf(", orphanCCnt:%d", len(ndb.orphanNodeCache))
printLog += fmt.Sprintf(", nodeCCnt:%d", ndb.nc.nodeCacheLen())
printLog += fmt.Sprintf(", orphanCCnt:%d", ndb.oi.orphanNodeCacheLen())
printLog += fmt.Sprintf(", prePerCCnt:%d", len(ndb.prePersistNodeCache))
printLog += fmt.Sprintf(", dbRCnt:%d", ndb.getDBReadCount())
printLog += fmt.Sprintf(", dbWCnt:%d", ndb.getDBWriteCount())
@@ -474,15 +425,6 @@ func updateBranchAndSaveNodeToChan(node *Node, saveNodesCh chan<- *Node) []byte
return node.hash
}

func (ndb *nodeDB) getRootWithCache(version int64) ([]byte, error) {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
orphansObj := ndb.heightOrphansMap[version]
if orphansObj != nil {
return orphansObj.rootHash, nil
}
return nil, fmt.Errorf("version %d is not in heightOrphansMap", version)
}

// Saves orphaned nodes to disk under a special prefix.
// version: the new version being saved.
@@ -510,23 +452,14 @@ func (ndb *nodeDB) getNodeInTpp(hash []byte) (*Node, bool) {

func (ndb *nodeDB) getRootWithCacheAndDB(version int64) ([]byte, error) {
if EnableAsyncCommit {
root, err := ndb.getRootWithCache(version)
if err == nil {
return root, err
root, ok := ndb.findRootHash(version)
if ok {
return root, nil
}
}
return ndb.getRoot(version)
}

func (ndb *nodeDB) inVersionCacheMap(version int64) ([]byte, bool) {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
item := ndb.heightOrphansMap[version]
if item != nil {
return item.rootHash, true
}
return nil, false
}

// DeleteVersion deletes a tree version from disk.
func (ndb *nodeDB) DeleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool) error {
@@ -578,3 +511,41 @@ func orphanKeyFast(fromVersion, toVersion int64, hash []byte) []byte {
copy(key[n+hashLen-len(hash):n+hashLen], hash)
return key
}

func (ndb *nodeDB) cacheNode(node *Node) {
ndb.nc.cache(node)
}
func (ndb *nodeDB) uncacheNode(hash []byte) {
ndb.nc.uncache(hash)
}

func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Node) {
return ndb.nc.get(hash, promoteRecentNode)
}

func (ndb *nodeDB) cacheNodeByCheck(node *Node) {
ndb.nc.cacheByCheck(node)
}

func (ndb *nodeDB) uncacheNodeRontine(n []*Node) {
for _, node := range n {
ndb.uncacheNode(node.hash)
}
}

func (ndb *nodeDB) initPreWriteCache() {
if ndb.preWriteNodeCache == nil {
ndb.preWriteNodeCache = cmap.New()
}
}

func (ndb *nodeDB) cacheNodeToPreWriteCache(n *Node) {
ndb.preWriteNodeCache.Set(string(n.hash), n)
}

func (ndb *nodeDB) finishPreWriteCache() {
ndb.preWriteNodeCache.IterCb(func(key string, v interface{}) {
ndb.cacheNode(v.(*Node))
})
ndb.preWriteNodeCache = nil
}
13 changes: 7 additions & 6 deletions libs/iavl/nodedb_oec_test.go
Original file line number Diff line number Diff line change
@@ -340,16 +340,17 @@ func Test_getRootWithCache(t *testing.T) {
ndb := mockNodeDB()
for _, c := range cases {
rootHash := randBytes(32)
ndb.heightOrphansMap[c.version] = &heightOrphansItem{c.version, rootHash, nil}
ndb.oi.orphanItemMap[c.version] = &orphanItem{rootHash, nil}

actualHash, err := ndb.getRootWithCache(c.version)
actualHash, ok := ndb.findRootHash(c.version)
if c.exist {
require.Equal(t, actualHash, rootHash)
} else {
require.Nil(t, actualHash)
}
require.NoError(t, err)
require.Equal(t, ok, true)

var err error
actualHash, err = ndb.getRootWithCacheAndDB(c.version)
if c.exist {
require.Equal(t, actualHash, rootHash)
@@ -374,9 +375,9 @@ func Test_inVersionCacheMap(t *testing.T) {
ndb := mockNodeDB()
for _, c := range cases {
rootHash := randBytes(32)
orphanObj := &heightOrphansItem{version: c.version, rootHash: rootHash}
ndb.heightOrphansMap[c.version] = orphanObj
actualHash, existed := ndb.inVersionCacheMap(c.version)
orphanObj := &orphanItem{rootHash: rootHash}
ndb.oi.orphanItemMap[c.version] = orphanObj
actualHash, existed := ndb.findRootHash(c.version)
require.Equal(t, actualHash, rootHash)
require.Equal(t, existed, c.expected)
}
63 changes: 63 additions & 0 deletions libs/iavl/nodedb_orphan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package iavl

import (
"github.com/tendermint/go-amino"
"sync/atomic"
)

func (ndb *nodeDB) enqueueOrphanTask(version int64, rootHash []byte, newOrphans []*Node) {

ndb.addOrphanItem(version, rootHash)

task := func() {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
ndb.saveNewOrphans(version, newOrphans, false)
ndb.oi.removeOldOrphans(version)
ndb.oi.enqueueResult(version)
}

ndb.oi.enqueueTask(task)
}

func (ndb *nodeDB) addOrphanItem(version int64, rootHash []byte) {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
ndb.oi.addOrphanItem(version, rootHash)
}

func (ndb *nodeDB) saveNewOrphans(version int64, orphans []*Node, lock bool) {

if orphans == nil {
return
}

version--
ndb.log(IavlDebug, "saving orphan node to OrphanCache", "size", len(orphans))
atomic.AddInt64(&ndb.totalOrphanCount, int64(len(orphans)))

if lock {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
}

ndb.oi.feedOrphansMap(version, orphans)
for _, node := range orphans {
ndb.oi.feedOrphanNodeCache(node)
delete(ndb.prePersistNodeCache, amino.BytesToStr(node.hash))
node.leftNode = nil
node.rightNode = nil
}
ndb.uncacheNodeRontine(orphans)
}

func (ndb *nodeDB) sanityCheckHandleOrphansResult(version int64) {
ndb.oi.wait4Result(version)
}

func (ndb *nodeDB) findRootHash(version int64) (res []byte, found bool) {
ndb.mtx.RLock()
defer ndb.mtx.RUnlock()
return ndb.oi.findRootHash(version)
}

124 changes: 124 additions & 0 deletions libs/iavl/nodedb_orphan_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package iavl

import (
"fmt"
"github.com/tendermint/go-amino"
)

type OrphanInfo struct {
ndb *nodeDB
orphanNodeCache map[string]*Node
orphanItemMap map[int64]*orphanItem
itemSize int

orphanTaskChan chan func()
resultChan chan int64
}

type orphanItem struct {
rootHash []byte
orphans []*Node
}

func newOrphanInfo(ndb *nodeDB) *OrphanInfo {

oi := &OrphanInfo{
ndb: ndb,
orphanNodeCache: make(map[string]*Node),
orphanItemMap: make(map[int64]*orphanItem),
itemSize: HeightOrphansCacheSize,
orphanTaskChan: make(chan func(), 1),
resultChan: make(chan int64, 1),
}

oi.enqueueResult(0)
go oi.handleOrphansRoutine()
return oi
}

func (oi *OrphanInfo) enqueueResult(res int64) {
oi.resultChan <- res
}

func (oi *OrphanInfo) enqueueTask(t func()) {
oi.orphanTaskChan <- t
}

func (oi *OrphanInfo) handleOrphansRoutine() {
for task := range oi.orphanTaskChan {
task()
}
}

func (oi *OrphanInfo) wait4Result(version int64) {

version--
for versionCompleted := range oi.resultChan {
if versionCompleted == version {
break
} else if versionCompleted == 0 {
break
}
}
}

func (oi *OrphanInfo) addOrphanItem(version int64, rootHash []byte) {
if rootHash == nil {
rootHash = []byte{}
}
orphanObj := &orphanItem{
rootHash: rootHash,
}
_, ok := oi.orphanItemMap[version]
if ok {
panic(fmt.Sprintf("unexpected orphanItemMap, version: %d", version))
}
oi.orphanItemMap[version] = orphanObj
}


func (oi *OrphanInfo) removeOldOrphans(version int64) {
expiredVersion := version-int64(oi.itemSize)
expiredItem, ok := oi.orphanItemMap[expiredVersion]
if !ok {
return
}
for _, node := range expiredItem.orphans {
delete(oi.orphanNodeCache, amino.BytesToStr(node.hash))
}
delete(oi.orphanItemMap, expiredVersion)
}

func (oi *OrphanInfo) feedOrphansMap(version int64, orphans []*Node) {
v, ok := oi.orphanItemMap[version]
if !ok {
return
}
v.orphans = orphans
}

func (oi *OrphanInfo) feedOrphanNodeCache(node *Node) {
oi.orphanNodeCache[string(node.hash)] = node
}


func (oi *OrphanInfo) getNodeFromOrphanCache(hash []byte) *Node {
elem, ok := oi.orphanNodeCache[string(hash)]
if ok {
return elem
}
return nil
}

func (oi *OrphanInfo) orphanNodeCacheLen() int {
return len(oi.orphanNodeCache)
}

func (oi *OrphanInfo) findRootHash(version int64) (res []byte, found bool) {
v, ok := oi.orphanItemMap[version]
if ok {
res = v.rootHash
found = true
}
return
}

0 comments on commit c1d1ef0

Please sign in to comment.