Wrapper for ZMQ communication.

ZMQ Tubes

ZMQ Tubes is a management system for ZMQ communication. It can manage many ZMQ sockets through one interface. The whole system is hierarchical and based on topics (see MQTT topics).
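
To make the topic hierarchy concrete, here is a minimal sketch of MQTT-style topic matching, where `+` stands for exactly one level and `#` stands for all remaining levels. The function `topic_matches` is a hypothetical helper written for illustration. It is not part of zmq_tubes, and the library's own matcher may differ in edge cases.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT-style pattern matches a concrete topic.

    Illustrative helper only; it is not the zmq_tubes implementation.
    """
    pattern_levels = pattern.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(pattern_levels):
        if level == '#':
            # Multi-level wildcard: accepts everything from this level on.
            return True
        if i >= len(topic_levels):
            # The pattern is longer than the topic.
            return False
        if level != '+' and level != topic_levels[i]:
            # A literal level must match exactly; '+' accepts any one level.
            return False
    # Without '#', both sides must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches('foo/#', 'foo/pub/test'))  # True
print(topic_matches('+/bar', 'x/bar'))         # True
print(topic_matches('+/bar', 'x/y/bar'))       # False
```

Under these rules a tube registered for `foo/#` would receive a message sent to `foo/pub/test`, while a tube registered for `+/bar` would not receive `x/y/bar`.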

Classes

  • TubeMessage - This class represents a request/response message. Some types of tubes require a response in this format.
  • Tube - This class wraps a ZMQ socket. It represents a connection between client and server.
  • TubeNode - This represents an application interface for communication via tubes.

Asyncio / Threading

The library supports both approaches. The asyncio classes require Python 3.7 or later.

from zmq_tubes import TubeNode, Tube            # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube    # Threads classes

Usage:

Node definitions in a YAML file

All tubes for one TubeNode can be defined in a YAML file.

# test.yml
tubes:
  - name: Client REQ
    addr:  ipc:///tmp/req.pipe      
    tube_type: REQ
    topics:
      - foo/#
      - +/bar
  
  - name: Client PUB
    addr:  ipc:///tmp/pub.pipe      
    tube_type: PUB
    topics:
      - foo/pub/#

  - name: Server ROUTER
    addr:  ipc:///tmp/router.pipe      
    tube_type: ROUTER
    server: yes
    sockopts:
      LINGER: 0
    topics:
      - server/#

import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return request.create_response('response')


async def run():
  with open('test.yml', 'r') as fd:  # read-only access is enough to load the schema
    schema = yaml.safe_load(fd)
  node = TubeNode(schema=schema)
  node.register_handler('server/#', handler)
  with node:
      node.publish('foo/pub/test', 'message 1')
      print(await node.request('foo/xxx', 'message 2'))

asyncio.run(run())
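
For reference, the sketch below writes out the dictionary that `yaml.safe_load` would be expected to return for `test.yml`. Note that PyYAML parses `server: yes` as the boolean `True`. The helper `topics_by_tube` is hypothetical and only inspects the data. Passing the dictionary to `TubeNode(schema=...)` follows the example above and is left as a comment so the block runs without the library installed.

```python
# Dictionary equivalent of test.yml, as a YAML 1.1 loader would produce it.
schema = {
    'tubes': [
        {
            'name': 'Client REQ',
            'addr': 'ipc:///tmp/req.pipe',
            'tube_type': 'REQ',
            'topics': ['foo/#', '+/bar'],
        },
        {
            'name': 'Client PUB',
            'addr': 'ipc:///tmp/pub.pipe',
            'tube_type': 'PUB',
            'topics': ['foo/pub/#'],
        },
        {
            'name': 'Server ROUTER',
            'addr': 'ipc:///tmp/router.pipe',
            'tube_type': 'ROUTER',
            'server': True,              # 'yes' in the YAML file
            'sockopts': {'LINGER': 0},
            'topics': ['server/#'],
        },
    ]
}


def topics_by_tube(schema: dict) -> dict:
    """Map each tube name to its topic patterns (illustrative helper)."""
    return {tube['name']: tube['topics'] for tube in schema['tubes']}


print(topics_by_tube(schema))
# node = TubeNode(schema=schema)  # as in the example above
```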

Request / Response

This is a simple scenario: the server processes requests serially, one at a time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'answer'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()  # like every top-level await here, run this inside a coroutine (e.g. via asyncio.run)

# output: 'question'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_resp.pipe',
  tube_type='REQ'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'

Subscribe / Publisher

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/sub_pub.pipe',
  server=True,
  tube_type='SUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/sub_pub.pipe',
  tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()

node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')        

Request / Router

The server is asynchronous, so it can process multiple requests concurrently.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_router.pipe',
  tube_type='REQ'
)


async def task(node, text):
  print(await node.request('test/xxx', text))


node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'
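
The order of the outputs above follows from ordinary asyncio scheduling: a handler that sleeps does not block the handlers of other requests. The stand-alone sketch below reproduces that ordering with the standard library only, without ZMQ or zmq_tubes. The names `handler`, `task`, and `main` are illustrative, and the delay is shortened to 0.2 seconds.

```python
import asyncio


async def handler(payload: str) -> str:
    # Stand-in for the server handler: 'wait' is slow, anything else is immediate.
    if payload == 'wait':
        await asyncio.sleep(0.2)
    return payload


async def main() -> list:
    finished = []

    async def task(payload: str) -> None:
        # Record each response in the order in which it arrives.
        finished.append(await handler(payload))

    # Both requests are in flight at the same time.
    await asyncio.gather(task('wait'), task('message'))
    return finished


order = asyncio.run(main())
print(order)  # ['message', 'wait']
```

Although 'wait' is sent first, 'message' completes first because the slow handler yields control while it sleeps.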

Dealer / Response

The client is asynchronous, so it can send multiple requests without waiting for each response.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'response'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_resp.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message')

# output: 'response'

Dealer / Router

Both the client and the server are asynchronous, so they can send and process multiple requests and responses concurrently.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_router.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'wait')
node.send('test/xxx', 'message')

# output: 'message'
# output: 'wait'

Dealer / Dealer

Both the client and the server are asynchronous, so each side can send and process multiple messages concurrently.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_dealer.pipe',
  server=True,
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from server')
# output: 'message from client'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_dealer.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from client')
# output: 'message from server'
