Skip to content

Commit

Permalink
Ipfs2 external rpc (#216)
Browse files Browse the repository at this point in the history
* Added ipfs as datastore

* Added more ipfs configs

However it throws error related to badgerds plugin not loaded initially

* Update main.go

* Update main.go

* reduced maxCidPushRate as we were getting stream reset

* added true for pinning
  • Loading branch information
ehsan6sha authored Feb 21, 2024
1 parent 0c7f851 commit 4d3c098
Show file tree
Hide file tree
Showing 10 changed files with 798 additions and 200 deletions.
2 changes: 1 addition & 1 deletion announcements/announcements.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (an *FxAnnouncements) AnnounceIExistPeriodically(ctx context.Context) {
log.Errorw("Failed to publish iexist announcement", "err", err)
continue
}
log.Infow("Announced iexist message", "from", an.h.ID(), "announcement", a, "time", t)
log.Debugw("Announced iexist message", "from", an.h.ID(), "announcement", a, "time", t)
}
}
}
Expand Down
55 changes: 36 additions & 19 deletions blox/blox.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func New(o ...Option) (*Blox, error) {
}
p.ls.StorageReadOpener = p.blockReadOpener
p.ls.StorageWriteOpener = p.blockWriteOpener
if opts.ls != nil {
p.ls = *opts.ls
}
p.ex, err = exchange.NewFxExchange(p.h, p.ls, p.exchangeOpts...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -262,24 +265,26 @@ func (p *Blox) Start(ctx context.Context) error {
}

// Create an HTTP server instance
p.IPFShttpServer = &http.Server{
Addr: "127.0.0.1:5001",
Handler: p.ServeIpfsRpc(),
}

log.Debug("called wg.Add in blox start")
p.wg.Add(1)
go func() {
log.Debug("called wg.Done in Start blox")
defer p.wg.Done()
defer log.Debug("Start blox go routine is ending")
log.Infow("IPFS RPC server started on address http://127.0.0.1:5001")
if err := p.IPFShttpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorw("IPFS RPC server stopped erroneously", "err", err)
} else {
log.Infow("IPFS RPC server stopped")
if p.DefaultIPFShttpServer {
p.IPFShttpServer = &http.Server{
Addr: "127.0.0.1:5001",
Handler: p.ServeIpfsRpc(),
}
}()

log.Debug("called wg.Add in blox start")
p.wg.Add(1)
go func() {
log.Debug("called wg.Done in Start blox")
defer p.wg.Done()
defer log.Debug("Start blox go routine is ending")
log.Infow("IPFS RPC server started on address http://127.0.0.1:5001")
if err := p.IPFShttpServer.ListenAndServe(); err != http.ErrServerClosed {
log.Errorw("IPFS RPC server stopped erroneously", "err", err)
} else {
log.Infow("IPFS RPC server stopped")
}
}()
}

if anErr == nil {
log.Debug("called wg.Add in AnnounceIExistPeriodically")
Expand Down Expand Up @@ -447,6 +452,10 @@ func (p *Blox) Shutdown(ctx context.Context) error {
// Cancel the context to signal all operations to stop
p.cancel()

// Use a separate context for shutdown with a timeout
shutdownCtx, cancelShutdown := context.WithTimeout(context.Background(), 20*time.Second)
defer cancelShutdown()

// Shutdown the various components
if bErr := p.bl.Shutdown(ctx); bErr != nil {
log.Errorw("Error occurred in blockchain shutdown", "bErr", bErr)
Expand Down Expand Up @@ -497,9 +506,17 @@ func (p *Blox) Shutdown(ctx context.Context) error {
// Handle remaining errors after all goroutines have finished
log.Debug("Shutdown done")
return nil
case <-ctx.Done():
err := ctx.Err()
case <-shutdownCtx.Done():
err := shutdownCtx.Err()
log.Infow("Shutdown completed with timeout", "err", err)
}

// Check for errors from shutdownCtx
if err := shutdownCtx.Err(); err == context.DeadlineExceeded {
// Handle the case where shutdown did not complete in time
log.Warn("Shutdown did not complete in the allotted time")
return err
}

return nil
}
46 changes: 27 additions & 19 deletions blox/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,26 @@ type (
Option func(*options) error
PoolNameUpdater func(string) error
options struct {
h host.Host
name string
topicName string
storeDir string
announceInterval time.Duration
ds datastore.Batching
ls *ipld.LinkSystem
authorizer peer.ID
authorizedPeers []peer.ID
exchangeOpts []exchange.Option
relays []string
updatePoolName PoolNameUpdater
pingCount int
maxPingTime int
minSuccessRate int
blockchainEndpoint string
secretsPath string
IPFShttpServer *http.Server
wg *sync.WaitGroup
h host.Host
name string
topicName string
storeDir string
announceInterval time.Duration
ds datastore.Batching
ls *ipld.LinkSystem
authorizer peer.ID
authorizedPeers []peer.ID
exchangeOpts []exchange.Option
relays []string
updatePoolName PoolNameUpdater
pingCount int
maxPingTime int
minSuccessRate int
blockchainEndpoint string
secretsPath string
IPFShttpServer *http.Server
DefaultIPFShttpServer bool
wg *sync.WaitGroup
}
)

Expand Down Expand Up @@ -156,6 +157,13 @@ func WithDatastore(ds datastore.Batching) Option {
}
}

func WithDefaultIPFShttpServer(n bool) Option {
return func(o *options) error {
o.DefaultIPFShttpServer = n
return nil
}
}

func WithLinkSystem(ls *ipld.LinkSystem) Option {
return func(o *options) error {
o.ls = ls
Expand Down
Loading

0 comments on commit 4d3c098

Please sign in to comment.