Socket.IO server to schedule Celery tasks from clients in real time.
Stirfried 🥡
Stirfried is an ASGI HTTP/Socket.IO server that provides both browser-based and regular clients with real-time control over Celery tasks.
Tasks are scheduled by name, meaning the server won't necessarily need an update when changes are made to the workers and tasks available to it.
Stirfried implements a simple-to-scale, three-layered architecture: clients, servers and workers. Any layer can be scaled out by adding more instances.
Stirfried provides Socket.IO and HTTP APIs with three core functions:
- Schedule a task
- Revoke a task
- Query task info
Want to see Stirfried in action before digging through the README? Try running the example.
Workers
Install Stirfried in your Celery workers via pip/pipenv/poetry:
pip install stirfried
Import the StirfriedTask base class:
from stirfried.celery import StirfriedTask
Configure the base class globally:
app = Celery(..., task_cls=StirfriedTask)
...or per task:
@app.task(base=StirfriedTask)
def add(x, y, room=None):
    return x + y
Servers
You can run the server using the korijn/stirfried Docker container and exposing port 8000, or alternatively by cloning this repo, installing the dependencies with poetry, and starting the uvicorn server as demonstrated in the example code.
You can configure both the servers and the workers with a settings.py file, via the standard Celery configuration mechanism. When deploying with Docker, mount the settings file at /app/settings.py.
Clients
Clients can connect to the Socket.IO API using standard Socket.IO libraries, and to the HTTP API using plain window.fetch.
Task object schema
Tasks are scheduled by submitting the following task object to either of the APIs:
{
  "task_name": "",  // (required) task name
  "args": [],       // (optional) task arguments
  "kwargs": {},     // (optional) task keyword arguments
  "room": "",       // (optional) custom room override, only processed if
                    // `stirfried_custom_rooms` is enabled
                    // NOTE: can also be used to disable server events
                    // for this task by passing the sentinel room
                    // ("NO_EMIT" by default)
  "chain": []       // (optional) array of task objects to chain onto the main task
                    // task objects use the same schema, except for
                    // the `chain` property which cannot be nested further
                    // NOTE: chained tasks are applied in reverse order
}
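Assembling task objects by hand gets error-prone once chains are involved. The following is a minimal sketch of a hypothetical helper, make_task (not part of Stirfried), that builds a task object according to the schema above and rejects nested chains. The "double" task in the usage lines is likewise hypothetical, and the server remains the authority on validation.

```python
from typing import Any, Dict, List, Optional


def make_task(
    task_name: str,
    args: Optional[List[Any]] = None,
    kwargs: Optional[Dict[str, Any]] = None,
    room: Optional[str] = None,
    chain: Optional[List[Dict[str, Any]]] = None,
) -> Dict[str, Any]:
    """Assemble a Stirfried task object (hypothetical helper, not part of Stirfried)."""
    if not task_name:
        raise ValueError("task_name is required")
    task: Dict[str, Any] = {"task_name": task_name}
    # Optional fields are only included when given, as in the schema above.
    if args is not None:
        task["args"] = list(args)
    if kwargs is not None:
        task["kwargs"] = dict(kwargs)
    if room is not None:
        # Only honoured by the server when stirfried_custom_rooms is enabled
        # (or when it is the sentinel room).
        task["room"] = room
    if chain is not None:
        for link in chain:
            # Chained task objects use the same schema, minus a nested chain.
            if "chain" in link:
                raise ValueError("chained task objects cannot contain a chain")
        task["chain"] = [dict(link) for link in chain]
    return task


# Schedule add(1, 2), chain a hypothetical "double" task onto it,
# and suppress server events by passing the default sentinel room.
task = make_task(
    "add",
    args=[1, 2],
    room="NO_EMIT",
    chain=[make_task("double")],
)
```

The resulting dictionary can be emitted as-is with send_task or serialized as the JSON body of POST /task.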
Socket.IO API
Events are described in the following format: name(args[, optional]) -> callback_args
Clients can emit any of the following events that servers are listening for:
Event | Description |
---|---|
send_task({task_name[, args][, kwargs][, room][, chain]}) -> {status, data} | Schedule a task. Use a callback to receive the reply in the client. status indicates whether scheduling succeeded, and data contains the task id, or the error message in case of failure. The client can use the task id as a reference when processing subsequent server-emitted events. See the task object schema for more details. |
revoke_task(task_id) | Revoke a task. Does not fail if the task does not exist, and has no effect if the task is already running. |
task_info(task_id) -> {id, state, result} | Query task info. Use a callback to receive the reply in the client. Only works if a Celery result backend is configured. |
Clients can listen for the following server-emitted events. They hook directly into the Celery Task class callbacks, except for on_progress, which is a Stirfried addition that tasks may use to report progress:
Event | Description |
---|---|
on_progress({current, total, info, task_id, task_name}) | Emitted on task progress updates. This event will only be emitted if tasks call emit_progress. |
on_retry({task_id, task_name[, einfo]}) | Emitted automatically on task retries. einfo is only included if stirfried_error_info=True. |
on_failure({task_id, task_name[, einfo]}) | Emitted automatically on task failure. einfo is only included if stirfried_error_info=True. |
on_success({retval, task_id, task_name}) | Emitted automatically on task success. |
on_return({status, retval, task_id, task_name}) | Emitted automatically on task success and failure. |
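For a Python client, a rough sketch of this round trip might look as follows. It assumes the python-socketio client package is installed and that the server listens on the default /socket.io path at http://localhost:8000; run_task and progress_percent are hypothetical helpers, and the sketch assumes only one task is in flight per connection. Because events go to the room of the client that scheduled the task, the connection stays open until on_return arrives.

```python
import threading
from typing import Any, Dict, Optional


def progress_percent(payload: Dict[str, Any]) -> float:
    """Turn an on_progress payload into a percentage (0 when total is 0)."""
    total = payload.get("total") or 0
    if not total:
        return 0.0
    return 100.0 * payload["current"] / total


def run_task(url: str, task: Dict[str, Any], timeout: float = 30.0) -> Optional[Dict[str, Any]]:
    """Schedule a task over Socket.IO and wait for its on_return event.

    Assumption: python-socketio is installed and the server uses the
    default /socket.io path. Hypothetical helper, not part of Stirfried.
    """
    import socketio  # imported lazily so this module loads without the package

    sio = socketio.Client()
    done = threading.Event()
    result: Dict[str, Any] = {}

    @sio.on("on_progress")
    def on_progress(data):
        print(f"{data.get('task_name')}: {progress_percent(data):.0f}%")

    @sio.on("on_return")
    def on_return(data):
        result.update(data)
        done.set()

    sio.connect(url)
    try:
        # call() blocks until the server invokes the send_task callback.
        reply = sio.call("send_task", task, timeout=timeout)
        print("send_task reply:", reply)
        # Stay connected: events are emitted to this client's room.
        done.wait(timeout)
    finally:
        sio.disconnect()
    return result or None
```

Usage, against a running server: run_task("http://localhost:8000", {"task_name": "add", "args": [1, 2]}).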
HTTP API
Endpoint | Description |
---|---|
POST /task | Schedule a task. Submit the task object as JSON in the body of the request. See the task object schema for more details. |
DELETE /task/{id} | Revoke a task. Does not fail if the task does not exist, and has no effect if the task is already running. |
GET /task/{id} | Query task info. Only works if a Celery result backend is configured. |
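Outside the browser, the same endpoints can be called with any HTTP client. Below is a minimal sketch using only the Python standard library; it assumes the server is exposed on http://localhost:8000 (as in the Docker setup), and the helper names are hypothetical. Since the response bodies are not specified beyond the descriptions above, send simply decodes whatever JSON comes back.

```python
import json
import urllib.request
from typing import Any, Dict, Optional

BASE_URL = "http://localhost:8000"  # assumption: server exposed on port 8000


def schedule_request(task: Dict[str, Any], base_url: str = BASE_URL) -> urllib.request.Request:
    """POST /task with the task object as the JSON request body."""
    return urllib.request.Request(
        f"{base_url}/task",
        data=json.dumps(task).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def revoke_request(task_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """DELETE /task/{id}: revoke a task."""
    return urllib.request.Request(f"{base_url}/task/{task_id}", method="DELETE")


def info_request(task_id: str, base_url: str = BASE_URL) -> urllib.request.Request:
    """GET /task/{id}: query task info (requires a Celery result backend)."""
    return urllib.request.Request(f"{base_url}/task/{task_id}", method="GET")


def send(request: urllib.request.Request) -> Optional[Any]:
    """Perform the request and decode a JSON body, if any.

    urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
    """
    with urllib.request.urlopen(request) as response:
        body = response.read()
    return json.loads(body) if body else None
```

Against a running server, send(schedule_request({"task_name": "add", "args": [1, 2]})) schedules the task. Tasks sent this way default to the sentinel room, so no Socket.IO events are emitted for them.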
Settings
You can configure Celery, Socket.IO and Stirfried all from the same settings.py file. Stirfried settings are prefixed with stirfried_, Socket.IO settings are prefixed with socketio_, and Celery settings are used as-is (without a prefix).
Socket.IO settings are passed directly, with the prefix removed, to the AsyncServer constructor of the python-socketio library; see its documentation for the available options. See the Celery documentation for the Celery options.
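As an illustration of the prefix convention, here is a sketch of a settings.py. The Redis host, port and database numbers are assumptions, and socketio_cors_allowed_origins is used because cors_allowed_origins is an existing AsyncServer option; a wildcard origin is only sensible for local development.

```python
# settings.py (sketch): hosts, ports and database numbers are assumptions.

# Celery settings: used as-is, without a prefix.
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/1"  # needed for task_info and GET /task/{id}

# Socket.IO settings: the socketio_ prefix is stripped and the remainder is
# passed to python-socketio's AsyncServer, i.e. cors_allowed_origins="*".
socketio_cors_allowed_origins = "*"  # permissive; restrict this in production

# Stirfried settings.
stirfried_redis_url = "redis://localhost:6379/0"  # lets workers emit events
stirfried_available_tasks = ["add"]  # reject any other task name
stirfried_error_info = False  # keep tracebacks out of events and responses
```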
The following options are additionally available for configuring Stirfried servers and workers:
Key | Type | Default | Description |
---|---|---|---|
stirfried_enable_http | bool | True | Set to False to disable the HTTP API. |
stirfried_enable_socketio | bool | True | Set to False to disable the Socket.IO API. |
stirfried_enable_task_info | bool | True | Set to False to disable the task_info event and the GET /task/{id} endpoint. |
stirfried_enable_revoke_task | bool | True | Set to False to disable the revoke_task event and the DELETE /task/{id} endpoint. |
stirfried_redis_url | str | "" | Connection string for the server-to-server communication of the Socket.IO API over Redis pub/sub. Required if you want workers to be able to emit events. |
stirfried_available_tasks | List[str] | [] | If non-empty, send_task and POST /task fail for any task name not contained in the list. |
stirfried_error_info | bool | False | Set to True to include error messages and tracebacks in events, event callbacks and HTTP responses. |
stirfried_sentinel_room | str | "NO_EMIT" | A magic string that can be passed as the room argument to prevent workers from emitting events for a task. This is the default room for tasks sent to the HTTP API, since there is no Socket.IO client to receive the events. |
stirfried_custom_rooms | bool | False | Set to True to allow clients to override the default room for server-emitted events. |
stirfried_header_task_map | Dict[str, Dict[str, str]] | {} | Maps headers to keyword arguments for specific tasks. For example, {"send_email": {"Date": "date"}} causes the value of the Date header to be injected into the date keyword argument whenever a send_email task is scheduled. This can be used in concert with Socket.IO's extraHeaders feature to implement authorization and validation. |
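To make the stirfried_header_task_map behaviour concrete, the sketch below reproduces the documented mapping on plain dictionaries. It is an illustration only, not Stirfried's implementation: inject_header_kwargs is hypothetical, header names are matched case-insensitively (as HTTP header names are), and a mapped keyword argument is always overwritten (with None if the header is absent) so that a client cannot smuggle in its own value. On the worker, the receiving task would accept the keyword, e.g. def send_email(to, date=None, room=None).

```python
from typing import Dict, Mapping


def inject_header_kwargs(
    task_name: str,
    kwargs: Dict[str, object],
    headers: Mapping[str, str],
    header_task_map: Mapping[str, Mapping[str, str]],
) -> Dict[str, object]:
    """Illustrate the documented header-to-kwarg mapping (hypothetical helper)."""
    result = dict(kwargs)
    # HTTP header names are case-insensitive, so compare them lowercased.
    lowered = {name.lower(): value for name, value in headers.items()}
    for header, kwarg in header_task_map.get(task_name, {}).items():
        # Always overwrite, so a client-supplied kwarg cannot spoof the header.
        result[kwarg] = lowered.get(header.lower())
    return result


stirfried_header_task_map = {"send_email": {"Date": "date"}}
headers = {"date": "Tue, 14 Apr 2020 10:00:00 GMT", "Content-Type": "application/json"}
kwargs = inject_header_kwargs(
    "send_email", {"to": "a@example.com"}, headers, stirfried_header_task_map
)
```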
Rooms
For tasks scheduled via the Socket.IO API, server-emitted events are sent by default to the client that scheduled the task. For tasks scheduled via the HTTP API, no events are emitted by default. The server accomplishes this by injecting a value into the room keyword argument of Stirfried Celery tasks.
The StirfriedTask base class depends on the presence of this keyword argument, so you must add room=None to your task definitions in order to receive it.
If stirfried_custom_rooms is enabled, clients can override the value by sending a custom room in the task object (not via the task's keyword arguments; see the task object schema).
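Forgetting room=None is an easy mistake to make, so a worker's test suite could guard against it. Below is a minimal sketch of such a check with the standard inspect module; accepts_room is a hypothetical helper, not part of Stirfried. For a Celery task object, inspecting its run attribute should work, since Celery keeps the decorated function there, but treat that as an assumption.

```python
import inspect
from typing import Callable


def accepts_room(func: Callable) -> bool:
    """Return True if func declares a room parameter that can be passed by keyword."""
    param = inspect.signature(func).parameters.get("room")
    return param is not None and param.kind in (
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
        inspect.Parameter.KEYWORD_ONLY,
    )


def add(x, y, room=None):
    return x + y


def broken_add(x, y):  # missing room=None: the room cannot be injected
    return x + y
```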
Progress events
You can emit progress events from workers by calling self.emit_progress(current, total, info=None) in a task. Use the optional info keyword argument to send along arbitrary metadata, such as a progress message or early results.
Note that you must pass bind=True to the task decorator in order to access the task instance through self.
@app.task(bind=True)
def add(self, x, y, room=None):
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s
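A task that processes several items would typically report progress once per item and use info for a message and partial results. The sketch below shows only the task body as a plain function: in a worker it would be decorated with bind=True and use StirfriedTask as its base. process_items and the RecordingTask stand-in (which merely records calls) are hypothetical, and uppercasing each item stands in for real work.

```python
from typing import List


def process_items(self, items: List[str], room=None) -> List[str]:
    """Body of a bound task that emits one progress event per item."""
    total = len(items)
    results = []
    for index, item in enumerate(items, start=1):
        results.append(item.upper())  # stand-in for real work
        # info carries arbitrary metadata: a message and an early result.
        self.emit_progress(
            index, total, info={"message": f"processed {item}", "partial": results[-1]}
        )
    return results


class RecordingTask:
    """Hypothetical stand-in for a StirfriedTask that records progress calls."""

    def __init__(self):
        self.calls = []

    def emit_progress(self, current, total, info=None):
        self.calls.append((current, total, info))
```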
Binary/big data
Socket.IO clients, servers and Celery workers support msgpack, allowing you to use binary data directly, without manually converting it to and from base64-encoded strings and paying the corresponding performance penalty.
Also be aware that Redis limits the size of client output buffers, which means you cannot emit events larger than a certain size (32mb by default for pub/sub clients). You can raise this limit in several ways; here is how to do it via the Redis server command line:
# client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
redis-server --client-output-buffer-limit pubsub 256mb 128mb 30
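Besides raising the limit, a worker can check payload sizes before emitting. The following is a rough, hypothetical guard (fits_in_pubsub is not part of Stirfried). It estimates the size by pickling the payload, which only approximates the message that actually travels over Redis, since the wire format depends on your python-socketio version; hence the headroom factor.

```python
import pickle
from typing import Any

# Redis' default pub/sub hard limit: "32mb" in Redis means 32 * 1024 * 1024 bytes.
DEFAULT_PUBSUB_LIMIT = 32 * 1024 * 1024


def fits_in_pubsub(payload: Any, limit: int = DEFAULT_PUBSUB_LIMIT, headroom: float = 0.5) -> bool:
    """Roughly check that a payload stays well below the pub/sub buffer limit.

    Pickled size is only an estimate of the real message size.
    """
    return len(pickle.dumps(payload)) <= limit * headroom
```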
Testing
When unit testing a Stirfried Celery worker, the recommended approach is to disable the Redis connection by leaving stirfried_redis_url out of your settings, and to call the task functions directly in your unit tests. Without a Redis connection, any events that would normally be emitted are short-circuited, so you can treat tasks as regular functions and unit test them as usual.
Optionally, you can patch or mock calls to self.emit_progress with standard Python testing utilities, such as unittest.mock, to test those too.
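A minimal sketch of such a test with unittest.mock, using the body of the add task from the progress example as a plain function and a Mock standing in for the bound task instance. With the real decorated task, calling add(2, 3) binds self to the task itself, so patching the method, e.g. with mock.patch.object(add, "emit_progress"), should have the same effect; treat that variant as an assumption.

```python
from unittest import mock


def add(self, x, y, room=None):
    """Body of the bound add task from the progress example, undecorated."""
    s = x
    self.emit_progress(50, 100)  # 50%
    s += y
    return s


def test_add_reports_progress():
    fake_task = mock.Mock()  # stands in for the bound task instance
    assert add(fake_task, 2, 3) == 5
    fake_task.emit_progress.assert_called_once_with(50, 100)
```

A test runner such as pytest would collect test_add_reports_progress automatically.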
Example code
The repo includes an example demonstrating all of the functionality provided by Stirfried.
You can run the example as follows:
- Clone the repository
- cd into the example directory
- Run docker-compose build
- Then run docker-compose up
- Open your browser and go to http://localhost:8080/
- You should see the example interface and are ready to give Stirfried a try