Stirfried 🥡

Socket.IO server to control Celery tasks from the client (browser) in real-time.

Running the example

You can run the example included in the repo as follows:

  • Clone the repository
  • cd into the example directory
  • Run docker-compose build
  • Then docker-compose up
  • Open your browser and go to http://localhost:8080/
  • You should see the following interface:

[Screenshot: Stirfried 🥡 test client]

Getting started

Stirfried has a three layered architecture:

  1. Socket.IO clients
  2. Socket.IO server
  3. Celery workers

The design allows you to independently scale the number of servers when server-client communication workload increases and the number of workers when the task processing workload increases.

By leveraging Celery's task routing (explained below) you can also divide workers into groups and scale groups independently.

Socket.IO clients

Clients can connect using standard Socket.IO libraries.

The server listens for clients to emit any of the following events:

  • send_task({task_name, args, kwargs}) -> {status, data}: Emit to schedule a task. The server immediately replies with status and the task_id on success, or a message on failure. Use a callback to receive the reply in the client.
  • revoke_task(task_id): Emit to cancel a task.

Clients can subscribe to the following events emitted by the server:

  • on_progress({current, total, info, task_id, task_name}): Emitted on task progress updates.
  • on_retry({task_id, task_name[, einfo]}): Emitted on task retries. einfo is only included if stirfried_error_info=True.
  • on_failure({task_id, task_name[, einfo]}): Emitted on task failure. einfo is only included if stirfried_error_info=True.
  • on_success({retval, task_id, task_name}): Emitted on task success.
  • on_return({status, retval, task_id, task_name}): Emitted on both task success and failure.
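
For example, here is a minimal Python client sketch using the python-socketio library; the server URL (http://localhost:8000) and the task name (add) are assumptions, not part of Stirfried itself:

import socketio

sio = socketio.Client()

@sio.on("on_progress")
def on_progress(data):
    print("progress:", data["current"], "/", data["total"])

@sio.on("on_return")
def on_return(data):
    print("finished:", data["status"], data["retval"])

def ack(reply):
    # The callback receives the server's immediate reply: {status, data}
    print("send_task reply:", reply)

sio.connect("http://localhost:8000")
sio.emit("send_task", {"task_name": "add", "args": [2, 3], "kwargs": {}}, callback=ack)
sio.wait()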

Socket.IO server

For the Socket.IO server component you can pull the prebuilt docker image:

docker pull korijn/stirfried

or you can copy the project and customize it to your liking.

Configuration

You are required to provide a settings.py file with the configuration for the server. Stirfried builds on the standard Celery configuration mechanism.

Available settings for stirfried:

  • stirfried_redis_url - Required. Redis connection string for the Socket.IO server.
  • stirfried_error_info - Optional. Set to True to include tracebacks in events when tasks fail.
  • stirfried_available_tasks - Optional. List of task names. If given, send_task will fail if a task name is not contained in the list.
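
A minimal settings.py sketch using these options (the Redis URL and task names are placeholders):

# settings.py
stirfried_redis_url = "redis://localhost:6379/0"
stirfried_error_info = True  # include tracebacks in on_retry/on_failure events
stirfried_available_tasks = ["feed.tasks.refresh", "web.tasks.crawl"]  # reject unknown task names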

Configuration: python-socketio

You can configure python-socketio by prefixing configuration keys with socketio_. They will be passed on without the prefix to the AsyncServer constructor.
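
For instance, assuming you want to set AsyncServer's cors_allowed_origins option, you could add the following to settings.py:

# Passed on to AsyncServer(cors_allowed_origins="*") by the server
socketio_cors_allowed_origins = "*"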

Task routing

The server sends tasks to the Celery broker by name, so it can act as a gateway to many different Celery workers with different tasks. You can leverage Celery's task routing configuration for this purpose.

Example

Let's say you have two workers, one listening on the feeds queue and another on the web queue. You would configure the server accordingly in settings.py:

# Stirfried settings
stirfried_redis_url = "redis://localhost:6379/0"

# Celery settings
broker_url = "redis://localhost:6379/1"
task_routes = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}

You can then run the server as follows:

docker run --rm -ti -v `pwd`/settings.py:/app/settings.py:ro -p 8000:8000 korijn/stirfried

Celery workers

You need to install Stirfried in your Celery workers via pip:

pip install stirfried

In your Celery workers, import the StirfriedTask:

from stirfried.celery import StirfriedTask

Configure StirfriedTask as the base class globally:

app = Celery(..., task_cls=StirfriedTask)

...or per task:

@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y
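
Putting this together, a minimal worker module might look like the sketch below; the module name, broker URL, and task are assumptions:

# tasks.py
from celery import Celery
from stirfried.celery import StirfriedTask

app = Celery("tasks", broker="redis://localhost:6379/1", task_cls=StirfriedTask)

@app.task
def add(x, y, room=None):
    # room is injected by the Socket.IO server (see Rooms below)
    return x + y

You would then start it with a regular Celery worker command, e.g. celery -A tasks worker.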

Rooms

The server injects the client's sid into the keyword argument room.

The StirfriedTask base class depends on the presence of this keyword argument.

This means you are required to add the keyword argument room=None to your task definitions in order to receive it.

Progress

You can emit progress from tasks by calling self.emit_progress(current, total, info=None).

Use the info=None keyword argument to send along arbitrary metadata, such as a progress message or early results.

Note that you are required to pass bind=True to the celery.task decorator in order to get access to the task instance via self.

@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    self.emit_progress(100, 100)  # 100%
    return s
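
Here is a sketch of a task (the name summarize and its logic are hypothetical) that passes a progress message via info:

@celery.task(bind=True)
def summarize(self, numbers, room=None):
    total = 0
    for i, n in enumerate(numbers, start=1):
        total += n
        # info carries arbitrary metadata, here a human-readable message
        self.emit_progress(i, len(numbers), info=f"processed {i} of {len(numbers)} numbers")
    return total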
