Astro SDK allows rapid and clean development of {Extract, Load, Transform} workflows using Python and SQL, powered by Apache Airflow.
astro
workflows made easy
Astro Python SDK is a Python SDK for rapid development of extract, transform, and load workflows in Apache Airflow. It allows you to express your workflows as a set of data dependencies without having to worry about ordering and tasks. The Astro Python SDK is maintained by Astronomer.
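To illustrate what "a set of data dependencies" means in practice, here is a minimal sketch that uses only Python's standard library (`graphlib`, Python 3.9+). It does not use the SDK or Airflow. The step names and the `dependencies` mapping are hypothetical; the sketch only shows how a run order can be derived from declared inputs instead of being written by hand.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each step maps to the set of steps whose output it consumes.
dependencies = {
    "load_movies": set(),
    "top_animations": {"load_movies"},
    "export_report": {"top_animations"},
}


def execution_order(deps):
    """Return one valid run order in which every step comes after its inputs."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because this example is a simple chain, the only valid order is `load_movies`, then `top_animations`, then `export_report`.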
Prerequisites
- Apache Airflow >= 2.1.0.
Install
The Astro Python SDK is available on PyPI. Use the standard Python installation tools.
To install a cloud-agnostic version of the SDK, run:
pip install astro-sdk-python
You can also install dependencies for using the SDK with popular cloud providers:
pip install "astro-sdk-python[amazon,google,snowflake,postgres]"
Quickstart
- Ensure that your Airflow environment is set up correctly by running the following commands:
export AIRFLOW_HOME=`pwd`
export AIRFLOW__CORE__XCOM_BACKEND=astro.custom_backend.astro_custom_backend.AstroCustomXcomBackend
export AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV=true
airflow db init
Note: AIRFLOW__CORE__ENABLE_XCOM_PICKLING no longer needs to be enabled for astro-sdk-python. This functionality is now deprecated as our custom XCom backend handles serialization.
The AIRFLOW__ASTRO_SDK__STORE_DATA_LOCAL_DEV setting should only be used for local development. The XCom backend docs give further details about how to set this up in non-local environments.
Currently, custom XCom backends are limited to data types that are JSON serializable. Since DataFrames are not JSON serializable, we need to enable XCom pickling to store DataFrames.
The data format used by pickle is Python-specific. This has the advantage that there are no restrictions imposed by external standards such as JSON or XDR (which can’t represent pointer sharing); however, it means that non-Python programs may not be able to reconstruct pickled Python objects.
Read more: enable_xcom_pickling and pickle.
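The remark about pointer sharing can be seen in a few lines. This is a minimal sketch using only the standard-library `pickle` and `json` modules; it is unrelated to the SDK's XCom backend, and the `payload` structure and the helper `shares_reference` are made up for illustration.

```python
import json
import pickle

# Two keys that refer to the same list object ("pointer sharing").
shared = [1, 2, 3]
payload = {"a": shared, "b": shared}

# Round-trip the same structure through each format.
from_pickle = pickle.loads(pickle.dumps(payload))
from_json = json.loads(json.dumps(payload))


def shares_reference(obj):
    """True if both keys still refer to one and the same list object."""
    return obj["a"] is obj["b"]


print(shares_reference(from_pickle))  # True: pickle keeps the shared reference
print(shares_reference(from_json))    # False: JSON yields two independent lists
```

Both round trips preserve the values, but only pickle preserves the object identity, which is the Python-specific behavior the note describes.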
- Create a SQLite database for the example to run with:
# The sqlite_default connection has a different host on macOS vs. Linux
export SQL_TABLE_NAME=`airflow connections get sqlite_default -o yaml | grep host | awk '{print $2}'`
sqlite3 "$SQL_TABLE_NAME" "VACUUM;"
- Copy the example workflow into a file named calculate_popular_movies.py and add it to the dags directory of your Airflow project. Alternatively, you can download calculate_popular_movies.py directly:
curl -O https://raw.githubusercontent.com/astronomer/astro-sdk/main/python-sdk/example_dags/calculate_popular_movies.py
- Run the example DAG:
airflow dags test calculate_popular_movies `date -Iseconds`
- Check the result of your DAG by running:
sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
You should see the following output:
$ sqlite3 "$SQL_TABLE_NAME" "select * from top_animation;" ".exit"
Toy Story 3 (2010)|8.3
Inside Out (2015)|8.2
How to Train Your Dragon (2010)|8.1
Zootopia (2016)|8.1
How to Train Your Dragon 2 (2014)|7.9
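The same check can be made from Python. The following is a minimal sketch using the standard-library `sqlite3` module, assuming the database path is the value exported as `SQL_TABLE_NAME` in the quickstart. The helper `read_table` is hypothetical and not part of the SDK.

```python
import os
import sqlite3


def read_table(db_path, table="top_animation"):
    """Return all rows of `table` from the SQLite file at `db_path`."""
    # Table names cannot be bound as SQL parameters, so accept plain identifiers only.
    if not table.isidentifier():
        raise ValueError(f"unexpected table name: {table!r}")
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(f"SELECT * FROM {table}").fetchall()
    finally:
        conn.close()


# SQL_TABLE_NAME is the variable exported in the quickstart; it holds the database path.
db_path = os.environ.get("SQL_TABLE_NAME")
if db_path:
    for row in read_table(db_path):
        print(row)
```

If `SQL_TABLE_NAME` is not set, the script prints nothing; call `read_table` with an explicit path in that case.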
Supported technologies
Databases |
---|
Databricks Delta |
Google BigQuery |
Postgres |
Snowflake |
SQLite |
Amazon Redshift |
Microsoft SQL |
DuckDB |
File types |
---|
CSV |
JSON |
NDJSON |
Parquet |
File stores |
---|
Amazon S3 |
Filesystem |
Google GCS |
Google Drive |
SFTP |
FTP |
Azure WASB |
Azure WASBS |
Available operations
The following are some key functions available in the SDK:
- load_file: Load a given file into a SQL table
- transform: Apply a SQL select statement to a source table and save the result to a destination table
- drop_table: Drop a SQL table
- run_raw_sql: Run any SQL statement without handling its output
- append: Insert rows from the source SQL table into the destination SQL table, if there are no conflicts
- merge: Insert rows from the source SQL table into the destination SQL table, depending on conflicts:
  - ignore: Do not add rows that already exist
  - update: Replace existing rows with new ones
- export_file: Export SQL table rows into a destination file
- dataframe: Export a given SQL table into an in-memory pandas DataFrame
For a full list of available operators, see the SDK reference documentation.
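The difference between the ignore and update conflict strategies of merge can be pictured with plain SQLite statements. The sketch below is not the SDK's implementation and does not call the SDK. It uses the standard-library `sqlite3` module, SQLite's `INSERT OR IGNORE` and `INSERT OR REPLACE`, and hypothetical table and column names, purely to show how the two outcomes differ.

```python
import sqlite3


def merge_demo(strategy):
    """Merge `source` into `target` on the primary key and return target's rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE target (id INTEGER PRIMARY KEY, title TEXT);
        CREATE TABLE source (id INTEGER PRIMARY KEY, title TEXT);
        INSERT INTO target VALUES (1, 'old title');
        INSERT INTO source VALUES (1, 'new title'), (2, 'extra row');
        """
    )
    # "ignore" keeps conflicting target rows; "update" replaces them with source rows.
    verb = {"ignore": "INSERT OR IGNORE", "update": "INSERT OR REPLACE"}[strategy]
    conn.execute(f"{verb} INTO target SELECT id, title FROM source")
    rows = conn.execute("SELECT id, title FROM target ORDER BY id").fetchall()
    conn.close()
    return rows


print(merge_demo("ignore"))  # [(1, 'old title'), (2, 'extra row')]
print(merge_demo("update"))  # [(1, 'new title'), (2, 'extra row')]
```

In both cases the non-conflicting row with id 2 is inserted; only the handling of the conflicting row with id 1 changes.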
Documentation
The documentation is a work in progress. We aim to follow the Diátaxis system:
- Getting Started Tutorial: A hands-on introduction to the Astro Python SDK
- How-to guides: Simple step-by-step user guides to accomplish specific tasks
- Reference guide: Commands, modules, classes and methods
- Explanation: Clarification and discussion of key decisions when designing the project
Changelog
The Astro Python SDK follows semantic versioning for releases. Check the changelog for the latest changes.
Release management
To learn more about our release philosophy and steps, see Managing Releases.
Contribution guidelines
All contributions, bug reports, bug fixes, documentation improvements, enhancements, and ideas are welcome.
Read the Contribution Guideline for a detailed overview of how to contribute.
Contributors and maintainers should abide by the Contributor Code of Conduct.
License