
Microsoft Azure Event Hubs Client Library for Python


Azure Event Hubs client library for Python

Azure Event Hubs is a big data streaming platform and event ingestion service. It can receive and process millions of events per second.

Use the Event Hubs client library for Python to:

  • Publish events to the Event Hubs service through a sender.

  • Read events from the Event Hubs service through a receiver.

On Python 3.5 and above, it also includes:

  • An async sender and receiver that support async/await methods.

  • An Event Processor Host module that manages the distribution of partition readers.

Source code | Package (PyPI) | API reference documentation | Product documentation

Getting started

Install the package

Install the Azure Event Hubs client library for Python with pip:

$ pip install azure-eventhub

Prerequisites

  • An Azure subscription.

  • Python 3.4 or later.

  • An existing Event Hubs namespace and event hub. You can create these entities by following the instructions in this article.

Authenticate the client

Interaction with Event Hubs starts with an instance of the EventHubClient class. You need the host name, SAS policy name, SAS key, and event hub name to instantiate the client object.

Get credentials

You can find this credential information in the Azure portal.

Create client

There are several ways to instantiate the EventHubClient object. The following code snippet demonstrates one of them, building a connection string from environment variables:

import os
from azure.eventhub import EventHubClient

connection_str = "Endpoint=sb://{}/;SharedAccessKeyName={};SharedAccessKey={};EntityPath={}".format(
    os.environ['EVENT_HUB_HOSTNAME'],
    os.environ['EVENT_HUB_SAS_POLICY'],
    os.environ['EVENT_HUB_SAS_KEY'],
    os.environ['EVENT_HUB_NAME'])
client = EventHubClient.from_connection_string(connection_str)
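Because the connection string is just a semicolon-separated list of Key=Value pairs, it can be handy to split one back into its parts, for example to check which namespace and event hub a connection string points at. The sketch below assumes only that format; parse_connection_string is a hypothetical helper, not part of azure-eventhub, and the namespace, policy, and key values are placeholders.

```python
from urllib.parse import urlparse


def parse_connection_string(conn_str):
    """Split an Event Hubs connection string into a dict of its fields.

    Hypothetical helper for illustration only; it is not part of azure-eventhub.
    """
    fields = {}
    for segment in conn_str.split(";"):
        if not segment:
            continue  # tolerate empty segments, e.g. a trailing ';'
        # Split on the first '=' only: base64-encoded SAS keys may end in '='.
        key, sep, value = segment.partition("=")
        if not sep:
            raise ValueError("malformed segment: {!r}".format(segment))
        fields[key] = value
    return fields


# Placeholder values, shaped like the connection string built above.
example = ("Endpoint=sb://mynamespace.servicebus.windows.net/;"
           "SharedAccessKeyName=send-policy;"
           "SharedAccessKey=c2VjcmV0a2V5==;"
           "EntityPath=myeventhub")
fields = parse_connection_string(example)
host_name = urlparse(fields["Endpoint"]).hostname  # the namespace's fully qualified domain name
print(host_name, fields["EntityPath"])
```

Splitting on the first "=" rather than every "=" is the important detail: SAS keys are base64 strings and often end with padding characters.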

Key concepts

  • Namespace: An Event Hubs namespace provides a unique scoping container, referenced by its fully qualified domain name, in which you create one or more event hubs or Kafka topics.

  • Event publishers: Any entity that sends data to an event hub is an event producer, or event publisher. Event publishers can publish events using HTTPS, AMQP 1.0, or Kafka 1.0 and later. Event publishers use a Shared Access Signature (SAS) token to identify themselves to an event hub, and can have a unique identity or use a common SAS token.

  • Event consumers: Any entity that reads event data from an event hub is an event consumer. All Event Hubs consumers connect via the AMQP 1.0 session and events are delivered through the session as they become available. The client does not need to poll for data availability.

  • SAS tokens: Event Hubs uses Shared Access Signatures, which are available at the namespace and event hub level. A SAS token is generated from a SAS key and contains an HMAC-SHA256 signature of the resource URL and an expiry time, encoded in a specific format. Using the name of the key (policy) and the token, Event Hubs can regenerate the signature and thus authenticate the sender.

For more information about these concepts, see Features and terminology in Azure Event Hubs.
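The SAS token scheme described above can be sketched with the Python standard library alone. This sketch assumes the token format from Azure's Shared Access Signature authentication documentation: the URL-encoded resource URI and an expiry timestamp are signed with HMAC-SHA256 using the policy key, then packed into a "SharedAccessSignature sr=…&sig=…&se=…&skn=…" string. generate_sas_token is a hypothetical helper and the URI, policy name, and key are placeholders; when you use a connection string, the client builds tokens for you, so check the official documentation before relying on a hand-rolled token.

```python
import base64
import hashlib
import hmac
import time
from urllib.parse import quote_plus


def generate_sas_token(uri, key_name, key, ttl_seconds=3600, expiry=None):
    """Build a SAS token for `uri`, signed with the named SAS policy key.

    Hypothetical helper: signs "<url-encoded uri>\n<expiry>" with HMAC-SHA256
    and packs the result into a SharedAccessSignature string.
    """
    if expiry is None:
        # Unix time (in seconds) at which the token stops being valid.
        expiry = int(time.time()) + ttl_seconds
    encoded_uri = quote_plus(uri)
    string_to_sign = "{}\n{}".format(encoded_uri, expiry).encode("utf-8")
    # The policy key string itself (its UTF-8 bytes) is the HMAC key.
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = quote_plus(base64.b64encode(digest).decode("ascii"))
    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, key_name)


token = generate_sas_token(
    "sb://mynamespace.servicebus.windows.net/myeventhub",  # placeholder resource URI
    "send-policy", "placeholder-key")
print(token)
```

Passing an explicit expiry makes the output deterministic, which is useful when comparing against a token produced by another tool.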

Examples

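The following sections provide code snippets covering some of the most common Event Hubs tasks: sending and receiving events, both synchronously and asynchronously. Each snippet assumes connection_str has been defined as shown in Create client.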

Send event data

Send a single event and block until the service acknowledges it or the operation times out.

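from azure.eventhub import EventHubClient, EventData

client = EventHubClient.from_connection_string(connection_str)
sender = client.add_sender(partition="0")
try:
    # Open the connections for all senders/receivers added to this client.
    client.run()
    event_data = EventData(b"A single event")
    # Blocks until the service acknowledges the event or the send times out.
    sender.send(event_data)
finally:
    # Always close the connection, even if the send failed.
    client.stop()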

Receive event data

Receive a batch of events from a single partition of an event hub.

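import logging
from azure.eventhub import EventHubClient, Offset

logging.basicConfig(level=logging.INFO)  # make the INFO messages below visible
logger = logging.getLogger("azure.eventhub")

client = EventHubClient.from_connection_string(connection_str)
# Read partition "0" through the default consumer group, starting with new events only.
receiver = client.add_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
try:
    client.run()
    # Wait up to 5 seconds for a batch of at most 100 events.
    received = receiver.receive(timeout=5, max_batch_size=100)
    for event_data in received:
        logger.info("Message received: %s", event_data.body_as_str())
finally:
    client.stop()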

Async send event data

Send a single event and asynchronously wait until the service acknowledges it or the operation times out.

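import asyncio
from azure.eventhub import EventHubClientAsync, EventData

async def send_event(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    sender = client.add_async_sender(partition="0")
    try:
        await client.run_async()
        event_data = EventData(b"A single event")
        # Yields to the event loop until the event is acknowledged or the send times out.
        await sender.send(event_data)
    finally:
        await client.stop_async()

# get_event_loop() also works on Python 3.5/3.6, where asyncio.run() is unavailable.
loop = asyncio.get_event_loop()
loop.run_until_complete(send_event(connection_str))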

Async receive event data

Receive a batch of events asynchronously from a single partition of an event hub.

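import asyncio
import logging
from azure.eventhub import EventHubClientAsync, Offset

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("azure.eventhub")

async def receive_events(connection_str):
    client = EventHubClientAsync.from_connection_string(connection_str)
    receiver = client.add_async_receiver(consumer_group="$default", partition="0", offset=Offset('@latest'))
    try:
        await client.run_async()
        # Wait up to 5 seconds for a batch of events without blocking the event loop.
        received = await receiver.receive(timeout=5)
        for event_data in received:
            logger.info("Message received: %s", event_data.body_as_str())
    finally:
        await client.stop_async()

loop = asyncio.get_event_loop()
loop.run_until_complete(receive_events(connection_str))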

Troubleshooting

General

The Event Hubs APIs generate exceptions that can fall into the following categories, along with the associated action you can take to try to fix them.

  • User coding error: System.ArgumentException, System.InvalidOperationException, System.OperationCanceledException, System.Runtime.Serialization.SerializationException. General action: try to fix the code before proceeding.

  • Setup/configuration error: Microsoft.ServiceBus.Messaging.MessagingEntityNotFoundException, Microsoft.Azure.EventHubs.MessagingEntityNotFoundException, System.UnauthorizedAccessException. General action: review your configuration and change if necessary.

  • Transient exceptions: Microsoft.ServiceBus.Messaging.MessagingException, Microsoft.ServiceBus.Messaging.ServerBusyException, Microsoft.Azure.EventHubs.ServerBusyException, Microsoft.ServiceBus.Messaging.MessagingCommunicationException. General action: retry the operation or notify users.

  • Other exceptions: System.Transactions.TransactionException, System.TimeoutException, Microsoft.ServiceBus.Messaging.MessageLockLostException, Microsoft.ServiceBus.Messaging.SessionLockLostException. General action: specific to the exception type; refer to the table in Event Hubs messaging exceptions.

For more detailed information about exceptions and how to deal with them, see Event Hubs messaging exceptions.
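For the transient category, the suggested action is to retry the operation. A common way to do that is exponential backoff with random jitter, sketched below with only the standard library. TransientError and retry are hypothetical names: substitute whichever exception types your client actually raises for transient failures, and wrap only operations that are safe to repeat.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure such as 'server busy'."""


def retry(operation, max_attempts=5, base_delay=0.5, max_delay=10.0,
          retryable=(TransientError,), sleep=time.sleep):
    """Call operation(); on a retryable error, back off exponentially and retry.

    Non-retryable exceptions propagate immediately. The last retryable error
    is re-raised once max_attempts calls have failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retryable:
            if attempt == max_attempts:
                raise
            # "Full jitter": sleep a random time in [0, min(max_delay, base * 2^(n-1))].
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            sleep(random.uniform(0, delay))


# Usage: an operation that fails twice with a transient error, then succeeds.
calls = []

def flaky_send():
    calls.append(1)
    if len(calls) < 3:
        raise TransientError("server busy")
    return "sent"

result = retry(flaky_send, sleep=lambda seconds: None)  # skip real sleeping in the demo
```

The random jitter spreads out retries from many clients that failed at the same moment, so they do not all hit a busy service again in lockstep.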

Next steps

Examples

  • ./examples/send.py - use sender to publish events

  • ./examples/recv.py - use receiver to read events

  • ./examples/send_async.py - async/await support of a sender

  • ./examples/recv_async.py - async/await support of a receiver

  • ./examples/eph.py - event processor host

Documentation

Reference documentation is available at docs.microsoft.com/python/api/azure-eventhub.

Logging

  • Enable the ‘azure.eventhub’ logger to collect traces from the library.

  • Enable the ‘uamqp’ logger to collect traces from the underlying uAMQP library.

  • Enable AMQP frame-level tracing by setting debug=True when creating the client.
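With the standard logging module, enabling the two loggers named above might look like the following sketch. The logger names come from this document; the enable_eventhub_logging helper, the handler, and the format string are illustrative choices. Expect DEBUG-level output, and especially AMQP frame tracing, to be verbose.

```python
import logging
import sys


def enable_eventhub_logging(level=logging.DEBUG, stream=sys.stderr):
    """Route 'azure.eventhub' and 'uamqp' log records to `stream`.

    Hypothetical helper; call it once, since each call attaches another handler.
    Child loggers (e.g. 'azure.eventhub.client') inherit the level and
    propagate their records to these parents.
    """
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(name)s %(levelname)s %(message)s"))
    for name in ("azure.eventhub", "uamqp"):
        logger = logging.getLogger(name)
        logger.setLevel(level)
        logger.addHandler(handler)
    return handler


handler = enable_eventhub_logging(level=logging.INFO)
logging.getLogger("azure.eventhub").info("logging enabled")
```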

Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the Issues section of the project.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information, see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Release History

1.3.3 (2019-12-04)

Features

  • Added reconnect_timeout and max_reconnect_tries parameters to receive functions for better control of connection behaviour during receive.

  • Added an option release_partition_on_checkpoint_failure to EPHOptions for EventProcessorHost to instruct the EventProcessorHost to fail fast on a checkpoint failure and proactively release the partition. This should reduce spurious reprocessing of non-checkpointed events, at the cost of a small amount of additional latency if the checkpoint interruption was actually transient.

BugFixes

  • Increased the minimum uAMQP dependency version to 1.2.5 to pick up a set of fixes, including handling of large messages and mitigation of segfaults.

  • Fixed a bug preventing application_properties from being transmitted when set individually in key-value form.

  • Fixed how the send timeout is threaded through to the sender, so it is now passed in the proper units and used within uAMQP.

  • Fixed bug where on reconnect, receive functions returned an empty list.

  • Fixed bug in partition pump logger interfering with certain failure mode logs.

  • Fixed bug to pass proper args to process_error_async within EventHubPartitionPump.

  • Demoted error-level logging in the cases of EPH existing leases not found or out-of-date leases being ignored. These will now be logged at info-level.

1.3.2 (2019-09-18)

BugFixes

  • Fixed bug where errors were not handled when EventProcessorHost was initializing EventHubClient.

1.3.1 (2019-02-28)

BugFixes

  • Fixed bug where datetime offset filter was using a local timestamp rather than UTC.

  • Fixed a stack overflow error in continuous connection reconnect attempts.

1.3.0 (2019-01-29)

BugFixes

  • Added support for auto reconnect on token expiration and other auth errors (issue #89).

Features

  • Added the ability to create an EventHubClient from an existing SAS auth token, including providing a function to auto-renew that token on expiry.

  • Added support for storing a custom EPH context value in checkpoint (PR #84, thanks @konstantinmiller)

1.2.0 (2018-11-29)

  • Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).

  • Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)

1.1.1 (2018-10-03)

  • Fixed bug in Azure namespace package.

1.1.0 (2018-09-21)

  • Changes to AzureStorageCheckpointLeaseManager parameters to support other connection options (issue #61):

    • The storage_account_name, storage_account_key and lease_container_name arguments are now optional keyword arguments.

    • Added a sas_token argument that must be specified with storage_account_name in place of storage_account_key.

    • Added an endpoint_suffix argument to support storage endpoints in National Clouds.

    • Added a connection_string argument that, if specified, overrides all other endpoint arguments.

    • The lease_container_name argument now defaults to “eph-leases” if not specified.

  • Fixed clients failing to start if run was called multiple times (issue #64).

  • Added convenience methods body_as_str and body_as_json to EventData object for easier processing of message data.

1.0.0 (2018-08-22)

  • API stable.

  • Renamed internal _async module to async_ops for docs generation.

  • Added optional auth_timeout parameter to EventHubClient and EventHubClientAsync to configure how long to allow for token negotiation to complete. Default is 60 seconds.

  • Added optional send_timeout parameter to EventHubClient.add_sender and EventHubClientAsync.add_async_sender to determine the timeout for Events to be successfully sent. Default value is 60 seconds.

  • Reformatted logging for performance.

0.2.0 (2018-08-06)

  • Stability improvements for EPH.

  • Updated uAMQP version.

  • Added new configuration options for Sender and Receiver; keep_alive and auto_reconnect. These flags have been added to the following:

    • EventHubClient.add_receiver

    • EventHubClient.add_sender

    • EventHubClientAsync.add_async_receiver

    • EventHubClientAsync.add_async_sender

    • EPHOptions.keep_alive_interval

    • EPHOptions.auto_reconnect_on_error

0.2.0rc2 (2018-07-29)

  • Breaking change: EventData.offset will now return an object of type uamqp.common.Offset rather than str. The original string value can be retrieved from uamqp.common.Offset.value.

  • Each sender/receiver will now run in its own independent connection.

  • Updated uAMQP dependency to 0.2.0

  • Fixed issue with IoTHub clients not being able to retrieve partition information.

  • Added support for HTTP proxy settings to both EventHubClient and EPH.

  • Added error handling policy to automatically reconnect on retryable error.

  • Added keep-alive thread for maintaining an unused connection.

0.2.0rc1 (2018-07-06)

  • Breaking change: Restructured library to support Python 3.7. Submodule async has been renamed, and all classes from this module can now be imported from azure.eventhub directly.

  • Breaking change: Removed optional callback argument from Receiver.receive and AsyncReceiver.receive. This removes the potential for messages to be processed via callback before they have been returned in the batch.

  • Breaking change: EventData.properties has been renamed to EventData.application_properties.

  • Updated uAMQP dependency to v0.1.0

  • Added support for constructing IoTHub connections.

  • Fixed memory leak in receive operations.

  • Dropped Python 2.7 wheel support.

0.2.0b2 (2018-05-29)

  • Added namespace_suffix to EventHubConfig() to support national clouds.

  • Added device_id attribute to EventData to support IoT Hub use cases.

  • Added a message header to work around a service bug affecting PartitionKey support.

  • Updated uAMQP dependency to vRC1.

0.2.0b1 (2018-04-20)

  • Updated uAMQP to latest version.

  • Further testing and minor bug fixes.

0.2.0a2 (2018-04-02)

  • Updated uAMQP dependency.

0.2.0a1 (unreleased)

  • Swapped out Proton dependency for uAMQP.


Download files


Source Distribution

azure-eventhub-1.3.3.zip (74.5 kB)

Uploaded Source

Built Distribution

azure_eventhub-1.3.3-py2.py3-none-any.whl (60.3 kB)

Uploaded Python 2 Python 3

File details

Details for the file azure-eventhub-1.3.3.zip.

File metadata

  • Download URL: azure-eventhub-1.3.3.zip
  • Size: 74.5 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8

File hashes

Hashes for azure-eventhub-1.3.3.zip
Algorithm Hash digest
SHA256 42fb05027d31d06a5a5cea9287191d57b172e4e8ecc1f4e9dd174907b8270530
MD5 3c1d7039908e345a0f9649813dfe936e
BLAKE2b-256 e4662c7018c2ea6fb045dd9e285f7b8cc258465d9396e3a862610bca6b4ce2db


File details

Details for the file azure_eventhub-1.3.3-py2.py3-none-any.whl.

File metadata

  • Download URL: azure_eventhub-1.3.3-py2.py3-none-any.whl
  • Size: 60.3 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/3.1.1 pkginfo/1.5.0.1 requests/2.22.0 setuptools/40.6.2 requests-toolbelt/0.9.1 tqdm/4.40.2 CPython/3.6.8

File hashes

Hashes for azure_eventhub-1.3.3-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 bd6cc86e20068191511b3195c0ad298319935fc96454bb9f96794478e495d454
MD5 d0b1b9f3420298497939f6d59b7085bf
BLAKE2b-256 fdceee0fac84acc5f3ed6f1494a9f74bf57a793b56f8d085f95a1f8024454586

