Utility classes and functions for AnyIO
Project description
anyioutils
Utility classes and functions for AnyIO.
Task
task = anyioutils.create_task(my_async_func(), task_group)
behaves the same as task = asyncio.create_task(my_async_func())
except that an existing task_group
has to be passed for the task to be launched in the background.
You can also use task = anyioutils.Task(my_async_func())
and then launch the task
with task_group.start_soon(task.wait)
, and/or await it with result = await task.wait()
.
from anyioutils import CancelledError, Task, create_task
from anyio import create_task_group, run, sleep
async def foo():
return 1
async def bar():
await sleep(float("inf"))
async def main():
async with create_task_group() as tg:
task = Task(foo())
assert await task.wait() == 1
try:
async with create_task_group() as tg:
task = create_task(bar(), tg)
await sleep(0.1)
task.cancel()
except BaseExceptionGroup as exc_group:
assert len(exc_group.exceptions) == 1
assert type(exc_group.exceptions[0]) == CancelledError
run(main)
Future
anyioutils.Future
behaves the same as asyncio.Future
except that:
- you cannot directly await an
anyioutils.Future
object, but through its.wait()
method (unlike anasyncio.Future
, but like anasyncio.Event
), - cancelling an
anyioutils.Future
doesn't raise ananyio.get_cancelled_exc_class()
, but ananyioutils.CancelledError
.
from anyioutils import CancelledError, Future
from anyio import create_task_group, run
async def set_result(future):
future.set_result("done")
async def cancel(future):
future.cancel()
async def main():
async with create_task_group() as tg:
future0 = Future()
tg.start_soon(set_result, future0)
assert await future0.wait() == "done"
future1 = Future()
tg.start_soon(cancel, future1)
try:
await future1.wait()
except CancelledError:
assert future1.cancelled()
run(main)
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file anyioutils-0.4.4.tar.gz
.
File metadata
- Download URL: anyioutils-0.4.4.tar.gz
- Upload date:
- Size: 5.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 90ebf9ee4b1cae856fd78671c7fc9573514927219a7ec87b1dcb94642ae672d7 |
|
MD5 | b308801e7c27b049f0f4a2c53a39d15d |
|
BLAKE2b-256 | 94e2223d5e188f9fbfa4a6e1a1a9660d2290f2b310c8669abc769105071b718a |
File details
Details for the file anyioutils-0.4.4-py3-none-any.whl
.
File metadata
- Download URL: anyioutils-0.4.4-py3-none-any.whl
- Upload date:
- Size: 5.8 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.10.12
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6e1486bce6f889adb5b39139a2df074599d4c3fe82acd3795ba019aebd37dbf6 |
|
MD5 | 03639bf5d69c7293786596575d9f2607 |
|
BLAKE2b-256 | 3d0fd098a85eb2cafdc27107a032446a28ff94e58a0c916bda529c7b48b4a942 |