diff --git a/zboxcore/sdk/rollback.go b/zboxcore/sdk/rollback.go index f623dbc63..f27931c46 100644 --- a/zboxcore/sdk/rollback.go +++ b/zboxcore/sdk/rollback.go @@ -316,6 +316,10 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) { return Broken, blobberRes, common.NewError("check_alloc_status_failed", markerError.Error()) } + if a.StorageVersion == StorageV2 { + return a.checkStatusV2(markerChan, blobberRes) + } + versionMap := make(map[string][]*RollbackBlobber) var ( @@ -472,3 +476,80 @@ func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) { wg.Wait() } + +func (a *Allocation) checkStatusV2(markerChan chan *RollbackBlobber, blobStatus []BlobberStatus) (AllocStatus, []BlobberStatus, error) { + var ( + latestVersionMap = make(map[string][]*RollbackBlobber) + allVersionMap = make(map[string][]*RollbackBlobber) + consensusVersion string + allVersionConensus string + ) + + for rb := range markerChan { + if rb == nil || rb.lpm.LatestWM == nil { + continue + } + version := rb.lpm.LatestWM.AllocationRoot + latestVersionMap[version] = append(latestVersionMap[version], rb) + if len(latestVersionMap[version]) > a.DataShards { + consensusVersion = version + } + allVersionMap[version] = append(allVersionMap[version], rb) + if rb.lpm.PrevWM != nil && rb.lpm.PrevWM.AllocationRoot != version { + allVersionMap[rb.lpm.PrevWM.AllocationRoot] = append(allVersionMap[rb.lpm.PrevWM.AllocationRoot], rb) + if len(allVersionMap[rb.lpm.PrevWM.AllocationRoot]) >= a.DataShards { + allVersionConensus = rb.lpm.PrevWM.AllocationRoot + } + } + if len(allVersionMap[version]) >= a.DataShards { + allVersionConensus = version + } + } + + if consensusVersion != "" { + a.allocationRoot = consensusVersion + return Commit, blobStatus, nil + } + + if allVersionConensus == "" { + return Broken, blobStatus, nil + } + + if len(latestVersionMap[allVersionConensus]) >= a.DataShards { + a.allocationRoot = allVersionConensus + return Repair, blobStatus, nil + } + l.Logger.Info("Rolling back to previous version") + fullConsensus := len(allVersionMap[allVersionConensus]) - len(latestVersionMap[allVersionConensus]) + consensusThresh := a.DataShards - len(latestVersionMap[allVersionConensus]) + var successCnt int32 + wg := &sync.WaitGroup{} + + for _, rb := range allVersionMap[allVersionConensus] { + if rb.lpm.LatestWM.AllocationRoot == allVersionConensus { + continue + } + wg.Add(1) + go func(rb *RollbackBlobber) { + defer wg.Done() + err := rb.processRollback(context.TODO(), a.Tx) + if err != nil { + rb.commitResult = ErrorCommitResult(err.Error()) + l.Logger.Error("error during rollback", zap.Error(err)) + } else { + atomic.AddInt32(&successCnt, 1) + rb.commitResult = SuccessCommitResult() + } + }(rb) + } + wg.Wait() + if successCnt < int32(consensusThresh) { + return Broken, blobStatus, common.NewError("rollback_failed", "Rollback failed") + } + a.allocationRoot = allVersionConensus + if successCnt == int32(fullConsensus) { + return Repair, blobStatus, nil + } + + return Commit, blobStatus, nil +}