astro-provider-anyscale
An Apache Airflow provider package built by Astronomer to integrate with Anyscale.
This repository provides a set of tools for integrating Anyscale with Apache Airflow, enabling the orchestration of Anyscale jobs and services within Airflow workflows. It includes a custom hook, two operators, and two triggers specifically designed for managing and monitoring Anyscale jobs and services.
Components
Hook
- AnyscaleHook: Facilitates communication between Airflow and Anyscale. It uses the Anyscale API to interact with the Anyscale platform, providing methods to submit jobs, query their status, and manage services.
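To illustrate the pattern, the library-free sketch below mimics how a hook like `AnyscaleHook` resolves its credential: the Anyscale API token is stored in the connection's password field. `Connection` here is a simplified stand-in (not Airflow's real class) and `get_api_token` is a hypothetical helper.

```python
from dataclasses import dataclass

# Simplified stand-in for Airflow's Connection model; the real AnyscaleHook
# resolves the connection through Airflow's metadata store instead.
@dataclass
class Connection:
    conn_id: str
    password: str  # the Anyscale API token lives in the password field

# Hypothetical helper mirroring how a hook reads its credential.
def get_api_token(conn: Connection) -> str:
    return conn.password

conn = Connection(conn_id="anyscale_conn", password="sk_example_token")
print(get_api_token(conn))  # → sk_example_token
```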
Operators
- SubmitAnyscaleJob: Submits a job to Anyscale. It takes configuration parameters for the job, such as the entry point, build ID, and compute configuration, and uses AnyscaleHook to handle the submission process.
- RolloutAnyscaleService: Similar to the job submission operator, this operator manages services on Anyscale. It can deploy new services or update existing ones, leveraging AnyscaleHook for all interactions with the Anyscale API.
Triggers
- AnyscaleJobTrigger: Monitors the status of asynchronous jobs submitted via the SubmitAnyscaleJob operator. It ensures that the Airflow task waits until the job completes before moving forward in the DAG.
- AnyscaleServiceTrigger: Works in a similar fashion to the AnyscaleJobTrigger but focuses on service rollouts. It checks the status of the service being deployed or updated and returns control to Airflow upon completion.
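To make the triggers' behavior concrete, here is a minimal, library-free sketch of the poll-until-terminal loop both triggers implement. `get_status` is a stand-in for the real Anyscale API call; the real triggers wait asynchronously for poll_interval seconds between polls and emit an event back to Airflow on completion.

```python
TERMINAL_STATES = {"SUCCEEDED", "FAILED"}

def wait_for_terminal_state(get_status):
    # Poll until the job or service reaches a terminal state, then report it.
    # The real triggers sleep asynchronously between calls instead of spinning.
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status

statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_terminal_state(lambda: next(statuses)))  # → SUCCEEDED
```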
Configuration Details for Anyscale Integration
To integrate Airflow with Anyscale, you will need to provide several configuration details:
- Anyscale API Token: Obtain your API token either with the Anyscale CLI or through the Anyscale console.
- Compute Config (optional): If you want to constrain autoscaling, you can specify the compute cluster that will execute your Ray script by either:
  - Dynamically providing it via the compute_config input parameter, or
  - Creating a compute configuration in Anyscale and passing the resulting ID in the compute_config_id parameter.
- Image URI: Specify the Docker image you would like your operator to use. Make sure the image is accessible within your Anyscale account. Alternatively, you can specify a containerfile that is used to build the image dynamically.
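The inputs above map onto operator arguments as follows. This is an illustrative summary with placeholder values; the parameter names mirror those used in the code samples further down, and the image and compute config must exist in your own Anyscale account.

```python
# Placeholder values; replace with resources from your Anyscale account.
job_kwargs = {
    "image_uri": "anyscale/image/my-image:1",   # Docker image accessible to your account
    "compute_config": "my-compute-config:1",    # optional: name:version of a compute config
    "entrypoint": "python ray-job.py",          # command Anyscale runs for the job
}
# Alternatively, swap compute_config for compute_config_id to reference a
# compute configuration created in Anyscale by its ID.
print(sorted(job_kwargs))  # → ['compute_config', 'entrypoint', 'image_uri']
```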
Usage
Install the Anyscale provider using the command below:
```shell
pip install astro-provider-anyscale
```
Airflow Connection Configuration
To integrate Airflow with Anyscale, configure an Airflow connection with a unique name and set the password to the API token obtained from the Anyscale console.
- Access the Airflow Web UI: Open the Airflow web interface and log in using your Airflow credentials.
- Create a New Connection: Go to the "Admin" tab and select "Connections" from the dropdown menu, then click the "Add a new record" button to create a new connection.
- Configure the Connection:
  - Conn Id: Enter a unique identifier for the connection, e.g., anyscale_conn.
  - Conn Type: Select Anyscale.
  - Password: Paste the API token you copied from the Anyscale console.
- Save the Connection: After filling in the required details, click the "Save" button at the bottom of the form.
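As an alternative to the web UI, Airflow also reads connections from environment variables named AIRFLOW_CONN_<CONN_ID> in URI form. A minimal sketch, assuming the provider registers the anyscale connection type and using a placeholder token:

```python
import os
from urllib.parse import quote

api_token = "sk_example_token"  # placeholder; use your real Anyscale API token
# URI form is <conn-type>://<login>:<password>@<host>; only the password matters here.
os.environ["AIRFLOW_CONN_ANYSCALE_CONN"] = f"anyscale://:{quote(api_token)}@"
print(os.environ["AIRFLOW_CONN_ANYSCALE_CONN"])  # → anyscale://:sk_example_token@
```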
Code samples
The script below shows how to configure and use the SubmitAnyscaleJob operator within an Airflow DAG:
```python
from pathlib import Path
from datetime import datetime, timedelta

from airflow import DAG

from anyscale_provider.operators.anyscale import SubmitAnyscaleJob

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"

# Constants
FOLDER_PATH = Path(__file__).parent / "example_dags/ray_scripts"

dag = DAG(
    "sample_anyscale_job_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

submit_anyscale_job = SubmitAnyscaleJob(
    task_id="submit_anyscale_job",
    conn_id=ANYSCALE_CONN_ID,
    name="AstroJob",
    image_uri="anyscale/image/airflow-integration-testing:1",
    compute_config="airflow-integration-testing:1",
    working_dir=str(FOLDER_PATH),
    entrypoint="python ray-job.py",
    requirements=["requests", "pandas", "numpy", "torch"],
    max_retries=1,
    job_timeout_seconds=3000,
    poll_interval=30,
    dag=dag,
)

# Defining the task sequence
submit_anyscale_job
```
The script below uses the RolloutAnyscaleService operator to deploy a service on Anyscale:
```python
import uuid
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

from anyscale_provider.hooks.anyscale import AnyscaleHook
from anyscale_provider.operators.anyscale import RolloutAnyscaleService

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2024, 4, 2),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define the Anyscale connection
ANYSCALE_CONN_ID = "anyscale_conn"
SERVICE_NAME = f"AstroService-CICD-{uuid.uuid4()}"

dag = DAG(
    "sample_anyscale_service_workflow",
    default_args=default_args,
    description="A DAG to interact with Anyscale triggered manually",
    schedule=None,  # This DAG is not scheduled, only triggered manually
    catchup=False,
)

deploy_anyscale_service = RolloutAnyscaleService(
    task_id="rollout_anyscale_service",
    conn_id=ANYSCALE_CONN_ID,
    name=SERVICE_NAME,
    image_uri="anyscale/image/airflow-integration-testing:1",
    compute_config="airflow-integration-testing:1",
    working_dir="https://github.com/anyscale/docs_examples/archive/refs/heads/main.zip",
    applications=[{"import_path": "sentiment_analysis.app:model"}],
    requirements=["transformers", "requests", "pandas", "numpy", "torch"],
    in_place=False,
    canary_percent=None,
    service_rollout_timeout_seconds=600,
    poll_interval=30,
    dag=dag,
)

def terminate_service():
    hook = AnyscaleHook(conn_id=ANYSCALE_CONN_ID)
    result = hook.terminate_service(service_name=SERVICE_NAME, time_delay=5)
    print(result)

terminate_anyscale_service = PythonOperator(
    task_id="terminate_anyscale_service",
    python_callable=terminate_service,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,
)

# Defining the task sequence
deploy_anyscale_service >> terminate_anyscale_service
```
Changelog
We follow Semantic Versioning for releases. Check CHANGELOG.rst for the latest changes.
Contributing Guide
All contributions, bug reports, bug fixes, documentation improvements, and enhancements are welcome.
A detailed overview of how to contribute can be found in the Contributing Guide.
Hashes for astro_provider_anyscale-1.0.0a1.tar.gz (source distribution)

Algorithm | Hash digest
---|---
SHA256 | 197fa4f48cbd418dbe887c9e640fd54e7f3f74f8ea79167fd1242dd32cc04c6f
MD5 | ba48247851907b986c1625b8396fcb9b
BLAKE2b-256 | 6c47c57e1f874e2ea16caeb0bd15a2f14fbfcda13517ab0d6844a0f059a0f186

Hashes for astro_provider_anyscale-1.0.0a1-py3-none-any.whl (built distribution)

Algorithm | Hash digest
---|---
SHA256 | 11779889012241ea27df5eb1e50a126de9791acd0eb8d2f91390698bd066ff33
MD5 | 87c26e5a4e875f6d8a8c09ff619607a8
BLAKE2b-256 | 3371801c3571af29af69aabb2332929819753fdd70e18529c702cecbb70c46ec