gRPC client for EventStore DB
esdb-py
EventStoreDB Python gRPC client
NOTE: This project is still a work in progress.
Implemented parts
- secure connection
- basic auth
- other connection options
  - multi-node gossip
  - keepalive
- async client
- streams
  - append
  - batch append
  - delete
  - read
  - tombstone
  - filtering
  - exception handling
- subscriptions
- users
Installation
Using pip:
pip install esdb
Using poetry:
poetry add esdb
Development
- Install poetry
- Create a virtualenv (e.g. using pyenv):
pyenv install 3.10.5
pyenv virtualenv 3.10.5 esdb-py
pyenv local esdb-py
- Install dependencies:
poetry install
- Start eventstore in docker:
make run-esdb
- Run the tests:
pytest tests
Usage:
import datetime
import uuid
from esdb.client.client import ESClient
# For an insecure connection without basic auth:
# client = ESClient("localhost:2113", tls=False)
with open("certs/ca/ca.crt", "rb") as fh:
    root_cert = fh.read()

client = ESClient(
    "localhost:2111",
    root_certificates=root_cert,
    username="admin",
    password="changeit",
    keepalive_time_ms=5000,
    keepalive_timeout_ms=10000,
)
stream = f"test-{uuid.uuid4()}"

with client.connect() as conn:
    for i in range(10):
        append_result = conn.streams.append(
            stream=stream,
            event_type="test_event",
            data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
        )

    print("Forwards!")
    for result in conn.streams.read(stream=stream, count=10):
        print(result.data)

    print("Backwards!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True):
        print(result.data)

    print("Forwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, revision=5):
        print(result.data)

    print("Backwards start from middle!")
    for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
        print(result.data)
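The four reads above differ only in direction and starting revision. The pure-Python sketch below models which revisions each call would be expected to return. It assumes that `revision` is an inclusive starting point and that omitting it means the first event when reading forwards or the last event when reading backwards. `expected_revisions` is a hypothetical helper, not part of esdb, and the inclusive behaviour is an assumption worth checking against a running server.

```python
from typing import List, Optional


def expected_revisions(
    stream_length: int,
    count: int,
    backwards: bool = False,
    revision: Optional[int] = None,
) -> List[int]:
    """Model which event revisions a read is expected to yield.

    Assumption (not confirmed by the esdb README): `revision` is inclusive,
    and when it is None the read starts at the first event (forwards)
    or at the last event (backwards).
    """
    if stream_length <= 0 or count <= 0:
        return []
    last = stream_length - 1
    if backwards:
        start = last if revision is None else min(revision, last)
        stop = max(start - count, -1)  # never walk past revision 0
        return list(range(start, stop, -1))
    start = 0 if revision is None else revision
    stop = min(start + count, stream_length)  # never walk past the last event
    return list(range(start, stop))


if __name__ == "__main__":
    # Mirrors the four reads in the usage example for a stream of 10 events.
    print(expected_revisions(10, 10))
    print(expected_revisions(10, 10, backwards=True))
    print(expected_revisions(10, 10, revision=5))
    print(expected_revisions(10, 10, backwards=True, revision=5))
```

Under these assumptions, a read with `count=10` starting from revision 5 yields fewer than ten events in either direction, because it stops at the end (or the beginning) of the stream.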
Async example:
import asyncio

from esdb.client.client import AsyncESClient


async def append():
    client = AsyncESClient("localhost:2113", tls=False)
    async with client.connect() as conn:
        result = await conn.streams.append("stream", "type", {"x": 1})
        assert result.commit_position > 0
        async for event in conn.streams.read("stream", count=10):
            print(event)


asyncio.run(append())
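When several events need to be written from a coroutine, a small wrapper keeps the calls in one place. The sketch below awaits each append in turn so that events land in the stream in the order given. `append_many`, `FakeStreams` and `FakeConn` are hypothetical names introduced for illustration. The only client call used is `conn.streams.append(stream, event_type, data)` as shown above, and the fake classes exist only so the helper can be exercised without a server.

```python
import asyncio
from typing import Any, Iterable, List


async def append_many(conn: Any, stream: str, event_type: str, payloads: Iterable[dict]) -> List[Any]:
    """Append payloads one at a time, preserving their order in the stream."""
    results = []
    for payload in payloads:
        # Awaiting each call before issuing the next keeps the write order deterministic.
        results.append(await conn.streams.append(stream, event_type, payload))
    return results


class FakeStreams:
    """Offline stand-in for conn.streams (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        self.events: list = []

    async def append(self, stream: str, event_type: str, data: dict) -> int:
        self.events.append((stream, event_type, data))
        return len(self.events) - 1  # pretend the result is the new revision


class FakeConn:
    """Offline stand-in for the object yielded by client.connect()."""

    def __init__(self) -> None:
        self.streams = FakeStreams()


if __name__ == "__main__":
    fake = FakeConn()
    print(asyncio.run(append_many(fake, "stream", "type", [{"x": i} for i in range(3)])))
```

With a real connection, the same helper would be called inside `async with client.connect() as conn:`. Issuing the appends concurrently (for example with `asyncio.gather`) may be faster but would no longer guarantee ordering.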
Subscriptions:
from esdb.client.client import ESClient
from esdb.client.subscriptions.base import SubscriptionSettings, NackAction

client = ESClient("localhost:2113", tls=False)
stream = "stream-name"
group = "group-name"

with client.connect() as conn:
    # emit some events to the same stream
    for _ in range(10):
        conn.streams.append(stream, "foobar", b"data")

    # create a subscription
    conn.subscriptions.create_stream_subscription(
        stream=stream,
        group_name=group,
        settings=SubscriptionSettings(
            read_batch_size=5,
            live_buffer_size=10,
            history_buffer_size=10,
            checkpoint=SubscriptionSettings.DurationType(
                type=SubscriptionSettings.DurationType.Type.MS,
                value=10000,
            ),
        ),
    )

    # Read from subscription
    # This will block and wait for messages
    subscription = conn.subscriptions.subscribe_to_stream(stream, group, buffer_size=10)
    for event in subscription:
        try:
            # ... do work with the event ...
            # ack the event
            subscription.ack([event])
        except Exception as err:
            subscription.nack([event], NackAction.RETRY, reason=str(err))
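The ack/nack loop above can be factored into a reusable function. The sketch below is one possible shape, assuming only that the subscription is iterable and exposes `ack(events)` and `nack(events, action, reason=...)` as in the example. `process_events` and `FakeSubscription` are hypothetical names, and the retry action is passed in as a parameter (with the real client this would be `NackAction.RETRY`) so the block runs without esdb installed.

```python
from typing import Any, Callable, Optional


def process_events(
    subscription: Any,
    handler: Callable[[Any], None],
    retry_action: Any,
    max_events: Optional[int] = None,
) -> int:
    """Ack events the handler accepts, nack the ones it rejects; return the ack count."""
    acked = 0
    for seen, event in enumerate(subscription, start=1):
        try:
            handler(event)
        except Exception as err:
            # The handler failed: ask the server to redeliver this event.
            subscription.nack([event], retry_action, reason=str(err))
        else:
            # Ack outside the try body so an ack failure is not turned into a nack.
            subscription.ack([event])
            acked += 1
        if max_events is not None and seen >= max_events:
            break
    return acked


class FakeSubscription:
    """Offline stand-in for a subscription (hypothetical, for illustration only)."""

    def __init__(self, events):
        self._events = list(events)
        self.acked = []
        self.nacked = []

    def __iter__(self):
        return iter(self._events)

    def ack(self, events):
        self.acked.extend(events)

    def nack(self, events, action, reason=None):
        self.nacked.extend((event, action, reason) for event in events)
```

The optional `max_events` bound is a convenience for scripts and tests, since iterating a live subscription otherwise blocks while waiting for new messages.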
Async subscriptions:
import asyncio

from esdb.client.client import AsyncESClient
from esdb.client.subscriptions.base import SubscriptionSettings

client = AsyncESClient("localhost:2113", tls=False)
stream = "stream-foo"
group = "group-bar"


# "async with" is only valid inside a coroutine, so the example is wrapped in main()
async def main():
    async with client.connect() as conn:
        # emit some events to the same stream
        for i in range(50):
            await conn.streams.append(stream, "foobar", {"i": i})

        # create a subscription
        await conn.subscriptions.create_stream_subscription(
            stream=stream,
            group_name=group,
            settings=SubscriptionSettings(
                max_subscriber_count=50,
                read_batch_size=5,
                live_buffer_size=10,
                history_buffer_size=10,
                consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
                checkpoint=SubscriptionSettings.DurationType(
                    type=SubscriptionSettings.DurationType.Type.MS,
                    value=10000,
                ),
            ),
        )

    async with client.connect() as conn:
        subscription = conn.subscriptions.subscribe_to_stream(stream=stream, group_name=group, buffer_size=5)
        # This loop waits for new messages and does not end on its own
        async for event in subscription:
            await subscription.ack([event])


asyncio.run(main())
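Because the `async for` loop over a subscription keeps waiting for new messages, a script that should terminate needs its own stopping rule. The sketch below stops after a fixed number of events or once no event arrives within an idle timeout. `consume_until_idle` and `FakeAsyncSubscription` are hypothetical names. The sketch assumes the subscription supports async iteration and an awaitable `ack(events)` as shown above, and whether the real esdb subscription tolerates having a pending wait cancelled is an assumption that has not been verified here.

```python
import asyncio
from typing import Any, List


async def consume_until_idle(subscription: Any, idle_timeout: float, max_events: int) -> List[Any]:
    """Ack events until max_events are handled or none arrives within idle_timeout seconds."""
    handled: List[Any] = []
    iterator = subscription.__aiter__()
    while len(handled) < max_events:
        try:
            # Bound the wait for the next event instead of blocking indefinitely.
            event = await asyncio.wait_for(iterator.__anext__(), timeout=idle_timeout)
        except (asyncio.TimeoutError, StopAsyncIteration):
            break
        await subscription.ack([event])
        handled.append(event)
    return handled


class FakeAsyncSubscription:
    """Offline stand-in for an async subscription (hypothetical, for illustration only)."""

    def __init__(self, events):
        self._events = list(events)
        self.acked = []

    def __aiter__(self):
        return self._generate()

    async def _generate(self):
        for event in self._events:
            yield event
        # Simulate a live subscription that goes quiet after the backlog.
        await asyncio.sleep(3600)

    async def ack(self, events):
        self.acked.extend(events)


if __name__ == "__main__":
    fake = FakeAsyncSubscription(["a", "b", "c"])
    print(asyncio.run(consume_until_idle(fake, idle_timeout=0.05, max_events=10)))
```

With a real connection, the helper would be awaited inside `async with client.connect() as conn:` on the object returned by `conn.subscriptions.subscribe_to_stream(...)`.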