Limit parallel execution of a stage's layer
Previously, the engine executed all modules in a stage's layer in parallel. So if you had 20 independent mapper modules, they all ran at the same time.

This hindered performance under high load, where many CPU cycles could be consumed while the machine had only a limited number of physical cores available.

We now change that behavior: development mode never executes modules in parallel, and production mode is limited to 2 parallel executions for now. A future update will make that value dynamic based on the request's subscription.
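As an illustration only (simplified names, not the engine's actual code), a layer can be executed sequentially or with a bounded number of goroutines using `errgroup.SetLimit`, which is the mechanism this commit relies on:

```go
// Minimal sketch of the new behavior, not the engine's actual code: the
// Executor type and its run method are illustrative stand-ins for a stage
// layer's module executors.
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

type Executor struct{ name string }

func (e Executor) run() error {
	fmt.Println("executing", e.name)
	return nil
}

// executeLayer runs a layer sequentially in development mode (or when there is
// nothing to parallelize) and otherwise runs at most maxParallel executors
// concurrently.
func executeLayer(layer []Executor, maxParallel int, developmentMode bool) error {
	if developmentMode || maxParallel <= 1 || len(layer) <= 1 {
		for _, executor := range layer {
			if err := executor.run(); err != nil {
				return err
			}
		}
		return nil
	}

	wg := errgroup.Group{}
	wg.SetLimit(maxParallel) // at most maxParallel goroutines in flight
	for _, executor := range layer {
		wg.Go(executor.run)
	}
	return wg.Wait()
}

func main() {
	layer := []Executor{{"map_a"}, {"map_b"}, {"map_c"}}
	if err := executeLayer(layer, 2, false); err != nil {
		panic(err)
	}
}
```

With the limit set to 2, at most two module executors of the layer run at any given time, regardless of how many independent modules the layer contains.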
maoueh committed Jan 30, 2025
1 parent ff3d1de commit 7f4d451
Showing 7 changed files with 56 additions and 18 deletions.
6 changes: 4 additions & 2 deletions docs/release-notes/change-log.md
@@ -15,9 +15,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

* Improve noop-mode: will now only send one signal per bundle, without any data.

* Limit parallel execution of a stage's layer
* Limit parallel execution of a stage's layer.

Previously, the engine was executing modules in a stage's layer all in parallel. We now change that behavior, development mode will not execute any modules in parallel and production mode will limit parallelism to 2 for now.
Previously, the engine was executing modules in a stage's layer all in parallel. We now change that behavior: development mode will from now on execute every module sequentially, and production mode will limit parallelism to 2 (hard-coded) for now.

The auth plugin can control that value dynamically by providing a trusted header `X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count`.

### Client

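For illustration, here is a minimal sketch of how the trusted header value mentioned above could be turned into an effective executor count, assuming the default of 2 and the safety cap of 16 that this commit introduces; `resolveMaxStageLayerParallelExecutor` is a placeholder name, not the actual substreams or dauth API:

```go
// Illustrative sketch only: resolve the per-request stage-layer executor limit
// from a trusted header value, falling back to the default and capping at a
// sane maximum. The constants mirror the values introduced by this commit.
package main

import (
	"fmt"
	"strconv"
)

const (
	defaultMaxStageLayerParallelExecutorCount   = 2
	safeguardMaxStageLayerParallelExecutorCount = 16
)

// resolveMaxStageLayerParallelExecutor parses the trusted header value (empty
// when the auth plugin did not set it) and returns the effective limit.
func resolveMaxStageLayerParallelExecutor(headerValue string) uint64 {
	if headerValue == "" {
		return defaultMaxStageLayerParallelExecutorCount
	}

	count, err := strconv.ParseUint(headerValue, 10, 64)
	if err != nil || count == 0 {
		// Invalid or zero value falls back to the default.
		return defaultMaxStageLayerParallelExecutorCount
	}

	// Cap at the safeguard value to protect against misconfiguration
	// (min is the Go 1.21+ builtin).
	return min(count, safeguardMaxStageLayerParallelExecutorCount)
}

func main() {
	fmt.Println(resolveMaxStageLayerParallelExecutor(""))   // 2 (default)
	fmt.Println(resolveMaxStageLayerParallelExecutor("8"))  // 8
	fmt.Println(resolveMaxStageLayerParallelExecutor("64")) // 16 (capped)
}
```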
4 changes: 2 additions & 2 deletions go.mod
@@ -12,7 +12,7 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1
github.com/streamingfast/dgrpc v0.0.0-20250115215805-6f4ad2be7eef
@@ -194,7 +194,7 @@ require (
go.uber.org/multierr v1.10.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sync v0.8.0
golang.org/x/sys v0.25.0 // indirect
golang.org/x/term v0.20.0 // indirect
golang.org/x/text v0.18.0 // indirect
4 changes: 2 additions & 2 deletions go.sum
@@ -527,8 +527,8 @@ github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320 h1:2XKZH4m
github.com/streamingfast/bstream v0.0.2-0.20250108204816-fd52ed027320/go.mod h1:n5wy+Vmwp4xbjXO7B81MAkAgjnf1vJ/lI2y6hWWyFbg=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375 h1:nwuFSEJtQfqTuN62WvysfAtDT4qqwQ6ghFX0i2VY1fY=
github.com/streamingfast/cli v0.0.4-0.20241119021815-815afa473375/go.mod h1:qOksW3DPhHVYBo8dcYxS7K3Q09wlcOChSdopeOjLWng=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338 h1:o3Imquu+RhIdF62OSr/ZxVPsn6jpKHwBV/Upl6P28o0=
github.com/streamingfast/dauth v0.0.0-20240219205130-bfe428489338/go.mod h1:cwfI5vaMd+CiwZIL0H0JdP5UDWCZOVFz/ex3L0+o/j4=
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf h1:RFkRnIRUk51fcPn6eWuuecUSYFqmCP5affIMKywPIDU=
github.com/streamingfast/dauth v0.0.0-20250129222106-6e8709b44acf/go.mod h1:d8NTrjIoiqplrZEYOUXwwFaeB2C8fVeHYLvaJFfa9Xo=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c h1:6WjE2yInE+5jnI7cmCcxOiGZiEs2FQm9Zsg2a9Ivp0Q=
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c/go.mod h1:dbfiy9ORrL8c6ldSq+L0H9pg8TOqqu/FsghsgUEWK54=
github.com/streamingfast/derr v0.0.0-20230515163924-8570aaa43fe1 h1:xJB7rXnOHLesosMjfwWsEL2i/40mFSkzenEb3M0qTyM=
10 changes: 8 additions & 2 deletions pipeline/process_block.go
@@ -10,6 +10,7 @@ import (
"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"github.com/streamingfast/dmetering"
"github.com/streamingfast/logging"
"github.com/streamingfast/substreams/metering"
"github.com/streamingfast/substreams/metrics"
pbssinternal "github.com/streamingfast/substreams/pb/sf/substreams/intern/v2"
@@ -307,8 +308,13 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
// the ctx is cached in the built moduleExecutors so we only activate timeout here
ctx, cancel := context.WithTimeout(ctx, p.executionTimeout)
defer cancel()

isDevelopmentMode := reqctx.IsInDevelopmentMode(ctx)
maxParallelExecutor := reqctx.MaxStageLayerParallelExecutor(ctx)
logging.Logger(ctx, p.stores.logger).Debug("executing stage's layers", zap.Int("layer_count", len(p.StagedModuleExecutors)), zap.Uint64("max_parallel_executor", maxParallelExecutor))

for _, layer := range p.StagedModuleExecutors {
if len(layer) <= 1 || reqctx.IsInDevelopmentModeRequest(ctx) {
if isDevelopmentMode || maxParallelExecutor <= 1 || len(layer) <= 1 {
for _, executor := range layer {
if !executor.RunsOnBlock(blockNum) {
continue
@@ -321,7 +327,7 @@ func (p *Pipeline) executeModules(ctx context.Context, execOutput execout.Execut
} else {
results := make([]resultObj, len(layer))
wg := errgroup.Group{}
wg.SetLimit(2)
wg.SetLimit(int(maxParallelExecutor))

for i, executor := range layer {
if !executor.RunsOnBlock(execOutput.Clock().Number) {
27 changes: 26 additions & 1 deletion reqctx/context.go
@@ -91,7 +91,7 @@ func WithEmitter(ctx context.Context, emitter dmetering.EventEmitter) context.Co
return context.WithValue(ctx, emitterKey, emitter)
}

func IsInDevelopmentModeRequest(ctx context.Context) bool {
func IsInDevelopmentMode(ctx context.Context) bool {
details := Details(ctx)
if details == nil {
return true
@@ -100,6 +100,31 @@ func IsInDevelopmentModeRequest(ctx context.Context) bool {
return !details.ProductionMode
}

const defaultMaxStageLayerParallelExecutorCount = 2

const safeguardMaxStageLayerParallelExecutorCount = 16

// MaxStageLayerParallelExecutor returns the maximum number of parallel executors (e.g. goroutines) that can
// be executed at the same time for a particular stage's layer as configured and accepted by the
// auth plugin.
//
// If no request details are attached to the context, it returns 1; if the value is unset, the default applies.
func MaxStageLayerParallelExecutor(ctx context.Context) uint64 {
details := Details(ctx)
if details == nil {
// Always give at least 1, but there should always be a details object attached to the context
return 1
}

// If unset, provide default value which is 2 for now
if details.MaxStageLayerParallelExecutor == 0 {
return defaultMaxStageLayerParallelExecutorCount
}

// Protect in case of misconfiguration to cap at a sane system max value
return min(details.MaxStageLayerParallelExecutor, safeguardMaxStageLayerParallelExecutorCount)
}

type ISpan interface {
// End completes the Span. The Span is considered complete and ready to be
// delivered through the rest of the telemetry pipeline after this method
11 changes: 6 additions & 5 deletions reqctx/request.go
@@ -17,11 +17,12 @@ type RequestDetails struct {
ResolvedStartBlockNum uint64
ResolvedCursor string

LinearHandoffBlockNum uint64
LinearGateBlockNum uint64
StopBlockNum uint64
MaxParallelJobs uint64
UniqueID uint64
LinearHandoffBlockNum uint64
LinearGateBlockNum uint64
StopBlockNum uint64
MaxParallelJobs uint64
MaxStageLayerParallelExecutor uint64
UniqueID uint64

ProductionMode bool
IsTier2Request bool
12 changes: 8 additions & 4 deletions service/tier1.go
@@ -433,18 +433,22 @@ func (s *Tier1Service) blocks(ctx context.Context, request *pbsubstreamsrpc.Requ
cacheTag := s.runtimeConfig.DefaultCacheTag
if auth := dauth.FromContext(ctx); auth != nil {
if parallelJobs := auth.Get("X-Sf-Substreams-Parallel-Jobs"); parallelJobs != "" {
if ll, err := strconv.ParseUint(parallelJobs, 10, 64); err == nil {
requestDetails.MaxParallelJobs = ll
if count, err := strconv.ParseUint(parallelJobs, 10, 64); err == nil {
requestDetails.MaxParallelJobs = count
}
}
if parallelExecutors := auth.Get("X-Sf-Substreams-Stage-Layer-Parallel-Executor-Max-Count"); parallelExecutors != "" {
if count, err := strconv.ParseUint(parallelExecutors, 10, 64); err == nil {
requestDetails.MaxStageLayerParallelExecutor = count
}
}
if ct := auth.Get("X-Sf-Substreams-Cache-Tag"); ct != "" {
if IsValidCacheTag(ct) {
cacheTag = ct
} else {
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and undescores", ct)
return fmt.Errorf("invalid value for X-Sf-Substreams-Cache-Tag %s, should only contain letters, numbers, hyphens and underscores", ct)
}
}

}

var requestStats *metrics.Stats
