Some tools for Luigi to cut down the length of your pipelines and work in interactive environments such as Jupyter notebooks.

pipecutter

pipecutter provides a few tools that make luigi work better with data science libraries and environments such as pandas, scikit-learn, and Jupyter notebooks.

Installation

pip install pipecutter

Python 3.6+ is required. pipecutter follows semantic versioning.

Usage

pipecutter currently provides:

  • a more convenient way to run and debug luigi tasks in interactive environments such as Jupyter notebooks
  • some luigi targets for saving pandas dataframes to parquet, scikit-learn models with joblib, ...

Debug in an interactive environment

With luigi, you can already run tasks from a Python script, Jupyter notebook, or Python console by calling luigi.build (typically with local_scheduler=True as an argument). However, if a task throws an exception, luigi catches it, so you cannot drop into a post-mortem debugging session. pipecutter.run is a light wrapper around luigi.build that disables this exception handling.

In [1]: import luigi
In [2]: import pipecutter

In [3]: class TaskWhichFails(luigi.Task):
   ...:     def run(self):
   ...:         raise Exception("Something is wrong")

# Traceback below is shortened for readability
In [4]: pipecutter.run(TaskWhichFails())
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-5-a970d52d810a> in <module>
----> 1 pipecutter.run(TaskWhichFails())

...

<ipython-input-3-4e27674090fa> in run(self)
      1 class TaskWhichFails(luigi.Task):
      2     def run(self):
----> 3         raise Exception("Something is wrong")

Exception: Something is wrong

# Drop straight into the debugger
In [5]: %debug
> <ipython-input-6-e7528a27d82e>(3)run()
      1 class TaskWhichFails(luigi.Task):
      2     def run(self):
----> 3         raise Exception("Something is wrong")
      4
ipdb>

This should lower the barrier to using luigi tasks while you are still developing a model, which in turn makes it easier to move into production later on.

Additionally, you can print the dependencies of a task with pipecutter.print_tree (a wrapper around luigi.tools.deps_tree.print_tree).

Targets

pipecutter.targets contains a few targets that build on luigi's LocalTarget but additionally provide a load and a dump method. A convenient way to name a target is to use the task's task_id in the filename, since it is unique with respect to the task name and the parameters passed to it.

import luigi
import pipecutter
from pipecutter.targets import JoblibTarget
from sklearn.ensemble import RandomForestClassifier

class TrainModel(luigi.Task):
    n_estimators = luigi.IntParameter()  # integer-valued, so IntParameter rather than Parameter

    def output(self):
        return JoblibTarget(self.task_id + ".joblib")

    def run(self):
        model = RandomForestClassifier(n_estimators=self.n_estimators)
        self.output().dump(model)

pipecutter.run(TrainModel(n_estimators=100))
# -> Produces a file called TrainModel_100_0b0ec0cdea.joblib

If you use task_id in the filename, the task above can be written more concisely with the pipecutter.targets.outputs decorator, which adds the output method for you. By default, it puts the files in a folder called data; this can be changed with the optional folder argument.

from pipecutter.targets import outputs

@outputs(JoblibTarget)
class TrainModel(luigi.Task):
    n_estimators = luigi.IntParameter()  # integer-valued, so IntParameter rather than Parameter

    def run(self):
        model = RandomForestClassifier(n_estimators=self.n_estimators)
        self.output().dump(model)

Full example

import luigi
import pandas as pd
import numpy as np
import pipecutter
from luigi.util import requires
from pipecutter.targets import outputs, JoblibTarget, ParquetTarget
from sklearn.ensemble import RandomForestClassifier

@outputs(ParquetTarget)
class PrepareData(luigi.Task):
    drop_missings = luigi.BoolParameter()

    def run(self):
        train_df = pd.DataFrame.from_dict({"A": [0, 1, np.nan], "B": [5, 1, 2], "label": [0, 1, 1]})
        if self.drop_missings:
            train_df = train_df.dropna()

        self.output().dump(train_df)

@requires(PrepareData)
@outputs(JoblibTarget)
class TrainModel(luigi.Task):
    n_estimators = luigi.IntParameter()  # integer-valued, so IntParameter rather than Parameter

    def run(self):
        train_df = self.input().load()
        X, y = train_df.drop("label", axis=1), train_df["label"]

        model = RandomForestClassifier(n_estimators=self.n_estimators)
        model.fit(X, y)

        self.output().dump(model)

pipecutter.run(TrainModel(n_estimators=100, drop_missings=True))
