DuckDB (duckdb.org) provider for Apache Airflow

airflow-provider-duckdb

A DuckDB provider for Airflow. This provider exposes a hook and a connection type; the hook's `get_conn` returns a DuckDB connection.

It works with both local DuckDB databases and MotherDuck.

Installation

pip install airflow-provider-duckdb

Connection

The connection type is duckdb. It supports the following parameters:

  • host (optional): Path to local file or MotherDuck database (leave blank for in-memory database)
  • password (optional): MotherDuck Service token (leave blank for local database)

These have been relabeled in the Airflow UI for clarity.

For example, if you want to connect to a local file:

  • host: /path/to/file.db
  • password: (leave blank)

If you want to connect to a MotherDuck database:

  • host: <YOUR_DB_NAME>
  • password: <YOUR_MOTHERDUCK_SERVICE_TOKEN>
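As an alternative to the Airflow UI, the same connection can be created with the Airflow CLI. This is a sketch assuming a local file database; the connection id `duckdb_default` matches the one used in the example below.

```shell
# Create the connection from the command line instead of the UI.
# --conn-host carries the database path, as described above.
airflow connections add 'duckdb_default' \
    --conn-type 'duckdb' \
    --conn-host '/path/to/file.db'
```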

Usage

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # execute a simple query
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()

        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # add a column
        conn.execute("CREATE TABLE tb AS SELECT *, a + b AS d FROM df")

        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()

