Skip to main content

A Python concurrency scheduling library, compatible with asyncio and trio

Project description

aiometer

Build Status Coverage Python versions Package version

aiometer is a Python 3.8+ concurrency scheduling library compatible with asyncio and trio and inspired by Trimeter. It makes it easier to execute lots of tasks concurrently while controlling concurrency limits (i.e. applying backpressure) and collecting results in a predictable manner.

This project is currently in early alpha. Be sure to pin any dependencies to the latest minor.

Content

Example

Let's use HTTPX to make web requests concurrently...

Try this code interactively using IPython.

>>> import asyncio
>>> import functools
>>> import random
>>> import aiometer
>>> import httpx
>>>
>>> client = httpx.AsyncClient()
>>>
>>> async def fetch(client, request):
...     response = await client.send(request)
...     # Simulate extra processing...
...     await asyncio.sleep(2 * random.random())
...     return response.json()["json"]
...
>>> requests = [
...     httpx.Request("POST", "https://httpbin.org/anything", json={"index": index})
...     for index in range(100)
... ]
...
>>> # Send requests, and process responses as they're made available:
>>> async with aiometer.amap(
...     functools.partial(fetch, client),
...     requests,
...     max_at_once=10, # Limit maximum number of concurrently running tasks.
...     max_per_second=5,  # Limit request rate to not overload the server.
... ) as results:
...     async for data in results:
...         print(data)
...
{'index': 3}
{'index': 4}
{'index': 1}
{'index': 2}
{'index': 0}
...
>>> # Alternatively, fetch and aggregate responses into an (ordered) list...
>>> jobs = [functools.partial(fetch, client, request) for request in requests]
>>> results = await aiometer.run_all(jobs, max_at_once=10, max_per_second=5)
>>> results
[{'index': 0}, {'index': 1}, {'index': 2}, {'index': 3}, {'index': 4}, ...]

Installation

pip install "aiometer==0.1.*"

Features

  • Concurrency management and throttling helpers.
  • asyncio and trio support.
  • Fully type annotated.
  • 100% test coverage.

Guide

Flow control

The key highlight of aiometer is allowing you to apply flow control strategies in order to limit the degree of concurrency of your programs.

There are two knobs you can play with to fine-tune concurrency:

  • max_at_once: this is used to limit the maximum number of concurrently running tasks at any given time. (If you have 100 tasks and set max_at_once=10, then aiometer will ensure that no more than 10 run at the same time.)
  • max_per_second: this option limits the number of tasks spawned per second. This is useful to not overload I/O resources, such as servers that may have a rate limiting policy in place.

Example usage:

>>> import asyncio
>>> import aiometer
>>> async def make_query(query):
...     await asyncio.sleep(0.05)  # Simulate a database request.
...
>>> queries = ['SELECT * from authors'] * 1000
>>> # Allow at most 5 queries to run concurrently at any given time:
>>> await aiometer.run_on_each(make_query, queries, max_at_once=5)
...
>>> # Make at most 10 queries per second:
>>> await aiometer.run_on_each(make_query, queries, max_per_second=10)
...
>>> # Run at most 10 concurrent jobs, spawning new ones at least every 5 seconds:
>>> async def job(id):
...     await asyncio.sleep(10)  # A very long task.
...
>>> await aiometer.run_on_each(job, range(100),  max_at_once=10, max_per_second=0.2)

Running tasks

aiometer provides 4 different ways to run tasks concurrently in the form of 4 different run functions. Each function accepts all the options documented in Flow control, and runs tasks in a slightly different way, allowing to address a variety of use cases. Here's a handy table for reference:

Entrypoint Use case
run_on_each() Execute async callbacks in any order.
run_all() Return results as an ordered list.
amap() Iterate over results as they become available.
run_any() Return result of first completed function.

To illustrate the behavior of each run function, let's first setup a hello world async program:

>>> import asyncio
>>> import random
>>> from functools import partial
>>> import aiometer
>>>
>>> async def get_greeting(name):
...     await asyncio.sleep(random.random())  # Simulate I/O
...     return f"Hello, {name}"
...
>>> async def greet(name):
...     greeting = await get_greeting(name)
...     print(greeting)
...
>>> names = ["Robert", "Carmen", "Lucas"]

Let's start with run_on_each(). It executes an async function once for each item in a list passed as argument:

>>> await aiometer.run_on_each(greet, names)
'Hello, Robert!'
'Hello, Lucas!'
'Hello, Carmen!'

If we'd like to get the list of greetings in the same order as names, in a fashion similar to Promise.all(), we can use run_all():

>>> await aiometer.run_all([partial(get_greeting, name) for name in names])
['Hello, Robert', 'Hello, Carmen!', 'Hello, Lucas!']

amap() allows us to process each greeting as it becomes available (which means maintaining order is not guaranteed):

>>> async with aiometer.amap(get_greeting, names) as greetings:
...     async for greeting in greetings:
...         print(greeting)
'Hello, Lucas!'
'Hello, Robert!'
'Hello, Carmen!'

Lastly, run_any() can be used to run async functions until the first one completes, similarly to Promise.any():

>>> await aiometer.run_any([partial(get_greeting, name) for name in names])
'Hello, Carmen!'

As a last fun example, let's use amap() to implement a no-threads async version of sleep sort:

>>> import asyncio
>>> from functools import partial
>>> import aiometer
>>> numbers = [0.3, 0.1, 0.6, 0.2, 0.7, 0.5, 0.5, 0.2]
>>> async def process(n):
...     await asyncio.sleep(n)
...     return n
...
>>> async with aiometer.amap(process, numbers) as results:
...     sorted_numbers = [n async for n in results]
...
>>> sorted_numbers
[0.1, 0.2, 0.2, 0.3, 0.5, 0.5, 0.6, 0.7]

License

MIT

Changelog

All notable changes to this project will be documented in this file.

The format is based on Keep a Changelog.

0.1.0 - 2020-03-21

Added

  • Add run_on_each(), run_all(), amap() and run_any(), with max_at_once and max_per_second options. (Pull #1)

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiometer-0.1.0.tar.gz (12.7 kB view details)

Uploaded Source

Built Distribution

aiometer-0.1.0-py3-none-any.whl (11.1 kB view details)

Uploaded Python 3

File details

Details for the file aiometer-0.1.0.tar.gz.

File metadata

  • Download URL: aiometer-0.1.0.tar.gz
  • Upload date:
  • Size: 12.7 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.8.2

File hashes

Hashes for aiometer-0.1.0.tar.gz
Algorithm Hash digest
SHA256 253bafb546c1f1567feda824c5d2a01c2c54b6a4ef0c76c0d90c4a0bb15facc0
MD5 050249c3c8e45390ea575622272996c4
BLAKE2b-256 b598060fa80243328725e2b3a1e244a8f499bc20d204c4bffe839140a5b7cf37

See more details on using hashes here.

File details

Details for the file aiometer-0.1.0-py3-none-any.whl.

File metadata

  • Download URL: aiometer-0.1.0-py3-none-any.whl
  • Upload date:
  • Size: 11.1 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.43.0 CPython/3.8.2

File hashes

Hashes for aiometer-0.1.0-py3-none-any.whl
Algorithm Hash digest
SHA256 fa0f2828140b1e52082a62de8bf46f0b447f524d03aef2dd0baa59ea53a4964e
MD5 026140757c287371abcfc0d3b8ba6c4a
BLAKE2b-256 80e75d5c1282b407664884401297f3fd90348bbae48afa35b98494da1a6ac465

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page