Skip to main content

A message queue written around PostgreSQL.

Project description

https://img.shields.io/pypi/v/psycopg2_mq.svg main CI Status

psycopg2_mq is a message queue implemented on top of PostgreSQL, SQLAlchemy, and psycopg2.

Currently the library provides only the low-level constructs that can be used to build a multithreaded worker system. It is broken into two components:

  • psycopg2_mq.MQWorker - a reusable worker object that manages a single-threaded worker that can accept jobs and execute them. An application should create worker per thread. It supports an API for thread-safe graceful shutdown.

  • psycopg2_mq.MQSource - a source object providing a client-side API for invoking and querying job states.

It is expected that these core components are then wrapped into your own application in any way that you see fit, without dictating a specific CLI or framework.

Data Model

Queues

Workers run jobs defined in queues. Currently each queue will run jobs concurrently, while a future version may support serial execution on a per-queue basis. Each registered queue should contain an execute_job(job) method.

Jobs

The execute_job method of a queue is passed a Job object containing the following attributes:

  • id

  • queue

  • method

  • args

  • cursor

As a convenience, there is an extend(**kw) method which can be used to add extra attributes to the object. This is useful in individual queues to define a contract between a queue and its methods.

Cursors

By defining a cursor you can execute jobs sequentially instead of in parallel. There can only be one running (or lost) job for any cursor at a time.

A Job can be scheduled with a cursor_key.

A job.cursor dict is provided to the workers containing the cursor data, and is saved back to the database when the job is completed. This effectively gives jobs some persistent, shared memory between jobs on the cursor.

Because a “lost” job on the cursor counts as running, it will also block any other jobs from executing. To resolve this situation it is necessary for the lost job to be either marked as failed or retried. Look at MQSource.retry_job and MQSource.fail_lost_job APIs.

Collapsible

When collapse_on_cursor is set to True on a job, this is declaring the job as “collapsible”. There can only be one job in the “pending” state marked collapsible == True for the same queue, method, and cursor_key. This means that if there is an existing “pending” job with the same cursor_key, queue, and method with collapsible == True then the new job will be “collapsed” into the existing job. By default, this typically means that the new job will be dropped because another job already exists. This means that the args should be “constant”, because the new job’s args are ignored.

If the args are not constant, then you will likely want to pass a conflict_resolver callback. This is a function that will be invoked when a job already exists. You can then update the existing Job object in the “pending” state, adjusting the args etc.

Delayed Jobs

A Job can be delayed to run in the future by providing a datetime object to the when argument. When this feature is used with collapsible jobs you can create a highly efficient throttle greatly reducing the number of jobs that run in the background. For example, schedule 20 jobs all with the same cursor_key, and collapse_on_cursor=True, and when=timedelta(seconds=30). You will see only 1 job created and it will execute in 30 seconds. All of the other jobs are collapsed into this one instead of creating separate jobs.

Schedules

A JobSchedule can be defined which supports RFC 5545 RRULE schedules. These are powerful and can support timezones, frequencies based on calendars as well as simple recurring rules from an anchor time using DTSTART. Cron jobs can be converted to this syntax for simpler scenarios.

psycopg2-mq workers will automatically negotiate which worker is responsible for managing schedules so clustered workers should operate as expected.

To register a new schedule, look at the MQSource.add_schedule(queue, method, args, *, rrule) API.

If you set collapse_on_cursor is True on the schedule and there is already a job pending then the firing of the schedule is effectively a no-op.

Events / Listeners

A JobListener can be defined which supports creating new jobs when events are emitted. When an event is emitted via MQSource.emit_event then any listeners matching this event will be used to create a new job in the system.

To register a listener, look at the MQSource.add_listener(event, queue, method, args, ...) API.

There is a default event emitted every time a job moved to a finished state. It has the format:

mq.job_finished.<queue>.<method>.<state>

For example, mq.job_finished.completed.dummy.echo.completed.

You are free to emit your own custom events as well if you need different dimensions!

When collapse_on_cursor is False then the listener receives an event arg containing the name, listener_id, and data keys.

If collapse_on_cursor is True on the listener then the resulting job will receive an events arg containing a list of all of the emitted events that occured while the job was in the “pending” state.

Example Worker

from psycopg2_mq import (
    MQWorker,
    make_default_model,
)
from sqlalchemy import (
    MetaData,
    create_engine,
)
import sys

class EchoQueue:
    def execute_job(self, job):
        return f'hello, {job.args["name"]} from method="{job.method}"'

if __name__ == '__main__':
    engine = create_engine(sys.argv[1])
    metadata = MetaData()
    model = make_default_model(metadata)
    worker = MQWorker(
        engine=engine,
        queues={
            'echo': EchoQueue(),
        },
        model=model,
    )
    worker.run()

Example Source

engine = create_engine('postgresql+psycopg2://...')
metadata = MetaData()
model = make_default_model(metadata)
metadata.create_all(engine)
session_factory = sessionmaker(engine)

with session_factory.begin():
    mq = MQSource(
        dbsession=dbsession,
        model=model,
    )
    job_id = mq.call('echo', 'hello', {'name': 'Andy'})
    print(f'queued job={job_id}')

Changes

0.12.4 (2024-11-13)

  • Emit an event any time a job hits a terminal state. The format has changed from mq_job_complete.<queue>.<method> to mq.job_finished.<queue>.<method>.<state>. This includes completed, failed, and lost.

0.12.3 (2024-10-29)

  • Add job.schedule_ids and job.listener_ids to the job context passed to the background workers.

0.12.2 (2024-10-29)

  • Prevent canceling jobs in the lost state.

  • Allow retrying jobs in any state.

  • Allow setting a result on a lost job when moving it to failed.

0.12.1 (2024-10-27)

  • Support SQLAlchemy 2.x.

  • Must mark a lost job as failed prior to being able to retry it.

0.12 (2024-10-27)

  • Support a job being linked properly to multiple schedule and listener sources such that provenance is properly tracked on retries.

  • Add a new CANCELED job state that can be used to manually mark any pending, failed, or lost jobs as canceled. Jobs do not enter this state automatically - theyt must be manually marked but will be useful to disambiguate failed from canceled.

  • [breaking] job.schedule_id is removed from the job object passed to background workers.

  • [model migration] Moved the schedule_id and listener_id foreign keys from the Job table to many-to-many link tables to support tracking the source properly when collapsing occurs. Possible migration:

    insert into mq_job_schedule_link (job_id, schedule_id)
      select id, schedule_id from mq_job where schedule_id is not null;
    
    insert into mq_job_listener_link (job_id, listener_id)
      select id, listener_id from mq_job where listener_id is not null;
    
    alter table mq_job drop column schedule_id;
    alter table mq_job drop column listener_id;
  • [model migration] Add a new CANCELED state to the mq_job_state enum. Possible migration:

    alter type mq_job_state add value 'canceled';

0.11 (2024-10-27)

  • Add support for Python 3.13.

  • [breaking] Modified the MQSource.call, and MQSource.add_schedule APIs such that when a cursor is used collapse_on_cursor defaults to False instead of True. You must explicitly set it to True in scenarios in which that is desired as it is no longer the default behavior.

  • [model migration] Add collapse_on_cursor attribute to the JobSchedule model. A bw-compat migration would set this value to False if cursor_key is NULL and True on everything else.

  • [model migration] Add a new JobListener model.

  • [model migration] Add listener_id foreign key to the Job model.

  • Fix a bug in which NOTIFY events were missed in some cases causing jobs to wait until the maintenance window to execute.

  • Add the concept of pub/sub event listeners. Listeners can be registered that act as a job factory, creating a new job when an event is emitted.

    It is possible to emit events manually as needed via the MQSource.emit_event API.

    Events are emitted automatically when a job is completed. Every job when it is completed successfully emits a new mq_job_complete:<queue>.<method> event. This event contains the result of the job.

  • The MQSource that is used by the MQWorker can now be overridden via the mq_source_factory option.

0.10 (2024-08-06)

  • Add support for Python 3.12.

  • Drop support for Python 3.7, and 3.8.

  • Fix a race condition on shutdown where the job fails to cleanup because the triggers are gone while the pool is still shutting down.

0.9 (2023-04-21)

  • Add support for Python 3.10, and 3.11.

  • [breaking] Prevent retrying of collapsible jobs. Require them to be invoked using call instead for an opportunity to specify a conflict_resolver.

  • [model migration] Fix a bug in the default model schema in which the collapsible database index was not marked unique.

  • Copy trace info when retrying a job.

  • Capture the stringified exception to the job result in the message key, alongside the existing tb, exc, and args keys.

  • The worker was not recognizing capture_signals=False, causing problems when running the event loop in other threads.

  • Blackify the codebase and add some real tests. Yay!

0.8.3 (2022-04-15)

  • [breaking] Remove MQWorker.make_job_context.

0.8.2 (2022-04-15)

  • Drop Python 3.6 support.

  • [breaking] Require SQLAlchemy 1.4+ and resolve deprecation warnings related to SQLAlchemy 2.0.

  • [model migration] Rename update_job_id to updated_job_id in the JobCursor model.

0.8.1 (2022-04-15)

  • Ensure the trace attribute is populated on the JobContext.

  • Add MQWorker.make_job_context which can be defined to completely override the JobContext factory using the Job object and open database session.

0.8.0 (2022-04-15)

  • [model migration] Add update_job_id foreign key to the JobCursor model to make it possible to know which job last updated the value in the cursor.

  • [model migration] Add trace json blob to the Job model.

  • Support a trace json blob when creating new jobs. This value is available on the running job context and can be used when creating sub-jobs or when making requests to external systems to pass through tracing metadata.

    See MQSource.call’s new trace parameter when creating jobs. See JobContext.trace attribute when handling jobs.

  • Add a standard FailedJobError exception which can be raised by jobs to mark a failure with a custom result object. This is different from unhandled exceptions that cause the MQWorker.result_from_error method to be invoked.

0.7.0 (2022-03-03)

  • Fix a corner case with lost jobs attached to cursors. In scenarios where multiple workers are running, if one loses a database connection then the other is designed to notice and mark jobs lost. However, it’s possible the job is not actually lost and the worker can then recover after resuming its connection, and marking the job running again. In this situation, we do not want another job to begin on the same cursor. To fix this issue, new jobs will not be run if another job is marked lost on the same cursor. You will be required to recover the job by marking it as not lost (probably failed) first to unblock the rest of the jobs on the cursor.

0.6.2 (2022-03-01)

  • Prioritize maintenance work higher than running new jobs. There was a chicken-and-egg issue where a job would be marked running but needs to be marked lost. However marking it lost is lower priority than trying to start new jobs. In the case where a lot of jobs were scheduled at the same time, the worker always tried to start new jobs and didn’t run the maintenance so the job never got marked lost, effectively blocking the queue.

0.6.1 (2022-01-15)

  • Fix a bug introduced in the 0.6.0 release when scheduling new jobs.

0.6.0 (2022-01-14)

  • [model migration] Add model changes to mark jobs as collapsible.

  • [model migration] Add model changes to the cursor index.

  • Allow multiple pending jobs to be scheduled on the same cursor if either:

    1. The queue or method are different from existing pending jobs on the cursor.

    2. collapse_on_cursor is set to False when scheduling the job.

0.5.7 (2021-03-07)

  • Add a schedule_id attribute to the job context for use in jobs that want to know whether they were executed from a schedule or not.

0.5.6 (2021-02-28)

  • Some UnicodeDecodeError exceptions raised from jobs could trigger a serialization failure (UntranslatableCharacter) because it would contain the sequence \u0000` which, while valid in Python, is not allowed in postgres. So when dealing with the raw bytes, we’ll decode it with the replacement character that can be properly stored. Not ideal, but better than failing to store the error at all.

0.5.5 (2021-01-22)

  • Fixed some old code causing the worker lock to release after a job completed.

0.5.4 (2021-01-20)

  • Log at the error level when marking a job as lost.

0.5.3 (2021-01-11)

  • Copy the schedule_id information to retried jobs.

0.5.2 (2021-01-11)

  • [breaking] Require call_schedule to accept an id instead of an object.

0.5.1 (2021-01-09)

  • [model migration] Drop the UNIQUE constraint on the background job lock_id column.

0.5 (2021-01-09)

0.4.5 (2020-12-22)

  • Use column objects in the insert statement to support ORM-level synonyms, enabling the schema to have columns with different names.

0.4.4 (2019-11-07)

  • Ensure the advisory locks are released when a job completes.

0.4.3 (2019-10-31)

  • Ensure maintenance (finding lost jobs) always runs at set intervals defined by the timeout parameter.

0.4.2 (2019-10-30)

  • Recover active jobs when the connection is lost by re-locking them and ensuring they are marked running.

0.4.1 (2019-10-30)

  • Attempt to reconnect to the database after losing the connection. If the reconnect attempt fails then crash.

0.4 (2019-10-28)

  • [model migration] Add a worker column to the Job model to track what worker is handling a job.

  • Add an optional name argument to MQWorker to name the worker - the value will be recorded in each job.

  • Add a threads argument (default=``1``) to MQWorker to support handling multiple jobs from the same worker instance instead of making a worker per thread.

  • Add capture_signals argument (default=``True``) to MQWorker which will capture SIGTERM, SIGINT and SIGUSR1. The first two will trigger graceful shutdown - they will make the process stop handling new jobs while finishing active jobs. The latter will dump to stderr a JSON dump of the current status of the worker.

0.3.3 (2019-10-23)

  • Only save a cursor update if the job is completed successfully.

0.3.2 (2019-10-22)

  • Mark lost jobs during timeouts instead of just when a worker starts in order to catch them earlier.

0.3.1 (2019-10-17)

  • When attempting to schedule a job with a cursor and a scheduled_time earlier than a pending job on the same cursor, the job will be updated to run at the earlier time.

  • When attempting to schedule a job with a cursor and a pending job already exists on the same cursor, a conflict_resolver function may be supplied to MQSource.call to update the job properties, merging the arguments however the user wishes.

0.3 (2019-10-15)

  • [model migration] Add a new column cursor_snapshot to the Job model which will contain the value of the cursor when the job begins.

0.2 (2019-10-09)

  • [model migration] Add cursor support for jobs. This requires a schema migration to add a cursor_key column, a new JobCursor model, and some new indices.

0.1.6 (2019-10-07)

  • Support passing custom kwargs to the job in psycopg2_mq.MQSource.call to allow custom columns on the job table.

0.1.5 (2019-05-17)

  • Fix a regression when serializing errors with strings or cycles.

0.1.4 (2019-05-09)

  • More safely serialize exception objects when jobs fail.

0.1.3 (2018-09-04)

  • Rename the thread to contain the job id while it’s handling a job.

0.1.2 (2018-09-04)

  • [model migration] Rename Job.params to Job.args.

0.1.1 (2018-09-04)

  • Make psycopg2 an optional dependency in order to allow apps to depend on psycopg2-binary if they wish.

0.1 (2018-09-04)

  • Initial release.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

psycopg2_mq-0.12.4.tar.gz (36.9 kB view details)

Uploaded Source

Built Distribution

psycopg2_mq-0.12.4-py3-none-any.whl (24.7 kB view details)

Uploaded Python 3

File details

Details for the file psycopg2_mq-0.12.4.tar.gz.

File metadata

  • Download URL: psycopg2_mq-0.12.4.tar.gz
  • Upload date:
  • Size: 36.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for psycopg2_mq-0.12.4.tar.gz
Algorithm Hash digest
SHA256 9930665793c45f0ac51edc53dc35eedc3703f944590582851734e8c814c30b02
MD5 cea51ad017403714a117d90b040c4230
BLAKE2b-256 f86f3b7009b9ae28a14058e19d6f1bb0a47034065f089fa4736d1914dd8d2f7c

See more details on using hashes here.

File details

Details for the file psycopg2_mq-0.12.4-py3-none-any.whl.

File metadata

  • Download URL: psycopg2_mq-0.12.4-py3-none-any.whl
  • Upload date:
  • Size: 24.7 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.12.7

File hashes

Hashes for psycopg2_mq-0.12.4-py3-none-any.whl
Algorithm Hash digest
SHA256 d9eb999c72132aebdd0529c0472b2de0b738c4c80bdf82d3e854fc88e09b8c6b
MD5 fca888faf03cefe1293fae579c2b0b92
BLAKE2b-256 7a6b09e1bed6a4f7d296a7cb7186a64b5201366b3ed614fb3c0bb7156c6dcadd

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page