
Simple data transformation pipeline.

Project description

A simple data transformation pipeline based on Python’s iteration protocol. It runs on Python 2.7, 3.3 and 3.4.

+----------+      +-------------+      +-------------+      +--------+      +----------+
| Producer | ---> | Transformer | ---> | Transformer | ---> | Tester | ---> | Consumer |
+----------+      +-------------+      +-------------+      +--------+      +----------+

The pipeline model has four types of filters:

  • Producer: starting point, outbound only;

  • Transformer: input, processing, output;

  • Tester: input, then discard or pass through;

  • Consumer: ending point, inbound only.
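The four roles map directly onto plain Python iteration. As a rough sketch that does not use plumber at all (the function names below are made up for illustration), each stage can be written as a generator that pulls items from the previous one:

```python
def produce():
    # Producer: outbound only, yields items and consumes nothing.
    for n in range(1, 6):
        yield n

def double(items):
    # Transformer: takes each item, processes it, emits the result.
    for item in items:
        yield item * 2

def only_multiples_of_four(items):
    # Tester: passes an item through unchanged or discards it.
    for item in items:
        if item % 4 == 0:
            yield item

# Consumer: inbound only, here simply a list that drains the chain.
consumed = list(only_multiples_of_four(double(produce())))
print(consumed)  # [4, 8]
```

Nothing runs until the consumer starts iterating, which is what lets the stages be chained without intermediate lists.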

import plumber

@plumber.filter
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
"HEY JUDE, DON'T MAKE IT BAD"

Since the design is based on Python’s iteration protocol, both producers and consumers are ordinary iterable objects. Transformers are implemented as callables that accept a single argument, perform the processing and return the result.
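To illustrate how single-argument callables can be chained over an arbitrary iterable, here is a minimal sketch in plain Python. MiniPipeline is a hypothetical stand-in written for this example; it is not plumber's actual implementation and ignores features such as preconditions and prefetching.

```python
class MiniPipeline(object):
    """Hypothetical stand-in: applies each callable to every item, lazily."""

    def __init__(self, *transformers):
        self.transformers = transformers

    def run(self, iterable):
        # A generator: nothing is computed until the result is consumed.
        for item in iterable:
            for transform in self.transformers:
                item = transform(item)
            yield item

# Any iterable can act as the producer; here it is a list of strings.
mini = MiniPipeline(str.strip, str.upper)
result = list(mini.run(["  hey ", "jude  "]))
print(result)  # ['HEY', 'JUDE']
```

Because run returns an iterator, the consumer can be a for loop, a call to list, or ''.join, exactly as in the plumber examples.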

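Input data may also be checked against preconditions to decide whether the transformation should happen or be bypassed. A precondition is a callable that raises plumber.UnmetPrecondition when the input does not qualify, in which case the data passes through unchanged. For example: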

import plumber

def is_vowel(data):
    if data not in 'aeiou':
        raise plumber.UnmetPrecondition()

@plumber.filter
@plumber.precondition(is_vowel)
def upper(data):
    return data.upper()

ppl = plumber.Pipeline(upper)
output = ppl.run("Hey Jude, don't make it bad")

print(''.join(output))
"HEy JUdE, dOn't mAkE It bAd"
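The bypass behaviour can be pictured as a decorator that runs the check first and returns the input untouched when the check raises. The following is a sketch under that assumption, using local stand-ins (UnmetPrecondition and precondition are redefined here for illustration and are not imported from plumber):

```python
import functools

class UnmetPrecondition(Exception):
    """Hypothetical stand-in for plumber.UnmetPrecondition."""

def precondition(check):
    # Decorator factory: call `check` before the wrapped function.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(data):
            try:
                check(data)
            except UnmetPrecondition:
                return data  # bypass: hand the input back unchanged
            return func(data)
        return wrapper
    return decorator

def is_vowel(data):
    if data not in 'aeiou':
        raise UnmetPrecondition()

@precondition(is_vowel)
def upper(data):
    return data.upper()

text = ''.join(upper(ch) for ch in "Hey Jude")
print(text)  # HEy JUdE
```

Signalling with an exception rather than a boolean return value lets a check be written as a series of guard clauses, each raising as soon as one condition fails.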

Prefetching

If the pipes are taking too long to move data forward, you can enable prefetching. To use it, pass the maximum number of items to be fetched ahead as the prefetch argument of run.

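This example reuses the unconditional upper filter from the first example, together with a stripper filter (not shown here) that is assumed to strip surrounding whitespace: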

ppl = plumber.Pipeline(stripper, upper)
transformed_data = ppl.run([" I am the Great Cornholio!", "Hey Jude, don't make it bad "],
                           prefetch=2)

for td in transformed_data:
    print(td)

I AM THE GREAT CORNHOLIO!
HEY JUDE, DON'T MAKE IT BAD

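By default the prefetching mechanism is thread-based, so be careful with CPU-bound pipelines: in CPython, threads do not execute Python bytecode in parallel, so prefetching mainly helps when the pipes spend their time waiting on I/O.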
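For intuition, thread-based prefetching can be sketched with a bounded queue that a background thread fills while the consumer drains it. The prefetch helper below is hypothetical and only illustrates the general technique, not plumber's internals. It is written for Python 3 (the module is named Queue on Python 2.7) and, for brevity, does not forward exceptions raised in the worker thread.

```python
import queue
import threading

_DONE = object()  # sentinel marking the end of the stream

def prefetch(iterable, limit):
    """Hypothetical helper: read ahead up to `limit` items in a thread."""
    buffer = queue.Queue(maxsize=limit)

    def worker():
        for item in iterable:
            buffer.put(item)  # blocks while the buffer is full
        buffer.put(_DONE)

    thread = threading.Thread(target=worker)
    thread.daemon = True
    thread.start()

    while True:
        item = buffer.get()  # blocks until the worker has produced an item
        if item is _DONE:
            return
        yield item

lines = (s.strip().upper() for s in [" a ", "b ", " c"])
fetched = list(prefetch(lines, limit=2))
print(fetched)  # ['A', 'B', 'C']
```

The bounded queue is what enforces the upper limit: the worker stalls once it is that many items ahead of the consumer, so memory use stays capped.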

Installation

PyPI (recommended):

$ pip install picles.plumber

Source code (development version):

$ git clone https://github.com/picleslivre/plumber.git && cd plumber && python setup.py install

Use license

This project is licensed under the 2-clause BSD (FreeBSD) license. See LICENSE for more details.

