
A library of multithreaded iterator workflows.



Quenouille

A library of multithreaded iterator workflows for python.

Installation

You can install quenouille with pip using the following command:

pip install quenouille

Usage

imap

Lazily consumes an iterable and applies the given function to each of its items using multiple threads. Results are yielded in the same order as the items of the provided iterable.

You can also restrict parallelism per group, if you need to ensure that only a limited number of threads work on the same group at once (e.g. a domain name when fetching urls). To do so, give a function extracting the group from an item, set the maximum number of threads allowed to work on the same group, and optionally raise the group buffer size so that more values are loaded into memory in the hope of finding subsequent items that can be processed without waiting.
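To make the idea of a per-group limit concrete, here is a naive sketch using only the standard library. It is not how quenouille is implemented: one semaphore per group caps how many threads run tasks of that group at once. The names `limited_map` and `domain` are hypothetical. In this version a worker simply blocks until its group frees up, whereas the library can load more values from the iterable (see `group_buffer_size`) in the hope of finding items it can process right away.

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlsplit


def domain(url):
    # Hypothetical group function: groups urls by domain name
    return urlsplit(url).netloc


def limited_map(func, items, threads, group, group_parallelism):
    # One semaphore per group, created on first use, caps concurrent tasks
    # belonging to that group
    semaphores = defaultdict(lambda: threading.BoundedSemaphore(group_parallelism))
    lock = threading.Lock()  # protects the defaultdict from concurrent insertion

    def worker(item):
        with lock:
            semaphore = semaphores[group(item)]
        # Blocks here while `group_parallelism` tasks of this group are running
        with semaphore:
            return func(item)

    with ThreadPoolExecutor(max_workers=threads) as executor:
        return list(executor.map(worker, items))
```

With `group=domain` and `group_parallelism=1`, at most one thread ever works on a given domain, while different domains are still processed concurrently.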

If you don't care about output order and want snappier performance, the library also exports an imap_unordered function.

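import csv
from urllib.request import urlopen

from quenouille import imap

csv_path = 'urls.csv'  # Hypothetical CSV file having a `url` column


def fetch(url):
    # One possible fetcher: each call opens its own connection,
    # so it is safe to run from several threads at once
    with urlopen(url) as response:
        return response.read().decode('utf-8', errors='replace')


# Example fetching urls from a CSV file, using 10 threads
with open(csv_path, newline='') as f:
    reader = csv.DictReader(f)
    urls = (line['url'] for line in reader)

    # Results will be yielded in the same order as the lines of the file
    for html in imap(urls, fetch, 10):
        print(html)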
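To observe order preservation without any network access, the standard library offers a close analogy: `concurrent.futures.ThreadPoolExecutor.map` also yields results in input order, even when later items finish first. This is only an illustration of the ordering guarantee, not quenouille's implementation; note in particular that `Executor.map` submits every item up front instead of consuming the iterable lazily. `ordered_map` and `slow_square` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n):
    # Larger inputs finish sooner, so completion order differs from input order
    time.sleep(0.05 * (5 - n))
    return n * n


def ordered_map(func, items, threads):
    # Executor.map yields results in input order, like imap,
    # but it submits all items eagerly rather than lazily
    with ThreadPoolExecutor(max_workers=threads) as executor:
        yield from executor.map(func, items)


results = list(ordered_map(slow_square, range(5), 5))
print(results)  # [0, 1, 4, 9, 16]
```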

Arguments

  • iterable iterable: Any python iterable.
  • func callable: Function used to perform desired tasks. The function takes any item yielded from the given iterable as sole argument. Note that since this function will be dispatched in a multithreaded environment, it should be thread-safe.
  • threads int: Number of threads to use.
  • group ?callable [None]: Function taking a single item yielded by the provided iterable and returning its group.
  • group_parallelism ?int [Infinity]: Maximum number of threads that can work on the same group at once. Defaults to no limit. This option requires that you give a function as the group argument.
  • group_buffer_size ?int [1]: Maximum number of values that will be loaded into memory from the iterable before waiting for relevant threads to become available.
  • group_throttle ?float|?callable [0]: Time to wait, in seconds, between two tasks on the same group. Can also be a function taking the group and the item and returning the throttle time.
  • group_throttle_entropy ?float [0]: Additional random throttle time, between 0 and the given value. Useful to simulate erratic behavior.
  • listener ?callable [None]: Function called on certain events with the name of the event and the related item.
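The group and throttle options above take user-supplied callables. Here is a sketch of what they might look like; `domain`, `throttle` and the `slow.example.org` domain are hypothetical, and `effective_wait` only illustrates the documented meaning of `group_throttle_entropy` (a base wait plus a random extra between 0 and the given value), not the library's own code.

```python
import random
from urllib.parse import urlsplit


def domain(url):
    # Hypothetical group function: groups urls by domain name
    return urlsplit(url).netloc


def throttle(group, item):
    # Hypothetical group_throttle callable: receives the group and the item
    # and returns a number of seconds to wait between two tasks of that group
    return 2.0 if group == 'slow.example.org' else 0.5


def effective_wait(base, entropy):
    # Illustration of the documented semantics (not the library's code):
    # the base throttle plus a random extra between 0 and `entropy`
    return base + random.uniform(0, entropy)


# Hypothetical call, assuming quenouille is installed and `fetch` is defined:
# imap(urls, fetch, 10, group=domain, group_parallelism=1,
#      group_throttle=throttle, group_throttle_entropy=0.5)
```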

Events

  • start: Emitted when the given function actually starts to work on a yielded item.
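A listener receives the name of the event, then the related item. The sketch below records every item whose processing has started; `StartRecorder` is a hypothetical name, and the assumption that event names are plain strings such as `'start'` is ours. Since the listener may be called from several threads, a lock guards its state.

```python
import threading


class StartRecorder:
    # Hypothetical listener recording every item whose processing has started.
    # A lock guards the list in case the listener is called from several threads.
    def __init__(self):
        self.lock = threading.Lock()
        self.started = []

    def __call__(self, event, item):
        # Documented signature: the name of the event, then the related item
        if event == 'start':
            with self.lock:
                self.started.append(item)


listener = StartRecorder()

# Hypothetical call, assuming quenouille is installed and `fetch` is defined:
# for html in imap(urls, fetch, 10, listener=listener): ...
```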

imap_unordered

Lazily consumes an iterable and applies the given function to each of its items using multiple threads. Results are yielded in arbitrary order, as soon as each task completes.

The same group options as imap are available, to ensure that only a limited number of threads work on the same group at once (e.g. a domain name when fetching urls): a function extracting the group from an item, the maximum number of threads allowed per group, and the group buffer size controlling how many values may be loaded into memory in the hope of finding items that can be processed without waiting.

If output order matters to you, the library also exports an imap function.

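import csv
from urllib.request import urlopen

from quenouille import imap_unordered

csv_path = 'urls.csv'  # Hypothetical CSV file having a `url` column


def fetch(url):
    # One possible fetcher: each call opens its own connection,
    # so it is safe to run from several threads at once
    with urlopen(url) as response:
        return response.read().decode('utf-8', errors='replace')


# Example fetching urls from a CSV file, using 10 threads
with open(csv_path, newline='') as f:
    reader = csv.DictReader(f)
    urls = (line['url'] for line in reader)

    # Results will be yielded in arbitrary order as soon as tasks complete
    for html in imap_unordered(urls, fetch, 10):
        print(html)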
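The standard library's `concurrent.futures.as_completed` gives a comparable "first finished, first yielded" behavior, which is a convenient way to see what unordered output means. This is an analogy only, not quenouille's implementation, and unlike imap_unordered it submits every item up front. `unordered_map` and `slow_square` are hypothetical names; since completion order depends on scheduling, only the sorted results are shown.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(n):
    # Larger inputs finish sooner, so they tend to be yielded first
    time.sleep(0.1 * (4 - n))
    return n * n


def unordered_map(func, items, threads):
    # Yield each result as soon as its task completes, whatever its position
    with ThreadPoolExecutor(max_workers=threads) as executor:
        futures = [executor.submit(func, item) for item in items]
        for future in as_completed(futures):
            yield future.result()


results = list(unordered_map(slow_square, range(5), 5))
print(sorted(results))  # [0, 1, 4, 9, 16]
```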

Arguments

  • iterable iterable: Any python iterable.
  • func callable: Function used to perform desired tasks. The function takes any item yielded from the given iterable as sole argument. Note that since this function will be dispatched in a multithreaded environment, it should be thread-safe.
  • threads int: Number of threads to use.
  • group ?callable [None]: Function taking a single item yielded by the provided iterable and returning its group.
  • group_parallelism ?int [Infinity]: Maximum number of threads that can work on the same group at once. Defaults to no limit. This option requires that you give a function as the group argument.
  • group_buffer_size ?int [1]: Maximum number of values that will be loaded into memory from the iterable before waiting for relevant threads to become available.
  • group_throttle ?float|?callable [0]: Time to wait, in seconds, between two tasks on the same group. Can also be a function taking the group and the item and returning the throttle time.
  • group_throttle_entropy ?float [0]: Additional random throttle time, between 0 and the given value. Useful to simulate erratic behavior.
  • listener ?callable [None]: Function called on certain events with the name of the event and the related item.

Events

  • start: Emitted when the given function actually starts to work on a yielded item.

Caveats

On having more threads than the size of the consumed iterator

This should be safe, but it can have a slight performance cost, since the library will allocate and terminate threads that will never be used. If you know the size of your iterable beforehand, you should probably clamp the number of threads accordingly (and, for instance, avoid calling imap or imap_unordered at all on an empty iterable).
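A minimal sketch of that advice, assuming the size is only known for sized collections such as lists or ranges (plain iterators and generators have no `len()`, so their thread count is left untouched). `effective_threads` is a hypothetical helper, not part of quenouille.

```python
from collections.abc import Sized


def effective_threads(iterable, threads):
    # Clamp the thread count when the iterable's size is known beforehand;
    # plain iterators and generators have no len(), so leave them untouched
    if isinstance(iterable, Sized):
        return min(threads, len(iterable))
    return threads


urls = ['https://a.com', 'https://b.com']  # hypothetical input
threads = effective_threads(urls, 10)

if threads > 0:
    # Skip the call entirely when the iterable is known to be empty, e.g.:
    # for html in imap(urls, fetch, threads): ...
    print(threads)  # 2
```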


Download files

Download the file for your platform.

Source Distribution

quenouille-0.6.3.tar.gz (7.6 kB)

Uploaded Source

Built Distribution

quenouille-0.6.3-py3-none-any.whl (9.9 kB)

Uploaded Python 3

File details

Details for the file quenouille-0.6.3.tar.gz.

File metadata

  • Download URL: quenouille-0.6.3.tar.gz
  • Upload date:
  • Size: 7.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.11.0 pkginfo/1.6.1 requests/2.25.1 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.54.1 CPython/3.6.10

File hashes

Hashes for quenouille-0.6.3.tar.gz
Algorithm Hash digest
SHA256 f0126a62b0e8ff8820fa01a9ff4e3fc3bc55ba0f191b5b919e4ed7d2ab0261dc
MD5 5fdce41265ca5a3ca50076b1dc4017c1
BLAKE2b-256 bc5f48f60906584355b69c07894175445f20a02e12d8ef9d20f68b3304ecda89


File details

Details for the file quenouille-0.6.3-py3-none-any.whl.

File metadata

  • Download URL: quenouille-0.6.3-py3-none-any.whl
  • Upload date:
  • Size: 9.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.11.0 pkginfo/1.6.1 requests/2.25.1 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.54.1 CPython/3.6.10

File hashes

Hashes for quenouille-0.6.3-py3-none-any.whl
Algorithm Hash digest
SHA256 6235cf0e1931a51731785c98723b54c0013c23cb3f6dc27d6737cf94b1f7a61b
MD5 511e206dc6c297b7eb2bec894ad54e0a
BLAKE2b-256 228bbe973e19c8a1991aad16051c426a728eba1204eb615d150550078a470c48

