Skip to content

Commit

Permalink
Merge pull request #5595 from oasisprotocol/kostko/feature/storage-pe…
Browse files Browse the repository at this point in the history
…rrt-fetchpool
  • Loading branch information
kostko authored Mar 13, 2024
2 parents 2b1f401 + 82bb7b2 commit 917dcc3
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 14 deletions.
4 changes: 4 additions & 0 deletions .changelog/5595.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
go/worker/storage: Make fetch pool per-runtime

This should speed up storage sync in case of nodes that have multiple
runtimes configured.
7 changes: 6 additions & 1 deletion go/worker/storage/committee/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ type Node struct { // nolint: maligned

func NewNode(
commonNode *committee.Node,
fetchPool *workerpool.Pool,
roleProvider registration.RoleProvider,
rpcRoleProvider registration.RoleProvider,
workerCommonCfg workerCommon.Config,
Expand All @@ -163,6 +162,10 @@ func NewNode(
) (*Node, error) {
initMetrics()

// Create the fetcher pool.
fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String())
fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount)

n := &Node{
commonNode: commonNode,

Expand Down Expand Up @@ -293,6 +296,8 @@ func (n *Node) Stop() {
n.status = api.StatusStopping
n.statusLock.Unlock()

n.fetchPool.Stop()

n.ctxCancel()
}

Expand Down
14 changes: 1 addition & 13 deletions go/worker/storage/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/oasisprotocol/oasis-core/go/common/grpc"
"github.com/oasisprotocol/oasis-core/go/common/logging"
"github.com/oasisprotocol/oasis-core/go/common/node"
"github.com/oasisprotocol/oasis-core/go/common/workerpool"
"github.com/oasisprotocol/oasis-core/go/config"
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common"
committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee"
Expand All @@ -27,8 +26,7 @@ type Worker struct {
initCh chan struct{}
quitCh chan struct{}

runtimes map[common.Namespace]*committee.Node
fetchPool *workerpool.Pool
runtimes map[common.Namespace]*committee.Node
}

// New constructs a new storage worker.
Expand All @@ -53,9 +51,6 @@ func New(
return s, nil
}

s.fetchPool = workerpool.New("storage_fetch")
s.fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount)

// Start storage node for every runtime.
for id, rt := range s.commonWorker.GetRuntimes() {
if err := s.registerRuntime(rt); err != nil {
Expand Down Expand Up @@ -94,7 +89,6 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error {

node, err := committee.NewNode(
commonNode,
w.fetchPool,
rp,
rpRPC,
w.commonWorker.GetConfig(),
Expand Down Expand Up @@ -152,9 +146,6 @@ func (w *Worker) Start() error {
for _, r := range w.runtimes {
<-r.Quit()
}
if w.fetchPool != nil {
<-w.fetchPool.Quit()
}
}()

// Start all runtimes and wait for initialization.
Expand Down Expand Up @@ -188,9 +179,6 @@ func (w *Worker) Stop() {
for _, r := range w.runtimes {
r.Stop()
}
if w.fetchPool != nil {
w.fetchPool.Stop()
}
}

// Quit returns a channel that will be closed when the service terminates.
Expand Down

0 comments on commit 917dcc3

Please sign in to comment.