From 7096e860eee6f9ecef024ecdb6fb121f06c28874 Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Sun, 22 Sep 2024 01:15:00 -0700 Subject: [PATCH] pgmq: Provide message metadata (Tries, DeliverAt, MaxTries) --- models/message.go | 2 ++ queue/pgmq/pgmq.go | 31 ++++++++++++++++--------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/models/message.go b/models/message.go index 2f10e04..fbee3ed 100644 --- a/models/message.go +++ b/models/message.go @@ -35,6 +35,8 @@ type Message struct { TenantID int64 `db:"tenant_id"` QueueID int64 `db:"queue_id"` + // FIXME: DeliverAt and DeliveredAt should use int64 + // for Unix timestamps to prevent overflow in 2038 DeliverAt int `db:"deliver_at"` DeliveredAt int `db:"delivered_at"` Tries int `db:"tries"` diff --git a/queue/pgmq/pgmq.go b/queue/pgmq/pgmq.go index 1ebb144..efac33f 100644 --- a/queue/pgmq/pgmq.go +++ b/queue/pgmq/pgmq.go @@ -24,6 +24,9 @@ type Envelope struct { type MessageRow struct { MsgID int64 `gorm:"not null,primaryKey,column:msg_id"` Message pgtype.JSONB `gorm:"type:jsonb"` + ReadCount int `gorm:"not null,column:read_ct"` + // Visibility timeout + VT pgtype.Timestamptz `gorm:"not null,column:vt"` } type MetaRow struct { @@ -98,13 +101,12 @@ func toMessage(tenantId int64, in *pgmq.Message) (*models.Message, error) { } return &models.Message { - ID: in.MsgID, - TenantID: tenantId, - //QueueID: message.QueueID, - //DeliverAt: int(message.DeliverAt), - //DeliveredAt: int(message.DeliveredAt), - //Tries: message.Tries, - //MaxTries: message.MaxTries, + ID: in.MsgID, + TenantID: tenantId, + DeliverAt: int(in.VT.Unix()), + Tries: int(in.ReadCount), + // pgmq has no max retry count; it's always n + 1 here. + MaxTries: int(in.ReadCount) + 1, Message: []byte(envelope.Body), KeyValues: envelope.Headers, }, nil @@ -118,14 +120,13 @@ func rowToMessage(tenantId int64, in *MessageRow) (*models.Message, error) { } return &models.Message { - ID: in.MsgID, - TenantID: tenantId, - //QueueID: message.QueueID, - //DeliverAt: int(message.DeliverAt), - //DeliveredAt: int(message.DeliveredAt), - //Tries: message.Tries, - //MaxTries: message.MaxTries, - Message: []byte(envelope.Body), + ID: in.MsgID, + TenantID: tenantId, + DeliverAt: int(in.VT.Time.Unix()), + Tries: int(in.ReadCount), + // pgmq has no max retry count; it's always n + 1 here. + MaxTries: int(in.ReadCount) + 1, + Message: []byte(envelope.Body), KeyValues: envelope.Headers, }, nil }