From a5c5c91daa7f8cd67b386e29f12e4ddcd6675fc6 Mon Sep 17 00:00:00 2001 From: ZhengQi Date: Sun, 14 Nov 2021 22:37:09 +0800 Subject: [PATCH 1/3] fix consensus version bug (#285) --- kernel/consensus/pluggable_consensus.go | 39 +++++-------------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/kernel/consensus/pluggable_consensus.go b/kernel/consensus/pluggable_consensus.go index 37c0118a5..bafd64ec0 100644 --- a/kernel/consensus/pluggable_consensus.go +++ b/kernel/consensus/pluggable_consensus.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "strconv" - "strings" "sync" "github.com/xuperchain/xupercore/kernel/common/xcontext" @@ -43,7 +42,7 @@ var ( ContractMngErr = errors.New("Contract manager is empty.") ErrInvalidConfig = errors.New("config should be an empty JSON when rolling back an old one, or try an upper version") - ErrInvalidVersion = errors.New("version should be an upper one when upgrading a new one, or try an empty config JSON when you need rollback") + ErrInvalidVersion = errors.New("version should be an upper one when upgrading a new one") ) // PluggableConsensus 实现了consensus_interface接口 @@ -195,7 +194,7 @@ func (pc *PluggableConsensus) proposalArgsUnmarshal(ctxArgs map[string][]byte) ( // updateConsensus 共识升级,更新原有共识列表,向PluggableConsensus共识列表插入新共识,并暂停原共识实例 // 该方法注册到kernel的延时调用合约中,在trigger高度时被调用,此时直接按照共识cfg新建新的共识实例 -// 若同名共识且version相同,则使用历史的配置,否则version需要递增序列 +// 共识version需要递增序列 func (pc *PluggableConsensus) updateConsensus(contractCtx contract.KContext) (*contract.Response, error) { // 解析用户合约信息,包括待升级名称name、trigger高度height和待升级配置config cfg, err := pc.proposalArgsUnmarshal(contractCtx.Args()) @@ -410,13 +409,11 @@ func NewPluginConsensus(cCtx cctx.ConsensusCtx, cCfg def.ConsensusConfig) (base. type configFilter struct { Version string `json:"version,omitempty"` - index int `json:"-"` } // CheckConsensusConfig 同名配置文件检查: -// 1. 若历史相同version,则直接返回历史cfg -// 2. 若不存在,需要比历史最大值大 -// 3. 将合法的配置写到map中 +// 1. 同一个链的共识版本只能增加,不能升级到旧版本 +// 2. 将合法的配置写到map中 func CheckConsensusVersion(hisMap map[int]def.ConsensusConfig, cfg *def.ConsensusConfig) error { var err error var newConf configFilter @@ -431,41 +428,21 @@ func CheckConsensusVersion(hisMap map[int]def.ConsensusConfig, cfg *def.Consensu return errors.New("wrong parameter version, version should an integer in string") } // 获取历史最近共识实例,初始状态下历史共识没有version字段的,需手动添加 - var configSlice []configFilter var maxVersion int64 for i := len(hisMap) - 1; i >= 0; i-- { configItem := hisMap[i] - if configItem.ConsensusName != cfg.ConsensusName { - continue - } var tmpItem configFilter - json.Unmarshal([]byte(configItem.Config), &tmpItem) - tmpItem.index = i + err := json.Unmarshal([]byte(configItem.Config), &tmpItem) + if err != nil { + return errors.New("unmarshal config error") + } if tmpItem.Version == "" { tmpItem.Version = "0" } - if tmpItem.Version == newConf.Version { - dec := json.NewDecoder(strings.NewReader(cfg.Config)) - dec.DisallowUnknownFields() - var checkCfg configFilter - err := dec.Decode(&checkCfg) - if err != nil { - return ErrInvalidConfig - } - hisConfig := hisMap[i] - hisMap[len(hisMap)] = hisConfig - return nil - } - configSlice = append(configSlice, tmpItem) v, _ := strconv.ParseInt(tmpItem.Version, 10, 64) if maxVersion < v { maxVersion = v } - continue - } - if len(configSlice) == 0 { - hisMap[len(hisMap)] = *cfg - return nil } if maxVersion < newConfVersion { hisMap[len(hisMap)] = *cfg From f35d472ba752d89be4f8864c7d04de33e68cb415 Mon Sep 17 00:00:00 2001 From: JIN Tong Date: Wed, 17 Nov 2021 20:01:41 +0800 Subject: [PATCH 2/3] consensus: make smr run with a single validator. (#284) * consensus: make smr run with a single validator. --- bcs/consensus/xpoa/kernel_contract.go | 47 ++++++++++++---- bcs/consensus/xpoa/kernel_contract_test.go | 55 ++++++++++++++----- .../consensus/base/driver/chained-bft/smr.go | 45 +++++++++++++-- 3 files changed, 115 insertions(+), 32 deletions(-) diff --git a/bcs/consensus/xpoa/kernel_contract.go b/bcs/consensus/xpoa/kernel_contract.go index 1c2a64a56..91b59b4b9 100644 --- a/bcs/consensus/xpoa/kernel_contract.go +++ b/bcs/consensus/xpoa/kernel_contract.go @@ -33,7 +33,25 @@ func (x *xpoaConsensus) methodEditValidates(contractCtx contract.KContext) (*con if err != nil { return common.NewContractErrResponse(common.StatusBadRequest, "invalid acl: pls check accept value."), err } - if !x.isAuthAddress(aks, acceptValue) { + + curValiBytes, err := contractCtx.Get(x.election.bindContractBucket, + []byte(fmt.Sprintf("%d_%s", x.election.consensusVersion, validateKeys))) + curVali, err := func() ([]string, error) { + if err != nil || curValiBytes == nil { + return x.election.initValidators, nil + } + var curValiKey ProposerInfo + err = json.Unmarshal(curValiBytes, &curValiKey) + if err != nil { + x.log.Error("Unmarshal error") + return nil, err + } + return curValiKey.Address, nil + }() + if err != nil { + return common.NewContractErrResponse(common.StatusBadRequest, err.Error()), err + } + if !x.isAuthAddress(curVali, aks, acceptValue, x.election.enableBFT) { return common.NewContractErrResponse(common.StatusBadRequest, aclErr.Error()), aclErr } @@ -87,10 +105,18 @@ func (x *xpoaConsensus) methodGetValidates(contractCtx contract.KContext) (*cont } // isAuthAddress 判断输入aks是否能在贪心下仍能满足签名数量>33%(Chained-BFT装载) or 50%(一般情况) -func (x *xpoaConsensus) isAuthAddress(aks map[string]float64, threshold float64) bool { +func (x *xpoaConsensus) isAuthAddress(validators []string, aks map[string]float64, threshold float64, enableBFT bool) bool { + // 0. 是否是单个候选人 + if len(validators) == 1 { + weight, ok := aks[validators[0]] + if !ok { + return false + } + return weight >= threshold + } // 1. 判断aks中的地址是否是当前集合地址 for addr, _ := range aks { - if !Find(addr, x.election.validators) { + if !Find(addr, validators) { return false } } @@ -106,15 +132,14 @@ func (x *xpoaConsensus) isAuthAddress(aks map[string]float64, threshold float64) greedyCount := 0 sum := threshold for i := 0; i < len(aks); i++ { - if sum > 0 { - sum -= s[i].Weight - greedyCount++ - continue + if sum <= 0 { + break } - break + sum -= s[i].Weight + greedyCount++ } - if !x.election.enableBFT { - return greedyCount >= len(x.election.validators)/2+1 + if !enableBFT { + return greedyCount >= len(validators)/2+1 } - return CalFault(int64(greedyCount), int64(len(x.election.validators))) + return CalFault(int64(greedyCount), int64(len(validators))) } diff --git a/bcs/consensus/xpoa/kernel_contract_test.go b/bcs/consensus/xpoa/kernel_contract_test.go index fcca1af3d..010a8be05 100644 --- a/bcs/consensus/xpoa/kernel_contract_test.go +++ b/bcs/consensus/xpoa/kernel_contract_test.go @@ -12,23 +12,15 @@ var ( "dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.5, "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.5, } -) - -func TestIsAuthAddress(t *testing.T) { - cCtx, err := prepare(getXpoaConsensusConf()) - if err != nil { - t.Error("prepare error", "error", err) - return - } - i := NewXpoaConsensus(*cCtx, getConfig(getXpoaConsensusConf())) - xpoa, ok := i.(*xpoaConsensus) - if !ok { - t.Error("transfer err.") + aks2 = map[string]float64{ + "dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.5, + "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.6, } - if !xpoa.isAuthAddress(aks, 0.6) { - t.Error("isAuthAddress err.") + aks3 = map[string]float64{ + "dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN": 0.4, + "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT": 0.6, } -} +) func NewEditArgs() map[string][]byte { a := make(map[string][]byte) @@ -85,3 +77,36 @@ func TestMethodGetValidates(t *testing.T) { return } } + +func TestIsAuthAddress(t *testing.T) { + cCtx, err := prepare(getXpoaConsensusConf()) + if err != nil { + t.Error("prepare error", "error", err) + return + } + i := NewXpoaConsensus(*cCtx, getConfig(getXpoaConsensusConf())) + xpoa, ok := i.(*xpoaConsensus) + if !ok { + t.Error("transfer err.") + return + } + v1 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN", "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"} + if !xpoa.isAuthAddress(v1, aks, 0.6, false) { + t.Error("isAuthAddress err.") + return + } + v2 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN"} + if xpoa.isAuthAddress(v2, aks2, 0.6, true) { + t.Error("isAuthAddress err.") + return + } + v3 := []string{"WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"} + if !xpoa.isAuthAddress(v3, aks2, 0.6, true) { + t.Error("isAuthAddress err.") + return + } + v4 := []string{"dpzuVdosQrF2kmzumhVeFQZa1aYcdgFpN", "WNWk3ekXeM5M2232dY2uCJmEqWhfQiDYT"} + if !xpoa.isAuthAddress(v4, aks2, 0.7, true) { + t.Error("isAuthAddress err.") + } +} diff --git a/kernel/consensus/base/driver/chained-bft/smr.go b/kernel/consensus/base/driver/chained-bft/smr.go index aba883df5..9ce170e82 100644 --- a/kernel/consensus/base/driver/chained-bft/smr.go +++ b/kernel/consensus/base/driver/chained-bft/smr.go @@ -219,14 +219,11 @@ func (s *Smr) ResetProposerStatus(tipBlock cctx.BlockInterface, s.mtx.Lock() defer s.mtx.Unlock() - if bytes.Equal(s.getHighQC().GetProposalId(), tipBlock.GetBlockid()) { + if bytes.Equal(s.getHighQC().GetProposalId(), tipBlock.GetBlockid()) && + s.validNewHighQC(tipBlock.GetBlockid(), validators) { // 此处需要获取带签名的完整Justify return false, s.getCompleteHighQC(), nil } - // 单个节点不存在投票验证的hotstuff流程,因此返回true - if len(validators) == 1 { - return false, nil, nil - } // 从当前TipBlock开始往前追溯,交给smr根据状态进行回滚。 // 在本地状态树上找到指代TipBlock的QC,若找不到,则在状态树上找和TipBlock同一分支上的最近值 @@ -384,10 +381,43 @@ func (s *Smr) ProcessProposal(viewNumber int64, proposalID []byte, parentID []by } go s.p2p.SendMessage(createNewBCtx(), netMsg, p2p.WithAccounts(s.removeLocalValidator(validatesIpInfo))) s.localProposal.Store(utils.F(proposalID), proposal.Timestamp) - s.log.Debug("smr:ProcessProposal::new proposal has been made", "address", s.address, "proposalID", utils.F(proposalID)) + // 若为单候选人情况,则此处需要特殊处理,矿工需要给自己提前签名 + if len(validatesIpInfo) == 1 { + s.voteToSelf(viewNumber, proposalID, parentQuorumCert) + } + s.log.Debug("smr:ProcessProposal::new proposal has been made", "address", s.address, "proposalID", utils.F(proposalID), "target", validatesIpInfo) return nil } +func (s *Smr) voteToSelf(viewNumber int64, proposalID []byte, parent storage.QuorumCertInterface) { + selfVote := &storage.VoteInfo{ + ProposalId: proposalID, + ProposalView: viewNumber, + ParentId: parent.GetProposalId(), + } + selfLedgerInfo := &storage.LedgerCommitInfo{ + VoteInfoHash: proposalID, + } + selfQC := storage.NewQuorumCert(selfVote, selfLedgerInfo, nil) + selfSign, err := s.cryptoClient.SignVoteMsg(proposalID) + if err != nil { + s.log.Error("smr::voteProposal::voteToSelf error", "err", err) + return + } + s.qcVoteMsgs.LoadOrStore(utils.F(proposalID), []*chainedBftPb.QuorumCertSign{selfSign}) + selfNode := &storage.ProposalNode{ + In: selfQC, + } + if err := s.qcTree.UpdateQcStatus(selfNode); err != nil { + s.log.Error("smr::voteProposal::updateQcStatus error", "err", err) + return + } + // 更新本地smr状态机 + s.pacemaker.AdvanceView(selfQC) + s.qcTree.UpdateHighQC(proposalID) + s.log.Debug("smr:voteProposal::done local voting", "address", s.address, "proposalID", utils.F(proposalID)) +} + // reloadJustifyQC 与LibraBFT不同,返回一个指定的parentQC func (s *Smr) reloadJustifyQC(parentID []byte) (storage.QuorumCertInterface, error) { // 第一次proposal,highQC==rootQC==genesisQC @@ -695,6 +725,9 @@ func (s *Smr) validNewHighQC(inProposalId []byte, validators []string) bool { if !ok { return false } + if len(validators) == 1 { + return len(signs) == len(validators) + } return s.saftyrules.CalVotesThreshold(len(signs), len(validators)) } From e9bc306e2b2ca188c21946614be47d8d3d6655d5 Mon Sep 17 00:00:00 2001 From: JIN Tong Date: Wed, 17 Nov 2021 20:21:09 +0800 Subject: [PATCH 3/3] xpoa: parameter version works compatibly with int and string type (#286) * xpoa: parameter version works compatibly with int and string type. --- bcs/consensus/xpoa/common.go | 27 +++++++++++++++++++++- bcs/consensus/xpoa/common_test.go | 37 +++++++++++++++++++++++++++++++ bcs/consensus/xpoa/schedule.go | 8 +------ bcs/consensus/xpoa/xpoa.go | 5 ++--- 4 files changed, 66 insertions(+), 11 deletions(-) diff --git a/bcs/consensus/xpoa/common.go b/bcs/consensus/xpoa/common.go index a67372e76..bb94a66e1 100644 --- a/bcs/consensus/xpoa/common.go +++ b/bcs/consensus/xpoa/common.go @@ -3,6 +3,7 @@ package xpoa import ( "encoding/json" "errors" + "strconv" ) var ( @@ -30,7 +31,6 @@ const ( ) type xpoaConfig struct { - Version string `json:"version,omitempty"` // 每个候选人每轮出块个数 BlockNum int64 `json:"block_num"` // 单位为毫秒 @@ -103,3 +103,28 @@ func (a aksSlice) Less(i, j int) bool { } return a[j].Weight < a[i].Weight } + +type xpoaStringConfig struct { + Version string `json:"version,omitempty"` +} + +type xpoaIntConfig struct { + Version int64 `json:"version,omitempty"` +} + +// ParseVersion 支持string格式和int格式的version type +func ParseVersion(cfg string) (int64, error) { + intVersion := xpoaIntConfig{} + if err := json.Unmarshal([]byte(cfg), &intVersion); err == nil { + return intVersion.Version, nil + } + strVersion := xpoaStringConfig{} + if err := json.Unmarshal([]byte(cfg), &strVersion); err != nil { + return 0, err + } + version, err := strconv.ParseInt(strVersion.Version, 10, 64) + if err != nil { + return 0, err + } + return version, nil +} diff --git a/bcs/consensus/xpoa/common_test.go b/bcs/consensus/xpoa/common_test.go index 06fe22a22..93fa02ec0 100644 --- a/bcs/consensus/xpoa/common_test.go +++ b/bcs/consensus/xpoa/common_test.go @@ -42,3 +42,40 @@ func TestLoadValidatorsMultiInfo(t *testing.T) { t.Error("TestLoadValidatorsMultiInfo error 2.") } } + +func TestParseVersion(t *testing.T) { + strVersion := `{ + "version": "2" + }` + v, err := ParseVersion(strVersion) + if err != nil { + t.Error("ParseVersion err, err: ", err) + return + } + if v != 2 { + t.Error("ParseVersion err, v: ", v) + return + } + intVersion := `{ + "version": 3 + }` + v, err = ParseVersion(intVersion) + if err != nil { + t.Error("ParseVersion err, err: ", err) + return + } + if v != 3 { + t.Error("ParseVersion err, v: ", v) + return + } + empryVersion := `{}` + v, err = ParseVersion(empryVersion) + if err != nil { + t.Error("ParseVersion err, err: ", err) + return + } + if v != 0 { + t.Error("ParseVersion err, v: ", v) + return + } +} diff --git a/bcs/consensus/xpoa/schedule.go b/bcs/consensus/xpoa/schedule.go index 0e3df7e03..31ca35d03 100644 --- a/bcs/consensus/xpoa/schedule.go +++ b/bcs/consensus/xpoa/schedule.go @@ -2,7 +2,6 @@ package xpoa import ( "fmt" - "strconv" "time" common "github.com/xuperchain/xupercore/kernel/consensus/base/common" @@ -35,12 +34,7 @@ type xpoaSchedule struct { ledger cctx.LedgerRely } -func NewXpoaSchedule(xconfig *xpoaConfig, cCtx context.ConsensusCtx, startHeight int64) *xpoaSchedule { - version, err := strconv.ParseInt(xconfig.Version, 10, 64) - if err != nil { - cCtx.XLog.Error("Xpoa::NewXpoaSchedule::Parse version error.", "err", err) - return nil - } +func NewXpoaSchedule(xconfig *xpoaConfig, cCtx context.ConsensusCtx, startHeight, version int64) *xpoaSchedule { s := xpoaSchedule{ address: cCtx.Network.PeerInfo().Account, period: xconfig.Period, diff --git a/bcs/consensus/xpoa/xpoa.go b/bcs/consensus/xpoa/xpoa.go index dae382097..3266c06bb 100644 --- a/bcs/consensus/xpoa/xpoa.go +++ b/bcs/consensus/xpoa/xpoa.go @@ -3,7 +3,6 @@ package xpoa import ( "bytes" "encoding/json" - "strconv" "time" "github.com/xuperchain/xupercore/kernel/common/xcontext" @@ -66,13 +65,13 @@ func NewXpoaConsensus(cCtx cctx.ConsensusCtx, cCfg def.ConsensusConfig) base.Con cCtx.XLog.Error("consensus:xpoa:NewXpoaConsensus: xpoa struct unmarshal error", "error", err) return nil } - version, err := strconv.ParseInt(xconfig.Version, 10, 64) + version, err := ParseVersion(cCfg.Config) if err != nil { cCtx.XLog.Error("consensus:xpoa:NewXpoaConsensus: version error", "error", err) return nil } // create xpoaSchedule - schedule := NewXpoaSchedule(xconfig, cCtx, cCfg.StartHeight) + schedule := NewXpoaSchedule(xconfig, cCtx, cCfg.StartHeight, version) if schedule == nil { cCtx.XLog.Error("consensus:xpoa:NewXpoaSchedule error") return nil