Skip to main content

Pure python AMQP asynchronous client library

Project description

Coveralls Status Build status Latest Version https://img.shields.io/pypi/wheel/aiormq.svg https://img.shields.io/pypi/pyversions/aiormq.svg https://img.shields.io/pypi/l/aiormq.svg

aiormq is a pure python AMQP client library.

Status

  • 3.x.x branch - Production/Stable

  • 4.x.x branch - Unstable (Experimental)

Features

  • Connecting by URL

  • amqp example: amqp://user:password@server.host/vhost

  • secure amqp example: amqps://user:password@server.host/vhost?cafile=ca.pem&keyfile=key.pem&certfile=cert.pem&no_verify_ssl=0

  • Buffered queue for received frames

  • Only PLAIN auth mechanism support

  • Publisher confirms support

  • Transactions support

  • Channel based asynchronous locks

  • Tracking unroutable messages (Use connection.channel(on_return_raises=False) for disabling)

  • Full SSL/TLS support, using your choice of:
    • amqps:// url query parameters:
      • cafile= - string contains path to ca certificate file

      • capath= - string contains path to ca certificates

      • cadata= - base64 encoded ca certificate data

      • keyfile= - string contains path to key file

      • certfile= - string contains path to certificate file

      • no_verify_ssl - boolean disables certificates validation

    • context= SSLContext keyword argument to connect().

  • Python type hints

  • Uses pamqp as an AMQP 0.9.1 frame encoder/decoder

Tutorial

Introduction

Simple consumer

import asyncio
import aiormq

async def on_message(message):
    """
    on_message doesn't necessarily have to be defined as async.
    Here it is to show that it's possible.
    """
    print(" [x] Received message %r" % message)
    print("Message body is: %r" % message.body)
    print("Before sleep!")
    await asyncio.sleep(5)   # Represents async I/O operations
    print("After sleep!")


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('helo')
    consume_ok = await channel.basic_consume(
        declare_ok.queue, on_message, no_ack=True
    )


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.run_forever()

Simple publisher

import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Creating a channel
    channel = await connection.channel()

    # Sending the message
    await channel.basic_publish(b'Hello World!', routing_key='hello')
    print(" [x] Sent 'Hello World!'")


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Work Queues

Create new task

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    body = b' '.join(sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body,
        routing_key='task_queue',
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1,
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Simple worker

import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] Received message %r" % (message,))
    print("     Message body is: %r" % (message.body,))


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")


    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message, no_ack=True)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data and runs
# callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Publish Subscribe

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    body = b' '.join(sys.argv[1:]) or b"Hello World!"

    # Sending the message
    await channel.basic_publish(
        body, routing_key='info', exchange='logs'
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Subscriber

import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print("[x] %r" % (message.body,))

    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    await channel.exchange_declare(
        exchange='logs', exchange_type='fanout'
    )

    # Declaring queue
    declare_ok = await channel.queue_declare(exclusive=True)

    # Binding the queue to the exchange
    await channel.queue_bind(declare_ok.queue, 'logs')

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(' [*] Waiting for logs. To exit press CTRL+C')
loop.run_forever()

Routing

Direct consumer

import sys
import asyncio
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = aiormq.Connection("amqp://guest:guest@localhost/")
    await connection.connect()

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    severities = sys.argv[1:]

    if not severities:
        sys.stderr.write(
            "Usage: %s [info] [warning] [error]\n" % sys.argv[0]
        )
        sys.exit(1)

    # Declare an exchange
    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    # Declaring random queue
    declare_ok = await channel.queue_declare(durable=True, auto_delete=True)

    for severity in severities:
        await channel.queue_bind(
            declare_ok.queue, 'logs', routing_key=severity
        )

    # Start listening the random queue
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Emitter

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare(
        exchange='logs', exchange_type='direct'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'info'

    await channel.basic_publish(
        body, exchange='logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % body)

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Topics

Publisher

import sys
import asyncio
import aiormq


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    await channel.exchange_declare('topic_logs', exchange_type='topic')

    routing_key = (
        sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'
    )

    body = (
        b' '.join(arg.encode() for arg in sys.argv[2:])
        or
        b"Hello World!"
    )

    # Sending the message
    await channel.basic_publish(
        body, exchange='topic_logs', routing_key=routing_key,
        properties=aiormq.spec.Basic.Properties(
            delivery_mode=1
        )
    )

    print(" [x] Sent %r" % (body,))

    await connection.close()


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Consumer

import asyncio
import sys
import aiormq
import aiormq.types


async def on_message(message: aiormq.types.DeliveredMessage):
    print(" [x] %r:%r" % (message.delivery.routing_key, message.body))
    await message.channel.basic_ack(
        message.delivery.delivery_tag
    )


async def main():
    # Perform connection
    connection = await aiormq.connect(
        "amqp://guest:guest@localhost/", loop=loop
    )

    # Creating a channel
    channel = await connection.channel()
    await channel.basic_qos(prefetch_count=1)

    # Declare an exchange
    await channel.exchange_declare('topic_logs', exchange_type='topic')

    # Declaring queue
    declare_ok = await channel.queue_declare('task_queue', durable=True)

    binding_keys = sys.argv[1:]

    if not binding_keys:
        sys.stderr.write(
            "Usage: %s [binding_key]...\n" % sys.argv[0]
        )
        sys.exit(1)

    for binding_key in binding_keys:
        await channel.queue_bind(
            declare_ok.queue, 'topic_logs', routing_key=binding_key
        )

    # Start listening the queue with name 'task_queue'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for
# data and runs callbacks whenever necessary.
print(" [*] Waiting for messages. To exit press CTRL+C")
loop.run_forever()

Remote procedure call (RPC)

RPC server

import asyncio
import aiormq
import aiormq.types


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n-1) + fib(n-2)


async def on_message(message:aiormq.types.DeliveredMessage):
    n = int(message.body.decode())

    print(" [.] fib(%d)" % n)
    response = str(fib(n)).encode()

    await message.channel.basic_publish(
        response, routing_key=message.header.properties.reply_to,
        properties=aiormq.spec.Basic.Properties(
            correlation_id=message.header.properties.correlation_id
        ),

    )

    await message.channel.basic_ack(message.delivery.delivery_tag)
    print('Request complete')


async def main():
    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()

    # Declaring queue
    declare_ok = await channel.queue_declare('rpc_queue')

    # Start listening the queue with name 'hello'
    await channel.basic_consume(declare_ok.queue, on_message)


loop = asyncio.get_event_loop()
loop.create_task(main())

# we enter a never-ending loop that waits for data
# and runs callbacks whenever necessary.
print(" [x] Awaiting RPC requests")
loop.run_forever()

RPC client

import asyncio
import uuid
import aiormq
import aiormq.types


class FibonacciRpcClient:
    def __init__(self):
        self.connection = None      # type: aiormq.Connection
        self.channel = None         # type: aiormq.Channel
        self.callback_queue = ''
        self.futures = {}
        self.loop = loop

    async def connect(self):
        self.connection = await aiormq.connect("amqp://guest:guest@localhost/")

        self.channel = await self.connection.channel()
        declare_ok = await self.channel.queue_declare(
            exclusive=True, auto_delete=True
        )

        await self.channel.basic_consume(declare_ok.queue, self.on_response)

        self.callback_queue = declare_ok.queue

        return self

    async def on_response(self, message: aiormq.types.DeliveredMessage):
        future = self.futures.pop(message.header.properties.correlation_id)
        future.set_result(message.body)

    async def call(self, n):
        correlation_id = str(uuid.uuid4())
        future = loop.create_future()

        self.futures[correlation_id] = future

        await self.channel.basic_publish(
            str(n).encode(), routing_key='rpc_queue',
            properties=aiormq.spec.Basic.Properties(
                content_type='text/plain',
                correlation_id=correlation_id,
                reply_to=self.callback_queue,
            )
        )

        return int(await future)


async def main():
    fibonacci_rpc = await FibonacciRpcClient().connect()
    print(" [x] Requesting fib(30)")
    response = await fibonacci_rpc.call(30)
    print(" [.] Got %r" % response)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

Project details


Release history Release notifications | RSS feed

Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiormq-6.5.0.tar.gz (30.6 kB view details)

Uploaded Source

Built Distribution

aiormq-6.5.0-py3-none-any.whl (34.4 kB view details)

Uploaded Python 3

File details

Details for the file aiormq-6.5.0.tar.gz.

File metadata

  • Download URL: aiormq-6.5.0.tar.gz
  • Upload date:
  • Size: 30.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.3 readme-renderer/32.0 requests/2.28.1 requests-toolbelt/0.9.1 urllib3/1.26.12 tqdm/4.62.3 importlib-metadata/4.11.0 keyring/21.8.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.10.2

File hashes

Hashes for aiormq-6.5.0.tar.gz
Algorithm Hash digest
SHA256 6a8f513eb18eac86a0ac7c6758204dbea7a4a0a31567a97d11d1f085052549b5
MD5 9e65c38411b9b4705e84a0a49af889c7
BLAKE2b-256 67a4fdc2c09a986ffcd6a67faae622552b81fb7a0e606a290797c38f6dd354be

See more details on using hashes here.

File details

Details for the file aiormq-6.5.0-py3-none-any.whl.

File metadata

  • Download URL: aiormq-6.5.0-py3-none-any.whl
  • Upload date:
  • Size: 34.4 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.8.0 pkginfo/1.8.3 readme-renderer/32.0 requests/2.28.1 requests-toolbelt/0.9.1 urllib3/1.26.12 tqdm/4.62.3 importlib-metadata/4.11.0 keyring/21.8.0 rfc3986/2.0.0 colorama/0.4.4 CPython/3.10.2

File hashes

Hashes for aiormq-6.5.0-py3-none-any.whl
Algorithm Hash digest
SHA256 e74fde1ff60821371c2889c038980128821379e8a6a3f9ec96f61a2d49322b42
MD5 3804b43d6cf261ce89dc098495d4169a
BLAKE2b-256 5db0bfb89037a0d81ae03cad2cb8932b00343eecceff713d2f31b57b1274ba85

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page