Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MFS Support to Reprovider Strategy #10704

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 132 additions & 3 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@

"github.com/ipfs/boxo/blockstore"
"github.com/ipfs/boxo/fetcher"
"github.com/ipfs/boxo/mfs"
pin "github.com/ipfs/boxo/pinning/pinner"
provider "github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
"go.uber.org/fx"
Expand Down Expand Up @@ -165,19 +167,146 @@
Pinner pin.Pinner
Blockstore blockstore.Blockstore
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
Root *mfs.Root
}
return func(in input) provider.KeyChanFunc {
if onlyRoots {
return provider.NewPinnedProvider(true, in.Pinner, in.IPLDFetcher)
return NewPinnedProviderWithMFS(true, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore)

Check warning on line 174 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L174

Added line #L174 was not covered by tests
}

if onlyPinned {
return provider.NewPinnedProvider(false, in.Pinner, in.IPLDFetcher)
return NewPinnedProviderWithMFS(false, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore)

Check warning on line 178 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L178

Added line #L178 was not covered by tests
}

return provider.NewPrioritizedProvider(
provider.NewPinnedProvider(true, in.Pinner, in.IPLDFetcher),
NewPinnedProviderWithMFS(true, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore),
provider.NewBlockstoreProvider(in.Blockstore),
)
}
}

func NewPinnedProviderWithMFS(onlyRoots bool, pinner pin.Pinner, fetcher fetcher.Factory, root *mfs.Root, bs blockstore.Blockstore) provider.KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
out := make(chan cid.Cid)

// Combine both pinned and MFS content
go func() {
defer close(out)

// First handle regular pins
pinCh, err := provider.NewPinnedProvider(onlyRoots, pinner, fetcher)(ctx)
if err != nil {
logger.Errorf("error getting pinned provider: %s", err)
return
}

Check warning on line 201 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L199-L201

Added lines #L199 - L201 were not covered by tests

// Forward all pins
for c := range pinCh {
select {
case out <- c:
case <-ctx.Done():
return

Check warning on line 208 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L207-L208

Added lines #L207 - L208 were not covered by tests
}
}

// Then handle MFS content
if err := walkMFS(ctx, root, onlyRoots, out, bs); err != nil {
logger.Errorf("error walking MFS: %s", err)
return
}

Check warning on line 216 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L214-L216

Added lines #L214 - L216 were not covered by tests
}()

return out, nil
}
}

func walkMFS(ctx context.Context, root *mfs.Root, onlyRoots bool, out chan<- cid.Cid, bs blockstore.Blockstore) error {
if root == nil {
return nil
}

Check warning on line 226 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L225-L226

Added lines #L225 - L226 were not covered by tests

// Get the root directory
rootDir := root.GetDirectory()

// Walk function to process each entry
var walk func(dir *mfs.Directory) error
walk = func(dir *mfs.Directory) error {
entries, err := dir.List(ctx)
if err != nil {
return err
}

Check warning on line 237 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L236-L237

Added lines #L236 - L237 were not covered by tests

for _, entry := range entries {
select {
case <-ctx.Done():
return ctx.Err()
default:

Check warning on line 243 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L240-L243

Added lines #L240 - L243 were not covered by tests
}

// Handle directory
if entry.Type == int(mfs.TDir) {
dirNode, err := dir.Child(entry.Name)
if err != nil {
continue

Check warning on line 250 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L247-L250

Added lines #L247 - L250 were not covered by tests
}
mfsDir, ok := dirNode.(*mfs.Directory)
if !ok {
continue

Check warning on line 254 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L252-L254

Added lines #L252 - L254 were not covered by tests
}

if onlyRoots {
n, err := mfsDir.GetNode()
if err != nil {
continue

Check warning on line 260 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L257-L260

Added lines #L257 - L260 were not covered by tests
}
c := n.Cid()
if exists, err := bs.Has(ctx, c); err != nil || !exists {
continue // Skip if not in local blockstore

Check warning on line 264 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L262-L264

Added lines #L262 - L264 were not covered by tests
}
select {
case out <- n.Cid():
case <-ctx.Done():
return ctx.Err()

Check warning on line 269 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L266-L269

Added lines #L266 - L269 were not covered by tests
}
} else {
if err := walk(mfsDir); err != nil {
return err
}

Check warning on line 274 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L271-L274

Added lines #L271 - L274 were not covered by tests
}
}

// Handle file
if entry.Type == int(mfs.TFile) {
fileNode, err := dir.Child(entry.Name)
if err != nil {
continue

Check warning on line 282 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L279-L282

Added lines #L279 - L282 were not covered by tests
}
mfsFile, ok := fileNode.(*mfs.File)
if !ok {
continue

Check warning on line 286 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L284-L286

Added lines #L284 - L286 were not covered by tests
}

n, err := mfsFile.GetNode()
if err != nil {
continue

Check warning on line 291 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L289-L291

Added lines #L289 - L291 were not covered by tests
}
c := n.Cid()

if onlyRoots || c.Prefix().Codec == cid.Raw {
c := n.Cid()
if exists, err := bs.Has(ctx, c); err != nil || !exists {
continue // Skip if not in local blockstore

Check warning on line 298 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L293-L298

Added lines #L293 - L298 were not covered by tests
}
select {
case out <- c:
case <-ctx.Done():
return ctx.Err()

Check warning on line 303 in core/node/provider.go

View check run for this annotation

Codecov / codecov/patch

core/node/provider.go#L300-L303

Added lines #L300 - L303 were not covered by tests
}
}
}

}
return nil
}
return walk(rootDir)
}
Loading