Etcd client built with pure asyncio gRPC library

etcetra

Pure Python asyncio Etcd client.

Installation

pip install etcetra

API Documentation

Refer here.

Basic usage

All Etcd operations in etcetra are executed through EtcdClient. An EtcdClient instance is a wrapper that holds the connection information for an Etcd channel. The instance is reusable, because the actual gRPC connection is only established when you initiate a connection call (see below).

from etcetra import EtcdClient, HostPortPair
etcd = EtcdClient(HostPortPair('127.0.0.1', 2379))

As mentioned above, the actual connection to Etcd's gRPC channel is established when you call EtcdClient.connect(). This call returns an async context manager that manages an EtcdCommunicator instance.

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
    value = await communicator.get('testkey')
    print(value)  # testvalue
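
The snippets in this README use `async with` and `await` at the top level, which only works inside a coroutine or an asyncio-aware REPL (for example `python -m asyncio`). A minimal sketch of wrapping the same put/get round trip so that a regular script can drive it with `asyncio.run()`. The names `roundtrip` and `main` are hypothetical, and the sketch relies only on the `connect()`, `put()` and `get()` calls shown above.

```python
import asyncio


async def roundtrip(etcd, key, value):
    """Store `value` under `key`, then read it back over one connection.

    `etcd` is expected to be an EtcdClient, or any object whose connect()
    returns an async context manager yielding a communicator.
    """
    async with etcd.connect() as communicator:
        await communicator.put(key, value)
        return await communicator.get(key)


def main():
    # Assumption: an Etcd server is listening on 127.0.0.1:2379.
    # Imported here so that roundtrip() stays usable without etcetra installed.
    from etcetra import EtcdClient, HostPortPair

    etcd = EtcdClient(HostPortPair('127.0.0.1', 2379))
    print(asyncio.run(roundtrip(etcd, 'testkey', 'testvalue')))
```

Calling `main()` from a script's entry point runs the round trip once and closes the connection when the `async with` block exits.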

EtcdCommunicator.get_prefix(prefix) returns a dictionary containing every key-value pair whose key starts with the given prefix.

async with etcd.connect() as communicator:
    await communicator.put('/testdir', 'root')
    await communicator.put('/testdir/1', '1')
    await communicator.put('/testdir/2', '2')
    await communicator.put('/testdir/2/3', '3')
    test_dir = await communicator.get_prefix('/testdir')
    print(test_dir)  # {'/testdir': 'root', '/testdir/1': '1', '/testdir/2': '2', '/testdir/2/3': '3'}
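
Etcd itself has no directory concept: a prefix query is a range read over all keys in [prefix, range_end), where range_end is the prefix with its last byte incremented. A prefix of '/testdir' therefore also matches a sibling key such as '/testdir2'. A minimal sketch of that computation follows; `prefix_range_end` and `in_prefix_range` are hypothetical helpers, and whether etcetra computes the range this way internally is an assumption.

```python
def prefix_range_end(prefix: bytes) -> bytes:
    """Return the smallest key that sorts after every key starting with `prefix`."""
    end = bytearray(prefix)
    # Walk backwards to the last byte that can still be incremented.
    for i in reversed(range(len(end))):
        if end[i] < 0xFF:
            end[i] += 1
            return bytes(end[: i + 1])
    # Empty prefix, or every byte is 0xFF: Etcd reads a range_end of b"\x00"
    # as "all keys greater than or equal to the start key".
    return b"\x00"


def in_prefix_range(key: bytes, prefix: bytes) -> bool:
    """Check whether `key` falls inside the range a prefix query would read."""
    end = prefix_range_end(prefix)
    return key >= prefix and (end == b"\x00" or key < end)
```

If only the children of a directory-like key are wanted, querying with a trailing separator ('/testdir/') excludes both '/testdir' itself and siblings like '/testdir2'.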

Operating with Etcd lock

Just like EtcdClient.connect(), you can easily use an Etcd lock by calling EtcdClient.with_lock(lock_name, timeout=None).

import asyncio

async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value, end=' | ')

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # first: testvalue | second: testvalue

Passing the timeout parameter to EtcdClient.with_lock() sets a time limit on acquiring the lock.

async def first():
    async with etcd.with_lock('foolock') as communicator:
        value = await communicator.get('testkey')
        print('first:', value)
        await asyncio.sleep(10)

async def second():
    await asyncio.sleep(0.1)
    async with etcd.with_lock('foolock', timeout=5) as communicator:
        value = await communicator.get('testkey')
        print('second:', value)

async with etcd.connect() as communicator:
    await communicator.put('testkey', 'testvalue')
await asyncio.gather(first(), second())  # asyncio.TimeoutError followed by first: testvalue output
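
Because the timeout surfaces as asyncio.TimeoutError, a caller that wants to carry on without the lock can catch it. A sketch under that assumption; `read_with_lock` is a hypothetical helper that uses only the `with_lock()` and `get()` calls shown above.

```python
import asyncio


async def read_with_lock(etcd, lock_name, key, timeout=None):
    """Read `key` while holding `lock_name`.

    Returns (True, value) when the lock was acquired and
    (False, None) when acquiring it timed out.
    """
    try:
        async with etcd.with_lock(lock_name, timeout=timeout) as communicator:
            return True, await communicator.get(key)
    except asyncio.TimeoutError:
        # Caveat: a TimeoutError raised inside the block is caught here as well.
        return False, None
```

Returning a pair keeps "lock not acquired" distinguishable from "key has no value".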

Watch

You can watch for changes to a key with EtcdCommunicator.watch(key).

async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch('testkey'):
            print(event.event, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('testkey', '1')
        await communicator.put('testkey', '2')
        await communicator.put('testkey', '3')
        await communicator.put('testkey', '4')
        await communicator.put('testkey', '5')

await asyncio.gather(watch(), update())
# WatchEventType.PUT 1
# WatchEventType.PUT 2
# WatchEventType.PUT 3
# WatchEventType.PUT 4
# WatchEventType.PUT 5
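
Assuming the watch iterator keeps yielding events until it is stopped, the watch() coroutine above does not return by itself after the fifth event. A sketch of collecting a fixed number of events and then leaving the loop; `collect_events` is a hypothetical helper, and the `event` and `value` attributes are the ones printed in the example above.

```python
async def collect_events(communicator, key, limit):
    """Return the first `limit` (event type, value) pairs observed on `key`."""
    seen = []
    if limit <= 0:
        return seen
    async for event in communicator.watch(key):
        seen.append((event.event, event.value))
        if len(seen) >= limit:
            break  # stop consuming further events
    return seen
```

An alternative with the same effect is to run the watcher as a task and cancel it once the updates of interest have arrived.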

You can also watch for changes to all keys with a given prefix using EtcdCommunicator.watch_prefix(key_prefix).

async def watch():
    async with etcd.connect() as communicator:
        async for event in communicator.watch_prefix('/testdir'):
            print(event.event, event.key, event.value)

async def update():
    await asyncio.sleep(0.1)
    async with etcd.connect() as communicator:
        await communicator.put('/testdir', '1')
        await communicator.put('/testdir/foo', '2')
        await communicator.put('/testdir/bar', '3')
        await communicator.put('/testdir/foo/baz', '4')

await asyncio.gather(watch(), update())
# WatchEventType.PUT /testdir 1
# WatchEventType.PUT /testdir/foo 2
# WatchEventType.PUT /testdir/bar 3
# WatchEventType.PUT /testdir/foo/baz 4

Transaction

You can run an Etcd transaction by calling EtcdCommunicator.txn_compare(compares, txn_builder).

Constructing compares

Construct a compare operation by comparing an attribute of a CompareKey instance (value in the examples below) with a literal, using Python's built-in comparison operators (==, !=, >, <).

from etcetra import CompareKey
compares = [
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value > 'bar',
]
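
The comparison operators in this list do not produce booleans; each one builds a description of a comparison for Etcd to evaluate as part of the transaction. A toy model of that operator-overloading pattern, evaluated against a plain dict instead of a server. The names `ToyCompareKey`, `ToyCompare` and `holds` are hypothetical, are not part of etcetra, and say nothing about its internals.

```python
import operator
from dataclasses import dataclass

_OPS = {'==': operator.eq, '!=': operator.ne, '>': operator.gt, '<': operator.lt}


@dataclass(frozen=True)
class ToyCompare:
    """A recorded comparison: key, operator symbol and operand."""
    key: str
    op: str
    operand: str

    def holds(self, store: dict) -> bool:
        # In this toy model a missing key never satisfies a comparison.
        return self.key in store and _OPS[self.op](store[self.key], self.operand)


class _ToyValue:
    def __init__(self, key: str) -> None:
        self._key = key

    # Each operator records the comparison instead of evaluating it.
    def __eq__(self, other):
        return ToyCompare(self._key, '==', other)

    def __ne__(self, other):
        return ToyCompare(self._key, '!=', other)

    def __gt__(self, other):
        return ToyCompare(self._key, '>', other)

    def __lt__(self, other):
        return ToyCompare(self._key, '<', other)


class ToyCompareKey:
    def __init__(self, key: str) -> None:
        self.value = _ToyValue(key)


store = {'cmpkey1': 'foo', 'cmpkey2': 'baz'}
compares = [
    ToyCompareKey('cmpkey1').value == 'foo',
    ToyCompareKey('cmpkey2').value > 'bar',
]
print(all(c.holds(store) for c in compares))  # True
```

Strings compare lexicographically here, which is why 'baz' > 'bar' holds.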

Executing transaction calls

async with etcd.connect() as communicator:
    await communicator.put('cmpkey1', 'foo')
    await communicator.put('cmpkey2', 'baz')
    await communicator.put('successkey', 'asdf')

    def _txn(success, failure):
        success.get('successkey')

    values = await communicator.txn_compare(compares, _txn)
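    print(values)  # ['asdf']

If any comparison fails, the operations registered on failure run instead. Here cmpkey2 holds 'baz', which is not less than 'bar':

compares = [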
    CompareKey('cmpkey1').value == 'foo',
    CompareKey('cmpkey2').value < 'bar',
]
async with etcd.connect() as communicator:
    await communicator.put('failurekey', 'asdf')

    def _txn(success, failure):
        failure.get('failurekey')

    values = await communicator.txn_compare(compares, _txn)
    print(values)  # ['asdf']

If you don't need compare conditions for a transaction, you can use EtcdCommunicator.txn(txn_builder), which is shorthand for EtcdCommunicator.txn_compare([], lambda success, failure: txn_builder(success)).

async with etcd.connect() as communicator:
    def _txn(action):
        action.get('cmpkey1')
        action.get('cmpkey2')

    values = await communicator.txn(_txn)
    print(values)  # ['foo', 'baz']
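
The equivalence stated for txn() can be written out as a small wrapper. This is a sketch of the relationship described in the text, not etcetra's actual implementation; `txn_without_compares` is a hypothetical name.

```python
async def txn_without_compares(communicator, txn_builder):
    """Run `txn_builder` as the success branch of a transaction with no compares."""
    # With an empty compare list the success branch is the one that runs,
    # so the failure action set is never populated.
    return await communicator.txn_compare(
        [], lambda success, failure: txn_builder(success)
    )
```

Passing the `_txn` builder from the example above to this wrapper should behave like `communicator.txn(_txn)`.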

Compiling Protobuf

$ scripts/compile_protobuf.py <target Etcd version>
