Schedules celery tasks to run in the potentially far future
Schedules celery tasks to run in the potentially far future, using a separate storage backend (currently only redis is supported) in combination with a cronjob.
Usage
Configure the storage by adding a setting like longterm_scheduler_backend = 'redis://localhost:6739/1' to your celery configuration. (The storage also respects the built-in celery configuration settings redis_socket_timeout, redis_socket_connect_timeout and redis_max_connections.)
Configure your celery app to use a customized task class: MYCELERY = celery.Celery(task_cls=celery_longterm_scheduler.Task)
Set up a cronjob to run celery longterm_scheduler (e.g. every 5 minutes).
Now you can schedule your tasks by calling mytask.apply_async(args, kwargs, eta=datetime) as normal. This returns a normal AsyncResult object, but only reading the .id is supported; any other methods or properties may fail explicitly or implicitly.
You can completely delete a scheduled job by calling celery_longterm_scheduler.get_scheduler(MYCELERY).revoke('mytaskid') (we cannot hook into the celery built-in AsyncResult.revoke(), unfortunately). revoke() returns True on success and False if the given task cannot be found in the storage backend (e.g. because it has already come due and been executed).
Instead of sending a normal job to the celery broker (with added timing information), this creates a job entry in the scheduler storage backend. The cronjob then periodically checks the storage for any jobs that are due, and only then sends a normal celery job to the broker.
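The flow described above can be sketched with a small in-memory model. This is only an illustration, not the library's implementation: the class InMemoryScheduler, its method names and the send callback are hypothetical stand-ins for the storage backend, the cronjob pass and the celery broker.

```python
from datetime import datetime, timedelta, timezone


class InMemoryScheduler:
    """Hypothetical toy stand-in for the storage backend plus the cronjob pass."""

    def __init__(self):
        self.jobs = {}  # job id -> (eta, payload)

    def schedule(self, job_id, eta, payload):
        # Store the job instead of sending it to the broker right away.
        self.jobs[job_id] = (eta, payload)

    def revoke(self, job_id):
        # True if the job was still stored, False if unknown or already sent.
        return self.jobs.pop(job_id, None) is not None

    def run_due(self, now, send):
        # One cronjob pass: forward every due job to the broker, then forget it.
        due = sorted(
            (eta, job_id) for job_id, (eta, _) in self.jobs.items() if eta <= now
        )
        for _, job_id in due:
            _, payload = self.jobs.pop(job_id)
            send(job_id, payload)
        return [job_id for _, job_id in due]


now = datetime(2024, 1, 8, 12, 0, tzinfo=timezone.utc)
scheduler = InMemoryScheduler()
scheduler.schedule("soon", now + timedelta(minutes=3), {"task": "mytask"})
scheduler.schedule("later", now + timedelta(days=180), {"task": "mytask"})

sent = []  # plays the role of the broker
first_pass = scheduler.run_due(
    now + timedelta(minutes=5), lambda job_id, payload: sent.append(job_id)
)
revoked_pending = scheduler.revoke("later")  # still in storage
revoked_done = scheduler.revoke("soon")      # already handed to the broker
```

In this model only the job that is due reaches the broker during the cronjob pass. Revoking the far-future job removes it entirely, while revoking the job that has already been dispatched reports failure, mirroring the True/False behaviour of revoke() described above.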
Rationale
Why not use the celery built-in apply_async(eta=)? Because you cannot ever really delete a pending job. AsyncResult('mytaskid').revoke() can only add the task ID to the statedb, where it has to stay _forever_ so the job is recognized as revoked. For jobs that are scheduled to run in 6 months' time or later, this would create an unmanageable, ever-growing statedb.
Why not use celerybeat? Because it is built for periodic jobs, and we need single-shot jobs. And then there’s not much to gain from the celerybeat implementation, especially since we want to use redis as storage (since we’re already using that as broker and result backend).
Implementation
Redis schema
celery_longterm_scheduler assumes that it talks to a dedicated redis database. It creates an entry per scheduled job using SET jobid job-configuration (job-configuration is serialized with JSON) and uses a single sorted set named scheduled_task_id_by_time that contains the jobids scored by the unix timestamp (UTC) when they are due.
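As a rough illustration of this schema, the following sketch builds the Redis commands that would correspond to scheduling a job, querying due jobs and removing a job. It does not talk to a server and is not taken from the library's source. Only SET and the sorted set name come from the description above; the helper names, the use of ZADD, ZRANGEBYSCORE, DEL and ZREM, and the fields of the job configuration are assumptions made for the example.

```python
import json
from datetime import datetime, timezone

SORTED_SET = "scheduled_task_id_by_time"  # name taken from the description above


def due_score(eta):
    # Unix timestamp in UTC; eta is expected to be timezone-aware.
    return int(eta.astimezone(timezone.utc).timestamp())


def schedule_commands(job_id, job_config, eta):
    # One plain key per job, plus one sorted-set member scored by its due time.
    return [
        ("SET", job_id, json.dumps(job_config)),
        ("ZADD", SORTED_SET, due_score(eta), job_id),
    ]


def due_query(now):
    # Members whose score is at or before "now" are due.
    return ("ZRANGEBYSCORE", SORTED_SET, 0, due_score(now))


def revoke_commands(job_id):
    # Assumed cleanup: drop both the job entry and its sorted-set member.
    return [("DEL", job_id), ("ZREM", SORTED_SET, job_id)]


# Hypothetical job configuration; the real serialized fields may differ.
config = {"task": "mytask", "args": [1, 2], "kwargs": {}}
eta = datetime(2030, 1, 1, tzinfo=timezone.utc)
commands = schedule_commands("job-1", config, eta)
query = due_query(datetime(2030, 1, 1, 0, 5, tzinfo=timezone.utc))
```

Because the sorted set is scored by due time, the periodic check only has to ask for a score range up to the current timestamp; it never needs to scan all stored job entries.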
Run tests
Tests run with tox and py.test. Install tox if you do not have it yet (e.g. via pip install tox), then simply run tox.
For the integration tests you need to have the redis binary installed (tests start their own server).
celery_longterm_scheduler changes
1.3.0 (2024-01-08)
Also support rediss:// URLs
1.2.0 (2022-06-23)
Update to celery-5.x
1.1.2 (2020-05-27)
Add bw-compat so py3 can read py2-serialized tasks
1.1.1 (2019-12-19)
Update to current redis client library version 3.x
1.1.0 (2019-11-28)
Make Python-3 compatible.
1.0.1 (2018-01-17)
Don’t try to schedule on apply_async(eta=None) calls
1.0.0 (2017-09-29)
Initial release