Wrapper for ZMQ communication.

ZMQ Tubes

ZMQ Tubes is a management layer for ZMQ communication. It lets an application handle many ZMQ sockets through a single interface. The whole system is hierarchical and based on topics, in the same style as MQTT topics.
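
The topic patterns used throughout this page follow the MQTT wildcard convention: `+` stands for exactly one level and `#` for all remaining levels. The sketch below is not the library's own matcher. `topic_matches` is a hypothetical helper, written only to illustrate how such patterns are usually interpreted, assuming plain MQTT semantics.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT-style `pattern`.

    Hypothetical helper for illustration; not part of zmq_tubes.
    '+' matches exactly one level, '#' matches all remaining levels
    (including none) and is only valid as the last level.
    """
    pattern_levels = pattern.split('/')
    topic_levels = topic.split('/')
    for i, level in enumerate(pattern_levels):
        if level == '#':
            # Multi-level wildcard: accept only when it is the last level.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            # The pattern is longer than the topic.
            return False
        if level != '+' and level != topic_levels[i]:
            return False
    # Without '#', both must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches('foo/#', 'foo/pub/test'))  # True
print(topic_matches('+/bar', 'x/bar'))         # True
print(topic_matches('+/bar', 'x/y/bar'))       # False
```

Under these assumptions, `foo/#` also matches the bare topic `foo`, as it does in MQTT.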

Classes

  • TubeMessage - This class represents a request/response message. Some types of tubes require a response in this format.
  • Tube - This class wraps a ZMQ socket. It represents a connection between client and server.
  • TubeNode - This represents an application interface for communication via tubes.

Asyncio / Threading

The library supports both approaches. The asyncio variant requires Python 3.7 or later.

from zmq_tubes import TubeNode, Tube            # Asyncio classes
from zmq_tubes.threads import TubeNode, Tube    # Threads classes

Usage:

Node definitions in a YAML file

All tubes for one TubeNode can be defined in a YAML file.

# test.yml
tubes:
  - name: Client REQ
    addr: ipc:///tmp/req.pipe
    tube_type: REQ
    topics:
      - foo/#
      - +/bar
  
  - name: Client PUB
    addr: ipc:///tmp/pub.pipe
    tube_type: PUB
    topics:
      - foo/pub/#

  - name: Server ROUTER
    addr: ipc:///tmp/router.pipe
    tube_type: ROUTER
    server: yes
    sockopts:
      LINGER: 0
    topics:
      - server/#

import asyncio
import yaml
from zmq_tubes import TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return request.create_response('response')


async def run():
  # The schema is only read, so read-only mode is enough.
  with open('test.yml', 'r') as fd:
    schema = yaml.safe_load(fd)
  node = TubeNode(schema=schema)
  node.register_handler('server/#', handler)
  with node:
    node.publish('foo/pub/test', 'message 1')
    print(await node.request('foo/xxx', 'message 2'))

asyncio.run(run())
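
To make the relationship between the schema and the calls above more concrete, here is a rough sketch of how a topic could be resolved to a tube. It is an illustration under stated assumptions, not the library's internal lookup: `SCHEMA` mirrors test.yml as a Python dict trimmed to the fields used here, and `topic_regex` and `find_tubes` are hypothetical helpers. The sketch assumes that candidates are narrowed by tube type, so a publish only considers PUB tubes and a request only considers REQ tubes.

```python
import re

# Python equivalent of test.yml, trimmed to the fields this sketch uses.
SCHEMA = {
    'tubes': [
        {'name': 'Client REQ', 'tube_type': 'REQ',
         'topics': ['foo/#', '+/bar']},
        {'name': 'Client PUB', 'tube_type': 'PUB',
         'topics': ['foo/pub/#']},
        {'name': 'Server ROUTER', 'tube_type': 'ROUTER',
         'topics': ['server/#']},
    ]
}


def topic_regex(pattern):
    """Translate an MQTT-style pattern into a compiled regex (hypothetical)."""
    parts = []
    for level in pattern.split('/'):
        if level == '+':
            parts.append('[^/]+')           # exactly one level
        elif level == '#':
            parts.append('#')               # placeholder, expanded below
        else:
            parts.append(re.escape(level))  # literal level
    regex = '/'.join(parts)
    if regex == '#':
        regex = '.*'
    elif regex.endswith('/#'):
        # A trailing '#' matches the parent level and everything below it.
        regex = regex[:-2] + '(/.*)?'
    return re.compile(regex)


def find_tubes(schema, topic, tube_type):
    """Return the names of tubes of `tube_type` whose topics match `topic`."""
    names = []
    for tube in schema['tubes']:
        if tube['tube_type'] != tube_type:
            continue
        if any(topic_regex(p).fullmatch(topic) for p in tube['topics']):
            names.append(tube['name'])
    return names


print(find_tubes(SCHEMA, 'foo/pub/test', 'PUB'))  # ['Client PUB']
print(find_tubes(SCHEMA, 'foo/xxx', 'REQ'))       # ['Client REQ']
```

Note that `foo/pub/test` also matches the `foo/#` pattern of the REQ tube, which is why this sketch filters by tube type before matching topics.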

Request / Response

This is the simplest scenario: the server processes requests serially, one at a time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'answer'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()

# output: 'question'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_resp.pipe',
  tube_type='REQ'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
response = await node.request('test/xxx', 'question')
print(response.payload)
# output: 'answer'

Subscribe / Publisher

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/sub_pub.pipe',
  server=True,
  tube_type='SUB'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/sub_pub.pipe',
  tube_type='PUB'
)
# In the case of publishing, the first message is very often
# lost. The workaround is to connect the tube manually as soon as possible.
tube.connect()

node = TubeNode()
node.register_tube(tube, 'test/#')
node.publish('test/xxx', 'message')        

Request / Router

The server is asynchronous, so it can process several requests at the same time.
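
The difference between serial and concurrent request handling can be shown without any sockets. The following sketch uses only asyncio; `handler`, `serve_serially` and `serve_concurrently` are hypothetical stand-ins and do not use the zmq_tubes API. It records the order in which two requests finish, where the first one waits briefly before completing.

```python
import asyncio


async def handler(payload, finished):
    """Simulate request handling; 'wait' takes longer than other payloads."""
    if payload == 'wait':
        await asyncio.sleep(0.05)
    finished.append(payload)


async def serve_serially(payloads):
    """Handle one request at a time, as a REP-style server would."""
    finished = []
    for payload in payloads:
        await handler(payload, finished)
    return finished


async def serve_concurrently(payloads):
    """Start all handlers at once, as a ROUTER-style server would."""
    finished = []
    await asyncio.gather(*(handler(p, finished) for p in payloads))
    return finished


serial_order = asyncio.run(serve_serially(['wait', 'message']))
concurrent_order = asyncio.run(serve_concurrently(['wait', 'message']))
print(serial_order)      # ['wait', 'message']
print(concurrent_order)  # ['message', 'wait']
```

The concurrent order corresponds to the client output in the example of this section, where 'message' is answered before 'wait'.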

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/req_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

import asyncio
from zmq_tubes import Tube, TubeNode

tube = Tube(
  name='Client',
  addr='ipc:///tmp/req_router.pipe',
  tube_type='REQ'
)


async def task(node, text):
  print(await node.request('test/xxx', text))


node = TubeNode()
node.register_tube(tube, 'test/#')
asyncio.create_task(task(node, 'wait'))
asyncio.create_task(task(node, 'message'))
# output: 'message'
# output: 'wait'

Dealer / Response

The client is asynchronous, so it can send several requests without waiting for each response.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  return 'response'
  # or return request.create_response('response')


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_resp.pipe',
  server=True,
  tube_type='REP'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_resp.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message')

# output: 'response'

Dealer / Router

Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.

Server:

import asyncio
from zmq_tubes import Tube, TubeNode, TubeMessage


async def handler(request: TubeMessage):
  print(request.payload)
  if request.payload == 'wait':
    await asyncio.sleep(10)
  return request.create_response(request.payload)


tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_router.pipe',
  server=True,
  tube_type='ROUTER'
)

node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)
await node.start()
# output: 'wait'
# output: 'message'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_router.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'wait')
node.send('test/xxx', 'message')

# output: 'message'
# output: 'wait'

Dealer / Dealer

Both the client and the server are asynchronous, so they can send and process several requests and responses at the same time.

Server:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Server',
  addr='ipc:///tmp/dealer_dealer.pipe',
  server=True,
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from server')
# output: 'message from client'

Client:

from zmq_tubes import Tube, TubeNode, TubeMessage

tube = Tube(
  name='Client',
  addr='ipc:///tmp/dealer_dealer.pipe',
  tube_type='DEALER'
)


async def handler(response: TubeMessage):
  print(response.payload)


node = TubeNode()
node.register_tube(tube, 'test/#')
node.register_handler('test/#', handler)

node.send('test/xxx', 'message from client')
# output: 'message from server'
