Kafka integration with asyncio.
asyncio client for Kafka
AIOKafkaProducer
AIOKafkaProducer is a high-level, asynchronous message producer.
Example of AIOKafkaProducer usage:
import asyncio
from aiokafka import AIOKafkaProducer

@asyncio.coroutine
def produce(loop):
    # Just adds message to sending queue
    future = yield from producer.send('foobar', b'some_message_bytes')
    # waiting for message to be delivered
    resp = yield from future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))
    # Also can use a helper to send and wait in 1 call
    resp = yield from producer.send_and_wait(
        'foobar', key=b'foo', value=b'bar')
    resp = yield from producer.send_and_wait(
        'foobar', b'message for partition 1', partition=1)

loop = asyncio.get_event_loop()
producer = AIOKafkaProducer(loop=loop, bootstrap_servers='localhost:9092')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(producer.start())
loop.run_until_complete(produce(loop))
# Wait for all pending messages to be delivered or expire
loop.run_until_complete(producer.stop())
loop.close()
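On Python 3.5+ the same flow can be written with native async/await syntax instead of @asyncio.coroutine and yield from (a Python 3.5 usage example was added to the docs in 0.1.2; see CHANGES below). A minimal sketch, where produce_35 is a hypothetical name and the producer is assumed to already be started as above:

async def produce_35(producer):
    # send() returns a delivery future; awaiting that future
    # waits for the message to actually be delivered
    future = await producer.send('foobar', b'some_message_bytes')
    resp = await future
    print("Message produced: partition {}; offset {}".format(
        resp.partition, resp.offset))
    # send_and_wait() combines both steps in one call
    resp = await producer.send_and_wait('foobar', key=b'foo', value=b'bar')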
AIOKafkaConsumer
AIOKafkaConsumer is a high-level, asynchronous message consumer. It interacts with the assigned Kafka Group Coordinator node to allow multiple consumers to load-balance consumption of topics (requires Kafka >= 0.9.0.0).
Example of AIOKafkaConsumer usage:
import asyncio
from kafka.common import KafkaError
from aiokafka import AIOKafkaConsumer

@asyncio.coroutine
def consume_task(consumer):
    while True:
        try:
            msg = yield from consumer.getone()
            print("consumed: ", msg.topic, msg.partition, msg.offset,
                  msg.key, msg.value, msg.timestamp)
        except KafkaError as err:
            print("error while consuming message: ", err)

loop = asyncio.get_event_loop()
consumer = AIOKafkaConsumer(
    'topic1', 'topic2', loop=loop, bootstrap_servers='localhost:1234')
# Bootstrap client, will get initial cluster metadata
loop.run_until_complete(consumer.start())
c_task = loop.create_task(consume_task(consumer))
try:
    loop.run_forever()
finally:
    # Will gracefully leave consumer group; perform autocommit if enabled
    loop.run_until_complete(consumer.stop())
    c_task.cancel()
    loop.close()
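On Python 3.5+ the consumer can also be driven with the async for iteration protocol (mentioned in the 0.1.2 entry of CHANGES below). A minimal sketch, where consume_35 is a hypothetical name and the consumer is assumed to already be started as above:

async def consume_35(consumer):
    # Iterates over messages as they arrive; retriable errors are
    # handled inside the iterator rather than raised (see 0.1.2 notes)
    async for msg in consumer:
        print("consumed: ", msg.topic, msg.partition, msg.offset,
              msg.key, msg.value, msg.timestamp)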
Running tests
Docker is required to run the tests. See https://docs.docker.com/engine/installation for installation notes. Also note that the lz4 compression libraries for Python require the python-dev package, or Python source header files, for compilation on Linux.
Setting up test requirements (assuming you're within a virtualenv on Ubuntu 14.04+):

sudo apt-get install -y libsnappy-dev
pip install -r requirements-dev.txt
Running tests:
make cov
To run tests against a specific version of Kafka (the default is 0.9.0.1), use the KAFKA_VERSION variable:
make cov KAFKA_VERSION=0.10.0.0
CHANGES
0.1.4 (2016-11-07)
Bumped kafka-python version to 1.3.1 and Kafka to 0.10.1.0.
Fixed auto version detection to correctly handle the 0.10.0.0 version
Updated Fetch and Produce requests to use v2 with the v0.10.0 message format on brokers. This allows a timestamp to be associated with messages.
Changed the lz4 compression framing, as it was changed by KIP-57 in the new message format.
Minor refactorings
Big thanks to @fabregas for the hard work on this release (PR #60)
0.1.3 (2016-10-18)
Fixed a bug causing an infinite loop on heartbeats with autocommit=True. #44
Bumped kafka-python to version 1.1.1
Fixed docker test runner with multiple interfaces
Minor documentation fixes
0.1.2 (2016-04-30)
Added Python3.5 usage example to docs
Don’t raise retriable exceptions in 3.5’s async for iterator
Fixed a cancellation issue with the producer's send_and_wait method
0.1.1 (2016-04-15)
Fixed packaging issues. Removed unneeded files from the package.
0.1.0 (2016-04-15)
Initial release
Added full support for Kafka 0.9.0. Older Kafka versions are not tested.