Skip to content

Commit

Permalink
fix::> cleanup + Added simple impl
Browse files Browse the repository at this point in the history
Signed-off-by: Abhinav Prakash <[email protected]>
  • Loading branch information
PsychoPunkSage committed Feb 16, 2025
1 parent b33097c commit 5d130b3
Showing 1 changed file with 11 additions and 133 deletions.
144 changes: 11 additions & 133 deletions core/node/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/ipfs/boxo/mfs"
pin "github.com/ipfs/boxo/pinning/pinner"
provider "github.com/ipfs/boxo/provider"
"github.com/ipfs/go-cid"
"github.com/ipfs/kubo/repo"
irouting "github.com/ipfs/kubo/routing"
"go.uber.org/fx"
Expand Down Expand Up @@ -170,143 +169,22 @@ func newProvidingStrategy(onlyPinned, onlyRoots bool) interface{} {
Root *mfs.Root
}
return func(in input) provider.KeyChanFunc {
if onlyRoots {
return NewPinnedProviderWithMFS(true, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore)
}

if onlyPinned {
return NewPinnedProviderWithMFS(false, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore)
}

return provider.NewPrioritizedProvider(
NewPinnedProviderWithMFS(true, in.Pinner, in.IPLDFetcher, in.Root, in.Blockstore),
provider.NewBlockstoreProvider(in.Blockstore),
)
}
}

func NewPinnedProviderWithMFS(onlyRoots bool, pinner pin.Pinner, fetcher fetcher.Factory, root *mfs.Root, bs blockstore.Blockstore) provider.KeyChanFunc {
return func(ctx context.Context) (<-chan cid.Cid, error) {
out := make(chan cid.Cid)

// Combine both pinned and MFS content
go func() {
defer close(out)
pinnedProvider := provider.NewPinnedProvider(onlyRoots, in.Pinner, in.IPLDFetcher)

// First handle regular pins
pinCh, err := provider.NewPinnedProvider(onlyRoots, pinner, fetcher)(ctx)
if onlyRoots || onlyPinned {
mfsRoot, err := in.Root.GetDirectory().GetNode()
if err != nil {
logger.Errorf("error getting pinned provider: %s", err)
return
logger.Errorf("Error getting MFS root: %s", err)
return pinnedProvider
}

// Forward all pins
for c := range pinCh {
select {
case out <- c:
case <-ctx.Done():
return
}
}

// Then handle MFS content
if err := walkMFS(ctx, root, onlyRoots, out, bs); err != nil {
logger.Errorf("error walking MFS: %s", err)
return
}
}()

return out, nil
}
}

func walkMFS(ctx context.Context, root *mfs.Root, onlyRoots bool, out chan<- cid.Cid, bs blockstore.Blockstore) error {
if root == nil {
return nil
}

// Get the root directory
rootDir := root.GetDirectory()

// Walk function to process each entry
var walk func(dir *mfs.Directory) error
walk = func(dir *mfs.Directory) error {
entries, err := dir.List(ctx)
if err != nil {
return err
mfsProvider := provider.NewDAGProvider(mfsRoot.Cid(), in.IPLDFetcher)
return provider.NewPrioritizedProvider(pinnedProvider, mfsProvider)
}

for _, entry := range entries {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

// Handle directory
if entry.Type == int(mfs.TDir) {
dirNode, err := dir.Child(entry.Name)
if err != nil {
continue
}
mfsDir, ok := dirNode.(*mfs.Directory)
if !ok {
continue
}

if onlyRoots {
n, err := mfsDir.GetNode()
if err != nil {
continue
}
c := n.Cid()
if exists, err := bs.Has(ctx, c); err != nil || !exists {
continue // Skip if not in local blockstore
}
select {
case out <- n.Cid():
case <-ctx.Done():
return ctx.Err()
}
} else {
if err := walk(mfsDir); err != nil {
return err
}
}
}

// Handle file
if entry.Type == int(mfs.TFile) {
fileNode, err := dir.Child(entry.Name)
if err != nil {
continue
}
mfsFile, ok := fileNode.(*mfs.File)
if !ok {
continue
}

n, err := mfsFile.GetNode()
if err != nil {
continue
}
c := n.Cid()

if onlyRoots || c.Prefix().Codec == cid.Raw {
c := n.Cid()
if exists, err := bs.Has(ctx, c); err != nil || !exists {
continue // Skip if not in local blockstore
}
select {
case out <- c:
case <-ctx.Done():
return ctx.Err()
}
}
}

}
return nil
return provider.NewPrioritizedProvider(
pinnedProvider,
provider.NewBlockstoreProvider(in.Blockstore),
)
}
return walk(rootDir)
}

0 comments on commit 5d130b3

Please sign in to comment.