A message queue written around PostgreSQL.
psycopg2_mq is a message queue implemented on top of PostgreSQL, SQLAlchemy, and psycopg2.
Currently the library provides only the low-level constructs that can be used to build a multithreaded worker system. It is broken into two components:
psycopg2_mq.MQWorker - a reusable worker object that manages a single-threaded worker that can accept jobs and execute them. An application should create one worker per thread. It supports an API for thread-safe graceful shutdown.
psycopg2_mq.MQSource - a source object providing a client-side API for invoking and querying job states.
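The one-worker-per-thread pattern with a thread-safe shutdown request can be sketched as follows. This is a minimal illustration using only the standard library: SketchWorker, request_shutdown, and run_workers are hypothetical names, not part of psycopg2_mq, and an in-memory queue.Queue stands in for the PostgreSQL-backed job table.

```python
import queue
import threading


class SketchWorker:
    """Hypothetical stand-in for a single-threaded worker (not MQWorker)."""

    def __init__(self, jobs, results):
        self._jobs = jobs          # shared queue.Queue of pending payloads
        self._results = results    # shared queue.Queue collecting results
        self._stop = threading.Event()

    def run(self):
        # Keep taking jobs; exit only once shutdown was requested
        # and no pending work was found.
        while True:
            try:
                payload = self._jobs.get(timeout=0.05)
            except queue.Empty:
                if self._stop.is_set():
                    return
                continue
            self._results.put(payload.upper())

    def request_shutdown(self):
        # Event.set() is thread-safe, so any thread may call this.
        self._stop.set()


def run_workers(payloads, num_threads=2):
    jobs, results = queue.Queue(), queue.Queue()
    for payload in payloads:
        jobs.put(payload)

    # One worker object per thread; workers are never shared.
    workers = [SketchWorker(jobs, results) for _ in range(num_threads)]
    threads = [threading.Thread(target=w.run) for w in workers]
    for t in threads:
        t.start()
    for w in workers:
        w.request_shutdown()
    for t in threads:
        t.join()

    done = []
    while not results.empty():
        done.append(results.get())
    return sorted(done)
```

In this sketch a shutdown request lets each worker finish the jobs that are still pending before its thread exits. Whether the real worker drains pending jobs or only finishes the job in flight is not stated here, so treat that detail as an assumption.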
Data Model
Queues
Workers run jobs defined in queues. Currently each queue will run jobs concurrently, while a future version may support serial execution on a per-queue basis. Each registered queue should contain an execute_job(job) method.
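As a rough sketch of what a registered queue might look like, the class below routes each job to a handler chosen by job.method. MathQueue and its do_* handlers are hypothetical, and a SimpleNamespace stands in for the job a worker would pass in; its args attribute follows the job.args usage in the worker example.

```python
from types import SimpleNamespace


class MathQueue:
    """Hypothetical queue: routes each job to a handler named by job.method."""

    def execute_job(self, job):
        handler = getattr(self, f'do_{job.method}', None)
        if handler is None:
            raise ValueError(f'unknown method: {job.method}')
        # Job arguments are passed to the handler as keyword arguments.
        return handler(**job.args)

    def do_add(self, a, b):
        return a + b

    def do_negate(self, value):
        return -value


# Stand-in for the job object; a real worker would build this itself.
job = SimpleNamespace(id=1, queue='math', method='add', args={'a': 2, 'b': 3})
result = MathQueue().execute_job(job)
```

Prefixing handler names with do_ keeps job.method from reaching arbitrary attributes of the queue object, which is one possible design choice rather than a requirement of the library.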
Jobs
The execute_job method of a queue is passed a Job object containing the following attributes:
id
queue
method
args
As a convenience, there is an extend(**kw) method which can be used to add extra attributes to the object. This is useful in individual queues to define a contract between a queue and its methods.
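One way a queue could use extend(**kw) to define such a contract is sketched below. SketchJob is a hypothetical stand-in for the library's job object, and its extend is assumed to attach the attributes and return the job; ReportQueue, settings, and render are likewise illustrative names only.

```python
class SketchJob:
    """Hypothetical stand-in for the job object (not the library's class)."""

    def __init__(self, id, queue, method, args):
        self.id = id
        self.queue = queue
        self.method = method
        self.args = args

    def extend(self, **kw):
        # Assumption: attach extra attributes and return the job itself.
        for name, value in kw.items():
            setattr(self, name, value)
        return self


class ReportQueue:
    """Hypothetical queue whose methods all expect a job.settings attribute."""

    def __init__(self, settings):
        self.settings = settings
        self.handlers = {'render': self.render}

    def execute_job(self, job):
        # The contract: every handler in this queue may rely on job.settings.
        job = job.extend(settings=self.settings)
        return self.handlers[job.method](job)

    def render(self, job):
        return f"{job.settings['title']}: {job.args['body']}"


report_queue = ReportQueue({'title': 'Weekly'})
output = report_queue.execute_job(
    SketchJob(id=7, queue='report', method='render', args={'body': 'all good'})
)
```

Reassigning the return value of extend keeps the sketch valid whether extend mutates the job in place or returns a new object.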
Example Worker
from psycopg2_mq import (
    MQWorker,
    make_default_model,
)
from sqlalchemy import (
    MetaData,
    create_engine,
)
import sys

class EchoQueue:
    def execute_job(self, job):
        return f'hello, {job.args["name"]} from method="{job.method}"'

if __name__ == '__main__':
    engine = create_engine(sys.argv[1])
    metadata = MetaData()
    model = make_default_model(metadata)
    worker = MQWorker(
        engine=engine,
        queues={
            'echo': EchoQueue(),
        },
        model=model,
    )
    worker.run()
Example Source
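from psycopg2_mq import (
    MQSource,
    make_default_model,
)
from sqlalchemy import (
    MetaData,
    create_engine,
)
from sqlalchemy.orm import sessionmaker
import sys

# Assumption: the database URL comes from the command line, as in the worker example.
engine = create_engine(sys.argv[1])
metadata = MetaData()
model = make_default_model(metadata)
session_factory = sessionmaker()
session_factory.configure(bind=engine)
dbsession = session_factory()
with dbsession.begin():
    mq = MQSource(
        dbsession=dbsession,
        model=model,
    )
    job = mq.call('echo', 'hello', {'name': 'Andy'})
    print(f'queued job={job.id}')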
0.1.1 (2018-09-04)
Make psycopg2 an optional dependency in order to allow apps to depend on psycopg2-binary if they wish.
0.1 (2018-09-04)
Initial release.
Hashes for psycopg2_mq-0.1.1-py2.py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | 2bc20df8dd5af7686448bfd5820d0a3c15247b1569d95c3fff1713b003123756 |
MD5 | a3b2e5fe69176e752842357901ba4510 |
BLAKE2b-256 | b211ba8194be2b994ec8204d656b2de2e355ce552cb6cf3f5e43d446091668e4 |