From 338f6d6d4e08babd74f38e74ec82cca57aba7a8d Mon Sep 17 00:00:00 2001 From: Vladyslav Kopaihorodskyi Date: Sun, 22 May 2022 15:06:37 +0300 Subject: [PATCH] added possibility to configure subscriber, workers pool etc --- messagebus.go | 18 ++++---- pubsub/subscriber/subscriber.go | 77 +++++++++++++++++++++++++------- pubsub/transport/amqp/amqp.go | 2 +- pubsub/transport/amqp/options.go | 4 +- 4 files changed, 73 insertions(+), 28 deletions(-) diff --git a/messagebus.go b/messagebus.go index 4bc4675..a55a579 100644 --- a/messagebus.go +++ b/messagebus.go @@ -23,6 +23,7 @@ type SubscriberOption func(subscriberOpts *subscriberOpts, c *subscriberContaine type subscriberOpts struct { subscriber subscriber.Subscriber + opts []subscriber.Opt transport transport.Transport } @@ -38,10 +39,11 @@ func WithSubscriber(subscriber subscriber.Subscriber) SubscriberOption { } } -// DefaultWithTransport option allows to specify your own transport which will be used in the default subscriber -func DefaultWithTransport(transport transport.Transport) SubscriberOption { +// DefaultSubscriber option allows to specify your own transport which will be used in the default subscriber +func DefaultSubscriber(transport transport.Transport, opts ...subscriber.Opt) SubscriberOption { return func(subscriberOpts *subscriberOpts, c *subscriberContainer) { subscriberOpts.transport = transport + subscriberOpts.opts = opts } } @@ -136,16 +138,16 @@ func NewMessageBus(logger log.Logger, msgMarshaller message.Marshaller, scheme s mBus.router = container.router mBus.scheme = scheme - subscriberOpt := &subscriberOpts{} - subscriberOption(subscriberOpt, &subscriberContainer{ + subscriberCreationOpts := &subscriberOpts{} + subscriberOption(subscriberCreationOpts, &subscriberContainer{ msgMarshaller: msgMarshaller, processor: container.processor, }) - if subscriberOpt.subscriber != nil { - mBus.subscriber = subscriberOpt.subscriber - } else if subscriberOpt.transport != nil { - mBus.subscriber = subscriber.NewSubscriber(subscriberOpt.transport, container.processor, logger) + if subscriberCreationOpts.subscriber != nil { + mBus.subscriber = subscriberCreationOpts.subscriber + } else if subscriberCreationOpts.transport != nil { + mBus.subscriber = subscriber.NewSubscriber(subscriberCreationOpts.transport, container.processor, logger, subscriberCreationOpts.opts...) } else { return nil, errors.New("subscriber is nil") } diff --git a/pubsub/subscriber/subscriber.go b/pubsub/subscriber/subscriber.go index 6a0f150..3e072cc 100644 --- a/pubsub/subscriber/subscriber.go +++ b/pubsub/subscriber/subscriber.go @@ -15,13 +15,6 @@ import ( "github.com/pkg/errors" ) -const ( - maxTasksInProgress = 100 - packageProcessingMaxTime time.Duration = time.Second * 60 - gracefulShutdownTimeout time.Duration = time.Second * 120 - scheduleTimeout time.Duration = time.Second * 3 -) - // Subscriber starts listening for queues and processes messages type Subscriber interface { // Run listens queues for packages and processes them. Gracefully shuts down either on os.Signal or ctx.Done() or Stop() @@ -30,16 +23,66 @@ type Subscriber interface { Stop(ctx context.Context) error } +// Config allows to configure subscriber workflow +type Config struct { + // WorkersCount specifies a number workers that process packages + WorkersCount uint + // WorkerWaitingAssignmentTimeout amount of time that a worker will wait for assigning a package + WorkerWaitingAssignmentTimeout time.Duration + // PackageProcessingMaxTime amount of time for a package to be processed + PackageProcessingMaxTime time.Duration + // GracefulShutdownTimeout amount of time for graceful shutdown + GracefulShutdownTimeout time.Duration +} + +type subscriberOpts struct { + config *Config +} + +type Opt func(o *subscriberOpts) + +func WithConfig(c *Config) Opt { + return func(o *subscriberOpts) { + o.config = c + } +} + +// NewSubscriber creates default subscriber implementation +func NewSubscriber(transport transport.Transport, processor Processor, logger log.Logger, opts ...Opt) Subscriber { + sOpts := &subscriberOpts{} + + for _, o := range opts { + o(sOpts) + } + + var config *Config + + if sOpts.config != nil { + config = sOpts.config + } else { + config = &Config{ + WorkersCount: 10, + WorkerWaitingAssignmentTimeout: time.Second * 3, + PackageProcessingMaxTime: time.Second * 60, + GracefulShutdownTimeout: time.Second * 61, + } + } + + return &subscriber{ + transport: transport, + logger: logger, + processor: processor, + workerDispatcher: newDispatcher(config.WorkersCount), + config: config, + } +} + type subscriber struct { transport transport.Transport logger log.Logger processor Processor workerDispatcher *dispatcher -} - -// NewSubscriber creates default subscriber implementation -func NewSubscriber(transport transport.Transport, processor Processor, logger log.Logger) Subscriber { - return &subscriber{transport: transport, logger: logger, processor: processor, workerDispatcher: newDispatcher(maxTasksInProgress)} + config *Config } func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error { @@ -51,11 +94,11 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error { signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM) consumerCtx, cancelConsumerCtx := context.WithCancel(ctx) - shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout) + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), s.config.GracefulShutdownTimeout) defer shutdownCancel() defer cancelConsumerCtx() - consumedPkgs, err := s.transport.Consume(consumerCtx, queues, amqp.WithQosPrefetchCount(maxTasksInProgress)) + consumedPkgs, err := s.transport.Consume(consumerCtx, queues, amqp.WithQosPrefetchCount(s.config.WorkersCount)) if err != nil { return errors.WithStack(err) @@ -63,7 +106,7 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error { s.workerDispatcher.start(consumerCtx) - scheduleTicker := time.NewTicker(scheduleTimeout) + scheduleTicker := time.NewTicker(s.config.WorkerWaitingAssignmentTimeout) defer scheduleTicker.Stop() @@ -76,7 +119,7 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error { } select { case <-scheduleTicker.C: - s.logger.Logf(log.DebugLevel, "worker was waiting %s for a job to start. returning him to the pool", scheduleTimeout.String()) + s.logger.Logf(log.DebugLevel, "worker was waiting %s for a job to start. returning him to the pool", s.config.WorkerWaitingAssignmentTimeout.String()) s.workerDispatcher.queue() <- worker break case incomingPkg, open := <-consumedPkgs: @@ -104,7 +147,7 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error { } func (s *subscriber) processPackage(ctx context.Context, inPkg transport.IncomingPkg) { - processorCtx, processorCancel := context.WithTimeout(ctx, packageProcessingMaxTime) + processorCtx, processorCancel := context.WithTimeout(ctx, s.config.PackageProcessingMaxTime) defer processorCancel() if err := s.processor.Process(processorCtx, inPkg); err != nil { diff --git a/pubsub/transport/amqp/amqp.go b/pubsub/transport/amqp/amqp.go index e20c0a8..f8faefe 100644 --- a/pubsub/transport/amqp/amqp.go +++ b/pubsub/transport/amqp/amqp.go @@ -177,7 +177,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o } if consumeOptions.PrefetchCount > 0 { - if err := t.consumingChannel.Qos(consumeOptions.PrefetchCount, 0, false); err != nil { + if err := t.consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil { return nil, errors.WithStack(err) } } diff --git a/pubsub/transport/amqp/options.go b/pubsub/transport/amqp/options.go index 44de32c..355992d 100644 --- a/pubsub/transport/amqp/options.go +++ b/pubsub/transport/amqp/options.go @@ -9,7 +9,7 @@ type ConsumeOptions struct { Exclusive bool NoLocal bool NoWait bool - PrefetchCount int + PrefetchCount uint } func convertConsumeOptsType(options interface{}) (*ConsumeOptions, error) { @@ -32,7 +32,7 @@ func convertSendOptsType(options interface{}) (*SendOptions, error) { return opts, nil } -func WithQosPrefetchCount(limit int) transport.ConsumeOpts { +func WithQosPrefetchCount(limit uint) transport.ConsumeOpts { return func(options interface{}) error { opts, err := convertConsumeOptsType(options)