Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type

############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(f"Received message: {message}")
        outbound_message = PublishMessage(
            body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
        )  # change the bucket name for some reason
        self.producer.publish(outbound_message)


############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
    default_routing_key: str = "outbound.message.m"

# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

# Actually Connect and run the consumer
def main():
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(producer=producer)
            consumer.listen(message_processor)

if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.0.0rc11.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.0.0rc11-py3-none-any.whl (25.2 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.0.0rc11.tar.gz.

File metadata

  • Download URL: talus-1.0.0rc11.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc11.tar.gz
Algorithm Hash digest
SHA256 d856fed8d867708d91cb0c48bdef53c72ebcf6dc914c0770760e4839faa06a5c
MD5 14037e8a5eb84f60539dd3b161fe7088
BLAKE2b-256 ea8e10ad780fad8a6e3d4ef2716eb2db734b8e426ce24ea440e211907ea3c674

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.0.0rc11-py3-none-any.whl.

File metadata

  • Download URL: talus-1.0.0rc11-py3-none-any.whl
  • Upload date:
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc11-py3-none-any.whl
Algorithm Hash digest
SHA256 cce97b5552ff4816da9dcd63f065275f38fdca15072e841365163d2160a4d8b3
MD5 ba13a9123578caa439fde91ea12eb1b0
BLAKE2b-256 44cd492453c9ea2acea359c90d19207978ed1b1ae73e5ecf11cd961115fed513

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page