Project description
Astronomer Cosmos
A framework for generating Apache Airflow DAGs from other tools and frameworks.
- Current support for:
  - dbt
- Coming soon:
  - Jupyter
  - Hex
  - And more… open an issue if you have a request!
Principles
Astronomer Cosmos is a package to parse and render third-party workflows as Airflow DAGs, Airflow TaskGroups, or individual tasks.
Cosmos contains providers for third-party tools, and each provider can be deconstructed into the following components (a rough conceptual sketch follows this list):
- parsers: These are mostly hidden from the end user and are responsible for extracting the workflow from the provider and converting it into Task and Group objects. They are executed whenever the Airflow Scheduler heartbeats, allowing Cosmos to dynamically render the dependency graph of the workflow.
- operators: These represent the “user interface” of Cosmos – lightweight classes the user can import and use in their DAGs to define the target behavior. They are responsible for executing the tasks in the workflow.
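To make the parser/operator split concrete, here is a minimal, purely hypothetical sketch: the dataclasses, function names, and dbt model names below are illustrative only and do not reflect Cosmos's actual internal API.

# Purely illustrative -- not Cosmos's real internals. A parser turns a
# third-party project into generic Task/Group objects; a renderer (backed by
# operators) maps those objects onto Airflow tasks and dependencies.
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class Task:
    """A single unit of work extracted from a third-party workflow."""
    id: str
    upstream_ids: List[str] = field(default_factory=list)


@dataclass
class Group:
    """A collection of Tasks, e.g. one dbt project."""
    id: str
    tasks: List[Task] = field(default_factory=list)


def parse_project(project_name: str) -> Group:
    """Hypothetical parser: inspect the project and emit Task/Group objects."""
    staging = Task(id="stg_orders")
    mart = Task(id="orders", upstream_ids=["stg_orders"])
    return Group(id=project_name, tasks=[staging, mart])


def render_edges(group: Group) -> List[Tuple[str, str]]:
    """Hypothetical renderer: flatten the Group into dependency edges that the
    operator layer could translate into Airflow task dependencies."""
    return [
        (upstream, task.id)
        for task in group.tasks
        for upstream in task.upstream_ids
    ]


if __name__ == "__main__":
    print(render_edges(parse_project("jaffle_shop")))  # [('stg_orders', 'orders')]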
Cosmos operates on a few guiding principles:
- Dynamic: Cosmos generates DAGs dynamically, meaning that the dependency graph of the workflow is generated at runtime. This allows users to update their workflows without having to restart Airflow.
- Flexible: Cosmos is not opinionated in that it does not enforce a specific rendering method for third-party systems; users can decide whether they’d like to render their workflow as a DAG, TaskGroup, or individual task.
- Extensible: Cosmos is designed to be extensible. Users can add their own parsers and operators to support their own workflows.
- Modular: Cosmos is designed to be modular. Users can install only the dependencies they need for their workflows.
Quickstart
Clone this repository to set up a local environment. Then, head over to our astronomer-cosmos/examples directory and follow its README!
Installation
Install and update using pip:
General Installation
pip install astronomer-cosmos
Note that this only installs dependencies for the core provider. Read below for more info on how to install specific providers.
Database Specific Installation (dbt)
To install the dependencies for a specific database only, specify it in the extra argument as dbt.<database>. For example, for Postgres run:
pip install 'astronomer-cosmos[dbt.postgres]'
Extras
Extra Name | Installation Command | Dependencies
---|---|---
core | pip install astronomer-cosmos | apache-airflow, pydantic, Jinja2
dbt.all | pip install 'astronomer-cosmos[dbt.all]' | astronomer-cosmos, dbt-core, dbt-bigquery, dbt-redshift, dbt-snowflake, dbt-postgres
dbt.postgres | pip install 'astronomer-cosmos[dbt.postgres]' | astronomer-cosmos, dbt-core, dbt-postgres
dbt.bigquery | pip install 'astronomer-cosmos[dbt.bigquery]' | astronomer-cosmos, dbt-core, dbt-bigquery
dbt.redshift | pip install 'astronomer-cosmos[dbt.redshift]' | astronomer-cosmos, dbt-core, dbt-redshift
dbt.snowflake | pip install 'astronomer-cosmos[dbt.snowflake]' | astronomer-cosmos, dbt-core, dbt-snowflake
Example Usage
Imagine we have dbt projects located at ./dbt/{{DBT_PROJECT_NAME}}. We can render these projects as Airflow DAGs using the DbtDag class:
from pendulum import datetime

from airflow import DAG
from cosmos.providers.dbt.dag import DbtDag

# dag for the project jaffle_shop
jaffle_shop = DbtDag(
    dbt_project_name="jaffle_shop",
    conn_id="airflow_db",
    dbt_args={
        "schema": "public",
    },
    dag_id="jaffle_shop",
    start_date=datetime(2022, 11, 27),
)
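Since the prose above mentions multiple projects under ./dbt/, each project would presumably get its own DbtDag in the same way; the project name my_other_project below is purely illustrative and not from the source:

# Continuing from the imports above; "my_other_project" is a hypothetical name.
my_other_project = DbtDag(
    dbt_project_name="my_other_project",
    conn_id="airflow_db",
    dbt_args={
        "schema": "public",
    },
    dag_id="my_other_project",
    start_date=datetime(2022, 11, 27),
)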
Similarly, we can render these projects as Airflow TaskGroups using the DbtTaskGroup class. Here’s an example with the jaffle_shop project:
from pendulum import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from cosmos.providers.dbt.task_group import DbtTaskGroup

with DAG(
    dag_id="extract_dag",
    start_date=datetime(2022, 11, 27),
    schedule="@daily",
) as dag:
    e1 = EmptyOperator(task_id="ingestion_workflow")

    dbt_tg = DbtTaskGroup(
        group_id="dbt_tg",
        dbt_project_name="jaffle_shop",
        conn_id="airflow_db",
        dbt_args={
            "schema": "public",
        },
        dag=dag,
    )

    e2 = EmptyOperator(task_id="some_extraction")

    e1 >> dbt_tg >> e2
Changelog
We follow Semantic Versioning for releases. Check CHANGELOG.rst for the latest changes.
Contributing Guide
All contributions, bug reports, bug fixes, documentation improvements, and enhancements are welcome.
A detailed overview of how to contribute can be found in the Contributing Guide.
As contributors and maintainers to this project, you are expected to abide by the Contributor Code of Conduct.
License