Low-level, multiprocessing-based AWS Kinesis producer & consumer library


# Kinesis Python

The [official Kinesis python library](https://github.com/awslabs/amazon-kinesis-client-python) requires the use of
Amazon's "MultiLangDaemon", which is a Java executable that operates by piping messages over STDIN/STDOUT.

ಠ\_ಠ

While maintaining a single implementation of the client library makes sense for the team responsible for the
official library, requiring the JRE to be installed and accounting for the overhead of the stream being consumed by
both Java and Python is not desirable for teams working in environments without Java.

This is a pure-Python implementation of Kinesis producer and consumer classes that leverages Python's multiprocessing
module to spawn a process per shard and then sends the messages back to the main process via a Queue. It only depends
on the boto3 library.
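
To make the architecture concrete, here is an illustrative sketch of the process-per-shard pattern using only the
standard library. The names and structure are purely illustrative and are not the library's internal code:

```python
import multiprocessing

def shard_reader(shard_id, queue):
    # Illustrative only: in the real library this loop would call boto3's
    # get_records for its shard and push each record onto the shared queue.
    for sequence_number in range(3):
        queue.put({"shard": shard_id, "seq": sequence_number})

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    shard_ids = ["shardId-000000000000", "shardId-000000000001"]
    workers = [
        multiprocessing.Process(target=shard_reader, args=(shard_id, queue))
        for shard_id in shard_ids
    ]
    for worker in workers:
        worker.start()
    # The main process consumes from the shared queue as records arrive.
    for _ in range(len(shard_ids) * 3):
        print(queue.get())
    for worker in workers:
        worker.join()
```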


# Overview

All of the functionality is wrapped in two classes: `KinesisConsumer` and `KinesisProducer`.

## Consumer

The consumer works by launching a process per shard in the stream and then implementing the Python iterator protocol.

```python
from kinesis.consumer import KinesisConsumer

consumer = KinesisConsumer(stream_name='my-stream')
for message in consumer:
print "Received message: {0}".format(message)
```

Messages received from each of the shard processes are passed back to the main process through a Python Queue where
they are yielded for processing. Messages are not strictly ordered, but this is a property of Kinesis and not this
implementation.


### Locking, Checkpointing & Multi-instance consumption

When deploying an application with multiple instances, DynamoDB can be leveraged to coordinate which instance
is responsible for which shard, as it is not desirable to have each instance process all records.

With or without multiple nodes, it is also desirable to checkpoint the stream as you process records so that you can
pick up where you left off if you restart the consumer.

A "state" backend that leverages DynamoDB allows consumers to coordinate which node is responsible which shards and
where in the stream we are currently reading from.

```python
from kinesis.consumer import KinesisConsumer
from kinesis.state import DynamoDB

consumer = KinesisConsumer(stream_name='my-stream', state=DynamoDB(table_name='my-kinesis-state'))
for message in consumer:
print "Received message: {0}".format(message)
```
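
If the state table does not already exist, it can be created with boto3. The sketch below is an assumption, not part
of this README: in particular the key schema shown (a single string hash key named `shard`) may not match what the
library expects, so verify it against the library's documentation or source first.

```python
import boto3

dynamodb = boto3.client('dynamodb')

# Assumption: a single string hash key named "shard". Verify the key schema
# the library expects before creating the table.
dynamodb.create_table(
    TableName='my-kinesis-state',
    AttributeDefinitions=[{'AttributeName': 'shard', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'shard', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST',
)
```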


## Producer

The producer works by launching a single process for accumulation and publishing to the stream.

```python
from kinesis.producer import KinesisProducer

producer = KinesisProducer(stream_name='my-stream')
producer.put('Hello World from Python')
```
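
Since `put` is shown above taking a string, structured payloads can be serialized by the caller before publishing.
A small sketch using JSON:

```python
import json

from kinesis.producer import KinesisProducer

producer = KinesisProducer(stream_name='my-stream')

# Serialize structured data to a string before handing it to the producer.
event = {'type': 'signup', 'user_id': 42}
producer.put(json.dumps(event))
```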

By default the producer flushes when the accumulation buffer time of 500ms elapses or when the 1 MB maximum record
size is reached, whichever occurs first. You can change the buffer time when you instantiate the producer via the
`buffer_time` kwarg, specified in seconds. For example, if your primary concern is budget rather than performance,
you could accumulate over a 60 second duration.

```python
producer = KinesisProducer(stream_name='my-stream', buffer_time=60)
```

The background process takes precautions to ensure that any accumulated messages are flushed to the stream at
shutdown time through signal handlers and the Python atexit module, but it is not fully durable: if you send a
`kill -9` to the producer process, any accumulated messages will be lost.
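
As a general Python pattern (not a producer API), an application stopped by a process supervisor can convert SIGTERM
into a normal interpreter exit so that atexit hooks get a chance to run; SIGKILL (`kill -9`) cannot be intercepted
this way. A minimal sketch, assuming the application's main process is the one receiving the signal:

```python
import signal
import sys

from kinesis.producer import KinesisProducer

producer = KinesisProducer(stream_name='my-stream')

def handle_sigterm(signum, frame):
    # A normal exit lets atexit handlers run; SIGKILL cannot be caught.
    sys.exit(0)

signal.signal(signal.SIGTERM, handle_sigterm)

for line in sys.stdin:
    producer.put(line.strip())
```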


