Skip to content

Commit

Permalink
⚡ Reduce startup time when sync is enabled siyuan-note/siyuan#13589
Browse files: browse the repository at this point in the history
  • Loading branch information
88250 committed Dec 23, 2024
1 parent b6410d9 commit e7887b6
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 72 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ require (
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.48.2 // indirect
github.com/refraction-networking/utls v1.6.7 // indirect
github.com/vmihailenco/msgpack/v5 v5.3.5 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
go.uber.org/mock v0.5.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/studio-b12/gowebdav v0.9.0 h1:1j1sc9gQnNxbXXM4M/CebPOX4aXYtr7MojAVcN4dHjU=
github.com/studio-b12/gowebdav v0.9.0/go.mod h1:bHA7t77X/QFExdeAnDzK6vKM34kEZAcE1OX4MfiwjkE=
github.com/vmihailenco/msgpack v4.0.4/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk=
github.com/vmihailenco/msgpack/v5 v5.3.5 h1:5gO0H1iULLWGhs2H5tbAHIZTV8/cYafcFOr9znI5mJU=
github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down
68 changes: 67 additions & 1 deletion ref.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ import (
"errors"
"os"
"path/filepath"
"time"

"github.com/88250/go-humanize"
"github.com/88250/gulu"
"github.com/siyuan-note/dejavu/entity"
"github.com/siyuan-note/filelock"
"github.com/siyuan-note/logging"
"github.com/vmihailenco/msgpack/v5"
)

var ErrNotFoundIndex = errors.New("not found index")
Expand All @@ -51,7 +54,16 @@ func (repo *Repo) Latest() (ret *entity.Index, err error) {
return
}

// FullIndex describes the complete index structure: an index ID together with
// the full file entries that belong to that index.
//
// NOTE(review): the fields carry json tags, but this file encodes and decodes
// the value with msgpack; presumably msgpack only honours its own tag key
// unless a custom struct tag is configured — confirm before relying on the
// lower-case key names.
type FullIndex struct {
	// ID is the ID of the index this value was built from.
	ID string `json:"id"`
	// Files holds the file entries resolved for that index.
	Files []*entity.File `json:"files"`
	// Spec is presumably a format/spec version of the stored data — confirm
	// against the writers and readers of this type.
	Spec int `json:"spec"`
}

func (repo *Repo) UpdateLatest(index *entity.Index) (err error) {
start := time.Now()

refs := filepath.Join(repo.Path, "refs")
err = os.MkdirAll(refs, 0755)
if nil != err {
Expand All @@ -61,7 +73,61 @@ func (repo *Repo) UpdateLatest(index *entity.Index) (err error) {
if nil != err {
return
}
logging.LogInfof("updated local latest to [%s]", index.String())

fullLatestPath := filepath.Join(repo.Path, "full-latest.json")
files, err := repo.GetFiles(index)
if nil != err {
return
}

fullIndex := &FullIndex{ID: index.ID, Files: files, Spec: 0}
data, err := msgpack.Marshal(fullIndex)
if nil != err {
return
}
err = gulu.File.WriteFileSafer(fullLatestPath, data, 0644)
if nil != err {
return
}

logging.LogInfof("updated local latest to [%s], full latest [size=%s], cost [%s]", index.String(), humanize.Bytes(uint64(len(data))), time.Since(start))
return
}

// getFullLatest loads the cached full index from the repo directory and
// returns it only if it belongs to the given latest index.
//
// It returns nil when:
//   - latest is nil (there is nothing to validate the cache against);
//   - the cache file does not exist or cannot be read;
//   - the cache content cannot be decoded;
//   - the cached ID differs from latest.ID.
//
// In the last two cases the cache file is removed so that a corrupted or
// stale cache is not consulted again. Failures are logged, never returned:
// callers are expected to fall back to another source when nil is returned.
func (repo *Repo) getFullLatest(latest *entity.Index) (ret *FullIndex) {
	if nil == latest {
		// Guard against a nil dereference of latest.ID below.
		return
	}

	start := time.Now()

	// Despite the .json extension, the content is msgpack-encoded.
	fullLatestPath := filepath.Join(repo.Path, "full-latest.json")

	// Read directly instead of stat-then-read: one syscall less and no window
	// in which the file can disappear between the check and the read.
	data, err := os.ReadFile(fullLatestPath)
	if nil != err {
		// A missing cache is an ordinary miss and stays silent; anything else
		// is worth reporting.
		if !errors.Is(err, os.ErrNotExist) {
			logging.LogErrorf("read full latest failed: %s", err)
		}
		return
	}

	// discard removes the unusable cache file, logging (not returning) any failure.
	discard := func() {
		if removeErr := os.RemoveAll(fullLatestPath); nil != removeErr {
			logging.LogErrorf("remove full latest [%s] failed: %s", fullLatestPath, removeErr)
		}
	}

	full := &FullIndex{}
	if err = msgpack.Unmarshal(data, full); nil != err {
		logging.LogErrorf("unmarshal full latest [%s] failed: %s", fullLatestPath, err)
		discard()
		return
	}

	if full.ID != latest.ID {
		// The cache was written for a different index and must not be used.
		logging.LogErrorf("full latest ID [%s] not match latest ID [%s]", full.ID, latest.ID)
		discard()
		return
	}

	logging.LogInfof("got local full latest [size=%s], cost [%s]", humanize.Bytes(uint64(len(data))), time.Since(start))
	ret = full
	return
}

Expand Down
149 changes: 78 additions & 71 deletions repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,90 +690,96 @@ func (repo *Repo) index0(memo string, checkChunks bool, context map[string]inter
init = true
}

var workerErrs []error
workerErrLock := sync.Mutex{}
var upserts, removes, latestFiles []*entity.File
if !init {
start = time.Now()
count := atomic.Int32{}
total := len(files)
eventbus.Publish(eventbus.EvtIndexBeforeGetLatestFiles, context, total)
lock := &sync.Mutex{}
waitGroup := &sync.WaitGroup{}
p, _ := ants.NewPoolWithFunc(4, func(arg interface{}) {
defer waitGroup.Done()

count.Add(1)
eventbus.Publish(eventbus.EvtIndexGetLatestFile, context, int(count.Load()), total)

fileID := arg.(string)
file, getErr := repo.store.GetFile(fileID)
if nil != getErr {
logging.LogErrorf("get file [%s] failed: %s", fileID, getErr)
workerErrLock.Lock()
workerErrs = append(workerErrs, ErrRepoFatal)
workerErrLock.Unlock()
return
}

lock.Lock()
latestFiles = append(latestFiles, file)
lock.Unlock()
fullLatest := repo.getFullLatest(latest)
if nil != fullLatest {
latestFiles = fullLatest.Files
} else {
var workerErrs []error
workerErrLock := sync.Mutex{}
if !init {
start = time.Now()
count := atomic.Int32{}
total := len(files)
eventbus.Publish(eventbus.EvtIndexBeforeGetLatestFiles, context, total)
lock := &sync.Mutex{}
waitGroup := &sync.WaitGroup{}
p, _ := ants.NewPoolWithFunc(4, func(arg interface{}) {
defer waitGroup.Done()

count.Add(1)
eventbus.Publish(eventbus.EvtIndexGetLatestFile, context, int(count.Load()), total)

fileID := arg.(string)
file, getErr := repo.store.GetFile(fileID)
if nil != getErr {
logging.LogErrorf("get file [%s] failed: %s", fileID, getErr)
workerErrLock.Lock()
workerErrs = append(workerErrs, ErrRepoFatal)
workerErrLock.Unlock()
return
}

if checkChunks { // 仅在非移动端校验,因为移动端私有数据空间不会存在外部操作导致分块损坏的情况 https://github.com/siyuan-note/siyuan/issues/13216
// Check local data chunk integrity before data synchronization https://github.com/siyuan-note/siyuan/issues/8853
for _, chunk := range file.Chunks {
info, statErr := repo.store.Stat(chunk)
if nil == statErr {
continue
}
lock.Lock()
latestFiles = append(latestFiles, file)
lock.Unlock()

if nil != info {
logging.LogWarnf("stat file [%s, %s, %s, %d] chunk [%s, perm=%04o] failed: %s",
file.ID, file.Path, time.UnixMilli(file.Updated).Format("2006-01-02 15:04:05"), file.Size, chunk, info.Mode().Perm(), statErr)
} else {
logging.LogWarnf("stat file [%s, %s, %s, %d] chunk [%s] failed: %s",
file.ID, file.Path, time.UnixMilli(file.Updated).Format("2006-01-02 15:04:05"), file.Size, chunk, statErr)
}
if checkChunks { // 仅在非移动端校验,因为移动端私有数据空间不会存在外部操作导致分块损坏的情况 https://github.com/siyuan-note/siyuan/issues/13216
// Check local data chunk integrity before data synchronization https://github.com/siyuan-note/siyuan/issues/8853
for _, chunk := range file.Chunks {
info, statErr := repo.store.Stat(chunk)
if nil == statErr {
continue
}

if errors.Is(statErr, os.ErrPermission) {
// 如果是权限问题,则尝试修改权限,不认为是分块文件损坏
// Improve checking local data chunk integrity before data sync https://github.com/siyuan-note/siyuan/issues/9688
if chmodErr := os.Chmod(chunk, 0644); nil != chmodErr {
logging.LogWarnf("chmod file [%s] failed: %s", chunk, chmodErr)
if nil != info {
logging.LogWarnf("stat file [%s, %s, %s, %d] chunk [%s, perm=%04o] failed: %s",
file.ID, file.Path, time.UnixMilli(file.Updated).Format("2006-01-02 15:04:05"), file.Size, chunk, info.Mode().Perm(), statErr)
} else {
logging.LogInfof("chmod file [%s] to [0644]", chunk)
logging.LogWarnf("stat file [%s, %s, %s, %d] chunk [%s] failed: %s",
file.ID, file.Path, time.UnixMilli(file.Updated).Format("2006-01-02 15:04:05"), file.Size, chunk, statErr)
}
continue
}

if errors.Is(statErr, os.ErrNotExist) {
workerErrLock.Lock()
workerErrs = append(workerErrs, ErrRepoFatal)
workerErrLock.Unlock()
return
if errors.Is(statErr, os.ErrPermission) {
// 如果是权限问题,则尝试修改权限,不认为是分块文件损坏
// Improve checking local data chunk integrity before data sync https://github.com/siyuan-note/siyuan/issues/9688
if chmodErr := os.Chmod(chunk, 0644); nil != chmodErr {
logging.LogWarnf("chmod file [%s] failed: %s", chunk, chmodErr)
} else {
logging.LogInfof("chmod file [%s] to [0644]", chunk)
}
continue
}

if errors.Is(statErr, os.ErrNotExist) {
workerErrLock.Lock()
workerErrs = append(workerErrs, ErrRepoFatal)
workerErrLock.Unlock()
return
}
}
}
})

for _, f := range latest.Files {
waitGroup.Add(1)
err = p.Invoke(f)
if nil != err {
logging.LogErrorf("invoke failed: %s", err)
return
}
}
})

for _, f := range latest.Files {
waitGroup.Add(1)
err = p.Invoke(f)
if nil != err {
logging.LogErrorf("invoke failed: %s", err)
waitGroup.Wait()
p.Release()
logging.LogInfof("get latest files [files=%d] cost [%s]", len(latestFiles), time.Since(start))
if 0 < len(workerErrs) {
err = workerErrs[0]
logging.LogErrorf("get latest files failed: %s", err)
return
}
}
waitGroup.Wait()
p.Release()
logging.LogInfof("get latest files [files=%d] cost [%s]", len(latestFiles), time.Since(start))
if 0 < len(workerErrs) {
err = workerErrs[0]
logging.LogErrorf("get latest files failed: %s", err)
return
}
}

upserts, removes = repo.diffUpsertRemove(files, latestFiles, false)
if 1 > len(upserts) && 1 > len(removes) {
ret = latest
Expand All @@ -795,7 +801,8 @@ func (repo *Repo) index0(memo string, checkChunks bool, context map[string]inter

count := atomic.Int32{}
total := len(upserts)
workerErrs = nil
var workerErrs []error
workerErrLock := sync.Mutex{}
eventbus.Publish(eventbus.EvtIndexUpsertFiles, context, total)
waitGroup := &sync.WaitGroup{}
p, _ := ants.NewPoolWithFunc(4, func(arg interface{}) {
Expand Down

0 comments on commit e7887b6

Please sign in to comment.