DuckDB (duckdb.org) provider for Apache Airflow

airflow-provider-duckdb

A DuckDB provider for Apache Airflow. It adds a duckdb connection type and a hook whose get_conn() method returns a DuckDB connection.

The hook works with in-memory databases, local database files, and MotherDuck.

Installation

pip install airflow-provider-duckdb

Connection

The connection type is duckdb. It supports setting the following parameters:

| Airflow field name | Airflow UI label | Description |
| --- | --- | --- |
| host | Path to local database file | Path to local file. Leave blank (with no password) for in-memory database. |
| schema | MotherDuck database name | Name of the MotherDuck database. Leave blank for default. |
| password | MotherDuck Service token | MotherDuck Service token. Leave blank for local database. |

These standard Airflow connection fields have been relabeled in the Airflow UI for clarity.
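The rules in the table can be sketched as a small function. This is an illustration only, not the provider's actual code: duckdb_target is a hypothetical helper, and the md:...?motherduck_token=... form is assumed from MotherDuck's documented connection-string format rather than taken from this README.

```python
from typing import Optional


def duckdb_target(
    host: Optional[str] = None,
    schema: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Hypothetical helper: turn the three Airflow fields into a string
    that could be passed to duckdb.connect()."""
    if password:
        # A service token means MotherDuck; an empty database name is
        # assumed to select the default database.
        return f"md:{schema or ''}?motherduck_token={password}"
    if host:
        # No token but a path: local database file.
        return host
    # Neither path nor token: in-memory database.
    return ":memory:"
```

Keeping the decision in one place makes the precedence explicit: in this sketch a token takes priority over a path, and an empty connection falls back to an in-memory database.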

For example, if you want to connect to a local file:

| Airflow field name | Airflow UI label | Value |
| --- | --- | --- |
| host | Path to local database file | /path/to/file.db |
| schema | MotherDuck database name | (leave blank) |
| password | MotherDuck Service token | (leave blank) |

If you want to connect to a MotherDuck database:

| Airflow field name | Airflow UI label | Value |
| --- | --- | --- |
| host | Path to local database file | (leave blank) |
| schema | MotherDuck database name | <YOUR_DB_NAME>, or leave blank for default |
| password | MotherDuck Service token | <YOUR_SERVICE_TOKEN> |
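The same values can also be supplied without the UI. The sketch below assumes Airflow 2.3 or later, which accepts a JSON-serialized connection in an AIRFLOW_CONN_<CONN_ID> environment variable. The helper connection_env_value is hypothetical and is not part of this provider.

```python
import json
import os
from typing import Optional


def connection_env_value(
    host: Optional[str] = None,
    schema: Optional[str] = None,
    password: Optional[str] = None,
) -> str:
    """Hypothetical helper: serialize a duckdb connection as JSON,
    omitting the fields that should be left blank."""
    fields = {
        "conn_type": "duckdb",
        "host": host,
        "schema": schema,
        "password": password,
    }
    return json.dumps({key: value for key, value in fields.items() if value})


# Local file: only the host field is set.
os.environ["AIRFLOW_CONN_DUCKDB_DEFAULT"] = connection_env_value(
    host="/path/to/file.db"
)

# MotherDuck: database name and service token (placeholders from the table above).
motherduck_value = connection_env_value(
    schema="<YOUR_DB_NAME>", password="<YOUR_SERVICE_TOKEN>"
)
```

In practice the variable would be set in the environment of the Airflow scheduler and workers rather than from Python; the assignment above only shows the expected shape of the value.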

Usage

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # execute a simple query; DuckDB resolves the name df to the
        # pandas DataFrame passed into this task
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()

        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

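        # add a column; OR REPLACE keeps the task re-runnable when the
        # connection points at a persistent database
        conn.execute("CREATE OR REPLACE TABLE tb AS SELECT *, a + b AS d FROM df")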

        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()
