
A collection of task utils for asyncio

Project description

Utils for managing concurrent asyncio tasks.

You can use the concurrent async generator to run asyncio tasks concurrently.

It works much like asyncio.as_completed, but with a couple of differences.

  • coros can be any iterable, including sync or async generators

  • limit can be supplied to specify the maximum number of concurrent tasks

Setting limit to -1 will make all tasks run concurrently.

The default limit is the number of CPU cores + 4, up to a maximum of 32. This (somewhat arbitrarily) mirrors the default max_workers of concurrent.futures.ThreadPoolExecutor, which asyncio uses for its default executor.
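As a rough illustration of that default, the value can be computed with the standard library alone. This is a sketch of the formula described above, not code taken from aio.tasks; the name default_limit is hypothetical.

```python
import os

# Same formula as the documented ThreadPoolExecutor default (Python 3.8+).
# os.cpu_count() may return None, so fall back to 1 in that case.
default_limit = min(32, (os.cpu_count() or 1) + 4)
print(default_limit)
```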

For network tasks it might make sense to set the concurrency limit lower than the default, if, for example, opening many concurrent connections will trigger rate-limiting or soak bandwidth.
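To illustrate what a lower limit does, here is a minimal sketch that uses only the standard library. It uses asyncio.Semaphore as a stand-in for the limit argument and is not how aio.tasks is implemented; fetch, gate and stats are hypothetical names, and the network call is simulated with a short sleep.

```python
import asyncio


async def fetch(n, gate, stats):
    # Hypothetical network call: only `limit` of these run at once.
    async with gate:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.01)  # simulated I/O
        stats["active"] -= 1
    return n


async def main(limit):
    gate = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    # gather() returns results in the order the awaitables were passed in.
    results = await asyncio.gather(*(fetch(n, gate, stats) for n in range(10)))
    return results, stats["peak"]


results, peak = asyncio.run(main(limit=3))
print(results, peak)
```

All ten jobs complete, but the recorded peak never exceeds the limit, which is the property that keeps connection counts and bandwidth use bounded.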

If an error is raised while trying to iterate the provided coroutines, the error is wrapped in a ConcurrentIteratorError and is raised immediately.

In this case, no further handling occurs, and yield_exceptions has no effect.

Any errors raised while trying to create or run tasks are wrapped in ConcurrentError.

Any errors raised during task execution are wrapped in ConcurrentExecutionError.

If you specify yield_exceptions as True then the wrapped errors will be yielded in the results.

If yield_exceptions is False (the default), then the wrapped error will be raised immediately.
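The difference between yielding and raising can be sketched with the standard library. The code below is a stand-in built on asyncio.as_completed, not the aio.tasks implementation: WrappedError and results_of are hypothetical names that only mimic the behaviour described above.

```python
import asyncio


class WrappedError(Exception):
    """Hypothetical stand-in for a wrapper such as ConcurrentExecutionError."""


async def results_of(coros, yield_exceptions=False):
    # Stand-in generator: schedule everything, then handle results as they finish.
    tasks = [asyncio.ensure_future(coro) for coro in coros]
    for future in asyncio.as_completed(tasks):
        try:
            result = await future
        except Exception as exc:
            wrapped = WrappedError(exc)
            if not yield_exceptions:
                # Default behaviour: stop immediately with the wrapped error.
                raise wrapped from exc
            # Otherwise hand the wrapped error to the consumer as a result.
            result = wrapped
        yield result


async def ok(n):
    return n


async def bad():
    raise ValueError("boom")


async def collect(yield_exceptions):
    coros = [ok(1), bad(), ok(2)]
    return [r async for r in results_of(coros, yield_exceptions)]


# With yield_exceptions=True the error appears among the results.
yielded = asyncio.run(collect(True))

# With the default, iteration stops at the first wrapped error.
raised = None
try:
    asyncio.run(collect(False))
except WrappedError as exc:
    raised = exc
```

Yielding errors suits batch jobs where one failure should not discard the other results; raising suits cases where any failure invalidates the whole run.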

If you use any kind of Generator or AsyncGenerator to produce the awaitables and yield_exceptions is False, then when an error occurs it is your responsibility to close any remaining awaitables that you have created but that have not yet been fired.
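Closing an unfired coroutine is done with its close() method, which also avoids the "coroutine was never awaited" warning. The following is a minimal sketch of that cleanup using only the standard library; job and run_all are hypothetical names, and the coroutines are run one at a time purely to keep the example short.

```python
import asyncio
import inspect


async def job(n):
    if n == 1:
        raise ValueError("boom")
    await asyncio.sleep(0)
    return n


async def run_all():
    # The coroutines are created up front but only fired one by one.
    pending = [job(n) for n in range(4)]
    results = []
    try:
        while pending:
            results.append(await pending.pop(0))
    except ValueError:
        # Close everything that was created but never started.
        for coro in pending:
            coro.close()
    return results, pending


results, leftover = asyncio.run(run_all())
states = [inspect.getcoroutinestate(coro) for coro in leftover]
print(results, states)
```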

This utility is useful for concurrency of io-bound (as opposed to cpu-bound) tasks.

Usage

Let's first create a coroutine that waits for a random amount of time, and then returns its id and how long it waited.

>>> import asyncio
>>> import random

>>> async def task_to_run(task_id):
...     print(f"{task_id} starting")
...     wait = random.random() * 5
...     await asyncio.sleep(wait)
...     return task_id, wait

Next, let's create a generator that yields 10 of the coroutines.

Note that the coroutines are not awaited here; they will be created as tasks by concurrent.

>>> def provider():
...     for task_id in range(0, 10):
...         yield task_to_run(task_id)

Finally, let's create a function to asynchronously iterate the results, and fire it with the generator.

As we limit the concurrency to 3, the first 3 jobs start, and as the first returns, the next one fires.

This continues until all have completed.

>>> import asyncio
>>> from aio.tasks import concurrent

>>> async def run(coros):
...     async for (task_id, wait) in concurrent(coros, limit=3):
...         print(f"{task_id} waited {wait}")

>>> asyncio.run(run(provider()))
0 starting
1 starting
2 starting
... waited ...
3 starting
... waited ...
...
... waited ...

