PostgreSQL job scheduling
Jobs
A PL/pgSQL based work queue (publisher/consumer), with a Python asyncio/asyncpg API.
Alpha software.
Features
- Implements a two-layer API:
  - A PostgreSQL layer: tasks can be published from PL/pgSQL functions or procedures, and it can also be extended using triggers.
  - A Python layer (or any client with a PostgreSQL driver). The default implementation is based on Python asyncio, using the awesome asyncpg driver.
- It's compatible with PostgREST. All procedures and tables are scoped to their own PostgreSQL schema, and can be exposed through PostgREST.
- Retry logic, scheduling (`scheduled_at`) and timeouts are implemented on the publish method: a task can be published with a `max_retries` param or a specific timeout.
- Internally uses two tables: `jobs.job_queue`, where pending and running tasks are scheduled, and `jobs.job`, where finished tasks (successes or failures) are moved.
- By default, tasks are retried three times, with backoff.
- Timed-out jobs are expired; by default, tasks have a 60s timeout.
- Tasks can be scheduled in the future: just provide a `scheduled_at` param.
- There are views to monitor queue stats: `jobs.all` (all tasks), `jobs.expired` and `jobs.running`.
- Tasks can also be prioritized: provide a priority number, and a greater priority takes precedence over other tasks.
- `consume_topic` allows consuming tasks matching a wildcard pattern (e.g. `topic.element.%`).
- Rudimentary benchmarks on my laptop showed it can handle around 1000 tasks/second, though this ultimately depends on your Postgres instance.
- Instead of a worker daemon, tasks can also be consumed from a cronjob, a regular Python process, or a Kubernetes job (it could be used to parallelize k8s jobs).
Tradeoffs
- All jobs have to be acknowledged, positively or negatively (ack/nack).
Use from PostgreSQL
SELECT job_id FROM jobs.publish(
    i_task,                          -- method or function to be executed
    i_body jsonb = null,             -- arguments passed to it (on python: {"args": [], "kwargs": {}})
    i_scheduled_at timestamp = null, -- when the task should run
    i_timeout numeric(7,2),          -- timeout in seconds for the job
    i_priority integer = null        -- greater number, higher priority
)
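For instance, a sketch of a call publishing a delayed, high-priority task (the task name `myapp.tasks.send_email` is hypothetical, and the body follows the `{"args": [], "kwargs": {}}` convention noted above):

SELECT job_id FROM jobs.publish(
    'myapp.tasks.send_email',               -- hypothetical task: module path + function
    '{"args": [42], "kwargs": {}}'::jsonb,  -- arguments passed to the function
    now() + interval '10 minutes',          -- run ten minutes from now
    30.0,                                   -- 30 second timeout
    5                                       -- greater number, higher priority
);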
On the worker side:
SELECT * FROM jobs.consume(
    num integer  -- number of desired jobs
);
This returns a list of jobs to be processed.
Or selectively consume a topic:
SELECT * from jobs.consume_topic('topic.xxx.%', 10)
Consumed jobs are marked as processing, and should be acknowledged with:
SELECT FROM jobs.ack(job_id);
or, to return a failed job:
SELECT FROM jobs.nack(job_id, traceback, i_schedule_at)
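Put together, a polling consumer could run entirely in SQL (a sketch; the job_id 42, the traceback text and the reschedule interval are illustrative):

-- grab up to 10 pending jobs; they are now marked as processing
SELECT * FROM jobs.consume(10);

-- acknowledge a job that succeeded
SELECT FROM jobs.ack(42);

-- or record the failure and reschedule the job for later
SELECT FROM jobs.nack(42, 'ValueError: boom', now() + interval '1 minute');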
You can also batch-enqueue multiple jobs in a single request, using
SELECT * FROM jobs.publish_bulk(jobs.bulk_job[]);
where `jobs.bulk_job` is:
create type jobs.bulk_job as (
task varchar,
body jsonb,
scheduled_at timestamp,
timeout integer,
priority integer,
max_retries integer
);
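A bulk call could then build the array inline (a sketch; the task names are hypothetical, and the null fields fall back to the defaults described above):

SELECT * FROM jobs.publish_bulk(ARRAY[
    ('myapp.tasks.resize', '{"args": ["a.png"], "kwargs": {}}'::jsonb,
     null, 60, null, 3)::jobs.bulk_job,
    ('myapp.tasks.resize', '{"args": ["b.png"], "kwargs": {}}'::jsonb,
     null, 60, null, 3)::jobs.bulk_job
]);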
Use from Python
On this side, implementing a worker should look something like this:
import asyncio
import asyncpg
import jobs

async def worker(dsn):
    db = await asyncpg.connect(dsn)
    while True:
        # don't shadow the `jobs` module with the result
        batch = await jobs.consume(db, 1)
        for job in batch:
            try:
                await jobs.run(db, job["job_id"])
                await jobs.ack(job["job_id"])
            except Exception as e:
                await jobs.nack(job["job_id"], str(e))
        await asyncio.sleep(1)
On the publisher side, jobs can be enqueued from within a PostgreSQL transaction:
db = await asyncpg.connect(dsn)
async with db.transaction():
    # do whatever is needed, then
    # queue a task within the same transaction
    await jobs.publish("package.file.sum", args=[1, 2])
Installing the package
pip install pgjobs
jobs-migrate postgresql://user:password@localhost:5432/db
This will create all tables and procedures under the `jobs` PostgreSQL schema.
To run the worker:
jobs-worker postgresql://dsn
At the moment not much is implemented there,
just a single-threaded worker that needs a bit more love :)
If your application resides in a Python package,
tasks like `yourpackage.file.method` will be runnable as is.
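For example, with a hypothetical `yourpackage/file.py` exposing a `method` function, publishing the task below would make the worker import `yourpackage.file` and call `method(1, 2)`:

SELECT job_id FROM jobs.publish(
    'yourpackage.file.method',
    '{"args": [1, 2], "kwargs": {}}'::jsonb
);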
Observability and monitoring
Query the stats views with psql, or expose them through postgresql_exporter.
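For example, a quick health check using the views listed in the features section:

-- tasks currently pending or being processed
SELECT count(*) FROM jobs.running;

-- tasks that exceeded their timeout
SELECT count(*) FROM jobs.expired;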
TODO
- Connect notifications using pg_notify when tasks are queued, picked up, or completed. With this in place, it's easy enough to write a WebSocket endpoint that pushes notifications to connected customers.
- Improve the worker to run every job in an asyncio task.
- Handle exceptions better on the Python side.
- Fix the requirements file.
- Add GitHub Actions to run CI.
- Write better docs and some examples.