From 82bb7b27360ee2271c986fbc662ad40100296df4 Mon Sep 17 00:00:00 2001 From: Jernej Kos Date: Wed, 13 Mar 2024 11:44:42 +0100 Subject: [PATCH] go/worker/storage: Make fetch pool per-runtime This should speed up storage sync in case of nodes that have multiple runtimes configured. --- .changelog/5595.feature.md | 4 ++++ go/worker/storage/committee/node.go | 7 ++++++- go/worker/storage/worker.go | 14 +------------- 3 files changed, 11 insertions(+), 14 deletions(-) create mode 100644 .changelog/5595.feature.md diff --git a/.changelog/5595.feature.md b/.changelog/5595.feature.md new file mode 100644 index 00000000000..02caa2e1845 --- /dev/null +++ b/.changelog/5595.feature.md @@ -0,0 +1,4 @@ +go/worker/storage: Make fetch pool per-runtime + +This should speed up storage sync in case of nodes that have multiple +runtimes configured. diff --git a/go/worker/storage/committee/node.go b/go/worker/storage/committee/node.go index 5eb571ef9de..8f75a19509f 100644 --- a/go/worker/storage/committee/node.go +++ b/go/worker/storage/committee/node.go @@ -154,7 +154,6 @@ type Node struct { // nolint: maligned func NewNode( commonNode *committee.Node, - fetchPool *workerpool.Pool, roleProvider registration.RoleProvider, rpcRoleProvider registration.RoleProvider, workerCommonCfg workerCommon.Config, @@ -163,6 +162,10 @@ func NewNode( ) (*Node, error) { initMetrics() + // Create the fetcher pool. + fetchPool := workerpool.New("storage_fetch/" + commonNode.Runtime.ID().String()) + fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) + n := &Node{ commonNode: commonNode, @@ -293,6 +296,8 @@ func (n *Node) Stop() { n.status = api.StatusStopping n.statusLock.Unlock() + n.fetchPool.Stop() + n.ctxCancel() } diff --git a/go/worker/storage/worker.go b/go/worker/storage/worker.go index 04560e27817..8463fd8de86 100644 --- a/go/worker/storage/worker.go +++ b/go/worker/storage/worker.go @@ -7,7 +7,6 @@ import ( "github.com/oasisprotocol/oasis-core/go/common/grpc" "github.com/oasisprotocol/oasis-core/go/common/logging" "github.com/oasisprotocol/oasis-core/go/common/node" - "github.com/oasisprotocol/oasis-core/go/common/workerpool" "github.com/oasisprotocol/oasis-core/go/config" workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common" committeeCommon "github.com/oasisprotocol/oasis-core/go/worker/common/committee" @@ -27,8 +26,7 @@ type Worker struct { initCh chan struct{} quitCh chan struct{} - runtimes map[common.Namespace]*committee.Node - fetchPool *workerpool.Pool + runtimes map[common.Namespace]*committee.Node } // New constructs a new storage worker. @@ -53,9 +51,6 @@ func New( return s, nil } - s.fetchPool = workerpool.New("storage_fetch") - s.fetchPool.Resize(config.GlobalConfig.Storage.FetcherCount) - // Start storage node for every runtime. for id, rt := range s.commonWorker.GetRuntimes() { if err := s.registerRuntime(rt); err != nil { @@ -94,7 +89,6 @@ func (w *Worker) registerRuntime(commonNode *committeeCommon.Node) error { node, err := committee.NewNode( commonNode, - w.fetchPool, rp, rpRPC, w.commonWorker.GetConfig(), @@ -152,9 +146,6 @@ func (w *Worker) Start() error { for _, r := range w.runtimes { <-r.Quit() } - if w.fetchPool != nil { - <-w.fetchPool.Quit() - } }() // Start all runtimes and wait for initialization. @@ -188,9 +179,6 @@ func (w *Worker) Stop() { for _, r := range w.runtimes { r.Stop() } - if w.fetchPool != nil { - w.fetchPool.Stop() - } } // Quit returns a channel that will be closed when the service terminates.