Generic event bus for inter process asyncio communication

# Lahja

**DISCLAIMER: This is a proof of concept. It's untested, has never been used anywhere in production and is probably full of bugs**

*Lahja is a generic event bus implementation written in Python 3.6+ to enable lightweight inter-process communication, based on non-blocking asyncio.*

## What is this for?

Lahja is tailored around one primary use case: enabling multi-process Python applications to communicate between processes through events, in a non-blocking asyncio fashion.

Key facts:

- non-blocking APIs based on asyncio
- lightweight and simple (e.g. no IPC pipes)
- easy multicasting of events (one event, many independent receivers)
- multiple consuming APIs to adapt to different use cases and styles


## Quickstart

### Install the library

```sh
pip install lahja
```

### Import `EventBus` and `Endpoint`.

```python
from lahja import (
    Endpoint,
    EventBus,
)
```

### Setup `EventBus` with `Endpoint`s and pass the `Endpoint`s into the different processes.

```python
import multiprocessing

event_bus = EventBus()
e1 = event_bus.create_endpoint('e1')
e2 = event_bus.create_endpoint('e2')
event_bus.start()

p1 = multiprocessing.Process(target=run_proc1, args=(e1,))
p1.start()

p2 = multiprocessing.Process(target=run_proc2, args=(e2,))
p2.start()
```

### Inside each process, `connect()` the endpoint with the event bus and start receiving and broadcasting events.

```Python
import asyncio
import time


def run_proc1(endpoint):
    loop = asyncio.get_event_loop()
    # Connect this process's endpoint to the event bus before using it
    endpoint.connect()
    endpoint.subscribe(SecondThingHappened, lambda event:
        print("Received via SUBSCRIBE API in proc1: ", event.payload)
    )
    endpoint.subscribe(FirstThingHappened, lambda event:
        print("Receiving own event: ", event.payload)
    )

    loop.run_until_complete(proc1_worker(endpoint))


async def proc1_worker(endpoint):
    while True:
        print("Hello from proc1")
        if is_nth_second(5):
            endpoint.broadcast(
                FirstThingHappened("Hit from proc1 ({})".format(time.time()))
            )
        await asyncio.sleep(1)
```
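
The quickstart uses `FirstThingHappened`, `SecondThingHappened` and `is_nth_second` without defining them. The following is a minimal sketch of what they might look like, not the project's own code. The `ExampleEvent` base class is a stand-in so the snippet runs without lahja installed (real events would presumably derive from the `BaseEvent` type named in the API signatures), and the `now` parameter is a hypothetical addition that makes the helper easy to test.

```python
import time


class ExampleEvent:
    """Stand-in base class (assumption); real events would subclass BaseEvent."""

    def __init__(self, payload):
        self.payload = payload


class FirstThingHappened(ExampleEvent):
    pass


class SecondThingHappened(ExampleEvent):
    pass


def is_nth_second(interval, now=None):
    """Return True when the current whole second is a multiple of `interval`."""
    current = time.time() if now is None else now
    return int(current) % interval == 0


event = FirstThingHappened("Hit from proc1 ({})".format(time.time()))
print(event.payload)
```

With a helper like this, `is_nth_second(5)` is true during roughly one second out of every five, which matches the cadence of the "Hit from proc1" lines in the example output.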

## API


### Broadcasting events

Events can be broadcast via the `broadcast` API. Events are handled internally via `asyncio`, so this is a non-blocking, fire-and-forget API.

**Broadcast API**

`def broadcast(self, item: BaseEvent) -> None:`

*Example:*

```Python
endpoint.broadcast(
    FirstThingHappened("Hit from proc1 ({})".format(time.time()))
)
```
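
To illustrate what "fire and forget" means in asyncio terms, here is a small stdlib-only sketch. It does not use lahja; `toy_broadcast` and `handler` are hypothetical names, and scheduling with `call_soon` is only an assumption about how such an API could behave. It uses `asyncio.run`, which requires Python 3.7+.

```python
import asyncio

log = []


def handler(event):
    log.append("handled " + event)


def toy_broadcast(event):
    # Schedule delivery on the running loop and return immediately,
    # without waiting for any handler to run.
    asyncio.get_running_loop().call_soon(handler, event)


async def main():
    toy_broadcast("FirstThingHappened")
    log.append("broadcast returned")
    await asyncio.sleep(0)  # yield to the loop so the scheduled handler runs
    return list(log)


order = asyncio.run(main())
print(order)
```

The caller continues right after `toy_broadcast` returns; the handler only runs once control goes back to the event loop.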

#### Limiting the scope of broadcasts

By default, broadcast events are sent to each and every endpoint. Since endpoints usually live in different processes, broadcasting events comes with a performance cost. It is often beneficial to narrow the set of endpoints an event is broadcast to.

*Example: Broadcasting into a specific endpoint*

```Python
endpoint.broadcast(
    FirstThingHappened("Hit from proc1 ({})".format(time.time())),
    BroadcastConfig(filter_endpoint="some_endpoint")
)
```

For scenarios where we only want to *answer* a specific request, a `BroadcastConfig` can be retrieved from an incoming event to send the response only to that specific call site that sent the request.

*Example: Responding only to the requester*

```Python
endpoint.subscribe(GetSomethingRequest, lambda event:
    # Send a response back to *only* who made that request
    endpoint.broadcast(DeliverSomethingResponse("Yay"), event.broadcast_config())
)
```
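
The effect of an endpoint filter on delivery can be pictured with a toy, single-process model. This is a sketch under stated assumptions, not lahja's implementation: `ToyBus` and its `inboxes` are hypothetical, and only the `filter_endpoint` name is taken from the example above.

```python
from typing import Dict, List, Optional


class ToyBus:
    """In-process stand-in that only models *which* endpoints get an event."""

    def __init__(self, endpoint_names: List[str]) -> None:
        self.inboxes: Dict[str, List[object]] = {name: [] for name in endpoint_names}

    def broadcast(self, event: object, filter_endpoint: Optional[str] = None) -> None:
        for name, inbox in self.inboxes.items():
            # Without a filter every endpoint receives the event;
            # with a filter only the named endpoint does.
            if filter_endpoint is None or name == filter_endpoint:
                inbox.append(event)


bus = ToyBus(["e1", "e2", "e3"])
bus.broadcast("to everyone")
bus.broadcast("only e2", filter_endpoint="e2")
print(bus.inboxes)
```

In the real library each delivery crosses a process boundary, which is why skipping unneeded endpoints saves work.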

### Listening to events

Events can be received in three different ways. All APIs are non-blocking.

**Subscribe API**

`def subscribe(self, event_type: Type[BaseEvent], handler: Callable[[BaseEvent], None]) -> Subscription:`

*Example:*

```Python
subscription = endpoint.subscribe(SecondThingHappened, lambda event:
    print("Received via SUBSCRIBE API in proc1: ", event.payload)
)
```

The handler will be called every time that a `SecondThingHappened` event is fired. Notice that the returned `Subscription` allows deregistering from the event at any later point in time.

*Example:*

```Python
subscription.unsubscribe()
```
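
The subscribe/unsubscribe life cycle can be sketched with a toy, single-process endpoint. Everything here (`ToyEndpoint`, `ToySubscription`, the local `SecondThingHappened` class) is hypothetical and stdlib-only. Unlike lahja, this toy delivers events synchronously; it is only meant to show how a returned handle can deregister a handler.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToySubscription:
    def __init__(self, unsubscribe_fn: Callable[[], None]) -> None:
        self._unsubscribe_fn = unsubscribe_fn

    def unsubscribe(self) -> None:
        self._unsubscribe_fn()


class ToyEndpoint:
    def __init__(self) -> None:
        self._handlers: DefaultDict[type, List[Callable]] = defaultdict(list)

    def subscribe(self, event_type: type, handler: Callable) -> ToySubscription:
        self._handlers[event_type].append(handler)
        # The handle remembers how to remove exactly this handler again.
        return ToySubscription(lambda: self._handlers[event_type].remove(handler))

    def broadcast(self, event: object) -> None:
        # Iterate over a copy so handlers may unsubscribe while being called.
        for handler in list(self._handlers[type(event)]):
            handler(event)


class SecondThingHappened:
    def __init__(self, payload: str) -> None:
        self.payload = payload


received = []
endpoint = ToyEndpoint()
subscription = endpoint.subscribe(
    SecondThingHappened, lambda event: received.append(event.payload)
)
endpoint.broadcast(SecondThingHappened("first"))
subscription.unsubscribe()
endpoint.broadcast(SecondThingHappened("second"))  # no longer delivered
print(received)
```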

**Stream API**

`async def stream(self, event_type: Type[BaseEvent]) -> AsyncIterable[BaseEvent]:`

*Example:*

```Python
async for event in endpoint.stream(SecondThingHappened):
    print(event.payload)
```
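
An `async for` loop over a stream has to run inside a coroutine, and it suspends rather than blocks while no event is available. The sketch below models that with an `asyncio.Queue` and an async generator. `toy_stream` and `consume_first` are hypothetical names, and the queue is merely a stand-in for however events actually arrive. It uses `asyncio.run`, which requires Python 3.7+.

```python
import asyncio
from typing import AsyncIterator, List


async def toy_stream(queue: asyncio.Queue) -> AsyncIterator[str]:
    """Yield queued items forever, suspending while the queue is empty."""
    while True:
        yield await queue.get()


async def consume_first(queue: asyncio.Queue, limit: int) -> List[str]:
    seen = []
    async for payload in toy_stream(queue):
        seen.append(payload)
        if len(seen) == limit:
            break  # leaving the loop ends consumption
    return seen


async def main() -> List[str]:
    queue: asyncio.Queue = asyncio.Queue()
    for n in range(3):
        queue.put_nowait("event {}".format(n))
    return await consume_first(queue, limit=2)


result = asyncio.run(main())
print(result)
```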

**Request API**

This API combines broadcasting an event with waiting for a response to it. The response is a routed event, delivered only to the call site that made the request.

`async def request(self, item: BaseEvent) -> BaseEvent:`

*Example:*

```Python
result = await endpoint.request(RequestSomething())
print(result.payload)
```
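
One common way to route a response back to a single caller is to pair each request with an id and a future. The sketch below shows that general pattern with the standard library only. It is an assumption for illustration, not a description of lahja's internals; `ToyRequester` and its `responder` callback are hypothetical.

```python
import asyncio
import itertools
from typing import Callable, Dict, List


class ToyRequester:
    def __init__(self, responder: Callable[[str], str]) -> None:
        self._responder = responder
        self._ids = itertools.count()
        self._pending: Dict[int, asyncio.Future] = {}

    async def request(self, payload: str) -> str:
        request_id = next(self._ids)
        loop = asyncio.get_running_loop()
        future = loop.create_future()
        self._pending[request_id] = future
        # Simulate the response arriving later on the event loop.
        loop.call_soon(self._deliver, request_id, self._responder(payload))
        return await future

    def _deliver(self, request_id: int, response: str) -> None:
        # Route the response to the one caller waiting on this id.
        self._pending.pop(request_id).set_result(response)


async def main() -> List[str]:
    requester = ToyRequester(lambda payload: "Yay")
    return await asyncio.gather(*(requester.request("get") for _ in range(3)))


results = asyncio.run(main())
print(results)
```

Each of the three concurrent callers gets its own response, and no other waiter sees it.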

## Running the full examples

**Example 1**

```sh
python3 examples/inter_process_ping_pong.py
```

The output will look like this:

```sh
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1: Hit from proc2 (1533887068.9261594)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1: Hit from proc2 (1533887070.9296985)
Receiving own event: Hit from proc1 (1533887070.9288142)
Received via SUBSCRIBE API in proc2: Hit from proc1 (1533887070.9288142)
Received via STREAM API in proc2: Hit from proc1 (1533887070.9288142)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1: Hit from proc2 (1533887072.9331954)
Hello from proc1
Hello from proc2
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc1: Hit from proc2 (1533887074.937018)
Hello from proc1
Hello from proc2
Received via SUBSCRIBE API in proc2: Hit from proc1 (1533887075.9378386)
Received via STREAM API in proc2: Hit from proc1 (1533887075.9378386)
Receiving own event: Hit from proc1 (1533887075.9378386)
```

**Example 2**

```sh
python3 examples/request_api.py
```

The output will look like this:

```sh
Yay
Yay
Yay
```


## TODOs

- Filter support (e.g. only subscribe to `EventX` from origin `y`)
- Push boundaries (don't push this into process x)
- Testing
- Performance analysis

## Developer Setup

If you would like to hack on lahja, please check out the
[Ethereum Development Tactical Manual](https://github.com/pipermerriam/ethereum-dev-tactical-manual)
for information on how we do:

- Testing
- Pull Requests
- Code Style
- Documentation

### Development Environment Setup

You can set up your dev environment with:

```sh
git clone https://github.com/cburgdorf/lahja
cd lahja
virtualenv -p python3 venv
. venv/bin/activate
pip install -e .[dev]
```

### Testing Setup

During development, you might like to have tests run on every file save.

Show flake8 errors on file change:

```sh
# Test flake8
when-changed -v -s -r -1 lahja/ tests/ -c "clear; flake8 lahja tests && echo 'flake8 success' || echo 'error'"
```

Run multi-process tests in one command, but without color:

```sh
# in the project root:
pytest --numprocesses=4 --looponfail --maxfail=1
# the same thing, succinctly:
pytest -n 4 -f --maxfail=1
```

Run in one thread, with color and desktop notifications:

```sh
cd venv
ptw --onfail "notify-send -t 5000 'Test failure ⚠⚠⚠⚠⚠' 'python 3 test on lahja failed'" ../tests ../lahja
```

### Release setup

For Debian-like systems:

```sh
apt install pandoc
```

To release a new version:

```sh
make release bump=$$VERSION_PART_TO_BUMP$$
```

#### How to bumpversion

The version format for this repo is `{major}.{minor}.{patch}` for stable, and
`{major}.{minor}.{patch}-{stage}.{devnum}` for unstable (`stage` can be alpha or beta).

To issue the next version in line, specify which part to bump,
like `make release bump=minor` or `make release bump=devnum`.

If you are in a beta version, `make release bump=stage` will switch to a stable.

To issue an unstable version when the current version is stable, specify the
new version explicitly, like `make release bump="--new-version 4.0.0-alpha.1 devnum"`
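
The two version formats can be checked with a small regular expression. This is an illustrative sketch, not part of the release tooling; `VERSION_RE` and `parse_version` are hypothetical names.

```python
import re

# {major}.{minor}.{patch} with an optional -{stage}.{devnum} suffix
VERSION_RE = re.compile(
    r"^(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)"
    r"(?:-(?P<stage>alpha|beta)\.(?P<devnum>\d+))?$"
)


def parse_version(version):
    """Split a version string into its parts; numeric parts become ints."""
    match = VERSION_RE.match(version)
    if match is None:
        raise ValueError("unrecognised version: {!r}".format(version))
    return {
        name: int(value) if value is not None and value.isdigit() else value
        for name, value in match.groupdict().items()
    }


stable = parse_version("0.3.1")
unstable = parse_version("4.0.0-alpha.1")
print(stable)
print(unstable)
```

For a stable version, `stage` and `devnum` come back as `None`.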

