Neu.ro Airflow plugin
This package helps you execute your ML workloads on the Neu.ro platform from an Airflow environment.
Also, take a look at our CLI reference and Python API reference.
Environment
- Python 3.6+
- apache-airflow >= 1.10.x
- Neuromation >= 20.4.6
Installation
The plugin is written to automatically register with Airflow, so all you have to do is install it into your Python environment:
pip install neuro-airflow-plugin
Usage
Before you start, you need to obtain a Neuro token using the CLI command:
neuro config show-token
and set up a Neuro Connection (neuro_default by default) in Airflow:
airflow connections --add \
    --conn_id neuro_default \
    --conn_type "neuro" \
    --conn_extra '{"token": "Put your Token here..."}'
Apart from token, you can also provide these fields as part of the extra JSON:
- cluster - name of the cluster used for compute scheduling. The default cluster is used if not provided.
- api_url - entry URL for the Neuro Platform. Only needed for custom clusters.
You can set up the connection from the UI as well; just put the same JSON document into the Extra form field. The connection type does not matter, so you can pick any type the UI allows.
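For example, a connection Extra that pins a specific cluster and a custom API endpoint could look like the following sketch (the cluster name and URL are placeholders):

{
    "token": "Put your Token here...",
    "cluster": "my-cluster",
    "api_url": "https://my-cluster.example.com/api/v1"
}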
For more information on how to set up connections in Airflow see Managing Connections.
NeuroRunOperator
Runs a Job in the Neuro Platform. Example usage:
from airflow.operators.neuro import NeuroRunOperator

run = NeuroRunOperator(
    task_id="small-deeds",
    job_command="echo 'Big things start from small deeds'",
    job_image="ubuntu:{{ var.value.ubuntu_tag }}",
    job_resources_preset="{% if var.value.use_large_cpu %}cpu-large{% else %}cpu-small{% endif %}",
)
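The operator plugs into a DAG like any other Airflow operator. As a minimal sketch (the dag_id, start date and schedule below are illustrative, not part of the plugin):

from datetime import datetime

from airflow import DAG
from airflow.operators.neuro import NeuroRunOperator

# Illustrative DAG wrapper for the operator above.
with DAG(
    dag_id="neuro_example",           # placeholder name
    start_date=datetime(2020, 1, 1),  # placeholder start date
    schedule_interval=None,           # triggered manually
) as dag:
    NeuroRunOperator(
        task_id="small-deeds",
        job_command="echo 'Big things start from small deeds'",
        job_image="ubuntu:latest",
    )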
For more usage examples see the examples/dags folder of the repository.
Operator arguments
- job_command (str, required) - Command to be executed in the Job. If you need to override the entrypoint of an image, see job_entrypoint instead.
- job_image (str, required) - Container image used for the Job. The name can be either a Docker image hosted on an external public repository or a Neuro image specified by the image:// scheme.
- job_name (str) - Optional job name. Note that creating 2 running jobs with the same name by the same user is forbidden.
- job_volumes (list) - List of strings describing a volume mount, or neuromation.Volume objects. A string description consists of 3 parts separated by colons: storage URI, mount path, mount mode. For example: storage:my-project:/var/storage:ro.
- job_resources_preset (str) - Predefined resource configuration (to see available values, run neuro config show).
- job_resources (Resources) - Custom resource configuration. See the Python API reference for details.
- job_is_preemptible (bool) - Whether the Job may be run on a preemptible (also known as Spot) instance. Only used with a custom resource configuration.
- job_extshm (bool) - Request extended '/dev/shm' space. Defaults to True and is only used with a predefined resource configuration.
- job_tags (list) - List of string tags to mark the Job with. Can later be used for filtering, etc.
- job_description (str) - Optional job description in free format.
- job_lifespan (float) - Optional job run-time limit in seconds. Unlimited by default.
- job_environ (dict) - Environment variables to run the Job with. Jinja template support is only provided for values, not for keys; see more details below.
- job_entrypoint (str) - Override the ENTRYPOINT of the container image.
- job_http_port (str) - Enable HTTP port forwarding to the specified container port. If used, you can access it from a custom link definition on the Task panel in the Airflow UI (see the Airflow docs for details on how this works). Disabled by default.
- job_http_auth (bool) - Disable Neuro authentication on the port exposed by job_http_port.
- job_tty (bool) - Allocate a TTY for the container.
- job_detach (bool) - Detach after starting the Job. If detached, Job logs will not be viewable in the Airflow interface, but the Job will not consume an Airflow worker slot. Defaults to True.
- raise_on_errno (bool) - Raise an error if the Job returns a non-zero exit code. Ignored if job_detach is True. Defaults to True.
- neuro_conn_id (str) - Name of the connection to use for Neuro authentication. Defaults to neuro_default.
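To illustrate how several of these arguments combine, here is a hedged sketch (the image, script path, volume URI, tags and environment values are placeholders; cpu-small is the preset from the example above):

from airflow.operators.neuro import NeuroRunOperator

# Illustrative only: names and paths below are placeholders.
train = NeuroRunOperator(
    task_id="train-model",
    job_command="python /var/storage/train.py",
    job_image="python:3.7",
    job_resources_preset="cpu-small",
    job_volumes=["storage:my-project:/var/storage:ro"],
    job_environ={"BATCH_SIZE": "64"},
    job_tags=["training", "example"],
    job_detach=False,  # stay attached so logs show up in the Airflow UI
)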
See also the neuro run reference in the CLI documentation.
Jinja2 template fields
Airflow supports passing custom attributes and dynamic definitions using Jinja templating fields. This operator supports templating on the following fields:
- job_command
- job_image
- job_volumes
- job_name
- job_resources_preset
- job_tags
- job_environ
- job_entrypoint
- job_description
- job_http_port
- neuro_conn_id
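For example, since only the values of job_environ are rendered, a sketch like the following works, while a template placed in a key would be passed through literally (the variable names are illustrative):

from airflow.operators.neuro import NeuroRunOperator

run = NeuroRunOperator(
    task_id="templated-env",
    job_command="env",
    job_image="ubuntu:latest",
    job_environ={
        "DATA_ROOT": "{{ var.value.project_home }}/data",  # value is rendered
        # "{{ var.value.env_name }}": "x"  -- keys are NOT templated
    },
)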
XCom exports
The operator exports 2 XCom values: return_value (the default in Airflow for querying) and assigned_job. Both are JSON documents with the following fields:
- id (str) - Job ID assigned by Neuro on start.
- exit_code (int) - Command return code if the Job has already finished.
- status (str) - One of the job statuses: pending, running, succeeded, failed or unknown.
- http_url (str) - URL of the exposed HTTP port if job_http_port is used.
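A downstream task can read either document with a regular xcom_pull. For instance, a sketch along these lines (the task ids match the example above) prints the job's id and status:

from airflow.operators.python_operator import PythonOperator

def report_job(**kwargs):
    # return_value is the default XCom key, so a plain xcom_pull is enough.
    job = kwargs["task_instance"].xcom_pull(task_ids="small-deeds")
    print(job["id"], job["status"])

report = PythonOperator(
    task_id="report_job",
    python_callable=report_job,
    provide_context=True,  # needed on Airflow 1.10 to receive the context kwargs
)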
NeuroJobStatusSensor
Waits for a Job to complete, or for any other status transition to happen. Example usage:
from airflow.sensors.neuro import NeuroJobStatusSensor

wait = NeuroJobStatusSensor(
    task_id="wait_close",
    job_id="{{ task_instance.xcom_pull(task_ids='small-deeds')['id'] }}",  # noqa
    poke_interval=5,
    timeout=10 * 60,
)
Operator arguments
- job_id (str) - Job ID to query for status updates.
- job_statuses (list) - List of JobStatus enum values to wait for.
- neuro_conn_id (str) - Name of the connection to use for Neuro authentication. Defaults to neuro_default.
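For example, to proceed as soon as the job leaves the queue rather than waiting for completion, you could wait for the running status. A sketch, assuming the JobStatus enum is importable from neuromation.api:

from neuromation.api import JobStatus

from airflow.sensors.neuro import NeuroJobStatusSensor

# Wait until the job is actually running, not just submitted.
wait_running = NeuroJobStatusSensor(
    task_id="wait_running",
    job_id="{{ task_instance.xcom_pull(task_ids='small-deeds')['id'] }}",
    job_statuses=[JobStatus.RUNNING],
    poke_interval=5,
    timeout=10 * 60,
)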
Jinja2 template fields
- job_id
XCom exports
Does not export any XCom values.
NeuroHook
In some cases you may need to access other functionality of the platform. This can be done using the NeuroHook. For example:
import yarl
from neuromation.api import ResourceNotFound

from airflow.hooks.neuro import NeuroHook
from airflow.operators.python_operator import BranchPythonOperator


def check_model(templates_dict, **kw):
    hook = NeuroHook()
    with hook:
        try:
            hook.run(
                hook.client.storage.stat(
                    yarl.URL("storage:" + templates_dict["model_path"])
                )
            )
            return "process_with_model"
        except ResourceNotFound:
            return "process_without_model"


check_model = BranchPythonOperator(
    task_id="check_model_exists",
    python_callable=check_model,
    provide_context=True,
    templates_dict={"model_path": "{{ var.value.project_home }}/model.pth"},
)
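Note that the strings returned by the callable must match the task_ids of the downstream tasks; that is how BranchPythonOperator decides which branch to run.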
Explore the Python SDK for more features of the Platform.