Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

Uses default connection parameters and connection retryer expecting a rabbitmq server running in its default configuration.

from talus import DurableConsumer
from talus import DurableProducer
from talus import ConnectionRetryerFactory
from talus import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus import MessageProcessorBase
from talus import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus import Queue
from talus import Exchange
from talus import Binding
from typing import Type

##########################
# Consumer Configurations#
##########################
# Configure messages that will be consumed
class ConsumeMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ConsumeMessageBody] = ConsumeMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")


###########################
# Producer Configurations #
###########################
# Configure messages that will be produced
class ProducerMessageBody(MessageBodyBase):
    key: str
    code: str

class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ProducerMessageBody] = ProducerMessageBody
    default_routing_key: str = "outbound.message.m"

# Configure the queues the message should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing (Publish Message -> Outbound Queues)
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.


############################
# Processor Configurations #
############################

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(message)
        outbound_message = PublishMessage(
            body=ProducerMessageBody(
                key=message.body.objectName,
                code="newBucket",
                conversationId=message.body.conversationId,
            )
        )  # crosswalk the values from the consumed message to the produced message
        self.producer.publish(outbound_message)
        print(outbound_message)


# Actually Connect and run the consumer
def main():
    """Starts a listener which will consume messages from the inbound queue and publish messages to the outbound queues."""
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(message_cls=ConsumeMessage, producer=producer)
            consumer.listen(message_processor)


if __name__ == "__main__":
    # First message to consume
    class InitialMessage(PublishMessageBase):
        message_body_cls: Type[
            ConsumeMessageBody] = ConsumeMessageBody
        default_routing_key: str = "inbound.message.m"

    initial_message_bindings = [Binding(queue=inbound_queue, message=InitialMessage)]

    with DurableProducer(
            queue_bindings=initial_message_bindings,
            publish_exchange=publish_exchange,
            connection_parameters=ProducerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        producer.publish(InitialMessage(body={"objectName": "object", "bucket": "bucket"}))
    # Consume the message and process it
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.2.0rc1.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.2.0rc1-py3-none-any.whl (25.0 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.2.0rc1.tar.gz.

File metadata

  • Download URL: talus-1.2.0rc1.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.9

File hashes

Hashes for talus-1.2.0rc1.tar.gz
Algorithm Hash digest
SHA256 ead1e120af734c90e6a601c2a7d825c3a0f90a30fc094afa100863214e8a0d40
MD5 5241ca0bc9ba809e80aa5d3eace2bd8a
BLAKE2b-256 6dcca5fa8ad27a0ddda696e17ed122b206c8ebde585baa13e065533e217fc56b

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.2.0rc1-py3-none-any.whl.

File metadata

  • Download URL: talus-1.2.0rc1-py3-none-any.whl
  • Upload date:
  • Size: 25.0 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.1 CPython/3.11.9

File hashes

Hashes for talus-1.2.0rc1-py3-none-any.whl
Algorithm Hash digest
SHA256 d9132f73ebe7a2b485f0b41ed5a153576369f76c67ef4c39f1038d21d7ff678d
MD5 28b4f32ae8f395b6289c0d9e7166225b
BLAKE2b-256 8372266d28557f58290f9bd306ee7bf0d5910412e636f909cab725769bcb4e16

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page