Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type

############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(f"Received message: {message}")
        outbound_message = PublishMessage(
            body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
        )  # change the bucket name for some reason
        self.producer.publish(outbound_message)


############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
    default_routing_key: str = "outbound.message.m"

# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

# Actually Connect and run the consumer
def main():
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(producer=producer)
            consumer.listen(message_processor)

if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.0.0.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.0.0-py3-none-any.whl (25.2 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.0.0.tar.gz.

File metadata

  • Download URL: talus-1.0.0.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0.tar.gz
Algorithm Hash digest
SHA256 181b160c5a7b684accd7f1d4e92e544b7f925adf164ab3a772837fca0b9b1f36
MD5 3b211fd8ff7a0a93edeb645139478af9
BLAKE2b-256 5553ac707412a3110adad827479d9dd6f97755f0db7d1108b0908bcb27fa9655

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.0.0-py3-none-any.whl.

File metadata

  • Download URL: talus-1.0.0-py3-none-any.whl
  • Upload date:
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0-py3-none-any.whl
Algorithm Hash digest
SHA256 f391439497d87a2481246a2a2eae4b82338fb6145ba4030895d9b2d825e3035d
MD5 39305dbe17c7acb100c5c60e45f0f773
BLAKE2b-256 a84e34a3ce5af1b584ccd63d55ca9d81cc0f20bc5d40c59bbf3bfb08e5f374a1

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page