Postgres Broker for Dramatiq Task Queue

dramatiq-pg − Postgres Broker for Dramatiq

Dramatiq is a simple task queue implementation for Python 3. dramatiq-pg provides a Postgres-based implementation of a dramatiq broker.

The project is not feature complete yet.

Features

  • Super simple deployment.
  • Uses plain psycopg2. No ORM.
  • Stores message payload as native JSONb.
  • Stores all messages in a single table, in a dedicated schema.
  • Uses LISTEN/NOTIFY to keep workers in sync. No polling.
  • Replays pending messages on worker startup.
  • Requeues failed tasks.
  • Delayed tasks.
  • Reliable thanks to Postgres MVCC.
  • Self-healing. Old messages are purged from time to time.
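
As an illustration of the LISTEN/NOTIFY mechanism mentioned in the list above, here is a minimal sketch using plain psycopg2. It is not the broker's actual code: the function name, the channel name example_channel and the timeout are assumptions made for this example. It waits on the connection socket rather than polling a table.

```python
import select


def wait_for_notifications(dsn, channel="example_channel", timeout=5.0):
    """Wait up to `timeout` seconds and return NOTIFY payloads (maybe empty)."""
    # Imported lazily so this module loads even where psycopg2 is absent.
    import psycopg2
    import psycopg2.extensions
    from psycopg2 import sql

    conn = psycopg2.connect(dsn)
    # Autocommit mode, as the psycopg2 documentation recommends for LISTEN.
    conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    try:
        with conn.cursor() as curs:
            # `channel` is a hypothetical name, quoted as an SQL identifier.
            curs.execute(sql.SQL("LISTEN {}").format(sql.Identifier(channel)))

        # Sleep in select() until the socket is readable or the timeout hits.
        ready, _, _ = select.select([conn], [], [], timeout)
        if not ready:
            return []

        conn.poll()  # Read pending data and fill conn.notifies.
        payloads = []
        while conn.notifies:
            payloads.append(conn.notifies.pop(0).payload)
        return payloads
    finally:
        conn.close()
```

Another session running NOTIFY example_channel, 'hello' while this function waits would make it return ['hello'], assuming both sessions are connected to the same database.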

Note that dramatiq assumes tasks are idempotent. This broker makes the same assumptions for recovering after a crash.
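
To make the idempotency assumption concrete, here is a small sketch of a task body that is safe to replay. The names processed_orders and mark_order_paid are hypothetical, and the dict stands in for durable storage. In a real project the function would be decorated with @dramatiq.actor.

```python
# Stand-in for a database table; a real actor would use durable storage.
processed_orders = {}


def mark_order_paid(order_id, amount):
    """Idempotent task body: running it twice leaves the same state."""
    # Writing an absolute value (rather than incrementing a counter) makes
    # a replay after a crash harmless.
    processed_orders[order_id] = {"status": "paid", "amount": amount}
    return processed_orders[order_id]


mark_order_paid(42, 10)
mark_order_paid(42, 10)  # Replayed delivery: the end state is unchanged.
```

A task that did something like balance += amount would not be safe here, since a message replayed after a crash would apply the change twice.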

Installation

  • Install the dramatiq-pg package from PyPI:
    $ pip install dramatiq-pg
    
  • Apply the dramatiq_pg/schema.sql file to your database:
    $ psql -f dramatiq_pg/schema.sql
    
  • Before importing actors, define the global broker with a connection pool:
    import dramatiq
    import psycopg2.pool
    from dramatiq_pg import PostgresBroker
    
    dramatiq.set_broker(PostgresBroker(url="postgresql:///?minconn=0&maxconn=10"))
    
    @dramatiq.actor
    def myactor():
        ...
    

Now declare/import actors and manage workers just like in any dramatiq setup. An example script is available, tested on CI.
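
Once the broker is set, messages are enqueued through the usual dramatiq actor methods. The helper below is a hypothetical sketch, not part of dramatiq-pg. It relies only on dramatiq's standard send() and send_with_options(delay=...) actor methods, where the delay is given in milliseconds.

```python
def enqueue_now_and_later(actor, delay_ms=5000):
    """Send one message immediately and one delayed by `delay_ms`."""
    # send() enqueues the message for processing as soon as possible.
    immediate = actor.send()
    # send_with_options(delay=...) asks the broker to hold the message
    # back for the given number of milliseconds.
    delayed = actor.send_with_options(delay=delay_ms)
    return immediate, delayed
```

With the actor from the installation snippet, this would be called as enqueue_now_and_later(myactor).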

The CLI tool dramatiq-pg allows you to requeue messages, purge old messages and show stats on the queue. See --help for details.

Deployment

Postgres does not replicate notifications to standby instances. Thus the broker connection pool must point to the master instance. Actors can connect to a hot standby for their own work.

If you use pgbouncer, you must configure the session pooling mode so that LISTEN/NOTIFY keeps working.

Each dramatiq process opens one persistent connection per queue and one connection to ack messages. Thus, to be safe, you should provision the pool size as num_processes x num_queues x 2. A best practice is to add processes only as needed and to reduce the number of queues.
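
The sizing rule above can be written out as a short calculation. This is only a sketch of the arithmetic. The helper names pool_size and broker_url are made up for this example, and the URL reuses the minconn and maxconn parameters shown in the installation snippet.

```python
def pool_size(num_processes, num_queues):
    """Safe upper bound suggested above: processes x queues x 2."""
    return num_processes * num_queues * 2


def broker_url(base, num_processes, num_queues):
    """Build a broker URL whose maxconn follows the sizing rule."""
    maxconn = pool_size(num_processes, num_queues)
    return f"{base}?minconn=0&maxconn={maxconn}"


# Example: 2 worker processes consuming 4 queues.
url = broker_url("postgresql:///", num_processes=2, num_queues=4)
print(url)  # postgresql:///?minconn=0&maxconn=16
```

Dropping from 4 queues to 1 in this example lowers the bound from 16 connections to 4, which is why the advice above favours fewer queues.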

Roadmap

  • Result storage as JSONb.

Feel free to suggest features through the support channels.

Support

If you encounter a bug or miss a feature, please open an issue on GitLab with as much information as possible.

dramatiq_pg is available under the PostgreSQL licence.

Download files

Source Distribution

dramatiq-pg-0.4.1.tar.gz (7.6 kB)

Uploaded Source

Built Distribution

dramatiq_pg-0.4.1-py3-none-any.whl (18.2 kB)

Uploaded Python 3

File details

Details for the file dramatiq-pg-0.4.1.tar.gz.

File metadata

  • Download URL: dramatiq-pg-0.4.1.tar.gz
  • Size: 7.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/0.12.10 CPython/3.7.1 Linux/4.19.0-2-amd64

File hashes

Hashes for dramatiq-pg-0.4.1.tar.gz
Algorithm Hash digest
SHA256 6232ad2c4f108b75b5589bbd848d9abca0d637a2927d336fd70cd08aca9ee9f2
MD5 c524c9c48a26c323dceced4016862722
BLAKE2b-256 a4eab7b28a2f6c739f9c8b151ab3206a137379b2d79e314a249ca560083c1a8b


File details

Details for the file dramatiq_pg-0.4.1-py3-none-any.whl.

File metadata

  • Download URL: dramatiq_pg-0.4.1-py3-none-any.whl
  • Size: 18.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/0.12.10 CPython/3.7.1 Linux/4.19.0-2-amd64

File hashes

Hashes for dramatiq_pg-0.4.1-py3-none-any.whl
Algorithm Hash digest
SHA256 1480ef89582cef9407fde65a82d7777d92ec76131e49d1dfcf8b42fd182381a1
MD5 1bb77d6f3628aece43f2950a5a546570
BLAKE2b-256 e3fa5d726e53d3fb83bbd35c99cce2ab4391273992593ca7dbf3e23e8d19da56
