Artigraph

A library for creating interrelated graphs of artifacts and the runs that produce them.


Table of Contents

  • Installation
  • About
  • Usage
  • Serializers
  • Storage
  • License

Installation

pip install "artigraph[all]"

To install only a select set of dependencies, replace all with any of:

  • aws
  • pandas
  • numpy
  • polars
  • pyarrow
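
For example, to install only the S3 and Pandas integrations:

pip install "artigraph[aws,pandas]"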

About

Artigraph is narrowly focused on managing the artifacts produced by a data pipeline. It does not provide any functionality for running the pipeline itself. Instead, it is meant to be used in conjunction with a pipeline runner like Prefect.

Artigraph is built atop SQLAlchemy using its async engine. It supports most major databases including PostgreSQL, MySQL, and SQLite.
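
For reference, this is what constructing an async engine looks like in plain SQLAlchemy; how the engine is wired into Artigraph depends on its configuration API and is not shown here:

from sqlalchemy.ext.asyncio import create_async_engine

# A local SQLite database via the aiosqlite driver; swap the URL for
# postgresql+asyncpg://... or mysql+aiomysql://... to target other databases.
engine = create_async_engine("sqlite+aiosqlite:///artifacts.db")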

Usage

The core concepts in Artigraph are:

  • Artifacts: The data produced by a pipeline.
  • Artifact Models: A dataclass that defines the structure of an artifact.
  • Runs: A collection of artifacts that were produced together.

Under the hood, all data is stored in a graph-like representation under a single artigraph_node table that leverages single table inheritance to store different types of data. This allows Artigraph to support arbitrary nesting of artifacts and runs without creating additional tables.
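
As a rough illustration (not Artigraph's actual schema), single table inheritance in SQLAlchemy stores every subtype in one table, discriminated by a type column:

from sqlalchemy import ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column


class Base(DeclarativeBase):
    pass


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    node_parent_id: Mapped[int | None] = mapped_column(ForeignKey("node.id"))
    node_type: Mapped[str]
    __mapper_args__ = {"polymorphic_on": "node_type", "polymorphic_identity": "node"}


class RunNode(Node):
    # No separate table: runs are rows of "node" with node_type == "run".
    __mapper_args__ = {"polymorphic_identity": "run"}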

Artifact Models

Define an artifact model like a dataclass:

from dataclasses import dataclass
from artigraph import ArtifactModel


@dataclass
class MyDataModel(ArtifactModel, version=1):
    some_value: int
    another_value: str

You can then save to, and load from, the database:

model = MyDataModel(some_value=42, another_value="hello")
artifact_id = await model.save(label="my-data-model")
assert await MyDataModel.load(artifact_id) == model
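
save() and load() are coroutines, so outside an async context you need an event loop to drive them, e.g.:

import asyncio


async def main():
    model = MyDataModel(some_value=42, another_value="hello")
    artifact_id = await model.save(label="my-data-model")
    assert await MyDataModel.load(artifact_id) == model


asyncio.run(main())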

You may specify external storage or custom serializers for model fields. The code below shows how you might store a large Pandas DataFrame in S3:

import pandas as pd
from dataclasses import dataclass
from artigraph.storage.aws import S3Storage
from artigraph.serializer.pandas import pandas_serializer
from artigraph import ArtifactModel, artifact_field

s3_bucket = S3Storage("my-bucket").register()


@dataclass
class MyDataModel(ArtifactModel, version=1):
    some_value: int
    another_value: str
    large_value: pd.DataFrame = artifact_field(storage=s3_bucket, serializer=pandas_serializer)


model = MyDataModel(some_value=42, another_value="hello", large_value=pd.DataFrame({"a": [1, 2, 3]}))
artifact_id = await model.save(label="my-data-model")
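
Loading the model back should restore the DataFrame via the configured storage backend; a quick round-trip check, using the same load API as above:

loaded = await MyDataModel.load(artifact_id)
assert loaded.large_value.equals(model.large_value)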

Default storage and serializers for all fields on a model can be specified with a config:

from dataclasses import dataclass
from artigraph import ArtifactModel, ArtifactModelConfig, artifact_field


@dataclass
class MyDataModel(
    ArtifactModel,
    version=1,
    config=ArtifactModelConfig(
        default_field_storage=s3_bucket,
        default_field_serializer=pandas_serializer,
    )
):
    ...

Nesting Artifact Models

Artifact models can be nested within each other.

from __future__ import annotations

from dataclasses import dataclass

from artigraph import ArtifactModel


@dataclass
class MyDataModel(ArtifactModel, version=1):
    some_value: int
    nested_value: MyDataModel | None = None


model = MyDataModel(some_value=42, nested_value=MyDataModel(some_value=43))
artifact_id = await model.save(label="my-data-model")
assert await MyDataModel.load(artifact_id) == model

You can also do this with an ArtifactMapping or ArtifactSequence:

from __future__ import annotations

from dataclasses import dataclass, field

from artigraph import ArtifactMapping, ArtifactModel, ArtifactSequence


@dataclass
class MyDataModel(ArtifactModel, version=1):
    some_value: int
    nested_map: ArtifactMapping[str, MyDataModel] = field(default_factory=ArtifactMapping)
    nested_seq: ArtifactSequence[MyDataModel] = field(default_factory=ArtifactSequence)


model = MyDataModel(
    some_value=42,
    nested_map=ArtifactMapping(a=MyDataModel(some_value=43)),
    nested_seq=ArtifactSequence([MyDataModel(some_value=44)])
)
artifact_id = await model.save(label="my-data-model")
assert await MyDataModel.load(artifact_id) == model

Runs

A run allows you to group a collection of artifacts that were produced together:

from artigraph import Run, RunManager

run = Run(node_parent_id=None)
async with RunManager(run) as manager:
    await manager.save_artifact("my-data-model", MyDataModel(...))

If you're deep in a call stack and don't want to pass the run manager around, you can access the currently active manager with the run_manager() function:

from artigraph import Run, RunManager, run_manager


async def my_function():
    await run_manager().save_artifact("my-data-model", MyDataModel(...))


run = Run(node_parent_id=None)
async with RunManager(run) as manager:
    await my_function()

Runs can be nested by pointing the child run's node_parent_id at the parent run's node ID:

from artigraph import Run, RunManager

run1 = Run(node_parent_id=None)
async with RunManager(run1) as m1:
    run2 = Run(node_parent_id=m1.run.node_id)
    async with RunManager(run2) as m2:
        await m2.save_artifact("my-data-model", MyDataModel(...))
        assert m2.run.node_parent_id == m1.run.node_id

Artifacts from nested runs can be loaded:

from artigraph import Run, RunManager

run1 = Run(node_parent_id=None)
async with RunManager(run1) as m1:
    run2 = Run(node_parent_id=m1.run.node_id)
    async with RunManager(run2) as m2:
        await m2.save_artifact("my-data-model", MyDataModel(...))
        assert m2.run.node_parent_id == m1.run.node_id
        ...

run_artifacts = await m1.load_descendant_artifacts()
assert run_artifacts == {
    run2.node_id: {"my-data-model": MyDataModel(...)},
    ...
}

Serializers

Artigraph has built-in support for the following data types and serialization formats:

  • Pandas DataFrames (artigraph.serializer.pandas.dataframe_serializer)
  • Numpy Arrays (only 1d and 2d) (artigraph.serializer.numpy.array_serializer)
  • Polars DataFrames (artigraph.serializer.polars.dataframe_serializer)
  • PyArrow:
    • Feather (artigraph.serializer.pyarrow.feather_serializer)
    • Parquet (artigraph.serializer.pyarrow.parquet_serializer)
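
Any of these can be attached to a model field with artifact_field, as in the Pandas example above. A sketch with the Parquet serializer, assuming the PyArrow serializers accept pyarrow.Table values:

from dataclasses import dataclass

import pyarrow as pa

from artigraph import ArtifactModel, artifact_field
from artigraph.serializer.pyarrow import parquet_serializer


@dataclass
class TableModel(ArtifactModel, version=1):
    # Assumption: the pyarrow serializers operate on pyarrow.Table objects.
    table: pa.Table = artifact_field(serializer=parquet_serializer)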

Storage

Artigraph has built-in support for the following storage backends:

  • Local filesystem (artigraph.storage.file.FileSystem)
  • AWS S3 (artigraph.storage.aws.S3Storage)
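
Usage mirrors the S3 example above. A sketch with the local filesystem backend, assuming its constructor takes a root directory (check the actual signature):

from artigraph.storage.file import FileSystem

# Assumption: FileSystem is constructed with a root directory path.
local_storage = FileSystem("/tmp/artigraph-storage").register()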

License

artigraph is distributed under the terms of the MIT license.
