
Tools to sink kafka messages to a database table


dbsink

Read from a kafka topic and sink to a database table, one row per message.

This is not unlike the Kafka Connect JdbcConnector. This project has a much lower barrier to entry and doesn't require diving into the Kafka Connect ecosystem. I wrote the equivalent of this project using a custom JdbcConnector, and it was getting out of control and was basically untestable. So here we are.

You can choose to unpack the data as avro, msgpack or the default json. avro requires an additional registry parameter.
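As a rough illustration of what unpacking means here, the sketch below dispatches on the packing name. The `unpack` function is hypothetical and is not dbsink's own code. The msgpack branch assumes the third-party `msgpack` package, and the avro branch is left unimplemented because it needs a schema registry.

```python
import json


def unpack(raw: bytes, packing: str = "json"):
    """Decode raw message bytes according to the chosen packing (hypothetical helper)."""
    if packing == "json":
        return json.loads(raw.decode("utf-8"))
    if packing == "msgpack":
        import msgpack  # third-party; imported lazily so JSON users don't need it
        return msgpack.unpackb(raw, raw=False)
    if packing == "avro":
        # Avro decoding needs a schema, which is why a registry parameter is required.
        raise NotImplementedError("avro unpacking needs a schema registry")
    raise ValueError(f"unknown packing: {packing!r}")


decoded = unpack(b'{"name": "my cool thing", "time": 1000000000}')
```

Whatever the packing, the mapping class receives the already-decoded value.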

Docker images: https://hub.docker.com/r/axiom/dbsink/builds

WHY?

I needed to read from well-defined kafka topics and store the results in a database table so collaborators could interact with the data in a more familiar way.

It is also convenient and easy to set up PostgREST on top of the resulting tables to get a quick read-only REST API over the tabled messages.

Mapping messages to tables

You can define custom mappings between messages and tables using a python class. You may register your custom mappings with the dbsink.maps entrypoint to have them available to dbsink at run-time.

entry_points = {
    'dbsink.maps': [
        'YourCustomMap    = you.custom.map.module:CustomMapClass',
        # ...
    ]
}
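To check that a mapping is registered, you can list the entry points in the `dbsink.maps` group with the standard library. This is a minimal sketch of generic entry-point discovery, not necessarily how dbsink itself loads mappings. The `discover_maps` helper is a made-up name.

```python
from importlib.metadata import entry_points


def discover_maps(group="dbsink.maps"):
    """Return {entry point name: EntryPoint} for every installed mapping in `group`."""
    eps = entry_points()
    if hasattr(eps, "select"):
        # Python 3.10+ exposes a selectable collection.
        selected = eps.select(group=group)
    else:
        # Python 3.8/3.9 return a plain dict keyed by group name.
        selected = eps.get(group, [])
    return {ep.name: ep for ep in selected}


maps = discover_maps()
# Calling maps["YourCustomMap"].load() would import and return the mapping class.
```

The result is empty if no package has registered a mapping under that group.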

Custom mapping classes should inherit from the BaseMap class in dbsink and override the following functions as needed:

  • upsert_constraint_name - Name of the constraint to use for upserting data. Set to None to disable upserting. Use this class property when creating the upsert constraint on your table (see example below).

  • unique_index_name - Unique index name based on the table name. Use this if defining a single unique index on your table.

  • sequence_name - Unique sequence name based on the table name. Use this if defining a single sequence column on your table.

  • _check_key - Checks for validity of a message's key before trying to sink. Return True if valid and raise an error if not.

  • _check_value - Checks for validity of a message's value before trying to sink. Return True if valid and raise an error if not.

  • schema - A list of SQLAlchemy Column, Index, and Constraint schema definitions to use in table creation and updating. This fully describes your table's schema.

  • message_to_values - A function accepting key and value arguments and returning a tuple key, dict where the dict is the values to pass to SQLAlchemy's insert().values method. The value argument to this function will already be unpacked if avro or msgpack packing was specified.

    insert(table).values(
      # dict_returned_ends_up_here
    )
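The contract described above (validate, then return `key, dict`) can be sketched without dbsink installed. `SketchMap` is a hypothetical stand-in and does not inherit from the real BaseMap. Its `check` method is assumed to call both `_check_key` and `_check_value`, and the validation rules are only examples.

```python
import json


class SketchMap:
    """Hypothetical stand-in for a dbsink mapping; not the real BaseMap."""

    def _check_key(self, key):
        if not isinstance(key, str) or not key:
            raise ValueError("key must be a non-empty string")
        return True

    def _check_value(self, value):
        # Raises ValueError on NaN/Infinity and TypeError on non-serializable objects.
        json.dumps(value, allow_nan=False)
        return True

    def check(self, key, value):
        return self._check_key(key) and self._check_value(value)

    def message_to_values(self, key, value):
        self.check(key, value)  # raises if the message is invalid
        # The dict keys must match the column names declared in `schema`.
        return key, {"key": key, "payload": json.dumps(value)}


key, values = SketchMap().message_to_values("sensor-1", {"x": 1})
```

Raising from the check functions, rather than returning False, keeps invalid messages from reaching the insert.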
    

Example

A simple example is the StringMap mapping included with dbsink:

from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json

from dbsink.maps import BaseMap


class StringMap(BaseMap):

    @property
    def upsert_constraint_name(self):
        return None  # Ignore upserts

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('sinked',   sql.DateTime(timezone=True), index=True),
            sql.Column('key',      sql.String, default='', index=True),
            sql.Column('payload',  sql.String)
        ]

    def message_to_values(self, key, value):
        # Raises if invalid. This calls `._check_key` and `._check_value`
        self.check(key, value)

        values = {
            'sinked':  datetime.utcnow().replace(tzinfo=pytz.utc).isoformat(),
            'key':     key,
            'payload': json.dumps(value),
        }

        return key, values

Advanced Example

There are no restrictions on table schemas or how you map the message data into the schema. Take this example below that uses a PostGIS column.

from datetime import datetime

import pytz
import sqlalchemy as sql
import simplejson as json
from shapely.geometry import shape
from geoalchemy2.types import Geography

from dbsink.maps import BaseMap


class NamedGenericGeography(BaseMap):

    def _check_key(self, key):
        return True  # All keys are valid

    def _check_value(self, value):
        # Make sure values are JSON parsable
        _ = json.loads(json.dumps(value, ignore_nan=True))
        return True

    @property
    def schema(self):
        return [
            sql.Column('id',       sql.Integer, sql.Sequence(self.sequence_name), primary_key=True),
            sql.Column('name',     sql.String, default='', index=True),
            sql.Column('time',     sql.DateTime(timezone=True), index=True),
            sql.Column('geom',     Geography(srid=4326)),
            sql.Index(
                self.unique_index_name,
                'name',
                'time',
                unique=True,
            ),
            sql.UniqueConstraint(
                'name',
                'time',
                name=self.upsert_constraint_name
            )
        ]

    def message_to_values(self, key, value):
        """ Assumes a message format of
        {
          "time": 1000000000, # unix epoch
          "name": "my cool thing",
          "geojson": {
            "geometry": {
              "type": "Polygon",
              "coordinates": [ [ [ -118.532116484818843, 32.107425500492766 ], [ -118.457544847012443, 32.107425500492702 ], [ -118.457544847012443, 32.054517056541435 ], [ -118.532116484818872, 32.054517056541464 ], [ -118.532116484818843, 32.107425500492766 ] ] ]
            }
          }
        }
        """
        # Raises if invalid
        self.check(key, value)

        # GeoJSON `geometry` attribute to WKT
        geometry = shape(value['geojson']['geometry']).wkt

        values = {
            'name': value['name'],
            'time': datetime.fromtimestamp(value['time'], pytz.utc).isoformat(),
            'geom': geometry,
        }

        return key, values
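To see what the unique constraint on `name` and `time` buys you, here is a small sketch of upsert behavior using only the standard library. It uses an in-memory SQLite table rather than PostgreSQL/PostGIS, stores the geometry as plain WKT text, and names the conflicting columns instead of a named constraint. The table name and the `to_values` helper are made up for illustration, and dbsink's own upsert statement may differ.

```python
import sqlite3
from datetime import datetime, timezone


def to_values(value):
    """Map a message to column values, converting the unix epoch to ISO 8601 UTC."""
    return {
        "name": value["name"],
        "time": datetime.fromtimestamp(value["time"], timezone.utc).isoformat(),
        "geom": value["wkt"],  # assumed to be WKT already, to avoid needing shapely
    }


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE things ("
    " id INTEGER PRIMARY KEY, name TEXT, time TEXT, geom TEXT,"
    " UNIQUE (name, time))"
)

# A second message with the same (name, time) updates the row instead of failing.
UPSERT = """
INSERT INTO things (name, time, geom) VALUES (:name, :time, :geom)
ON CONFLICT (name, time) DO UPDATE SET geom = excluded.geom
"""

messages = [
    {"name": "my cool thing", "time": 1000000000, "wkt": "POINT (-118.5 32.1)"},
    {"name": "my cool thing", "time": 1000000000, "wkt": "POINT (-118.4 32.0)"},
]
for message in messages:
    conn.execute(UPSERT, to_values(message))

rows = conn.execute("SELECT name, time, geom FROM things").fetchall()
```

After both messages are processed the table holds a single row carrying the geometry from the second message.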

Configuration

This program uses Click for its CLI. For all options, see the help:

$ dbsink --help

Environment Variables

All configuration options can be specified with environment variables using the pattern DBSINK_[argument_name]=[value]. For more information see the click documentation.

DBSINK_TOPIC="topic-to-listen-to" \
DBSINK_LOOKUP="StringMap" \
DBSINK_TABLE="MyCoolTable" \
DBSINK_CONSUMER="myconsumer" \
DBSINK_PACKING="msgpack" \
DBSINK_OFFSET="earliest" \
DBSINK_DROP="true" \
DBSINK_VERBOSE="1" \
    dbsink
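If you launch dbsink from Python, the naming pattern above can be applied programmatically. This is a minimal sketch: `dbsink_env` and `run_dbsink` are hypothetical helpers, the option names are the ones used in the example above, and the "true"/"false" rendering of booleans is an assumption based on that example.

```python
import os
import subprocess


def dbsink_env(prefix="DBSINK", **options):
    """Build {PREFIX_OPTION: value} strings from keyword options."""
    env = {}
    for name, value in options.items():
        if isinstance(value, bool):
            value = "true" if value else "false"
        env[f"{prefix}_{name.upper()}"] = str(value)
    return env


def run_dbsink(env):
    """Run the dbsink CLI with the extra variables added to the current environment."""
    return subprocess.run(["dbsink"], env={**os.environ, **env}, check=True)


env = dbsink_env(
    topic="topic-to-listen-to",
    lookup="StringMap",
    table="MyCoolTable",
    packing="msgpack",
    drop=True,
)
```

Calling `run_dbsink(env)` would then start the program with the same settings as the shell example.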

Testing

You can run the tests using pytest. To run the integration tests, start a database with

$ docker run -p 30300:5432 --name dbsink-int-testing-db -e POSTGRES_USER=sink -e POSTGRES_PASSWORD=sink -e POSTGRES_DB=sink -d mdillon/postgis:11

and then run

$ pytest -m integration
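Before running the integration tests it can help to confirm that the database container is accepting connections. The sketch below only checks that the TCP port is open. The `db_is_reachable` helper is hypothetical, and the default port 30300 comes from the docker command above.

```python
import socket


def db_is_reachable(host="localhost", port=30300, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

An open port does not guarantee that PostgreSQL has finished initializing, so treat a True result as a necessary check rather than a sufficient one.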

