Consumer/producer like library built over amqp (aioamqp)
Project description
- info:
Consumer/producer like library built over amqp (aioamqp)
Installation
pip install aioamqp_consumer
Usage
import asyncio
from functools import partial
from aioamqp_consumer import Consumer, Producer
async def task(payload, options, sleep=0, *, loop):
await asyncio.sleep(sleep, loop=loop)
print(payload)
async def main(*, loop):
amqp_url = 'amqp://guest:guest@127.0.0.1:5672//'
amqp_queue = 'your-queue-here'
queue_kwargs = {
'durable': True,
}
amqp_kwargs = {} # https://aioamqp.readthedocs.io/en/latest/api.html#aioamqp.connect
async with Producer(amqp_url, amqp_kwargs=amqp_kwargs, loop=loop) as producer:
for _ in range(5):
await producer.publish(
b'hello',
amqp_queue,
queue_kwargs=queue_kwargs,
)
consumer = Consumer(
amqp_url,
partial(task, loop=loop, sleep=1),
amqp_queue,
queue_kwargs=queue_kwargs,
amqp_kwargs=amqp_kwargs,
loop=loop,
)
await consumer.scale(20) # scale up to 20 background coroutines
await consumer.scale(5) # downscale to 5 background coroutines
await consumer.join() # wait for rabbitmq queue is empty and all local messages are processed
consumer.close()
await consumer.wait_closed()
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop=loop))
loop.close()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
aioamqp_consumer-0.1.0.tar.gz
(8.2 kB
view hashes)