HTTP server for the aio asyncio framework
Project description
aio.http
HTTP server for the aio asyncio framework
Build status
Installation
Install with:
pip install aio.http
Configuration
Example configuration for a hello world server
[aio:commands]
run: aio.app.cmd.cmd_run
[server:test]
factory: aio.http.server
protocol: my.example.protocol_factory
port: 8080
And the corresponding protocol_factory
import asyncio
import aiohttp
@asyncio.coroutine
def protocol_factory(name):
loop = asyncio.get_event_loop()
webapp = aiohttp.web.Application(loop=loop)
webapp['name'] = name
@asyncio.coroutine
def handle_hello_world(webapp):
return aiohttp.web.Response(body=b"Hello, world")
webapp.router.add_route("GET", "/", handle_hello_world)
return webapp.make_handler()
Running
Run with the aio command
aio run
aio.http usage
Configuration
Create a config defining a factory method and a root handler
>>> config = """ ... [aio:commands] ... run: aio.app.cmd.cmd_run ... ... [server:test] ... factory: aio.http.server ... port: 7070 ... """
Running an http server
By default the http server will respond with a 404 as there are no routes set up
>>> import asyncio >>> import aiohttp >>> from aio.app.runner import runner>>> def run_http_server(): ... yield from runner(['run'], config_string=config) ... ... @asyncio.coroutine ... def _test_http_server(): ... result = yield from ( ... yield from aiohttp.request( ... "GET", "http://localhost:7070")).read() ... print(result) ... ... return _test_http_server>>> from aio.testing import aiofuturetest >>> aiofuturetest(run_http_server, timeout=1, sleep=1)() b'404: Not Found'
The server object is accessible from the aio.app.servers[{name}] var
>>> import aio.app >>> aio.app.servers['test'] <Server sockets=[<socket.socket...laddr=('0.0.0.0', 7070)...]>
Lets clear the app
>>> aio.app.clear()
Running the server with a custom protocol
If you specify a protocol in the server: config, the http server will use that function as a protocol factory.
The function should be a coroutine and is called with the name of the server
>>> def http_protocol_factory(name): ... loop = asyncio.get_event_loop() ... webapp = aiohttp.web.Application(loop=loop) ... webapp['name'] = name ... ... def handle_hello_world(webapp): ... return aiohttp.web.Response(body=b"Hello, world") ... ... webapp.router.add_route("GET", "/", asyncio.coroutine(handle_hello_world)) ... return webapp.make_handler()>>> aio.http.tests._test_http_protocol = asyncio.coroutine(http_protocol_factory)>>> config_with_protocol = """ ... [aio:commands] ... run: aio.app.cmd.cmd_run ... ... [server:test] ... factory: aio.http.server ... protocol: aio.http.tests._test_http_protocol ... port: 7070 ... """>>> def run_http_server(): ... yield from runner(['run'], config_string=config_with_protocol) ... ... @asyncio.coroutine ... def _test_http_server(): ... result = yield from ( ... yield from aiohttp.request( ... "GET", "http://localhost:7070")).read() ... ... print(result) ... ... return _test_http_server>>> aiofuturetest(run_http_server, timeout=1, sleep=1)() b'Hello, world'>>> del aio.http.tests._test_http_protocol >>> aio.app.clear()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
File details
Details for the file aio.http-0.0.2.tar.gz
.
File metadata
- Download URL: aio.http-0.0.2.tar.gz
- Upload date:
- Size: 4.4 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 6d547b050977b75804c7fb4dc645df6dd7d24fc977833a34fb089007ca75b436 |
|
MD5 | b89abacd480ca4413a1d7c2cc8d91b46 |
|
BLAKE2b-256 | 4e5f32fda29e6758f7217c6f259bcddebe3b8421eea15b77b27c67ad7be8774a |