gRPC client for EventStore DB

esdb-py

EventStoreDB Python gRPC client

NOTE: This project is still a work in progress.

Completed features

  • secure connection
  • basic auth
  • connection string parsing
  • streams
    • append
    • batch append (v21.10+)
    • delete
    • read stream
    • read all with stream/event type filters (v21.10+)
    • catch-up subscriptions
    • tombstone
    • filtering
  • persistent subscriptions
    • create
    • read stream
    • read all with filter (v21.10+)
    • update
    • delete
    • list
    • info
    • replay parked events
  • CRUD for projections
  • users

Installation

Using pip:

pip install esdb

Using poetry:

poetry add esdb

Development

  1. Install poetry

  2. Create a virtualenv (e.g. using pyenv):

    pyenv install 3.10.5
    pyenv virtualenv 3.10.5 esdb-py
    pyenv local esdb-py
    
  3. Install deps with poetry install

  4. Start eventstore in docker: make run-esdb

  5. Run the tests: pytest tests

Usage

Have a look at tests for more examples.

Discovery and node preferences

from esdb import ESClient
from esdb.client import Preference

client = ESClient(
  "localhost:2112",
  discover=True,  # Discover the available nodes via gossip
  node_preference=Preference.FOLLOWER,  # Connect to a preferred node type
)

Append, Read, Catch-up subscriptions

import asyncio
import datetime
import uuid

from esdb import ESClient

# For insecure connection without basic auth:
# client = ESClient("localhost:2113", insecure=True)
with open("certs/ca/ca.crt", "rb") as fh:
    root_cert = fh.read()

client = ESClient(
    "localhost:2111",
    root_certificates=root_cert,
    username="admin",
    password="changeit",
    keepalive_time_ms=5000,
    keepalive_timeout_ms=10000,
)

stream = f"test-{uuid.uuid4()}"


async def streams():
    async with client.connect() as conn:
        for i in range(10):
            append_result = await conn.streams.append(
                stream=stream,
                event_type="test_event",
                # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
                data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
            )

        print("Forwards!")
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        print("Backwards!")
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        print("Forwards start from middle!")
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        print("Backwards start from middle!")
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a catch-up subscription to a stream.
        # With subscribe=True the loop keeps waiting for new events.
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())
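
The four reads above differ only in direction and starting revision. As a rough mental model, the selection can be simulated on a plain list. This is a sketch that assumes zero-based revisions and an inclusive starting revision; it is not how the client is implemented, and `simulate_read` is a hypothetical helper that is not part of esdb.

```python
from typing import Optional


def simulate_read(
    events: list,
    count: int,
    backwards: bool = False,
    revision: Optional[int] = None,
) -> list:
    """Hypothetical stand-in that mimics which events a read would select.

    Assumes revisions are zero-based list indexes and that the starting
    revision is included in the result.
    """
    if not events:
        return []
    if revision is None:
        # Default start: first event when reading forwards, last when backwards.
        revision = len(events) - 1 if backwards else 0
    if backwards:
        selected = events[revision::-1]
    else:
        selected = events[revision:]
    return selected[:count]


events = list(range(10))
print(simulate_read(events, count=10))
print(simulate_read(events, count=10, backwards=True))
print(simulate_read(events, count=10, revision=5))
print(simulate_read(events, count=10, backwards=True, revision=5))
```

Under these assumptions, a forward read from revision 5 selects the last five events, while a backward read from revision 5 selects revision 5 down to revision 0.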

Batch append

import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    # Batch append is not supported on EventStore < v21.10
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("localhost:2113", insecure=True).connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())
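
The two assertions above are consistent with zero-based stream revisions: six events appended to a new stream occupy revisions 0 through 5. A tiny sketch of that arithmetic follows; `expected_revision` is a hypothetical helper for illustration and is not part of esdb.

```python
def expected_revision(events_in_stream: int) -> int:
    """Revision of the last event, assuming zero-based numbering.

    Hypothetical helper for illustration only; not an esdb function.
    """
    if events_in_stream < 1:
        raise ValueError("an empty stream has no last revision")
    return events_in_stream - 1


# Six messages in a fresh stream: the last one sits at revision 5.
print(expected_revision(6))
```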

Catch-up subscription to all events with filtering

import asyncio
import uuid

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    async with ESClient("localhost:2113", insecure=True).connect() as conn:
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        async for event in conn.streams.read_all(
                subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
                filter_by=Filter(
                    kind=Filter.Kind.EVENT_TYPE,
                    regex="^prefix-",
                    # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                    checkpoint_interval_multiplier=1000,
                ),
        ):
            print(event)


asyncio.run(filters())
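
Before relying on a server-side filter, it can help to check the regular expression locally. The sketch below uses Python's `re` module as an approximation. The server evaluates the pattern with its own regex engine, so treat this as a sanity check for simple patterns such as `^prefix-` rather than a guarantee. `matches_filter` is a hypothetical helper that is not part of esdb.

```python
import re


def matches_filter(pattern: str, event_types: list[str]) -> list[str]:
    """Return the event types that a regex filter would let through.

    Hypothetical local check; uses re.search so that anchors such as ^ and $
    in the pattern decide where a match may start and end.
    """
    compiled = re.compile(pattern)
    return [event_type for event_type in event_types if compiled.search(event_type)]


candidates = ["prefix-0", "prefix-9", "other-prefix-1", "test_event"]
print(matches_filter("^prefix-", candidates))
```

Because the pattern is anchored with `^`, an event type such as `other-prefix-1` is not selected even though it contains `prefix-`.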

Persistent subscriptions

import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", insecure=True)

stream = "stream-foo"
group = "group-bar"


async def persistent():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint_ms=10000,
            ),
        )

        # Only supported on EventStore v21.10+
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=Filter(kind=Filter.Kind.EVENT_TYPE, regex="^some_type$", checkpoint_interval_multiplier=200),
            settings=SubscriptionSettings(
                read_batch_size=50,
                live_buffer_size=100,
                history_buffer_size=100,
                max_retry_count=2,
                checkpoint_ms=20000,
            ),
        )

    async with client.connect() as conn:
        sub = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in sub:
            try:
                # do work with event
                print(event)
                await sub.ack([event])
            except Exception as err:
                await sub.nack([event], NackAction.RETRY, reason=str(err))

        # get subscription info
        info = await conn.subscriptions.get_info(group, stream)
        assert info.group_name == group

        # delete subscription
        await conn.subscriptions.delete(group, stream)



asyncio.run(persistent())
