
Microsoft Azure Service Bus Client Library for Python


Azure Service Bus client library for Python

Azure Service Bus is a high-performance, cloud-managed messaging service that provides real-time, fault-tolerant communication between distributed senders and receivers.

Service Bus provides multiple mechanisms for asynchronous, highly reliable communication, such as structured first-in-first-out messaging, publish/subscribe capabilities, and the ability to scale easily as your needs grow.

Use the Service Bus client library for Python to communicate between applications and services and implement asynchronous messaging patterns.

  • Create Service Bus namespaces, queues, topics, and subscriptions, and modify their settings.
  • Send and receive messages within your Service Bus channels.
  • Utilize message locks, sessions, and dead letter functionality to implement complex messaging patterns.

Source code | Package (PyPI) | API reference documentation | Product documentation | Samples | Changelog

NOTE: This document has instructions, links and code snippets for the preview of the next version of the azure-servicebus package which has different APIs than the current version (0.50). Please view the resources below for references on the existing library.

V0.50 Source code | V0.50 Package (PyPI) | V0.50 API reference documentation | V0.50 Product documentation | V0.50 Samples | V0.50 Changelog

We also provide a migration guide for users familiar with the existing package that would like to try the preview: migration guide to move from Service Bus V0.50 to Service Bus V7 Preview

Getting started

Install the package

Install the Azure Service Bus client library for Python with pip:

pip install azure-servicebus --pre

Prerequisites:

To use this package, you must have:

If you need an Azure Service Bus namespace, you can create one via the Azure Portal. If you prefer not to use the graphical portal UI, you can create one with the Azure CLI, either via Cloud Shell or run locally, using this command:

az servicebus namespace create --resource-group <resource-group-name> --name <servicebus-namespace-name> --location <servicebus-namespace-location>

Authenticate the client

Interaction with Service Bus starts with an instance of the ServiceBusClient class. You either need a connection string with SAS key, or a namespace and one of its account keys to instantiate the client object.

Create client from connection string

  • Get credentials: Use the Azure CLI snippet below to populate an environment variable with the Service Bus connection string (you can also find these values in the Azure Portal by following the step-by-step guide to Get a Service Bus connection string). The snippet is formatted for the Bash shell.
RES_GROUP=<resource-group-name>
NAMESPACE_NAME=<servicebus-namespace-name>

export SERVICE_BUS_CONN_STR=$(az servicebus namespace authorization-rule keys list --resource-group $RES_GROUP --namespace-name $NAMESPACE_NAME --name RootManageSharedAccessKey --query primaryConnectionString --output tsv)

Once you've populated the SERVICE_BUS_CONN_STR environment variable, you can create the ServiceBusClient.

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']

with ServiceBusClient.from_connection_string(connstr) as client:
    ...

Create client using the azure-identity library:

import os
from azure.servicebus import ServiceBusClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()

FULLY_QUALIFIED_NAMESPACE = os.environ['SERVICE_BUS_FULLY_QUALIFIED_NAMESPACE']
with ServiceBusClient(FULLY_QUALIFIED_NAMESPACE, credential):
    ...
  • This constructor takes the fully qualified namespace of your Service Bus instance and a credential that implements the TokenCredential protocol. Implementations of the TokenCredential protocol are available in the azure-identity package. The fully qualified namespace has the format <yournamespace>.servicebus.windows.net.
  • When using Azure Active Directory, your principal must be assigned a role which allows access to Service Bus, such as the Azure Service Bus Data Owner role. For more information about using Azure Active Directory authorization with Service Bus, please refer to the associated documentation.

Note: The client can be initialized without a context manager, but it must then be closed manually via client.close() to avoid leaking resources.
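When a with block is not practical, a try/finally gives the same guarantee as the note above describes. The following is a minimal sketch that assumes only that the client exposes close(); run_with_client and FakeClient are hypothetical names, and FakeClient is a stand-in so the snippet runs without a namespace or credentials.

```python
class FakeClient:
    """Hypothetical stand-in for ServiceBusClient; only close() is modelled."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def run_with_client(client, work):
    """Run work(client) and always close the client afterwards."""
    try:
        return work(client)
    finally:
        # Runs on success and on error, so the connection is never leaked.
        client.close()


client = FakeClient()
result = run_with_client(client, lambda c: "done")
```

With the real library, the client passed in would be the object returned by ServiceBusClient.from_connection_string(connstr).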

Key concepts

Once you've initialized a ServiceBusClient, you can interact with the primary resource types within a Service Bus Namespace. Multiple instances of each type can exist within a namespace, and actual message transmission takes place on them; the namespace itself often serves as an application container:

  • Queue: Allows for sending and receiving of messages. Often used for point-to-point communication.

  • Topic: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. A topic can be sent to, but consuming from it requires a subscription; multiple subscriptions can consume from the same topic in parallel.

  • Subscription: The mechanism to consume from a Topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and Filters can be used to tailor which messages are received by a specific subscription.

For more information about these resources, see What is Azure Service Bus?.

To interact with these resources, one should be familiar with the following SDK concepts:

  • ServiceBusClient: This is the object a user should first initialize to connect to a Service Bus Namespace. To interact with a queue, topic, or subscription, one would spawn a sender or receiver off of this client.

  • Sender: To send messages to a Queue or Topic, one would use the corresponding get_queue_sender or get_topic_sender method off of a ServiceBusClient instance as seen here.

  • Receiver: To receive messages from a Queue or Subscription, one would use the corresponding get_queue_receiver or get_subscription_receiver method off of a ServiceBusClient instance as seen here.

  • Message: When sending, this is the type you will construct to contain your payload. When receiving, this is where you will access the payload and control how the message is "settled" (completed, dead-lettered, etc); these functions are only available on a received message.

Examples

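The following sections provide several code snippets covering some of the most common Service Bus tasks, including:

  • Send a message to a queue
  • Receive a message from a queue
  • Sending and receiving a message from a session enabled subscription
  • Defer a message on receipt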

To perform management tasks such as creating and deleting queues/topics/subscriptions, please utilize the azure-mgmt-servicebus library, available here.

Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.

Send a message to a queue

This example sends a message to a queue that is assumed to already exist, created via the Azure portal or az commands.

from azure.servicebus import ServiceBusClient, Message

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_sender(queue_name) as sender:

        message = Message("Single message")
        sender.send(message)

Receive a message from a queue

To receive from a queue, you can either perform a one-off receive via "receiver.receive()" or receive persistently as follows:

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            msg.complete()
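The one-off form mentioned above can be sketched as a helper that takes an already-opened receiver. This assumes receive() accepts max_batch_size and max_wait_time keyword arguments, as the Timeouts section and the release notes for this preview describe. receive_once, FakeMessage, and FakeReceiver are hypothetical names; the fakes are stand-ins so the snippet runs offline.

```python
def receive_once(receiver, max_batch_size=10, max_wait_time=5):
    """Fetch at most one batch, complete each message, and return the bodies."""
    bodies = []
    batch = receiver.receive(max_batch_size=max_batch_size, max_wait_time=max_wait_time)
    for msg in batch:
        bodies.append(str(msg))
        msg.complete()  # settle so the message is removed from the queue
    return bodies


class FakeMessage:
    """Hypothetical stand-in for a received message."""

    def __init__(self, body):
        self.body = body
        self.completed = False

    def __str__(self):
        return self.body

    def complete(self):
        self.completed = True


class FakeReceiver:
    """Hypothetical stand-in for a queue receiver holding pending messages."""

    def __init__(self, messages):
        self._pending = list(messages)

    def receive(self, max_batch_size=None, max_wait_time=None):
        count = len(self._pending) if max_batch_size is None else max_batch_size
        batch, self._pending = self._pending[:count], self._pending[count:]
        return batch


pending = [FakeMessage("a"), FakeMessage("b"), FakeMessage("c")]
bodies = receive_once(FakeReceiver(pending), max_batch_size=2)
```

With the real library, the receiver would come from client.get_queue_receiver(queue_name). Per the release notes for this preview, receive() raises ValueError when max_batch_size exceeds the prefetch setting.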

Sending and receiving a message from a session enabled subscription

Sessions provide first-in-first-out and single-receiver semantics on top of a queue or subscription. While the actual receive syntax is the same, initialization differs slightly.

from azure.servicebus import ServiceBusClient, Message

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
topic_name = os.environ['SERVICE_BUS_TOPIC_NAME']
subscription_name = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
session_id = os.environ.get('SERVICE_BUS_SESSION_ID')

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_topic_sender(topic_name) as sender:
        sender.send(Message("Session Enabled Message", session_id=session_id))

    # If session_id is None here, the receiver will receive from the first available session.
    with client.get_subscription_session_receiver(topic_name, subscription_name, session_id) as receiver:
        for msg in receiver:
            print(str(msg))
            msg.complete()

Defer a message on receipt

When receiving from a queue, you have multiple actions you can take on the messages you receive. Where the prior example completes a message, permanently removing it from the queue and marking as complete, this example demonstrates how to defer the message, sending it back to the queue such that it must now be received via sequence number:

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONN_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            msg.defer()
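Because a deferred message can only be retrieved by its sequence number, that number has to be recorded before deferring. The sketch below assumes the received message exposes a sequence_number property and that the receiver offers receive_deferred_messages(sequence_numbers); both names are assumptions about this preview's API and should be checked against the reference documentation. defer_then_fetch and the Fake classes are hypothetical and exist only so the snippet runs offline.

```python
def defer_then_fetch(receiver, incoming):
    """Defer every incoming message, then fetch them back by sequence number."""
    sequence_numbers = []
    for msg in incoming:
        # Record the sequence number first; it is the only handle afterwards.
        sequence_numbers.append(msg.sequence_number)
        msg.defer()
    # Deferred messages are skipped by normal receives; request them explicitly.
    return receiver.receive_deferred_messages(sequence_numbers)


class FakeMessage:
    """Hypothetical stand-in for a received message."""

    def __init__(self, sequence_number):
        self.sequence_number = sequence_number
        self.deferred = False

    def defer(self):
        self.deferred = True


class FakeReceiver:
    """Hypothetical stand-in that returns only messages that were deferred."""

    def __init__(self, messages):
        self._by_sequence = {m.sequence_number: m for m in messages}

    def receive_deferred_messages(self, sequence_numbers):
        return [
            self._by_sequence[n]
            for n in sequence_numbers
            if self._by_sequence[n].deferred
        ]


messages = [FakeMessage(7), FakeMessage(8)]
fetched = defer_then_fetch(FakeReceiver(messages), messages)
```

In practice the sequence numbers would usually be persisted somewhere durable, since the retrieval often happens in a later run or a different process.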

Other settlement methods (beyond complete and defer) include:

  • dead_letter, removing the message from the primary queue and sending it to a special "dead-letter sub-queue" where it can be accessed using the get_queue_deadletter_receiver function.
  • abandon, immediately returning the message back to the queue to be picked up by another (or the same) receiver.
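One way to choose between these settlement methods is to map the outcome of processing onto them. The sketch below is illustrative only: the policy (ValueError means the payload can never succeed, any other exception is treated as transient) is an assumption, not something the library prescribes, and settle and FakeMessage are hypothetical names. It calls complete(), abandon(), and dead_letter() without arguments.

```python
def settle(msg, handler):
    """Run handler(msg) and settle the message according to the outcome."""
    try:
        handler(msg)
    except ValueError:
        # Assumed policy: a malformed payload will never succeed, so park it.
        msg.dead_letter()
        return "dead_letter"
    except Exception:
        # Assumed policy: anything else is transient, so release for redelivery.
        msg.abandon()
        return "abandon"
    msg.complete()
    return "complete"


class FakeMessage:
    """Hypothetical stand-in that records which settlement was applied."""

    def __init__(self):
        self.settled = None

    def complete(self):
        self.settled = "complete"

    def abandon(self):
        self.settled = "abandon"

    def dead_letter(self):
        self.settled = "dead_letter"


def ok(msg):
    pass


def malformed(msg):
    raise ValueError("bad payload")


def flaky(msg):
    raise ConnectionError("downstream unavailable")


outcomes = [settle(FakeMessage(), h) for h in (ok, malformed, flaky)]
```

Inside a real receive loop, settle(msg, handler) would replace the bare msg.complete() call.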

Troubleshooting

Logging

  • Enable azure.servicebus logger to collect traces from the library.
  • Enable uamqp logger to collect traces from the underlying uAMQP library.
  • Enable AMQP frame level trace by setting logging_enable=True when creating the client.
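The first two items can be done with the standard logging module. A minimal sketch, using the logger names listed above; the DEBUG level, the stdout handler, and the format string are illustrative choices.

```python
import logging
import sys

# One shared handler so both libraries write to the same place.
handler = logging.StreamHandler(stream=sys.stdout)
handler.setFormatter(
    logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
)

for name in ("azure.servicebus", "uamqp"):
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(handler)
```

Frame-level AMQP traces additionally require logging_enable=True when creating the client, as noted in the last item.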

Timeouts

There are various timeouts a user should be aware of within the library.

  • 10 minute service side link closure: A link, once opened, will be closed after 10 minutes idle to protect the service against resource leakage. This should largely be transparent to a user, but if you notice a reconnect occurring after such a duration, this is why. Performing any operations, including management operations, on the link will extend this timeout.
  • idle_timeout: Provided on creation of a receiver, the time after which the underlying uAMQP link will be closed after no traffic. This primarily dictates how long a generator-style receive will run before exiting if there are no messages. Passing None (default) will wait forever, up until the 10 minute threshold if no other action is taken.
  • max_wait_time: Provided when calling receive() to fetch a batch of messages. Dictates how long receive() will wait for more messages before returning, similarly up to the aforementioned limits.

AutoLockRenew

If for any reason auto-renewal has been interrupted or failed, this can be observed via the auto_renew_error property on the object being renewed. It would also manifest when trying to take action (such as completing a message) on the specified object.
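A defensive check before settling can surface a failed renewal early. This sketch relies only on the auto_renew_error property described above; complete_if_renewed and FakeMessage are hypothetical names, and the fake is a stand-in so the snippet runs offline.

```python
def complete_if_renewed(msg):
    """Complete msg unless lock renewal reported an error; return that error."""
    error = getattr(msg, "auto_renew_error", None)
    if error is not None:
        # The lock may already be lost, so completing would likely fail too.
        return error
    msg.complete()
    return None


class FakeMessage:
    """Hypothetical stand-in for a message registered for auto-renewal."""

    def __init__(self, auto_renew_error=None):
        self.auto_renew_error = auto_renew_error
        self.completed = False

    def complete(self):
        self.completed = True


healthy = FakeMessage()
expired = FakeMessage(auto_renew_error=RuntimeError("renewal failed"))
healthy_result = complete_if_renewed(healthy)
expired_result = complete_if_renewed(expired)
```

The caller can then decide whether to log the returned error, re-receive the message, or let it be redelivered once its lock lapses.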

Common Exceptions

Please view the exceptions file for detailed descriptions of our common Exception types.

Next steps

More sample code

Please find further examples in the samples directory demonstrating common Service Bus scenarios such as sending, receiving, session management and message handling.

Additional documentation

For more extensive documentation on the Service Bus service, see the Service Bus documentation on docs.microsoft.com.

Management capabilities and documentation

For users seeking to perform management operations against ServiceBus (Creating a queue/topic/etc, altering filter rules, enumerating entities) please see the azure-mgmt-servicebus documentation for API documentation. Terse usage examples can be found here as well.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Release History

7.0.0b3 (2020-06-08)

New Features

  • Added support for management of queue entities.
    • Use azure.servicebus.management.ServiceBusManagementClient (azure.servicebus.management.aio.ServiceBusManagementClient for aio) to create, update, delete, list queues and get settings as well as runtime information of queues under a ServiceBus namespace.
  • Added methods get_queue_deadletter_receiver and get_subscription_deadletter_receiver in ServiceBusClient to get a ServiceBusReceiver for the dead-letter sub-queue of the target entity.

BugFixes

  • Updated uAMQP dependency to 1.2.8.
    • Fixed bug where reason and description were not being set when dead-lettering messages.

7.0.0b2 (2020-05-04)

New Features

  • Added method get_topic_sender in ServiceBusClient to get a ServiceBusSender for a topic.
  • Added method get_subscription_receiver in ServiceBusClient to get a ServiceBusReceiver for a subscription under specific topic.
  • Added support for scheduling messages and scheduled message cancellation.
    • Use ServiceBusSender.schedule(messages, schedule_time_utc) for scheduling messages.
    • Use ServiceBusSender.cancel_scheduled_messages(sequence_numbers) for scheduled messages cancellation.
  • ServiceBusSender.send() can now send a list of messages in one call, if they fit into a single batch. If they do not fit, a ValueError is raised.
  • BatchMessage.add() and ServiceBusSender.send() would raise MessageContentTooLarge if the content is over-sized.
  • ServiceBusReceiver.receive() raises ValueError if its param max_batch_size is greater than param prefetch of ServiceBusClient.
  • Added exception classes MessageError, MessageContentTooLarge, ServiceBusAuthenticationError.
    • MessageError: when you send a problematic message, such as an already sent message or an over-sized message.
    • MessageContentTooLarge: when you send an over-sized message. A subclass of ValueError and MessageError.
    • ServiceBusAuthenticationError: on failure to be authenticated by the service.
  • Removed exception class InvalidHandlerState.

BugFixes

  • Fixed bug where http_proxy and transport_type in ServiceBusClient are not propagated into Sender/Receiver creation properly.
  • Updated uAMQP dependency to 1.2.7.
    • Fixed bug in setting certificate of tlsio on MacOS. #7201
    • Fixed bug that caused segmentation fault in network tracing on MacOS when setting logging_enable to True in ServiceBusClient.

Breaking Changes

  • Session receivers are now created via their own top level functions, e.g. get_queue_session_receiver and get_subscription_session_receiver. Non session receivers no longer take session_id as a parameter.
  • ServiceBusSender.send() no longer takes a timeout parameter, as it should be redundant with retry options provided when creating the client.
  • Exception imports have been removed from module azure.servicebus. Import from azure.servicebus.exceptions instead.
  • ServiceBusSender.schedule() has swapped the ordering of parameters schedule_time_utc and messages for better consistency with send() syntax.

7.0.0b1 (2020-04-06)

Version 7.0.0b1 is a preview of our efforts to create a client library that is user friendly and idiomatic to the Python ecosystem. The reasons for most of the changes in this update can be found in the Azure SDK Design Guidelines for Python. For more information, please visit https://aka.ms/azure-sdk-preview1-python.

  • Note: Not all historical functionality exists in this version at this point. Topics, Subscriptions, scheduling, dead_letter management and more will be added incrementally over upcoming preview releases.

New Features

  • Added new configuration parameters when creating ServiceBusClient.
    • credential: The credential object used for authentication which implements TokenCredential interface of getting tokens.
    • http_proxy: A dictionary populated with proxy settings.
    • For detailed information about configuration parameters, please see docstring in ServiceBusClient and/or the reference documentation for more information.
  • Added support for authentication using Azure Identity credentials.
  • Added support for retry policy.
  • Added support for http proxy.
  • Manually calling reconnect should no longer be necessary, it is now performed implicitly.
  • Manually calling open should no longer be necessary, it is now performed implicitly.
    • Note: close()-ing is still required if a context manager is not used, to avoid leaking connections.
  • Added support for sending a batch of messages destined for heterogeneous sessions.

Breaking changes

  • Simplified API and set of clients
    • get_queue no longer exists, utilize get_queue_sender/receiver instead.
    • peek and other queue_client functions have moved to their respective sender/receiver.
    • Renamed fetch_next to receive.
    • Renamed session to session_id to normalize naming when requesting a receiver against a given session.
    • reconnect no longer exists, and is performed implicitly if needed.
    • open no longer exists, and is performed implicitly if needed.
  • Normalized top level client parameters with idiomatic and consistent naming.
    • Renamed debug in ServiceBusClient initializer to logging_enable.
    • Renamed service_namespace in ServiceBusClient initializer to fully_qualified_namespace.
  • New error hierarchy, with more specific semantics
    • azure.servicebus.exceptions.ServiceBusError
    • azure.servicebus.exceptions.ServiceBusConnectionError
    • azure.servicebus.exceptions.ServiceBusResourceNotFound
    • azure.servicebus.exceptions.ServiceBusAuthorizationError
    • azure.servicebus.exceptions.NoActiveSession
    • azure.servicebus.exceptions.OperationTimeoutError
    • azure.servicebus.exceptions.InvalidHandlerState
    • azure.servicebus.exceptions.AutoLockRenewTimeout
    • azure.servicebus.exceptions.AutoLockRenewFailed
    • azure.servicebus.exceptions.EventDataSendError
    • azure.servicebus.exceptions.MessageSendFailed
    • azure.servicebus.exceptions.MessageLockExpired
    • azure.servicebus.exceptions.MessageSettleFailed
    • azure.servicebus.exceptions.MessageAlreadySettled
    • azure.servicebus.exceptions.SessionLockExpired
  • BatchMessage creation is now initiated via create_batch on a Sender, using add() on the batch to add messages, in order to enforce service-side max batch sized limitations.
  • Session is now set on the message itself, via session_id parameter or property, as opposed to on Send or get_sender via session. This is to allow sending a batch of messages destined to varied sessions.
  • Session management is now encapsulated within a property of a receiver, e.g. receiver.session, to better compartmentalize functionality specific to sessions.
    • To use AutoLockRenew against sessions, one would simply pass the inner session object, instead of the receiver itself.

0.50.2 (2019-12-09)

New Features

  • Added support for delivery tag lock tokens

BugFixes

  • Fixed bug where Message would pass through invalid kwargs on init when attempting to thread through subject.
  • Increments UAMQP dependency min version to 1.2.5, to include a set of fixes, including handling of large messages and mitigation of segfaults.

0.50.1 (2019-06-24)

BugFixes

  • Fixed bug where enqueued_time and scheduled_enqueue_time of a message were being parsed as local timestamps rather than UTC.

0.50.0 (2019-01-17)

Breaking changes

  • Introduces new AMQP-based API.
  • Original HTTP-based API still available under new namespace: azure.servicebus.control_client
  • For full API changes, please see updated reference documentation.

Within the new namespace, the original HTTP-based API from version 0.21.1 remains unchanged (i.e. no additional features or bugfixes) so for those intending to only use HTTP operations - there is no additional benefit in updating at this time.

New Features

  • New API supports message send and receive via AMQP with improved performance and stability.
  • New asynchronous APIs (using asyncio) for send, receive and message handling.
  • Support for message and session auto lock renewal via background thread or async operation.
  • Now supports scheduled message cancellation.

0.21.1 (2017-04-27)

This wheel package is now built with the azure wheel extension

0.21.0 (2017-01-13)

New Features

  • str messages are now accepted in Python 3 and will be encoded in 'utf-8' (will not raise TypeError anymore)
  • broker_properties can now be defined as a dict, and not only a JSON str. datetime, int, float and boolean are converted.
  • #902 add send_topic_message_batch operation (takes an iterable of messages)
  • #902 add send_queue_message_batch operation (takes an iterable of messages)

Bugfixes

  • #820 the code is now more robust to unexpected changes on the SB RestAPI

0.20.3 (2016-08-11)

News

  • #547 Add get dead letter path static methods to Python
  • #513 Add renew lock

Bugfixes

  • #628 Fix custom properties with double quotes

0.20.2 (2016-06-28)

Bugfixes

  • New header in Rest API which breaks the SDK #658 #657

0.20.1 (2015-09-14)

News

  • Create a requests.Session() if the user doesn't pass one in.

0.20.0 (2015-08-31)

Initial release of this package, from the split of the azure package. See the azure package release note for 1.0.0 for details and previous history on Service Bus.

