PostgreSQL backed job queues
Project description
pgjobq
A job queue built on top of Postgres.
Features
- Best effort at most once delivery (jobs are only delivered to one worker at a time)
- Automatic redelivery of failed jobs
- Low latency delivery (near realtime, uses PostgreSQL's
NOTIFY
feature) - Completion tracking (using
NOTIFY
) - Fully typed async Python client (using asyncpg)
- Optional FIFO queuing
- Persistent scheduled jobs (scheduled in the database, not the client application)
Possible features:
- Bulk sending
- Exponential backoffs
- Maybe more efficient continuous polling?
Unplanned features:
- Sending back response data (currently it needs to be sent out of band)
- Supporting "subscriptions" (this is a simple queue, not a message broker)
Examples
from contextlib import AsyncExitStack
import anyio
import asyncpg # type: ignore
from pgjobq import create_queue, connect_to_queue, migrate_to_latest_version
async def main() -> None:
async with AsyncExitStack() as stack:
pool: asyncpg.Pool = await stack.enter_async_context(
asyncpg.create_pool( # type: ignore
"postgres://postgres:postgres@localhost/postgres"
)
)
await migrate_to_latest_version(pool)
await create_queue("myq", pool)
(send, rcv) = await stack.enter_async_context(
connect_to_queue("myq", pool)
)
async with anyio.create_task_group() as tg:
async def worker() -> None:
async for msg_handle in rcv.poll():
async with msg_handle:
print("received")
# do some work
await anyio.sleep(1)
print("done processing")
print("acked")
tg.start_soon(worker)
tg.start_soon(worker)
async with send.send(b'{"foo":"bar"}') as completion_handle:
print("sent")
await completion_handle()
print("completed")
tg.cancel_scope.cancel()
if __name__ == "__main__":
anyio.run(main)
# prints:
# "sent"
# "received"
# "done processing"
# "acked"
# "completed"
Development
- Clone the repo
- Start a disposable PostgreSQL instance (e.g
docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres
) - Run
make test
See this release on GitHub: v0.1.0
Project details
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
pgjobq-0.1.0.tar.gz
(10.8 kB
view hashes)
Built Distribution
pgjobq-0.1.0-py3-none-any.whl
(11.7 kB
view hashes)