Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type

############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(f"Received message: {message}")
        outbound_message = PublishMessage(
            body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
        )  # change the bucket name for some reason
        self.producer.publish(outbound_message)


############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
    default_routing_key: str = "outbound.message.m"

# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

# Actually Connect and run the consumer
def main():
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(producer=producer)
            consumer.listen(message_processor)

if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.0.0rc12.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.0.0rc12-py3-none-any.whl (25.2 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.0.0rc12.tar.gz.

File metadata

  • Download URL: talus-1.0.0rc12.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc12.tar.gz
Algorithm Hash digest
SHA256 da42ef4809a0bd468c07dd49d6afcebfc6bba681575b1dcbb6bc88e5f2aa218a
MD5 8baf50ede44a27cd2e6d3d91b1269c38
BLAKE2b-256 a0c9dd9ef3f8ec38832d98226840a95a9c356ae8dc0a397b079ad27483805a5f

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.0.0rc12-py3-none-any.whl.

File metadata

  • Download URL: talus-1.0.0rc12-py3-none-any.whl
  • Upload date:
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc12-py3-none-any.whl
Algorithm Hash digest
SHA256 82d9561bdaaa930767449b9efbaaae08419bb7c3a9b3408ca0a91430ce90e1d8
MD5 b38382b6e844c5fede020a3bf9e956f8
BLAKE2b-256 9771875a18fa035482a7f8a5619d71fdf6df4b168955117e137ab5e0db0e2ffb

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page