Skip to content

Commit

Permalink
Cleaning
Browse files Browse the repository at this point in the history
  • Loading branch information
ehsan6sha committed Feb 21, 2024
1 parent 4d3c098 commit 37438e0
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 20 deletions.
8 changes: 5 additions & 3 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ func (p *Blox) Start(ctx context.Context) error {
}

// Create an HTTP server instance
if p.DefaultIPFShttpServer {
if p.DefaultIPFShttpServer == "fula" {
p.IPFShttpServer = &http.Server{
Addr: "127.0.0.1:5001",
Handler: p.ServeIpfsRpc(),
Expand Down Expand Up @@ -483,8 +483,10 @@ func (p *Blox) Shutdown(ctx context.Context) error {
}

// Shutdown the HTTP server
if IPFSErr := p.IPFShttpServer.Shutdown(ctx); IPFSErr != nil {
log.Errorw("Error shutting down IPFS HTTP server", "IPFSErr", IPFSErr)
if p.IPFShttpServer != nil {
if IPFSErr := p.IPFShttpServer.Shutdown(ctx); IPFSErr != nil {
log.Errorw("Error shutting down IPFS HTTP server", "IPFSErr", IPFSErr)
}
}

if dsErr := p.ds.Close(); dsErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions blox/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type (
blockchainEndpoint string
secretsPath string
IPFShttpServer *http.Server
DefaultIPFShttpServer bool
DefaultIPFShttpServer string
wg *sync.WaitGroup
}
)
Expand Down Expand Up @@ -157,7 +157,7 @@ func WithDatastore(ds datastore.Batching) Option {
}
}

func WithDefaultIPFShttpServer(n bool) Option {
func WithDefaultIPFShttpServer(n string) Option {
return func(o *options) error {
o.DefaultIPFShttpServer = n
return nil
Expand Down
93 changes: 79 additions & 14 deletions cmd/blox/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,19 @@ import (

"github.com/functionland/go-fula/blox"
"github.com/functionland/go-fula/exchange"
ipfsPath "github.com/ipfs/boxo/path"
blockformat "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
badger "github.com/ipfs/go-ds-badger"
logging "github.com/ipfs/go-log/v2"
oldcmds "github.com/ipfs/kubo/commands"
config "github.com/ipfs/kubo/config"
core "github.com/ipfs/kubo/core"
coreapi "github.com/ipfs/kubo/core/coreapi"
"github.com/ipfs/kubo/core/corehttp"
iface "github.com/ipfs/kubo/core/coreiface"
kubolibp2p "github.com/ipfs/kubo/core/node/libp2p"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/repo/fsrepo"
Expand Down Expand Up @@ -619,7 +625,63 @@ func CustomHostOption(h host.Host) kubolibp2p.HostOption {
}
}

func CustomStorageReadOpener(ctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
// THE TWO BELOW ARE IF YOU CHOOSE internal AS IPFS HOST

// CustomStorageWriteOpener creates a StorageWriteOpener using the IPFS blockstore
func CustomStorageWriteOpenerInternal(ipfsNode *core.IpfsNode) linking.BlockWriteOpener {
return func(lnkCtx linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) {
// The returned io.Writer is where the data will be written.
// The BlockWriteCommitter function will be called to "commit" the data once writing is done.
var buffer bytes.Buffer
committer := func(lnk ipld.Link) error {
// Convert the IPLD link to a CID.
c, err := cid.Parse(lnk.String())
if err != nil {
return err
}

// Create an IPFS block with the data from the buffer and the CID.
block, err := blockformat.NewBlockWithCid(buffer.Bytes(), c)
if err != nil {
return err
}
logger.Debugw("block was created with cid", "block", block, "cid", c)

// Store the block using the IPFS node's blockstore.
err = ipfsNode.Blockstore.Put(context.Background(), block)
if err != nil {
logger.Errorw("Error in ipfs store", "block", block, "cid", c)
}
return err
}

return &buffer, committer, nil
}
}

func CustomStorageReadOpenerInternal(ipfsApi iface.CoreAPI) ipld.BlockReadOpener {
return func(lnkCtx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidLink, ok := lnk.(cidlink.Link)
if !ok {
return nil, fmt.Errorf("link is not a cid link")
}
// Convert the CID link to a path
p, err := ipfsPath.NewPath("/ipfs/" + cidLink.Cid.String())
if err != nil {
return nil, err
}
// Use the Block API's Get method to obtain a reader for the block's data
reader, err := ipfsApi.Block().Get(context.Background(), p)
if err != nil {
return nil, err
}
return reader, nil
}
}

// END OF INTERNAL

func CustomStorageReadOpenerNone(ctx ipld.LinkContext, lnk ipld.Link) (io.Reader, error) {
cidStr := lnk.String()
url := fmt.Sprintf("%s/api/v0/block/get?arg=%s", baseURL, cidStr)

Expand Down Expand Up @@ -661,7 +723,7 @@ func (cw *CustomWriter) Write(p []byte) (n int, err error) {
}

// CustomStorageWriteOpener opens a block for writing to an external IPFS node via HTTP API.
func CustomStorageWriteOpener(lctx linking.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
func CustomStorageWriteOpenerNone(lctx linking.LinkContext) (io.Writer, ipld.BlockWriteCommitter, error) {
cw := NewCustomWriter()

committer := func(lnk ipld.Link) error {
Expand Down Expand Up @@ -1012,14 +1074,18 @@ func action(ctx *cli.Context) error {
if err != nil {
return err
}
/*ds, err := badger.NewDatastore(app.config.StoreDir, &badger.DefaultOptions)
ds, err := badger.NewDatastore(app.config.StoreDir, &badger.DefaultOptions)
if err != nil {
return err
}*/
}

dirPath := filepath.Dir(app.configPath)
const useDefaultIPFSServer = true
if !useDefaultIPFSServer {
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.StorageReadOpener = CustomStorageReadOpenerNone
linkSystem.StorageWriteOpener = CustomStorageWriteOpenerNone

const useIPFSServer = "none" //internal: runs local ipfs instance, none requires an external one and fula runs the mock server on 5001
if useIPFSServer == "internal" {
repo, err := CreateCustomRepo(ctx2, dirPath, h, &badger.DefaultOptions, app.config.StoreDir, "90%")
if err != nil {
logger.Fatal(err)
Expand Down Expand Up @@ -1102,17 +1168,16 @@ func action(ctx *cli.Context) error {
}
select {}
}()
//You need to correct the below as well. This commit was working https://github.com/functionland/go-fula/commit/34e39f76ec8f7e8d2175283eff9c2725c98ec059#diff-b1997e94b91855c9f21a9d9e5fb5aba0fc5ac30765046e3c5972aad2cc5cbcd3
//ds = ipfsNode.Repo.Datastore()
// linkSystem.StorageReadOpener = CustomStorageReadOpenerInternal
// linkSystem.StorageWriteOpener = CustomStorageWriteOpenerInternal
}

//ds := ipfsNode.Repo.Datastore()
linkSystem := cidlink.DefaultLinkSystem()
linkSystem.StorageReadOpener = CustomStorageReadOpener
linkSystem.StorageWriteOpener = CustomStorageWriteOpener

bb, err := blox.New(
blox.WithHost(h),
blox.WithWg(&wg),
//blox.WithDatastore(ds),
blox.WithDatastore(ds),
blox.WithLinkSystem(&linkSystem),
blox.WithPoolName(app.config.PoolName),
blox.WithTopicName(app.config.PoolName),
Expand All @@ -1122,7 +1187,7 @@ func action(ctx *cli.Context) error {
blox.WithBlockchainEndPoint(app.blockchainEndpoint),
blox.WithSecretsPath(app.secretsPath),
blox.WithPingCount(5),
blox.WithDefaultIPFShttpServer(useDefaultIPFSServer),
blox.WithDefaultIPFShttpServer(useIPFSServer),
blox.WithExchangeOpts(
exchange.WithUpdateConfig(updateConfig),
exchange.WithWg(&wg),
Expand All @@ -1136,7 +1201,7 @@ func action(ctx *cli.Context) error {
exchange.WithIpniGetEndPoint("https://cid.contact/cid/"),
exchange.WithIpniProviderEngineOptions(
engine.WithHost(ipnih),
//engine.WithDatastore(namespace.Wrap(ds, datastore.NewKey("ipni/ads"))),
engine.WithDatastore(namespace.Wrap(ds, datastore.NewKey("ipni/ads"))),
engine.WithPublisherKind(engine.DataTransferPublisher),
engine.WithDirectAnnounce(app.config.IpniPublishDirectAnnounce...),
),
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/docker/docker v24.0.7+incompatible
github.com/grandcat/zeroconf v1.0.0
github.com/ipfs/boxo v0.17.0
github.com/ipfs/go-block-format v0.2.0
github.com/ipfs/go-cid v0.4.1
github.com/ipfs/go-datastore v0.6.0
github.com/ipfs/go-ds-badger v0.3.0
Expand Down Expand Up @@ -125,7 +126,6 @@ require (
github.com/ipfs-shipyard/nopfs/ipfs v0.13.2-0.20231027223058-cde3b5ba964c // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-block-format v0.2.0 // indirect
github.com/ipfs/go-blockservice v0.5.1 // indirect
github.com/ipfs/go-cidutil v0.1.0 // indirect
github.com/ipfs/go-ds-flatfs v0.5.1 // indirect
Expand Down

0 comments on commit 37438e0

Please sign in to comment.