From 23f26f21794109fd657a9cb0cdbffe4f8623fe1c Mon Sep 17 00:00:00 2001 From: dustinxie Date: Wed, 8 Jan 2025 22:35:22 -0800 Subject: [PATCH] [factory] add parent to workingset --- state/factory/factory.go | 53 ++++++++++++++++++++++++++----------- state/factory/statedb.go | 45 ++++++++++++++++++++++--------- state/factory/util.go | 23 ++++++++++++++++ state/factory/workingset.go | 45 ++++++++++++++++++++++++++++++- 4 files changed, 137 insertions(+), 29 deletions(-) diff --git a/state/factory/factory.go b/state/factory/factory.go index ddfb2e20df..e830eb48bb 100644 --- a/state/factory/factory.go +++ b/state/factory/factory.go @@ -269,7 +269,11 @@ func (sf *factory) newWorkingSet(ctx context.Context, height uint64) (*workingSe if err != nil { return nil, err } - return sf.createSfWorkingSet(ctx, height, store) + var parent *workingSet + if height > 0 { + parent = getWorkingSetByHeight(sf.workingsets, height-1) + } + return sf.createSfWorkingSet(ctx, height, store, parent) } func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*workingSet, error) { @@ -290,10 +294,10 @@ func (sf *factory) newWorkingSetAtHeight(ctx context.Context, height uint64) (*w if err != nil { return nil, err } - return sf.createSfWorkingSet(ctx, height, store) + return sf.createSfWorkingSet(ctx, height, store, nil) } -func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore) (*workingSet, error) { +func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store workingSetStore, parent *workingSet) (*workingSet, error) { if err := store.Start(ctx); err != nil { return nil, err } @@ -308,7 +312,7 @@ func (sf *factory) createSfWorkingSet(ctx context.Context, height uint64, store } } } - return newWorkingSet(height, store), nil + return newWorkingSet(height, store, parent), nil } func (sf *factory) flusherOptions(preEaster bool) []db.KVStoreFlusherOption { @@ -357,6 +361,7 @@ func (sf *factory) Validate(ctx context.Context, blk *block.Block) error { return errors.Wrap(err, "failed to validate block with workingset in factory") } sf.workingsets.Add(key, ws) + sf.workingsets.Add(ws.height, ws) } receipts, err := ws.Receipts() if err != nil { @@ -399,6 +404,7 @@ func (sf *factory) NewBlockBuilder( blkCtx := protocol.MustGetBlockCtx(ctx) key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String()) sf.workingsets.Add(key, ws) + sf.workingsets.Add(ws.height, ws) return blkBuilder, nil } @@ -434,19 +440,35 @@ func (sf *factory) WorkingSetAtHeight(ctx context.Context, height uint64, preact } // PutBlock persists all changes in RunActions() into the DB -func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error { +func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) (err error) { timer := sf.timerFactory.NewTimer("Commit") - defer timer.End() + var ( + ws *workingSet + isExist bool + ) + defer func() { + timer.End() + if err != nil { + // abandon current workingset, and all pending workingsets beyond current height + ws.abandon() + h, _ := ws.Height() + abandonWorkingSets(sf.workingsets, h) + } + }() producer := blk.PublicKey().Address() if producer == nil { return errors.New("failed to get address") } ctx = protocol.WithRegistry(ctx, sf.registry) key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress()) - ws, isExist, err := sf.getFromWorkingSets(ctx, key) + ws, isExist, err = sf.getFromWorkingSets(ctx, key) if err != nil { - return err + return } + if err = ws.verifyParent(); err != nil { + return + } + ws.detachParent() if !isExist { // regenerate workingset if !sf.skipBlockValidationOnPut { @@ -456,14 +478,14 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error { } if err != nil { log.L().Error("Failed to update state.", zap.Error(err)) - return err + return } } sf.mutex.Lock() defer sf.mutex.Unlock() receipts, err := ws.Receipts() if err != nil { - return err + return } blk.Receipts = receipts h, _ := ws.Height() @@ -475,18 +497,17 @@ func (sf *factory) PutBlock(ctx context.Context, blk *block.Block) error { ) } - if err := ws.Commit(ctx); err != nil { - return err + if err = ws.Commit(ctx); err != nil { + return } rh, err := sf.dao.Get(ArchiveTrieNamespace, []byte(ArchiveTrieRootKey)) if err != nil { - return err + return } - if err := sf.twoLayerTrie.SetRootHash(rh); err != nil { - return err + if err = sf.twoLayerTrie.SetRootHash(rh); err != nil { + return } sf.currentChainHeight = h - return nil } diff --git a/state/factory/statedb.go b/state/factory/statedb.go index 03634eb060..b58d655036 100644 --- a/state/factory/statedb.go +++ b/state/factory/statedb.go @@ -197,8 +197,11 @@ func (sdb *stateDB) newWorkingSet(ctx context.Context, height uint64) (*workingS if err := store.Start(ctx); err != nil { return nil, err } - - return newWorkingSet(height, store), nil + var parent *workingSet + if height > 0 { + parent = getWorkingSetByHeight(sdb.workingsets, height-1) + } + return newWorkingSet(height, store, parent), nil } func (sdb *stateDB) Register(p protocol.Protocol) error { @@ -217,6 +220,7 @@ func (sdb *stateDB) Validate(ctx context.Context, blk *block.Block) error { return errors.Wrap(err, "failed to validate block with workingset in statedb") } sdb.workingsets.Add(key, ws) + sdb.workingsets.Add(ws.height, ws) } receipts, err := ws.Receipts() if err != nil { @@ -260,6 +264,7 @@ func (sdb *stateDB) NewBlockBuilder( blkCtx := protocol.MustGetBlockCtx(ctx) key := generateWorkingSetCacheKey(blkBuilder.GetCurrentBlockHeader(), blkCtx.Producer.String()) sdb.workingsets.Add(key, ws) + sdb.workingsets.Add(ws.height, ws) return blkBuilder, nil } @@ -287,21 +292,37 @@ func (sdb *stateDB) WorkingSetAtHeight(ctx context.Context, height uint64, preac } // PutBlock persists all changes in RunActions() into the DB -func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error { +func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) (err error) { sdb.mutex.Lock() timer := sdb.timerFactory.NewTimer("Commit") sdb.mutex.Unlock() - defer timer.End() + var ( + ws *workingSet + isExist bool + ) + defer func() { + timer.End() + if err != nil { + // abandon current workingset, and all pending workingsets beyond current height + ws.abandon() + h, _ := ws.Height() + abandonWorkingSets(sdb.workingsets, h) + } + }() producer := blk.PublicKey().Address() if producer == nil { return errors.New("failed to get address") } ctx = protocol.WithRegistry(ctx, sdb.registry) key := generateWorkingSetCacheKey(blk.Header, blk.Header.ProducerAddress()) - ws, isExist, err := sdb.getFromWorkingSets(ctx, key) + ws, isExist, err = sdb.getFromWorkingSets(ctx, key) if err != nil { - return err + return } + if err = ws.verifyParent(); err != nil { + return + } + ws.detachParent() if !isExist { if !sdb.skipBlockValidationOnPut { err = ws.ValidateBlock(ctx, blk) @@ -310,14 +331,14 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error { } if err != nil { log.L().Error("Failed to update state.", zap.Error(err)) - return err + return } } sdb.mutex.Lock() defer sdb.mutex.Unlock() receipts, err := ws.Receipts() if err != nil { - return err + return } blk.Receipts = receipts h, _ := ws.Height() @@ -329,8 +350,8 @@ func (sdb *stateDB) PutBlock(ctx context.Context, blk *block.Block) error { ) } - if err := ws.Commit(ctx); err != nil { - return err + if err = ws.Commit(ctx); err != nil { + return } sdb.currentChainHeight = h return nil @@ -440,6 +461,6 @@ func (sdb *stateDB) getFromWorkingSets(ctx context.Context, key hash.Hash256) (* sdb.mutex.RLock() currHeight := sdb.currentChainHeight sdb.mutex.RUnlock() - tx, err := sdb.newWorkingSet(ctx, currHeight+1) - return tx, false, err + ws, err := sdb.newWorkingSet(ctx, currHeight+1) + return ws, false, err } diff --git a/state/factory/util.go b/state/factory/util.go index 9e25bff81c..82e29dd16a 100644 --- a/state/factory/util.go +++ b/state/factory/util.go @@ -9,6 +9,7 @@ import ( "context" "github.com/iotexproject/go-pkgs/bloom" + "github.com/iotexproject/go-pkgs/cache" "github.com/iotexproject/go-pkgs/crypto" "github.com/iotexproject/go-pkgs/hash" "github.com/pkg/errors" @@ -96,6 +97,28 @@ func generateWorkingSetCacheKey(blkHeader block.Header, producerAddr string) has return hash.Hash256b(sum) } +func getWorkingSetByHeight(c cache.LRUCache, h uint64) *workingSet { + if h == 0 { + return nil + } + if data, ok := c.Get(h); ok { + if ws, ok := data.(*workingSet); ok { + return ws + } + } + return nil +} + +func abandonWorkingSets(c cache.LRUCache, h uint64) { + for ; ; h++ { + if ws := getWorkingSetByHeight(c, h); ws != nil { + c.Remove(h) + } else { + break + } + } +} + func protocolPreCommit(ctx context.Context, sr protocol.StateManager) error { if reg, ok := protocol.GetRegistry(ctx); ok { for _, p := range reg.All() { diff --git a/state/factory/workingset.go b/state/factory/workingset.go index 0b1b05dd30..182f8d132d 100644 --- a/state/factory/workingset.go +++ b/state/factory/workingset.go @@ -9,6 +9,7 @@ import ( "context" "math/big" "sort" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -64,17 +65,20 @@ type ( height uint64 store workingSetStore finalized bool + abandoned atomic.Bool dock protocol.Dock txValidator *protocol.GenericValidator receipts []*action.Receipt + parent *workingSet } ) -func newWorkingSet(height uint64, store workingSetStore) *workingSet { +func newWorkingSet(height uint64, store workingSetStore, parent *workingSet) *workingSet { ws := &workingSet{ height: height, store: store, dock: protocol.NewDock(), + parent: parent, } ws.txValidator = protocol.NewGenericValidator(ws, accountutil.AccountState) return ws @@ -114,6 +118,26 @@ func (ws *workingSet) validate(ctx context.Context) error { return nil } +func (ws *workingSet) isAbandoned() bool { + return ws.abandoned.Load() +} + +func (ws *workingSet) abandon() { + ws.abandoned.Store(true) +} + +func (ws *workingSet) verifyParent() error { + if ws.parent != nil && ws.parent.isAbandoned() { + ws.abandon() + return errors.New("workingset abandoned") + } + return nil +} + +func (ws *workingSet) detachParent() { + ws.parent = nil +} + func withActionCtx(ctx context.Context, selp *action.SealedEnvelope) (context.Context, error) { var actionCtx protocol.ActionCtx var err error @@ -292,8 +316,15 @@ func (ws *workingSet) freshAccountConversion(ctx context.Context, actCtx *protoc return nil } +func (ws *workingSet) getDirty(ns string, key []byte) ([]byte, bool) { + return ws.store.GetDirty(ns, key) +} + // Commit persists all changes in RunActions() into the DB func (ws *workingSet) Commit(ctx context.Context) error { + if err := ws.verifyParent(); err != nil { + return err + } if err := protocolPreCommit(ctx, ws); err != nil { return err } @@ -318,6 +349,14 @@ func (ws *workingSet) State(s interface{}, opts ...protocol.StateOption) (uint64 if cfg.Keys != nil { return 0, errors.Wrap(ErrNotSupported, "Read state with keys option has not been implemented yet") } + if ws.parent != nil { + if value, dirty := ws.getDirty(cfg.Namespace, cfg.Key); dirty { + return ws.height, state.Deserialize(s, value) + } + if value, dirty := ws.parent.getDirty(cfg.Namespace, cfg.Key); dirty { + return ws.height, state.Deserialize(s, value) + } + } value, err := ws.store.Get(cfg.Namespace, cfg.Key) if err != nil { return ws.height, err @@ -333,6 +372,7 @@ func (ws *workingSet) States(opts ...protocol.StateOption) (uint64, state.Iterat if cfg.Key != nil { return 0, nil, errors.Wrap(ErrNotSupported, "Read states with key option has not been implemented yet") } + // TODO: check parent keys, values, err := ws.store.States(cfg.Namespace, cfg.Keys) if err != nil { return 0, nil, err @@ -480,6 +520,9 @@ func (ws *workingSet) Process(ctx context.Context, actions []*action.SealedEnvel } func (ws *workingSet) processWithCorrectOrder(ctx context.Context, actions []*action.SealedEnvelope) error { + if err := ws.verifyParent(); err != nil { + return err + } if err := ws.validate(ctx); err != nil { return err }