Skip to main content

Twisted Python client for Apache Kafka

Project description

Afkak is a Twisted-native Apache Kafka client library. It provides support for:

  • Producing messages, with automatic batching and optional compression.
  • Consuming messages, with group coordination and automatic commit.

Learn more in the documentation, download from PyPI, or review the contribution guidelines. Please report any issues on GitHub.

Status

Afkak supports these Pythons:

  • CPython 2.7
  • CPython 3.5, 3.6, and 3.7 (in Afkak 3.0.0 and later)
  • PyPy and PyPy3 6.0+

We aim to support Kafka 1.1.x and later. Integration tests are run against these Kafka broker versions:

  • 0.9.0.1
  • 1.1.1

Testing against 2.0.0 is planned (see #45).

Newer broker releases will generally function, but not all Afkak features will work on older brokers. In particular, the coordinated consumer won’t work before Kafka 0.9.0.1. We don’t recommend deploying such old releases anyway, as they have serious bugs.

Usage

High level

Note: This code is not meant to be runnable. See producer_example and consumer_example for runnable example code.

from afkak.client import KafkaClient
from afkak.consumer import Consumer
from afkak.producer import Producer
from afkak.common import (OFFSET_EARLIEST, PRODUCER_ACK_ALL_REPLICAS,
    PRODUCER_ACK_LOCAL_WRITE)

kClient = KafkaClient("localhost:9092")

# To send messages
producer = Producer(kClient)
d1 = producer.send_messages("my-topic", msgs=[b"some message"])
d2 = producer.send_messages("my-topic", msgs=[b"takes a list", b"of messages"])
# To get confirmations/errors on the sends, add callbacks to the returned deferreds
d1.addCallbacks(handleResponses, handleErrors)

# To wait for acknowledgements
# PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written to
#                         a local log before sending response
# [ the default ]
# PRODUCER_ACK_ALL_REPLICAS : server will block until the message is committed
#                            by all in sync replicas before sending a response
producer = Producer(kClient,
                    req_acks=Producer.PRODUCER_ACK_LOCAL_WRITE,
                    ack_timeout=2000)

responseD = producer.send_messages("my-topic", msgs=[b"message"])

# Using twisted's @inlineCallbacks:
responses = yield responseD
if response:
    print(response[0].error)
    print(response[0].offset)

# To send messages in batch: You can use a producer with any of the
# partitioners for doing this. The following producer will collect
# messages in batch and send them to Kafka after 20 messages are
# collected or every 60 seconds (whichever comes first). You can
# also batch by number of bytes.
# Notes:
# * If the producer dies before the messages are sent, the caller would
# * not have had the callbacks called on the send_messages() returned
# * deferreds, and so can retry.
# * Calling producer.stop() before the messages are sent will
# errback() the deferred(s) returned from the send_messages call(s)
producer = Producer(kClient, batch_send=True,
                    batch_send_every_n=20,
                    batch_send_every_t=60)
responseD1 = producer.send_messages("my-topic", msgs=[b"message"])
responseD2 = producer.send_messages("my-topic", msgs=[b"message 2"])

# To consume messages
# define a function which takes a list of messages to process and
# possibly returns a deferred which fires when the processing is
# complete.
def processor_func(consumer, messages):
    #  Store_Messages_In_Database may return a deferred
    result = store_messages_in_database(messages)
    # record last processed message
    consumer.commit()
    return result

the_partition = 3  # Consume only from partition 3.
consumer = Consumer(kClient, "my-topic", the_partition, processor_func)
d = consumer.start(OFFSET_EARLIEST)  # Start reading at earliest message
# The deferred returned by consumer.start() will fire when an error
# occurs that can't handled by the consumer, or when consumer.stop()
# is called
yield d

consumer.stop()
kClient.close()

Keyed messages

from afkak.client import KafkaClient
from afkak.producer import Producer
from afkak.partitioner import HashedPartitioner, RoundRobinPartitioner

kafka = KafkaClient("localhost:9092")

# Use the HashedPartitioner so that the producer will use the optional key
# argument on send_messages()
producer = Producer(kafka, partitioner_class=HashedPartitioner)
producer.send_messages("my-topic", "key1", [b"some message"])
producer.send_messages("my-topic", "key2", [b"this method"])

Low level

from afkak.client import KafkaClient
kafka = KafkaClient("localhost:9092")
req = ProduceRequest(topic="my-topic", partition=1,
    messages=[KafkaProtocol.encode_message(b"some message")])
resps = afkak.send_produce_request(payloads=[req], fail_on_error=True)
kafka.close()

resps[0].topic      # b"my-topic"
resps[0].partition  # 1
resps[0].error      # 0 (hopefully)
resps[0].offset     # offset of the first message sent in this request

Install

Afkak releases are available on PyPI.

Because the Afkak dependencies Twisted and python-snappy have binary extension modules you will need to install the Python development headers for the interpreter you wish to use:

Debian/Ubuntu: sudo apt-get install build-essential python-dev python3-dev pypy-dev pypy3-dev libsnappy-dev
OS X brew install python pypy snappy
pip install virtualenv

Then Afkak can be installed with pip as usual:

License

Copyright 2013, 2014, 2015 David Arthur under Apache License, v2.0. See LICENSE

Copyright 2014, 2015 Cyan, Inc. under Apache License, v2.0. See LICENSE

Copyright 2015, 2016, 2017, 2018, 2019 Ciena Corporation under Apache License, v2.0. See LICENSE

This project began as a port of the kafka-python library to Twisted.

See AUTHORS.md for the full contributor list.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

afkak-19.10.0.tar.gz (155.9 kB view details)

Uploaded Source

Built Distribution

afkak-19.10.0-py2.py3-none-any.whl (172.4 kB view details)

Uploaded Python 2 Python 3

File details

Details for the file afkak-19.10.0.tar.gz.

File metadata

  • Download URL: afkak-19.10.0.tar.gz
  • Upload date:
  • Size: 155.9 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.6.8

File hashes

Hashes for afkak-19.10.0.tar.gz
Algorithm Hash digest
SHA256 c740ed51f8490f7a3b66df4dedc5e340bdbbef228ccbe53a24bf022c0ce92d52
MD5 92f9cc0df408c07a1707dce0d475b54c
BLAKE2b-256 7aee361c795c6f230d3c5d63c4793e11269f993637a3b2b519ac9950cef57811

See more details on using hashes here.

File details

Details for the file afkak-19.10.0-py2.py3-none-any.whl.

File metadata

  • Download URL: afkak-19.10.0-py2.py3-none-any.whl
  • Upload date:
  • Size: 172.4 kB
  • Tags: Python 2, Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/2.0.0 pkginfo/1.5.0.1 requests/2.22.0 setuptools/41.2.0 requests-toolbelt/0.9.1 tqdm/4.36.1 CPython/3.6.8

File hashes

Hashes for afkak-19.10.0-py2.py3-none-any.whl
Algorithm Hash digest
SHA256 bcf324332ebdf92517fa6ca0af862b401f6a00072f553f5a02c90d409e558ded
MD5 14c29f3c2fdab29061c3105cbe93a68f
BLAKE2b-256 c309983a69bc064a3a48ae81ea81c2b5b9254f79e2f43b3fd493a9067d7ea441

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page