Skip to main content

gRPC client for EventStore DB

Project description

esdb-py

PyPI version codecov

EventStoreDB Python gRPC client

NOTE: This project is still work in progress

Completed features

  • secure connection
  • basic auth
  • connection string parsing
  • streams
    • append
    • batch append (v21.10+)
    • delete
    • read stream
    • read all with stream/event type filters (v21.10+)
    • catch-up subscriptions
    • tombstone
    • filtering
  • persistent subscriptions
    • create
    • read stream
    • read all with filter (v21.10+)
    • update
    • delete
    • list
    • info
    • reply parked events
  • CRUD for projections
  • users

Installation

Using pip:

pip install esdb

Using poetry:

poetry add esdb

Development

  1. Install poetry

  2. Create virtualenv (i.e. using pyenv):

    pyenv install 3.10.5
    pyenv virtualenv 3.10.5 esdb-py
    pyenv local esdb-py
    
  3. Install deps with poetry install

  4. Start eventstore in docker: make run-esdb

  5. Run the tests: pytest tests

Usage

Have a look at tests for more examples.

Connection string examples

DNS discovery with credentials, discovery configuration, node preference and ca file path

esdb+discover://admin:changeit@localhost:2111?discoveryinterval=0&maxdiscoverattempts=3&tlscafile=certs/ca/ca.crt&nodepreference=follower

Single-node insecure connection

esdb://localhost:2111?tls=false

Supported parameters:

  • discoveryInterval
  • gossipTimeout
  • maxDiscoverAttempts
  • nodePreference
  • keepAliveInterval
  • keepAliveTimeout
  • tls
  • tlsCafile
  • tlsVerifyCert
  • defaultDeadline

Connection string can be generated here.

Discovery and node preferences

from esdb import ESClient

client = ESClient("esdb+discover://admin:changeit@localhost:2111?nodepreference=follower")

Connection configuration

from esdb import ESClient

# Connect without TLS
client = ESClient("esdb://localhost:2111?tls=false")

# Secure connection with basic auth and keepalive
client = ESClient("esdb://admin:changeit@localhost:2111?tlscafile=certs/ca/ca.crt&keepaliveinterval=5&keepalivetimeout=5")

Append, Read, Catch-up subscriptions

import asyncio
import datetime
import uuid

from esdb import ESClient


client = ESClient("esdb+discover://admin:changeit@localhost:2111")
stream = f"test-{str(uuid.uuid4())}"


async def streams():
    async with client.connect() as conn:
        # Appending to stream
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
            )

        # Read up to 10 events
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        # Read up to 10 events, backwards
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        # Read up to 10 events, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        # Read up to 10 events backwards, starting from 5th event
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a catch-up subscription to a stream
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())

Batch append

import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    # Append multiple events in as a single batch
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())

Catch-up subscription to all events with filtering

import uuid
import asyncio

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    async with ESClient("esdb+discover://admin:changeit@localhost:2111").connect() as conn:
        # Append 10 events with the same prefix to random streams
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        # subscribe to events from all streams, filtering by event type
        async for event in conn.streams.read_all(
                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
                filter_by=Filter(
                    kind=Filter.Kind.EVENT_TYPE,
                    regex="^prefix-",
                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                    checkpoint_interval_multiplier=1000,
                ),
        ):
            print(event)


asyncio.run(filters())

Persistent subscriptions

import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("esdb+discover://admin:changeit@localhost:2111")

stream = "stream-foo"
group = "group-bar"


async def persistent():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a stream subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )

        # create subscription to all events with filtering
        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )

    # read from a subscription
    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # do work with event
                print(event)
                await sub.ack([event])
            except Exception as err:
                await sub.nack([event], NackAction.RETRY, reason=str(err))

        # get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group

        # delete subscription
        await conn.subscriptions.delete(group, stream)
        
        # list subscriptions
        subs = await conn.subscriptions.list()
        for sub in subs:
            print(sub.total_items)


asyncio.run(persistent())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

esdb-0.3.0.tar.gz (72.4 kB view details)

Uploaded Source

Built Distribution

esdb-0.3.0-py3-none-any.whl (86.9 kB view details)

Uploaded Python 3

File details

Details for the file esdb-0.3.0.tar.gz.

File metadata

  • Download URL: esdb-0.3.0.tar.gz
  • Upload date:
  • Size: 72.4 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.1 CPython/3.10.2 Darwin/21.6.0

File hashes

Hashes for esdb-0.3.0.tar.gz
Algorithm Hash digest
SHA256 562fe4f558802d7cc11994a8deb5be379eea1cca62ec21bbc09ba662ec15571c
MD5 7ad6fdf4d4ea1a4a7397832a1e66a14b
BLAKE2b-256 f94f0e5f6e14674ea0e5e9a76810f41ac7d3338713da380d73bfe01c64bc44fb

See more details on using hashes here.

File details

Details for the file esdb-0.3.0-py3-none-any.whl.

File metadata

  • Download URL: esdb-0.3.0-py3-none-any.whl
  • Upload date:
  • Size: 86.9 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.2.1 CPython/3.10.2 Darwin/21.6.0

File hashes

Hashes for esdb-0.3.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f8cb04f5b8e28d7feba0217b19239246a2b4482a48dfac42c678bb2d6f34c68d
MD5 6ab7389f814e05d45dff201d8e67788c
BLAKE2b-256 992c4f0fd8894889747390192067baea3f87b37f254d9aa3fb997999a0dfead5

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page