Socket.IO server to schedule Celery tasks from clients in real-time.
Stirfried 🥡
Socket.IO server to control Celery tasks from the client (browser) in real-time.
Running the example
You can run the example included in the repo as follows:
- Clone the repository
- cd into the example directory
- Run docker-compose build
- Then docker-compose up
- Open your browser and go to http://localhost:8080/
- You should see the example interface.
Getting started
Stirfried has a three-layered architecture: Socket.IO clients, a Socket.IO server, and Celery workers.
The design allows you to independently scale the number of servers when server-client communication workload increases and the number of workers when the task processing workload increases.
By leveraging Celery's task routing (explained below) you can also divide workers into groups and scale groups independently.
Socket.IO clients
Clients can connect using standard Socket.IO libraries.
The server is listening for clients to emit any of the following events:
Event | Description |
---|---|
send_task({task_name, args, kwargs}) -> {status, data} | Emit to schedule a task. Server immediately replies with status and task_id in case of success or a message in case of failure. Use a callback to receive it in the client. |
revoke_task(task_id) | Emit to cancel a task. |
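As a rough sketch of the send_task round trip from a Python client: the helper below builds the {task_name, args, kwargs} payload described in the table and waits for the server's {status, data} reply. It assumes the python-socketio client, whose call() method emits an event and blocks until the acknowledgement arrives. The server address, the task name feed.tasks.add and the helper names schedule_task and main are placeholders for illustration, not part of Stirfried.

```python
from typing import Any, Dict, Optional, Sequence


def schedule_task(
    sio: Any,
    task_name: str,
    args: Sequence[Any] = (),
    kwargs: Optional[Dict[str, Any]] = None,
    timeout: float = 10,
) -> Dict[str, Any]:
    """Emit send_task and return the server's {status, data} reply.

    `sio` is expected to be a connected python-socketio client.
    """
    payload = {
        "task_name": task_name,
        "args": list(args),
        "kwargs": dict(kwargs or {}),
    }
    # call() emits the event and waits for the acknowledgement callback.
    return sio.call("send_task", payload, timeout=timeout)


def main() -> None:
    # Imported here so the helper above has no hard dependency.
    import socketio  # pip install "python-socketio[client]"

    sio = socketio.Client()
    sio.connect("http://localhost:8000")  # assumed server address
    reply = schedule_task(sio, "feed.tasks.add", args=[1, 2])  # hypothetical task
    print(reply["status"], reply["data"])  # data: task_id or failure message
    sio.disconnect()
```

Calling main() requires a running Stirfried server. A browser client would achieve the same by passing a callback as the last argument to emit.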
Clients can subscribe to the following events emitted by the server:
Event | Description |
---|---|
on_progress({current, total, info, task_id, task_name}) | Emitted on task progress updates. |
on_retry({task_id, task_name[, einfo]}) | Emitted on task retries. einfo is only available if stirfried_error_info=True. |
on_failure({task_id, task_name[, einfo]}) | Emitted on task failure. einfo is only available if stirfried_error_info=True. |
on_success({retval, task_id, task_name}) | Emitted on task success. |
on_return({status, retval, task_id, task_name}) | Emitted on task success and failure. |
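To illustrate subscribing to these events, here is a minimal sketch that registers handlers for on_progress and on_return on a python-socketio client through its on(event, handler) method. The payload keys come from the table above; the helper names progress_percent and register_handlers, and the message formats, are made up for this example.

```python
from typing import Any, Callable, Dict


def progress_percent(payload: Dict[str, Any]) -> float:
    """Convert an on_progress payload into a percentage."""
    total = payload["total"]
    return 100.0 * payload["current"] / total if total else 0.0


def register_handlers(sio: Any, log: Callable[[str], None] = print) -> None:
    """Attach handlers for two of the server-emitted events to `sio`."""

    def on_progress(payload: Dict[str, Any]) -> None:
        label = f"{payload['task_name']} [{payload['task_id']}]"
        log(f"{label}: {progress_percent(payload):.0f}%")

    def on_return(payload: Dict[str, Any]) -> None:
        label = f"{payload['task_name']} [{payload['task_id']}]"
        log(f"{label} returned with status {payload['status']}")

    sio.on("on_progress", on_progress)
    sio.on("on_return", on_return)
```

With a real client you would call register_handlers(sio) before sio.connect(...), so that no early events are missed.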
Socket.IO server
For the Socket.IO server component you can pull the prebuilt docker image:
docker pull korijn/stirfried
or you can copy the project and customize it to your liking.
Configuration
You are required to provide a settings.py file with the configuration for the server. Stirfried builds on the standard Celery configuration mechanism.
Available settings for stirfried:
- stirfried_redis_url - Required. Redis connection string for the Socket.IO server.
- stirfried_error_info - Optional. Set to True to include tracebacks in events when tasks fail.
- stirfried_available_tasks - Optional. List of task names. If given, send_task will fail if a task name is not contained in the list.
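A settings.py that sets all three options might look like the sketch below. The Redis URL and the task names are placeholder values chosen for illustration; substitute your own.

```python
# settings.py (illustrative values only)

# Required: Redis connection string for the Socket.IO server.
stirfried_redis_url = "redis://localhost:6379/0"  # assumed local Redis

# Optional: include tracebacks (einfo) in on_retry and on_failure events.
stirfried_error_info = True

# Optional: send_task fails for any task name not in this list.
stirfried_available_tasks = [
    "feed.tasks.import_feed",  # hypothetical task name
    "web.tasks.render",  # hypothetical task name
]
```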
Configuration: python-socketio
You can configure python-socketio by prefixing configuration keys with socketio_. They will be passed on without the prefix to the AsyncServer constructor.
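The prefix convention can be pictured with the following sketch. It is not Stirfried's actual code: strip_prefix is a hypothetical helper that shows how socketio_-prefixed keys could be turned into keyword arguments for AsyncServer. The two options used here, cors_allowed_origins and ping_timeout, are existing python-socketio server options; the values are arbitrary.

```python
from typing import Any, Dict


def strip_prefix(settings: Dict[str, Any], prefix: str = "socketio_") -> Dict[str, Any]:
    """Return the entries whose key starts with `prefix`, with the prefix removed."""
    return {
        key[len(prefix):]: value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


# What a settings.py could contain, shown as a dict for the example.
settings = {
    "stirfried_redis_url": "redis://localhost:6379/0",  # not passed on
    "socketio_cors_allowed_origins": "*",
    "socketio_ping_timeout": 30,
}

server_kwargs = strip_prefix(settings)
# server_kwargs could then be passed along as AsyncServer(**server_kwargs).
```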
Task routing
The server sends tasks to the Celery broker by name, so it can act as a gateway to many different Celery workers with different tasks. You can leverage Celery's task routing configuration for this purpose.
Example
Let's say you have two workers, one listening on the feeds queue and another on the web queue. This is how you would configure the server accordingly with settings.py:
# Stirfried settings
stirfried_redis_url = "redis://localhost:6379/0"
# Celery settings
broker_url = "redis://localhost:6379/1"
task_routes = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}
You can then run the server as follows:
docker run --rm -ti -v `pwd`/settings.py:/app/settings.py:ro -p 8000:8000 korijn/stirfried
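To make the effect of the task_routes patterns in this example concrete, the sketch below resolves task names to queues with glob matching from the standard library. This is a simplified illustration, not Celery's router: resolve_queue is a hypothetical helper, the task names are invented, and the fallback queue name "celery" is Celery's default queue.

```python
from fnmatch import fnmatchcase
from typing import Dict

task_routes = {
    "feed.tasks.*": {"queue": "feeds"},
    "web.tasks.*": {"queue": "web"},
}


def resolve_queue(
    task_name: str,
    routes: Dict[str, Dict[str, str]],
    default: str = "celery",
) -> str:
    """Return the queue of the first glob pattern that matches `task_name`."""
    for pattern, route in routes.items():
        if fnmatchcase(task_name, pattern):
            return route["queue"]
    return default


feeds_queue = resolve_queue("feed.tasks.import_feed", task_routes)  # "feeds"
web_queue = resolve_queue("web.tasks.render", task_routes)  # "web"
fallback_queue = resolve_queue("other.tasks.add", task_routes)  # "celery"
```

Because the server only needs task names, a client can schedule feed.tasks.import_feed and web.tasks.render through the same connection while each task is executed by a different worker group.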
Celery workers
You need to install Stirfried in your Celery workers via pip:
pip install stirfried
In your Celery workers, import StirfriedTask:
from stirfried.celery import StirfriedTask
Configure StirfriedTask as the base class globally:
app = Celery(..., task_cls=StirfriedTask)
...or per task:
@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y
Rooms
The server injects the client's sid into the keyword argument room.
The StirfriedTask base class depends on the presence of this keyword argument.
This means you are required to add the keyword argument room=None to your task definitions in order to receive it.
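The following sketch shows why the room=None parameter is needed. It mimics the injection step with a hypothetical inject_room helper (not Stirfried's actual code) and plain functions instead of Celery tasks; the sid value is made up.

```python
from typing import Any, Dict, Optional


def inject_room(kwargs: Optional[Dict[str, Any]], sid: str) -> Dict[str, Any]:
    """Return a copy of the client's kwargs with the client's sid added as `room`."""
    return {**(kwargs or {}), "room": sid}


def add(x, y, room=None):
    # Accepts the injected keyword argument, as the task definition requires.
    return x + y


def add_without_room(x, y):
    # Missing room=None: the injected keyword argument has nowhere to go.
    return x + y


call_kwargs = inject_room({"y": 2}, sid="abc123")
result = add(1, **call_kwargs)

try:
    add_without_room(1, **call_kwargs)
    rejected = False
except TypeError:
    # Python rejects the unexpected keyword argument 'room'.
    rejected = True
```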
Progress
You can emit progress from tasks by calling self.emit_progress(current, total, info=None).
Use the info keyword argument to send along arbitrary metadata, such as a progress message or early results.
Note that you are required to pass bind=True to the celery.task decorator in order to access the task instance through self.
@celery.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    self.emit_progress(100, 100)  # 100%
    return s
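For a task that loops over several items, progress and early results can be reported on each iteration. The sketch below runs without Celery: RecordingTask is a hypothetical stand-in for a bound StirfriedTask that records emit_progress calls instead of emitting them, and passing a dict as info is an assumption about what suits your client. In a real worker, sum_items would be decorated with bind=True and use StirfriedTask as its base class.

```python
from typing import Any, List, Optional, Tuple


class RecordingTask:
    """Stand-in for a bound StirfriedTask; stores progress updates in a list."""

    def __init__(self) -> None:
        self.events: List[Tuple[int, int, Any]] = []

    def emit_progress(self, current: int, total: int, info: Any = None) -> None:
        self.events.append((current, total, info))


def sum_items(self, items: List[int], room: Optional[str] = None) -> int:
    """Task body: add up `items`, reporting progress after each one."""
    total = len(items)
    running = 0
    for index, item in enumerate(items, start=1):
        running += item
        # Send the partial result along as metadata.
        self.emit_progress(index, total, info={"partial_sum": running})
    return running


task = RecordingTask()
final_sum = sum_items(task, [1, 2, 3])
```

Each recorded tuple corresponds to one on_progress event a client would receive, with current, total and info filled in from the call.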