Microsoft Azure Event Hubs checkpointer implementation with Blob Storage Client Library for Python

Azure EventHubs Checkpoint Store client library for Python using Storage Blobs

Azure EventHubs Checkpoint Store is used for storing checkpoints while processing events from Azure Event Hubs. This Checkpoint Store package works as a plug-in package to EventProcessor. It uses Azure Storage Blob as the persistent store for maintaining checkpoints and partition ownership information.

Source code | Package (PyPI) | API reference documentation | Azure Event Hubs documentation | Azure Storage documentation

Getting started

Install the package

$ pip install --pre azure-eventhub-checkpointstoreblob-aio

Prerequisites

  • Python 3.5.3 or later.

  • Microsoft Azure Subscription: To use Azure services, including Azure Event Hubs, you'll need a subscription. If you do not have an existing Azure account, you may sign up for a free trial or use your MSDN subscriber benefits when you create an account.

  • Event Hubs namespace with an Event Hub: To interact with Azure Event Hubs, you'll also need to have a namespace and Event Hub available. If you are not familiar with creating Azure resources, you may wish to follow the step-by-step guide for creating an Event Hub using the Azure portal. There, you can also find detailed instructions for using the Azure CLI, Azure PowerShell, or Azure Resource Manager (ARM) templates to create an Event Hub.

  • Azure Storage Account: You'll need an Azure Storage Account and an Azure Blob Storage container in which the checkpoint data is stored as blobs. You may follow the guide for creating an Azure Block Blob Storage Account.

Key concepts

Checkpointing

Checkpointing is a process by which readers mark or commit their position within a partition event sequence. Checkpointing is the responsibility of the consumer and occurs on a per-partition basis within a consumer group. This responsibility means that for each consumer group, each partition reader must keep track of its current position in the event stream, and can inform the service when it considers the data stream complete. If a reader disconnects from a partition, then when it reconnects it begins reading at the checkpoint that was previously submitted by the last reader of that partition in that consumer group. When the reader connects, it passes the offset to the event hub to specify the location at which to start reading. In this way, checkpointing lets downstream applications mark events as "complete" and provides resiliency if a failover occurs between readers running on different machines. It is also possible to return to older data by specifying a lower offset than the one recorded by the checkpointing process. Through this mechanism, checkpointing enables both failover resiliency and event stream replay.
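
The resume-from-checkpoint behavior described above can be sketched with a small in-memory model. This is an illustration under stated assumptions, not this package's implementation: InMemoryCheckpointStore and read_partition are hypothetical names, a Python list stands in for a partition, and the list index plays the role of the sequence number.

```python
from typing import Dict, List, Optional, Tuple


class InMemoryCheckpointStore:
    """Hypothetical stand-in for a checkpoint store.

    A real store (such as the blob-backed one in this package) persists
    this data so that it survives process restarts.
    """

    def __init__(self) -> None:
        # One checkpoint per (consumer group, partition) pair.
        self._checkpoints = {}  # type: Dict[Tuple[str, str], int]

    def update_checkpoint(self, consumer_group: str, partition_id: str,
                          sequence_number: int) -> None:
        self._checkpoints[(consumer_group, partition_id)] = sequence_number

    def get_checkpoint(self, consumer_group: str,
                       partition_id: str) -> Optional[int]:
        return self._checkpoints.get((consumer_group, partition_id))


def read_partition(events: List[str], store: InMemoryCheckpointStore,
                   consumer_group: str, partition_id: str,
                   max_events: int) -> List[str]:
    """Read up to max_events events, starting just after the last checkpoint."""
    last = store.get_checkpoint(consumer_group, partition_id)
    start = 0 if last is None else last + 1
    batch = events[start:start + max_events]
    if batch:
        # Commit the position of the last event that was read.
        store.update_checkpoint(consumer_group, partition_id,
                                start + len(batch) - 1)
    return batch
```

In this model, a second reader in the same consumer group that calls read_partition continues where the first one stopped, a reader in a different consumer group starts from the beginning, and writing a lower sequence number into the store replays older events.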

Offsets & sequence numbers

Both the offset and the sequence number refer to the position of an event within a partition. You can think of them as a client-side cursor. The offset is a byte numbering of the event. The offset or sequence number enables an event consumer (reader) to specify the point in the event stream from which it wants to begin reading events. You can also specify a timestamp so that you receive only events enqueued after that timestamp. Consumers are responsible for storing their own offset values outside of the Event Hubs service. Within a partition, each event includes an offset, a sequence number, and the timestamp of when it was enqueued.
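
The three ways of choosing a starting position can be illustrated with a toy partition. This is a simplified sketch, not the service's data model: Event, build_partition, and the events_after_* helpers are hypothetical names, and the offset is modeled as the cumulative size of the preceding event bodies, which is an assumption made only to show that offsets count bytes while sequence numbers count events.

```python
from collections import namedtuple
from datetime import datetime, timedelta, timezone

# Hypothetical event record carrying the three position-related fields.
Event = namedtuple("Event", ["offset", "sequence_number", "enqueued_time", "body"])


def build_partition(bodies, start_time):
    """Build a toy partition with one event enqueued per second."""
    events = []
    offset = 0
    for sequence_number, body in enumerate(bodies):
        enqueued_time = start_time + timedelta(seconds=sequence_number)
        events.append(Event(offset, sequence_number, enqueued_time, body))
        offset += len(body)  # simplified byte numbering (assumption)
    return events


def events_after_offset(partition, offset):
    """Events located after the given byte offset."""
    return [e for e in partition if e.offset > offset]


def events_after_sequence_number(partition, sequence_number):
    """Events located after the given sequence number."""
    return [e for e in partition if e.sequence_number > sequence_number]


def events_after_time(partition, timestamp):
    """Events enqueued strictly after the given timestamp."""
    return [e for e in partition if e.enqueued_time > timestamp]


start = datetime(2019, 9, 10, tzinfo=timezone.utc)
partition = build_partition([b"aa", b"bbb", b"c"], start)
print([e.offset for e in partition])           # byte positions
print([e.sequence_number for e in partition])  # event positions
```

A consumer that stored the offset or sequence number of the last event it processed can pass that value to one of the helpers to obtain only the events it has not yet seen.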

Examples

Create an Azure Storage Blobs ContainerClient

The easiest way to create a ContainerClient is to use a connection string.

from azure.storage.blob.aio import ContainerClient
container_client = ContainerClient.from_connection_string("my_storageaccount_connection_string", container="mycontainer")

For other ways of creating a ContainerClient, refer to the Blob Storage library for more details.

Create an EventHubClient

The easiest way to create an EventHubClient is to use a connection string.

from azure.eventhub.aio import EventHubClient
eventhub_client = EventHubClient.from_connection_string("my_eventhub_namespace_connection_string", event_hub_path="myeventhub")

For other ways of creating an EventHubClient, refer to the EventHubs library for more details.

Consume events using an EventProcessor that uses a BlobPartitionManager to do checkpointing

import asyncio

from azure.eventhub.aio import EventHubClient
from azure.eventhub.aio.eventprocessor import EventProcessor, PartitionProcessor
from azure.storage.blob.aio import ContainerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobPartitionManager

eventhub_connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
storage_container_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
storage_container_name = '<< STORAGE CONTAINER NAME>>'

class MyPartitionProcessor(PartitionProcessor):
    async def process_events(self, events, partition_context):
        if events:
            # write your code here to process events
            # save checkpoint to the data store
            await partition_context.update_checkpoint(events[-1].offset, events[-1].sequence_number)

async def main():
    # create the Event Hubs client and the blob container client from their connection strings
    eventhub_client = EventHubClient.from_connection_string(eventhub_connection_str, receive_timeout=5, retry_total=3)
    storage_container_client = ContainerClient.from_connection_string(storage_container_connection_str, container=storage_container_name)
    partition_manager = BlobPartitionManager(storage_container_client)  # use the BlobPartitionManager to save checkpoints and partition ownership
    event_processor = EventProcessor(eventhub_client, "$default", MyPartitionProcessor, partition_manager)
    async with storage_container_client:
        asyncio.ensure_future(event_processor.start())  # start processing in the background
        await asyncio.sleep(60)  # run for a while
        await event_processor.stop()

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
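
The example above saves a checkpoint after every non-empty batch. If that turns out to be more frequent than an application needs, one possible variation is to checkpoint only after a certain number of events has been processed. The helper below is a hypothetical sketch of that idea and is not part of this package or of azure-eventhub; CheckpointPolicy and every_n_events are names chosen for illustration.

```python
class CheckpointPolicy:
    """Hypothetical helper: signal a checkpoint once enough events have been seen."""

    def __init__(self, every_n_events):
        if every_n_events < 1:
            raise ValueError("every_n_events must be at least 1")
        self._every_n_events = every_n_events
        self._pending = 0  # events processed since the last checkpoint

    def should_checkpoint(self, batch_size):
        """Record a processed batch and report whether to checkpoint now."""
        self._pending += batch_size
        if self._pending >= self._every_n_events:
            self._pending = 0
            return True
        return False


policy = CheckpointPolicy(every_n_events=100)
print(policy.should_checkpoint(40))  # fewer than 100 events so far
print(policy.should_checkpoint(60))  # threshold reached
```

Inside process_events, the call to partition_context.update_checkpoint would then be guarded by policy.should_checkpoint(len(events)), with one policy instance per partition processor. The trade-off is that events processed since the last saved checkpoint are read again after a failover, so the processing code should tolerate duplicates.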

Troubleshooting

General

Enabling logging helps with troubleshooting. Refer to the Logging section to enable loggers for the related libraries.

Next steps

Examples

Documentation

Reference documentation is available at https://azure.github.io/azure-sdk-for-python/ref/azure.eventhub.extensions.html.

Logging

  • Enable the azure.eventhub.extensions.checkpointstoreblobaio logger to collect traces from this library.
  • Enable the azure.eventhub.aio.eventprocessor logger to collect traces from the eventprocessor package of the azure-eventhub library.
  • Enable the azure.eventhub logger to collect traces from the main azure-eventhub library.
  • Enable the azure.storage.blob logger to collect traces from the Azure Storage Blob library.
  • Enable the uamqp logger to collect traces from the underlying uAMQP library.
  • Enable AMQP frame-level tracing by setting network_tracing=True when creating the client.
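
The loggers listed above can be enabled with Python's standard logging module. The following is a minimal sketch: the logger names come from the list, while enable_loggers, the DEBUG default, and the output format are choices made for this example. The handler is attached only to the top-level "azure" and "uamqp" loggers because records from child loggers propagate upward, so attaching it to every logger in the list would print some messages more than once.

```python
import logging
import sys

# Logger names taken from the list above.
LOGGER_NAMES = [
    "azure.eventhub.extensions.checkpointstoreblobaio",
    "azure.eventhub.aio.eventprocessor",
    "azure.eventhub",
    "azure.storage.blob",
    "uamqp",
]


def enable_loggers(level=logging.DEBUG, stream=sys.stdout):
    """Set the level on each listed logger and route their output to a stream."""
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))
    for name in LOGGER_NAMES:
        logging.getLogger(name).setLevel(level)
    # Attach the handler once per top-level logger; child records propagate up.
    for top_level in ("azure", "uamqp"):
        logging.getLogger(top_level).addHandler(handler)
    return handler
```

Calling enable_loggers() once at program start, before the clients are created, should be enough for these loggers to write to standard output. The returned handler can later be removed with removeHandler if the extra output is no longer wanted.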

Provide Feedback

If you encounter any bugs or have suggestions, please file an issue in the Issues section of the project.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Release History

1.0.0b1 (2019-09-10)

New features

  • BlobPartitionManager that uses Azure Blob Storage Block Blob to store EventProcessor checkpoint data
