Skip to main content

Simple framework to create data validation workflows.

Project description

Version Build status Coverage License Documentation status

Data Validation Framework

This project provides simple tools to create data validation workflows. The workflows are based on the luigi library.

The main objective of this framework is to gather in a same place both the specifications that the data must follow and the code that actually tests the data. This avoids having multiple documents to store the specifications and a repository to store the code.

Installation

This package should be installed using pip:

pip install data-validation-framework

Usage

Building a workflow

Building a new workflow is simple, as you can see in the following example:

import luigi
import data_validation_framework as dvf


class ValidationTask1(dvf.task.ElementValidationTask):
    """Use the class docstring to describe the specifications of the ValidationTask1."""

    output_columns = {"col_name": None}

    @staticmethod
    def validation_function(row, output_path, *args, **kwargs):
        # Return the validation result for one row of the dataset
        if row["col_name"] <= 10:
            return dvf.result.ValidationResult(is_valid=True)
        else:
            return dvf.result.ValidationResult(
                is_valid=False,
                ret_code=1,
                comment="The value should always be <= 10"
            )


def external_validation_function(df, output_path, *args, **kwargs):
    # Update the dataset inplace here by setting values to the 'is_valid' column.
    # The 'ret_code' and 'comment' values are optional, they will be added to the report
    # in order to help the user to understand why the dataset did not pass the validation.

    # We can use the value from kwargs["param_value"] here.
    if int(kwargs["param_value"]) <= 10:
        df["is_valid"] = True
    else:
        df["is_valid"] = False
        df["ret_code"] = 1
        df["comment"] = "The value should always be <= 10"


class ValidationTask2(dvf.task.SetValidationTask):
    """In some cases you might want to keep the docstring to describe what a developer
    needs to know, not the end-user. In this case, you can use the ``__specifications__``
    attribute to store the specifications."""

    a_parameter = luigi.Parameter()

    __specifications__ = """Use the __specifications__ to describe the specifications of the
    ValidationTask2."""

    def inputs(self):
        return {ValidationTask1: {"col_name": "new_col_name_in_current_task"}}

    def kwargs(self):
        return {"param_value": self.a_parameter}

    validation_function = staticmethod(external_validation_function)


class ValidationWorkflow(dvf.task.ValidationWorkflow):
    """Use the global workflow specifications to give general context to the end-user."""

    def inputs(self):
        return {
            ValidationTask1: {},
            ValidationTask2: {},
        }

Where the ValidationWorkflow class only defines the sub-tasks that should be called for the validation. The sub-tasks can be either a dvf.task.ElementValidationTask or a dvf.task.SetValidationTask. In both cases, you can define relations between these sub-tasks since one could need the result of another one to run properly. This is defined in two steps:

  1. in the required task, a output_columns attribute should be defined so that the next tasks can know what data is available, as shown in the previous example for the ValidationTask1.
  2. in the task that requires another task, a inputs method should be defined, as shown in the previous example for the ValidationTask2.

The sub-classes of dvf.task.ElementValidationTask should return a dvf.result.ValidationResult object. The sub-classes of dvf.task.SetValidationTask should return a Pandas.DataFrame object with at least the following columns ["is_valid", "ret_code", "comment", "exception"] and with the same index as the input dataset.

Generate the specifications of a workflow

The specifications that the data should follow can be generated with the following luigi command:

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --result-path out --ValidationTask2-a-parameter 15 --specifications-only

Running a workflow

The workflow can be run with the following luigi command (note that the module test_validation must be available in your sys.path):

luigi --module test_validation ValidationWorkflow --log-level INFO --local-scheduler --dataset-df dataset.csv --result-path out --ValidationTask2-a-parameter 15

This workflow will generate the following files:

  • out/report_ValidationWorkflow.pdf: the PDF validation report.
  • out/ValidationTask1/report.csv: The CSV containing the validity values of the task ValidationTask1.
  • out/ValidationTask2/report.csv: The CSV containing the validity values of the task ValidationTask2.
  • out/ValidationWorkflow/report.csv: The CSV containing the validity values of the complete workflow.

.. note::

As any `luigi <https://luigi.readthedocs.io/en/stable>`_ workflow, the values can be stored
into a `luigi.cfg` file instead of being passed to the CLI.

Advanced features

Require a regular Luigi task

In some cases, one want to execute a regular Luigi task in a validation workflow. In this case, it is possible to use the extra_requires() method to pass these extra requirements. In the validation task it is then possible to get the targets of these extra requirements using the extra_input() method.

class TestTaskA(luigi.Task):

    def run(self):
        # Do something and write the 'target.file'

    def output(self):
        return target.OutputLocalTarget("target.file")

class TestTaskB(task.SetValidationTask):

    output_columns = {"extra_target_path": None}

    def kwargs(self):
        return {"extra_task_target_path": self.extra_input().path}

    def extra_requires(self):
        return TestTaskA()

    @staticmethod
    def validation_function(df, output_path, *args, **kwargs):
        df["is_valid"] = True
        df["extra_target_path"] = kwargs["extra_task_target_path"]

Funding & Acknowledgment

The development of this software was supported by funding to the Blue Brain Project, a research center of the École polytechnique fédérale de Lausanne (EPFL), from the Swiss government’s ETH Board of the Swiss Federal Institutes of Technology.

For license and authors, see LICENSE.txt and AUTHORS.md respectively.

Copyright © 2022-2023 Blue Brain Project/EPFL

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

data-validation-framework-0.7.2.tar.gz (196.5 kB view details)

Uploaded Source

Built Distribution

data_validation_framework-0.7.2-py3-none-any.whl (26.2 kB view details)

Uploaded Python 3

File details

Details for the file data-validation-framework-0.7.2.tar.gz.

File metadata

File hashes

Hashes for data-validation-framework-0.7.2.tar.gz
Algorithm Hash digest
SHA256 9f5b213114706b486c8e70d63a4127a845ceb4dd1c0cd53d7619551c2c13ba01
MD5 e59120c48a48d11993011f74f557ced4
BLAKE2b-256 2eaeb804a66fa2df6173660ed8122f15caa066a023bffa556b9b46230c68f402

See more details on using hashes here.

Provenance

File details

Details for the file data_validation_framework-0.7.2-py3-none-any.whl.

File metadata

File hashes

Hashes for data_validation_framework-0.7.2-py3-none-any.whl
Algorithm Hash digest
SHA256 e57dca7514978338c0b6f292726a1d8a1c19ab64ad7feb7456a4658b43a00f6a
MD5 3b14388bd3d93ae6b0532553f75f7c57
BLAKE2b-256 70f8f25ec7c9e12cb2a8677914d4d0db658fd2aa479ab8d272306c42b8c93fe7

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page