Microsoft Azure Event Hubs Client Library for Python
Azure Event Hubs client library for Python
Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second.
Use the Event Hubs client library for Python to:
Publish events to the Event Hubs service through a sender.
Read events from the Event Hubs service through a receiver.
On Python 3.5 and above, it also includes:
Async sender and receiver classes that support async/await methods.
An Event Processor Host module that manages the distribution of partition readers.
Source code | Package (PyPI) | API reference documentation | Product documentation
Getting started
Install the package
Install the Azure Event Hubs client library for Python with pip:
$ pip install azure-eventhub
Prerequisites
An Azure subscription.
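Python 2.7, or Python 3.4 or later. The async sender and receiver and the Event Processor Host require Python 3.5 or later.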
An existing Event Hubs namespace and event hub. You can create these entities by following the instructions in this article.
Authenticate the client
Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS policy name, SAS key and event hub name to instantiate the client object.
Get credentials
You can find credential information in the Azure portal.
Create client
There are several ways to instantiate the EventHubClient object; the following code snippet demonstrates one of them:
```python
import os
from azure.eventhub import EventHubClient

# EVENT_HUB_HOSTNAME is the namespace host name, e.g. <namespace>.servicebus.windows.net
connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
```
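A connection string is a semicolon-separated list of Key=Value pairs. When a client cannot be created from a connection string, it can help to inspect its parts. The following is a minimal sketch, assuming values never contain semicolons; parse_connection_string is a hypothetical helper written for illustration, not a function of azure-eventhub.

```python
def parse_connection_string(conn_str):
    """Split an Event Hubs connection string into a dict of its key/value parts.

    Hypothetical helper for illustration; it is not part of azure-eventhub.
    """
    parts = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate a trailing semicolon
        # Split on the first '=' only: SAS keys are base64 and may end in '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("Malformed segment: {!r}".format(segment))
        parts[key.strip()] = value.strip()
    # EntityPath is optional here, since the event hub name can be supplied separately.
    required = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey")
    missing = [name for name in required if name not in parts]
    if missing:
        raise ValueError("Missing connection string keys: {}".format(", ".join(missing)))
    return parts
```

For example, parsing the string built above yields a dict whose "EntityPath" entry is the event hub name and whose "SharedAccessKey" entry keeps any trailing '=' padding.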
Key concepts
Namespace: An Event Hubs namespace provides a unique scoping container, referenced by its fully qualified domain name, in which you create one or more event hubs or Kafka topics.
Event publishers: Any entity that sends data to an event hub is an event producer, or event publisher. Event publishers can publish events using HTTPS, AMQP 1.0, or Kafka 1.0 and later. They use a Shared Access Signature (SAS) token to identify themselves to an event hub, and can either have a unique identity or use a common SAS token.
Event consumers: Any entity that reads event data from an event hub is an event consumer. All Event Hubs consumers connect via the AMQP 1.0 session and events are delivered through the session as they become available. The client does not need to poll for data availability.
SAS tokens: Event Hubs uses Shared Access Signatures, which are available at the namespace and event hub level. A SAS token is generated from a SAS key: it carries an HMAC-SHA256 hash of the URL-encoded resource URL and an expiry time, encoded in a specific format. Using the name of the key (policy) and the token, Event Hubs can regenerate the hash and thus authenticate the sender.
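The signing step can be sketched with the Python 3 standard library. This follows the commonly documented Service Bus/Event Hubs token layout (HMAC-SHA256 over the URL-encoded URI, a newline, and the expiry in seconds since the Unix epoch, keyed with the SAS key). generate_sas_token and its parameters are illustrative names, not part of azure-eventhub; treat this as a sketch rather than the library's own implementation.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def generate_sas_token(resource_uri, key_name, key, expiry=None, ttl_seconds=3600):
    """Build a SharedAccessSignature token for a resource URI (illustrative helper).

    The signature is an HMAC-SHA256 over "<url-encoded uri>\\n<expiry>",
    keyed with the SAS key, then base64- and URL-encoded.
    """
    if expiry is None:
        # Token expiry as an absolute Unix timestamp.
        expiry = int(time.time()) + ttl_seconds
    encoded_uri = urllib.parse.quote_plus(resource_uri)
    string_to_sign = "{}\n{}".format(encoded_uri, expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("utf-8"))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, key_name)
```

Because the service holds the same key, it can recompute the HMAC from sr and se and compare it with sig, which is the regeneration step described above. Release 1.3.0 (see Release History) added the ability to create a client from an existing SAS token.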
For more information about these concepts, see Features and terminology in Azure Event Hubs.
Examples
The following sections provide code snippets covering some of the most common Event Hubs tasks: sending and receiving event data, both synchronously and asynchronously.
Send event data
Send a single event and block until an acknowledgement is received or the operation times out.
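```python
from azure.eventhub import EventHubClient, EventData

client = EventHubClient.from_connection_string(connection_str)
sender = client.add_sender(partition="0")
try:
    # Open the connection for the senders/receivers added to this client.
    client.run()
    event_data = EventData(b"A single event")
    # Blocks until the service acknowledges the event or the send times out.
    sender.send(event_data)
finally:
    # Always close the client, even if the send fails.
    client.stop()
```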
Receive event data
Receive events from the event hub.
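```python
import logging
from azure.eventhub import EventHubClient, Offset

logger = logging.getLogger("azure.eventhub")
client = EventHubClient.from_connection_string(connection_str)
# Offset('@latest') starts at the end of the stream, so only events
# enqueued after the receiver opens are returned.
receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
try:
    client.run()
    # Wait up to 5 seconds for a batch of at most 100 events.
    received = receiver.receive(timeout=5, max_batch_size=100)
    for event_data in received:
        logger.info("Message received:{}".format(event_data.body_as_str()))
finally:
    client.stop()
```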
Async send event data
Send a single event and asynchronously wait until an acknowledgement is received or the operation times out.
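```python
import asyncio
from azure.eventhub import EventHubClientAsync, EventData

async def send_one(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    sender = client.add_async_sender(partition="0")
    try:
        await client.run_async()
        event_data = EventData(b"A single event")
        # Suspends until the event is acknowledged or the send times out.
        await sender.send(event_data)
    finally:
        await client.stop_async()

loop = asyncio.get_event_loop()
loop.run_until_complete(send_one(connection_str))
```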
Async receive event data
Receive events asynchronously from the event hub.
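```python
import asyncio
import logging
from azure.eventhub import EventHubClientAsync, Offset

logger = logging.getLogger("azure.eventhub")

async def receive_batch(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    receiver = client.add_async_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
    try:
        await client.run_async()
        # Wait up to 5 seconds for events to arrive.
        received = await receiver.receive(timeout=5)
        for event_data in received:
            logger.info("Message received:{}".format(event_data.body_as_str()))
    finally:
        await client.stop_async()

loop = asyncio.get_event_loop()
loop.run_until_complete(receive_batch(connection_str))
```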
Troubleshooting
General
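Event Hubs errors fall into the following categories, each with an action you can take to try to fix them. The exception type names listed are those of the .NET Event Hubs client; use the categories as a guide when deciding how to handle errors raised by the Python client.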
User coding error: System.ArgumentException, System.InvalidOperationException, System.OperationCanceledException, System.Runtime.Serialization.SerializationException. General action: try to fix the code before proceeding.
Setup/configuration error: Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException, Microsoft.Azure.EventHubs.MessagingEntityNotFoundException, System.UnauthorizedAccessException. General action: review your configuration and change if necessary.
Transient exceptions: Microsoft.ServiceBus.Messaging.MessagingException, Microsoft.ServiceBus.Messaging.ServerBusyException, Microsoft.Azure.EventHubs.ServerBusyException, Microsoft.ServiceBus.Messaging.MessagingCommunicationException. General action: retry the operation or notify users.
Other exceptions: System.Transactions.TransactionException, System.TimeoutException, Microsoft.ServiceBus.Messaging.MessageLockLostException, Microsoft.ServiceBus.Messaging.SessionLockLostException. General action: specific to the exception type; refer to the table in Event Hubs messaging exceptions.
For more detailed information about exceptions and how to handle them, see Event Hubs messaging exceptions.
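For transient errors the suggested action is to retry. The library already reconnects automatically on retryable errors (see the 0.2.0rc2 notes in Release History), so an application-level retry is an extra layer on top of that. A generic sketch with exponential backoff and jitter follows; retry_with_backoff is a hypothetical helper, and the is_transient predicate is an assumption you supply, since how a given exception maps to these categories is up to your application.

```python
import random
import time


def retry_with_backoff(operation, is_transient, max_attempts=5,
                       base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Call operation(), retrying while is_transient(exc) reports a transient error.

    Delays grow exponentially (base_delay * 2**attempt, capped at max_delay)
    with full jitter. Non-transient errors and the final failure are re-raised.
    """
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception as exc:
            if not is_transient(exc) or attempt == max_attempts - 1:
                raise
            delay = min(max_delay, base_delay * (2 ** attempt))
            sleep(random.uniform(0, delay))
```

For example, retry_with_backoff(lambda: sender.send(event_data), is_transient=my_predicate) retries a send, where my_predicate is your own classification function. Configuration and coding errors are re-raised immediately, matching the "fix the code" and "review your configuration" actions above.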
Next steps
Examples
./examples/send.py - use sender to publish events
./examples/recv.py - use receiver to read events
./examples/send_async.py - async/await support of a sender
./examples/recv_async.py - async/await support of a receiver
./examples/eph.py - event processor host
Documentation
Reference documentation is available at docs.microsoft.com/python/api/azure-eventhub.
Logging
Enable the 'azure.eventhub' logger to collect traces from the library.
Enable the 'uamqp' logger to collect traces from the underlying uAMQP library.
Enable AMQP frame-level tracing by setting debug=True when creating the client.
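The logger names above can be wired to a handler with the standard logging module. A minimal sketch; enable_eventhub_logging is a hypothetical helper name, and the format string is an arbitrary choice.

```python
import logging


def enable_eventhub_logging(level=logging.DEBUG, stream=None):
    """Attach one stream handler (stderr by default) to the azure.eventhub and uamqp loggers."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    loggers = []
    for name in ("azure.eventhub", "uamqp"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
        loggers.append(logger)
    return loggers
```

Child loggers such as "uamqp.connection" propagate their records to these parents, so one call covers both libraries. Call it once at startup to avoid duplicate handlers; frame-level traces additionally need debug=True when creating the client, as noted above.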
Provide Feedback
If you encounter any bugs or have suggestions, please file an issue in the Issues section of the project.
Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information, see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.
Release History
1.3.3 (2019-12-04)
Features
Added reconnect_timeout and max_reconnect_tries parameters to receive functions for better control of connection behaviour during receive.
Added an option release_partition_on_checkpoint_failure to EPHOptions for EventProcessorHost to instruct the EventProcessorHost to fail fast on a checkpoint failure and proactively release the partition. This should reduce spurious reprocessing of non-checkpointed events, at the cost of a small amount of additional latency if the checkpoint interruption was actually transient.
BugFixes
Increased the minimum uAMQP dependency version to 1.2.5 to include a set of fixes, including handling of large messages and mitigation of segfaults.
Fixed a bug preventing application_properties from being transmitted when set individually in key-value form.
Fixed the pass-through of the send timeout to the sender, so it is now passed in the proper units and used within uAMQP.
Fixed a bug where receive functions returned an empty list after a reconnect.
Fixed a bug in the partition pump logger that interfered with certain failure-mode logs.
Fixed a bug to pass the proper arguments to process_error_async within EventHubPartitionPump.
Demoted logging from error level to info level for the cases where EPH existing leases are not found or out-of-date leases are ignored.
1.3.2 (2019-09-18)
BugFixes
Fixed bug where errors were not handled when EventProcessorHost was initializing EventHubClient.
1.3.1 (2019-02-28)
BugFixes
Fixed bug where datetime offset filter was using a local timestamp rather than UTC.
Fixed a stack overflow error in continuous connection reconnect attempts.
1.3.0 (2019-01-29)
BugFixes
Added support for auto reconnect on token expiration and other auth errors (issue #89).
Features
Added the ability to create EventHubClient from an existing SAS auth token, including providing a function to auto-renew that token on expiry.
Added support for storing a custom EPH context value in checkpoint (PR #84, thanks @konstantinmiller)
1.2.0 (2018-11-29)
Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).
Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)
1.1.1 (2018-10-03)
Fixed bug in Azure namespace package.
1.1.0 (2018-09-21)
Changes to AzureStorageCheckpointLeaseManager parameters to support other connection options (issue #61):
The storage_account_name, storage_account_key and lease_container_name arguments are now optional keyword arguments.
Added a sas_token argument that must be specified with storage_account_name in place of storage_account_key.
Added an endpoint_suffix argument to support storage endpoints in National Clouds.
Added a connection_string argument that, if specified, overrides all other endpoint arguments.
The lease_container_name argument now defaults to “eph-leases” if not specified.
Fixed clients failing to start if run is called multiple times (issue #64).
Added convenience methods body_as_str and body_as_json to EventData object for easier processing of message data.
1.0.0 (2018-08-22)
API stable.
Renamed internal _async module to async_ops for docs generation.
Added optional auth_timeout parameter to EventHubClient and EventHubClientAsync to configure how long to allow for token negotiation to complete. Default is 60 seconds.
Added optional send_timeout parameter to EventHubClient.add_sender and EventHubClientAsync.add_async_sender to determine the timeout for events to be successfully sent. Default value is 60 seconds.
Reformatted logging for performance.
0.2.0 (2018-08-06)
Stability improvements for EPH.
Updated uAMQP version.
Added new configuration options for Sender and Receiver: keep_alive and auto_reconnect. These flags have been added to the following:
EventHubClient.add_receiver
EventHubClient.add_sender
EventHubClientAsync.add_async_receiver
EventHubClientAsync.add_async_sender
EPHOptions.keep_alive_interval
EPHOptions.auto_reconnect_on_error
0.2.0rc2 (2018-07-29)
Breaking change: EventData.offset will now return an object of type uamqp.common.Offset rather than str. The original string value can be retrieved from uamqp.common.Offset.value.
Each sender/receiver will now run in its own independent connection.
Updated uAMQP dependency to 0.2.0
Fixed issue with IoTHub clients not being able to retrieve partition information.
Added support for HTTP proxy settings to both EventHubClient and EPH.
Added error handling policy to automatically reconnect on retryable error.
Added keep-alive thread for maintaining an unused connection.
0.2.0rc1 (2018-07-06)
Breaking change: Restructured the library to support Python 3.7, where async is a reserved keyword. The async submodule has been renamed, and all classes from it can now be imported directly from azure.eventhub.
Breaking change: Removed the optional callback argument from Receiver.receive and AsyncReceiver.receive. This removes the potential for messages to be processed via callback but not yet returned in the batch.
Breaking change: EventData.properties has been renamed to EventData.application_properties.
Updated uAMQP dependency to v0.1.0
Added support for constructing IoTHub connections.
Fixed memory leak in receive operations.
Dropped Python 2.7 wheel support.
0.2.0b2 (2018-05-29)
Added namespace_suffix to EventHubConfig() to support national clouds.
Added device_id attribute to EventData to support IoT Hub use cases.
Added a message header to work around a service bug in PartitionKey support.
Updated uAMQP dependency to vRC1.
0.2.0b1 (2018-04-20)
Updated uAMQP to latest version.
Further testing and minor bug fixes.
0.2.0a2 (2018-04-02)
Updated uAMQP dependency.
0.2.0a1 (unreleased)
Swapped out Proton dependency for uAMQP.
Download files
File details
Details for the file azure-eventhub-1.3.3.zip.
File metadata
- Download URL: azure-eventhub-1.3.3.zip
- Size: 74.5 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8
File hashes
Algorithm | Hash digest
---|---
SHA256 | 42fb05027d31d06a5a5cea9287191d57b172e4e8ecc1f4e9dd174907b8270530
MD5 | 3c1d7039908e345a0f9649813dfe936e
BLAKE2b-256 | e4662c7018c2ea6fb045dd9e285f7b8cc258465d9396e3a862610bca6b4ce2db
File details
Details for the file azure_eventhub-1.3.3-py2.py3-none-any.whl.
File metadata
- Download URL: azure_eventhub-1.3.3-py2.py3-none-any.whl
- Size: 60.3 kB
- Tags: Python 2, Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8
File hashes
Algorithm | Hash digest
---|---
SHA256 | bd6cc86e20068191511b3195c0ad298319935fc96454bb9f96794478e495d454
MD5 | d0b1b9f3420298497939f6d59b7085bf
BLAKE2b-256 | fdceee0fac84acc5f3ed6f1494a9f74bf57a793b56f8d085f95a1f8024454586
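To check a downloaded file against the SHA256 digests listed above, hash it locally and compare. A minimal sketch using Python's hashlib; file_sha256 and verify_download are hypothetical helper names.

```python
import hashlib


def file_sha256(path, chunk_size=65536):
    """Return the hex SHA256 digest of a file, reading it in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_download(path, expected_sha256):
    """Return True if the file's SHA256 matches the published digest (case-insensitive)."""
    return file_sha256(path) == expected_sha256.strip().lower()
```

For example, verify_download("azure_eventhub-1.3.3-py2.py3-none-any.whl", "bd6cc86e20068191511b3195c0ad298319935fc96454bb9f96794478e495d454") should return True for an intact copy of the wheel. pip can also enforce digests itself through its hash-checking mode (--hash=sha256:<digest> entries in a requirements file).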