Skip to content

Commit

Permalink
added possibility to configure subscriber, workers pool etc
Browse files Browse the repository at this point in the history
  • Loading branch information
kopaygorodsky committed May 22, 2022
1 parent aa19154 commit 338f6d6
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 28 deletions.
18 changes: 10 additions & 8 deletions messagebus.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type SubscriberOption func(subscriberOpts *subscriberOpts, c *subscriberContaine

type subscriberOpts struct {
subscriber subscriber.Subscriber
opts []subscriber.Opt
transport transport.Transport
}

Expand All @@ -38,10 +39,11 @@ func WithSubscriber(subscriber subscriber.Subscriber) SubscriberOption {
}
}

// DefaultWithTransport option allows to specify your own transport which will be used in the default subscriber
func DefaultWithTransport(transport transport.Transport) SubscriberOption {
// DefaultSubscriber option allows to specify your own transport which will be used in the default subscriber
func DefaultSubscriber(transport transport.Transport, opts ...subscriber.Opt) SubscriberOption {
return func(subscriberOpts *subscriberOpts, c *subscriberContainer) {
subscriberOpts.transport = transport
subscriberOpts.opts = opts
}
}

Expand Down Expand Up @@ -136,16 +138,16 @@ func NewMessageBus(logger log.Logger, msgMarshaller message.Marshaller, scheme s
mBus.router = container.router
mBus.scheme = scheme

subscriberOpt := &subscriberOpts{}
subscriberOption(subscriberOpt, &subscriberContainer{
subscriberCreationOpts := &subscriberOpts{}
subscriberOption(subscriberCreationOpts, &subscriberContainer{
msgMarshaller: msgMarshaller,
processor: container.processor,
})

if subscriberOpt.subscriber != nil {
mBus.subscriber = subscriberOpt.subscriber
} else if subscriberOpt.transport != nil {
mBus.subscriber = subscriber.NewSubscriber(subscriberOpt.transport, container.processor, logger)
if subscriberCreationOpts.subscriber != nil {
mBus.subscriber = subscriberCreationOpts.subscriber
} else if subscriberCreationOpts.transport != nil {
mBus.subscriber = subscriber.NewSubscriber(subscriberCreationOpts.transport, container.processor, logger, subscriberCreationOpts.opts...)
} else {
return nil, errors.New("subscriber is nil")
}
Expand Down
77 changes: 60 additions & 17 deletions pubsub/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ import (
"github.com/pkg/errors"
)

const (
maxTasksInProgress = 100
packageProcessingMaxTime time.Duration = time.Second * 60
gracefulShutdownTimeout time.Duration = time.Second * 120
scheduleTimeout time.Duration = time.Second * 3
)

// Subscriber starts listening for queues and processes messages
type Subscriber interface {
// Run listens queues for packages and processes them. Gracefully shuts down either on os.Signal or ctx.Done() or Stop()
Expand All @@ -30,16 +23,66 @@ type Subscriber interface {
Stop(ctx context.Context) error
}

// Config allows to configure subscriber workflow
type Config struct {
// WorkersCount specifies a number workers that process packages
WorkersCount uint
// WorkerWaitingAssignmentTimeout amount of time that a worker will wait for assigning a package
WorkerWaitingAssignmentTimeout time.Duration
// PackageProcessingMaxTime amount of time for a package to be processed
PackageProcessingMaxTime time.Duration
// GracefulShutdownTimeout amount of time for graceful shutdown
GracefulShutdownTimeout time.Duration
}

type subscriberOpts struct {
config *Config
}

type Opt func(o *subscriberOpts)

func WithConfig(c *Config) Opt {
return func(o *subscriberOpts) {
o.config = c
}
}

// NewSubscriber creates default subscriber implementation
func NewSubscriber(transport transport.Transport, processor Processor, logger log.Logger, opts ...Opt) Subscriber {
sOpts := &subscriberOpts{}

for _, o := range opts {
o(sOpts)
}

var config *Config

if sOpts.config != nil {
config = sOpts.config
} else {
config = &Config{
WorkersCount: 10,
WorkerWaitingAssignmentTimeout: time.Second * 3,
PackageProcessingMaxTime: time.Second * 60,
GracefulShutdownTimeout: time.Second * 61,
}
}

return &subscriber{
transport: transport,
logger: logger,
processor: processor,
workerDispatcher: newDispatcher(config.WorkersCount),
config: config,
}
}

type subscriber struct {
transport transport.Transport
logger log.Logger
processor Processor
workerDispatcher *dispatcher
}

// NewSubscriber creates default subscriber implementation
func NewSubscriber(transport transport.Transport, processor Processor, logger log.Logger) Subscriber {
return &subscriber{transport: transport, logger: logger, processor: processor, workerDispatcher: newDispatcher(maxTasksInProgress)}
config *Config
}

func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error {
Expand All @@ -51,19 +94,19 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error {
signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

consumerCtx, cancelConsumerCtx := context.WithCancel(ctx)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), s.config.GracefulShutdownTimeout)
defer shutdownCancel()
defer cancelConsumerCtx()

consumedPkgs, err := s.transport.Consume(consumerCtx, queues, amqp.WithQosPrefetchCount(maxTasksInProgress))
consumedPkgs, err := s.transport.Consume(consumerCtx, queues, amqp.WithQosPrefetchCount(s.config.WorkersCount))

if err != nil {
return errors.WithStack(err)
}

s.workerDispatcher.start(consumerCtx)

scheduleTicker := time.NewTicker(scheduleTimeout)
scheduleTicker := time.NewTicker(s.config.WorkerWaitingAssignmentTimeout)

defer scheduleTicker.Stop()

Expand All @@ -76,7 +119,7 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error {
}
select {
case <-scheduleTicker.C:
s.logger.Logf(log.DebugLevel, "worker was waiting %s for a job to start. returning him to the pool", scheduleTimeout.String())
s.logger.Logf(log.DebugLevel, "worker was waiting %s for a job to start. returning him to the pool", s.config.WorkerWaitingAssignmentTimeout.String())
s.workerDispatcher.queue() <- worker
break
case incomingPkg, open := <-consumedPkgs:
Expand Down Expand Up @@ -104,7 +147,7 @@ func (s *subscriber) Run(ctx context.Context, queues ...transport.Queue) error {
}

func (s *subscriber) processPackage(ctx context.Context, inPkg transport.IncomingPkg) {
processorCtx, processorCancel := context.WithTimeout(ctx, packageProcessingMaxTime)
processorCtx, processorCancel := context.WithTimeout(ctx, s.config.PackageProcessingMaxTime)
defer processorCancel()

if err := s.processor.Process(processorCtx, inPkg); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pubsub/transport/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o
}

if consumeOptions.PrefetchCount > 0 {
if err := t.consumingChannel.Qos(consumeOptions.PrefetchCount, 0, false); err != nil {
if err := t.consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil {
return nil, errors.WithStack(err)
}
}
Expand Down
4 changes: 2 additions & 2 deletions pubsub/transport/amqp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type ConsumeOptions struct {
Exclusive bool
NoLocal bool
NoWait bool
PrefetchCount int
PrefetchCount uint
}

func convertConsumeOptsType(options interface{}) (*ConsumeOptions, error) {
Expand All @@ -32,7 +32,7 @@ func convertSendOptsType(options interface{}) (*SendOptions, error) {
return opts, nil
}

func WithQosPrefetchCount(limit int) transport.ConsumeOpts {
func WithQosPrefetchCount(limit uint) transport.ConsumeOpts {
return func(options interface{}) error {
opts, err := convertConsumeOptsType(options)

Expand Down

0 comments on commit 338f6d6

Please sign in to comment.