Skip to content

Commit

Permalink
pgmq: Provide message metadata (Tries, DeliverAt, MaxTries)
Browse files Browse the repository at this point in the history
  • Loading branch information
sundbry committed Sep 22, 2024
1 parent d64ed15 commit 7096e86
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
2 changes: 2 additions & 0 deletions models/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Message struct {
TenantID int64 `db:"tenant_id"`
QueueID int64 `db:"queue_id"`

// FIXME: DeliverAt and DeliveredAt should use int64
// for Unix timestamps to prevent overflow in 2038
DeliverAt int `db:"deliver_at"`
DeliveredAt int `db:"delivered_at"`
Tries int `db:"tries"`
Expand Down
31 changes: 16 additions & 15 deletions queue/pgmq/pgmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type Envelope struct {
type MessageRow struct {
MsgID int64 `gorm:"not null,primaryKey,column:msg_id"`
Message pgtype.JSONB `gorm:"type:jsonb"`
ReadCount int `gorm:"not null,column:read_ct"`
// Visibility timeout
VT pgtype.Timestamptz `gorm:"not null,column:vt"`
}

type MetaRow struct {
Expand Down Expand Up @@ -98,13 +101,12 @@ func toMessage(tenantId int64, in *pgmq.Message) (*models.Message, error) {
}

return &models.Message {
ID: in.MsgID,
TenantID: tenantId,
//QueueID: message.QueueID,
//DeliverAt: int(message.DeliverAt),
//DeliveredAt: int(message.DeliveredAt),
//Tries: message.Tries,
//MaxTries: message.MaxTries,
ID: in.MsgID,
TenantID: tenantId,
DeliverAt: int(in.VT.Unix()),
Tries: int(in.ReadCount),
// pgmq has no max retry count; it's always n + 1 here.
MaxTries: int(in.ReadCount) + 1,
Message: []byte(envelope.Body),
KeyValues: envelope.Headers,
}, nil
Expand All @@ -118,14 +120,13 @@ func rowToMessage(tenantId int64, in *MessageRow) (*models.Message, error) {
}

return &models.Message {
ID: in.MsgID,
TenantID: tenantId,
//QueueID: message.QueueID,
//DeliverAt: int(message.DeliverAt),
//DeliveredAt: int(message.DeliveredAt),
//Tries: message.Tries,
//MaxTries: message.MaxTries,
Message: []byte(envelope.Body),
ID: in.MsgID,
TenantID: tenantId,
DeliverAt: int(in.VT.Time.Unix()),
Tries: int(in.ReadCount),
// pgmq has no max retry count; it's always n + 1 here.
MaxTries: int(in.ReadCount) + 1,
Message: []byte(envelope.Body),
KeyValues: envelope.Headers,
}, nil
}
Expand Down

0 comments on commit 7096e86

Please sign in to comment.