Pure python AMQP asynchronous client library
Project description
aiormq is a pure python AMQP client library.
Status
Development - BETA
Features
Connecting by URL
amqp example: amqp://user:password@server.host/vhost
secure amqp example: amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0
Buffered queue for received frames
Supports only the PLAIN auth mechanism
Publisher confirms support
Transactions support
Channel based asynchronous locks
Tracking unroutable messages (use connection.channel(on_return_raises=False) to disable)
Full SSL/TLS support
Python type hints
Uses pamqp as an AMQP 0.9.1 frame encoder/decoder
Tutorial
Introduction
Simple consumer
import asyncio
import aiormq
async def on_message(message):
    """Print a delivered message, then simulate slow asynchronous work.

    The callback does not have to be a coroutine function; it is declared
    ``async`` here to show that awaiting inside a consumer callback works.

    :param message: the delivered message; only its ``body`` attribute is
        read here.
    """
    # Wrap the operand in a 1-tuple: if ``message`` is itself a tuple
    # (e.g. a named tuple), a bare ``% message`` would spread its fields
    # across the format string and raise TypeError.
    print(" [x] Received message %r" % (message,))
    print("Message body is: %r" % (message.body,))
    print("Before sleep!")
    await asyncio.sleep(5)  # Represents async I/O operations
    print("After sleep!")
async def main():
    """Connect to the local broker and start consuming the 'hello' queue."""
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue. The name has to match the routing key used by the
    # "Simple publisher" example ('hello'), otherwise nothing is delivered.
    declare_ok = await channel.queue_declare('hello')

    # Register the callback; with no_ack=True the callback does not have
    # to acknowledge deliveries explicitly.
    await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )
event_loop = asyncio.get_event_loop()
# First let main() finish registering the consumer ...
event_loop.run_until_complete(main())
# ... then keep the loop running so the callback can be invoked.
event_loop.run_forever()
Simple publisher
import asyncio
import aiormq
async def main():
    """Send a single 'Hello World!' message with routing key 'hello'."""
    # Open the connection, then a channel on top of it
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()

    # Publish the payload
    payload = b'Hello World!'
    await chan.basic_publish(payload, routing_key='hello')

    print(" [x] Sent 'Hello World!'")
# Run main() to completion on the default event loop.
asyncio.get_event_loop().run_until_complete(main())
Work Queues
Create new task
import sys
import asyncio
import aiormq
async def main():
    """Publish one task message to 'task_queue' and close the connection.

    The message body is the command-line arguments joined by spaces, or
    ``b"Hello World!"`` when no arguments are given.
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # sys.argv holds str values; they must be encoded before they can be
    # joined with a bytes separator (bytes.join rejects str items).
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    # NOTE(review): in AMQP 0-9-1 delivery_mode=1 is non-persistent; if
    # tasks are meant to survive a broker restart, 2 may be intended -
    # confirm before changing.
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()
# Run main() to completion on the default event loop.
asyncio.get_event_loop().run_until_complete(main())
Simple worker
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
    """Print the delivered message followed by its body."""
    summary = " [x] Received message %r" % (message,)
    print(summary)
    body_line = " Message body is: %r" % (message.body,)
    print(body_line)
async def main():
    """Register ``on_message`` as a consumer of the durable 'task_queue'."""
    # Connect, then open a channel
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    # Declare the queue and remember the name the broker reports back
    queue_info = await chan.queue_declare('task_queue', durable=True)
    queue_name = queue_info.queue

    # Start listening on that queue
    await chan.basic_consume(queue_name, on_message, no_ack=True)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main())

# From here on the loop runs forever, waiting for data and invoking
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
event_loop.run_forever()
Publish Subscribe
Publisher
import sys
import asyncio
import aiormq
async def main():
    """Publish one message to the fanout exchange 'logs' and disconnect.

    The message body is the command-line arguments joined by spaces, or
    ``b"Hello World!"`` when no arguments are given.
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # sys.argv holds str values; they must be encoded before they can be
    # joined with a bytes separator (bytes.join rejects str items).
    body = b' '.join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()
# Run main() to completion on the default event loop.
asyncio.get_event_loop().run_until_complete(main())
Subscriber
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
    """Print the message body, then acknowledge the delivery."""
    print("[x] %r" % (message.body,))

    # Acknowledge on the channel the message arrived on
    acknowledge = message.channel.basic_ack
    tag = message.delivery.delivery_tag
    await acknowledge(tag)
async def main():
    """Bind an exclusive queue to the 'logs' fanout exchange and consume it."""
    # Connect, then open a channel
    conn = await aiormq.connect("amqp://guest:guest@localhost/")
    chan = await conn.channel()
    await chan.basic_qos(prefetch_count=1)

    await chan.exchange_declare(exchange='logs', exchange_type='fanout')

    # Declare an exclusive queue without passing a name
    queue_info = await chan.queue_declare(exclusive=True)
    queue_name = queue_info.queue

    # Bind that queue to the exchange
    await chan.queue_bind(queue_name, 'logs')

    # Start listening on the queue that was just declared
    await chan.basic_consume(queue_name, on_message)
event_loop = asyncio.get_event_loop()
# main() is only scheduled here; it starts once the loop is running.
event_loop.create_task(main())

# The loop then runs forever, waiting for data and invoking callbacks
# whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
event_loop.run_forever()
Routing
Direct consumer
import sys
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
    """Print the routing key and body, then acknowledge the delivery."""
    line = " [x] %r:%r" % (message.delivery.routing_key, message.body)
    print(line)

    # Acknowledge on the channel the message arrived on
    acknowledge = message.channel.basic_ack
    tag = message.delivery.delivery_tag
    await acknowledge(tag)
async def main():
    """Consume from the direct exchange 'logs' for the requested severities.

    Severities are taken from the command line; each one becomes a binding
    of a freshly declared queue to the 'logs' exchange. When no severity
    is given, a usage message is written to stderr and the process exits
    with status 1.
    """
    # Validate the command line before touching the network, so the
    # usage-error path does not exit with an open connection and channel.
    severities = sys.argv[1:]

    if not severities:
        sys.stderr.write(
            "Usage: %s [info] [warning] [error]\n" % sys.argv[0]
        )
        sys.exit(1)

    # Perform connection
    connection = aiormq.Connection("amqp://guest:guest@localhost/")
    await connection.connect()

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declare an exchange
    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    # Declaring a queue without passing a name
    declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

    # One binding per requested severity
    for severity in severities:
        await channel.queue_bind(
            declare_ok.queue, 'logs', routing_key=severity
        )

    # Start listening on the queue declared above
    await channel.basic_consume(declare_ok.queue, on_message)
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(main())

# From here on the loop runs forever, waiting for data and invoking
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
event_loop.run_forever()
Emitter
import sys
import asyncio
import aiormq
async def main():
    """Publish one message to the direct exchange 'logs' and disconnect.

    Command line: ``[severity] [message words...]``. The first argument is
    used as the routing key (default ``'info'``); the remaining arguments
    are joined by spaces to form the body (default ``b"Hello World!"``).
    """
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    # The severity is present as soon as there is one argument; requiring
    # more than two entries in sys.argv silently ignored a lone severity.
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'info'

    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()
# Run main() to completion on the default event loop.
asyncio.get_event_loop().run_until_complete(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aiormq-0.3.0.tar.gz
.
File metadata
- Download URL: aiormq-0.3.0.tar.gz
- Upload date:
- Size: 19.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.11.0 pkginfo/1.4.2 requests/2.18.4 setuptools/33.1.1 requests-toolbelt/0.8.0 tqdm/4.20.0 CPython/3.6.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7c375aa79f6780a91a4a577e88b477a5dff5e324e63c10522ee0679725dc1a61 |
|
MD5 | 697b96e52f3fde4035c7c6d8c272c6bc |
|
BLAKE2b-256 | 2f3271296ea5949ef001f118f35d67dfc92c6e29523f8896bfe49e9bf36c226c |
File details
Details for the file aiormq-0.3.0-py3-none-any.whl
.
File metadata
- Download URL: aiormq-0.3.0-py3-none-any.whl
- Upload date:
- Size: 28.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/1.11.0 pkginfo/1.4.2 requests/2.18.4 setuptools/33.1.1 requests-toolbelt/0.8.0 tqdm/4.20.0 CPython/3.6.5
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 5863f49ca1f06b03f54705cba1205c896456e014f68530cef77c5ba4da7744ca |
|
MD5 | 36e76f36621f0f86bcb0876e21aec767 |
|
BLAKE2b-256 | 2e69c7d88dfd85a72192e28aa378c8365b43c551483c1599aa379345eadd1e46 |