gRPC client for EventStore DB

esdb-py

EventStoreDB Python gRPC client

NOTE: This project is still a work in progress.

Completed features

  • secure connection
  • basic auth
  • streams
    • append
    • batch append
    • delete
    • read stream
    • read all with stream/event type filters
    • transient subscriptions
    • tombstone
    • filtering
  • persistent subscriptions
    • create
    • read
    • update
    • delete
    • list
    • info
    • replay parked events
  • CRUD for projections
  • users
  • other connection options
    • multi-node gossip

Installation

Using pip:

pip install esdb

Using poetry:

poetry add esdb

Development

  1. Install poetry
  2. Create a virtualenv (e.g. using pyenv):
pyenv install 3.10.5
pyenv virtualenv 3.10.5 esdb-py
pyenv local esdb-py
  3. Install deps with poetry install
  4. Start eventstore in docker: make run-esdb
  5. Run the tests: pytest tests

Usage

Have a look at the tests for more examples.

Append/Read

import asyncio
import datetime
import uuid

from esdb.client import ESClient

# For insecure connection without basic auth:
# client = ESClient("localhost:2113", tls=False)
with open("certs/ca/ca.crt", "rb") as fh:
  root_cert = fh.read()

client = ESClient(
  "localhost:2111",
  root_certificates=root_cert,
  username="admin",
  password="changeit",
  keepalive_time_ms=5000,
  keepalive_timeout_ms=10000,
)

stream = f"test-{uuid.uuid4()}"


async def streams():
  async with client.connect() as conn:
    for i in range(10):
      append_result = await conn.streams.append(
        stream=stream,
        event_type="test_event",
        data={"i": i, "ts": datetime.datetime.now(datetime.timezone.utc).isoformat()},
      )

    print("Forwards!")
    async for result in conn.streams.read(stream=stream, count=10):
      print(result.data)

    print("Backwards!")
    async for result in conn.streams.read(stream=stream, count=10, backwards=True):
      print(result.data)

    print("Forwards start from middle!")
    async for result in conn.streams.read(stream=stream, count=10, revision=5):
      print(result.data)

    print("Backwards start from middle!")
    async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
      print(result.data)

    # Create a transient subscription to a stream.
    # With subscribe=True the loop keeps waiting for new events and does not end on its own.
    async for result in conn.streams.read(stream=stream, subscribe=True):
      print(result.data)


asyncio.run(streams())

Batch append

import asyncio
import uuid

from esdb.client import ESClient
from esdb.client.streams import Message


async def batch_append():
    stream = str(uuid.uuid4())
    messages: list[Message] = [
        Message(event_type="one", data={"item": 1}),
        Message(event_type="one", data={"item": 2}),
        Message(event_type="one", data={"item": 3}),
        Message(event_type="two", data={"item": 1}),
        Message(event_type="two", data={"item": 2}),
        Message(event_type="two", data={"item": 3}),
    ]
    async with ESClient("localhost:2113", tls=False).connect() as conn:
        response = await conn.streams.batch_append(stream=stream, messages=messages)
        assert response.current_revision == 5
        events = [e async for e in conn.streams.read(stream=stream, count=50)]
        assert len(events) == 6


asyncio.run(batch_append())
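
The `current_revision == 5` assertion above follows from stream revisions being zero-based: six events appended to a new stream occupy revisions 0 through 5. The sketch below only illustrates that arithmetic and does not talk to a server. `expected_revision` is a hypothetical helper, not part of esdb, and using -1 for "stream does not exist yet" is an assumption of this sketch.

```python
def expected_revision(event_count: int, starting_revision: int = -1) -> int:
    """Revision of the last event after appending `event_count` events.

    `starting_revision` is the stream's revision before the append.
    -1 stands for "the stream does not exist yet" (an assumption of this
    sketch, not an esdb constant).
    """
    if event_count < 1:
        raise ValueError("event_count must be at least 1")
    return starting_revision + event_count


# Six messages appended to a fresh stream occupy revisions 0..5.
print(expected_revision(6))  # 5
```

The same arithmetic suggests that a second batch of four messages on that stream would end at revision 9, assuming no other writer appends in between.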

Transient subscription to all events with filtering

import uuid
import asyncio

from esdb.client import ESClient
from esdb.client.streams import Filter


async def filters():
    async with ESClient("localhost:2113", tls=False).connect() as conn:
        for i in range(10):
            await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
        async for event in conn.streams.read_all(
            subscribe=True,  # subscribe will wait for events, use count=<n> to read <n> events and stop
            filter_by=Filter(
                kind=Filter.Kind.EVENT_TYPE,
                regex="^prefix-",
                # Checkpoint only required when subscribe=True, it's not needed when using count=<int>
                checkpoint_interval_multiplier=1000,
            ),
        ):
            print(event)


asyncio.run(filters())
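
To see which event types a filter such as `^prefix-` would let through, the pattern can be tried locally first. This is a rough sketch: the real filter is evaluated by the EventStoreDB server, and Python's `re` module is used here only as a stand-in, which should agree for a simple anchored pattern like this one. The sample event type names other than `prefix-<i>` are made up for illustration.

```python
import re

# Same pattern as passed to Filter(regex=...) in the example above.
pattern = re.compile(r"^prefix-")

# Three matching event types plus two that should be filtered out
# (the last two names are hypothetical).
event_types = [f"prefix-{i}" for i in range(3)] + ["test_event", "my-prefix-1"]

# The "^" anchor means only names that start with "prefix-" match.
matching = [t for t in event_types if pattern.search(t)]
print(matching)  # ['prefix-0', 'prefix-1', 'prefix-2']
```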

Persistent subscriptions

import asyncio
from esdb.client import ESClient
from esdb.client.subscriptions import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", tls=False)

stream = "stream-foo"
group = "group-bar"


async def persistent():
  async with client.connect() as conn:
    # emit some events to the same stream
    for i in range(50):
      await conn.streams.append(stream, "foobar", {"i": i})

    # create a subscription
    await conn.subscriptions.create_stream_subscription(
      stream=stream,
      group_name=group,
      settings=SubscriptionSettings(
        max_subscriber_count=50,
        read_batch_size=5,
        live_buffer_size=10,
        history_buffer_size=10,
        consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
        checkpoint=SubscriptionSettings.DurationType(
          type=SubscriptionSettings.DurationType.Type.MS,
          value=10000,
        ),
      ),
    )

  async with client.connect() as conn:
    subscription = conn.subscriptions.subscribe_to_stream(stream=stream, group_name=group, buffer_size=5)
    async for event in subscription:
      try:
        # do work with event
        print(event)
        await subscription.ack([event])
      except Exception as err:
        await subscription.nack([event], NackAction.RETRY, reason=str(err))


asyncio.run(persistent())
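
The acknowledge-or-retry flow of the consumer loop can be exercised without a running server. The sketch below is an illustration only: `FakeSubscription` and `handle` are hypothetical stand-ins written for this example, not esdb classes, and the plain string `"retry"` takes the place of `NackAction.RETRY`. A real subscription may behave differently, for instance by redelivering retried events.

```python
import asyncio


class FakeSubscription:
    """In-memory stand-in for a persistent subscription (not part of esdb)."""

    def __init__(self, events):
        self._events = list(events)
        self.acked = []
        self.nacked = []

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for event in self._events:
            yield event

    async def ack(self, events):
        self.acked.extend(events)

    async def nack(self, events, action, reason=""):
        self.nacked.extend((event, action, reason) for event in events)


def handle(event):
    # Hypothetical handler: odd payloads fail, even payloads succeed.
    if event % 2:
        raise ValueError(f"cannot process {event}")


async def consume(subscription):
    async for event in subscription:
        try:
            handle(event)
            await subscription.ack([event])
        except Exception as err:
            # Hand the event back with the failure reason attached.
            await subscription.nack([event], "retry", reason=str(err))


subscription = FakeSubscription(range(4))
asyncio.run(consume(subscription))
print(subscription.acked)   # [0, 2]
print(subscription.nacked)  # [(1, 'retry', 'cannot process 1'), (3, 'retry', 'cannot process 3')]
```

Keeping the handler call and the acknowledgement inside the same `try` block means a failed acknowledgement is also reported as a retry, which matches the structure of the example above.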
