Stirfried 🥡

Socket.IO server to schedule Celery tasks from clients in real-time.

Getting started

Stirfried has a three-layered architecture:

  1. Socket.IO clients
  2. Socket.IO server
  3. Celery workers

This design lets you scale each layer independently: add servers when the server-client communication workload increases, and add workers when the task processing workload increases.

By leveraging Celery's task routing (explained below) you can also divide workers into groups and scale groups independently.

Socket.IO clients

Clients can connect using standard Socket.IO libraries.

The server listens for the following events emitted by clients:

Event                                 Description
send_task({task_name, args, kwargs})  Emit to schedule a task.
revoke_task(task_id)                  Emit to cancel a task.
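
As a rough sketch of the client side, the helpers below build the send_task payload described above and emit it through any object that exposes emit(event, data), such as socketio.Client from the python-socketio package. The helper names (build_send_task_payload, schedule_task, cancel_task) and the server address in the usage comment are illustrative assumptions, not part of Stirfried.

```python
from typing import Any, Dict, Optional, Sequence


def build_send_task_payload(
    task_name: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Build the {task_name, args, kwargs} payload for the send_task event."""
    return {"task_name": task_name, "args": list(args), "kwargs": dict(kwargs or {})}


def schedule_task(client: Any, task_name: str, *args: Any, **kwargs: Any) -> None:
    """Emit send_task through any client object that has emit(event, data)."""
    client.emit("send_task", build_send_task_payload(task_name, args, kwargs))


def cancel_task(client: Any, task_id: str) -> None:
    """Emit revoke_task with the id of the task to cancel."""
    client.emit("revoke_task", task_id)


# Hypothetical usage with python-socketio (not executed here):
#   import socketio
#   sio = socketio.Client()
#   sio.connect("http://localhost:8000")  # assumed server address
#   schedule_task(sio, "web.tasks.add", 1, 2)
```

Keeping the payload builder separate from the emit call makes it easy to check the payload shape without a running server.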

Clients can subscribe to the following events emitted by the server:

Event                                              Description
on_return({status, retval, task_id, task_name})    Emitted on task success and failure.
on_progress({current, total, task_id, task_name})  Emitted on task progress updates.
on_retry({task_id, task_name})                     Emitted on task retries.
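
To illustrate how a client might consume these events, the sketch below registers one handler per event on any object with an on(event, handler) method, which is how socketio.Client from python-socketio can be used. The function names and the log message formats are assumptions made for this example; only the event names and payload fields come from the table above.

```python
from typing import Any, Callable, Dict


def format_progress(data: Dict[str, Any]) -> str:
    """Turn an on_progress payload into a short human-readable line."""
    total = data["total"]
    # Guard against a zero total so the handler never divides by zero.
    percent = 100.0 * data["current"] / total if total else 0.0
    return f"{data['task_name']} [{data['task_id']}]: {percent:.0f}%"


def register_handlers(client: Any, log: Callable[[str], None] = print) -> None:
    """Subscribe to the three server events on a python-socketio style client."""

    def on_return(data: Dict[str, Any]) -> None:
        log(f"{data['task_name']} [{data['task_id']}] {data['status']}: {data['retval']!r}")

    def on_progress(data: Dict[str, Any]) -> None:
        log(format_progress(data))

    def on_retry(data: Dict[str, Any]) -> None:
        log(f"{data['task_name']} [{data['task_id']}] is being retried")

    # client.on(event, handler) registers a handler for the named event.
    client.on("on_return", on_return)
    client.on("on_progress", on_progress)
    client.on("on_retry", on_retry)
```

Because every payload carries task_id and task_name, a single set of handlers can serve all tasks a client schedules.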

Socket.IO server

For the Socket.IO server component you can pull the prebuilt Docker image:

docker pull korijn/stirfried

or you can copy the project and customize it to your liking.

You must provide a settings.py file that configures the server.

At a minimum, it must define:

  • socketio_redis - Redis connection string for the Socket.IO server.
  • broker_url - Connection string for the Celery broker.
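
A minimal settings.py could look like the following sketch. It assumes the file is loaded as ordinary Python, so the two required values can be read from environment variables. The variable names SOCKETIO_REDIS and BROKER_URL are made up for illustration, and the defaults assume a Redis instance on localhost.

```python
# settings.py: sketch that reads the two required settings from the environment.
import os

# The environment variable names below are hypothetical; use whatever fits
# your deployment. The defaults point at a local Redis instance.
socketio_redis = os.environ.get("SOCKETIO_REDIS", "redis://localhost:6379/0")
broker_url = os.environ.get("BROKER_URL", "redis://localhost:6379/1")
```

Reading the values from the environment lets you reuse the same file across containers that connect to different Redis hosts.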

Task routing

The server sends tasks to the Celery broker by name, so it can act as a gateway to many different Celery workers with different tasks. You can leverage Celery's task routing configuration for this purpose.

Example

Let's say you have two workers, one listening on the feeds queue and another on the web queue. This is how you would configure the server accordingly with settings.py:

socketio_redis = "redis://localhost:6379/0"
broker_url = "redis://localhost:6379/1"
task_routes = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}

You can then run the server as follows:

docker run --rm -ti -v `pwd`/settings.py:/app/settings.py:ro -p 8000:8000 korijn/stirfried
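
To make the effect of the task_routes patterns more concrete, the sketch below resolves a task name to a queue using glob matching from the standard library. It approximates the behaviour and is not Celery's own router; resolve_queue and the example task names are hypothetical. Tasks that match no pattern fall back to Celery's default queue, which is named celery unless configured otherwise.

```python
from fnmatch import fnmatchcase
from typing import Dict

task_routes: Dict[str, Dict[str, str]] = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}


def resolve_queue(
    task_name: str,
    routes: Dict[str, Dict[str, str]],
    default: str = "celery",
) -> str:
    """Return the queue of the first glob pattern that matches task_name."""
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default  # no pattern matched: use the default queue


print(resolve_queue("feed.tasks.import_feed", task_routes))  # feeds
print(resolve_queue("web.tasks.crawl", task_routes))  # web
print(resolve_queue("other.tasks.cleanup", task_routes))  # celery
```

Since the server only forwards task names, a client can target either worker group simply by choosing a task name under feed.tasks or web.tasks.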

Celery workers

You need to install Stirfried in your Celery workers via pip:

pip install stirfried

In your Celery workers, import the StirfriedTask:

from stirfried.celery import StirfriedTask

Configure StirfriedTask as the base class globally:

app = Celery(..., task_cls=StirfriedTask)

...or per task:

@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y

Rooms

The server injects the client's sid into the keyword argument room.

The StirfriedTask base class depends on the presence of this keyword argument.

You must therefore add the keyword argument room=None to every task definition so that the task can receive it.
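
The plain-Python sketch below shows why the room=None parameter matters. It does not use Celery or Stirfried: the dictionary simply stands in for keyword arguments that already contain an injected room, and the sid value is a placeholder. A similar TypeError would be expected when a worker calls a task that does not accept room.

```python
def add(x, y, room=None):
    # room receives the client's sid; the task body is free to ignore it.
    return x + y


def add_without_room(x, y):
    # Same task, but without the room keyword argument.
    return x + y


# Stand-in for the keyword arguments after the room has been injected.
injected_kwargs = {"room": "hypothetical-client-sid"}

print(add(1, 2, **injected_kwargs))  # 3

try:
    add_without_room(1, 2, **injected_kwargs)
except TypeError as exc:
    # The unexpected keyword argument makes the call fail.
    print(f"rejected: {exc}")
```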

Progress

You can emit progress from tasks by calling self.emit_progress(current, total).

Note that you must pass bind=True to the task decorator to get access to the task instance as self.

@app.task(bind=True, base=StirfriedTask)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    self.emit_progress(100, 100)  # 100%
    return s
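
For tasks that loop over many items, progress is usually reported once per item. The sketch below keeps that logic in a plain function that takes the progress callback as a parameter, so it can be exercised without a worker. The function name sum_with_progress and the task in the trailing comment are hypothetical; the only Stirfried call assumed is self.emit_progress(current, total) as described above.

```python
from typing import Callable, Iterable


def sum_with_progress(
    values: Iterable[float],
    emit_progress: Callable[[int, int], None],
) -> float:
    """Sum values, reporting progress after each item via emit_progress(current, total)."""
    items = list(values)
    running_total = 0.0
    for index, value in enumerate(items, start=1):
        running_total += value
        emit_progress(index, len(items))
    return running_total


# Inside a bound task you would pass the task's own method, for example:
#   @app.task(bind=True, base=StirfriedTask)
#   def sum_values(self, values, room=None):
#       return sum_with_progress(values, self.emit_progress)
```

Passing the callback in keeps the computation independent of Celery, which makes it straightforward to unit test with a list-appending stub.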
