Skip to content

Commit

Permalink
pgmq: Postgres connection initialization & shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
sundbry committed Sep 21, 2024
1 parent 73f82b8 commit 66f8eda
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
6 changes: 5 additions & 1 deletion cmd/smoothmq/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func Run(tm models.TenantManager, queue models.Queue, cfg config.ServerCommand)

// Initialize PGMQ when configured
if queue == nil && cfg.PGMQ.Uri != "" {
queue = pgmq.NewPGMQQueue(cfg.PGMQ)
_queue, err := pgmq.NewPGMQQueue(cfg.PGMQ)
if err != nil {
panic(err)
}
queue = _queue
}

// Initialize default queue implementation
Expand Down
13 changes: 11 additions & 2 deletions queue/pgmq/pgmq.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pgmq

import (
"context"
"github.com/craigpastro/pgmq-go"
"github.com/poundifdef/smoothmq/config"
"github.com/poundifdef/smoothmq/models"
Expand All @@ -11,9 +12,16 @@ type PGMQQueue struct {
PGMQ *pgmq.PGMQ
}

func NewPGMQQueue(cfg config.PGMQConfig) *PGMQQueue {
func NewPGMQQueue(cfg config.PGMQConfig) (*PGMQQueue, error) {
log.Info().Msg("Initializing pgmq backend")
return &PGMQQueue{}
impl, err := pgmq.New(context.Background(), cfg.Uri)
if err != nil {
return nil, err
}
driver := &PGMQQueue{
PGMQ: impl,
}
return driver, nil
}

func (q *PGMQQueue) GetQueue(tenantId int64, queueName string) (models.QueueProperties, error) {
Expand Down Expand Up @@ -63,5 +71,6 @@ func (q *PGMQQueue) Delete(tenantId int64, queue string, messageId int64) error
}

func (q *PGMQQueue) Shutdown() error {
q.PGMQ.Close()
return nil
}

0 comments on commit 66f8eda

Please sign in to comment.