Toolkit for encapsulating Python-based computation into deployable and distributable tasks
werkit
Werkit provides code that helps package things up:
- Serializing results
- Handling and serializing errors
- Deploying task workers using Redis, RQ and the Fargate CLI
These helpers are particularly useful for keeping responses consistent across different revisions of a service or across different services.
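To make the idea of response consistency concrete, here is a minimal, self-contained sketch of a context manager that captures either a result or an error in one envelope. It is not werkit's implementation: the class name `ResultEnvelope` and the keys `success`, `result`, and `error` are assumptions chosen purely for illustration.

```python
import json
import traceback


class ResultEnvelope:
    """Hypothetical helper: collect a result or an error in one shape."""

    def __init__(self, handle_exceptions=True):
        self.handle_exceptions = handle_exceptions
        self.result = None
        self.error = None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        if exc_type is None:
            return False
        # Keep the traceback as a list of strings so it can be serialized.
        self.error = traceback.format_exception(exc_type, exc_value, tb)
        # Returning True suppresses the exception; False lets it propagate.
        return self.handle_exceptions

    @property
    def serialized(self):
        # The same keys are present whether the computation succeeded or not.
        return {
            "success": self.error is None,
            "result": self.result,
            "error": self.error,
        }


def divide(a, b):
    with ResultEnvelope() as envelope:
        envelope.result = a / b
    return envelope.serialized


ok = divide(6, 3)
failed = divide(1, 0)
print(json.dumps(ok))
```

Because both outcomes share one shape, a caller can check a single key instead of wrapping every call in its own `try`/`except`.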
Installation
pip install werkit
Usage
from werkit import Manager

def myfunc(param, verbose=False, handle_exceptions=True):
    with Manager(handle_exceptions=handle_exceptions, verbose=verbose) as manager:
        manager.result = do_some_computation()
    return manager.serialized_result
Parallel computation
Werkit supports parallel computation using Redis and RQ.
You must install the dependencies separately:
pip install redis rq
Requesting work
from redis import Redis
from mylib import myfunc
from werkit.parallel import invoke_for_each
items = {'a': ..., 'b': ...}
job_ids = invoke_for_each(myfunc, items, connection=Redis.from_url(...))
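The source does not spell out what `invoke_for_each` does internally. As a rough mental model only, the sketch below assumes it enqueues one job per dictionary item. `enqueue_for_each`, `FakeQueue`, and `FakeJob` are hypothetical names, and the fake queue stands in for an `rq.Queue` so the example runs without Redis.

```python
import uuid


class FakeJob:
    """Stand-in for an RQ job: it only carries an id."""

    def __init__(self):
        self.id = str(uuid.uuid4())


class FakeQueue:
    """Stand-in for rq.Queue that records calls instead of using Redis."""

    def __init__(self):
        self.enqueued = []

    def enqueue(self, func, *args, **kwargs):
        self.enqueued.append((func, args, kwargs))
        return FakeJob()


def enqueue_for_each(queue, func, items):
    """Enqueue func(value) once per item and return {key: job id}."""
    return {key: queue.enqueue(func, value).id for key, value in items.items()}


def myfunc(param):
    return param * 2


queue = FakeQueue()
job_ids_by_key = enqueue_for_each(queue, myfunc, {"a": 1, "b": 2})
print(sorted(job_ids_by_key))
```

A real `rq.Queue` exposes the same `enqueue(func, *args, **kwargs)` call and returns a job with an `id` attribute, so the helper could be pointed at one. The shape of the return value here is an assumption, not werkit's documented behavior.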
Performing work
pip install redis rq
rq worker --burst werkit-default --url rediss://...
Note: mylib.myfunc must be importable in the worker's environment.
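RQ workers locate the function to run by its dotted import path, so it can help to confirm that the path resolves before starting a worker. The following is a small pre-flight sketch meant to be run in the worker's environment. The helper name `resolve` is hypothetical, and a standard-library function stands in for `mylib.myfunc` so the example is runnable anywhere.

```python
import importlib


def resolve(dotted_path):
    """Import 'package.module.attr' and return the attribute, or raise."""
    module_name, _, attr_name = dotted_path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


# Demonstrated with a standard-library function; swap in "mylib.myfunc".
func = resolve("json.dumps")
print(func({"ok": True}))
```

If the module is missing from the environment, `importlib.import_module` raises `ModuleNotFoundError`, which points to the same problem the worker would hit when it picks up the job.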
Using CloudManager
In place of the low-level API you can make your calls using CloudManager:
#!/usr/bin/env python
import click
from werkit.parallel import Config, CloudManager, invoke_for_each

manager = CloudManager(
    config=Config(
        local_repository="my-project",
        ecr_repository="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-project",
        ecs_task_name="my-project",
        task_args=[
            "--cpu",
            "1024",
            "--memory",
            "2048",
            "--task-role",
            "arn:aws:iam::123456789012:role/...",
            "--security-group-id",
            "sg-...",
            "--subnet-id",
            "subnet-...",
        ],
        default_task_count=5,
    )
)

@click.group()
def cli():
    pass

@cli.command()
def login():
    manager.login()

@cli.command()
@click.argument("tag")
def build_and_push(tag):
    manager.build_and_push()

@cli.command()
def enqueue():
    from myproject import myfunc
    items = {"key1": "value1", "key2": "value2"}
    # Enqueue one job per item, using the function imported above.
    invoke_for_each(
        myfunc,
        items,
        clean=True,
        connection=manager.redis_connection,
    )

@cli.command()
@click.option(
    "--count",
    default=manager.config.default_task_count,
    type=int,
    help="Number of tasks to run",
)
@click.argument("tag")
def run(count, tag):
    manager.run(tag=tag, count=count)

@cli.command()
def dashboard():
    manager.dashboard()

@cli.command()
def ps():
    manager.ps()

@cli.command()
def get_results():
    print(manager.get_results())

@cli.command()
def clean():
    manager.clean()

if __name__ == "__main__":
    cli()
Getting results
from redis import Redis
from werkit.parallel import get_results
get_results(wait_until_done=True, connection=Redis.from_url(...))
Monitoring
You can monitor your queues using RQ Dashboard or one of the other monitoring methods outlined in the RQ documentation.
Contribute
- Issue Tracker: https://github.com/metabolize/werkit/issues
- Source Code: https://github.com/metabolize/werkit
Pull requests welcome!
Support
If you are having issues, please let us know.
License
The project is licensed under the MIT License.