Aio application runner

Application runner for the aio asyncio framework

Build status

https://travis-ci.org/phlax/aio.app.svg?branch=master

Installation

Install with:

pip install aio.app

Running aio

You can run aio as follows:

aio -c custom.conf run

If you run the command without specifying a configuration file, the aio command looks in the following places:

  • aio.conf

  • etc/aio.conf

  • /etc/aio/aio.conf
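The lookup described above could be sketched roughly as follows. This is an illustration only, not aio.app's actual code: `find_config` and `SEARCH_PATHS` are hypothetical names, and it is an assumption that the locations are tried in the order listed and that the first existing file wins.

```python
import os

# Candidate locations, in the order they are listed above (assumed order).
SEARCH_PATHS = ("aio.conf", "etc/aio.conf", "/etc/aio/aio.conf")


def find_config(explicit=None, search_paths=SEARCH_PATHS):
    """Return the configuration path to use, or None if none is found."""
    if explicit is not None:
        # Mirrors `aio -c custom.conf run`: an explicit path takes priority.
        return explicit
    for path in search_paths:
        if os.path.isfile(path):
            return path
    return None
```

Calling `find_config("custom.conf")` returns the explicit path unchanged, while `find_config()` returns the first candidate that exists on disk.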

The aio run command

On startup, aio run sets up the following:

  • Configuration - system-wide configuration

  • Modules - known modules

  • Schedulers - functions called at set times

  • Servers - listening on TCP/UDP or other types of socket

  • Signals - functions called in response to events

Configuration

Configuration is in ini syntax

The system configuration is importable from aio.app

from aio.app import config

Modules

You can list any modules that should be imported at runtime in the configuration

[aio]
modules = aio.app
        aio.signals

The system modules can be accessed from aio.app

from aio.app import modules
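To show how a multi-line modules value in ini syntax can be read, here is a small sketch using the standard library's configparser. It is not how aio.app itself loads modules: `load_modules` is a hypothetical helper, and the standard library modules `json` and `collections` stand in for aio packages so the example runs without aio installed.

```python
import configparser
import importlib

# Stand-in configuration: stdlib module names replace aio.app / aio.signals.
CONFIG = """
[aio]
modules = json
        collections
"""


def load_modules(config_string):
    """Import every module named in the [aio] modules option."""
    parser = configparser.ConfigParser()
    parser.read_string(config_string)
    # Indented continuation lines are joined with newlines, so a plain
    # split() yields one module name per entry.
    names = parser.get("aio", "modules", fallback="").split()
    return tuple(importlib.import_module(name) for name in names)
```

`load_modules(CONFIG)` returns a tuple of imported module objects, in the order they appear in the configuration.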

Schedulers

Any sections in the configuration that start with schedule: will create a scheduler.

Specify the frequency and the function to call. The function should be a co-routine.

[schedule:example]
every = 2
func = my.scheduler.example_scheduler

The scheduler function takes one argument: the name of the scheduler

import asyncio

@asyncio.coroutine
def example_scheduler(name):
    yield from asyncio.sleep(2)
    # do something
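The examples in this document use `@asyncio.coroutine` and `yield from`, which were removed from the standard library in Python 3.11. The sketch below shows the same idea with `async def`. It is a toy driver, not aio.app's scheduler: `run_schedule` and `max_calls` are hypothetical, and calling the function immediately on the first iteration is an assumption.

```python
import asyncio


async def run_schedule(name, every, func, max_calls):
    """Call func(name) max_calls times, sleeping `every` seconds between calls."""
    for call in range(max_calls):
        if call:
            # Wait only between calls, so the first call happens at once.
            await asyncio.sleep(every)
        await func(name)


hits = []


async def example_scheduler(name):
    # Record the call instead of doing real work.
    hits.append(name)


# A short interval keeps the demonstration fast.
asyncio.run(run_schedule("example", 0.01, example_scheduler, max_calls=3))
print(hits)  # ['example', 'example', 'example']
```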

Servers

Any sections in the configuration that start with server: will create a server

The server requires either a factory or a protocol to start

Protocol configuration example:

[server:example]
protocol = my.example.ServerProtocol
address = 127.0.0.1
port = 8888

Protocol example code:

class ServerProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # do stuff
        self.transport.close()

If you need further control over how the protocol is created and attached, you can specify a factory function

Factory configuration example:

[server:example]
factory = my.example.server_factory
address = 127.0.0.1
port = 8080

Factory code example:

@asyncio.coroutine
def server_factory(name, protocol, address, port):
    loop = asyncio.get_event_loop()
    return (
        yield from loop.create_server(
           ServerProtocol, address, port))
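For readers on current Python versions, the protocol and factory above can be approximated with `async def` and the standard library alone. This is a sketch under assumptions, not aio.app's behaviour: the protocol echoes data back so there is something to observe, `demo` is a hypothetical helper, and port 0 is used so the operating system picks a free port.

```python
import asyncio


class ServerProtocol(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Echo the data back (an assumption for this demo), then close.
        self.transport.write(data)
        self.transport.close()


async def server_factory(name, protocol, address, port):
    # Same signature as the factory above; `name` and `protocol` are unused here.
    loop = asyncio.get_running_loop()
    return await loop.create_server(ServerProtocol, address, port)


async def demo():
    # Port 0 asks the OS for any free port.
    server = await server_factory("example", None, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"hello")
    await writer.drain()
    reply = await reader.read()  # reads until the server closes the connection
    writer.close()
    await writer.wait_closed()
    server.close()
    return reply
```

Running `asyncio.run(demo())` starts the server, sends one message and returns the echoed bytes.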

Signals

Any section in the configuration that starts with listen: subscribes the listed functions to the given signals

An example listen configuration section

[listen:example]
example-signal = my.example.listener

And an example listener function

@asyncio.coroutine
def listener(signal, message):
    print(message)

The listener is called when the signal is emitted from within another coroutine

yield from app.signals.emit(
    'example-signal', "BOOM!")
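The listen and emit pattern can be illustrated with a toy registry. The `Signals` class below is hypothetical and is not the aio.signals implementation; it only assumes, as the example above suggests, that listeners are coroutines receiving the signal name and the message.

```python
import asyncio


class Signals:
    """Toy signal registry for illustration; not aio.signals."""

    def __init__(self):
        self._listeners = {}

    def listen(self, signal, func):
        # Several functions may subscribe to the same signal.
        self._listeners.setdefault(signal, []).append(func)

    async def emit(self, signal, message):
        # Await each subscribed coroutine in subscription order.
        for func in self._listeners.get(signal, []):
            await func(signal, message)


received = []


async def listener(signal, message):
    received.append((signal, message))


async def main():
    signals = Signals()
    signals.listen("example-signal", listener)
    await signals.emit("example-signal", "BOOM!")


asyncio.run(main())
print(received)  # [('example-signal', 'BOOM!')]
```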

You can add multiple subscriptions within the section

[listen:example]
example-signal = my.example.listener
example-signal-2 = my.example.listener2

You can also subscribe multiple functions to a signal

[listen:example]
example-signal = my.example.listener
               my.example.listener2

And you can have multiple listen: sections

[listen:example]
example-signal = my.example.listener
               my.example.listener2

[listen:example2]
example-signal2 = my.example.listener2
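How such sections map signals to listener names can be sketched with configparser. This is an assumption about the general approach rather than aio.app's code; `collect_subscriptions` is a hypothetical helper that only gathers dotted names and does not import them.

```python
import configparser

CONFIG = """
[listen:example]
example-signal = my.example.listener
               my.example.listener2

[listen:example2]
example-signal2 = my.example.listener2
"""


def collect_subscriptions(config_string):
    """Map each signal name to the list of listener paths subscribed to it."""
    parser = configparser.ConfigParser()
    parser.read_string(config_string)
    subscriptions = {}
    for section in parser.sections():
        if not section.startswith("listen:"):
            continue
        for signal, value in parser.items(section):
            # A multi-line value lists one listener per line.
            subscriptions.setdefault(signal, []).extend(value.split())
    return subscriptions
```

With the configuration above, `example-signal` maps to both listeners and `example-signal2` maps to `my.example.listener2` only.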

aio test

The aio test runner tests all modules listed in the aio config section

[aio]
modules = aio.app
         aio.signals

Run the tests with:

aio test

You can also specify a module

aio test aio.app

Dependencies

aio.app depends on the following packages

The aio command runner

The aio command can be run with any commands listed in the [aio:commands] section of its configuration

Initially aio.app does not have any config, signals, modules or servers

>>> import aio.app
>>> print(aio.app.signals, aio.app.config, aio.app.modules, aio.app.servers)
None None () {}

Let's start the app runner in a test loop with the default configuration and print out the signals, config, modules and servers objects

>>> from aio.app.runner import runner
>>> def run_app():
...     yield from runner(['run'])
...
...     print(aio.app.signals)
...     print(aio.app.config)
...     print(aio.app.modules)
...     print(aio.app.servers)
>>> from aio.testing import aiotest
>>> aiotest(run_app)()
<aio.signals.Signals object ...>
<configparser.ConfigParser ...>
(<module 'aio.app' from ...>,)
{}

Clear the app

We can clear the app vars

>>> aio.app.clear()
>>> print(aio.app.signals, aio.app.config, aio.app.modules, aio.app.servers)
None None () {}

Adding a signal listener

Let's create a test listener and make it importable

>>> def listener(signal, message):
...     print("Listener received: %s" % message)

The listener needs to be a coroutine

>>> import asyncio
>>> aio.app.tests._example_listener = asyncio.coroutine(listener)
>>> config = """
... [listen:testlistener]
... test-signal: aio.app.tests._example_listener
... """
>>> def run_app_emit(message):
...     yield from runner(['run'], config_string=config)
...     yield from aio.app.signals.emit('test-signal', message)
>>> aiotest(run_app_emit)('BOOM!')
Listener received: BOOM!
>>> aio.app.clear()

Adding app modules

We can make the app runner aware of any modules that we want to include

>>> config = """
... [aio]
... modules = aio.app
...          aio.core
... """
>>> def run_app_print_modules():
...     yield from runner(['run'], config_string=config)
...     print(aio.app.modules)
>>> aiotest(run_app_print_modules)()
(<module 'aio.app' from ...>, <module 'aio.core' from ...>)
>>> aio.app.clear()

Running a scheduler

Let's create a scheduler function. It needs to be a coroutine

>>> def scheduler(name):
...      print('HIT: %s' % name)
>>> aio.app.tests._example_scheduler = asyncio.coroutine(scheduler)

We need to use an aiofuturetest to wait for the scheduled events to occur

>>> from aio.testing import aiofuturetest
>>> config = """
... [schedule:test-scheduler]
... every: 2
... func: aio.app.tests._example_scheduler
... """
>>> def run_app_scheduler():
...     yield from runner(['run'], config_string=config)

Running the test for 5 seconds with a 2 second interval, we get 3 hits

>>> aiofuturetest(run_app_scheduler, timeout=5)()
HIT: test-scheduler
HIT: test-scheduler
HIT: test-scheduler
>>> aio.app.clear()

Running a server

Let's set up and run an addition server

>>> class AdditionServerProtocol(asyncio.Protocol):
...
...     def connection_made(self, transport):
...         self.transport = transport
...
...     def data_received(self, data):
...         nums = [
...            int(x.strip())
...            for x in
...            data.decode("utf-8").split("+")]
...         self.transport.write(str(sum(nums)).encode())
...         self.transport.close()
>>> aio.app.tests._example_AdditionServerProtocol = AdditionServerProtocol
>>> config = """
... [server:additiontest]
... protocol: aio.app.tests._example_AdditionServerProtocol
... address: 127.0.0.1
... port: 8888
... """
>>> def run_app_addition(addition):
...     yield from runner(['run'], config_string=config)
...
...     @asyncio.coroutine
...     def call_addition_server():
...          reader, writer = yield from asyncio.open_connection(
...              '127.0.0.1', 8888)
...          writer.write(addition.encode())
...          yield from writer.drain()
...          result = yield from reader.read()
...
...          print(int(result))
...
...     return call_addition_server
>>> addition = '2 + 2 + 3'
>>> aiofuturetest(run_app_addition, timeout=5)(addition)
7
>>> aio.app.clear()

If you need more control over how the server protocol is created, you can specify a factory instead

The factory method must be a coroutine

>>> def addition_server_factory(name, protocol, address, port):
...     loop = asyncio.get_event_loop()
...     return (
...         yield from loop.create_server(
...            AdditionServerProtocol,
...            address, port))
>>> aio.app.tests._example_addition_server_factory = asyncio.coroutine(addition_server_factory)
>>> config = """
... [server:additiontest]
... factory = aio.app.tests._example_addition_server_factory
... address: 127.0.0.1
... port: 8888
... """
>>> def run_app_addition(addition):
...     yield from runner(['run'], config_string=config)
...
...     @asyncio.coroutine
...     def call_addition_server():
...          reader, writer = yield from asyncio.open_connection(
...              '127.0.0.1', 8888)
...          writer.write(addition.encode())
...          yield from writer.drain()
...          result = yield from reader.read()
...
...          print(int(result))
...
...     return call_addition_server
>>> addition = '17 + 5 + 1'
>>> aiofuturetest(run_app_addition, timeout=5)(addition)
23
