Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type

############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(f"Received message: {message}")
        outbound_message = PublishMessage(
            body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
        )  # change the bucket name for some reason
        self.producer.publish(outbound_message)


############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
    default_routing_key: str = "outbound.message.m"

# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

# Actually Connect and run the consumer
def main():
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(producer=producer)
            consumer.listen(message_processor)

if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.0.0rc9.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.0.0rc9-py3-none-any.whl (25.2 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.0.0rc9.tar.gz.

File metadata

  • Download URL: talus-1.0.0rc9.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc9.tar.gz
Algorithm Hash digest
SHA256 d8f96aac99c0358502f7de66e84b0ce6d7a29755ac51b23494a3f713e09531b6
MD5 dabbaf1d986008e2b35b2cfc98929ae8
BLAKE2b-256 33cb6d323630fb5f8a09fcd61d2428efab596cb4cf9d6b8f65d27de23b8df060

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.0.0rc9-py3-none-any.whl.

File metadata

  • Download URL: talus-1.0.0rc9-py3-none-any.whl
  • Upload date:
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc9-py3-none-any.whl
Algorithm Hash digest
SHA256 a370c3deb5a38ceb3d5a7380cbe135d8e06bfeaceff1f077e54404ed22becb16
MD5 0be7cd71e3dc2c01f443475bfff55528
BLAKE2b-256 8859d50580def2f98ae3abddea681411c88e2304530cdb01990c2d1db181aa82

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page