Astro :rocket:
Your new Airflow DAG writing experience. Maintained with ❤️ by Astronomer.
Philosophy
With the astro library, we want to redefine the DAG writing experience from the bottom up. Our goal is to empower data engineers and data scientists to write DAGs based around the movement of data instead of the dependencies of tasks. With this in mind, we built a library where every step is defined by how your data moves, while also simplifying the transformation process between different environments. Our first two integrations are SQL and pandas, but we are planning many more in the coming months.
With our SQL and dataframe modules, you can treat SQL tables as if they were Python objects: manipulate them, join them, templatize them, and ultimately turn them into dataframes if you want to run Python functions against them. We hope that this library creates a cleaner Airflow ELT experience, as well as easier onboarding for those who want to think in data transformations instead of DAGs.
Please feel free to raise issues and propose improvements; community contributions are highly welcome!
Thank you,
:sparkles: The Astro Team :sparkles:
Basic Usage
from datetime import datetime, timedelta
from airflow.models import DAG
from pandas import DataFrame
from astro import sql as aql
from astro import dataframe as df
from astro.sql.table import Table
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": 0,
}

dag = DAG(
    dag_id="astro_example_dag",
    start_date=datetime(2019, 1, 1),
    max_active_runs=3,
    schedule_interval=timedelta(minutes=30),
    default_args=default_args,
)
@aql.transform
def aggregate_orders(orders_table: Table):
    return """SELECT customer_id, count(*) AS purchase_count FROM {orders_table}
        WHERE purchase_date >= DATEADD(day, -7, '{{ execution_date }}')"""


@aql.transform(conn_id="postgres_conn", database="pagila")
def get_customers(customer_table: Table = Table("customer")):
    """Basic clean-up of an existing table."""
    return """SELECT customer_id, source, region, member_since
        FROM {customer_table} WHERE NOT is_deleted"""


@aql.transform
def join_orders_and_customers(orders_table: Table, customer_table: Table):
    """Now join those together to create a very simple 'feature' dataset."""
    return """SELECT c.customer_id, c.source, c.region, c.member_since,
        CASE WHEN purchase_count IS NULL THEN 0 ELSE 1 END AS recent_purchase
        FROM {orders_table} c LEFT OUTER JOIN {customer_table} p ON c.customer_id = p.customer_id"""


@df
def perform_dataframe_transformation(df: DataFrame):
    """Train model with Python. You can import any python library you like and treat this as you would a normal
    dataframe
    """
    recent_purchases_dataframe = df.loc[:, "recent_purchase"]
    return recent_purchases_dataframe


@df
def dataframe_action_to_sql(df: DataFrame):
    """
    This function gives us an example of a dataframe function that we intend to put back into SQL. The only thing
    we need to keep in mind for a SQL return function is that the result has to be a dataframe. Any non-dataframe
    return will result in an error as there's no way for us to know how to upload the object to SQL.
    """
    return df
SOURCE_TABLE = "source_finance_table"

s3_path = (
    f"s3://astronomer-galaxy-stage-dev/thanos/{SOURCE_TABLE}/"
    "{{ execution_date.year }}/"
    "{{ execution_date.month }}/"
    "{{ execution_date.day }}/"
    f"{SOURCE_TABLE}_"
    "{{ ts_nodash }}.csv"
)
with dag:
    """Structure DAG dependencies.
    So easy! It's like magic!
    """

    raw_orders = aql.load_file(
        path="s3://my/s3/path.csv",
        file_conn_id="my_s3_conn",
        output_table=Table(table_name="foo", conn_id="my_postgres_conn"),
    )
    agg_orders = aggregate_orders(raw_orders)
    customers = get_customers()
    features = join_orders_and_customers(customers, agg_orders)
    simple_df = perform_dataframe_transformation(df=features)

    # By defining the output_table in the invocation, we are telling astro where to put the resulting dataframe
    dataframe_action_to_sql(
        simple_df, output_table=Table(table_name="result", conn_id="my_postgres_conn")
    )
Supported databases
The current implementation supports PostgreSQL and Snowflake; other databases are on the roadmap.
To move data from one database to another, you can use the save_file and load_file functions to store intermediate tables on S3.
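For example, a cross-database move could chain the two functions through an S3 location, roughly as sketched below. Only load_file's arguments appear elsewhere in this document, so the save_file parameter names used here (table, output_file_path, output_conn_id, overwrite) are assumptions; check them against the version you have installed.

from astro import sql as aql
from astro.sql.table import Table

# Reusing the `dag` object defined in the Basic Usage example above.
with dag:
    # Export a Postgres table to S3. The parameter names below are assumed, not verified.
    staged = aql.save_file(
        table=Table(table_name="my_table", conn_id="postgres_conn"),
        output_file_path="s3://my-bucket/staging/my_table.csv",
        output_conn_id="my_s3_conn",
        overwrite=True,
    )

    # Load the staged file into Snowflake (an explicit dependency on the save step may be needed).
    aql.load_file(
        path="s3://my-bucket/staging/my_table.csv",
        file_conn_id="my_s3_conn",
        output_table=Table(table_name="my_table", conn_id="snowflake_conn"),
    )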
The Table class
To instantiate a table or bring an existing database table into the astro ecosystem, pass a Table object into your decorated function. This Table object contains all the metadata needed to handle table creation between tasks. Once you define it at the beginning of your pipeline, astro can pass that metadata along automatically.
from astro import sql as aql
from astro.sql.table import Table
@aql.transform
def my_first_sql_transformation(input_table: Table):
    return "SELECT * FROM {input_table}"


@aql.transform
def my_second_sql_transformation(input_table_2: Table):
    return "SELECT * FROM {input_table_2}"


with dag:
    my_table = my_first_sql_transformation(
        input_table=Table(table_name="foo", database="bar", conn_id="postgres_conn")
    )
    my_second_sql_transformation(my_table)
Loading Data
To create an ELT pipeline, users can first load CSV or parquet data from local storage, S3, or GCS into a SQL database with the load_file function.
To interact with S3, set an S3 Airflow connection in the AIRFLOW__SQL_DECORATOR__CONN_AWS_DEFAULT environment variable.
from astro import sql as aql
from astro.sql.table import Table
raw_orders = aql.load_file(
    path="s3://my/s3/path.csv",
    file_conn_id="my_s3_conn",
    output_table=Table(table_name="my_table", conn_id="postgres_conn"),
)
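Local files can be loaded the same way. The sketch below assumes a parquet file that is readable from the worker's filesystem and that file_conn_id can simply be omitted for local paths; the path and table names are made up for the example.

local_orders = aql.load_file(
    # A local parquet file; no file_conn_id is passed for a local path (assumption based on the prose above).
    path="/data/exports/orders.parquet",
    output_table=Table(table_name="orders_raw", conn_id="postgres_conn"),
)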
Transform
With your data in a SQL system, it's time to start transforming it! The transform function of the SQL decorator is your "ELT" system. Each step of the transform pipeline creates a new table from the SELECT statement and enables tasks to pass those tables as if they were native Python objects.
You will notice that the functions use a custom templating system. Wrapping a value in single brackets (like {customer_table}) indicates that the value should be rendered as a SQL table. The SQL decorator also treats values in double brackets as Airflow jinja templates.
Please note that this is NOT an f-string. F-strings in SQL formatting risk security breaches via SQL injection.
For security, users MUST explicitly identify tables in the function parameters by typing a value as a Table. Only then will the SQL decorator treat the value as a table.
@aql.transform
def get_orders():
    ...


@aql.transform
def get_customers():
    ...


@aql.transform
def join_orders_and_customers(orders_table: Table, customer_table: Table):
    """Join `orders_table` and `customer_table` to create a simple 'feature' dataset."""
    return """SELECT c.customer_id, c.source, c.region, c.member_since,
        CASE WHEN purchase_count IS NULL THEN 0 ELSE 1 END AS recent_purchase
        FROM {orders_table} c LEFT OUTER JOIN {customer_table} p ON c.customer_id = p.customer_id"""


with dag:
    orders = get_orders()
    customers = get_customers()
    join_orders_and_customers(orders, customers)
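To illustrate the double-bracket form described above, a transform can mix a table reference with an Airflow jinja value in the same query. The table and column names below are made up for the example.

@aql.transform
def orders_since_run_date(orders_table: Table):
    # {orders_table} is rendered as a SQL table; {{ ds }} is rendered by Airflow's jinja templating.
    return "SELECT * FROM {orders_table} WHERE purchase_date >= '{{ ds }}'"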
Transform File
Another option for larger SQL queries is to use the transform_file function to pass an external SQL file to the DAG. All of the same templating will work for this SQL query.
from pathlib import Path

# Directory containing the SQL file (here, the directory of this DAG file).
cwd = Path(__file__).parent

with dag:
    f = aql.transform_file(
        sql=str(cwd) + "/my_sql_function.sql",
        conn_id="postgres_conn",
        database="pagila",
        parameters={
            "actor": Table("actor"),
            "film_actor_join": Table("film_actor"),
            "unsafe_parameter": "G%%",
        },
        output_table=Table("my_table_from_file"),
    )
Raw SQL
Most ETL use-cases can be addressed by cross-sharing Task outputs, as shown above with @aql.transform. For SQL operations that don't return tables but might take tables as arguments, there is @aql.run_raw_sql.
@aql.run_raw_sql
def drop_table(table_to_drop):
    return "DROP TABLE IF EXISTS {table_to_drop}"
Other SQL functions
While simple SQL statements such as SELECT statements are very similar between different flavors of SQL, we have found that certain functions can vary widely between different SQL systems. This variation can lead to issues if a user decides to switch from Postgres to Snowflake. To simplify this process, we created some high-level APIs that handle common SQL use-cases and ensure your DAGs are interoperable across SQL flavors.
Appending data
Having transformed a table, you might want to append the results to a reporting table. An example of this might be aggregating daily data into a "main" table that analysts use for time-series analysis. The aql.append function merges tables assuming that there are no conflicts. You can append the data as-is or cast columns to a new type if needed. Note that this query will fail if there is a merge conflict.
foo = aql.append(
    conn_id="postgres_conn",
    database="postgres",
    append_table=APPEND_TABLE,
    columns=["Bedrooms", "Bathrooms"],
    casted_columns={"Age": "INTEGER"},
    main_table=MAIN_TABLE,
)
Merging data
To merge data into an existing table in situations where there MIGHT be conflicts, the aql.merge function adds data to a table with either an "update" or "ignore" strategy. The "ignore" strategy does not add values that conflict, while the "update" strategy overwrites the older values. This function only handles basic merge statements. Use the run_raw_sql function for complex statements.
Note that the merge_keys parameter is a list in Postgres, but a map in Snowflake. This syntax decision was unavoidable due to the differences in how Postgres and Snowflake handle conflict resolution. Also note that * inserts are disabled for the merge function.
Postgres:
a = aql.merge(
    target_table=MAIN_TABLE,
    merge_table=MERGE_TABLE,
    merge_keys=["list", "sell"],
    target_columns=["list", "sell", "taxes"],
    merge_columns=["list", "sell", "age"],
    conn_id="postgres_conn",
    conflict_strategy="update",
    database="pagila",
)
Snowflake:
a = aql.merge(
    target_table=MAIN_TABLE,
    merge_table=MERGE_TABLE,
    merge_keys={"list": "list", "sell": "sell"},
    target_columns=["list", "sell"],
    merge_columns=["list", "sell"],
    conn_id="snowflake_conn",
    database="DWH_LEGACY",
    conflict_strategy="ignore",
)
Truncate table
a = aql.truncate(
    table=TRUNCATE_TABLE,
    conn_id="snowflake_conn",
    database="DWH_LEGACY",
)
Dataframe functionality
Finally, your pipeline might call for procedures that would be too complex or impossible in SQL. This could be building a model from a feature set, or using a windowing function that pandas is better suited for. The df functions can easily move your data into a pandas dataframe and back to your database as needed.
At runtime, the operator loads any Table object into a pandas DataFrame. If the task returns a DataFrame, downstream Taskflow API tasks can interact with it to continue using Python.
If, after running the function, you wish to return the value into your database, simply include a Table in the reserved output_table parameter (please note that since this parameter is reserved, you cannot use it in your function definition).
dataframe
from astro import dataframe as df
from astro import sql as aql
from astro.sql.table import Table
import pandas as pd
@df
def get_dataframe():
    return pd.DataFrame({"numbers": [1, 2, 3], "colors": ["red", "white", "blue"]})


@aql.transform
def sample_pg(input_table: Table):
    return "SELECT * FROM {input_table}"


with dag:
    my_df = get_dataframe(
        output_table=Table(
            table_name="my_df_table", conn_id="postgres_conn", database="pagila"
        )
    )
    pg_df = sample_pg(my_df)
ML Operations
We currently offer two ML-based functions: train and predict. For now, these functions do the exact same thing as dataframe, but eventually we hope to add valuable ML functionality (e.g. hyperparameter support for train and model serving options in predict).
Until then, feel free to use these endpoints as convenience functions, knowing that more functionality will be added over the long term.
train
import pandas as pd

from astro.ml import train


@train
def my_df_func():
    return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
predict
import pandas as pd

from astro.ml import predict


@predict
def my_df_func():
    return pd.DataFrame(data={"col1": [1, 2], "col2": [3, 4]})
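Since these currently behave like dataframe, a decorated predict function can presumably be wired into a DAG the same way, taking a Table as input and writing results back with output_table. The sketch below rests on that assumption, and the table names are made up.

from pandas import DataFrame

from astro.ml import predict
from astro.sql.table import Table


@predict
def score(df: DataFrame):
    # Placeholder "model": a constant prediction, purely for illustration.
    df["prediction"] = 1
    return df


# Reusing the `dag` object defined in the Basic Usage example above.
with dag:
    score(
        df=Table(table_name="features", conn_id="postgres_conn"),
        output_table=Table(table_name="predictions", conn_id="postgres_conn"),
    )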
SQL Checks