pgmq
A message queue built on top of Postgres.
Purpose
Sometimes you have a Postgres database and need a queue. You could stand up more infrastructure (SQS, Redis, etc.), or you could use your existing database. There are plenty of use cases for a persistent queue that do not require infinite scalability, snapshots, or any of the other advanced features of full-fledged queues, event buses, and message brokers.
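pgmq's internals are not described here, but the core trick behind most Postgres-backed queues is worth sketching: claim a message with SELECT ... FOR UPDATE SKIP LOCKED so concurrent workers never grab the same row, and push its visibility time into the future so it is redelivered if the worker crashes. The table and column names below are hypothetical, not pgmq's actual schema:

```python
# Hypothetical SQL (not pgmq's actual schema) showing how a Postgres-backed
# queue can hand a message to exactly one worker.  FOR UPDATE SKIP LOCKED
# lets concurrent workers skip rows another transaction already claimed,
# and pushing available_at into the future creates a redelivery window in
# case the worker crashes before acknowledging.
CLAIM_ONE = """
WITH next AS (
    SELECT id
    FROM messages
    WHERE available_at <= now()
    ORDER BY id
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE messages AS m
SET available_at = now() + interval '30 seconds'
FROM next
WHERE m.id = next.id
RETURNING m.id, m.body
"""

async def claim_one(conn):
    # conn is an asyncpg.Connection; returns the claimed row, or None
    # if no message is currently available.
    return await conn.fetchrow(CLAIM_ONE)
```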
Features
- Best-effort at-most-once delivery (a message is only delivered to one worker at a time)
- Automatic redelivery of failed messages
- Low latency delivery (near real-time, using PostgreSQL's NOTIFY feature)
- Low latency completion tracking (also using NOTIFY)
- Bulk sending and receiving
- Fully typed async Python client (using asyncpg)
- Persistent scheduled messages (scheduled in the database, not the client application)
Possible features:
- Exponential backoffs
- Dead letter queues
- Custom delivery ordering key
- Responses / completion messages
- Message attributes and attribute filtering
- FIFO delivery
- Backpressure / bound queues
Unplanned features:
- Sending back response data (currently it needs to be sent out of band)
- Supporting "subscriptions" (this is a simple queue, not a message broker)
Examples
from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore

from pgmq import create_queue, connect_to_queue, migrate_to_latest_version


async def main() -> None:
    async with AsyncExitStack() as stack:
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single message
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                    print("acked")

            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                await completion_handle()
                print("completed")
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
# prints:
# "sent"
# "received"
# "done processing"
# "acked"
# "completed"
Development
- Clone the repo
- Start a disposable PostgreSQL instance (e.g. docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres)
- Run make test
See this release on GitHub: v0.6.1