Skip to main content

PostgreSQL backed message queues

Project description

pgmq

A message queue built on top of Postgres.

Purpose

Sometimes you have a Postgres database and need a queue. You could stand up more infrastructure (SQS, Redis, etc), or you could use your existing database. There are plenty of use cases for a persistent queue that do not require infinite scalability, snapshots or any of the other advanced features full fledged queues/event buses/message brokers have.

Features

  • Best effort at most once delivery (messages are only delivered to one worker at a time)
  • Automatic redelivery of failed messages
  • Low latency delivery (near realtime, uses PostgreSQL's NOTIFY feature)
  • Low latency completion tracking (using NOTIFY)
  • Bulk sending and receiving
  • Fully typed async Python client (using asyncpg)
  • Persistent scheduled messages (scheduled in the database, not the client application)

Possible features:

  • Exponential backoffs
  • Dead letter queues
  • Custom delivery ordering key
  • Responses / completion messages
  • Message attributes and attribute filtering
  • FIFO delivery
  • Backpressure / bound queues

Unplanned features:

  • Sending back response data (currently it needs to be sent out of band)
  • Supporting "subscriptions" (this is a simple queue, not a message broker)

Examples

from contextlib import AsyncExitStack

import anyio
import asyncpg  # type: ignore
from pgmq import create_queue, connect_to_queue, migrate_to_latest_version

async def main() -> None:

    async with AsyncExitStack() as stack:
        pool: asyncpg.Pool = await stack.enter_async_context(
            asyncpg.create_pool(  # type: ignore
                "postgres://postgres:postgres@localhost/postgres"
            )
        )
        await migrate_to_latest_version(pool)
        await create_queue("myq", pool)
        queue = await stack.enter_async_context(
            connect_to_queue("myq", pool)
        )
        async with anyio.create_task_group() as tg:

            async def worker() -> None:
                async with queue.receive() as msg_handle_rcv_stream:
                    # receive a single message
                    async with (await msg_handle_rcv_stream.receive()).acquire():
                        print("received")
                        # do some work
                        await anyio.sleep(1)
                        print("done processing")
                        print("acked")

            tg.start_soon(worker)
            tg.start_soon(worker)

            async with queue.send(b'{"foo":"bar"}') as completion_handle:
                print("sent")
                await completion_handle()
                print("completed")
                tg.cancel_scope.cancel()


if __name__ == "__main__":
    anyio.run(main)
    # prints:
    # "sent"
    # "received"
    # "done processing"
    # "acked"
    # "completed"

Development

  1. Clone the repo
  2. Start a disposable PostgreSQL instance (e.g docker run -it -e POSTGRES_PASSWORD=postgres -p 5432:5432 postgres)
  3. Run make test

See this release on GitHub: v0.6.1

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgmq-0.6.1.tar.gz (12.7 kB view details)

Uploaded Source

Built Distribution

pgmq-0.6.1-py3-none-any.whl (13.5 kB view details)

Uploaded Python 3

File details

Details for the file pgmq-0.6.1.tar.gz.

File metadata

  • Download URL: pgmq-0.6.1.tar.gz
  • Upload date:
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.2 CPython/3.10.8 Linux/5.15.0-1022-azure

File hashes

Hashes for pgmq-0.6.1.tar.gz
Algorithm Hash digest
SHA256 2469a8c1f32ff7342b980eb0614e37e188be413df036fc9b6b06c800c77d4420
MD5 28dbb48c8c284911cb9312a637297611
BLAKE2b-256 91a4dd62df6b1356c749435f690177bda063bde2be60595318c6203845f732f3

See more details on using hashes here.

File details

Details for the file pgmq-0.6.1-py3-none-any.whl.

File metadata

  • Download URL: pgmq-0.6.1-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.2 CPython/3.10.8 Linux/5.15.0-1022-azure

File hashes

Hashes for pgmq-0.6.1-py3-none-any.whl
Algorithm Hash digest
SHA256 5cca8234063478a76f86c4f591e96312d7465d8ce237215e1088321dc9e68b7a
MD5 e4f43877fcfa8a65f67e65b81ecabfb0
BLAKE2b-256 b5a68dd6fb557594a6e0ae498daf879a29bfd6757c2e0da68885d410725a17d3

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page