Skip to main content

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Project description

codecov

talus (noun) - ta·​lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.

A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.

Features

  • Guided separation of connections for producers and consumers

  • Re-establish connections to the server when lost

  • Constrained interface to support simple produce / consume use cases for direct exchanges

Installation

pip install talus

Examples

Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing

from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type

############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
    objectName: str
    bucket: str

class ConsumeMessage(ConsumeMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody

# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")

# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
    def process_message(self, message: ConsumeMessage):
        print(f"Received message: {message}")
        outbound_message = PublishMessage(
            body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
        )  # change the bucket name for some reason
        self.producer.publish(outbound_message)


############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
    message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
    default_routing_key: str = "outbound.message.m"

# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")


# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
            Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.

# Actually Connect and run the consumer
def main():
    with DurableProducer(
        queue_bindings=bindings,
        publish_exchange=publish_exchange,
        connection_parameters=ProducerConnectionParameterFactory(),
        connection_retryer=ConnectionRetryerFactory(),
    ) as producer:
        with DurableConsumer(
            consume_queue=inbound_queue,
            connection_parameters=ConsumerConnectionParameterFactory(),
            connection_retryer=ConnectionRetryerFactory(),
        ) as consumer:
            message_processor = MessageProcessor(producer=producer)
            consumer.listen(message_processor)

if __name__ == "__main__":
    main()

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

talus-1.0.0rc10.tar.gz (21.6 kB view details)

Uploaded Source

Built Distribution

talus-1.0.0rc10-py3-none-any.whl (25.2 kB view details)

Uploaded Python 3

File details

Details for the file talus-1.0.0rc10.tar.gz.

File metadata

  • Download URL: talus-1.0.0rc10.tar.gz
  • Upload date:
  • Size: 21.6 kB
  • Tags: Source
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc10.tar.gz
Algorithm Hash digest
SHA256 2a6392ce5b44c76a3f35d29503fbbe5e10e6456b1adfe401b193cfa6ab13d4ea
MD5 ac503f4b117b6c7cffa7b2e67c378979
BLAKE2b-256 d4bcf886e5034cc26dbf6dc83c1547697aa3932bcd216605c93e2d861f149f10

See more details on using hashes here.

Provenance

File details

Details for the file talus-1.0.0rc10-py3-none-any.whl.

File metadata

  • Download URL: talus-1.0.0rc10-py3-none-any.whl
  • Upload date:
  • Size: 25.2 kB
  • Tags: Python 3
  • Uploaded using Trusted Publishing? No
  • Uploaded via: twine/5.1.0 CPython/3.11.9

File hashes

Hashes for talus-1.0.0rc10-py3-none-any.whl
Algorithm Hash digest
SHA256 698116580b84a1e92d59ee6052466c5e9cb1c79caf80faea995e6fca0f2dfe0e
MD5 f6e096942634a9189a1109001d88e1d7
BLAKE2b-256 573c110d9eb63ad1e93f6b4d906878973ef19eaa9acdad3bae2fa765b8af8f59

See more details on using hashes here.

Provenance

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page