Skip to main content

Decorators for running functions in Thread/ThreadPool/IOLoop

Project description

threaded

https://travis-ci.org/python-useful-helpers/threaded.svg?branch=master https://coveralls.io/repos/github/python-useful-helpers/threaded/badge.svg?branch=master Documentation Status https://img.shields.io/pypi/v/threaded.svg https://img.shields.io/pypi/pyversions/threaded.svg https://img.shields.io/pypi/status/threaded.svg https://img.shields.io/github/license/python-useful-helpers/threaded.svg

threaded is a set of decorators, which wrap functions in:

  • concurrent.futures.ThreadPool

  • threading.Thread

  • asyncio.Task in Python 3.

  • gevent.threadpool.ThreadPool if gevent is installed.

Why? Because copy-paste of loop.create_task, threading.Thread and thread_pool.submit is boring, especially if target functions is used by this way only.

Pros:

Python 2.7
PyPy

Decorators:

  • ThreadPooled - native concurrent.futures.ThreadPool usage on Python 3 and it’s backport on Python 2.7.

  • threadpooled is alias for ThreadPooled.

  • Threaded - wrap in threading.Thread.

  • threaded is alias for Threaded.

  • AsyncIOTask - wrap in asyncio.Task. Uses the same API, as Python 3 ThreadPooled.

  • asynciotask is alias for AsyncIOTask.

  • GThreadPooled - wrap function in gevent.threadpool.ThreadPool.

  • gthreadpooled is alias for GThreadPooled.

Usage

ThreadPooled

Mostly it is required decorator: submit function to ThreadPoolExecutor on call.

threaded.ThreadPooled.configure(max_workers=3)

Python 2.7 usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage:

@threaded.ThreadPooled
def func():
    pass

concurrent.futures.wait([func()])

Python 3.3+ usage with asyncio:

loop = asyncio.get_event_loop()
@threaded.ThreadPooled(loop_getter=loop, loop_getter_need_context=False)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Python 3.3+ usage with asyncio and loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.ThreadPooled(loop_getter=loop_getter, loop_getter_need_context=True)  # loop_getter_need_context is required
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

During application shutdown, pool can be stopped (while it will be recreated automatically, if some component will request).

threaded.ThreadPooled.shutdown()

Threaded

Classic threading.Thread. Useful for running until close and self-closing threads without return.

Usage example:

@threaded.Threaded
def func(*args, **kwargs):
    pass

thread = func()
thread.start()
thread.join()

Without arguments, thread name will use pattern: 'Threaded: ' + func.__name__

Override name can be don via corresponding argument:

@threaded.Threaded(name='Function in thread')
def func(*args, **kwargs):
    pass

Thread can be daemonized automatically:

@threaded.Threaded(daemon=True)
def func(*args, **kwargs):
    pass

Also, if no any addition manipulations expected before thread start, it can be started automatically before return:

@threaded.Threaded(started=True)
def func(*args, **kwargs):
    pass

AsyncIOTask

Wrap in asyncio.Task.

usage with asyncio:

@threaded.AsyncIOTask
def func():
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(), timeout))

Provide event loop directly:

loop = asyncio.get_event_loop()
@threaded.AsyncIOTask(loop_getter=loop)
def func():
    pass

loop.run_until_complete(asyncio.wait_for(func(), timeout))

Usage with loop extraction from call arguments:

loop_getter = lambda tgt_loop: tgt_loop
@threaded.AsyncIOTask(loop_getter=loop_getter, loop_getter_need_context=True)
def func(*args, **kwargs):
    pass

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait_for(func(loop), timeout))

GThreadPooled

Post function to gevent.threadpool.ThreadPool.

threaded.GThreadPooled.configure(max_workers=3)

Basic usage example:

@threaded.GThreadPooled
def func():
    pass

func().wait()

Testing

The main test mechanism for the package threaded is using tox. Available environments can be collected via tox -l

CI systems

For code checking several CI systems is used in parallel:

  1. Travis CI: is used for checking: PEP8, pylint, bandit, installation possibility and unit tests. Also it’s publishes coverage on coveralls.

  2. coveralls: is used for coverage display.

CD system

Travis CI: is used for package delivery on PyPI.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

threaded-1.1.0.tar.gz (40.9 kB view details)

Uploaded Source

Built Distribution

threaded-1.1.0-py2-none-any.whl (15.1 kB view details)

Uploaded Python 2

File details

Details for the file threaded-1.1.0.tar.gz.

File metadata

  • Download URL: threaded-1.1.0.tar.gz
  • Upload date:
  • Size: 40.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.7.0 requests-toolbelt/0.8.0 tqdm/4.30.0 PyPy/5.8.0

File hashes

Hashes for threaded-1.1.0.tar.gz
Algorithm Hash digest
SHA256 2681f32508eb686885858edea5636468027f4f6f6feb6911b7a6beaafa5b7c0b
MD5 60eb0f2934db442353078436f463bead
BLAKE2b-256 5e2d43d54dc84518f0b29aa85da5fe95ea512d1326c7271648f7675555b0c4ef

See more details on using hashes here.

File details

Details for the file threaded-1.1.0-py2-none-any.whl.

File metadata

  • Download URL: threaded-1.1.0-py2-none-any.whl
  • Upload date:
  • Size: 15.1 kB
  • Tags: Python 2
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/1.12.1 pkginfo/1.5.0.1 requests/2.21.0 setuptools/40.7.0 requests-toolbelt/0.8.0 tqdm/4.30.0 PyPy/5.8.0

File hashes

Hashes for threaded-1.1.0-py2-none-any.whl
Algorithm Hash digest
SHA256 b8f61cb1a8a286bede1f6f3a3c5551d964ac5d6826f7c8e490cb68f560611c3b
MD5 40c0c4af009eb64680a01c030f4e3ef9
BLAKE2b-256 90d8a78dd0393b9d6ff85ccd07775f1f0cc460e659db4416ea2d433215c10871

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page