Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PGMQ driver implementation #38

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open

PGMQ driver implementation #38

wants to merge 12 commits into from

Conversation

sundbry
Copy link
Contributor

@sundbry sundbry commented Sep 21, 2024

This is the beginning of a backend implementation for SmoothMQ over pgmq.

After an embarrassing moment of downtime with the node running my SmoothMQ/SQLite database, I'd like to run it in a more highly available configuration.

PGMQ already has a decent go driver, and is one of the few queue service implementations which is polling based, similar to SQS, making it an easy fit to the API. Opening this PR for any early input and feedback from the community here.

So far we have implemented only CreateQueue, Enqueue, and Dequeue, but it is enough to pass the tester program.

Run the following programs in tandem to test:

docker run -d --name pgmq-postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 quay.io/tembo/pg16-pgmq:latest
./smoothmq server --pgmq-uri "postgres://postgres:postgres@localhost:5432/postgres"
./smoothmq tester --senders 1 --receivers 1 --messages 10

@poundifdef
Copy link
Owner

This is pretty interesting. I am in favor of a pg-based backend. I wonder if using gorm - which we're already using with sqlite - would be a good option here. That would let us reuse most of the sqlite code, and it would let us connect to any postgres db without needing to install an extension.

@sundbry
Copy link
Contributor Author

sundbry commented Sep 22, 2024

I believe there are some edge cases with running in a postgres transactional environment which are not so straightforward, especially with regard to the default READ_COMMITTED mode, that might cause strange behavior in production like double message dequeueing, or other unidentified problems which might not exist in a SQLite implementation, which pgmq has ironed out already. I would also expect there are enough performance and correctness optimizations which are postgres-specific or sqlite-specific that justify independent implementations.

One interesting fact is that the extension is pure SQL/plpgsql, so it should be possible to simply embed it with this program (BSD licensed) and initialize a pgmq schema automatically: https://github.com/tembo-io/pgmq/blob/main/pgmq-extension/sql/pgmq.sql

I have some more code to push up for this branch in a moment, which uses gorm minimally to read some of the raw rows directly in order to implement Peek() and ListQueues() in a tenant-safe way.

@sundbry sundbry force-pushed the pgmq branch 2 times, most recently from f07a54f to 6ff7754 Compare September 22, 2024 08:47
@sundbry
Copy link
Contributor Author

sundbry commented Sep 22, 2024

Hi @poundifdef, the implementation is more or less complete here, I plan to start using this ASAP to swap out my sqlite based instance to give it some testing.

Differences from sqlite implementation:

QueueProperties:

  • QueueName: Queue names are limited to ~40 characters, due to postgres table name length limits.
  • RateLimit: simply not implemented, can try adding this later
  • MaxRetires: The pgmq based implementation does not provide for a retry limit
  • VisibilityTimeout: the pgmq based implementation does not have a default visibility timeout configurable per queue in the pgmq.meta table, but the requeIn parameter on the Dequeue interface is used to set the per-message visibility timeout.

Because of the above, UpdateQueue is presently a no-op.

MessageStatus:

  • MessageStatusFailed does not apply since there is no retry limit.

Message:

  • QueueID does not apply since we do not assign numeric IDs to queues, but use a table per queue
  • DeliveredAt is not known since pgmq does not track the last read time of a message
  • MaxTries is always computed as Tries + 1 since it is unlimited in pgmq

@sundbry sundbry changed the title (Draft) PGMQ driver implementation PGMQ driver implementation Sep 22, 2024
@poundifdef
Copy link
Owner

Got it - let me know how this implementation goes!

Can you tell me more, incidentally, about how you're running smoothmq? What kind of project/environment/language are you using it for?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants