Skip to main content

Decorators for running functions in Thread/ThreadPool/IOLoop

Project description

threaded

https://travis-ci.org/python-useful-helpers/threaded.svg?branch=master https://coveralls.io/repos/github/python-useful-helpers/threaded/badge.svg?branch=master Documentation Status https://img.shields.io/pypi/v/threaded.svg https://img.shields.io/pypi/pyversions/threaded.svg https://img.shields.io/pypi/status/threaded.svg https://img.shields.io/github/license/python-useful-helpers/threaded.svg

threaded is a set of decorators, which wrap functions in:

  • concurrent.futures.ThreadPool

  • threading.Thread

  • asyncio.Task in Python 3.

  • gevent.threadpool.ThreadPool if gevent is installed.

Why? Because copy-paste of loop.create_task, threading.Thread and thread_pool.submit is boring, especially if target functions is used by this way only.

Pros:

Python 2.7
PyPy

Decorators:

  • ThreadPooled - native concurrent.futures.ThreadPool usage on Python 3 and it’s backport on Python 2.7.

  • threadpooled is alias for ThreadPooled.

  • Threaded - wrap in threading.Thread.

  • threaded is alias for Threaded.

  • AsyncIOTask - wrap in asyncio.Task. Uses the same API, as Python 3 ThreadPooled.

  • asynciotask is alias for AsyncIOTask.

  • GThreadPooled - wrap function in gevent.threadpool.ThreadPool.

  • gthreadpooled is alias for GThreadPooled.

Usage

ThreadPooled

Mostly it is required decorator: submit function to ThreadPoolExecutor on call.

threaded.ThreadPooled.configure(max_workers=3)

Python 2.7 usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage with asyncio:

loop = asyncio.get_event_loop()
@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Python 3.3+ usage with asyncio and loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

During application shutdown, pool can be stopped (while it will be recreated automatically, if some component will request).

threaded.ThreadPooled.shutdown()

Threaded

Classic threading.Thread. Useful for running until close and self-closing threads without return.

Usage example:

@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()

Without arguments, thread name will use pattern: 'Threaded: ' + func.__name__

Override name can be don via corresponding argument:

@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

Thread can be daemonized automatically:

@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no any addition manipulations expected before thread start, it can be started automatically before return:

@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass

AsyncIOTask

Wrap in asyncio.Task.

usage with asyncio:

@threaded.AsyncIOTask
def func():
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))

Provide event loop directly:

loop = asyncio.get_event_loop()
@threaded.AsyncIOTask(loop_getter=loop)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Usage with loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

GThreadPooled

Post function to gevent.threadpool.ThreadPool.

threaded.GThreadPooled.configure(max_workers=3)

Basic usage example:

@threaded.GThreadPooled
def func():
    pass

func().wait()

Testing

The main test mechanism for the package threaded is using tox. Available environments can be collected via tox -l

CI systems

For code checking several CI systems is used in parallel:

  1. Travis CI: is used for checking: PEP8, pylint, bandit, installation possibility and unit tests. Also it’s publishes coverage on coveralls.

  2. coveralls: is used for coverage display.

CD system

Travis CI: is used for package delivery on PyPI.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threaded-1.0.7.tar.gz (15.1 kB view details)

Uploaded Source

Built Distribution

threaded-1.0.7-py2-none-any.whl (12.6 kB view details)

Uploaded Python 2

File details

Details for the file threaded-1.0.7.tar.gz.

File metadata

  • Download URL: threaded-1.0.7.tar.gz
  • Upload date:
  • Size: 15.1 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Python-urllib/2.7

File hashes

Hashes for threaded-1.0.7.tar.gz
Algorithm Hash digest
SHA256 45f1e39de5787921cb4bbc2c9d6b0eb5691a8733879e4a0e1d973e1be21caf28
MD5 8701ac45a1d5f367b5da8268c40acd60
BLAKE2b-256 64993292da3d411e4a9328ddc8bf72da363dbc70dd091e36fd46998ead81829f

See more details on using hashes here.

File details

Details for the file threaded-1.0.7-py2-none-any.whl.

File metadata

  • Download URL: threaded-1.0.7-py2-none-any.whl
  • Upload date:
  • Size: 12.6 kB
  • Tags: Python 2
  • Uploaded using Trusted Publishing? No
  • Uploaded via: Python-urllib/2.7

File hashes

Hashes for threaded-1.0.7-py2-none-any.whl
Algorithm Hash digest
SHA256 e1f03d8a89ba790330c9a677d34892c2667b97419f79bd4a96c5b90eba3459fc
MD5 1de33d434cf3f383b962e9f08a2f320a
BLAKE2b-256 4b9c8ddaae200dacfb8190c56ae399e3545507db887b05e7b6097516c5825be5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page