A message queue written around PostgreSQL.
psycopg2_mq is a message queue implemented on top of PostgreSQL, SQLAlchemy, and psycopg2.
Currently the library provides only the low-level constructs that can be used to build a multithreaded worker system. It is broken into two components:
psycopg2_mq.MQWorker - a reusable, single-threaded worker object that accepts jobs and executes them. An application should create one worker per thread. It supports an API for thread-safe graceful shutdown.
psycopg2_mq.MQSource - a source object providing a client-side API for invoking and querying job states.
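The one-worker-per-thread arrangement with a thread-safe shutdown can be sketched as follows. This is only an illustration of the pattern: StubWorker and its request_stop method are hypothetical stand-ins, not part of psycopg2_mq, and the real MQWorker shutdown call is not named here.

```python
import threading


class StubWorker:
    """Hypothetical stand-in for MQWorker; it runs no jobs and only idles."""

    def __init__(self):
        self._stop_event = threading.Event()
        self.finished = False

    def run(self):
        # Loop until a shutdown is requested, then exit cleanly.
        while not self._stop_event.is_set():
            self._stop_event.wait(0.01)
        self.finished = True

    def request_stop(self):
        # Hypothetical name. Event.set() is safe to call from any thread.
        self._stop_event.set()


def run_workers(count):
    # One worker object per thread, as the description above recommends.
    workers = [StubWorker() for _ in range(count)]
    threads = [threading.Thread(target=w.run) for w in workers]
    for t in threads:
        t.start()
    # Ask every worker to stop, then wait for its thread to finish.
    for w in workers:
        w.request_stop()
    for t in threads:
        t.join(timeout=5)
    return workers, threads


workers, threads = run_workers(3)
```

In a real application each thread would call the run() method of its own MQWorker instead, and a signal handler or supervising thread would trigger the shutdown.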
Data Model
Queues
Workers run jobs defined in queues. Currently each queue will run jobs concurrently, while a future version may support serial execution on a per-queue basis. Each registered queue should contain an execute_job(job) method.
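As a rough sketch of what a queue might look like, the class below routes each job to a handler chosen by job.method. The MathQueue name, the do_ prefix convention, and the SimpleNamespace stand-in for the job are assumptions made for illustration; only execute_job(job) and the job attributes come from this document.

```python
from types import SimpleNamespace


class MathQueue:
    """Queue that routes each job to a handler named after job.method."""

    def execute_job(self, job):
        # Look up a handler such as do_add for method='add'.
        handler = getattr(self, f'do_{job.method}', None)
        if handler is None:
            raise ValueError(f'unknown method: {job.method}')
        return handler(**job.args)

    def do_add(self, a, b):
        return a + b

    def do_negate(self, value):
        return -value


# Stand-in for the object a worker would pass to execute_job (assumption).
job = SimpleNamespace(id=1, queue='math', method='add', args={'a': 2, 'b': 3})
result = MathQueue().execute_job(job)
```

The prefix keeps job.method from reaching arbitrary attributes of the queue object, which matters if job producers are not fully trusted.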
Jobs
The execute_job method of a queue is passed a Job object containing the following attributes:
id
queue
method
args
As a convenience, there is an extend(**kw) method which can be used to add extra attributes to the object. This is useful in individual queues to define a contract between a queue and its methods.
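One way a queue might use extend(**kw) to set up such a contract is sketched below. StubJob is a hypothetical stand-in for the library's Job object, and the sketch assumes that extend() sets the given keywords as attributes in place; the ReportQueue, settings, and label names are likewise illustrative.

```python
class StubJob:
    """Hypothetical stand-in for the library's Job object."""

    def __init__(self, id, queue, method, args):
        self.id = id
        self.queue = queue
        self.method = method
        self.args = args

    def extend(self, **kw):
        # Assumption: extend() sets each keyword as an attribute in place.
        for key, value in kw.items():
            setattr(self, key, value)


class ReportQueue:
    def __init__(self, settings):
        self.settings = settings

    def execute_job(self, job):
        # Contract: every method in this queue can rely on job.settings
        # and job.label being present.
        job.extend(settings=self.settings, label=f'{job.queue}:{job.id}')
        return getattr(self, job.method)(job)

    def summarize(self, job):
        return f'[{job.label}] {job.args["title"]} ({job.settings["format"]})'


job = StubJob(id=7, queue='report', method='summarize', args={'title': 'Q3'})
output = ReportQueue({'format': 'pdf'}).execute_job(job)
```

Each method then receives a single job argument that already carries whatever the queue promises, so shared setup does not have to be repeated in every method.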
Example Worker
from psycopg2_mq import (
    MQWorker,
    make_default_model,
)
from sqlalchemy import (
    MetaData,
    create_engine,
)
import sys

class EchoQueue:
    def execute_job(self, job):
        return f'hello, {job.args["name"]} from method="{job.method}"'

if __name__ == '__main__':
    engine = create_engine(sys.argv[1])
    metadata = MetaData()
    model = make_default_model(metadata)
    worker = MQWorker(
        engine=engine,
        queues={
            'echo': EchoQueue(),
        },
        model=model,
    )
    worker.run()
Example Source
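from psycopg2_mq import MQSource, make_default_model
from sqlalchemy import MetaData, create_engine
from sqlalchemy.orm import sessionmaker
import sys

# The database URL is read from the command line, as in the worker example.
engine = create_engine(sys.argv[1])
metadata = MetaData()
model = make_default_model(metadata)
session_factory = sessionmaker()
session_factory.configure(bind=engine)

dbsession = session_factory()
with dbsession.begin():
    mq = MQSource(
        dbsession=dbsession,
        model=model,
    )
    # Queue a call to the 'hello' method of the 'echo' queue.
    job = mq.call('echo', 'hello', {'name': 'Andy'})
    print(f'queued job={job.id}')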
0.1.5 (2019-05-17)
Fix a regression when serializing errors with strings or cycles.
0.1.4 (2019-05-09)
More safely serialize exception objects when jobs fail.
0.1.3 (2018-09-04)
Rename the thread to contain the job id while it’s handling a job.
0.1.2 (2018-09-04)
Rename Job.params to Job.args.
0.1.1 (2018-09-04)
Make psycopg2 an optional dependency in order to allow apps to depend on psycopg2-binary if they wish.
0.1 (2018-09-04)
Initial release.