
PostgreSQL job scheduling

Project description

Jobs

A PL/pgSQL-based work queue (publisher/consumer), with a Python asyncio/asyncpg API

Alpha software.

Features

  • Implements a two-layer API:

    A PostgreSQL layer: tasks can be published from PL/pgSQL functions or procedures, and the queue can also be extended using triggers (see the sketch after this list).

    A Python layer (or any client with a PostgreSQL driver). The default implementation is based on asyncio Python, using the awesome asyncpg driver.

  • It's compatible with PostgREST. All procedures and tables are scoped to their own PostgreSQL schema, and can be exposed through it with PostgREST.

  • Retry logic, scheduled_at and timeout are implemented on the publish method. A task can be published with a max_retries param or a specific timeout.

  • Internally it uses two tables: jobs.job_queue, where pending and running tasks are scheduled, and jobs.job, where finished tasks are moved (successes or failures).

  • By default, tasks are retried three times, with backoff.

  • Timed-out jobs are expired; by default tasks have a 60s timeout.

  • Tasks can be scheduled in the future; just provide a scheduled_at param.

  • There are views to monitor queue stats: jobs.all (all tasks), jobs.expired and jobs.running.

  • Tasks can also be prioritized: provide a priority number; a greater priority takes precedence over other tasks.

  • consume_topic allows consuming tasks by pattern (topic.element.%).

  • Rudimentary benchmarks on my laptop showed that it can handle 1000 tasks/second, but this depends on your Postgres instance.

  • Instead of a worker daemon, tasks can also be consumed from a cron job, a regular Python process, or a Kubernetes job. (It could be used to parallelize k8s jobs.)
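
As a sketch of the trigger-based extension point: the table, function and trigger names below are hypothetical, and the positional jobs.publish(task, body) call assumes the signature documented in the "Use from PostgreSQL" section below.

-- Hypothetical example: enqueue a task whenever a row is inserted into my_app.orders
CREATE FUNCTION my_app.enqueue_order_task() RETURNS trigger AS $$
BEGIN
    PERFORM jobs.publish(
        'my_app.tasks.process_order',                          -- i_task
        jsonb_build_object('args', jsonb_build_array(NEW.id))  -- i_body
    );
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER orders_enqueue_task
    AFTER INSERT ON my_app.orders
    FOR EACH ROW EXECUTE FUNCTION my_app.enqueue_order_task();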

Tradeoffs

  • All jobs have to be acknowledged, positively or negatively (ack/nack).

Use from PostgreSQL

SELECT job_id FROM
    jobs.publish(
        i_task,                           -- method or function to be executed
        i_body jsonb = null,              -- arguments passed to it (on the Python side: {"args": [], "kwargs": {}})
        i_scheduled_at timestamp = null,  -- when the task should run
        i_timeout numeric(7,2),           -- timeout in seconds for the job
        i_priority integer = null         -- greater number, higher priority
    )
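
For instance, a delayed, prioritized task could be published like this (a sketch assuming positional arguments in the order above; the task name and body are placeholders):

SELECT job_id FROM jobs.publish(
    'yourpackage.file.method',                   -- i_task
    '{"args": [1, 2], "kwargs": {}}'::jsonb,     -- i_body
    now()::timestamp + interval '10 minutes',    -- i_scheduled_at: run in the future
    120,                                         -- i_timeout in seconds
    10                                           -- i_priority
);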

On the worker side:

SELECT * FROM jobs.consume(
    num integer  -- number of desired jobs
);

This returns a list of jobs to be processed.

Or selectively consume a topic:

SELECT * FROM jobs.consume_topic('topic.xxx.%', 10);

Jobs are marked as processing, and should be acknowledged with:

SELECT FROM jobs.ack(job_id);

or, to return a failed job:

SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)

You can also batch-enqueue multiple jobs in a single request, using

SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);

where jobs.bulk_job is

create type jobs.bulk_job as (
    task varchar,
    body jsonb,
    scheduled_at timestamp,
    timeout integer,
    priority integer,
    max_retries integer
);
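
A bulk publish could then look like the following (a sketch; the task names and bodies are placeholders, and each row is cast to jobs.bulk_job in the field order of the type above):

SELECT * FROM jobs.publish_bulk(ARRAY[
    ('pkg.mod.task_a', '{"args": [1]}'::jsonb, null, 60, null, 3)::jobs.bulk_job,
    ('pkg.mod.task_b', '{"args": [2]}'::jsonb, null, 60, 10, 3)::jobs.bulk_job
]);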

Use from Python

On this side, implementing a worker should look something like this:

    # requires: import asyncio, import asyncpg, and the library's `jobs` API module
    db = await asyncpg.connect(dsn)
    while True:
        # fetch up to one pending job; stored in `batch` so the `jobs` module isn't shadowed
        batch = await jobs.consume(db, 1)
        for job in batch:
            try:
                await jobs.run(db, job["job_id"])
                await jobs.ack(job["job_id"])
            except Exception as e:
                await jobs.nack(job["job_id"], str(e))
        # poll interval between fetches
        await asyncio.sleep(1)

On the publisher side, jobs can be enqueued from within a PostgreSQL transaction:

db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed,
    # queue a task
    await jobs.publish("package.file.sum", args=[1,2])

Installing the package

pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db

This will create the required tables and functions in the `jobs` PostgreSQL schema.

To run the worker:

jobs-worker postgresql://dsn

At the moment not much is implemented there: just a single-threaded worker that needs a bit more love :) If your application resides in a Python package, tasks like yourpackage.file.method will be runnable as is.

Observability and monitoring

Queue stats can be checked with psql, or by exposing the views through postgresql_exporter.
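
For example, from psql the views listed above can be queried directly (a sketch; the views' exact columns aren't documented here):

-- tasks currently being processed
SELECT count(*) FROM jobs.running;

-- tasks that exceeded their timeout
SELECT * FROM jobs.expired;

-- every task, pending and finished ("all" is a reserved word, so it is quoted)
SELECT * FROM jobs."all" LIMIT 100;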

TODO

  • connect notifications, using pg_notify, when tasks are queued, picked, and completed. With this in place, it's easy enough to write a WebSocket service to send notifications to connected clients.

  • improve the worker to run every job on an asyncio task

  • handle better exceptions on the python side

  • fix requirements file

  • add github actions to run CI

  • write better docs and some examples

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

pgjobs-0.2.1.tar.gz (12.1 kB)

Uploaded Source

Built Distribution

pgjobs-0.2.1-py3-none-any.whl (13.5 kB)

Uploaded Python 3

File details

Details for the file pgjobs-0.2.1.tar.gz.

File metadata

  • Download URL: pgjobs-0.2.1.tar.gz
  • Upload date:
  • Size: 12.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.0 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.50.2 CPython/3.8.6

File hashes

Hashes for pgjobs-0.2.1.tar.gz

  • SHA256: 942c2c0e99f4425ab0dd91f32c2a7469bceb3645ee93604f5bffaebdd863b366
  • MD5: b835d4bdfeb5f7fb9488d7745be7f75c
  • BLAKE2b-256: a8f21c84b389872e28c4a222f9bd7dc85d6d1758c49534ac1e6452d5cc7b235e


File details

Details for the file pgjobs-0.2.1-py3-none-any.whl.

File metadata

  • Download URL: pgjobs-0.2.1-py3-none-any.whl
  • Upload date:
  • Size: 13.5 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.2.0 pkginfo/1.6.0 requests/2.24.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.50.2 CPython/3.8.6

File hashes

Hashes for pgjobs-0.2.1-py3-none-any.whl

  • SHA256: 30ce47c0d969c1afbc0c2d0a6e2ecaff290d31b257329b6044e9b84fda537429
  • MD5: 75ea0bb25b4020ec7a224d0420d52b0f
  • BLAKE2b-256: 16f72cee4fb90d710d920a73bf588eda2c43288d38099e9a9bfc32f255bb647f

