DuckDB (duckdb.org) provider for Apache Airflow
airflow-provider-duckdb
A DuckDB provider for Airflow. The provider exposes a hook and connection type that return a DuckDB connection, and it works with both local DuckDB databases and MotherDuck.
Installation
pip install airflow-provider-duckdb
Connection
The connection type is duckdb. It supports setting the following parameters:
- host (optional): Path to a local file or MotherDuck database (leave blank for an in-memory database)
- password (optional): MotherDuck service token (leave blank for a local database)

These fields have been relabeled in the Airflow UI for clarity.
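The provider's internals are not shown here, so the following is only a sketch of how the two fields could plausibly map onto the database string that duckdb.connect() accepts. It assumes the MotherDuck form md:<database>?motherduck_token=<token>; the helper name duckdb_database_string is hypothetical and not part of the provider's API.

```python
from typing import Optional


def duckdb_database_string(host: Optional[str], password: Optional[str]) -> str:
    """Hypothetical helper: turn the connection's host/password fields into a
    database string for duckdb.connect(). Not the provider's actual code."""
    if password:
        # Assumed MotherDuck form: "md:<database>?motherduck_token=<token>".
        return f"md:{host or ''}?motherduck_token={password}"
    # Local use: a file path, or DuckDB's in-memory database when host is blank.
    return host or ":memory:"


if __name__ == "__main__":
    print(duckdb_database_string("/path/to/file.db", None))
    print(duckdb_database_string(None, None))
```

Under this assumption, the returned string would be passed straight to duckdb.connect(); a blank host and blank password give an in-memory database, matching the defaults described above.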
For example, to connect to a local file:
- host: /path/to/file.db
- password: (leave blank)

To connect to a MotherDuck database:
- host: <YOUR_DB_NAME>
- password: <YOUR_MOTHERDUCK_SERVICE_TOKEN>
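Besides the Airflow UI, a connection can be supplied through an AIRFLOW_CONN_<CONN_ID> environment variable, and Airflow 2.3 and later accept a JSON value there. The sketch below builds such a value for the duckdb_default connection ID used in the Usage example, with the local-file settings from above. It only prints a shell export line; the variable would still need to be set in the environment of the scheduler and workers.

```python
import json

# Connection ID used by DuckDBHook.get_hook('duckdb_default') in the Usage example.
conn_id = "duckdb_default"

# Local-file example from above. For MotherDuck, set "host" to the database
# name and add "password": "<YOUR_MOTHERDUCK_SERVICE_TOKEN>".
conn = {"conn_type": "duckdb", "host": "/path/to/file.db"}

# Airflow looks for AIRFLOW_CONN_<CONN_ID>, with the ID upper-cased.
env_name = f"AIRFLOW_CONN_{conn_id.upper()}"
env_value = json.dumps(conn)

print(f"export {env_name}='{env_value}'")
```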
Usage
import pandas as pd
import pendulum
from airflow.decorators import dag, task

from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # execute a simple query; DuckDB resolves "df" to the local DataFrame
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()
        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # add a column; CREATE OR REPLACE lets reruns succeed against a
        # persistent (file or MotherDuck) database where tb already exists
        conn.execute("CREATE OR REPLACE TABLE tb AS SELECT *, a + b AS d FROM df")

        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    # chain the tasks: each task receives the previous task's DataFrame
    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()
File details
Details for the file airflow-provider-duckdb-0.1.1a3.tar.gz.
File metadata
- Download URL: airflow-provider-duckdb-0.1.1a3.tar.gz
- Upload date:
- Size: 7.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.10.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 62bae2fd379d83c080a4939271f65b27fe6af910fd6088984470884fac9f5da8
MD5 | 8c37e4ee568f28bb38e5268e34e6364e
BLAKE2b-256 | 118cd4ee94ce426f48d75047a2d9f572a261b7ac1e8e371c4e39abaa7d52b251
File details
Details for the file airflow_provider_duckdb-0.1.1a3-py3-none-any.whl.
File metadata
- Download URL: airflow_provider_duckdb-0.1.1a3-py3-none-any.whl
- Upload date:
- Size: 8.3 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/4.0.1 CPython/3.10.0
File hashes
Algorithm | Hash digest
---|---
SHA256 | 9d72288f7ad1b1526bab3d97a2a645bd2533411974d42f22a6ee29ce97a5799b
MD5 | e230c07c9dd27083a7536bda45d6c4be
BLAKE2b-256 | 8fcb2d91d22aa99066ae640c001343d8479f7173cc4383721d1fad6c8b2d305f