Skip to content

Commit

Permalink
Merge pull request #1501 from 0chain/feat/opt-repair
Browse files Browse the repository at this point in the history
Optional repair
  • Loading branch information
dabasov authored May 30, 2024
2 parents 5bf2ed6 + f107a26 commit 4e6be22
Show file tree
Hide file tree
Showing 11 changed files with 126 additions and 47 deletions.
51 changes: 51 additions & 0 deletions wasmsdk/blobber.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,3 +940,54 @@ func getBlobbers(stakable bool) ([]*sdk.Blobber, error) {
}
return blobbs, err
}

// repairAllocation starts a repair of the allocation identified by
// allocationID and blocks until the repair's status bar reports completion.
//
// It returns:
//   - the lookup error when the allocation cannot be fetched;
//   - sdk.ErrRetryOperation when no repair status bar could be obtained;
//   - the error from RepairAlloc when the repair cannot be started;
//   - otherwise whatever the status bar's CheckError reports after waiting.
func repairAllocation(allocationID string) error {
	alloc, err := getAllocation(allocationID)
	if err != nil {
		return err
	}

	// NewRepairBar returns nil when it cannot take the per-allocation lock
	// (presumably because another repair of this allocation is in flight).
	// Without this guard the nil bar would be handed to RepairAlloc and then
	// dereferenced by Wait/CheckError below.
	statusBar := sdk.NewRepairBar(allocationID)
	if statusBar == nil {
		return sdk.ErrRetryOperation
	}

	if err := alloc.RepairAlloc(statusBar); err != nil {
		return err
	}

	statusBar.Wait()
	return statusBar.CheckError()
}

// checkAllocStatus runs CheckAllocStatus on the allocation identified by
// allocationID and returns the outcome as a JSON-encoded CheckStatusResult.
//
// The allocation status is reported as "repair", "broken", or "ok" (every
// other status). An error returned by CheckAllocStatus is not returned to the
// caller; it is embedded in the JSON payload instead. The returned error is
// non-nil only when the allocation lookup or the JSON encoding fails.
func checkAllocStatus(allocationID string) (string, error) {
	allocationObj, err := getAllocation(allocationID)
	if err != nil {
		return "", err
	}

	allocState, perBlobber, checkErr := allocationObj.CheckAllocStatus()

	// Anything that is neither Repair nor Broken is reported as "ok".
	label := "ok"
	switch allocState {
	case sdk.Broken:
		label = "broken"
	case sdk.Repair:
		label = "repair"
	}

	encoded, err := json.Marshal(CheckStatusResult{
		Status:        label,
		Err:           checkErr,
		BlobberStatus: perBlobber,
	})
	if err != nil {
		return "", err
	}

	return string(encoded), nil
}

// skipStatusCheck forwards checkStatus to SetCheckStatus on the allocation
// identified by allocationID.
//
// NOTE(review): MultiOperation.Process only calls CheckAllocStatus while the
// allocation's flag is false, so passing true here is what skips the check.
//
// The only error returned is the one from looking up the allocation.
func skipStatusCheck(allocationID string, checkStatus bool) error {
	allocationObj, lookupErr := getAllocation(allocationID)
	if lookupErr == nil {
		allocationObj.SetCheckStatus(checkStatus)
	}
	return lookupErr
}
16 changes: 8 additions & 8 deletions wasmsdk/demo/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,10 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
}

const getWallet = () => {
const clientID = "b1d533fa60431a76014c4f94a7e8e19a3b1a7f34eebd4cacd29a8dd948b3844c"
const publicKey = "1f30a07b34146435cabc3244a1452fb8933f6982e0b33f384e5d25b9d6531e24e342003349990483f8481052a0748cbc72355d9cbda621ec914f7ed03c127791"
const privateKey = "fb98f46969be6921586e547b0f6f70c6b92d7823359f00bafa3900523910661a"
const mnemonic = "snake second property crush thrive monkey already lake fire sort cheap lake census adult this cloth panic filter taste punch pistol project rack obscure"
const clientID = "ab5b8ab19abe574d92238e0a5dc0c2abd53614cd12cab6b09576fab2a0f64a83"
const publicKey = "d430b1b33eab43bd09886e6125e246600e36e3f88d658d00bf836aa564251e2364bccbfb18f1cb1b5fa4d96ba602b59bc009e6e0223b6a8ebdafc14822b78d23"
const privateKey = "0c2cb85c8c33b3cc35dc12b4754e61ae9488bba857982c382caa461ecac19d19"
const mnemonic = "grace fiscal menu squeeze certain drum ostrich lunar ugly remember cousin observe oxygen brisk toward notable shoot cushion develop marble open aspect couch noise"
return {
clientID, publicKey, privateKey, mnemonic
}
Expand Down Expand Up @@ -211,7 +211,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates

let network = query.get('network')
if (!network || network == 'undefined') {
network = "dev.zus.network"
network = "mainnet.zus.network"
}

const blockWorker = 'https://' + network + '/dns';
Expand Down Expand Up @@ -441,7 +441,7 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
alert("please selection allocationID")
return
}
const { list = [] } = await goWasm.sdk.listObjects(allocationId, '/')
const { list = [] } = await goWasm.sdk.listObjects(allocationId, '/',-1,50)
files = list || []
bindFiles()
})
Expand Down Expand Up @@ -799,15 +799,15 @@ <h2>please download zcn.wasm from https://github.com/0chain/gosdk/releases/lates
}

objects.push({
remotePath: path,
//remotePath: path,
downloadOp: 1,
numBlocks: 0,
downloadToDisk: true,
})
let stringifiedArray = JSON.stringify(objects);

try {
const results = await goWasm.sdk.multiDownload(allocationId, stringifiedArray, '', '')
const results = await goWasm.sdk.multiDownload('', stringifiedArray, 'eyJjbGllbnRfaWQiOiIiLCJvd25lcl9pZCI6IjI2ZTIzMjFhZWMxZmEyZDY1NGQ1MDQ5OWY3ZjhmYWJhNjNkYWMxYTExYTQwZDU3NDJkNDAzMWJmMzEzMzAxMTYiLCJhbGxvY2F0aW9uX2lkIjoiMDAwMzAzOTA1MGI3ZDdiM2FlNmI3MGEwZTVjMWU4ZjRhOTkxNzc1YWJiOTQ2NjljMDg4YzczNzJlMzYwMzkyYiIsImZpbGVfcGF0aF9oYXNoIjoiYWEzODE0NTM2ZWI2OWQwNjU4ZWM0OTgyZmE3ZTIwM2I2ZGI2ZWExYmU4ZmMxODRiMWJhOTZhMTk3NmMwM2JlOCIsImFjdHVhbF9maWxlX2hhc2giOiIxMjUwMjJhZGRiZTIwZDNhOWUzYjcxZTA0NjUzZjY3YiIsImZpbGVfbmFtZSI6InVidW50dS0yMi4wNC40LWxpdmUtc2VydmVyLWFtZDY0LmlzbyIsInJlZmVyZW5jZV90eXBlIjoiZiIsImV4cGlyYXRpb24iOjAsInRpbWVzdGFtcCI6MTcxNjM3ODIxNiwiZW5jcnlwdGVkIjpmYWxzZSwic2lnbmF0dXJlIjoiYmEzNzQ1NzlmZTczZDc1MWIwMTNiMjM2NjUzZDRiMGYyYzNjZDJlYTMyNTFkODg0MmRiNWQxNTlhNjBiN2ExMiJ9', '')
console.log(JSON.stringify(results))
} catch (e) {
alert(e)
Expand Down
3 changes: 3 additions & 0 deletions wasmsdk/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,9 @@ func main() {
"send": send,
"cancelUpload": cancelUpload,
"pauseUpload": pauseUpload,
"repairAllocation": repairAllocation,
"checkAllocStatus": checkAllocStatus,
"skipStatusCheck": skipStatusCheck,

// player
"play": play,
Expand Down
8 changes: 8 additions & 0 deletions wasmsdk/response.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package main

import "github.com/0chain/gosdk/zboxcore/sdk"

type FileCommandResponse struct {
CommandSuccess bool `json:"commandSuccess,omitempty"`
Error string `json:"error,omitempty"`
Expand All @@ -12,3 +14,9 @@ type DownloadCommandResponse struct {
FileName string `json:"fileName,omitempty"`
Url string `json:"url,omitempty"`
}

// CheckStatusResult is the JSON payload produced by checkAllocStatus.
type CheckStatusResult struct {
// Status is the allocation status label: "repair", "broken" or "ok".
Status string `json:"status"`
// Err is the error returned by Allocation.CheckAllocStatus, if any.
// encoding/json encodes an interface value as the value it contains, so a
// nil error becomes null and a non-nil one is encoded per its concrete type.
// NOTE(review): an error type with no exported fields and no MarshalJSON
// encodes as {} and loses its message — confirm the errors stored here
// serialise usefully.
Err error `json:"error"`
// BlobberStatus holds the per-blobber statuses returned by CheckAllocStatus.
BlobberStatus []sdk.BlobberStatus `json:"blobberStatus"`
}
4 changes: 4 additions & 0 deletions zboxcore/sdk/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,10 @@ func SetWasm() {
extraCount = 0
}

// SetCheckStatus stores checkStatus in the allocation's checkStatus field.
//
// NOTE(review): MultiOperation.Process calls CheckAllocStatus only while this
// field is false, so setting it to true skips the status check — the name
// reads inverted; confirm the intended polarity.
func (a *Allocation) SetCheckStatus(checkStatus bool) {
a.checkStatus = checkStatus
}

func getPriceRange(name string) (PriceRange, error) {
conf, err := GetStorageSCConfig()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/copyworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (req *CopyRequest) ProcessCopy() error {
defer writeMarkerMutex.Unlock(req.ctx, req.copyMask, req.blobbers, time.Minute, req.connectionID) //nolint: errcheck

//Check if the allocation is to be repaired or rolled back
status, err := req.allocationObj.CheckAllocStatus()
status, _, err := req.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status: ", err)
return fmt.Errorf("Copy failed: %s", err.Error())
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/moveworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (req *MoveRequest) ProcessMove() error {
}

//Check if the allocation is to be repaired or rolled back
status, err := req.allocationObj.CheckAllocStatus()
status, _, err := req.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status: ", err)
return fmt.Errorf("Move failed: %s", err.Error())
Expand Down
25 changes: 3 additions & 22 deletions zboxcore/sdk/multi_operation_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (mo *MultiOperation) Process() error {
start = time.Now()
status := Commit
if !mo.isRepair && !mo.allocationObj.checkStatus {
status, err = mo.allocationObj.CheckAllocStatus()
status, _, err = mo.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status", err)
if singleClientMode {
Expand All @@ -251,34 +251,15 @@ func (mo *MultiOperation) Process() error {
return fmt.Errorf("Check allocation status failed: %s", err.Error())
}
if status == Repair {
logger.Logger.Info("Repairing allocation")
if singleClientMode {
mo.allocationObj.commitMutex.Unlock()
} else {
writeMarkerMutex.Unlock(mo.ctx, mo.operationMask, mo.allocationObj.Blobbers, time.Minute, mo.connectionID) //nolint: errcheck
}
statusBar := NewRepairBar(mo.allocationObj.ID)
if statusBar == nil {
for _, op := range mo.operations {
op.Error(mo.allocationObj, 0, ErrRetryOperation)
}
return ErrRetryOperation
}
statusBar.wg.Add(1)
err = mo.allocationObj.RepairAlloc(statusBar)
if err != nil {
return err
}
statusBar.wg.Wait()
if statusBar.success {
l.Logger.Info("Repair success")
} else {
l.Logger.Error("Repair failed")
}
for _, op := range mo.operations {
op.Error(mo.allocationObj, 0, ErrRetryOperation)
op.Error(mo.allocationObj, 0, ErrRepairRequired)
}
return ErrRetryOperation
return ErrRepairRequired
}
}
if singleClientMode {
Expand Down
2 changes: 1 addition & 1 deletion zboxcore/sdk/renameworker.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (req *RenameRequest) ProcessRename() error {
defer writeMarkerMutex.Unlock(req.ctx, req.renameMask, req.blobbers, time.Minute, req.connectionID) //nolint: errcheck

//Check if the allocation is to be repaired or rolled back
status, err := req.allocationObj.CheckAllocStatus()
status, _, err := req.allocationObj.CheckAllocStatus()
if err != nil {
logger.Logger.Error("Error checking allocation status: ", err)
return fmt.Errorf("rename failed: %s", err.Error())
Expand Down
15 changes: 14 additions & 1 deletion zboxcore/sdk/repairCallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,25 @@ func NewRepairBar(allocID string) *StatusBar {
if !mutMap[allocID].TryLock() {
return nil
}
wg := &sync.WaitGroup{}
wg.Add(1)
return &StatusBar{
wg: &sync.WaitGroup{},
wg: wg,
allocID: allocID,
}
}

// Wait blocks until the StatusBar's WaitGroup counter drops to zero.
// NewRepairBar creates the WaitGroup with a count of one.
func (s *StatusBar) Wait() {
s.wg.Wait()
}

// CheckError reports the outcome recorded on the StatusBar: nil when the
// success flag is set, otherwise the stored err (which may itself be nil).
func (s *StatusBar) CheckError() error {
	if s.success {
		return nil
	}
	return s.err
}

func mutUnlock(allocID string) {
mapLock.Lock()
mutMap[allocID].Unlock()
Expand Down
45 changes: 32 additions & 13 deletions zboxcore/sdk/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,21 @@ const (
Rollback
)

var ErrRetryOperation = errors.New("retry_operation")
var (
ErrRetryOperation = errors.New("retry_operation")
ErrRepairRequired = errors.New("repair_required")
)

// RollbackBlobber pairs a blobber with the write-marker data fetched for it
// by CheckAllocStatus.
type RollbackBlobber struct {
// blobber is the storage node this entry refers to.
blobber *blockchain.StorageNode
// commitResult is initialised to an empty CommitResult by CheckAllocStatus.
commitResult *CommitResult
// lpm is the write-marker data returned by GetWritemarker for this blobber.
lpm *LatestPrevWriteMarker
// blobIndex is the blobber's position in Allocation.Blobbers; CheckAllocStatus
// uses it to index its per-blobber status slice.
blobIndex int
}

// BlobberStatus is the per-blobber result reported by CheckAllocStatus.
type BlobberStatus struct {
// ID is the blobber's ID.
ID string
// Status is set by CheckAllocStatus to "available", "unavailable" (fetching
// the write marker failed) or "repair".
Status string
}

func GetWritemarker(allocID, allocTx, id, baseUrl string) (*LatestPrevWriteMarker, error) {
Expand Down Expand Up @@ -242,23 +251,29 @@ func (rb *RollbackBlobber) processRollback(ctx context.Context, tx string) error
return thrown.New("rolback_error", fmt.Sprint("Rollback failed"))
}

func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {
func (a *Allocation) CheckAllocStatus() (AllocStatus, []BlobberStatus, error) {

wg := &sync.WaitGroup{}
markerChan := make(chan *RollbackBlobber, len(a.Blobbers))
var errCnt int32
var markerError error
for _, blobber := range a.Blobbers {
blobberRes := make([]BlobberStatus, len(a.Blobbers))
for ind, blobber := range a.Blobbers {

wg.Add(1)
go func(blobber *blockchain.StorageNode) {
go func(blobber *blockchain.StorageNode, ind int) {

defer wg.Done()
blobStatus := BlobberStatus{
ID: blobber.ID,
Status: "available",
}
wr, err := GetWritemarker(a.ID, a.Tx, blobber.ID, blobber.Baseurl)
if err != nil {
atomic.AddInt32(&errCnt, 1)
markerError = err
l.Logger.Error("error during getWritemarker", zap.Error(err))
blobStatus.Status = "unavailable"
}
if wr == nil {
markerChan <- nil
Expand All @@ -267,15 +282,17 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {
blobber: blobber,
lpm: wr,
commitResult: &CommitResult{},
blobIndex: ind,
}
}
}(blobber)
blobberRes[ind] = blobStatus
}(blobber, ind)

}
wg.Wait()
close(markerChan)
if a.ParityShards > 0 && errCnt > int32(a.ParityShards) {
return Broken, common.NewError("check_alloc_status_failed", markerError.Error())
return Broken, blobberRes, common.NewError("check_alloc_status_failed", markerError.Error())
}

versionMap := make(map[string][]*RollbackBlobber)
Expand Down Expand Up @@ -312,18 +329,20 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {
}

if len(versionMap) < 2 {
return Commit, nil
return Commit, blobberRes, nil
}

req := a.DataShards

if len(versionMap[latestVersion]) > req {
return Commit, nil
return Commit, blobberRes, nil
}

if len(versionMap[latestVersion]) >= req || len(versionMap[prevVersion]) >= req || len(versionMap) > 2 {
// TODO: Return Repair after refactoring the repair function
return Repair, nil
for _, rb := range versionMap[prevVersion] {
blobberRes[rb.blobIndex].Status = "repair"
}
return Repair, blobberRes, nil
} else {
l.Logger.Info("versionMapLen", zap.Int("versionMapLen", len(versionMap)), zap.Int("latestLen", len(versionMap[latestVersion])), zap.Int("prevLen", len(versionMap[prevVersion])))
}
Expand Down Expand Up @@ -351,14 +370,14 @@ func (a *Allocation) CheckAllocStatus() (AllocStatus, error) {

wg.Wait()
if errCnt > int32(fullConsensus) {
return Broken, common.NewError("rollback_failed", "Rollback failed")
return Broken, blobberRes, common.NewError("rollback_failed", "Rollback failed")
}

if errCnt == int32(fullConsensus) {
return Repair, nil
return Repair, blobberRes, nil
}

return Rollback, nil
return Rollback, blobberRes, nil
}

func (a *Allocation) RollbackWithMask(mask zboxutil.Uint128) {
Expand Down

0 comments on commit 4e6be22

Please sign in to comment.