Generic asynchronous message channel with routing by predicators
message-channel
This library provides a message channel object that extracts particular messages from a larger stream of messages. It is similar to `GROUP BY` in SQL or to ReactiveX, but for asynchronous readers.
Installation
pip install python-message-channel
Usage
For example, assume that you have a string stream whose messages are prefixed by `a`, `b`, ... `e`, and you'd like to split off subchannels for messages prefixed by `b` or `d`, as shown below.
=============================================
---------------------------------> a:foo
--------------------+
--------------------|------------> c:foo
--------------------|------------> d:foo
--------------------|------------> e:foo
====================|========================
       channel      |
                   =|========================
                    +------------> b:foo
                   ==========================
                    subchannel `m.startswith('b')`
This library is a tool for handling such situations.
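The routing idea in the diagram can be sketched with the standard library alone. This is only an illustration of predicate-based routing, not the library's implementation; the names `route`, `sub`, `rest`, and `demo` are hypothetical and do not come from the package.

```python
import asyncio


async def route(source, predicate, sub, rest, count):
    """Move `count` messages from `source` into `sub` or `rest`."""
    for _ in range(count):
        message = await source.get()
        # Messages matching the predicate go to the subchannel queue;
        # everything else goes to the queue for the main channel.
        target = sub if predicate(message) else rest
        target.put_nowait(message)


async def demo():
    source, sub, rest = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    messages = ['a:foo', 'b:foo', 'c:foo', 'd:foo', 'e:foo']
    for message in messages:
        source.put_nowait(message)

    await route(source, lambda m: m.startswith('b:'), sub, rest, len(messages))

    # Drain both queues so the routing result can be inspected.
    routed_sub = [sub.get_nowait() for _ in range(sub.qsize())]
    routed_rest = [rest.get_nowait() for _ in range(rest.qsize())]
    return routed_sub, routed_rest


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The library wraps this kind of dispatching behind `Channel` and its subchannels, so callers only deal with `recv()`.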
First, create a `Channel` instance from a stream reader; you can then receive messages with the `channel.recv()` method.
In this example, we use `asyncio.Queue` as the stream.
import asyncio
from message_channel import Channel

async def main():
    # Create original stream
    stream = asyncio.Queue()

    # Create stream reader
    async def reader():
        return await stream.get()

    # Create stream channel
    async with Channel(reader) as channel:
        stream.put_nowait('a:foo')
        stream.put_nowait('b:foo')
        stream.put_nowait('c:foo')
        stream.put_nowait('d:foo')
        stream.put_nowait('e:foo')
        assert (await channel.recv()) == 'a:foo'
        assert (await channel.recv()) == 'b:foo'
        assert (await channel.recv()) == 'c:foo'
        assert (await channel.recv()) == 'd:foo'
        assert (await channel.recv()) == 'e:foo'

if __name__ == '__main__':
    asyncio.run(main())
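In the example above, the reader is a zero-argument coroutine function that returns the next message each time it is awaited. Assuming that is all a reader needs to be, other sources can be adapted to the same shape. The sketch below wraps an async generator; `reader_from` and `lines` are hypothetical names used only for illustration, and what `Channel` does when the reader raises at the end of the source is not covered here.

```python
import asyncio


def reader_from(agen):
    """Wrap an async iterator in a zero-argument coroutine function."""
    async def reader():
        # Each call awaits and returns the next message from the source.
        return await agen.__anext__()
    return reader


async def lines():
    # Hypothetical message source used only for this illustration.
    for message in ('a:foo', 'b:foo'):
        await asyncio.sleep(0)
        yield message


async def demo():
    reader = reader_from(lines())
    return [await reader(), await reader()]


if __name__ == '__main__':
    print(asyncio.run(demo()))
```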
You can also split the channel with the `channel.split()` method and a predicator, like this:
async with Channel(reader) as channel:
    def predicator(m):
        return m.startswith('b:')

    async with channel.split(predicator) as sub:
        stream.put_nowait('a:foo')
        stream.put_nowait('b:foo')
        stream.put_nowait('c:foo')
        stream.put_nowait('d:foo')
        stream.put_nowait('e:foo')
        # sub receives messages that start with 'b:'
        assert (await sub.recv()) == 'b:foo'
        # channel (original) receives all other messages
        assert (await channel.recv()) == 'a:foo'
        assert (await channel.recv()) == 'c:foo'
        assert (await channel.recv()) == 'd:foo'
        assert (await channel.recv()) == 'e:foo'
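The introduction mentions routing messages prefixed by `b` or `d`. Since a predicator is a plain callable that takes a message and returns a boolean, one way to cover several prefixes is a small factory like the sketch below. `prefixed_by` is a hypothetical helper, not part of the library, and it assumes the messages are strings.

```python
def prefixed_by(*prefixes):
    """Build a predicator matching messages that start with any prefix."""
    def predicator(message):
        # str.startswith accepts a tuple and returns True if any item matches.
        return message.startswith(prefixes)
    return predicator


if __name__ == '__main__':
    is_b_or_d = prefixed_by('b:', 'd:')
    messages = ['a:foo', 'b:foo', 'c:foo', 'd:foo', 'e:foo']
    print([m for m in messages if is_b_or_d(m)])
```

The resulting function could be passed to `channel.split()` in place of the `predicator` defined above.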
API documentation
https://fixpoint.github.io/python-message-channel/
powered by pdoc.
License
Distributed under the terms of the MIT License