
DuckDB (duckdb.org) provider for Apache Airflow

Project description

airflow-provider-duckdb

A DuckDB provider for Apache Airflow. This provider exposes a hook and connection type that return a DuckDB connection.

Installation

pip install airflow-provider-duckdb

Connection

The connection type is duckdb. It supports setting the following parameters:

  • file (optional): The path to the DuckDB database file. If not set, operations will be done in-memory.

Example connection strings:

  • duckdb://:memory:
  • duckdb:///tmp/duckdb.db
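
For example, the connection can be registered with a standard Airflow connection environment variable or the Airflow CLI. The snippet below is a minimal sketch using the duckdb_default connection id that the example DAG below expects:

export AIRFLOW_CONN_DUCKDB_DEFAULT='duckdb://:memory:'
# or, for a file-backed database:
airflow connections add duckdb_default --conn-uri 'duckdb:///tmp/duckdb.db'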

Usage

import pandas as pd
import pendulum

from airflow.decorators import dag, task
from duckdb_provider.hooks.duckdb_hook import DuckDBHook


@dag(
    schedule=None,
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    catchup=False,
)
def duckdb_transform():
    @task
    def create_df() -> pd.DataFrame:
        """
        Create a dataframe with some sample data
        """
        df = pd.DataFrame(
            {
                "a": [1, 2, 3],
                "b": [4, 5, 6],
                "c": [7, 8, 9],
            }
        )
        return df

    @task
    def simple_select(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to select a subset of the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # query the local DataFrame directly: DuckDB resolves the name "df"
        # to the in-scope pandas DataFrame via a replacement scan
        res = conn.execute("SELECT a, b, c FROM df WHERE a >= 2").df()

        return res

    @task
    def add_col(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to add a column to the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # add a column
        conn.execute("CREATE TABLE tb AS SELECT *, a + b AS d FROM df")

        # get the table
        return conn.execute("SELECT * FROM tb").df()

    @task
    def aggregate(df: pd.DataFrame) -> pd.DataFrame:
        """
        Use DuckDB to aggregate the data
        """
        hook = DuckDBHook.get_hook('duckdb_default')
        conn = hook.get_conn()

        # aggregate
        return conn.execute("SELECT SUM(a), COUNT(b) FROM df").df()

    create_df_res = create_df()
    simple_select_res = simple_select(create_df_res)
    add_col_res = add_col(simple_select_res)
    aggregate_res = aggregate(add_col_res)


duckdb_transform()
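
Each task opens its own DuckDB connection, so with the in-memory default a table such as tb above does not persist between tasks; the example instead passes intermediate results between tasks as return values. Point the connection at a database file if data needs to survive across tasks.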

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

airflow-provider-duckdb-0.0.2.tar.gz (7.1 kB)

Built Distribution

airflow_provider_duckdb-0.0.2-py3-none-any.whl (8.0 kB)

File details

Details for the file airflow-provider-duckdb-0.0.2.tar.gz.

File hashes

Algorithm    Hash digest
SHA256       c35d9368494ca056ceec1a97ea482bdb177c68b36996a2f5e47944f1220c131e
MD5          eb658e9c2f41242414527d0d9567bee3
BLAKE2b-256  4f24b0d58495d3e996d7a992eb91a25463b55cb7eec3aa63b4f8ad33975ce165

File details

Details for the file airflow_provider_duckdb-0.0.2-py3-none-any.whl.

File hashes

Algorithm    Hash digest
SHA256       7c7ee534dbb90b4f7819cda1eb2f6d826dd95564c970a43cba260024c769e31f
MD5          8d623e3e0610f1cf0872e58fb4429a0e
BLAKE2b-256  ecfce73884b363aa7324ccf5c1862b8856a358420e6277db81c56ef40f2e7d7c
