Skip to content

Commit

Permalink
Merge pull request #14 from tsamsiyu/master
Browse files Browse the repository at this point in the history
A channel per consumer
  • Loading branch information
kopaygorodsky authored Jun 7, 2022
2 parents 337c995 + db8d6dd commit 8e26257
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
28 changes: 12 additions & 16 deletions pubsub/transport/amqp/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type amqpTransport struct {
connection *Connection
publishingChannel *Channel
logger log.Logger
consumingChannel *Channel
}

func (t *amqpTransport) Connect(ctx context.Context) error {
Expand All @@ -40,15 +39,8 @@ func (t *amqpTransport) Connect(ctx context.Context) error {
return errors.WithStack(err)
}

consumingChannel, err := conn.Channel()

if err != nil {
return errors.WithStack(err)
}

t.connection = conn
t.publishingChannel = publishingChannel
t.consumingChannel = consumingChannel

return nil
}
Expand Down Expand Up @@ -165,6 +157,11 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o
return nil, errors.WithStack(err)
}

consumingChannel, err := t.connection.Channel()
if err != nil {
return nil, errors.WithStack(err)
}

consumeOptions := &ConsumeOptions{}

for _, opt := range options {
Expand All @@ -174,7 +171,7 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o
}

if consumeOptions.PrefetchCount > 0 {
if err := t.consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil {
if err := consumingChannel.Qos(int(consumeOptions.PrefetchCount), 0, false); err != nil {
return nil, errors.WithStack(err)
}
}
Expand All @@ -190,13 +187,16 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o

defer func() {
t.logger.Logf(log.InfoLevel, "canceling consumer %s", queue.Name())
if err := t.consumingChannel.Cancel(queue.Name(), true); err != nil {
if err := consumingChannel.Cancel(queue.Name(), true); err != nil {
t.logger.Logf(log.ErrorLevel, "error canceling consumer %s", err)
}
if err := consumingChannel.Close(); err != nil {
t.logger.Logf(log.ErrorLevel, "error closing amqp channel", err)
}
t.logger.Logf(log.InfoLevel, "canceled consumer %s", queue.Name())
}()

msgs, err := t.consumingChannel.Consume(
msgs, err := consumingChannel.Consume(
queue.Name(),
queue.Name(),
false,
Expand Down Expand Up @@ -238,18 +238,14 @@ func (t *amqpTransport) Consume(ctx context.Context, queues []transport.Queue, o
}

func (t *amqpTransport) Disconnect(ctx context.Context) error {
if t.connection == nil || t.publishingChannel == nil || t.consumingChannel == nil {
if t.connection == nil || t.publishingChannel == nil {
return nil
}

if err := t.publishingChannel.Close(); err != nil {
return errors.Wrap(err, "error closing publishing channel")
}

if err := t.consumingChannel.Close(); err != nil {
return errors.Wrap(err, "error closing consuming channel")
}

if err := t.connection.Close(); err != nil {
return errors.Wrap(err, "error closing connection")
}
Expand Down
1 change: 1 addition & 0 deletions pubsub/transport/amqp/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal,
time.Sleep(delay * time.Second)
if reconnectedCount > reconnectCount {
ch.logger.Logf(log.FatalLevel, "Reached limit of reconnects %d", reconnectCount)
break
}
reconnectedCount++
continue
Expand Down

0 comments on commit 8e26257

Please sign in to comment.