From db8d6dd9aedb171909c1794ffff248300d5e18b6 Mon Sep 17 00:00:00 2001 From: Dmytro Lykhovyi Date: Tue, 7 Jun 2022 17:06:55 +0300 Subject: [PATCH] a channel per consumer; --- pubsub/transport/amqp/amqp.go | 28 ++++++++++++---------------- pubsub/transport/amqp/connection.go | 1 + 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/pubsub/transport/amqp/amqp.go b/pubsub/transport/amqp/amqp.go index 80cb5e4..8450c14 100644 --- a/pubsub/transport/amqp/amqp.go +++ b/pubsub/transport/amqp/amqp.go @@ -25,7 +25,6 @@ type amqpTransport struct { connection *Connection publishingChannel *Channel logger log.Logger - consumingChannel *Channel } func (t *amqpTransport) Connect(ctx context.Context) error { @@ -40,15 +39,8 @@ func (t *amqpTransport) Connect(ctx context.Context) error { return errors.WithStack(err) } - consumingChannel, err := conn.Channel() - - if err != nil { - return errors.WithStack(err) - } - t.connection = conn t.publishingChannel = publishingChannel - t.consumingChannel = consumingChannel return nil } @@ -165,6 +157,11 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o return nil, errors.WithStack(err) } + consumingChannel, err := t.connection.Channel() + if err != nil { + return nil, errors.WithStack(err) + } + consumeOptions := &ConsumeOptions{} for _, opt := range options { @@ -174,7 +171,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o } if consumeOptions.PrefetchCount > 0 { - if err := t.consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil { + if err := consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil { return nil, errors.WithStack(err) } } @@ -190,13 +187,16 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o defer func() { t.logger.Logf(log.InfoLevel, "canceling consumer %s", queue.Name()) - if err := t.consumingChannel.Cancel(queue.Name(), true); err != nil { + if err := consumingChannel.Cancel(queue.Name(), true); err != nil { t.logger.Logf(log.ErrorLevel, "error canceling consumer %s", err) } + if err := consumingChannel.Close(); err != nil { + t.logger.Logf(log.ErrorLevel, "error closing amqp channel", err) + } t.logger.Logf(log.InfoLevel, "canceled consumer %s", queue.Name()) }() - msgs, err := t.consumingChannel.Consume( + msgs, err := consumingChannel.Consume( queue.Name(), queue.Name(), false, @@ -238,7 +238,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o } func (t *amqpTransport) Disconnect(ctx context.Context) error { - if t.connection == nil || t.publishingChannel == nil || t.consumingChannel == nil { + if t.connection == nil || t.publishingChannel == nil { return nil } @@ -246,10 +246,6 @@ func (t *amqpTransport) Disconnect(ctx context.Context) error { return errors.Wrap(err, "error closing publishing channel") } - if err := t.consumingChannel.Close(); err != nil { - return errors.Wrap(err, "error closing consuming channel") - } - if err := t.connection.Close(); err != nil { return errors.Wrap(err, "error closing connection") } diff --git a/pubsub/transport/amqp/connection.go b/pubsub/transport/amqp/connection.go index d4b5534..bddb1eb 100644 --- a/pubsub/transport/amqp/connection.go +++ b/pubsub/transport/amqp/connection.go @@ -99,6 +99,7 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, time.Sleep(delay * time.Second) if reconnectedCount > reconnectCount { ch.logger.Logf(log.FatalLevel, "Reached limit of reconnects %d", reconnectCount) + break } reconnectedCount++ continue