Pure python AMQP asynchronous client library
Project description
aiormq is a pure python AMQP client library.
Status
Production/Stable
Features
Connecting by URL
amqp example: amqp://user:password@server.host/vhost
secure amqp example: amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0
Buffered queue for received frames
Only PLAIN auth mechanism support
Publisher confirms support
Transactions support
Channel based asynchronous locks
Tracking unroutable messages (Use connection.channel(on_return_raises=False) for disabling)
- Full SSL/TLS support with url query parameters:
cafile= - string contains path to ca certificate file
capath= - string contains path to ca certificates
cadata= - base64 encoded ca certificate data
keyfile= - string contains path to key file
certfile= - string contains path to certificate file
no_verify_ssl - boolean disables certificates validation
Python type hints
Uses pamqp as an AMQP 0.9.1 frame encoder/decoder
Tutorial
Introduction
Simple consumer
import asyncio
import aiormq
async def on_message(message):
"""
on_message doesn't necessarily have to be defined as async.
Here it is to show that it's possible.
"""
print(" [x] Received message %r" % message)
print("Message body is: %r" % message.body)
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
# Declaring queue
deaclare_ok = await channel.queue_declare('helo')
consume_ok = await channel.basic_consume(
deaclare_ok.queue, on_message, no_ack=True
)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()
Simple publisher
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost//")
# Creating a channel
channel = await connection.channel()
# Sending the message
await channel.basic_publish(b'Hello World!', routing_key='hello')
print(" [x] Sent 'Hello World!'")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Work Queues
Create new task
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
body = b' '.join(sys.argv[1:]) or b"Hello World!"
# Sending the message
await channel.basic_publish(
body,
routing_key='task_queue',
properties=aiormq.spec.Basic.Properties(
delivery_mode=1,
)
)
print(" [x] Sent %r" % body)
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Simple worker
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
print(" [x] Received message %r" % (message,))
print(" Message body is: %r" % (message.body,))
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
# Declaring queue
declare_ok = await channel.queue_declare('task_queue', durable=True)
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Publish Subscribe
Publisher
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare(
exchange='logs', exchange_type='fanout'
)
body = b' '.join(sys.argv[1:]) or b"Hello World!"
# Sending the message
await channel.basic_publish(
body, routing_key='info', exchange='logs'
)
print(" [x] Sent %r" % (body,))
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Subscriber
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
print("[x] %r" % (message.body,))
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
await channel.exchange_declare(
exchange='logs', exchange_type='fanout'
)
# Declaring queue
declare_ok = await channel.queue_declare(exclusive=True)
# Binding the queue to the exchange
await channel.queue_bind(declare_ok.queue, 'logs')
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()
Routing
Direct consumer
import sys
import asyncio
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = aiormq.Connection("amqp://guest:guest@localhost/")
await connection.connect()
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
severities = sys.argv[1:]
if not severities:
sys.stderr.write(
"Usage: %s [info] [warning] [error]\n" % sys.argv[0]
)
sys.exit(1)
# Declare an exchange
await channel.exchange_declare(
exchange='logs', exchange_type='direct'
)
# Declaring random queue
declare_ok = await channel.queue_declare(durable=True, auto_delete=True)
for severity in severities:
await channel.queue_bind(
declare_ok.queue, 'logs', routing_key=severity
)
# Start listening the random queue
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Emitter
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare(
exchange='logs', exchange_type='direct'
)
body = (
b' '.join(arg.encode() for arg in sys.argv[2:])
or
b"Hello World!"
)
# Sending the message
routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'
await channel.basic_publish(
body, exchange='logs', routing_key=routing_key,
properties=aiormq.spec.Basic.Properties(
delivery_mode=1
)
)
print(" [x] Sent %r" % body)
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Topics
Publisher
import sys
import asyncio
import aiormq
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
await channel.exchange_declare('topic_logs', exchange_type='topic')
routing_key = (
sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
)
body = (
b' '.join(arg.encode() for arg in sys.argv[2:])
or
b"Hello World!"
)
# Sending the message
await channel.basic_publish(
body, exchange='topic_logs', routing_key=routing_key,
properties=aiormq.spec.Basic.Properties(
delivery_mode=1
)
)
print(" [x] Sent %r" % (body,))
await connection.close()
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Consumer
import asyncio
import sys
import aiormq
import aiormq.types
async def on_message(message: aiormq.types.DeliveredMessage):
print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
await message.channel.basic_ack(
message.delivery.delivery_tag
)
async def main():
# Perform connection
connection = await aiormq.connect(
"amqp://guest:guest@localhost/", loop=loop
)
# Creating a channel
channel = await connection.channel()
await channel.basic_qos(prefetch_count=1)
# Declare an exchange
await channel.exchange_declare('topic_logs', exchange_type='topic')
# Declaring queue
declare_ok = await channel.queue_declare('task_queue', durable=True)
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write(
"Usage: %s [binding_key]...\n" % sys.argv[0]
)
sys.exit(1)
for binding_key in binding_keys:
await channel.queue_bind(
declare_ok.queue, 'topic_logs', routing_key=binding_key
)
# Start listening the queue with name 'task_queue'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main())
# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()
Remote procedure call (RPC)
RPC server
import asyncio
import aiormq
import aiormq.types
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
async def on_message(message:aiormq.types.DeliveredMessage):
n = int(message.body.decode())
print(" [.] fib(%d)" % n)
response = str(fib(n)).encode()
await message.channel.basic_publish(
response, routing_key=message.reply_to,
properties=aiormq.spec.Basic.Properties(
correlation_id=message.correlation_id
),
)
await message.channel.basic_ack(message.delivery.delivery_tag)
print('Request complete')
async def main():
# Perform connection
connection = await aiormq.connect("amqp://guest:guest@localhost/")
# Creating a channel
channel = await connection.channel()
# Declaring queue
declare_ok = await channel.queue_declare('rpc_queue')
# Start listening the queue with name 'hello'
await channel.basic_consume(declare_ok.queue, on_message)
loop = asyncio.get_event_loop()
loop.create_task(main(loop))
# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()
RPC client
import asyncio
import uuid
import aiormq
import aiormq.types
class FibonacciRpcClient:
def __init__(self):
self.connection = None # type: aiormq.Connection
self.channel = None # type: aiormq.Channel
self.callback_queue = ''
self.futures = {}
self.loop = loop
async def connect(self):
self.connection = await aiormq.connect("amqp://guest:guest@localhost/")
self.channel = await self.connection.channel()
declare_ok = await self.channel.queue_declare(
exclusive=True, auto_delete=True
)
await self.channel.basic_consume(declare_ok.queue, self.on_response)
self.callback_queue = declare_ok.queue
return self
async def on_response(self, message: aiormq.types.DeliveredMessage):
future = self.futures.pop(message.correlation_id)
future.set_result(message.body)
async def call(self, n):
correlation_id = str(uuid.uuid4())
future = loop.create_future()
self.futures[correlation_id] = future
await self.channel.basic_publish(
str(n).encode(), routing_key='rpc_queue',
properties=aiormq.spec.Basic.Properties(
content_type='text/plain',
correlation_id=correlation_id,
reply_to=self.callback_queue,
)
)
return int(await future)
async def main():
fibonacci_rpc = await FibonacciRpcClient().connect()
print(" [x] Requesting fib(30)")
response = await fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file aiormq-3.3.1.tar.gz
.
File metadata
- Download URL: aiormq-3.3.1.tar.gz
- Upload date:
- Size: 21.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.45.0 CPython/3.8.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 8218dd9f7198d6e7935855468326bbacf0089f926c70baa8dd92944cb2496573 |
|
MD5 | eb7e00c16d5b6ca69554b7c9887b18ba |
|
BLAKE2b-256 | 2af50c695f35fe50f52577e0e3aa3e100b488badbd720d96c268e546f72014d4 |
File details
Details for the file aiormq-3.3.1-py3-none-any.whl
.
File metadata
- Download URL: aiormq-3.3.1-py3-none-any.whl
- Upload date:
- Size: 28.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.23.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.45.0 CPython/3.8.3
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | e584dac13a242589aaf42470fd3006cb0dc5aed6506cbd20357c7ec8bbe4a89e |
|
MD5 | 29947b1907a9279f1ea67c74286d5212 |
|
BLAKE2b-256 | 0bc4dc5b9d50c15af2ee187974a5a0c3f20c06cce6559eea4c065d372e846b6a |