diff --git a/platform/fabric/core/generic/finality/listenermanager.go b/platform/fabric/core/generic/finality/listenermanager.go
index f511e98af..6395630d8 100644
--- a/platform/fabric/core/generic/finality/listenermanager.go
+++ b/platform/fabric/core/generic/finality/listenermanager.go
@@ -45,10 +45,11 @@ type ListenerManager[T TxInfo] interface {
 type TxInfoCallback[T TxInfo] func(T) error
 
 type DeliveryListenerManagerConfig struct {
-	MapperParallelism int
-	ListenerTimeout   time.Duration
-	LRUSize           int
-	LRUBuffer         int
+	MapperParallelism       int
+	BlockProcessParallelism int
+	ListenerTimeout         time.Duration
+	LRUSize                 int
+	LRUBuffer               int
 }
 
 func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery *fabric.Delivery, tracer trace.Tracer, mapper TxInfoMapper[T]) (*listenerManager[T], error) {
@@ -71,12 +72,13 @@ func NewListenerManager[T TxInfo](config DeliveryListenerManagerConfig, delivery
 		txInfos = cache.NewMapCache[driver2.TxID, T]()
 	}
 	flm := &listenerManager[T]{
-		mapper:            &parallelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper},
-		tracer:            tracer,
-		listeners:         listeners,
-		txInfos:           txInfos,
-		ignoreBlockErrors: true,
-		delivery:          delivery,
+		mapper:                     &parallelBlockMapper[T]{cap: max(config.MapperParallelism, 1), mapper: mapper},
+		tracer:                     tracer,
+		listeners:                  listeners,
+		txInfos:                    txInfos,
+		ignoreBlockErrors:          true,
+		blockProcessingParallelism: config.BlockProcessParallelism,
+		delivery:                   delivery,
 	}
 	logger.Infof("Starting delivery service...")
 	go flm.start()
@@ -110,22 +112,40 @@ func fetchTxs[T TxInfo](evicted map[driver2.TxID][]ListenerEntry[T], mapper TxIn
 
 func (m *listenerManager[T]) start() {
 	// In case the delivery service fails, it will try to reconnect automatically.
-	err := m.delivery.ScanBlock(context.Background(), func(ctx context.Context, block *common.Block) (bool, error) {
-		err := m.onBlock(ctx, block)
-		return !m.ignoreBlockErrors && err != nil, err
-	})
+	err := m.delivery.ScanBlock(context.Background(), m.newBlockCallback())
 	logger.Errorf("failed running delivery: %v", err)
 }
 
+func (m *listenerManager[T]) newBlockCallback() fabric.BlockCallback {
+	if m.blockProcessingParallelism <= 1 {
+		return func(ctx context.Context, block *common.Block) (bool, error) {
+			err := m.onBlock(ctx, block)
+			return !m.ignoreBlockErrors && err != nil, err
+		}
+	}
+	eg := errgroup.Group{}
+	eg.SetLimit(m.blockProcessingParallelism)
+	return func(ctx context.Context, block *common.Block) (bool, error) {
+		eg.Go(func() error {
+			if err := m.onBlock(ctx, block); err != nil {
+				logger.Warnf("Mapping block [%d] errored: %v", block.Header.Number, err)
+			}
+			return nil
+		})
+		return false, nil
+	}
+}
+
 type listenerManager[T TxInfo] struct {
 	tracer trace.Tracer
 	mapper *parallelBlockMapper[T]
 
-	mu                sync.RWMutex
-	listeners         cache.Map[driver2.TxID, []ListenerEntry[T]]
-	txInfos           cache.Map[driver2.TxID, T]
-	delivery          *fabric.Delivery
-	ignoreBlockErrors bool
+	mu                         sync.RWMutex
+	listeners                  cache.Map[driver2.TxID, []ListenerEntry[T]]
+	txInfos                    cache.Map[driver2.TxID, T]
+	delivery                   *fabric.Delivery
+	blockProcessingParallelism int
+	ignoreBlockErrors          bool
 }
 
 func (m *listenerManager[T]) onBlock(ctx context.Context, block *common.Block) error {
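
For reference, the new parallel path in `newBlockCallback` builds on `golang.org/x/sync/errgroup` with `SetLimit`: once the limit of in-flight goroutines is reached, `Go` blocks, which gives natural backpressure on the block stream. Below is a minimal self-contained sketch of that pattern; everything in it except the errgroup API is an illustrative stand-in, not code from this change:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	// Cap concurrent "block handlers" at 4, as newBlockCallback does with
	// blockProcessingParallelism. eg.Go blocks once 4 goroutines are in flight.
	eg := errgroup.Group{}
	eg.SetLimit(4)
	for block := 0; block < 10; block++ {
		b := block // hypothetical block number, standing in for block.Header.Number
		eg.Go(func() error {
			time.Sleep(10 * time.Millisecond) // stand-in for m.onBlock(ctx, block)
			fmt.Printf("processed block %d\n", b)
			return nil // errors are only logged in the diff, never propagated
		})
	}
	_ = eg.Wait() // the long-running start() loop in the diff never reaches a Wait
}
```

Note the semantics this implies for the diff: when `BlockProcessParallelism > 1`, the callback always returns `false, nil` and only logs per-block errors, so a failing block never aborts the scan regardless of `ignoreBlockErrors`, and blocks may complete processing out of order.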