Robust, high-volume, message-based communication made easy

kiwiPy

kiwiPy is a library that makes remote messaging using RabbitMQ (and possibly other message brokers) EASY. It was designed to support high-throughput workflows in big-data and computational science settings and is currently used by AiiDA for computational materials research around the world. That said, kiwiPy is entirely general and can be used wherever high-throughput, robust messaging is needed.

Here’s what you get:

  • RPC

  • Broadcast (with filters)

  • Task queue messages
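As a rough mental model only, the three patterns differ in who receives a message and whether anything comes back. The toy, in-process sketch below is not kiwiPy's implementation or API: `ToyBus` and all of its method names are hypothetical, and no broker is involved.

```python
from collections import deque


class ToyBus:
    """Hypothetical in-process stand-in, used only to contrast the three patterns."""

    def __init__(self):
        self._rpc = {}           # identifier -> single handler
        self._broadcast = []     # (handler, subject filter or None)
        self._workers = deque()  # task handlers, served round-robin

    def on_rpc(self, identifier, handler):
        self._rpc[identifier] = handler

    def on_broadcast(self, handler, subject=None):
        self._broadcast.append((handler, subject))

    def on_task(self, handler):
        self._workers.append(handler)

    def rpc(self, identifier, msg):
        # RPC: exactly one named recipient, and the caller gets its answer back
        return self._rpc[identifier](msg)

    def broadcast(self, msg, subject=None):
        # Broadcast: every subscriber whose filter matches sees the message
        for handler, wanted in self._broadcast:
            if wanted is None or wanted == subject:
                handler(msg)

    def task(self, msg):
        # Task queue: exactly one worker handles each task
        worker = self._workers[0]
        self._workers.rotate(-1)
        return worker(msg)


bus = ToyBus()
seen = []
bus.on_rpc('double', lambda n: 2 * n)
bus.on_broadcast(lambda m: seen.append(('all', m)))
bus.on_broadcast(lambda m: seen.append(('news', m)), subject='news')
bus.on_task(lambda t: 'worker-1 did ' + t)
bus.on_task(lambda t: 'worker-2 did ' + t)

doubled = bus.rpc('double', 21)
bus.broadcast('hello', subject='chat')   # only the unfiltered subscriber sees this
bus.broadcast('update', subject='news')  # both subscribers see this
results = [bus.task('a'), bus.task('b'), bus.task('c')]
```

In this toy, the filtered subscriber ignores the 'chat' message, and the three tasks are shared out between the two workers rather than delivered to both.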

Let’s dive in with some examples taken from the rmq tutorial. For more detail, head over to the documentation.

RPC

The client:

import kiwipy

with kiwipy.connect('amqp://localhost') as comm:
    # Send an RPC message
    print(" [x] Requesting fib(30)")
    response = comm.rpc_send('fib', 30).result()
    print(" [.] Got %r" % response)

(rmq_rpc_client.py source)

The server:

import threading
import kiwipy

def fib(comm, num):
    if num == 0:
        return 0
    if num == 1:
        return 1

    return fib(comm, num - 1) + fib(comm, num - 2)

with kiwipy.connect('amqp://127.0.0.1') as comm:
    # Register an RPC subscriber with the name 'fib'
    comm.add_rpc_subscriber(fib, 'fib')
    # Now wait indefinitely for fibonacci calls
    threading.Event().wait()

(rmq_rpc_server.py source)
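The recursive `fib` above takes time exponential in `num`, and no reply can be sent until it returns. A minimal sketch of a linear-time handler with the same `(comm, num)` signature follows; `fib_iterative` is a hypothetical name, and the sketch assumes, as in the server above, that the communicator is passed first and is not otherwise needed.

```python
def fib_iterative(_comm, num):
    """Return the num-th Fibonacci number in O(num) steps."""
    a, b = 0, 1
    for _ in range(num):
        a, b = b, a + b
    return a


# Call the handler directly, without a broker, to check it
result = fib_iterative(None, 30)
print(result)  # 832040
```

It could be registered in place of the original with `comm.add_rpc_subscriber(fib_iterative, 'fib')`.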

Worker

Create a new task:

import sys
import kiwipy

message = ' '.join(sys.argv[1:]) or "Hello World!"

with kiwipy.connect('amqp://localhost') as comm:
    comm.task_send(message)

(rmq_new_task.py source)

And the worker:

import time
import threading
import kiwipy

print(' [*] Waiting for messages. To exit press CTRL+C')


def callback(_comm, task):
    print(" [x] Received %r" % task)
    # Simulate work: sleep one second per '.' in the (text) task
    time.sleep(task.count('.'))
    print(" [x] Done")


try:
    with kiwipy.connect('amqp://localhost') as comm:
        comm.add_task_subscriber(callback)
        threading.Event().wait()
except KeyboardInterrupt:
    pass

(rmq_worker.py source)
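The callback above simulates work by sleeping one second for every '.' in the task. The sketch below factors that rule into a helper so it can be checked without a broker; `work_seconds` and `make_callback` are hypothetical names, and accepting both `str` and `bytes` is an assumption about what a task body may be.

```python
import time


def work_seconds(task):
    """One second of simulated work per '.' in the task body."""
    dot = b'.' if isinstance(task, bytes) else '.'
    return task.count(dot)


def make_callback(sleep=time.sleep):
    """Build a task callback; `sleep` is injectable so a check need not wait."""
    def callback(_comm, task):
        print(" [x] Received %r" % (task,))
        sleep(work_seconds(task))
        print(" [x] Done")
    return callback


# Try the callback locally, recording the delays instead of sleeping
delays = []
callback = make_callback(sleep=delays.append)
callback(None, "Hello...")
callback(None, b"quick.")
```

With the default `sleep`, the result of `make_callback()` could be passed to `comm.add_task_subscriber` in the same way as the original callback.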

Versioning

This software follows Semantic Versioning.
