Low level, multiprocessing based AWS Kinesis producer & consumer library

The official Kinesis Python library requires the use of Amazon’s “MultiLangDaemon”, which is a Java executable that operates by piping messages over STDIN/STDOUT.

ಠ_ಠ

While a single implementation of the client library makes sense from a maintenance standpoint for the team responsible for the KCL, requiring the JRE to be installed and accounting for the overhead of the stream being consumed by both Java and Python is not desirable for teams working in environments without Java.

This is a pure-Python implementation of Kinesis producer and consumer classes that leverages Python’s multiprocessing module to spawn a process per shard and then sends the messages back to the main process via a Queue. It only depends on boto3 (AWS SDK), offspring (Subprocess implementation) and six (py2/py3 compatibility).

It also includes a DynamoDB state back-end that allows for multi-instance consumption of multiple shards, and stores the checkpoint data so that you can resume where you left off in a stream following restarts or crashes.

Overview

All of the functionality is wrapped in two classes: KinesisConsumer and KinesisProducer.

Consumer

The consumer launches one process per shard in the stream and exposes the incoming messages through the Python iterator protocol.

from kinesis.consumer import KinesisConsumer

consumer = KinesisConsumer(stream_name='my-stream')
for message in consumer:
    print("Received message: {0}".format(message))

Messages received from each of the shard processes are passed back to the main process through a Python Queue where they are yielded for processing. Messages are not strictly ordered, but this is a property of Kinesis and not this implementation.

Locking, Checkpointing & Multi-instance consumption

When deploying an application with multiple instances, DynamoDB can be used to coordinate which instance is responsible for which shard, since it is not desirable to have every instance process all records.

With or without multiple nodes, it is also desirable to checkpoint the stream as you process records so that you can pick up where you left off if you restart the consumer.

A “state” backend that leverages DynamoDB allows consumers to coordinate which node is responsible for which shards and to record where in the stream each shard is currently being read from.

from kinesis.consumer import KinesisConsumer
from kinesis.state import DynamoDB

consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state'))
for message in consumer:
    print("Received message: {0}".format(message))

The DynamoDB table must already exist and must have a HASH key of shard, with type S (string).
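
Since the table has to exist before the consumer starts, one way to create it is with boto3 directly. The following is a minimal sketch, not part of this library: the helper names state_table_definition and create_state_table are hypothetical, and on-demand billing (PAY_PER_REQUEST) is an assumption you may want to replace with provisioned throughput.

```python
def state_table_definition(table_name):
    """Build create_table arguments for a table keyed on a string attribute named 'shard'."""
    return {
        "TableName": table_name,
        "AttributeDefinitions": [{"AttributeName": "shard", "AttributeType": "S"}],
        "KeySchema": [{"AttributeName": "shard", "KeyType": "HASH"}],
        # Assumption: on-demand billing; the README does not prescribe a billing mode.
        "BillingMode": "PAY_PER_REQUEST",
    }


def create_state_table(table_name, client=None):
    """Create the state table and block until DynamoDB reports that it exists."""
    if client is None:
        # Imported lazily so the definition above can be inspected without boto3.
        import boto3
        client = boto3.client("dynamodb")
    client.create_table(**state_table_definition(table_name))
    client.get_waiter("table_exists").wait(TableName=table_name)
```

Calling create_state_table('my-kinesis-state') once, before starting any consumer, should leave a table that matches the requirement above.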

Producer

The producer works by launching a single process for accumulation and publishing to the stream.

from kinesis.producer import KinesisProducer

producer = KinesisProducer(stream_name='my-stream')
producer.put('Hello World from Python')

By default, accumulated messages are flushed after 500 ms or once they reach the maximum record size of 1 MB, whichever occurs first. You can change the buffer time when you instantiate the producer via the buffer_time kwarg, specified in seconds. For example, if your primary concern is budget and not performance, you could accumulate over a 60 second duration.

producer = KinesisProducer(stream_name='my-stream', buffer_time=60)
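
To make the "whichever occurs first" rule concrete, here is a small standalone sketch of a buffer that flushes on either a time limit or a size limit. It is an illustration only and not the library's implementation: the Accumulator class, its send callback and the injectable clock are all hypothetical names chosen for this example.

```python
import time


class Accumulator(object):
    """Hypothetical buffer that flushes when buffer_time elapses or max_bytes would be exceeded."""

    def __init__(self, send, buffer_time=0.5, max_bytes=1024 * 1024, clock=time.time):
        self.send = send                # callable that receives a list of byte strings
        self.buffer_time = buffer_time  # seconds to accumulate before flushing
        self.max_bytes = max_bytes      # size limit for a single flush
        self.clock = clock              # injectable so the timing can be tested
        self.records = []
        self.size = 0
        self.started = None

    def put(self, data):
        # Flush first if adding this record would push the buffer past the size limit.
        if self.records and self.size + len(data) > self.max_bytes:
            self.flush()
        if not self.records:
            self.started = self.clock()
        self.records.append(data)
        self.size += len(data)
        self.poll()

    def poll(self):
        # Flush once the oldest buffered record has waited for buffer_time seconds.
        if self.records and self.clock() - self.started >= self.buffer_time:
            self.flush()

    def flush(self):
        if self.records:
            self.send(self.records)
        self.records, self.size, self.started = [], 0, None
```

A longer buffer_time means fewer, larger flushes at the cost of latency, which is the trade-off the 60 second example above is making.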

The background process takes precautions to flush any accumulated messages to the stream at shutdown, using signal handlers and the Python atexit module. It is not fully durable, however: if you send a kill -9 to the producer process, any accumulated messages are lost.

AWS Permissions

By default, the producer, consumer and state classes all use the default boto3 credentials chain. To change this, instantiate your own boto3.Session object and pass it to the constructor of KinesisProducer, KinesisConsumer or DynamoDB via the boto3_session keyword argument.
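
As a sketch of what that might look like for the producer, the helper below builds a session from a named profile and region and hands it over. The build_producer function and its profile_name and region_name parameters are hypothetical conveniences for this example; only the boto3_session keyword argument comes from the description above.

```python
def build_producer(stream_name, profile_name=None, region_name=None):
    """Create a KinesisProducer that uses an explicitly configured boto3 session."""
    # Imported inside the function so this module loads even where the
    # dependencies are not installed yet.
    import boto3
    from kinesis.producer import KinesisProducer

    # Leaving either argument as None falls back to boto3's default resolution.
    session = boto3.Session(profile_name=profile_name, region_name=region_name)
    return KinesisProducer(stream_name=stream_name, boto3_session=session)
```

For example, build_producer('my-stream', profile_name='analytics', region_name='us-west-2') would publish using the credentials of a profile named analytics, assuming such a profile exists in your AWS configuration. The same keyword argument should apply to KinesisConsumer and DynamoDB.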

