From 555cb3e79f455473192eb4f2b7a604559413cf01 Mon Sep 17 00:00:00 2001 From: ehsan shariati Date: Thu, 21 Mar 2024 14:55:34 -0400 Subject: [PATCH] reverted to checking files for new cids ipfs cluster API is not reliable in filtering on modification time, so reverted to using the filesystem --- blox/blox.go | 112 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 88 insertions(+), 24 deletions(-) diff --git a/blox/blox.go b/blox/blox.go index 2ea2ec5..dd57d7f 100644 --- a/blox/blox.go +++ b/blox/blox.go @@ -2,6 +2,7 @@ package blox import ( "context" + "encoding/hex" "encoding/json" "errors" "fmt" @@ -32,7 +33,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peerstore" "github.com/multiformats/go-multiaddr" - "github.com/multiformats/go-multicodec" + "github.com/multiformats/go-multibase" + "github.com/multiformats/go-multihash" ) var log = logging.Logger("fula/blox") @@ -376,23 +378,76 @@ func (p *Blox) ListModifiedStoredLinks(ctx context.Context, lastChecked time.Tim return storedLinks, nil } +func hexStringToCID(hexDigest string) (cid.Cid, error) { + // Decode the hex digest into bytes + bytes, err := hex.DecodeString(hexDigest) + if err != nil { + return cid.Cid{}, fmt.Errorf("error decoding hex digest: %w", err) + } + + // Construct the multihash using the bytes + // Assuming the first byte is the multihash code for BLAKE3 (0x1e) and the second byte is the length + mh, err := multihash.Encode(bytes[2:], uint64(bytes[0])) + if err != nil { + return cid.Cid{}, fmt.Errorf("error constructing multihash: %w", err) + } + + // Construct the CID using dag-cbor codec (0x71) and the constructed multihash + c := cid.NewCidV1(cid.DagCBOR, mh) + + return c, nil +} +func findDigestHexFromBase32Multibase(base32MultibaseDigest string) (string, error) { + // Decode the Base32 Multibase digest to get the raw hash bytes + _, decodedBytes, err := multibase.Decode(base32MultibaseDigest) + if err != nil { + return "", 
fmt.Errorf("error decoding multibase: %w", err) + } + + // Convert the decoded bytes to a hexadecimal string + digestHex := hex.EncodeToString(decodedBytes) + + return digestHex, nil +} + +func FindCIDFromDigest(base32MultibaseDigest string) (cid.Cid, error) { + // Decode the Multibase prefix and data + hexStr, err := findDigestHexFromBase32Multibase(base32MultibaseDigest) + if err != nil { + return cid.Cid{}, fmt.Errorf("error finding hex value: %w", err) + } + c, err := hexStringToCID(hexStr) + if err != nil { + return cid.Cid{}, fmt.Errorf("error finding cid value from hex: %w", err) + } + + return c, nil +} + // ListModifiedStoredBlocks lists only the folders that have been modified after the last check time // and returns the filenames of the files created after the last check time in those folders. -func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error) { +func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]datamodel.Link, error) { blocksDir := "/uniondrive/ipfs_datastore/blocks" var modifiedDirs []string - var modifiedFiles []string + var modifiedLinks []datamodel.Link //lint:ignore S1021 err may be reused for different blocks of logic var err error // Find directories modified after lastChecked - err = filepath.Walk(blocksDir, func(path string, info os.FileInfo, err error) error { + err = filepath.WalkDir(blocksDir, func(path string, d os.DirEntry, err error) error { if err != nil { return err } - if path != blocksDir && info.IsDir() && info.ModTime().After(lastChecked) { - modifiedDirs = append(modifiedDirs, path) - return filepath.SkipDir + + if path != blocksDir && d.IsDir() && !strings.HasSuffix(d.Name(), ".temp") { + info, err := d.Info() + if err != nil { + return err + } + if info.ModTime().After(lastChecked) { + modifiedDirs = append(modifiedDirs, path) + return filepath.SkipDir + } } return nil }) @@ -404,37 +459,53 @@ func (p *Blox) ListModifiedStoredBlocks(lastChecked time.Time) ([]string, error) // Use a 
WaitGroup to wait for all goroutines to finish var wg sync.WaitGroup // Use a channel to safely collect files from multiple goroutines - filesChan := make(chan string, 100) // Buffered channel + linksChan := make(chan datamodel.Link, 1024) // Buffered channel // Create a goroutine for each modified directory for _, dir := range modifiedDirs { wg.Add(1) // Increment the WaitGroup counter go func(dir string) { defer wg.Done() // Decrement the counter when the goroutine completes - filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + // Use filepath.WalkDir for more efficient directory traversal + err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error { if err != nil { return err } - if !info.IsDir() && !strings.HasSuffix(info.Name(), ".temp") && info.ModTime().After(lastChecked) { - filesChan <- path + // Since WalkDir gives us a DirEntry, we can use its methods instead of os.Stat + if !d.IsDir() && !strings.HasSuffix(d.Name(), ".temp") && strings.HasSuffix(d.Name(), ".data") { + info, err := d.Info() // This is only needed if we want to check modification times + if err != nil { + return err + } + if info.ModTime().After(lastChecked) { + c, err := p.GetCidv1FromBlockFilename(path) + if err == nil { + linksChan <- cidlink.Link{Cid: c} + } + } } return nil }) + if err != nil { + // Handle the error. Since this is in a goroutine, consider using a channel to report errors. 
+ log.Errorf("Error walking through directory %s: %v", dir, err) + } }(dir) + } // Start a goroutine to close the channel once all goroutines have finished go func() { wg.Wait() - close(filesChan) + close(linksChan) }() // Collect the results from the channel - for file := range filesChan { - modifiedFiles = append(modifiedFiles, file) + for l := range linksChan { + modifiedLinks = append(modifiedLinks, l) } - return modifiedFiles, nil + return modifiedLinks, nil } // GetCidv1FromBlockFilename extracts CIDv1 from block filename @@ -446,13 +517,7 @@ func (p *Blox) GetCidv1FromBlockFilename(filename string) (cid.Cid, error) { // Here's a sample implementation: base := filepath.Base(filename) b58 := "B" + strings.ToUpper(strings.TrimSuffix(base, filepath.Ext(base))) - cidV0, err := cid.Decode(b58) - if err != nil { - fmt.Println("Error encoding to cidV0:", err) - return cid.Cid{}, err - } - cidV1 := cid.NewCidV1(uint64(multicodec.DagPb), cidV0.Hash()) - return cidV1, nil + return FindCIDFromDigest(b58) } // UpdateLastCheckedTime updates the last checked time @@ -599,7 +664,6 @@ func (p *Blox) Start(ctx context.Context) error { defer log.Debug("Start blox blox.start go routine is ending") ticker := time.NewTicker(10 * time.Minute) defer ticker.Stop() - pidStr := p.h.ID().String() for { select { case <-ticker.C: @@ -618,7 +682,7 @@ func (p *Blox) Start(ctx context.Context) error { } p.topicName = p.getPoolName() if p.topicName != "0" { - storedLinks, err := p.ListModifiedStoredLinks(p.ctx, lastCheckedTime, pidStr) + storedLinks, err := p.ListModifiedStoredBlocks(lastCheckedTime) if err != nil { log.Errorf("Error listing stored blocks: %v", err) continue