
gRPC client for EventStoreDB


esdb-py


EventStoreDB Python gRPC client

NOTE: This project is still a work in progress.

Implemented parts

  • secure connection
  • basic auth
  • other connection options
    • multi-node gossip
    • keepalive
  • async client
  • streams
    • append
    • batch append
    • delete
    • read
    • tombstone
    • filtering
    • exception handling
  • subscriptions
  • users

Setting things up

  1. Install poetry
  2. Create a virtualenv (e.g. using pyenv):
     pyenv install 3.10.5
     pyenv virtualenv 3.10.5 esdb-py
     pyenv local esdb-py
  3. Install dependencies with poetry install
  4. Start EventStoreDB in Docker: make run-esdb
  5. Run the tests: pytest tests

Usage:

import datetime
import uuid

from esdb.client.client import ESClient

# For an insecure connection without basic auth:
# client = ESClient("localhost:2113", tls=False)
# Otherwise, load the CA certificate used to verify the server:
with open("certs/ca/ca.crt", "rb") as fh:
    root_cert = fh.read()

client = ESClient(
    "localhost:2111",
    root_certificates=root_cert,
    username="admin",
    password="changeit",
    keepalive_time_ms=5000,
    keepalive_timeout_ms=10000,
)

stream = f"test-{uuid.uuid4()}"

with client.connect() as conn:
    for i in range(10):
        append_result = conn.streams.append(
            stream=stream,
            event_type="test_event",
            data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
        )

    print("Forwards!")
    for result in conn.streams.read(stream=stream, count=10):
        print(result.data)

    print("Backwards!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True):
        print(result.data)

    print("Forwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, revision=5):
        print(result.data)

    print("Backwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
        print(result.data)
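
The usage example shows two ways of constructing the client: insecure (tls=False) or secured with a root certificate, basic auth and keepalive settings. A minimal sketch of a helper that picks between the two is given below. build_client_kwargs is a hypothetical name and not part of esdb; it only assembles the keyword arguments that appear in the example above, and it assumes they can be passed together exactly as shown there.

```python
from pathlib import Path
from typing import Any, Optional


def build_client_kwargs(
    ca_cert_path: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    keepalive_time_ms: int = 5000,
    keepalive_timeout_ms: int = 10000,
) -> dict[str, Any]:
    """Assemble ESClient keyword arguments (hypothetical helper, not part of esdb)."""
    if ca_cert_path is None:
        # No certificate given: mirror the insecure variant from the usage example.
        return {"tls": False}

    kwargs: dict[str, Any] = {
        # The usage example passes the raw bytes of the CA certificate.
        "root_certificates": Path(ca_cert_path).read_bytes(),
        "keepalive_time_ms": keepalive_time_ms,
        "keepalive_timeout_ms": keepalive_timeout_ms,
    }
    # Only add basic auth when both parts are provided.
    if username is not None and password is not None:
        kwargs["username"] = username
        kwargs["password"] = password
    return kwargs
```

With esdb installed, this could be used as ESClient("localhost:2111", **build_client_kwargs("certs/ca/ca.crt", "admin", "changeit")).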

Async example:

import asyncio

from esdb.client.client import AsyncESClient


async def append():
    client = AsyncESClient("localhost:2113", tls=False)
    async with client.connect() as conn:
        result = await conn.streams.append("stream", "type", {"x": 1})
        assert result.commit_position > 0
        async for event in conn.streams.read("stream", count=10):
            print(event)


asyncio.run(append())
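
The async example appends one event and prints what it reads back. A small sketch of the same pattern that appends several payloads and returns the events as a list is shown below. append_and_collect is a hypothetical helper name; it assumes only the two calls used above (conn.streams.append with positional stream, type and data, and conn.streams.read with count) and nothing else about the library.

```python
from typing import Any


async def append_and_collect(
    conn: Any, stream: str, event_type: str, payloads: list[dict], count: int
) -> list[Any]:
    """Append each payload, then read up to `count` events back into a list.

    Hypothetical helper: `conn` is expected to behave like the connection
    in the async example (awaitable append, async-iterable read).
    """
    for payload in payloads:
        await conn.streams.append(stream, event_type, payload)

    events = []
    async for event in conn.streams.read(stream, count=count):
        events.append(event)
    return events
```

Inside an async with client.connect() as conn: block, it could be called as events = await append_and_collect(conn, "stream", "type", [{"x": 1}, {"x": 2}], count=10).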

Subscriptions:

from esdb.client.client import ESClient
from esdb.client.subscriptions.base import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", tls=False)
stream = "stream-name"
group = "group-name"

with client.connect() as conn:
    # emit some events to the same stream
    for _ in range(10):
        conn.streams.append(stream, "foobar", b"data")

    # create a subscription
    conn.subscriptions.create_stream_subscription(
        stream=stream,
        group_name=group,
        settings=SubscriptionSettings(
            read_batch_size=5,
            live_buffer_size=10,
            history_buffer_size=10,
            checkpoint=SubscriptionSettings.DurationType(
                type=SubscriptionSettings.DurationType.Type.MS,
                value=10000,
            ),
        ),
    )

    # Read from subscription
    # This will block and wait for messages
    subscription = conn.subscriptions.subscribe_to_stream(stream, group, buffer_size=10)
    for event in subscription:
        try:
            # ... do work with the event ...
            # ack the event
            subscription.ack([event])
        except Exception as err:
            subscription.nack([event], NackAction.RETRY, reason=str(err))
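
The ack/nack loop above can be factored into a reusable function. The sketch below is one way to do that; process_events, handler and max_events are hypothetical names, and the function assumes only what the example shows: the subscription is iterable and exposes ack(events) and nack(events, action, reason=...). The nack action is passed in by the caller (for example NackAction.RETRY), so the sketch itself does not import esdb.

```python
from typing import Any, Callable, Optional


def process_events(
    subscription: Any,
    handler: Callable[[Any], None],
    nack_action: Any,
    max_events: Optional[int] = None,
) -> tuple[int, int]:
    """Run `handler` on each event, acking on success and nacking on failure.

    Returns (acked, nacked). Stops after `max_events` events if given;
    otherwise it runs until the subscription stops yielding events.
    """
    acked = nacked = 0
    for event in subscription:
        try:
            handler(event)
        except Exception as err:
            # Hand the event back with the caller's chosen action.
            subscription.nack([event], nack_action, reason=str(err))
            nacked += 1
        else:
            # Ack only when the handler finished without raising.
            subscription.ack([event])
            acked += 1
        if max_events is not None and acked + nacked >= max_events:
            break
    return acked, nacked
```

Unlike the inline loop, the ack call sits in the else branch, so an error raised by ack itself is not reported as a handler failure. A call might look like process_events(subscription, handle_event, NackAction.RETRY), where handle_event is your own function.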

Async subscriptions:

import asyncio

from esdb.client.client import AsyncESClient
from esdb.client.subscriptions.base import SubscriptionSettings

stream = "stream-foo"
group = "group-bar"


async def main():
    client = AsyncESClient("localhost:2113", tls=False)

    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint=SubscriptionSettings.DurationType(
                    type=SubscriptionSettings.DurationType.Type.MS,
                    value=10000,
                ),
            ),
        )

    async with client.connect() as conn:
        # Read from the subscription; this keeps waiting for new messages
        subscription = conn.subscriptions.subscribe_to_stream(stream=stream, group_name=group, buffer_size=5)
        async for event in subscription:
            await subscription.ack([event])


# "async with" is only valid inside a coroutine, so run everything via asyncio
asyncio.run(main())
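
The async subscription loop above waits for new events indefinitely. For scripts and tests it can be handy to stop once the subscription goes quiet. The sketch below shows one way to do that with asyncio.wait_for. drain and idle_timeout are hypothetical names, and the function assumes only that the subscription is an async iterable with an awaitable ack(events), as in the example. Whether cancelling a pending read is safe with the real client is an assumption that has not been checked here.

```python
import asyncio
from typing import Any


async def drain(subscription: Any, idle_timeout: float = 1.0) -> int:
    """Ack events until none arrives within `idle_timeout` seconds.

    Returns the number of acked events. Hypothetical helper, not part of esdb.
    """
    iterator = subscription.__aiter__()
    acked = 0
    while True:
        try:
            # Wait for the next event, but give up if the stream stays idle.
            event = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except (asyncio.TimeoutError, StopAsyncIteration):
            return acked
        await subscription.ack([event])
        acked += 1
```

Inside an async with client.connect() as conn: block, this could replace the async for loop: count = await drain(subscription, idle_timeout=2.0).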
