Skip to main content

Simple framework to create data validation workflows.

Project description

Version Build status Coverage License Documentation status

Data Validation Framework

This project provides simple tools to create data validation workflows. The workflows are based on the luigi library.

The main objective of this framework is to gather in a same place both the specifications that the data must follow and the code that actually tests the data. This avoids having multiple documents to store the specifications and a repository to store the code.

Installation

This package should be installed using pip:

pip install data-validation-framework

Usage

Building a workflow

Building a new workflow is simple, as you can see in the following example:

import luigi
import data_validation_framework as dvf


class ValidationTask1(dvf.task.ElementValidationTask):
    """Use the class docstring to describe the specifications of the ValidationTask1."""

    output_columns = {"col_name": None}

    @staticmethod
    def validation_function(row, output_path, *args, **kwargs):
        # Return the validation result for one row of the dataset
        if row["col_name"] <= 10:
            return dvf.result.ValidationResult(is_valid=True)
        else:
            return dvf.result.ValidationResult(
                is_valid=False,
                ret_code=1,
                comment="The value should always be <= 10"
            )


def external_validation_function(df, output_path, *args, **kwargs):
    # Update the dataset inplace here by setting values to the 'is_valid' column.
    # The 'ret_code' and 'comment' values are optional, they will be added to the report
    # in order to help the user to understand why the dataset did not pass the validation.

    # We can use the value from kwargs["param_value"] here.
    if int(kwargs["param_value"]) <= 10:
        df["is_valid"] = True
    else:
        df["is_valid"] = False
        df["ret_code"] = 1
        df["comment"] = "The value should always be <= 10"


class ValidationTask2(dvf.task.SetValidationTask):
    """In some cases you might want to keep the docstring to describe what a developer
    needs to know, not the end-user. In this case, you can use the ``__specifications__``
    attribute to store the specifications."""

    a_parameter = luigi.Parameter()

    __specifications__ = """Use the __specifications__ to describe the specifications of the
    ValidationTask2."""

    def inputs(self):
        return {ValidationTask1: {"col_name": "new_col_name_in_current_task"}}

    def kwargs(self):
        return {"param_value": self.a_parameter}

    validation_function = staticmethod(external_validation_function)


class ValidationWorkflow(dvf.task.ValidationWorkflow):
    """Use the global workflow specifications to give general context to the end-user."""

    def inputs(self):
        return {
            ValidationTask1: {},
            ValidationTask2: {},
        }

Where the ValidationWorkflow class only defines the sub-tasks that should be called for the validation. The sub-tasks can be either a dvf.task.ElementValidationTask or a dvf.task.SetValidationTask. In both cases, you can define relations between these sub-tasks since one could need the result of another one to run properly. This is defined in two steps:

  1. in the required task, a output_columns attribute should be defined so that the next tasks can know what data is available, as shown in the previous example for the ValidationTask1.
  2. in the task that requires another task, a inputs method should be defined, as shown in the previous example for the ValidationTask2.

The sub-classes of dvf.task.ElementValidationTask should return a dvf.result.ValidationResult object. The sub-classes of dvf.task.SetValidationTask should return a Pandas.DataFrame object with at least the following columns ["is_valid", "ret_code", "comment", "exception"] and with the same index as the input dataset.

Generate the specifications of a workflow

The specifications that the data should follow can be generated with the following luigi command:

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --result-path out --ValidationTask2-a-parameter 15 --specifications-only

Running a workflow

The workflow can be run with the following luigi command (note that the module test_validation must be available in your sys.path):

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --dataset-df dataset.csv --result-path out --ValidationTask2-a-parameter 15

This workflow will generate the following files:

  • out/report_ValidationWorkflow.pdf: the PDF validation report.
  • out/ValidationTask1/report.csv: The CSV containing the validity values of the task ValidationTask1.
  • out/ValidationTask2/report.csv: The CSV containing the validity values of the task ValidationTask2.
  • out/ValidationWorkflow/report.csv: The CSV containing the validity values of the complete workflow.

.. note::

As any `luigi <https://luigi.readthedocs.io/en/stable>`_ workflow, the values can be stored
into a `luigi.cfg` file instead of being passed to the CLI.

Advanced features

Require a regular Luigi task

In some cases, one want to execute a regular Luigi task in a validation workflow. In this case, it is possible to use the extra_requires() method to pass these extra requirements. In the validation task it is then possible to get the targets of these extra requirements using the extra_input() method.

class TestTaskA(luigi.Task):

    def run(self):
        # Do something and write the 'target.file'

    def output(self):
        return target.OutputLocalTarget("target.file")

class TestTaskB(task.SetValidationTask):

    output_columns = {"extra_target_path": None}

    def kwargs(self):
        return {"extra_task_target_path": self.extra_input().path}

    def extra_requires(self):
        return TestTaskA()

    @staticmethod
    def validation_function(df, output_path, *args, **kwargs):
        df["is_valid"] = True
        df["extra_target_path"] = kwargs["extra_task_target_path"]

Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH Board of the Swiss Federal Institutes of Technology.

For license and authors, see LICENSE.txt and AUTHORS.md respectively.

Copyright © 2022-2023 Blue Brain Project/EPFL

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

data_validation_framework-0.9.0.tar.gz (269.8 kB view details)

Uploaded Source

Built Distribution

File details

Details for the file data_validation_framework-0.9.0.tar.gz.

File metadata

File hashes

Hashes for data_validation_framework-0.9.0.tar.gz
Algorithm Hash digest
SHA256 82d6afe28da90d590afb2eb8ac35961f9f0fda1603508b90132701112043f2df
MD5 bb2760e8d6927a7138eddb0dd8f4b16a
BLAKE2b-256 a4bd2269f62e1496dc7dc3d641caa2b21260ed036e92b96e2929a18aca09d9d9

See more details on using hashes here.

Provenance

File details

Details for the file data_validation_framework-0.9.0-py3-none-any.whl.

File metadata

File hashes

Hashes for data_validation_framework-0.9.0-py3-none-any.whl
Algorithm Hash digest
SHA256 d9eb8dd652b2b9bfa926815c00f94bbac6b79d1fed5afab12ba7a8781f1ca8f3
MD5 fe4306b159386fa5a1cab5364fa1ac78
BLAKE2b-256 0e010e54a87dadb82a0df8c735448116fc90da7606747c34839f291b1056385d

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page