werkit
Toolkit for encapsulating Python-based computation into deployable and distributable tasks.
It provides code that helps package computation up for deployment:
- Serializing results
- Handling and serializing errors
- Deploying task workers using Redis, RQ and the Fargate CLI
These tools are particularly useful for providing response consistency across different revisions of a service, or across different services.
Installation
pip install werkit
Usage
from werkit import Manager

def myfunc(param, verbose=False, handle_exceptions=True):
    with Manager(handle_exceptions=handle_exceptions, verbose=verbose) as manager:
        manager.result = do_some_computation()
    return manager.serialized_result
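For instance, a minimal usage sketch, where do_some_computation is a hypothetical stand-in for your own code:

def do_some_computation():
    # Hypothetical workload, for illustration only.
    return 2 + 2

# Returns the serialized result rather than raising: with
# handle_exceptions=True, errors are captured and serialized, so callers
# see a consistent payload whether the computation succeeds or fails.
result = myfunc("some-param")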
Parallel computation
Werkit supports parallel computation using Redis and RQ.
You must install the dependencies separately:
pip install redis rq
Requesting work
from redis import Redis
from mylib import myfunc
from werkit.parallel import invoke_for_each

items = {'a': ..., 'b': ...}
job_ids = invoke_for_each(myfunc, items, connection=Redis.from_url(...))
Performing work
pip install redis rq
rq worker --burst werkit-default --url rediss://...
Note: mylib.myfunc must be importable.
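Equivalently, you can start a worker from Python using RQ's own API (a sketch; the queue name werkit-default matches the CLI example above):

from redis import Redis
from rq import Queue, Worker

connection = Redis.from_url("rediss://...")
queue = Queue("werkit-default", connection=connection)
# burst=True makes the worker exit once the queue is drained, like
# `rq worker --burst`.
Worker([queue], connection=connection).work(burst=True)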
Using CloudManager
In place of the low-level API, you can make your calls using CloudManager:
#!/usr/bin/env python

import click
from werkit.parallel import Config, CloudManager, invoke_for_each

manager = CloudManager(
    config=Config(
        local_repository="my-project",
        ecr_repository="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-project",
        ecs_task_name="my-project",
        task_args=[
            "--cpu",
            "1024",
            "--memory",
            "2048",
            "--task-role",
            "arn:aws:iam::123456789012:role/...",
            "--security-group-id",
            "sg-...",
            "--subnet-id",
            "subnet-...",
        ],
        default_task_count=5,
    )
)

@click.group()
def cli():
    pass

@cli.command()
def login():
    manager.login()

@cli.command()
@click.argument("tag")
def build_and_push(tag):
    manager.build_and_push(tag)

@cli.command()
def enqueue():
    from myproject import myfunc

    items = {"key1": "value1", "key2": "value2"}
    invoke_for_each(
        myfunc,
        items,
        clean=True,
        connection=manager.redis_connection,
    )

@cli.command()
@click.option(
    "--count",
    default=manager.config.default_task_count,
    type=int,
    help="Number of tasks to run",
)
@click.argument("tag")
def run(count, tag):
    manager.run(tag=tag, count=count)

@cli.command()
def dashboard():
    manager.dashboard()

@cli.command()
def ps():
    manager.ps()

@cli.command()
def get_results():
    print(manager.get_results())

@cli.command()
def clean():
    manager.clean()

if __name__ == "__main__":
    cli()
Getting results
from redis import Redis
from werkit.parallel import get_results
get_results(wait_until_done=True, connection=Redis.from_url(...))
Monitoring
You can monitor your queues using RQ Dashboard or one of the other methods outlined in the RQ documentation.
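For example, a quick way to run RQ Dashboard against the same Redis instance the workers use (assuming a rediss:// URL as above):

pip install rq-dashboard
rq-dashboard --redis-url rediss://...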
Parallel computation on AWS Lambda
Werkit also implements a parallel map on AWS Lambda.
Werkit comes with a default Lambda handler that accepts an event of the form {"input": [a, b, ...], "extra_args": [c, d, ...]}. Werkit invokes a Lambda worker function in parallel for every item in input, with an event of the form {"input": a, "extra_args": [c, d, ...]}.
The werkit default handler is configurable via the following environment variables:
- LAMBDA_WORKER_FUNCTION_NAME: Name of the Lambda worker function to invoke
- LAMBDA_WORKER_TIMEOUT: How long to wait, in seconds, for the Lambda worker function to return before returning a TimeoutError
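For example, here is a sketch of kicking off a parallel map with boto3; the orchestrating function name my-parallel-map is a placeholder for whatever you deploy with the werkit default handler:

import json

import boto3

client = boto3.client("lambda", region_name="us-east-1")
# "my-parallel-map" is hypothetical: a function running the werkit default
# handler, with LAMBDA_WORKER_FUNCTION_NAME pointing at the worker function.
response = client.invoke(
    FunctionName="my-parallel-map",
    Payload=json.dumps({"input": [1, 2, 3], "extra_args": [10]}),
)
results = json.loads(response["Payload"].read())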
Building and deploying functions to AWS Lambda
Werkit provides tools for programmatically building and deploying functions to AWS Lambda. There are two distinct steps: build and deploy.
The build process can run natively using a virtualenv, or in Docker. When either of the following cases applies, you can use the native virtualenv method (see the sketch after this list):
- The function's dependencies are pure Python (no compiled extensions).
- You are building the function on Linux.
When building a function with compiled dependencies on OS X, the virtualenv method will pack up the OS X binaries, which of course won't work on Lambda. In that case you must use the Docker method.
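That decision rule amounts to a few lines; here is a sketch, where has_compiled_dependencies is something you would determine from your own requirements:

import platform

def can_build_natively(has_compiled_dependencies):
    # Pure-Python dependencies zip up portably; compiled extensions must be
    # built on Linux to match Lambda's runtime.
    return not has_compiled_dependencies or platform.system() == "Linux"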
Building a function natively using a virtualenv
DEPLOY_TOKEN = "..."

def build_natively(build_dir="build", target_dir="build"):
    import os
    import shutil

    from werkit.aws_lambda.build import (
        collect_zipfile_contents,
        create_venv_with_dependencies,
        create_zipfile_from_dir,
    )

    shutil.rmtree(build_dir, ignore_errors=True)

    venv_dir = os.path.join(build_dir, "venv")
    create_venv_with_dependencies(
        venv_dir=venv_dir,
        # These are the defaults, which you can override if necessary.
        upgrade_pip=True,
        install_wheel=True,
        install_werkit=False,
        install_requirements_from=["requirements.txt"],
        # You can pass credentials to `pip install`.
        environment={"DEPLOY_TOKEN": DEPLOY_TOKEN},
    )

    contents_dir = os.path.join(build_dir, "contents")
    collect_zipfile_contents(
        target_dir=contents_dir,
        venv_dir=venv_dir,
        src_dirs=["mypackage", "assets"],
        # Specify additional system files to copy to `lib/` inside the zipfile.
        lib_files=[...],
    )

    os.makedirs(target_dir, exist_ok=True)
    temp_path_to_zipfile = os.path.join(target_dir, "function.zip")
    create_zipfile_from_dir(
        dir_path=contents_dir,
        path_to_zipfile=temp_path_to_zipfile,
    )
Building a function using Docker
Create a collect_and_zip.py script which will run in Docker, with /target mounted to a folder on the host system.
import shutil

from werkit.aws_lambda.build import collect_zipfile_contents, create_zipfile_from_dir

collect_zipfile_contents(
    target_dir="/build/contents",
    venv_dir="/build/venv",
    src_dirs=["mypackage", "assets"],
    # Specify additional system files to copy to `lib/` inside the zipfile.
    lib_files=[...],
)

# To improve performance, zip inside the container and copy to the host
# afterward.
create_zipfile_from_dir(
    dir_path="/build/contents",
    path_to_zipfile="/build/function.zip",
)
shutil.copyfile("/build/function.zip", "/target/function.zip")
Create a Dockerfile:
FROM python:3.7

WORKDIR /src

# Install any system dependencies you want to include in the zipfile.
RUN apt-get update \
    && apt-get install -y --no-install-recommends ... \
    && rm -rf /var/lib/apt/lists/*

# Install werkit, along with any other dependencies needed for
# `collect_and_zip.py`.
RUN python3 -m pip install "werkit==0.10.0"

# Optionally receive credentials from the environment.
ARG DEPLOY_TOKEN

# Create the venv. As is necessary for some Docker base images, upgrade pip and
# install wheel.
RUN python3 -m venv /build/venv
RUN /build/venv/bin/pip install --upgrade pip wheel

# Install Python dependencies.
COPY requirements.txt /src/
RUN DEPLOY_TOKEN=${DEPLOY_TOKEN} /build/venv/bin/pip install -r requirements.txt

COPY mypackage/ /src/mypackage/
COPY assets/ /src/assets/
COPY collect_and_zip.py /src/

# Optionally set PYTHONPATH if `collect_and_zip.py` imports any internal source
# code.
ENV PYTHONPATH /src

CMD python3 collect_and_zip.py
Invoke the build:
DEPLOY_TOKEN = "..."

def build_in_docker(build_dir="build", target_dir="build"):
    import os

    from executor import execute

    docker_tag = "mypackage-lambda-builder"
    execute(
        "docker",
        "build",
        "-t",
        docker_tag,
        "-f",
        "Dockerfile",
        # Optionally pass credentials for the Docker build process.
        "--build-arg",
        f"DEPLOY_TOKEN={DEPLOY_TOKEN}",
        ".",
    )
    print("Build image created")

    os.makedirs(target_dir, exist_ok=True)
    execute(
        "docker",
        "run",
        "--volume",
        f"{os.path.abspath(target_dir)}:/target:Z",
        "-t",
        docker_tag,
    )
    print("Build finished")
Deploying the function
# A Lambda role is always required.
LAMBDA_ROLE = ...
# When the zipfile is larger than 50 MB, a temporary bucket is required.
S3_CODE_BUCKET = ...
# Optional overrides.
TIMEOUT = ...
WORKER_MEMORY_SIZE = ...

def create():
    from werkit.aws_lambda.deploy import perform_create

    perform_create(
        # Region is required.
        aws_region="us-east-1",
        function_name="myfunction",
        local_path_to_zipfile="build/function.zip",
        # The importable name of your handler function, which should have the
        # signature `def handler(event, context):`.
        handler="mypackage.worker.handler",
        role=LAMBDA_ROLE,
        # Required when the zipfile is larger than 50 MB.
        s3_code_bucket=S3_CODE_BUCKET,
        # Optionally override.
        timeout=TIMEOUT,
        memory_size=WORKER_MEMORY_SIZE,
        runtime="python3.8",
        env_vars={},
    )
Updating the code for an existing function
S3_CODE_BUCKET = ...

def update_code():
    from werkit.aws_lambda.deploy import perform_update_code

    perform_update_code(
        aws_region="us-east-1",
        function_name="myfunction",
        local_path_to_zipfile="build/function.zip",
        # Required when the zipfile is larger than 50 MB.
        s3_code_bucket=S3_CODE_BUCKET,
    )
Type definitions
TypeScript types are available for the Werkit job message format.
Contribute
- Issue Tracker: https://github.com/metabolize/werkit/issues
- Source Code: https://github.com/metabolize/werkit
Pull requests welcome!
Support
If you are having issues, please let us know.
License
The project is licensed under the MIT License.