Skip to main content

gRPC client for EventStore DB

Project description

esdb-py

PyPI version codecov

EventStoreDB Python gRPC client

NOTE: This project is still a work in progress.

Completed features

  • secure connection
  • basic auth
  • connection string parsing
  • streams
    • append
    • batch append (v21.10+)
    • delete
    • read stream
    • read all with stream/event type filters (v21.10+)
    • transient subscriptions
    • tombstone
    • filtering
  • persistent subscriptions
    • create
    • read stream
    • read all with filter (v21.10+)
    • update
    • delete
    • list
    • info
    • replay parked events
  • CRUD for projections
  • users

Installation

Using pip:

pip install esdb

Using poetry:

poetry add esdb

Development

  1. Install poetry
  2. Create a virtualenv (e.g. using pyenv):
pyenv install 3.10.5
pyenv virtualenv 3.10.5 esdb-py
pyenv local esdb-py
  3. Install deps with poetry install
  4. Start eventstore in docker: make run-esdb
  5. Run the tests: pytest tests

Usage

Have a look at the tests for more examples.

Discovery and node preferences

from esdb import ESClient
from esdb.client import Preference

client = ESClient(
    endpoint="localhost:2112",
    # Use gossip to discover the nodes that are available
    discover=True,
    # Prefer connecting to a node of this type
    node_preference=Preference.FOLLOWER,
)

Append/Read

import asyncio
import datetime
import uuid

from esdb import ESClient

# Secure connection: read the CA certificate that is handed to the client
# as root_certificates.
# For an insecure connection without basic auth, use this instead:
# client = ESClient("localhost:2113", insecure=True)
with open("certs/ca/ca.crt", "rb") as cert_file:
    root_cert = cert_file.read()

client = ESClient(
    "localhost:2111",
    root_certificates=root_cert,
    # Basic auth credentials
    username="admin",
    password="changeit",
    # Keepalive settings, in milliseconds
    keepalive_time_ms=5000,
    keepalive_timeout_ms=10000,
)

# A unique stream name, so every run of the example starts with an empty stream
stream = f"test-{uuid.uuid4()}"


async def streams():
    """Append ten events to ``stream``, read them back, then subscribe to it.

    Uses the module-level ``client`` and ``stream``. The last loop is a
    transient subscription (``subscribe=True``), which waits for further
    events instead of stopping at the end of the stream, so this coroutine
    does not finish on its own.
    """
    async with client.connect() as conn:
        for i in range(10):
            await conn.streams.append(
                stream=stream,
                event_type="test_event",
                # Timezone-aware UTC timestamp; datetime.utcnow() returns a
                # naive datetime and is deprecated since Python 3.12.
                data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
            )

        print("Forwards!")
        async for result in conn.streams.read(stream=stream, count=10):
            print(result.data)

        print("Backwards!")
        async for result in conn.streams.read(stream=stream, count=10, backwards=True):
            print(result.data)

        # revision=5 starts reading from the middle of the ten events
        print("Forwards start from middle!")
        async for result in conn.streams.read(stream=stream, count=10, revision=5):
            print(result.data)

        print("Backwards start from middle!")
        async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
            print(result.data)

        # Create a transient subscription to a stream
        async for result in conn.streams.read(stream=stream, subscribe=True):
            print(result.data)


asyncio.run(streams())

Batch append

import asyncio
import uuid

from esdb import ESClient
from esdb.streams import Message


async def batch_append():
    """Append six events to a new stream in one batch and check the outcome."""
    # Batch append is not supported on EventStore < v21.10
    stream_name = str(uuid.uuid4())
    # Three "one" events followed by three "two" events, items 1..3 each
    batch: list[Message] = [
        Message(event_type=kind, data={"item": number})
        for kind in ("one", "two")
        for number in (1, 2, 3)
    ]
    es_client = ESClient("localhost:2113", insecure=True)
    async with es_client.connect() as conn:
        result = await conn.streams.batch_append(stream=stream_name, messages=batch)
        # Six appended events leave the new stream at revision 5
        assert result.current_revision == 5
        stored = [event async for event in conn.streams.read(stream=stream_name, count=50)]
        assert len(stored) == 6


asyncio.run(batch_append())

Transient subscription to all events with filtering

import uuid
import asyncio

from esdb import ESClient
from esdb.shared import Filter


async def filters():
    """Subscribe to all events and print those whose type starts with ``prefix-``."""
    es_client = ESClient("localhost:2113", insecure=True)
    async with es_client.connect() as conn:
        # Seed ten events, each in its own stream, with types prefix-0 .. prefix-9
        for n in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{n}", data=b"")

        event_type_filter = Filter(
            kind=Filter.Kind.EVENT_TYPE,
            regex="^prefix-",
            # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
            checkpoint_interval_multiplier=1000,
        )
        # subscribe=True waits for events; use count=<n> instead to read <n> events and stop
        async for event in conn.streams.read_all(subscribe=True, filter_by=event_type_filter):
            print(event)


asyncio.run(filters())

Persistent subscriptions

import asyncio
from esdb import ESClient
from esdb.shared import Filter
from esdb.subscriptions import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", insecure=True)

# Stream and subscription group shared by the producer and consumer halves below
stream, group = "stream-foo", "group-bar"


async def persistent():
    """Create persistent subscriptions, then consume and acknowledge events from one."""
    async with client.connect() as conn:
        # Write 50 events to one stream
        for number in range(50):
            await conn.streams.append(stream, "foobar", {"i": number})

        # Persistent subscription on that single stream
        stream_settings = SubscriptionSettings(
            max_subscriber_count=50,
            read_batch_size=5,
            live_buffer_size=10,
            history_buffer_size=10,
            consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
            checkpoint=SubscriptionSettings.DurationType(
                type=SubscriptionSettings.DurationType.Type.MS,
                value=10000,
            ),
        )
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=stream_settings,
        )

        # Persistent subscription across all streams, filtered by event type.
        # Only supported on EventStore v21.10+
        all_filter = Filter(
            kind=Filter.Kind.EVENT_TYPE,
            regex="^some_type$",
            checkpoint_interval_multiplier=200,
        )
        all_settings = SubscriptionSettings(
            read_batch_size=50,
            live_buffer_size=100,
            history_buffer_size=100,
            max_retry_count=2,
            checkpoint=SubscriptionSettings.DurationType(
                type=SubscriptionSettings.DurationType.Type.MS,
                value=10000,
            ),
        )
        await conn.subscriptions.create_all_subscription(
            group_name="subscription_group",
            filter_by=all_filter,
            settings=all_settings,
        )

    # Consume from the stream subscription on a fresh connection
    async with client.connect() as conn:
        subscription = conn.subscriptions.subscribe(stream=stream, group_name=group, buffer_size=5)
        async for event in subscription:
            try:
                # Handle the event here
                print(event)
                await subscription.ack([event])
            except Exception as exc:
                # Handling failed: nack the event with a retry action and the error text
                await subscription.nack([event], NackAction.RETRY, reason=str(exc))


asyncio.run(persistent())

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

esdb-0.2.3.tar.gz (69.0 kB view details)

Uploaded Source

Built Distribution

esdb-0.2.3-py3-none-any.whl (83.6 kB view details)

Uploaded Python 3

File details

Details for the file esdb-0.2.3.tar.gz.

File metadata

  • Download URL: esdb-0.2.3.tar.gz
  • Upload date:
  • Size: 69.0 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.6.0

File hashes

Hashes for esdb-0.2.3.tar.gz
Algorithm Hash digest
SHA256 bc7e4c4c4d912d2cd82d2401bce4c757f0a6feb1163cff81993176c67be994fb
MD5 22263e6cec3c5b85526a76107517671a
BLAKE2b-256 23ee73dd4246b83b6139b8264fccd65209b61d430e1f7517c496211cf271f46f

See more details on using hashes here.

File details

Details for the file esdb-0.2.3-py3-none-any.whl.

File metadata

  • Download URL: esdb-0.2.3-py3-none-any.whl
  • Upload date:
  • Size: 83.6 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.6.0

File hashes

Hashes for esdb-0.2.3-py3-none-any.whl
Algorithm Hash digest
SHA256 9867df6da8aacf0f49646d2acf7691e06fdff4865569f2739b2939bdc5950c82
MD5 f7ca3d333cd2a4bb4cce7795a3ba07b1
BLAKE2b-256 fc56c1fa2fe891bfbf8676587df15509cf2155c4cf39da66b2aac3d4f16ffcc1

See more details on using hashes here.

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page