Skip to content

Commit

Permalink
🐛 Data synchronization accidentally deletes local files siyuan-note/s…
Browse files Browse the repository at this point in the history
  • Loading branch information
88250 committed Nov 12, 2023
1 parent 274a4ae commit a00187a
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 8 deletions.
2 changes: 1 addition & 1 deletion backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (repo *Repo) uploadTagIndex(tag, id string, context map[string]interface{})
apiPut++

// 上传标签
length, err = repo.updateCloudRef("refs/tags/"+tag, context)
_, length, err = repo.updateCloudRef("refs/tags/"+tag, context)
uploadFileCount++
uploadBytes += length
apiPut++
Expand Down
1 change: 0 additions & 1 deletion cloud/siyuan.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (siyuan *SiYuan) UploadObject(filePath string, overwrite bool) (length int6

func (siyuan *SiYuan) DownloadObject(filePath string) (ret []byte, err error) {
key := path.Join("siyuan", siyuan.Conf.UserID, "repo", siyuan.Conf.Dir, filePath)
key += "?r=" + gulu.Rand.String(7)
resp, err := httpclient.NewCloudFileRequest2m().Get(siyuan.Endpoint + key)
if nil != err {
err = fmt.Errorf("download object [%s] failed: %s", key, err)
Expand Down
67 changes: 61 additions & 6 deletions sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,8 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS

// 以下步骤是更新云端相关索引数据

var errs []error
errLock := sync.Mutex{}
waitGroup := &sync.WaitGroup{}
// 上传索引
waitGroup.Add(1)
Expand All @@ -471,7 +473,9 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
length, uploadErr := repo.uploadIndex(latest, context)
if nil != uploadErr {
logging.LogErrorf("upload latest index failed: %s", uploadErr)
err = uploadErr
errLock.Lock()
errs = append(errs, uploadErr)
errLock.Unlock()
return
}
trafficStat.UploadFileCount++
Expand All @@ -484,12 +488,48 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
go func() {
defer waitGroup.Done()

length, uploadErr := repo.updateCloudRef("refs/latest", context)
latestData, length, uploadErr := repo.updateCloudRef("refs/latest", context)
if nil != uploadErr {
logging.LogErrorf("update cloud [refs/latest] failed: %s", uploadErr)
err = uploadErr
errLock.Lock()
errs = append(errs, uploadErr)
errLock.Unlock()
return
}

// 确认上传 refs/latest 成功,因为路径不变,存储可能会有缓存,导致后续下载 refs/latest 时返回的是旧数据
// Data synchronization accidentally deletes local files https://github.com/siyuan-note/siyuan/issues/9631
confirmed := false
for i := 0; i < 32; i++ {
downloadedData, downloadErr := repo.downloadCloudObject("refs/latest")
if nil != downloadErr {
logging.LogWarnf("confirm [%d] uploaded cloud [refs/latest] failed: %s", i, downloadErr)
continue
}
if !bytes.Equal(latestData, downloadedData) {
logging.LogWarnf("confirm [%d] uploaded cloud [refs/latest] failed [local=%s, downloaded=%s]", i, latestData, downloadedData)
latestData, length, uploadErr = repo.updateCloudRef("refs/latest", context)
if nil != uploadErr {
logging.LogWarnf("fix [%d] uploaded cloud [refs/latest, %s] failed: %s", i, latestData, uploadErr)
} else {
logging.LogWarnf("fix [%d] uploaded cloud [refs/latest, %s]", i, latestData)
}
continue
}
confirmed = true
if 0 < i {
logging.LogInfof("confirmed [%d] uploaded cloud [refs/latest]", i)
}
break
}
if !confirmed {
logging.LogErrorf("final confirm uploaded cloud [refs/latest] failed: data not equal [local=%s, downloaded=%s]", latestData, latestData)
errLock.Lock()
errs = append(errs, errors.New("confirm uploaded cloud [refs/latest] failed"))
errLock.Unlock()
return
}

trafficStat.UploadFileCount++
trafficStat.UploadBytes += length
trafficStat.APIPut++
Expand All @@ -503,7 +543,9 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
downloadBytes, uploadBytes, uploadErr := repo.updateCloudIndexesV2(latest, context)
if nil != uploadErr {
logging.LogErrorf("update cloud indexes failed: %s", uploadErr)
err = uploadErr
errLock.Lock()
errs = append(errs, uploadErr)
errLock.Unlock()
return
}
trafficStat.DownloadFileCount++
Expand All @@ -522,7 +564,9 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
uploadErr := repo.updateCloudCheckIndex(checkIndex, context)
if nil != uploadErr {
logging.LogErrorf("update cloud check index failed: %s", uploadErr)
err = uploadErr
errLock.Lock()
errs = append(errs, uploadErr)
errLock.Unlock()
return
}
}()
Expand All @@ -536,6 +580,10 @@ func (repo *Repo) updateCloudIndexes(latest *entity.Index, trafficStat *TrafficS
}()

waitGroup.Wait()

if 0 < len(errs) {
err = errs[0]
}
return
}

Expand Down Expand Up @@ -778,8 +826,15 @@ func (repo *Repo) existDataFile(files []*entity.File, file *entity.File) bool {
return false
}

func (repo *Repo) updateCloudRef(ref string, context map[string]interface{}) (uploadBytes int64, err error) {
func (repo *Repo) updateCloudRef(ref string, context map[string]interface{}) (data []byte, uploadBytes int64, err error) {
eventbus.Publish(eventbus.EvtCloudBeforeUploadRef, context, ref)
absFilePath := filepath.Join(repo.cloud.GetConf().RepoPath, ref)
data, err = os.ReadFile(absFilePath)
if nil != err {
logging.LogErrorf("read ref [%s] failed: %s", ref, err)
return
}

length, err := repo.cloud.UploadObject(ref, true)
uploadBytes += length
return
Expand Down

0 comments on commit a00187a

Please sign in to comment.