Distributed Python job queue with asyncio and redis
Project description
SAQ
SAQ (Simple Async Queue) is a simple and performant job queueing framework built on top of asyncio and redis. It can be used for processing background jobs with workers. For example, you could use SAQ to schedule emails, execute long queries, or do expensive data analysis.
It uses aioredis >= 2.0.
It is similar to RQ and heavily inspired by ARQ. Unlike RQ, it is async and thus significantly faster if your jobs are async. Even if they are not, SAQ is still considerably faster due to lower overhead.
SAQ optionally comes with a simple UI for monitoring workers and jobs.
Install
# minimal install
pip install saq
# web + hiredis
pip install "saq[web,hiredis]"
Usage
usage: saq [-h] [--workers WORKERS] [--verbose] [--web] settings
Start Simple Async Queue Worker
positional arguments:
settings Namespaced variable containing worker settings, e.g. module_a.settings
options:
-h, --help show this help message and exit
--workers WORKERS Number of worker processes
--verbose, -v Logging level: 0: ERROR, 1: INFO, 2: DEBUG
--web Start web app
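For example, a typical invocation that starts two worker processes with DEBUG logging and the web UI (a sketch, assuming the verbose flag can be repeated to raise the level as listed above):

saq module_a.settings --workers 2 -vv --web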
Example
import asyncio

from saq import CronJob, Queue

# all functions take in context dict and kwargs
async def test(ctx, *, a):
    await asyncio.sleep(0.5)
    # result should be json serializable
    # custom serializers and deserializers can be used through Queue(dump=, load=)
    return {"x": a}

async def cron(ctx):
    print("i am a cron job")

async def startup(ctx):
    ctx["db"] = await create_db()  # create_db is your own connection factory

async def shutdown(ctx):
    await ctx["db"].disconnect()

async def before_process(ctx):
    print(ctx["job"], ctx["db"])

async def after_process(ctx):
    pass

queue = Queue.from_url("redis://localhost")

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 10,
    "cron_jobs": [CronJob(cron, cron="* * * * * */5")],  # run every 5 seconds
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
}
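The Queue(dump=, load=) hooks mentioned in the comments above take callables shaped like json.dumps and json.loads. A minimal sketch of swapping them out, assuming from_url forwards keyword arguments to the Queue constructor and that your job payloads are pickleable (pickle is illustrative only):

import pickle

from saq import Queue

# serialize job kwargs and results with pickle instead of the default json
queue = Queue.from_url("redis://localhost", dump=pickle.dumps, load=pickle.loads)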
To start the worker, assuming the settings above are importable on the Python path:
saq module.file.settings
To enqueue jobs:

import time

# schedule a job normally
job = await queue.enqueue("test", a=1)

# wait 1 second for the job to complete
await job.refresh(1)
print(job.result)

# schedule a job in 10 seconds
await queue.enqueue("test", a=1, scheduled=time.time() + 10)
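Putting the above together, a minimal self-contained enqueue script (a sketch: it assumes a worker with the settings above is already running, Redis is reachable on localhost, and queue.disconnect() is used to close the connection):

import asyncio
import time

from saq import Queue

async def main():
    queue = Queue.from_url("redis://localhost")

    # schedule a job normally
    job = await queue.enqueue("test", a=1)

    # wait 1 second for the job to complete
    await job.refresh(1)
    print(job.result)

    # schedule a job in 10 seconds
    await queue.enqueue("test", a=1, scheduled=time.time() + 10)

    # close the redis connection
    await queue.disconnect()

asyncio.run(main())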
Demo
Start the worker
saq examples.simple.settings --web
Navigate to the web UI
Enqueue jobs
python examples/simple.py
Comparison to ARQ
SAQ is heavily inspired by ARQ but has several enhancements.
- Avoids polling by leveraging BLMOVE or RPOPLPUSH and NOTIFY
- SAQ has much lower latency than ARQ, with delays of < 5ms. ARQ's default polling frequency is 0.5 seconds
- SAQ is up to 8x faster than ARQ
- Web interface for monitoring queues and workers
- Heartbeat monitor for abandoned jobs
- More robust failure handling
- Storage of stack traces
- Sweeping stuck jobs
- Cancelled jobs are handled differently from failed jobs (e.g. during machine redeployments)
- Before and after job hooks
- Easily run multiple workers to leverage more cores
Development
python -m venv env
source env/bin/activate
pip install -e ".[dev,web]"
docker run -p 6379:6379 redis
./run_checks.sh
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution: saq-0.6.0.tar.gz (40.5 kB)
Built Distribution: saq-0.6.0-py3-none-any.whl (37.9 kB)
File details
Details for the file saq-0.6.0.tar.gz.
File metadata
- Download URL: saq-0.6.0.tar.gz
- Upload date:
- Size: 40.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.0 CPython/3.10.4
File hashes
Algorithm | Hash digest
---|---
SHA256 | 937a39d5af23bc270a056c80b9557c66fb81ec20df21bc643d1b52cdb82e9f28
MD5 | a7b3750a4b78eca814306553e1cd04de
BLAKE2b-256 | 85b95028b5a410fbc4ad1ebfb32a5f6126a339ee239dcf1f426a0d686d8105c4
File details
Details for the file saq-0.6.0-py3-none-any.whl.
File metadata
- Download URL: saq-0.6.0-py3-none-any.whl
- Upload date:
- Size: 37.9 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.0 CPython/3.10.4
File hashes
Algorithm | Hash digest
---|---
SHA256 | cd058bf8df53d63eed3061a4dd22f7966c392f63c90d558ec0cdb2630ab73213
MD5 | fbf289bd2b2b118b643de94f87026186
BLAKE2b-256 | 29f8afeb47842986548b9e896ce1df8903a78222d5d70b28c6aed66bd96506a1