Skip to main content

Python library for writing asynchronous programs.

Project description

Async-Tamer

You shouldn't have to split your codebase or jump through hoops just to use async def. After all async functions are just functions ... with superpowers. Async-Tamer helps you harness these superpowers by seeing async def for what it is: A means to flag and reduce periods of busy waiting. (Details at the end of this readme.)

Features

  • ✅ 100% python
  • ✅ 100% free (BSD 3-clause)
  • ✅ 100% lean (no dependencies)

Installation

pip install async-tamer

Usage

In a nutshell, you add @tamed to an asynchronous function to enable calling it directly from either sync and async contexts. You may also assign a @tamed function to an AsyncScope to get structured lifecycle management.

import asyncio
from tamer import tamed, AsyncScope

@tamed # <-- Notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

slow_echo("sync > DELAY(.5)", 0.5)

with AsyncScope() as scope:
    slow_echo("scope > DELAY(.2)", .2, _async_scope=scope)
    slow_echo("scope > DELAY(.1)", .1, _async_scope=scope)
# implicit await :)


# Output
# ------
#
# sync > DELAY(.5)
# scope > DELAY(.1)
# scope > DELAY(.2)

The @tamed decorator

A @tamed asynchronous function changes its execution policy (how it behaves) depending on the context it is called from. In synchronous contexts, it behaves like an ordinary function (blocking). In async contexts, it behaves like an ordinary coroutine (non-blocking), and when assigned to an AsyncScope it follows the scopes context manager (non-blocking).

import asyncio
from tamer import tamed, AsyncScope

@tamed  # <-- notice the decorator
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

# ============================
# Asynchronous Execution
# ============================

async def main():
    first = slow_echo("async > DELAY(1)", 1)
    second = slow_echo("async > DELAY(.1)", 0.1)
    third = slow_echo("async > DELAY(.5)", 0.5)

    # Don't forget the await!
    await second
    await asyncio.gather(first, third)

asyncio.run(main())

# Output
# ------
#
# async > DELAY(.1)
# async > DELAY(.5)
# async > DELAY(1)


# ============================
# Synchronous Execution
# ============================

slow_echo("sync > DELAY(1)", 1)
slow_echo("sync > DELAY(.1)", 0.1)
slow_echo("sync > DELAY(.5)", 0.5)

# Output
# ------
#
# sync > DELAY(1)
# sync > DELAY(.1)
# sync > DELAY(.5)

# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    slow_echo("scope > DELAY(1)", 1, _async_scope=scope)
    slow_echo("scope > DELAY(.1)", 0.1, _async_scope=scope)
    slow_echo("scope > DELAY(.5)", 0.5, _async_scope=scope)

# Output
# ------
#
# scope > DELAY(.1)
# scope > DELAY(.5)
# scope > DELAY(1)

Note: The _async_scope kwarg is injected by the @tamed decorator and is used to add a @tamed function to an AsyncScope. The reason you may want to do this is documented with examples in the AsyncScope section.

Returning Results

@tamed functions know when (and how) you expect results to be returned.

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_io():
    await asyncio.sleep(0.1)
    return 200, "Time to be awesome!"

# ============================
# Asynchronous Execution
# ============================

async def main():
    coro = slow_io()  # <-- normal coroutine

    return_code, msg = await coro  # <-- await the result
    print(f"Status {return_code}: `{msg}`")

asyncio.run(main())

# Output
# ------
#
# Status 200: `Time to be awesome!`


# ============================
# Synchronous Execution
# ============================

return_code, msg = slow_io()  # <-- immediate result
print(f"Status {return_code}: `{msg}`")

# Output
# ------
#
# Status 200: `Time to be awesome!`

# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    delayed_result = slow_io(_async_scope=scope)
# <-- implicit await on exit

return_code, msg = delayed_result.value
print(f"Status {return_code}: `{msg}`")

# Output
# ------
#
# Status 200: `Time to be awesome!`

When called with an AsyncScope a @tamed function will return an instance of DelayedResult. This object represents the result of the @tamed function and should not be confused with similar concepts like a Future, asyncio.Task, or Coroutine which represent concurrently executing code. While those are related objects, a DelayedResult is simpler. For example, unlike code, results don't execute. As such you can't cancel them nor can you chain callbacks. They (results) are simply values that a function outputs and in the case of a DelayedResult it is a value that arrives late to the party.

What you can do with a DelayedResult is await it in an async context or use it to .block() a synchronous context until it becomes available. Further, you can inspect it's .value which will either return the result or raise an AttributeError if the result is unavailable.

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def request(delay:int):
    await asyncio.sleep(delay)
    return 200, "You are awesome!"

@tamed
async def post_process(raw_result):
    ret_code, msg = await raw_result  # <-- awaitable in async context
    a, b = msg.rsplit(" ", 1)
    return ret_code, " ".join((a, "very", b))

with AsyncScope() as scope:
    raw_result = request(0.1, _async_scope=scope)
    result = post_process(raw_result, _async_scope=scope)  # <-- pass it around

    try:
        return_code, msg = result.value
    except AttributeError:  # <-- AttributeError, not attived yet
        print(f"scope > result: Not yet available.")

    result.block()  # <-- block in sync context
    return_code, msg = result.value
    print(f"scope > result: Status {return_code}: `{msg}`")

# Output
# ------
#
# scope > result: Not yet available.
# scope > result: Status 200: `You are very awesome!`

The AsyncScope

An AsyncScope manages a set of @tamed functions and controls their lifecycle. It's a structured way to add async sections to your program. Long story short, you need to be aware of 3 keywords:

  1. Scheduling: Line of code that starts a @tamed function.
  2. Switching: Line of code that switches to another execution context.
  3. Cleaning: Line of code that deals with error handling.

You schedule a @tamed function by calling it with _async_scope= set to a meaningful value and the AsyncScope helps with switching and cleaning. To that end it guarantees that all functions within the scope have finished when a scope exits. To achieve this, it switches between async contexts at the end of the scope until all its functions have finished. Note here that finished does not mean succeeded; functions may raise exceptions or get cancled. This is where the cleaning part comes in which we cover later during "Exception Management".

Additionally, you can nest scopes. @tamed functions assigned to an outer_scope execute independelty and alongside @tamed functions from an inner_scope and there can be arbitrary switching between them. However, since the inner_scope waits for all its functions to complete before switching back to the synchronous context, the scheduling of new functions below after the inner_scope will wait for inner_scope's completion.

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

with AsyncScope() as outer_scope:
    slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
    slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
    
    with AsyncScope() as inner_scope:
        slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
        slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
    # await inner_scope functions

    # Note: scheduled after inner scope has finished
    slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)

# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)

Just like @tamed and DelayedResult, this works not just in synchronous contexts (with) but also in asynchronous ones (async with).

import asyncio
from tamer import tamed, AsyncScope

@tamed
async def slow_echo(msg:str, delay:int) -> None:
    await asyncio.sleep(delay)
    print(msg)

@tamed
async def slow_bulk_echo() -> None:
    async with AsyncScope() as outer_scope:  # <-- `async with` in async contexts
        slow_echo("Outer Scope > DELAY(1.5)", 1.5, _async_scope=outer_scope)
        slow_echo("Outer Scope > DELAY(1)", 1, _async_scope=outer_scope)
        
        async with AsyncScope() as inner_scope:
            slow_echo("Outer Scope > Inner Scope > DELAY(2)", 2, _async_scope=inner_scope)
            slow_echo("Outer Scope > Inner Scope > DELAY(1)", 1, _async_scope=inner_scope)
        # await inner_scope functions

        # Note: scheduled after inner scope has finished
        slow_echo("Outer Scope > DELAY(.5)", 0.5, _async_scope=outer_scope)

slow_bulk_echo()

# Output
# ------
#
# Outer Scope > DELAY(1)
# Outer Scope > Inner Scope > DELAY(1)
# Outer Scope > DELAY(1.5)
# Outer Scope > Inner Scope > DELAY(2)
# Outer Scope > DELAY(.5)

The ability to nest AsyncScopes is especially useful when you combine it with its kwargs: exit_mode and error_mode. As the names suggest, the exit_mode controls what happens when the scope exits and the error_mode controls what happens when an assigned function produces an exception.

By default these are set to exit_mode="wait" and error_mode="cancel". The former will "wait" for unfinished functions at the end of the scope. The latter will "cancel" other unfinished functions if one of them fails. This behavior matches a asyncio.TaskGroup or trio.Nursery. It is useful when you call functions in batches, e.g., when making several web API calls or reading a batch of images from disk.

Alternatively, you can use exit_mode="cancel" which will "cancel" unfinished functions at the end of the scope. This is useful to shut down "infinity loops" or to cancel ongoing requests for data that you thought you'd need, but didn't.

import asyncio
from datetime import datetime
from tamer import tamed, AsyncScope

class RateLimiter:
    def __init__(self):
        # allow an initial burst
        self.max_tokens = 3
        self.tokens = self.max_tokens

    @tamed
    async def generate_tokens(self, delay:int):
        while True:  # <-- generate new tokens forever
            await asyncio.sleep(delay)
            self.tokens = min(self.tokens + 1, self.max_tokens)

    @tamed
    async def get_token(self):
        # Note: This would not work with threads, but is perfectly 
        # fine in asyncio
        while self.tokens == 0:
            await asyncio.sleep(0)
        self.tokens -= 1
        return True

@tamed
async def fake_request(rate_limiter):
    await rate_limiter.get_token()
    print(datetime.now().strftime("%H:%M:%S.%f"), "Requesting...")

throttle = RateLimiter()
with AsyncScope(exit_mode="cancel") as service_layer:
    throttle.generate_tokens(1, _async_scope=service_layer)

    with AsyncScope() as batch:
        for _ in range(6):
            fake_request(throttle, _async_scope=batch)
    # <-- wait for all requests to finish
# <-- cancel the rate limiter

# Output
# ------
# 00:22:28.348290 Requesting...
# 00:22:28.348436 Requesting...
# 00:22:28.348564 Requesting...
# 00:22:29.347495 Requesting...
# 00:22:30.347555 Requesting...
# 00:22:31.347597 Requesting...

Exception Management

Unfortunately, shit happens. If it does, Python raises an exception and you, the author of the program, have to decide how to respond. @tamed async functions follow suite and there is no difference between them and ordinary functions.

from tamer import tamed, AsyncScope

@tamed
async def faulty_function()
    raise RuntimeError("Oh no!")

# ============================
# Asynchronous Execution
# ============================

async def main():
    coro = faulty_function()

    try:
        await coro
    except RuntimeError:
        print("Actually, I'm good.")

asyncio.run(main())

# Output
# ------
#
# Actually, I'm good.


# ============================
# Synchronous Execution
# ============================

try:
    faulty_function()
except RuntimeError:
    print("Actually, I'm good.")

# Output
# ------
#
# Actually, I'm good.


# ============================
# AsyncScope Execution
# ============================

with AsyncScope() as scope:
    delayed_result = faulty_function(_async_scope=scope)

    try:
        delayed_result.block()
    except RuntimeError: 
        print("Actually, I'm good.")

# Output
# ------
#
# Actually, I'm good.

The one special case are functions in an AsyncScope. Here, you consume results using DelayedResult.value but handle exceptions when waiting for a result via DelayedResult.block() or via await delayed_result. This is deliberate since any code following the above statements can now assert that the result has successfully arrived.

The implicit await at the end of an AsyncScope acts as a catch-all that raises any exceptions that you don't wait for explicitly. This ensures that no exception is left behind and that your program doesn't produce unintended side-effects.

from tamer import tamed, AsyncScope

@tamed
async def faulty_function()
    raise RuntimeError("Oh no!")

with AsyncScope() as scope:
    result = faulty_function(_async_scope=scope)

# Output (excerpt)
# ----------------
#
# Traceback (most recent call last):
#    [...]
# RuntimeError: Oh no!

Author's Note

Note: This section is quite philosophical and more about why this package exists and less about how you use it. It's the "ramblings of an old man" so I won't tell anyone if you choose to skip it :)

If you read about python's asyncio library online you will find that people generally approach the library with a performance mindset. However, only some actually see its benefits materialize. This results in mixed, but often negative sentiment about the library ranging between asyncio being "fake parallelism", "too complicated to use", or "useful but very niche".

In my mind, this sentiment is typically caused by a mix of three reasons that lead users to believe that asyncio should be used like threads:

  1. Users think that asyncio uses thread-level parallelism under the hood and/or is used to implement green threads.
  2. Asyncio uses thread-like semantics (see the table at the end).
  3. A lot of online documentation and tutorials point out that async/await gives massive performance boosts over thread-based webserver implementations.

Writing asynchronous code while thinking about writing threads is not wrong ... but it's incomplete and - in my opinion - missdirected. Why? Because they are two different kinds of parallelism. Threadding takes a big block of work, slices it into smaller chunks, and works on multiple of them in parallel across multiple cores. Async/await takes a big block of work, flags periods of busy waiting and reorders instructions to minimize idle time. In other words, threadding uses thread-level parallelism (duh!) and async/await uses instruction-level parallelism. Both are forms of parallelism but they are NOT the same thing.

Threads Async/Await
Orchestration Main thread + inf-loop Event Loop
Creation tid = Thread(fn, ...) coro = fn(...)
Synchronization tid.join() await coro
Data exchange shared memory shared memory
Overhead ~50 µs (OS level thread) None (function call)
Concurrency preemtive cooperative

Implicit Parallelism

Instead of thinking about async/await as multi-threadded code, we should think about it as code that splits loading and consuming of data. It is aware that input data comes from external systems (DB, socket, filesystem, blob storage, ...), that these external systems are slow, and that programs spend most of their time waiting for data.

Asyncios (clever) trick is to realize that we can only execute instructions one by one (thank you GIL), but we can wait for any number of external systems in parallel. Traditional (sync) code asks for a single piece of data, does nothing while it is being prepared (busy waiting), consumes it, and then moves on to ask for the next piece of data. Async code requests all the data first, busy waits for everything in parallel to arrive, and then consumes the data one by one without waiting ever again. This reduces time spent idle from sum(load_times) (sync) to max(load_times) (async); a trick known as implicit parallelism or - if you are into compilers and how CPUs work instruction-level parallelism.

Moreover, if our workload allows processing data out of order, we request all data at the start, busy wait until the first piece arrives, process it, and then either move to the next piece or resume busy waiting until data arrives again. This way we spend between min(load_times) and max(load_times) busy waiting, which can be a huge speedup compared to sum(load_times) in the sync case.

Why Create Async-Tamer

Realizing how the parallelism behind async/await works and moving away from a thread-like design simplifies the code and its design. Functions that load data become async def and functions that consume the data remain as they were. Unfortunately, it is not simple to express this with asyncio today. We can't use await outside an async function and executing an async function requires spinning up an event loop and orchestrating it.

This is why I wrote async-tamer. It limits the extent to which async proliferates through the codebase, i.e., it @tamed the async keyword ^.^. We need an event loop and its implicit parallelism while we are waiting for data. We don't need an event loop when locally processing data and we get into a world of pain if we attempt to do so. Both parts of a program should be clearly separated and async def (waiting for external stuff) vs def (processing local stuff) nicely fits that bill.

The rest of the library really is syntax sugar and an attempt to remove as much boilerplate as possible. I really like the idea of structured concurrency in trio and it is exactly what we want when loading an "initial batch" of data. Thus, AsyncScopes work in a similar fashion. However, I also think that Nurseries and (asyncio) TaskGroups lack fine-grained control and adding the lifecycle kwargs felt like a natural extension of the idea.

Thanks for reading all the way to the end and happy coding!

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

async_tamer-1.0.0.tar.gz (17.9 kB view details)

Uploaded Source

Built Distribution

async_tamer-1.0.0-py3-none-any.whl (13.4 kB view details)

Uploaded Python 3

File details

Details for the file async_tamer-1.0.0.tar.gz.

File metadata

  • Download URL: async_tamer-1.0.0.tar.gz
  • Upload date:
  • Size: 17.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.11 Darwin/22.6.0

File hashes

Hashes for async_tamer-1.0.0.tar.gz
Algorithm Hash digest
SHA256 dfa1dead8d3f95126172d5013697771e579e4b52a21fa92460e06883d949dc46
MD5 7732de31150e5c8da8ab6fd76550313e
BLAKE2b-256 582da70f527c7d9e90b7a2d5af18e7b8fe7782e0d70a926ef06c359ac52816dd

See more details on using hashes here.

File details

Details for the file async_tamer-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: async_tamer-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 13.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.7.1 CPython/3.10.11 Darwin/22.6.0

File hashes

Hashes for async_tamer-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 a6e5856ffa78a59d6b3cc627ddb61f900276615c745205c2bc86d93e273e48b1
MD5 71b6ce8b3f77e516d64c8922885b863e
BLAKE2b-256 c456bd3cce51c8202e56c43b3f98aaff45fd66b1e3b339a70a748583ab18d5ef

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page