Wrapper for ZMQ communication.

ZMQ Tubes

ZMQ Tubes is a management system for ZMQ communication. It manages many ZMQ sockets through a single interface. The whole system is hierarchical and based on topics (similar to MQTT topics).
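
The topic patterns used throughout this README (`foo/#`, `+/bar`) follow the MQTT wildcard convention: `+` stands for exactly one level and `#` for all remaining levels. The sketch below illustrates that convention in plain Python. The function name `topic_matches` is hypothetical, and this is not the library's own matching code, which may differ in edge cases.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT-style pattern matches a concrete topic."""
    pattern_levels = pattern.split('/')
    topic_levels = topic.split('/')
    for index, level in enumerate(pattern_levels):
        if level == '#':
            # Multi-level wildcard: matches everything that is left,
            # including nothing at all ('foo/#' also matches 'foo').
            return True
        if index >= len(topic_levels):
            # The pattern is longer than the topic.
            return False
        if level != '+' and level != topic_levels[index]:
            # A literal level must be equal; '+' accepts any single level.
            return False
    # Without '#', both sides must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches('foo/#', 'foo/pub/test'))  # True
print(topic_matches('+/bar', 'x/bar'))         # True
print(topic_matches('+/bar', 'x/y/bar'))       # False
```

Under this convention, a message published to `foo/pub/test` would be routed to a tube registered for `foo/#` or `foo/pub/#`, but not to one registered for `server/#`.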

Classes

  • TubeMessage - This class represents a request/response message. Some types of tubes require a response in this format.
  • Tube - This class wraps a ZMQ socket. It represents a connection between client and server.
  • TubeNode - This represents an application interface for communication via tubes.

Usage:

Node definitions in a YAML file

We can define all tubes for one TubeNode in a YAML file.

# test.yml
tubes:
  - name: Client REQ
    addr: ipc:///tmp/req.pipe
    tube_type: REQ
    topics:
      - foo/#
      - +/bar

  - name: Client PUB
    addr: ipc:///tmp/pub.pipe
    tube_type: PUB
    topics:
      - foo/pub/#

  - name: Server ROUTER
    addr: ipc:///tmp/router.pipe
    tube_type: ROUTER
    server: yes
    sockopts:
      LINGER: 0
    topics:
      - server/#

import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return request.get_response('response')


with open('test.yml', 'r') as fd:
    schema = yaml.safe_load(fd)

node = TubeNode(schema=schema)
node.register_handler('server/#', handler)
# Start the node in the background; create_task needs a running event loop.
asyncio.create_task(node.start(), name="Server")

node.publish('foo/pub/test', 'message 1')
print(await node.request('foo/xxx', 'message 2'))
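
Because `yaml.safe_load` returns ordinary Python data, the `schema` handed to `TubeNode` is a plain dictionary. As a rough illustration, the sketch below writes out the dictionary PyYAML would produce for `test.yml`. Note that `server: yes` is parsed as the boolean `True` under PyYAML's YAML 1.1 rules. Whether `TubeNode` accepts a hand-built dictionary in exactly this shape is an assumption; the helper `server_tube_names` is hypothetical and only inspects the data.

```python
# Dictionary equivalent of test.yml as parsed by yaml.safe_load.
schema = {
    'tubes': [
        {
            'name': 'Client REQ',
            'addr': 'ipc:///tmp/req.pipe',
            'tube_type': 'REQ',
            'topics': ['foo/#', '+/bar'],
        },
        {
            'name': 'Client PUB',
            'addr': 'ipc:///tmp/pub.pipe',
            'tube_type': 'PUB',
            'topics': ['foo/pub/#'],
        },
        {
            'name': 'Server ROUTER',
            'addr': 'ipc:///tmp/router.pipe',
            'tube_type': 'ROUTER',
            'server': True,            # YAML `yes` becomes a boolean
            'sockopts': {'LINGER': 0},
            'topics': ['server/#'],
        },
    ]
}


def server_tube_names(schema: dict) -> list:
    """Hypothetical helper: names of the tubes marked as servers."""
    return [t['name'] for t in schema['tubes'] if t.get('server', False)]


print(server_tube_names(schema))  # ['Server ROUTER']
```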

Request / Response

This is a simple scenario: the server processes requests serially.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'answer'
  # or return request.get_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()

# output: 'question'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_resp.pipe',
  tube_type='REQ'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'
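
The snippets in this README call `await` at module level, which Python accepts only in an asyncio-aware prompt such as `python -m asyncio`. In a regular script, the same calls would go inside a coroutine started with `asyncio.run`. A minimal sketch of that pattern follows; `fake_request` is a hypothetical stand-in for `node.request(...)` so that the example runs without ZMQ or a server.

```python
import asyncio


async def fake_request(topic: str, payload: str) -> str:
    # Hypothetical stand-in for `node.request(topic, payload)`;
    # it only simulates a round trip and does not use ZMQ.
    await asyncio.sleep(0.01)
    return f'answer to {payload!r} on {topic}'


async def main() -> str:
    # `await` (and `asyncio.create_task`) need a running event loop,
    # so the client logic lives inside a coroutine.
    response = await fake_request('test/xxx', 'question')
    print(response)
    return response


if __name__ == '__main__':
    # asyncio.run creates the event loop, runs main(), and closes the loop.
    asyncio.run(main())
```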

Subscriber / Publisher

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/sub_pub.pipe',
  server=True,
  tube_type='SUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/sub_pub.pipe',
  tube_type='PUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')

Request / Router

The server is asynchronous, so it can process several requests at the same time.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.get_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_router.pipe',
  tube_type='REQ'
)


async def task(node, text):
  print(await node.request('test/xxx', text))


node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'
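
The client output above arrives in the opposite order to the requests because the slow `wait` request does not block the fast one. The sketch below reproduces that effect with plain asyncio and no ZMQ: `serve_serially` mimics a server that finishes each request before taking the next, and `serve_concurrently` mimics one that handles requests in parallel. Both function names are hypothetical, and the 10 second delay is shortened to 50 ms.

```python
import asyncio


async def handler(payload: str) -> str:
    # Mirrors the README handler, with a short delay instead of 10 s.
    if payload == 'wait':
        await asyncio.sleep(0.05)
    return payload


async def serve_serially(requests):
    # Serial behaviour: finish one request before starting the next.
    finished = []
    for payload in requests:
        finished.append(await handler(payload))
    return finished


async def serve_concurrently(requests):
    # Concurrent behaviour: all requests run at once and are
    # recorded in the order in which they complete.
    finished = []

    async def run(payload):
        finished.append(await handler(payload))

    await asyncio.gather(*(run(payload) for payload in requests))
    return finished


serial = asyncio.run(serve_serially(['wait', 'message']))
concurrent = asyncio.run(serve_concurrently(['wait', 'message']))
print(serial)      # ['wait', 'message']
print(concurrent)  # ['message', 'wait']
```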

Dealer / Response

The client is asynchronous, so it can send several requests at the same time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'response'
  # or return request.get_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_resp.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message')

# output: 'response'
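
Unlike `node.request`, which awaits the reply, `node.send` returns at once and the reply is later delivered to the registered handler. The sketch below imitates that flow with an in-memory queue. `FakeDealer` is a hypothetical class written for this illustration; it involves no ZMQ and is not part of zmq_tubes.

```python
import asyncio


class FakeDealer:
    """Hypothetical in-memory stand-in for a DEALER tube (no ZMQ)."""

    def __init__(self, handler):
        self._handler = handler
        self._outbox = asyncio.Queue()

    def send(self, payload: str) -> None:
        # Returns immediately; nothing waits for the reply here.
        self._outbox.put_nowait(payload)

    async def run(self, expected: int) -> None:
        # Plays the server role: answer each queued message, then
        # pass the reply to the registered handler.
        for _ in range(expected):
            payload = await self._outbox.get()
            await self._handler(f'response to {payload}')


received = []


async def handler(response: str) -> None:
    received.append(response)


async def main() -> None:
    dealer = FakeDealer(handler)
    dealer.send('message 1')
    dealer.send('message 2')  # sent before any reply has arrived
    await dealer.run(expected=2)


asyncio.run(main())
print(received)  # ['response to message 1', 'response to message 2']
```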

Dealer / Router

Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.get_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_router.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'wait')
node.send('test/xxx', 'message')

# output: 'message'
# output: 'wait'

Dealer / Dealer

Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_dealer.pipe',
  server=True,
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from server')
# output: 'message from client'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_dealer.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from client')
# output: 'message from server'
