PostgreSQL job scheduling

Jobs

A PL/pgSQL-based work queue (publisher/consumer), with a Python asyncio/asyncpg API

Alpha software

Features

  • Implements a two-layer API:

A PostgreSQL layer: tasks can be published from PL/pgSQL functions or procedures, and the queue can also be extended using triggers.

A Python layer (or any client with a PostgreSQL driver). The default implementation is based on Python asyncio, using the awesome asyncpg driver.

  • It's compatible with PostgREST. All procedures and tables are scoped to their own PostgreSQL schema, and can be exposed through PostgREST.

  • Retry logic, scheduling and timeouts are implemented on the publish method. A task can be published with a max_retries param or a specific timeout.

  • Internally it uses two tables: jobs.job_queue, where pending and running tasks are scheduled, and jobs.job, where finished tasks (successes or failures) are moved.

  • By default, tasks are retried three times, with backoff.

  • Timed-out jobs are expired; by default, tasks have a 60s timeout.

  • Tasks can be scheduled in the future; just provide a scheduled_at param.

  • There are views to monitor queue stats: jobs.all (all tasks), jobs.expired and jobs.running.

  • Tasks can also be prioritized: provide a priority number; greater priority takes precedence over other tasks.

Use from PostgreSQL

SELECT job_id FROM
    jobs.publish(
        i_task varchar,                  -- method or function to be executed
        i_arguments jsonb = null,        -- arguments passed to it (for Python: {"args": [], "kwargs": {}})
        i_scheduled_at timestamp = null, -- when the task should run
        i_timeout numeric(7,2),          -- timeout in seconds for the job
        i_priority integer = null        -- greater number means higher priority
    );
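For example, a minimal sketch of enqueueing a task to run in five minutes with a 30s timeout (the task name mypkg.tasks.send_email is hypothetical):

SELECT job_id FROM jobs.publish(
    'mypkg.tasks.send_email',
    '{"args": ["user@example.com"], "kwargs": {}}'::jsonb,
    now()::timestamp + interval '5 minutes',
    30.00,
    10
);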

On the worker side:

SELECT * FROM jobs.consume(
    num integer -- number of desired jobs
);

This returns a list of jobs to be processed. The jobs are marked as processing, and should be acknowledged with:

SELECT FROM jobs.ack(job_id);

or, to return a failed job:

SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)
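Put together, a minimal polling cycle looks like this (the job id 42 is hypothetical, and this sketch assumes a null i_schedule_at falls back to the default backoff):

SELECT * FROM jobs.consume(1);                      -- fetch one pending job
SELECT FROM jobs.ack(42);                           -- the job succeeded
SELECT FROM jobs.nack(42, 'Traceback: ...', null);  -- or: it failed; reschedule it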

You can also batch-enqueue multiple jobs in a single request, using

SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);

where jobs.bulk_job is

create type jobs.bulk_job as (
    task varchar,
    arguments jsonb,
    scheduled_at timestamp,
    timeout integer,
    priority integer,
    max_retries integer
);
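For example, a sketch of a bulk publish built from row constructors cast to the composite type (the task names are hypothetical):

SELECT * FROM jobs.publish_bulk(ARRAY[
    ('mypkg.tasks.resize', '{"args": [1], "kwargs": {}}'::jsonb, null, 60, null, 3)::jobs.bulk_job,
    ('mypkg.tasks.resize', '{"args": [2], "kwargs": {}}'::jsonb, null, 60, null, 3)::jobs.bulk_job
]);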

Use from Python

On this side, implementing a worker should look something like:

    db = await asyncpg.connect(dsn)
    while True:
        batch = await jobs.consume(db, 1)  # don't name this "jobs", or it shadows the module
        for job in batch:
            try:
                await jobs.run(db, job["job_id"])
                await jobs.ack(job["job_id"])
            except Exception as e:
                await jobs.nack(job["job_id"], str(e))
        await asyncio.sleep(1)

On the publisher side, jobs can be enqueued from within a PostgreSQL transaction:

db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed,
    # queue a task
    await jobs.publish("package.file.sum", args=[1,2])

Installing the package

pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db

This will create the queue's tables and procedures in the `jobs` PostgreSQL schema.
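A quick way to sanity-check the migration from psql (a plain catalog query, nothing specific to pgjobs):

-- list the objects the migration created in the jobs schema
SELECT table_name, table_type
FROM information_schema.tables
WHERE table_schema = 'jobs';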

To run the worker:

jobs-worker postgresql://dsn

At the moment there is not much implemented there, just a single-threaded worker that needs a bit more love :) If your application resides in a Python package, tasks like yourpackage.file.method will be runnable as-is.

Observability and monitoring

Query the stats views with psql, or expose them through postgresql_exporter.
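For example, from psql (a sketch; note that jobs.all needs quoting, since ALL is a reserved word in SQL):

SELECT count(*) FROM jobs."all";  -- every task, pending or finished
SELECT * FROM jobs.expired;       -- tasks that hit their timeout
SELECT * FROM jobs.running;       -- tasks currently being processed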

TODO

  • connect notifications, using pg_notify, when tasks are queued, picked up, or completed. With this in place, it's easy enough to write a WebSocket endpoint that sends notifications to connected clients.

  • improve the worker to run every job in an asyncio task

  • handle better exceptions on the python side

  • fix requirements file

  • add github actions to run CI

  • write better docs and some examples
