A simple command/stream multiplexer
Project description
Rationale
MoaT contains some components which require a possibly bidirectional stream of asynchronous messaging, including request/reply interactions and data streaming.
This library supports such interactions.
Prerequisites
The library requires a reliable underlying transport. MoaT uses CBOR, but any reliable, non-reordering messsage stream that can encode basic Python data structures (plus whatever objects you send/receive) works.
The MoaT-Cmd library does not itself call the transport. Instead it affords basic async methods to iterate on messages to send, and to feed incoming lower-level data in.
Usage
from moat.util import packer, stream_unpacker
async def handle_command(msg):
if msg.cmd[0] == "Start":
return "OK starting"
if msg.cmd[0] == "gimme data":
async with msg.stream_w("Start") as st:
for i in range(10):
await st.send(i+msg.data["x"])
return "OK I'm done"
if msg.cmd[0] == "alive":
async with msg.stream_r("Start") as st:
async for data in st:
print("We got", data)
return "OK nice"
raise ValueError(f"Unknown: {msg !r}")
async with Transport(handle_command) as tr, anyio.create_task_group() as tg:
decoder = stream_unpacker(cbor=True)
def reader():
# receive messages from channel
async for msg in channel.receive():
decoder.feed(msg)
for m in decoder:
await tr.msg_in(m)
def sender():
# send messages to channel
while True:
msg = await tr.msg_out()
await channel.send(packer(msg))
def request():
# streaming data in
msg = await tr.cmd("Start", x=123)
print("Start", msg)
async with tr.stream_r("gimme data") as st:
print("They are starting", st.msg)
async for msg in st:
print("I got", msg)
print("They are done", st.msg)
# may be None if they didn't send a stream
def int_stream():
# streaming data out
async with tr.stream_w("alive") as st:
print("They replied", st.msg)
i = 0
while i < 100:
await st.send(i)
i += 1
anyio.sleep(1/10)
st.msg = "The end."
print("I am done", st.msg)
tg.start_soon(reader)
tg.start_soon(sender)
tg.start_soon(handler)
tg.start_soon(request)
tg.start_soon(int_stream)
Specification
All MoaT-Cmd messages are non-empty lists whose first element is a small integer, identifying a sub-channel. Messages that don’t match this description MAY be used for out-of-band communication.
A transport that enforces message boundaries MAY send each message without the leading array mark byte(s).
MoaT-Cmd messaging is simple by design and basically consists of a command (sent from A to B) followed by a reply (sent from B to A). Both directions may independently indicate that more, streamed data will follow. The first and last message of a streamed command or reply are considered to be out-of-band.
The side opening a sub-channel uses non-negative integers as channel ID. Replies carry the ID’s bitwise-negated value. Thus the ID spaces of both directions are separate.
IDs are allocated with the first message on a sub-channel. They MUST NOT be reused until final messages have been exchanged. Exactly one final message MUST be sent in both directions.
Message format
A Moat-Cmd message consist of a preferably-small signed integer, plus a variable and usually non-empty amount of data.
The integer is interpreted as follows.
Bit 0: if set, the message starts or continues a data stream; if clear, the message is the final message for this subchannel and direction.
Bit 1: Error/Warning. If bit 0 is set, the message is a warning or similar information and SHOULD be attached to the following command or reply. Otherwise it is an error.
All other bits contain the message ID, left-shifted by two bits. This scheme allows for five concurrent messages per direction before encoding to two bytes is required.
Negative integers signal that the ID has been allocated by that message’s recipient. They are inverted bit-wise, i.e. (-1-id). Thus an ID of zero is legal. The bits described above are not affected by his inversion. Thus a command with ID=1 (no streaming, no error) is sent with an initial integer of 4; the reply uses -5.
Streaming
Data streams are inherently bidirectional. The command’s semantics SHOULD specify which side of a stream is supposed to send data. Error -2 will be sent (once) if a streamed item is received that won’t be handled.
Streaming may start when both sides have exchanged initial messages. Sending a stream SHOULD NOT commence before the initial command has been replied to.
Messages with both the streaming and error bits set carry out-of-band data while the stream is open, e.g. advising the recipient of data loss. Otherwise they MAY be delivered as warnings or similar out-of-band data. Conceptally, these messages are attached to the command or reply that immediately follows them.
For both directions, the initial and final message are assumed to be out-of-band data. This also applies to warnings.
Flow Control
For the most part: None. MoaT-Cmd is mostly used for monitoring events or enumerating small data sets.
However, if a stream’s recipient has limited buffer space and sends a command that might trigger a nontrivial amount of messages, it MAY send a specific warning (i.e. a message with both Error and Streaming bits set) before its initial command or reply. This warning MUST consist of a single non-negative integer that advises the sender of the number of streamed messages it may transmit.
During stream transmission, the recipient then SHOULD periodically send some more (positive) integers to signal the availability of more buffer space. It MUST send such a message if the counter is zero (after space becomes available of course) and more messages are expected.
The initial flow control messages SHOULD be sent before the initial command or reply, but MAY be deferred until later.
A receiver SHOULD start flow control sufficiently early, but that isn’t always feasible. It MUST notify the remote side (error -5, below) if an incoming message gets dropped due to resource exhaustion; likewise, the API is required to notify the local side.
Error handling
The exact semantics of error messages are application specific.
Error messages with the streaming bit clear terminate the command. They should be treated as fatal.
Error messages with the streaming bit set are either flow control messages (see above) or warnings.
Known errors
-1: Unspecified
The .stop() API method was called.
This message MAY be sent as a warning.
Usage: assume that a sender reads and transmits a block of ten measurements each second. If a “stop” warning arrives, the sender should complete the current block before terminating, while a “stop” error forces the current transmission to end immediately.
-2: Can’t receive this stream
Sent if a command isn’t prepared to receive a streamed reply.
-3: Cancel
The sender’s or receiver’s task is cancelled: the work is no longer required / performed.
This message SHOULD NOT be transmitted as a warning; that would be pointless.
-4: No Commands
The sender on this side doesn’t process commands at all.
-5: Data loss
An incoming message was dropped due to resource exhaustion (full queue).
This message SHOULD be sent as a warning, but MAY be interpreted as a hard error by its receiver.
-6: Must stream
Sent if a command isn’t prepared to handle a non-streamed request or reply.
-11 …: No Command
The command is not recognized.
The error number encodes the command’s position for a hierarchical lookup at the destination, i.e. if the command is (“foo”,”bahr”,”baz”) and “foo” doesn’t know about “bahr”, the error is -12.
Examples
S |
E |
D |
Data |
---|---|---|---|
Hello |
|||
You too |
S |
E |
D |
Data |
---|---|---|---|
Hello again |
|||
Meh. you already said that |
S |
E |
D |
Data |
---|---|---|---|
gimme some data |
|||
OK here they are |
|||
ONE |
|||
TWO |
|||
Missed some |
|||
FIVE |
|||
Oops? better stop |
|||
SIX |
|||
stopped |
S |
E |
D |
Data |
---|---|---|---|
I want to send some data |
|||
OK send them |
|||
FOO |
|||
Nonono I don’t want those after all |
|||
BAR |
|||
OK OK I’ll stop |
S |
E |
D |
Data |
---|---|---|---|
gimme some more data |
|||
OK here they are |
|||
NINE |
|||
TEN |
|||
oops I crashed |
|||
sigh |
S |
E |
D |
Data |
---|---|---|---|
Let’s talk |
|||
OK |
|||
voice data … |
|||
also voice data … |
|||
hanging up |
|||
oh well |
S |
E |
D |
Data |
---|---|---|---|
2 |
|||
gimme your database |
|||
OK here they are |
|||
A |
|||
1 |
|||
BB |
|||
1 |
|||
CCC |
|||
DDDD |
|||
5 |
|||
EEEEE |
|||
FFFFFF |
|||
GGGGGGG |
|||
that’s all |
|||
thx |
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file moat_lib_cmd-0.3.2.tar.gz
.
File metadata
- Download URL: moat_lib_cmd-0.3.2.tar.gz
- Upload date:
- Size: 17.3 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 3ab0f7bc2c42e9980c31a750a7d64fd3e11058ace25356cba99adb1405c30462 |
|
MD5 | e07d5aad8a4884ad95edb2a999d3d1ec |
|
BLAKE2b-256 | 2ec895dc23f24f458b80da650f4be2a8df94994cb351ff9c5ae4ae1357934af9 |
File details
Details for the file moat_lib_cmd-0.3.2-py3-none-any.whl
.
File metadata
- Download URL: moat_lib_cmd-0.3.2-py3-none-any.whl
- Upload date:
- Size: 11.4 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.1 CPython/3.12.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | 7f298a19e98fe022cf45771a66263ba356d87a48c6d3661b5c7a136dfbda4881 |
|
MD5 | 7db5b4041f5a0c5dc64d98184b83136e |
|
BLAKE2b-256 | d9c5bcbb3f678345b3b60be4c99252e023c2df988ffaac8b7895f4faf0fc8609 |