esdb-py

EventStoreDB Python gRPC client

NOTE: This project is still a work in progress.

Implemented parts

  • secure connection
  • basic auth
  • async client
  • streams
    • append
    • batch append
    • delete
    • read stream
    • read all with stream/event type filters
    • transient subscriptions
    • tombstone
    • filtering
  • persistent subscriptions
    • create
    • read
    • update
    • delete
    • list
    • info
    • replay parked events
  • CRUD for projections
  • users
  • other connection options
    • multi-node gossip

Installation

Using pip:

pip install esdb

Using poetry:

poetry add esdb

Development

  1. Install poetry
  2. Create a virtualenv (e.g. using pyenv):
pyenv install 3.10.5
pyenv virtualenv 3.10.5 esdb-py
pyenv local esdb-py
  3. Install deps with poetry install
  4. Start EventStoreDB in Docker: make run-esdb
  5. Run the tests: pytest tests

Usage:

Append/Read

import datetime
import uuid

from esdb.client import ESClient

# For an insecure connection without basic auth:
# client = ESClient("localhost:2113", tls=False)
with open("certs/ca/ca.crt", "rb") as fh:
    root_cert = fh.read()

client = ESClient(
    "localhost:2111",
    root_certificates=root_cert,
    username="admin",
    password="changeit",
    keepalive_time_ms=5000,
    keepalive_timeout_ms=10000,
)

stream = f"test-{uuid.uuid4()}"

with client.connect() as conn:
    for i in range(10):
        append_result = conn.streams.append(
            stream=stream,
            event_type="test_event",
            data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
        )

    print("Forwards!")
    for result in conn.streams.read(stream=stream, count=10):
        print(result.data)

    print("Backwards!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True):
        print(result.data)

    print("Forwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, revision=5):
        print(result.data)

    print("Backwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
        print(result.data)

    # Create a transient subscription to a stream; this loop keeps waiting for new events
    for result in conn.streams.read(stream=stream, subscribe=True):
        print(result.data)
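
The reads above differ only in direction and starting revision. As a rough mental model, the plain-Python sketch below mimics those options on an in-memory list. `read_events` is a hypothetical helper, not part of esdb. It assumes that `revision` is inclusive and that omitting it starts from the first event (forwards) or the last one (backwards), which is how EventStoreDB stream reads are generally described.

```python
from typing import Iterator, Optional, Sequence


def read_events(
    events: Sequence[dict],
    count: int,
    backwards: bool = False,
    revision: Optional[int] = None,
) -> Iterator[dict]:
    """Yield up to `count` events, starting at `revision` (inclusive)."""
    if not events:
        return
    if revision is None:
        # Assumption: default start is the first event, or the last when reading backwards.
        revision = len(events) - 1 if backwards else 0
    step = -1 if backwards else 1
    position = revision
    yielded = 0
    while 0 <= position < len(events) and yielded < count:
        yield events[position]
        position += step
        yielded += 1


# Ten events with revisions 0..9, like the stream written in the example above.
events = [{"i": i} for i in range(10)]

forwards_from_middle = [e["i"] for e in read_events(events, count=10, revision=5)]
backwards_from_middle = [e["i"] for e in read_events(events, count=10, backwards=True, revision=5)]
print(forwards_from_middle)   # [5, 6, 7, 8, 9]
print(backwards_from_middle)  # [5, 4, 3, 2, 1, 0]
```

Note that `count` is an upper bound: reading forwards from revision 5 yields only the five events that exist from that point on.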

Transient subscription to all events with filtering


Async example:

import asyncio

from esdb.client import AsyncESClient


async def append():
    client = AsyncESClient("localhost:2113", tls=False)
    async with client.connect() as conn:
        result = await conn.streams.append("stream", "type", {"x": 1})
        assert result.commit_position > 0
        async for event in conn.streams.read("stream", count=10):
            print(event)


asyncio.run(append())

Read all events with filters:

import asyncio

from esdb.client import AsyncESClient
from esdb.client.streams import Filter


async def main():
    async with AsyncESClient("localhost:2113", tls=False).connect() as conn:
        async for event in conn.streams.read_all(
            subscribe=True,  # creates a transient subscription
            filter_by=Filter(
                kind=Filter.Kind.EVENT_TYPE,
                regex="^prefix-",
                checkpoint_interval_multiplier=1000,
            ),
        ):
            print(event)


asyncio.run(main())
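
The `regex` passed to `Filter` is evaluated by the server, so it can be handy to preview locally which event types a pattern would select. The sketch below does that with Python's `re` module. `matching_event_types` and the sample event type names are made up for illustration, and Python's regex dialect is only assumed to agree with the server's for simple patterns such as `^prefix-`.

```python
import re
from typing import Iterable, List


def matching_event_types(pattern: str, event_types: Iterable[str]) -> List[str]:
    """Return the event types in which the pattern is found."""
    compiled = re.compile(pattern)
    return [name for name in event_types if compiled.search(name)]


# Hypothetical event type names, only for illustration.
sample_types = ["prefix-created", "prefix-deleted", "other-created", "my-prefix-x"]
selected = matching_event_types("^prefix-", sample_types)
print(selected)  # ['prefix-created', 'prefix-deleted']
```

The `^` anchor is what excludes `my-prefix-x`; without it, any event type containing `prefix-` would be selected.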

Persistent subscriptions:

from esdb.client import ESClient
from esdb.client.subscriptions import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", tls=False)
stream = "stream-name"
group = "group-name"

with client.connect() as conn:
    # emit some events to the same stream
    for _ in range(10):
        conn.streams.append(stream, "foobar", b"data")

    # create a subscription
    conn.subscriptions.create_stream_subscription(
        stream=stream,
        group_name=group,
        settings=SubscriptionSettings(
            read_batch_size=5,
            live_buffer_size=10,
            history_buffer_size=10,
            checkpoint=SubscriptionSettings.DurationType(
                type=SubscriptionSettings.DurationType.Type.MS,
                value=10000,
            ),
        ),
    )

    # Read from subscription
    # This will block and wait for messages
    subscription = conn.subscriptions.subscribe_to_stream(stream, group, buffer_size=10)
    for event in subscription:
        try:
            # ... do work with the event ...
            # ack the event
            subscription.ack([event])
        except Exception as err:
            subscription.nack([event], NackAction.RETRY, reason=str(err))
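
The ack/nack loop above can be pulled into a small helper so that the handling logic is testable without a running server. This is only a sketch: `process`, `FakeSubscription`, and the local `NackAction` stand-in are hypothetical and merely mirror the `ack`/`nack` calls used in the example. In real code the subscription object and `NackAction` would come from esdb.

```python
import enum
from typing import Callable, Iterable, List, Tuple


class NackAction(enum.Enum):
    # Local stand-in for esdb's NackAction; only RETRY is modelled here.
    RETRY = "retry"


class FakeSubscription:
    """In-memory stand-in that records acks and nacks instead of talking to a server."""

    def __init__(self, events: Iterable[str]) -> None:
        self._events = list(events)
        self.acked: List[str] = []
        self.nacked: List[Tuple[str, NackAction, str]] = []

    def __iter__(self):
        return iter(self._events)

    def ack(self, events: List[str]) -> None:
        self.acked.extend(events)

    def nack(self, events: List[str], action: NackAction, reason: str = "") -> None:
        self.nacked.extend((event, action, reason) for event in events)


def process(subscription, handler: Callable[[str], None]) -> None:
    """Ack each event the handler accepts; nack with RETRY when it raises."""
    for event in subscription:
        try:
            handler(event)
        except Exception as err:
            subscription.nack([event], NackAction.RETRY, reason=str(err))
        else:
            subscription.ack([event])


def handler(event: str) -> None:
    # Hypothetical handler that rejects one specific event.
    if event == "bad":
        raise ValueError("cannot handle event")


subscription = FakeSubscription(["a", "bad", "b"])
process(subscription, handler)
print(subscription.acked)                                # ['a', 'b']
print([event for event, _, _ in subscription.nacked])    # ['bad']
```

Putting the ack in the `else` branch keeps a failure of the ack call itself from being reported as a handler error and nacked.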

Async subscriptions

import asyncio

from esdb.client import AsyncESClient
from esdb.client.subscriptions import SubscriptionSettings

stream = "stream-foo"
group = "group-bar"


async def main():
    client = AsyncESClient("localhost:2113", tls=False)

    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint=SubscriptionSettings.DurationType(
                    type=SubscriptionSettings.DurationType.Type.MS,
                    value=10000,
                ),
            ),
        )

    async with client.connect() as conn:
        subscription = conn.subscriptions.subscribe_to_stream(stream=stream, group_name=group, buffer_size=5)
        # read from the subscription and ack each event as it arrives
        async for event in subscription:
            await subscription.ack([event])


asyncio.run(main())
