Skip to content

Commit

Permalink
reverted to checking files for new cids
Browse files Browse the repository at this point in the history
The IPFS Cluster API is not reliable at filtering on modification time, so this reverts to using the filesystem
  • Loading branch information
ehsan6sha committed Mar 21, 2024
1 parent f7cebba commit 555cb3e
Showing 1 changed file with 88 additions and 24 deletions.
112 changes: 88 additions & 24 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blox

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -32,7 +33,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multibase"
"github.com/multiformats/go-multihash"
)

var log = logging.Logger("fula/blox")
Expand Down Expand Up @@ -376,23 +378,76 @@ func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Tim
return storedLinks, nil
}

// hexStringToCID converts a hex-encoded digest into a CIDv1 that uses the
// dag-cbor codec.
//
// The decoded bytes are treated as a two-byte header followed by the digest:
// the first byte is passed to multihash.Encode as the hash function code, the
// second byte is skipped, and everything after it is used as the digest.
// NOTE(review): the original comment assumes the first byte is the BLAKE3 code
// (0x1e) and the second byte is the digest length; neither is verified here.
//
// It returns an error when hexDigest is not valid hex, when it decodes to
// fewer than two bytes, or when the multihash cannot be constructed.
func hexStringToCID(hexDigest string) (cid.Cid, error) {
	// Decode the hex digest into raw bytes.
	raw, err := hex.DecodeString(hexDigest)
	if err != nil {
		return cid.Cid{}, fmt.Errorf("error decoding hex digest: %w", err)
	}

	// Without this guard, raw[0] and raw[2:] below panic on short input
	// (for example an empty string, which decodes successfully to zero bytes).
	if len(raw) < 2 {
		return cid.Cid{}, fmt.Errorf("error constructing multihash: digest too short: got %d bytes, need at least 2", len(raw))
	}

	// Rebuild the multihash from the code byte and the digest that follows the header.
	mh, err := multihash.Encode(raw[2:], uint64(raw[0]))
	if err != nil {
		return cid.Cid{}, fmt.Errorf("error constructing multihash: %w", err)
	}

	// Wrap the multihash in a CIDv1 using the dag-cbor codec.
	return cid.NewCidV1(cid.DagCBOR, mh), nil
}
// findDigestHexFromBase32Multibase decodes a multibase-encoded string and
// returns the decoded payload as a lowercase hexadecimal string.
//
// It returns an empty string and a wrapped error when the input cannot be
// decoded by multibase.Decode.
func findDigestHexFromBase32Multibase(base32MultibaseDigest string) (string, error) {
	// The detected encoding is discarded; only the decoded payload is needed.
	_, payload, decodeErr := multibase.Decode(base32MultibaseDigest)
	if decodeErr != nil {
		return "", fmt.Errorf("error decoding multibase: %w", decodeErr)
	}

	// Hex-encode the payload and hand it straight back.
	return hex.EncodeToString(payload), nil
}

// FindCIDFromDigest converts a multibase-encoded digest string into a CID.
//
// It works in two steps: the digest is first turned into its hexadecimal form
// by findDigestHexFromBase32Multibase, and that hex string is then converted
// to a CID by hexStringToCID. If either step fails, the zero-value cid.Cid is
// returned together with a wrapped error naming the step that failed.
func FindCIDFromDigest(base32MultibaseDigest string) (cid.Cid, error) {
	// Step 1: multibase string -> hex string.
	digestHex, err := findDigestHexFromBase32Multibase(base32MultibaseDigest)
	if err != nil {
		return cid.Cid{}, fmt.Errorf("error finding hex value: %w", err)
	}

	// Step 2: hex string -> CID.
	result, err := hexStringToCID(digestHex)
	if err != nil {
		return cid.Cid{}, fmt.Errorf("error finding cid value from hex: %w", err)
	}
	return result, nil
}

// ListModifiedStoredBlocks lists only the folders that have been modified after the last check time
// and returns the filenames of the files created after the last check time in those folders.
func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error) {
func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]datamodel.Link, error) {
blocksDir := "/uniondrive/ipfs_datastore/blocks"
var modifiedDirs []string
var modifiedFiles []string
var modifiedLinks []datamodel.Link
//lint:ignore S1021 err may be reused for different blocks of logic
var err error

// Find directories modified after lastChecked
err = filepath.Walk(blocksDir, func(path string, info os.FileInfo, err error) error {
err = filepath.WalkDir(blocksDir, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
if path != blocksDir && info.IsDir() && info.ModTime().After(lastChecked) {
modifiedDirs = append(modifiedDirs, path)
return filepath.SkipDir

if path != blocksDir && d.IsDir() && !strings.HasSuffix(d.Name(), ".temp") {
info, err := d.Info()
if err != nil {
return err
}
if info.ModTime().After(lastChecked) {
modifiedDirs = append(modifiedDirs, path)
return filepath.SkipDir
}
}
return nil
})
Expand All @@ -404,37 +459,53 @@ func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error)
// Use a WaitGroup to wait for all goroutines to finish
var wg sync.WaitGroup
// Use a channel to safely collect files from multiple goroutines
filesChan := make(chan string, 100) // Buffered channel
linksChan := make(chan datamodel.Link, 1024) // Buffered channel

// Create a goroutine for each modified directory
for _, dir := range modifiedDirs {
wg.Add(1) // Increment the WaitGroup counter
go func(dir string) {
defer wg.Done() // Decrement the counter when the goroutine completes
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
// Use filepath.WalkDir for more efficient directory traversal
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
if !info.IsDir() && !strings.HasSuffix(info.Name(), ".temp") && info.ModTime().After(lastChecked) {
filesChan <- path
// Since WalkDir gives us a DirEntry, we can use its methods instead of os.Stat
if !d.IsDir() && !strings.HasSuffix(d.Name(), ".temp") && strings.HasSuffix(d.Name(), ".data") {
info, err := d.Info() // This is only needed if we want to check modification times
if err != nil {
return err
}
if info.ModTime().After(lastChecked) {
c, err := p.GetCidv1FromBlockFilename(path)
if err == nil {
linksChan <- cidlink.Link{Cid: c}
}
}
}
return nil
})
if err != nil {
// Handle the error. Since this is in a goroutine, consider using a channel to report errors.
log.Errorf("Error walking through directory %s: %v", dir, err)
}
}(dir)

}

// Start a goroutine to close the channel once all goroutines have finished
go func() {
wg.Wait()
close(filesChan)
close(linksChan)
}()

// Collect the results from the channel
for file := range filesChan {
modifiedFiles = append(modifiedFiles, file)
for l := range linksChan {
modifiedLinks = append(modifiedLinks, l)
}

return modifiedFiles, nil
return modifiedLinks, nil
}

// GetCidv1FromBlockFilename extracts CIDv1 from block filename
Expand All @@ -446,13 +517,7 @@ func (p *Blox) GetCidv1FromBlockFilename(filename string) (cid.Cid, error) {
// Here's a sample implementation:
base := filepath.Base(filename)
b58 := "B" + strings.ToUpper(strings.TrimSuffix(base, filepath.Ext(base)))
cidV0, err := cid.Decode(b58)
if err != nil {
fmt.Println("Error encoding to cidV0:", err)
return cid.Cid{}, err
}
cidV1 := cid.NewCidV1(uint64(multicodec.DagPb), cidV0.Hash())
return cidV1, nil
return FindCIDFromDigest(b58)
}

// UpdateLastCheckedTime updates the last checked time
Expand Down Expand Up @@ -599,7 +664,6 @@ func (p *Blox) Start(ctx context.Context) error {
defer log.Debug("Start blox blox.start go routine is ending")
ticker := time.NewTicker(10 * time.Minute)
defer ticker.Stop()
pidStr := p.h.ID().String()
for {
select {
case <-ticker.C:
Expand All @@ -618,7 +682,7 @@ func (p *Blox) Start(ctx context.Context) error {
}
p.topicName = p.getPoolName()
if p.topicName != "0" {
storedLinks, err := p.ListModifiedStoredLinks(p.ctx, lastCheckedTime, pidStr)
storedLinks, err := p.ListModifiedStoredBlocks(lastCheckedTime)
if err != nil {
log.Errorf("Error listing stored blocks: %v", err)
continue
Expand Down

0 comments on commit 555cb3e

Please sign in to comment.