Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PGMQ driver implementation #38

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions cmd/smoothmq/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/poundifdef/smoothmq/dashboard"
"github.com/poundifdef/smoothmq/models"
"github.com/poundifdef/smoothmq/protocols/sqs"
"github.com/poundifdef/smoothmq/queue/pgmq"
"github.com/poundifdef/smoothmq/queue/sqlite"
"github.com/poundifdef/smoothmq/tenants/defaultmanager"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -53,6 +54,15 @@ func Run(tm models.TenantManager, queue models.Queue, cfg config.ServerCommand)
tm = defaultmanager.NewDefaultTenantManager(cfg.SQS.Keys)
}

// Initialize PGMQ when configured
if queue == nil && cfg.PGMQ.Uri != "" {
_queue, err := pgmq.NewPGMQQueue(cfg.PGMQ)
if err != nil {
panic(err)
}
queue = _queue
}

// Initialize default queue implementation
if queue == nil {
queue = sqlite.NewSQLiteQueue(cfg.SQLite)
Expand Down
16 changes: 14 additions & 2 deletions cmd/smoothmq/tester/tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import (
func Run(c smoothCfg.TesterCommand) {
var sentMessages, receivedMessages int

queueUrl := "https://sqs.us-east-1.amazonaws.com/123/test-queue"

// Load the AWS configuration with hardcoded credentials
cfg, err := config.LoadDefaultConfig(context.TODO(),
config.WithRegion("us-east-1"),
Expand All @@ -43,6 +41,8 @@ func Run(c smoothCfg.TesterCommand) {

var ch chan int

queueUrl := createQueue(sqsClient)

for i := 0; i < c.Senders; i++ {
wg.Add(1)
go func(id int) {
Expand Down Expand Up @@ -96,6 +96,18 @@ func GenerateRandomString(n int) string {
return string(b)
}

func createQueue(client *sqs.Client) string {
queueName := fmt.Sprintf("test-queue-%d", rand.Int())
i := &sqs.CreateQueueInput{
QueueName: &queueName,
}
result, err := client.CreateQueue(context.TODO(), i)
if err != nil {
log.Error().Err(err).Send()
}
return *result.QueueUrl
}

func sendMessage(client *sqs.Client, queueUrl string, goroutineID, requestID, batchSize, delaySeconds int) {

if batchSize > 1 {
Expand Down
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type ServerCommand struct {
SQS SQSConfig `embed:"" prefix:"sqs-" envprefix:"Q_SQS_"`
Dashboard DashboardConfig `embed:"" prefix:"dashboard-" envprefix:"Q_DASHBOARD_"`
SQLite SQLiteConfig `embed:"" prefix:"sqlite-" envprefix:"Q_SQLITE_"`
PGMQ PGMQConfig `embed:"" prefix:"pgmq-" envprefix:"Q_PGMQ_"`
Metrics MetricsConfig `embed:"" prefix:"metrics-" name:"metrics" envprefix:"Q_METRICS_"`

DisableTelemetry bool `name:"disable-telemetry" default:"false" env:"DISABLE_TELEMETRY"`
Expand All @@ -53,6 +54,10 @@ type SQLiteConfig struct {
Path string `name:"path" help:"Path of SQLite file" default:"smoothmq.sqlite" env:"PATH"`
}

type PGMQConfig struct {
Uri string `name:"uri" help:"PostgreSQL connection string" env:"URI"`
}

type SQSConfig struct {
Enabled bool `name:"enabled" default:"true" help:"Enable SQS protocol for queue" env:"ENABLED"`
Port int `name:"port" default:"3001" help:"HTTP port for SQS protocol" env:"PORT"`
Expand Down
10 changes: 10 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,16 @@ require (
github.com/aws/smithy-go v1.20.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/craigpastro/pgmq-go v0.4.0 // indirect
github.com/gofiber/template v1.8.3 // indirect
github.com/gofiber/utils v1.1.0 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.3 // indirect
github.com/jackc/pgx/v5 v5.5.5 // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/klauspost/compress v1.17.9 // indirect
Expand All @@ -55,8 +62,11 @@ require (
github.com/tidwall/pretty v1.2.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/driver/postgres v1.5.9 // indirect
)
206 changes: 206 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Message struct {
TenantID int64 `db:"tenant_id"`
QueueID int64 `db:"queue_id"`

// FIXME: DeliverAt and DeliveredAt should use int64
// for Unix timestamps to prevent overflow in 2038
DeliverAt int `db:"deliver_at"`
DeliveredAt int `db:"delivered_at"`
Tries int `db:"tries"`
Expand Down
2 changes: 2 additions & 0 deletions models/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Queue interface {
Enqueue(tenantId int64, queue string, message string, kv map[string]string, delay int) (int64, error)
Dequeue(tenantId int64, queue string, numToDequeue int, requeueIn int) ([]*Message, error)

// TODO: Peek should include an error on return
Peek(tenantId int64, queue string, messageId int64) *Message
// TODO: Status should include an error on return
Stats(tenantId int64, queue string) QueueStats
Filter(tenantId int64, queue string, filterCriteria FilterCriteria) []int64

Expand Down
Loading