Affordable Databricks Workflows in Apache Airflow

Deprecation Notice

With release 0.3.0 of the astro-provider-databricks package, this provider is deprecated and will no longer receive updates. We recommend migrating to the official apache-airflow-providers-databricks >= 6.8.0 for the latest features and support. For the operators and sensors deprecated in this repository, migrating to the official Apache Airflow Databricks Provider is as simple as changing the import path in your DAG code, as shown in the examples below.

| Previous import path (deprecated) | Suggested import path |
| --- | --- |
| from astro_databricks.operators.notebook import DatabricksNotebookOperator | from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator |
| from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup | from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup |
| from astro_databricks.operators.common import DatabricksTaskOperator | from airflow.providers.databricks.operators.databricks import DatabricksTaskOperator |
| from astro_databricks.plugins.plugin import AstroDatabricksPlugin | from airflow.providers.databricks.plugins.databricks_workflow import DatabricksWorkflowPlugin |
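
For example, a DAG that currently uses the deprecated notebook and workflow operators can be migrated by changing only its import lines; everything else in the DAG body stays the same. A minimal before/after sketch:

# Before (deprecated astro-provider-databricks imports)
from astro_databricks.operators.notebook import DatabricksNotebookOperator
from astro_databricks.operators.workflow import DatabricksWorkflowTaskGroup

# After (official provider, apache-airflow-providers-databricks >= 6.8.0)
from airflow.providers.databricks.operators.databricks import DatabricksNotebookOperator
from airflow.providers.databricks.operators.databricks_workflow import DatabricksWorkflowTaskGroup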

Archives

Databricks Workflows in Airflow

The Astro Databricks Provider is an Apache Airflow provider for writing Databricks Workflows with Airflow as the authoring interface. Running your Databricks notebooks as Databricks Workflows can result in a 75% cost reduction ($0.40/DBU for all-purpose compute vs. $0.07/DBU for Jobs compute).

While this provider is maintained by Astronomer, it is available to anyone using Airflow; you don't need to be an Astronomer customer to use it.

There are a few advantages to defining your Databricks Workflows in Airflow:

|  | via Databricks | via Airflow |
| --- | --- | --- |
| Authoring interface | Web-based via Databricks UI | Code via Airflow DAG |
| Workflow compute pricing | ✅ | ✅ |
| Notebook code in source control | ✅ | ✅ |
| Workflow structure in source control | ❌ | ✅ |
| Retry from beginning | ✅ | ✅ |
| Retry single task | ✅ | ✅ |
| Task groups within Workflows | ❌ | ✅ |
| Trigger workflows from other DAGs | ❌ | ✅ |
| Workflow-level parameters | ❌ | ✅ |

Example

The following Airflow DAG illustrates how to use the DatabricksWorkflowTaskGroup and DatabricksNotebookOperator to define a Databricks Workflow in Airflow:

from pendulum import datetime

from airflow.decorators import dag, task_group
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from astro_databricks import DatabricksNotebookOperator, DatabricksWorkflowTaskGroup

# define your cluster spec - can have from 1 to many clusters
job_cluster_spec = [
   {
      "job_cluster_key": "astro_databricks",
      "new_cluster": {
         "cluster_name": "",
         # ...
      },
   }
]

@dag(start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False)
def databricks_workflow_example():
   # the task group is a context manager that will create a Databricks Workflow
   with DatabricksWorkflowTaskGroup(
      group_id="example_databricks_workflow",
      databricks_conn_id="databricks_default",
      job_clusters=job_cluster_spec,
      # you can specify common fields here that get shared to all notebooks
      notebook_packages=[
         { "pypi": { "package": "pandas" } },
      ],
      # notebook_params supports templating
      notebook_params={
         "start_time": "{{ ds }}",
      }
   ) as workflow:
      notebook_1 = DatabricksNotebookOperator(
         task_id="notebook_1",
         databricks_conn_id="databricks_default",
         notebook_path="/Shared/notebook_1",
         source="WORKSPACE",
         # job_cluster_key corresponds to the job_cluster_key in the job_cluster_spec
         job_cluster_key="astro_databricks",
         # you can add to packages & params at the task level
         notebook_packages=[
            { "pypi": { "package": "scikit-learn" } },
         ],
         notebook_params={
            "end_time": "{{ macros.ds_add(ds, 1) }}",
         }
      )

      # you can embed task groups for easier dependency management
      @task_group(group_id="inner_task_group")
      def inner_task_group():
         notebook_2 = DatabricksNotebookOperator(
            task_id="notebook_2",
            databricks_conn_id="databricks_default",
            notebook_path="/Shared/notebook_2",
            source="WORKSPACE",
            job_cluster_key="astro_databricks",
         )

         notebook_3 = DatabricksNotebookOperator(
            task_id="notebook_3",
            databricks_conn_id="databricks_default",
            notebook_path="/Shared/notebook_3",
            source="WORKSPACE",
            job_cluster_key="astro_databricks",
         )

      notebook_4 = DatabricksNotebookOperator(
         task_id="notebook_4",
         databricks_conn_id="databricks_default",
         notebook_path="/Shared/notebook_4",
         source="WORKSPACE",
         job_cluster_key="astro_databricks",
      )

      notebook_1 >> inner_task_group() >> notebook_4

   trigger_workflow_2 = TriggerDagRunOperator(
      task_id="trigger_workflow_2",
      trigger_dag_id="workflow_2",
      execution_date="{{ next_execution_date }}",
   )

   workflow >> trigger_workflow_2

databricks_workflow_example_dag = databricks_workflow_example()

Airflow UI (screenshot)

Databricks UI (screenshot)
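
Besides the notebook operator, the package also exposes a DatabricksTaskOperator (imported from astro_databricks.operators.common, as listed in the migration table above) for defining an arbitrary Databricks Jobs task inside a workflow task group. The sketch below is a minimal, illustrative example only: it assumes the operator accepts a Jobs-API-style task_config, as its counterpart in the official provider does, and the notebook path /Shared/notebook_5 is a hypothetical placeholder.

from pendulum import datetime

from airflow.decorators import dag
from astro_databricks import DatabricksWorkflowTaskGroup
from astro_databricks.operators.common import DatabricksTaskOperator

# same shape of cluster spec as in the example above
job_cluster_spec = [
   {
      "job_cluster_key": "astro_databricks",
      "new_cluster": {
         "cluster_name": "",
         # ...
      },
   }
]

@dag(start_date=datetime(2023, 1, 1), schedule_interval="@daily", catchup=False)
def databricks_task_operator_example():
   with DatabricksWorkflowTaskGroup(
      group_id="example_task_operator_workflow",
      databricks_conn_id="databricks_default",
      job_clusters=job_cluster_spec,
   ):
      DatabricksTaskOperator(
         task_id="notebook_as_task",
         databricks_conn_id="databricks_default",
         job_cluster_key="astro_databricks",
         # assumed: task_config follows the Databricks Jobs API task schema
         task_config={
            "notebook_task": {
               "notebook_path": "/Shared/notebook_5",  # hypothetical path
               "source": "WORKSPACE",
            },
         },
      )

databricks_task_operator_example_dag = databricks_task_operator_example()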

Quickstart

Check out the quickstart guides.

Documentation

The documentation is a work in progress; we aim to follow the Diátaxis system.

Changelog

Astro Databricks follows semantic versioning for releases. Read the changelog to learn about the changes introduced in each version.

Contribution guidelines

All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.

Read the Contribution Guidelines for a detailed overview on how to contribute.

Contributors and maintainers should abide by the Contributor Code of Conduct.

License

Apache License 2.0
