PostgreSQL backed message queues
Project description
pgmq
A message queue built on top of Postgres.
Purpose
Sometimes you have a Postgres database and need a queue. You could stand up more infrastructure (SQS, Redis, etc), or you could use your existing database. There are plenty of use cases for a persistent queue that do not require infinite scalability, snapshots or any of the other advanced features full fledged queues/event buses/message brokers have.
Features
- Best effort at most once delivery (messages are only delivered to one worker at a time)
- Automatic redelivery of failed messages
- Low latency delivery (near realtime, uses PostgreSQL's
NOTIFY
feature) - Low latency completion tracking (using
NOTIFY
) - Bulk sending and receiving
- Fully typed async Python client (using asyncpg)
- Persistent scheduled messages (scheduled in the database, not the client application)
Possible features:
- Exponential backoffs
- Dead letter queues
- Custom delivery ordering key
- Message attributes and attribute filtering
- FIFO delivery
- Backpressure / bound queues
Unplanned features:
- Sending back response data (currently it needs to be sent out of band)
- Supporting "subscriptions" (this is a simple queue, not a message broker)
Examples
from contextlib import AsyncExitStack
import anyio
import asyncpg # type: ignore
from pgmq import create_queue, connect_to_queue, migrate_to_latest_version
async def main() -> None:
async with AsyncExitStack() as stack:
pool: asyncpg.Pool = await stack.enter_async_context(
asyncpg.create_pool( # type: ignore
"postgres://postgres:postgres@localhost/postgres"
)
)
await migrate_to_latest_version(pool)
await create_queue("myq", pool)
queue = await stack.enter_async_context(
connect_to_queue("myq", pool)
)
async with anyio.create_task_group() as tg:
async def worker() -> None:
async with queue.receive() as msg_handle_rcv_stream:
# receive a single message
async with (await msg_handle_rcv_stream.receive()).acquire():
print("received")
# do some work
await anyio.sleep(1)
print("done processing")
print("acked")
tg.start_soon(worker)
tg.start_soon(worker)
async with queue.send(b'{"foo":"bar"}') as completion_handle:
print("sent")
await completion_handle()
print("completed")
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(main)
# prints:
# "sent"
# "received"
# "done processing"
# "acked"
# "completed"
Development
- Clone the repo
- Start a disposable PostgreSQL instance (e.g
docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
) - Run
make test
See this release on GitHub: v0.1.0
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pgmq-0.1.0.tar.gz
(12.5 kB
view hashes)
Built Distribution
pgmq-0.1.0-py3-none-any.whl
(13.2 kB
view hashes)