Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "stream resync to indexer crawler" #834

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions bgs/bgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,13 +1646,15 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
resync = bgs.SetResyncStatus(pds.ID, "checking revs")

// run loop over repos with some concurrency
// Create a buffered channel for collecting results
results := make(chan revCheckResult, len(repos))
sem := semaphore.NewWeighted(40)

// Check repo revs against our local copy and enqueue crawls for any that are out of date
for i, r := range repos {
for _, r := range repos {
if err := sem.Acquire(ctx, 1); err != nil {
log.Errorw("failed to acquire semaphore", "error", err)
results <- revCheckResult{err: err}
continue
}
go func(r comatproto.SyncListRepos_Repo) {
Expand All @@ -1662,41 +1664,56 @@ func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
if err != nil {
log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
results <- revCheckResult{err: err}
return
}

rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
if err != nil {
log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
err := bgs.Index.Crawler.Crawl(ctx, ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
}
results <- revCheckResult{ai: ai}
return
}

if rev == "" || rev < r.Rev {
log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
err := bgs.Index.Crawler.Crawl(ctx, ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
}
results <- revCheckResult{ai: ai}
return
}

results <- revCheckResult{}
}(r)
}

var numReposToResync int
for i := 0; i < len(repos); i++ {
res := <-results
if res.err != nil {
log.Errorw("failed to process repo during resync", "error", res.err)

}
if res.ai != nil {
numReposToResync++
err := bgs.Index.Crawler.Crawl(ctx, res.ai)
if err != nil {
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", res.ai.Uid, "did", res.ai.Did)
}
}
if i%100 == 0 {
if i%10_000 == 0 {
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt))
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", numReposToResync, "took", time.Now().Sub(resync.StatusChangedAt))
}
resync.NumReposChecked = i
resync.NumReposToResync = numReposToResync
bgs.UpdateResync(resync)
}
}

resync.NumReposChecked = len(repos)
resync.NumReposToResync = numReposToResync
bgs.UpdateResync(resync)

log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1)
log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", numReposToResync)

return nil
}
Loading