Skip to main content

Utility classes and functions for AnyIO

Project description

anyioutils

Utility classes and functions for AnyIO.

Task

task = anyioutils.create_task(my_async_func()) behaves the same as task = asyncio.create_task(my_async_func()) except that:

  • the task still has to be launched in the background with an existing task group tg, using tg.start_soon(task.wait),
  • and/or the task can be awaited with result = task.wait().
from anyioutils import CancelledError, create_task
from anyio import create_task_group, run, sleep

async def foo():
    return 1

async def bar():
    await sleep(float("inf"))

async def main():
    async with create_task_group() as tg:
        task = create_task(foo())
        assert await task.wait() == 1

    try:
        async with create_task_group() as tg:
            task = create_task(bar())
            tg.start_soon(task.wait)
            await sleep(0.1)
            task.cancel()
    except ExceptionGroup as exc_group:
        assert len(exc_group.exceptions) == 1
        assert type(exc_group.exceptions[0]) == CancelledError

run(main)

Future

anyioutils.Future behaves the same as asyncio.Future except that:

  • you cannot directly await an anyioutils.Future object, but through its .wait() method (unlike an asyncio.Future, but like an asyncio.Event).
  • cancelling an anyioutils.Future doesn't raise an anyio.get_cancelled_exc_class(), but an anyioutils.CancelledError.
from anyioutils import CancelledError, Future
from anyio import create_task_group, run

async def set_result(future):
    future.set_result("done")

async def cancel(future):
    future.cancel()

async def main():
    async with create_task_group() as tg:
        future0 = Future()
        tg.start_soon(set_result, future0)
        assert await future0.wait() == "done"

        future1 = Future()
        tg.start_soon(cancel, future1)
        try:
            await future1.wait()
        except CancelledError:
            assert future1.cancelled()

run(main)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

anyioutils-0.2.0.tar.gz (4.9 kB view details)

Uploaded Source

Built Distribution

anyioutils-0.2.0-py3-none-any.whl (5.2 kB view details)

Uploaded Python 3

File details

Details for the file anyioutils-0.2.0.tar.gz.

File metadata

  • Download URL: anyioutils-0.2.0.tar.gz
  • Upload date:
  • Size: 4.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.12

File hashes

Hashes for anyioutils-0.2.0.tar.gz
Algorithm Hash digest
SHA256 9d0b1398a4d31e651ebd3fb9ef4cbcf7df26298ed2d3d76ee6ac33c34324f725
MD5 ea6ca25bec53519960ed5ff74e38980e
BLAKE2b-256 73357f05bf39cdfa1a517ff78241624739091b7762acf0006d1e9ae6037437d2

See more details on using hashes here.

File details

Details for the file anyioutils-0.2.0-py3-none-any.whl.

File metadata

  • Download URL: anyioutils-0.2.0-py3-none-any.whl
  • Upload date:
  • Size: 5.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.10.12

File hashes

Hashes for anyioutils-0.2.0-py3-none-any.whl
Algorithm Hash digest
SHA256 9edd2af93840fae2761f9fc70141317fd7ac69eccf62a8c0e31743763460c8c7
MD5 f1be353b4750c8ea36d2a5105cac58d4
BLAKE2b-256 14446894f7fabf8a017349d44d08e7bef78a9cc2e4bb3736fc3216206d7a1f3b

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page