Idiomatic asyncio utilities
Project description
aiotools
Idiomatic asyncio utilties
NOTE: This project is under early stage of developement. The public APIs may break version by version.
Async Context Manager
This is an asynchronous version of contextlib.contextmanager to make it easier to write asynchronous context managers without creating boilerplate classes.
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
await asyncio.sleep(1)
yield a + 1
await asyncio.sleep(1)
async def somewhere():
async with mygen(1) as b:
assert b == 2
Note that you need to wrap yield with a try-finally block to ensure resource releases (e.g., locks), even in the case when an exception is ocurred inside the async-with block.
import asyncio
import aiotools
lock = asyncio.Lock()
@aiotools.actxmgr
async def mygen(a):
await lock.acquire()
try:
yield a + 1
finally:
lock.release()
async def somewhere():
try:
async with mygen(1) as b:
raise RuntimeError('oops')
except RuntimeError:
print('caught!') # you can catch exceptions here.
You can also create a group of async context managers, which are entered/exited all at once using asyncio.gather().
import asyncio
import aiotools
@aiotools.actxmgr
async def mygen(a):
yield a + 10
async def somewhere():
ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
async with ctxgrp as values:
assert len(values) == 10
for i in range(10):
assert values[i] == i + 10
Async Server
This implements a common pattern to launch asyncio-based server daemons.
import asyncio
import aiotools
async def echo(reader, writer):
data = await reader.read(100)
writer.write(data)
await writer.drain()
writer.close()
@aiotools.actxmgr
async def myworker(loop, pidx, args):
server = await asyncio.start_server(echo, '0.0.0.0', 8888,
reuse_port=True, loop=loop)
print(f'[{pidx}] started')
yield # wait until terminated
server.close()
await server.wait_closed()
print(f'[{pidx}] terminated')
if __name__ == '__main__':
# Run the above server using 4 worker processes.
aiotools.start_server(myworker, num_workers=4)
It handles SIGINT/SIGTERM signals automatically to stop the server, as well as lifecycle management of event loops running on multiple processes.
Async Timer
import aiotools
i = 0
async def mytick(interval):
print(i)
i += 1
async def somewhere():
t = aiotools.create_timer(mytick, 1.0)
...
t.cancel()
await t
t is an asyncio.Task object. To stop the timer, call t.cancel(); await t. Please don’t forget await-ing t because it requires extra steps to cancel and await all pending tasks. To make your timer function to be cancellable, add a try-except clause catching asyncio.CancelledError since we use it as a termination signal.
You may add TimerDelayPolicy argument to control the behavior when the timer-fired task takes longer than the timer interval. DEFAULT is to accumulate them and cancel all the remainings at once when the timer is cancelled. CANCEL is to cancel any pending previously fired tasks on every interval.
import asyncio
import aiotools
async def mytick(interval):
await asyncio.sleep(100) # cancelled on every next interval.
async def somewhere():
t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
...
t.cancel()
await t
Changelog
0.5.1 (2018-01-11)
server: Fix a race condition related to handling of worker initialization errors with multiple workers
0.5.0 (2017-11-08)
func: Add lru_cache() which is a coroutine version of functools.lru_cache().
0.4.5 (2017-10-14)
server: Fix a race condition related to signal handling in the multiprocessing module during termination
server: Improve error handling during initialization of workers (automatic shutdown of other workers and the main loop after logging the exception)
0.4.4 (2017-09-12)
Add a new module aiotools.func with apartial() function which is an async version of functools.partial() in the standard library
0.4.3 (2017-08-06)
Add aclosing() context manager like closing() in the standard library
Speed up Travis CI builds for packaging
Now provide README in rst as well as CHANGES (this file)
0.4.2 (2017-08-01)
server: Fix spawning subprocesses in child workers
Add support for uvloop
0.4.0 (2017-08-01)
Add use_threading argument to
Add initial documentation (which currently not served on readthedocs.io due to Python version problem)
0.3.2 (2017-07-31)
Add extra_procs argument to start_server() function
Add socket and ZeroMQ server examples
Improve CI configs
0.3.1 (2017-07-26)
Improve CI scripts
Adopt editorconfig
0.3.0 (2017-04-26)
Add start_server() function using multiprocessing with automatic children lifecycle management
Clarify the semantics of AsyncContextGroup using asyncio.gather() with return_exceptions=True
0.2.0 (2017-04-20)
Add abstract types for AsyncContextManager
Rename AsyncGenContextManager to AsyncContextManager
Add AsyncContextGroup
0.1.1 (2017-04-14)
Initial release
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.