
aiomisc - miscellaneous utils for asyncio



Miscellaneous utils for asyncio.

Installation

Installing from PyPI:

pip3 install aiomisc

Installing from github.com:

pip3 install git+https://github.com/mosquito/aiomisc.git

Quick Start

Async entrypoint with logging and useful arguments:

import argparse
import asyncio
import os
import logging

from aiomisc.entrypoint import entrypoint, LogFormat


parser = argparse.ArgumentParser()

parser.add_argument(
    "-L", "--log-level", help="Log level",
    default=os.getenv('LOG_LEVEL', 'info'),
    choices=(
        'critical', 'fatal', 'error', 'warning',
        'warn', 'info', 'debug', 'notset'
    ),
)

parser.add_argument(
    "--log-format", help="Log format",
    default=os.getenv('LOG_FORMAT', 'color'),
    choices=LogFormat.choices(),
    metavar='LOG_FORMAT',
)

parser.add_argument(
    "-D", "--debug", action='store_true',
    help="Run loop and application in debug mode"
)


parser.add_argument(
    "--pool-size", help="Thread pool size",
    default=os.getenv('THREAD_POOL'), type=int,
)


log = logging.getLogger(__name__)


async def main():
    log.info('Starting')
    await asyncio.sleep(3)
    log.info('Exiting')


if __name__ == '__main__':
    arg = parser.parse_args()

    with entrypoint(log_level=arg.log_level,
                    log_format=arg.log_format) as loop:
        loop.run_until_complete(main())

Install event loop on program start:

import asyncio
from aiomisc.utils import new_event_loop


# Installing uvloop event loop
# and set `aiomisc.thread_pool.ThreadPoolExecutor`
# as default executor
new_event_loop()


async def main():
    await asyncio.sleep(3)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Close current event loop and install a new one:

import asyncio
from aiomisc.utils import new_event_loop


async def main():
    await asyncio.sleep(3)


if __name__ == '__main__':
    loop = new_event_loop()
    loop.run_until_complete(main())

Overview:

entrypoint

In the generic case, the entrypoint helper creates an event loop and cancels the still-running coroutines on exit.

import asyncio
from aiomisc.entrypoint import entrypoint

async def main():
    await asyncio.sleep(1)

with entrypoint() as loop:
    loop.run_until_complete(main())

Services

Services are abstractions that help organize many tasks in one process. Each service must implement a start() method; a stop() method is optional.

A service should be passed to the entrypoint and will be started after the event loop is created.

This package contains some useful base classes for simple service writing.

TCPServer

TCPServer is a base class for writing TCP servers. Just implement handle_client(reader, writer) to use it.

class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())


with entrypoint(EchoServer(address='::1', port=8901)) as loop:
    loop.run_forever()

UDPServer

UDPServer is a base class for writing UDP servers. Just implement handle_datagram(data, addr) to use it.

class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


with entrypoint(UDPPrinter(address='::1', port=3000)) as loop:
    loop.run_forever()

TLSServer

TLSServer is a base class for writing TCP servers with TLS. Just implement handle_client(reader, writer) to use it.

class SecureEchoServer(TLSServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())

service = SecureEchoServer(
    address='::1',
    port=8900,
    ca='ca.pem',
    cert='cert.pem',
    key='key.pem',
    verify=False,
)

with entrypoint(service) as loop:
    loop.run_forever()

Multiple services

You can pass multiple service instances to the entrypoint to run them; when the context manager exits, the service instances will be gracefully shut down.

import asyncio
from aiomisc.entrypoint import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    async def start(self):
        while True:
            print('Hello from service', self.name)
            await asyncio.sleep(1)


class EchoServer(TCPServer):
    async def handle_client(self, reader: asyncio.StreamReader,
                            writer: asyncio.StreamWriter):
        while True:
            writer.write(await reader.readline())


class UDPPrinter(UDPServer):
    async def handle_datagram(self, data: bytes, addr):
        print(addr, '->', data)


services = (
    LoggingService(name='#1'),
    EchoServer(address='::1', port=8901),
    UDPPrinter(address='::1', port=3000),
)


with entrypoint(*services) as loop:
    loop.run_forever()

Configuration

The Service metaclass accepts arbitrary kwargs and sets them on self as attributes.

import asyncio
from aiomisc.entrypoint import entrypoint
from aiomisc.service import Service, TCPServer, UDPServer


class LoggingService(Service):
    # required kwargs
    __required__ = frozenset({'name'})

    # default value
    delay: int = 1

    async def start(self):
        while True:
            # attribute ``name`` from kwargs
            # must be defined when instance initializes
            print('Hello from service', self.name)

            # attribute ``delay`` from kwargs
            await asyncio.sleep(self.delay)

services = (
    LoggingService(name='#1'),
    LoggingService(name='#2', delay=3),
)


with entrypoint(*services) as loop:
    loop.run_forever()

Context

Services might require data from each other. In this case you should use Context.

A Context object is created by the entrypoint and is linked to the running event loop.

Interdependent services can wait for data, or set data for each other, via the context.

import asyncio
from random import random, randint

from aiomisc.entrypoint import entrypoint, get_context
from aiomisc.service import Service


class LoggingService(Service):
    async def start(self):
        context = get_context()

        wait_time = await context['wait_time']

        print('Wait time is', wait_time)
        while True:
            print('Hello from service', self.name)
            await asyncio.sleep(wait_time)


class RemoteConfiguration(Service):
    async def start(self):
        context = get_context()

        # querying from remote server
        await asyncio.sleep(random())

        context['wait_time'] = randint(1, 5)


services = (
    LoggingService(name='#1'),
    LoggingService(name='#2'),
    LoggingService(name='#3'),
    RemoteConfiguration()
)

with entrypoint(*services) as loop:
    loop.run_forever()

timeout decorator

Decorator that guarantees a maximum execution time for the decorated function.

from aiomisc.timeout import timeout

@timeout(1)
async def bad_func():
    await asyncio.sleep(2)
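
Conceptually, this is the same as capping each call with asyncio.wait_for(). A stdlib-only sketch of the idea (timeout_like is a hypothetical name, not aiomisc's API), showing that the wrapped call raises asyncio.TimeoutError:

```python
import asyncio


def timeout_like(seconds):
    # Stdlib-only sketch: cap the coroutine's run time with asyncio.wait_for().
    def decorator(func):
        async def wrapper(*args, **kwargs):
            return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
        return wrapper
    return decorator


@timeout_like(0.05)
async def bad_func():
    await asyncio.sleep(1)


async def main():
    try:
        await bad_func()
    except asyncio.TimeoutError:
        return 'timed out'
    return 'finished'


result = asyncio.run(main())
```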

Async backoff

Decorator that ensures the decorated function completes within the waterline time in the best case, or retries it until the deadline expires.

from aiomisc.backoff import asyncbackoff

waterline = 0.1
deadline = 1
pause = 0.1

@asyncbackoff(waterline, deadline, pause)
async def db_fetch():
    ...


@asyncbackoff(0.1, 1, 0.1)
async def db_save(data: dict):
    ...
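
To illustrate the retry semantics (a sketch of the idea only, not aiomisc's actual implementation; retry_with_deadline is a hypothetical name): each attempt is capped at the waterline time, failed attempts are retried after a pause, and the whole call is bounded by the deadline.

```python
import asyncio


def retry_with_deadline(attempt_timeout, deadline, pause):
    # Each attempt gets at most ``attempt_timeout`` seconds; on failure,
    # wait ``pause`` seconds and retry; give up after ``deadline`` overall.
    def decorator(func):
        async def wrapper(*args, **kwargs):
            async def attempts():
                while True:
                    try:
                        return await asyncio.wait_for(
                            func(*args, **kwargs), attempt_timeout
                        )
                    except Exception:
                        await asyncio.sleep(pause)
            return await asyncio.wait_for(attempts(), deadline)
        return wrapper
    return decorator


calls = {'n': 0}


@retry_with_deadline(0.5, 2.0, 0.01)
async def flaky():
    # Fails twice, then succeeds on the third attempt.
    calls['n'] += 1
    if calls['n'] < 3:
        raise RuntimeError('try again')
    return 'ok'


result = asyncio.run(flaky())
```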

asynchronous file operations

Asynchronous file operations, backed by a thread pool under the hood.

from aiomisc.io import async_open


async def db_fetch():
    async with async_open('/tmp/test.txt', 'w+') as afp:
        await afp.write("Hello")
        await afp.write(" ")
        await afp.write("world")

        await afp.seek(0)
        print(await afp.read())

Service for aiohttp

Requires aiohttp to be installed.

import aiohttp.web
from aiomisc.entrypoint import entrypoint
from aiomisc.service.aiohttp import AIOHTTPService


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return aiohttp.web.Response(text=text)


class REST(AIOHTTPService):
    async def create_application(self):
        app = aiohttp.web.Application()

        app.add_routes([
            aiohttp.web.get('/', handle),
            aiohttp.web.get('/{name}', handle)
        ])

        return app


service = REST(address='127.0.0.1', port=8080)


with entrypoint(service) as loop:
    loop.run_forever()

Threaded decorator

Wraps a blocking function and runs it in the thread pool.

import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.thread_pool import threaded


@threaded
def blocking_function():
    time.sleep(1)


async def main():
    # Running in parallel
    await asyncio.gather(
        blocking_function(),
        blocking_function(),
    )


if __name__ == '__main__':
    loop = new_event_loop()
    loop.run_until_complete(main())

Fast ThreadPoolExecutor

This is a simple thread pool implementation.

Installing it as the default thread pool:

import asyncio
from aiomisc.thread_pool import ThreadPoolExecutor

loop = asyncio.get_event_loop()
thread_pool = ThreadPoolExecutor(4, loop=loop)
loop.set_default_executor(thread_pool)
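
Once a default executor is installed, loop.run_in_executor(None, fn) routes blocking calls through it. A self-contained sketch, using the stdlib executor in place of aiomisc's for illustration:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking():
    return sum(range(1000))


async def main():
    loop = asyncio.get_running_loop()
    # ``None`` selects the default executor installed below.
    return await loop.run_in_executor(None, blocking)


loop = asyncio.new_event_loop()
loop.set_default_executor(ThreadPoolExecutor(4))
result = loop.run_until_complete(main())
loop.close()
```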

Bind socket

from aiomisc.utils import bind_socket

# IPv4 socket
sock = bind_socket(address="127.0.0.1", port=1234)

# IPv6 socket (on Linux an IPv4 socket will be bound too)
sock = bind_socket(address="::1", port=1234)
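
A pre-bound socket can be handed straight to asyncio via the sock= argument. A self-contained stdlib sketch (the socket is built manually here so the snippet runs without aiomisc):

```python
import asyncio
import socket

# Bind a reusable listening socket up front (roughly what a bind-socket
# helper does), then hand it to asyncio via ``sock=``.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 0))   # port 0: let the OS pick a free port


async def echo(reader, writer):
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()


async def main():
    server = await asyncio.start_server(echo, sock=sock)
    host, port = server.sockets[0].getsockname()[:2]

    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b'ping\n')
    data = await reader.readline()

    writer.close()
    server.close()
    await server.wait_closed()
    return data


result = asyncio.run(main())
```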

Periodic callback

Runs a coroutine function periodically.

import asyncio
import time
from aiomisc.utils import new_event_loop
from aiomisc.periodic import PeriodicCallback


async def periodic_function():
    print("Hello")


if __name__ == '__main__':
    loop = new_event_loop()

    periodic = PeriodicCallback(periodic_function)

    # Call it each second
    periodic.start(1)

    loop.run_forever()
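
Under the hood, a periodic callback simply reschedules its coroutine on the loop. A stdlib-only sketch of the idea (SimplePeriodic is a hypothetical name, not aiomisc's implementation):

```python
import asyncio


class SimplePeriodic:
    # Illustrative sketch: run the coroutine, sleep for the interval, repeat,
    # all inside a single task that ``stop()`` cancels.
    def __init__(self, coro_func):
        self.coro_func = coro_func
        self._task = None

    def start(self, interval, loop):
        async def runner():
            while True:
                await self.coro_func()
                await asyncio.sleep(interval)
        self._task = loop.create_task(runner())

    def stop(self):
        if self._task is not None:
            self._task.cancel()


counter = {'n': 0}


async def tick():
    counter['n'] += 1


async def main():
    periodic = SimplePeriodic(tick)
    periodic.start(0.01, asyncio.get_running_loop())
    await asyncio.sleep(0.05)
    periodic.stop()


asyncio.run(main())
```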

Logging configuration

Setting up colorized logs:

import logging
from aiomisc.log import basic_config


# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='color')

Setting up json logs:

import logging
from aiomisc.log import basic_config


# Configure logging
basic_config(level=logging.INFO, buffered=False, log_format='json')

Buffered log handler

The buffered=True parameter enables a memory buffer that flushes logs in a separate thread.

import logging
from aiomisc.log import basic_config
from aiomisc.periodic import PeriodicCallback
from aiomisc.utils import new_event_loop


# Configure logging globally
basic_config(level=logging.INFO, buffered=False, log_format='json')

async def write_log(loop):
    logging.info("Hello %f", loop.time())

if __name__ == '__main__':
    loop = new_event_loop()

    # Configure
    basic_config(
        level=logging.INFO,
        buffered=True,
        log_format='color',
        flush_interval=2
    )

    periodic = PeriodicCallback(write_log, loop)
    periodic.start(0.3)

    loop.run_forever()

Useful services

Memory Tracer

A simple and useful service for logging the largest Python objects allocated in memory.

import asyncio
import os
from aiomisc.entrypoint import entrypoint
from aiomisc.service import MemoryTracer


async def main():
    leaking = []

    while True:
        leaking.append(os.urandom(128))
        await asyncio.sleep(0)


with entrypoint(MemoryTracer(interval=1, top_results=5)) as loop:
    loop.run_until_complete(main())

Each second, this example will log something like the following.

[T:[1] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
      12 |       12 |   1.9KiB |   1.9KiB | aiomisc/periodic.py:40
      12 |       12 |   1.8KiB |   1.8KiB | aiomisc/entrypoint.py:93
       6 |        6 |   1.1KiB |   1.1KiB | aiomisc/thread_pool.py:71
       2 |        2 |   976.0B |   976.0B | aiomisc/thread_pool.py:44
       5 |        5 |   712.0B |   712.0B | aiomisc/thread_pool.py:52

[T:[6] Thread Pool] INFO:aiomisc.service.tracer: Top memory usage:
 Objects | Obj.Diff |   Memory | Mem.Diff | Traceback
   43999 |    43999 |   7.1MiB |   7.1MiB | scratches/scratch_8.py:11
      47 |       47 |   4.7KiB |   4.7KiB | env/bin/../lib/python3.7/abc.py:143
      33 |       33 |   2.8KiB |   2.8KiB | 3.7/lib/python3.7/tracemalloc.py:113
      44 |       44 |   2.4KiB |   2.4KiB | 3.7/lib/python3.7/tracemalloc.py:185
      14 |       14 |   2.4KiB |   2.4KiB | aiomisc/periodic.py:40

Versioning

This software follows Semantic Versioning.

How to develop?

The following must be installed:

  • virtualenv

  • GNU Make as make

  • Python 3.5+ as python3

To set up the developer environment, just run:

make develop
