Skip to main content

Run and monitor celery tasks

Project description

Summary

Run, monitor and log celery tasks.

Installation and setup

Declare tasks using celery task or cubicweb-celery cwtasks.

On worker side, install cw-celerytask-helpers.

celeryconfig.py example:

BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = BROKER_URL
CUBICWEB_CELERYTASK_REDIS_URL = BROKER_URL
CELERY_IMPORTS = ('cw_celerytask_helpers.helpers', 'module.containing.tasks')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json', 'msgpack', 'yaml']

In this configuration example, the cw_celerytask_helpers in CELERY_IMPORTS is required to have logging data (in the task) sent back to the Cubicweb instance via Redis. The CUBICWEB_CELERYTASK_REDIS_URL is the Redis endpoint used for this logging handling mechanism.

Start a worker:

# running cubicweb tasks (celeryconfig.py will be imported from your instance config directory)
celery -A cubicweb_celery -i <CW_INSTANCE_NAME> worker -l info

# running pure celery tasks
celery worker -l info

Task state synchronization requires to run the celery-monitor command:

cubicweb-ctl celery-monitor <instance-name>

Ensure to have the celeryconfig.py loaded for both cubicweb instance and celery worker, enforce by settings with CELERY_CONFIG_MODULE environment variable (it must be an importable python module).

Running tasks

Create a task:

from celery import current_app as app
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task(name='hi_there')
def my_task(arg, kw=0):
    logger.info('HI %s %s!', arg, kw)
    return 42

Run a task:

from cubes.celerytask.entities import start_async_task

cwtask = start_async_task(cnx, 'hi_there', 'THERE', kw=42)
cnx.commit()

start_async_task() accept task names, task objects or task signatures: http://docs.celeryproject.org/en/latest/userguide/canvas.html#signatures

For instance, to start the above task in a dedicated queue named myqueue:

import celery

start_async_task(cnx, celery.signature('hi_there', args=('THERE',),
                                       kwargs={'kw': 42}, queue='myqueue'))

Testing task based application

In CubicWeb test mode, tasks don’t run automatically, use cubes.celerytask.entities.get_tasks() to introspect them and cubes.celerytask.entities.run_all_tasks() to run them.

Also, CELERY_ALWAYS_EAGER and CELERY_EAGER_PROPAGATES_EXCEPTIONS are set to True by default.

Demo

A simple demo is supplied with the source code.

We assume the present cubicweb-celerytask cube is properly installed in a working Cubicweb environment, and there is a redis server available on redis://localhost:6379/0 (you can change this in demo/celeryconfig.py is needed).

Then:

user@host:~/celerytask$ cubicweb-ctl create celerytask demo

For the sake of simplicity, choose sqlite as database driver, and say ‘yes’ to the question “aAllow anonymous access ?”

Start the web application:

user@host:~/celerytask$ cubicweb-ctl start -D -linfo demo

Open your web browser on http://127.0.0.1:8080/

In another terminal, start a celery worker:

user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ celery worker -l info -E

In a third terminal, launch some tasks:

user@host:~$ cd celerytask/demo
user@host:~/celerytask/demo$ cubicweb-ctl shell demo launchtasks.py

You should be able to see 3 tasks on http://127.0.0.1:8080/CeleryTask

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

cubicweb-celerytask-0.5.0.tar.gz (21.4 kB view hashes)

Uploaded Source

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page