Wrapper for ZMQ communication.
ZMQ Tubes
ZMQ Tubes is a management system for ZMQ communication. It manages many ZMQ sockets through a single interface. The whole system is hierarchical and based on topics (compare MQTT topics).
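To make the topic hierarchy concrete, here is a minimal sketch of MQTT-style wildcard matching, where `+` stands for exactly one level and `#` for all remaining levels. The function `topic_matches` is a hypothetical helper written for illustration; it is not part of zmq_tubes, and the library's own matching may differ in edge cases.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT-style pattern matches a concrete topic.

    '+' matches exactly one level; '#' matches the rest of the topic
    (including zero remaining levels) and must be the last level.
    """
    pattern_levels = pattern.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(pattern_levels):
        if level == '#':
            # Multi-level wildcard: valid only as the final pattern level.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            # The pattern has more levels than the topic.
            return False
        if level != '+' and level != topic_levels[i]:
            return False
    # Every pattern level matched; the topic must not have extra levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches('foo/#', 'foo/xxx'))      # True
print(topic_matches('+/bar', 'any/bar'))      # True
print(topic_matches('+/bar', 'any/bar/baz'))  # False
print(topic_matches('foo/pub/#', 'foo/xxx'))  # False
```

Under these rules a request sent to `foo/xxx` would be covered by a tube registered for `foo/#`, but not by one registered for `foo/pub/#`.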
Classes
- TubeMessage - This class represents a request/response message. Some types of tubes require a response in this format.
- Tube - This class wraps a ZMQ socket. It represents a connection between client and server.
- TubeNode - This represents an application interface for communication via tubes.
Usage:
Node definitions in a YAML file
We can define all tubes for one TubeNode in a single YAML file.
# test.yml
tubes:
  - name: Client REQ
    addr: ipc:///tmp/req.pipe
    tube_type: REQ
    topics:
      - foo/#
      - +/bar
  - name: Client PUB
    addr: ipc:///tmp/pub.pipe
    tube_type: PUB
    topics:
      - foo/pub/#
  - name: Server ROUTER
    addr: ipc:///tmp/router.pipe
    tube_type: ROUTER
    server: yes
    sockopts:
      LINGER: 0
    topics:
      - server/#
import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)
    return request.get_response('response')

# The file is only read, so plain read mode is enough.
with open('test.yml', 'r') as fd:
    schema = yaml.safe_load(fd)
node = TubeNode(schema=schema)
node.register_handler('server/#', handler)
# Run the node in the background so that it can serve requests.
asyncio.create_task(node.start(), name="Server")
node.publish('foo/pub/test', 'message 1')
print(await node.request('foo/xxx', 'message 2'))
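Because `yaml.safe_load` returns ordinary Python containers, the schema handed to `TubeNode` is a plain dictionary. The sketch below writes out the structure that PyYAML would produce for the `test.yml` shown above, which can help when building or inspecting a schema in code. The helper `topics_by_tube` is hypothetical and only illustrates how the structure can be traversed; it is not part of zmq_tubes.

```python
# The structure yaml.safe_load() would produce for test.yml above.
schema = {
    'tubes': [
        {
            'name': 'Client REQ',
            'addr': 'ipc:///tmp/req.pipe',
            'tube_type': 'REQ',
            'topics': ['foo/#', '+/bar'],
        },
        {
            'name': 'Client PUB',
            'addr': 'ipc:///tmp/pub.pipe',
            'tube_type': 'PUB',
            'topics': ['foo/pub/#'],
        },
        {
            'name': 'Server ROUTER',
            'addr': 'ipc:///tmp/router.pipe',
            'tube_type': 'ROUTER',
            'server': True,  # YAML "yes" is loaded as a boolean by PyYAML
            'sockopts': {'LINGER': 0},
            'topics': ['server/#'],
        },
    ]
}


def topics_by_tube(schema: dict) -> dict:
    """Hypothetical helper: map each tube name to its topic patterns."""
    return {tube['name']: tube.get('topics', []) for tube in schema['tubes']}


for name, topics in topics_by_tube(schema).items():
    print(f'{name}: {topics}')
```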
Request / Response
This is the simplest scenario: the server processes requests serially, one at a time.
Server:
from zmq_tubes import Tube, TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)
    return 'answer'
    # or return request.get_response('response')

tube = Tube(
    name='Server',
    addr='ipc:///tmp/req_resp.pipe',
    server=True,
    tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'question'
Client:
from zmq_tubes import Tube, TubeNode

tube = Tube(
    name='Client',
    addr='ipc:///tmp/req_resp.pipe',
    tube_type='REQ'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
Subscriber / Publisher
Server:
from zmq_tubes import Tube, TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)

tube = Tube(
    name='Server',
    addr='ipc:///tmp/sub_pub.pipe',
    server=True,
    tube_type='SUB'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
Client:
from zmq_tubes import Tube, TubeNode

tube = Tube(
    name='Client',
    addr='ipc:///tmp/sub_pub.pipe',
    tube_type='PUB'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')
Request / Router
The server is asynchronous, so it can process several requests at the same time.
Server:
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)
    if request.payload == 'wait':
        await asyncio.sleep(10)
    return request.get_response(request.payload)

tube = Tube(
    name='Server',
    addr='ipc:///tmp/req_router.pipe',
    server=True,
    tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
Client:
import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
    name='Client',
    addr='ipc:///tmp/req_router.pipe',
    tube_type='REQ'
)

async def task(node, text):
    print(await node.request('test/xxx', text))

node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'
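The order of the output comments above ('message' before 'wait' on the client) follows from the handlers running concurrently. The following sketch reproduces that ordering with plain asyncio and no ZMQ at all, contrasting it with serial processing. The names `handle`, `serve_serially` and `serve_concurrently` are hypothetical, and the sleep is shortened to 0.1 s.

```python
import asyncio


async def handle(payload: str, done: list) -> None:
    """Stand-in for a request handler; 'wait' simulates slow work."""
    if payload == 'wait':
        await asyncio.sleep(0.1)
    done.append(payload)


async def serve_serially(requests: list) -> list:
    """Finish each request before starting the next one."""
    done = []
    for payload in requests:
        await handle(payload, done)
    return done


async def serve_concurrently(requests: list) -> list:
    """Start every request at once; fast ones finish first."""
    done = []
    await asyncio.gather(*(handle(payload, done) for payload in requests))
    return done


serial_order = asyncio.run(serve_serially(['wait', 'message']))
concurrent_order = asyncio.run(serve_concurrently(['wait', 'message']))
print(serial_order)      # ['wait', 'message']
print(concurrent_order)  # ['message', 'wait']
```

The serial variant corresponds to the behaviour described for Request / Response, the concurrent one to Request / Router.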
Dealer / Response
The client is asynchronous, so it can send several requests at the same time.
Server:
from zmq_tubes import Tube, TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)
    return 'response'
    # or return request.get_response('response')

tube = Tube(
    name='Server',
    addr='ipc:///tmp/dealer_resp.pipe',
    server=True,
    tube_type='REP'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'
Client:
from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
    name='Client',
    addr='ipc:///tmp/dealer_resp.pipe',
    tube_type='DEALER'
)

async def handler(response: TubeMessage):
    print(response.payload)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
node.send('test/xxx', 'message')
# output: 'response'
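In the DEALER client above, `send` is called without awaiting a reply, and replies arrive later at the registered handler. The shape of that pattern can be sketched with two `asyncio.Queue` objects standing in for the socket. Everything here (`send`, `server`, `handler`, `dispatch`) is a hypothetical stand-in for illustration and says nothing about how zmq_tubes is implemented internally.

```python
import asyncio


async def main() -> list:
    outbox: asyncio.Queue = asyncio.Queue()  # client -> server
    inbox: asyncio.Queue = asyncio.Queue()   # server -> client
    received = []

    def send(payload: str) -> None:
        """Queue a message and return immediately, without waiting."""
        outbox.put_nowait(payload)

    async def server() -> None:
        """Answer every incoming message with a response."""
        while True:
            payload = await outbox.get()
            inbox.put_nowait(f'response to {payload}')

    async def handler(payload: str) -> None:
        """Client-side callback invoked for each response."""
        received.append(payload)

    async def dispatch(count: int) -> None:
        """Deliver `count` responses to the handler as they arrive."""
        for _ in range(count):
            await handler(await inbox.get())

    server_task = asyncio.create_task(server())
    send('first')   # neither call blocks on a reply
    send('second')
    await dispatch(2)
    server_task.cancel()
    return received


responses = asyncio.run(main())
print(responses)  # ['response to first', 'response to second']
```

The point of the design is that sending and receiving are decoupled: the sender never blocks, and the handler decides what to do with each reply.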
Dealer / Router
Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.
Server:
import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage

async def handler(request: TubeMessage):
    print(request.payload)
    if request.payload == 'wait':
        await asyncio.sleep(10)
    return request.get_response(request.payload)

tube = Tube(
    name='Server',
    addr='ipc:///tmp/dealer_router.pipe',
    server=True,
    tube_type='ROUTER'
)
node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'
Client:
from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
    name='Client',
    addr='ipc:///tmp/dealer_router.pipe',
    tube_type='DEALER'
)

async def handler(response: TubeMessage):
    print(response.payload)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
node.send('test/xxx', 'wait')
node.send('test/xxx', 'message')
# output: 'message'
# output: 'wait'
Dealer / Dealer
Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.
Server:
from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
    name='Server',
    addr='ipc:///tmp/dealer_dealer.pipe',
    server=True,
    tube_type='DEALER'
)

async def handler(response: TubeMessage):
    print(response.payload)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
node.send('test/xxx', 'message from server')
# output: 'message from client'
Client:
from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
    name='Client',
    addr='ipc:///tmp/dealer_dealer.pipe',
    tube_type='DEALER'
)

async def handler(response: TubeMessage):
    print(response.payload)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
node.send('test/xxx', 'message from client')
# output: 'message from server'