gRPC client for EventStore DB
Project description
esdb-py
EventStoreDB Python gRPC client
NOTE: This project is still work in progress
Completed features
- secure connection
- basic auth
- streams
- append
- batch append
- delete
- read stream
- read all with stream/event type filters
- transient subscriptions
- tombstone
- filtering
- persistent subscriptions
- create
- read
- update
- delete
- list
- info
- reply parked events
- CRUD for projections
- users
- other connection options
- multi-node gossip
Installation
Using pip:
pip install esdb
Using poetry:
poetry add esdb
Development
- Install poetry
- Create virtualenv (i.e. using pyenv):
pyenv install 3.10.5
pyenv virtualenv 3.10.5 esdb-py
pyenv local esdb-py
- Install deps with
poetry install
- Start eventstore in docker:
make run-esdb
- Run the tests:
pytest tests
Usage
Have a look at tests for more examples.
Append/Read
import asyncio
import datetime
import uuid
from esdb.client import ESClient
# For insecure connection without basic auth:
# client = ESClient("localhost:2113", tls=False)
with open("certs/ca/ca.crt", "rb") as fh:
root_cert = fh.read()
client = ESClient(
"localhost:2111",
root_certificates=root_cert,
username="admin",
password="changeit",
keepalive_time_ms=5000,
keepalive_timeout_ms=10000,
)
stream = f"test-{str(uuid.uuid4())}"
async def streams():
async with client.connect() as conn:
for i in range(10):
append_result = await conn.streams.append(
stream=stream,
event_type="test_event",
data={"i": i, "ts": datetime.datetime.utcnow().isoformat()},
)
print("Forwards!")
async for result in conn.streams.read(stream=stream, count=10):
print(result.data)
print("Backwards!")
async for result in conn.streams.read(stream=stream, count=10, backwards=True):
print(result.data)
print("Forwards start from middle!")
async for result in conn.streams.read(stream=stream, count=10, revision=5):
print(result.data)
print("Backwards start from middle!")
async for result in conn.streams.read(stream=stream, count=10, backwards=True, revision=5):
print(result.data)
# Create a transient subscription to a stream
async for result in conn.streams.read(stream=stream, subscribe=True):
print(result.data)
asyncio.run(streams())
Batch append
import asyncio
import uuid
from esdb.client import ESClient
from esdb.client.streams import Message
async def batch_append():
stream = str(uuid.uuid4())
messages: list[Message] = [
Message(event_type="one", data={"item": 1}),
Message(event_type="one", data={"item": 2}),
Message(event_type="one", data={"item": 3}),
Message(event_type="two", data={"item": 1}),
Message(event_type="two", data={"item": 2}),
Message(event_type="two", data={"item": 3}),
]
async with ESClient("localhost:2113", tls=False).connect() as conn:
response = await conn.streams.batch_append(stream=stream, messages=messages)
assert response.current_revision == 5
events = [e async for e in conn.streams.read(stream=stream, count=50)]
assert len(events) == 6
asyncio.run(batch_append())
Transient subscription to all events with filtering
import uuid
import asyncio
from esdb.client import ESClient
from esdb.client.streams import Filter
async def filters():
async with ESClient("localhost:2113", tls=False).connect() as conn:
for i in range(10):
await conn.streams.append(stream=str(uuid.uuid4()), event_type=f"prefix-{i}", data=b"")
async for event in conn.streams.read_all(
subscribe=True, # subscribe will wait for events, use count=<n> to read <n> events and stop
filter_by=Filter(
kind=Filter.Kind.EVENT_TYPE,
regex="^prefix-",
# Checkpoint only required when subscribe=True, it's not needed when using count=<int>
checkpoint_interval_multiplier=1000,
),
):
print(event)
asyncio.run(filters())
Persistent subscriptions
import asyncio
from esdb.client import ESClient
from esdb.client.subscriptions import SubscriptionSettings, NackAction
client = ESClient("localhost:2113", tls=False)
stream = "stream-foo"
group = "group-bar"
async def persistent():
async with client.connect() as conn:
# emit some events to the same stream
for i in range(50):
await conn.streams.append(stream, "foobar", {"i": i})
# create a subscription
await conn.subscriptions.create_stream_subscription(
stream=stream,
group_name=group,
settings=SubscriptionSettings(
max_subscriber_count=50,
read_batch_size=5,
live_buffer_size=10,
history_buffer_size=10,
consumer_strategy=SubscriptionSettings.ConsumerStrategy.ROUND_ROBIN,
checkpoint=SubscriptionSettings.DurationType(
type=SubscriptionSettings.DurationType.Type.MS,
value=10000,
),
),
)
async with client.connect() as conn:
subscription = conn.subscriptions.subscribe_to_stream(stream=stream, group_name=group, buffer_size=5)
async for event in subscription:
try:
# do work with event
print(event)
await subscription.ack([event])
except Exception as err:
await subscription([event], NackAction.RETRY, reason=str(err))
asyncio.run(persistent())
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
esdb-0.2.0.tar.gz
(66.7 kB
view details)
Built Distribution
esdb-0.2.0-py3-none-any.whl
(81.5 kB
view details)
File details
Details for the file esdb-0.2.0.tar.gz
.
File metadata
- Download URL: esdb-0.2.0.tar.gz
- Upload date:
- Size: 66.7 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | bf079ac8e8eb64a1e30d22df2b28ec69e734ec72e038e72bf035303492d36c42 |
|
MD5 | ae004094d1bea1c1e5c13e17d24c2172 |
|
BLAKE2b-256 | 0f0910fd9f5e85462dffcceceb4d4f08aec894b7929546ddab6410ec2799fa31 |
File details
Details for the file esdb-0.2.0-py3-none-any.whl
.
File metadata
- Download URL: esdb-0.2.0-py3-none-any.whl
- Upload date:
- Size: 81.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: poetry/1.1.14 CPython/3.10.5 Darwin/21.6.0
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | cb1de2f5215480e3c5408e78133b66efc81bd8bfdef9d5da2722b3e2b3463ad0 |
|
MD5 | 38ff9a29e3656f22c85223afb9b982e4 |
|
BLAKE2b-256 | 66007273b0de684d6ead789624e6d9fb714516149dd56122fe543b05b2b8e1ad |