A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.
Project description
talus (noun) - ta·lus | ˈtā-ləs: a slope formed especially by an accumulation of rock debris; Occasional habitat of the pika.
A wrapper for connecting to RabbitMQ which constrains clients to a single purpose channel (producer or consumer) with healing for intermittent connectivity.
Features
Guided separation of connections for producers and consumers
Re-establish connections to the server when lost
Constrained interface to support simple produce / consume use cases for direct exchanges
Installation
pip install talus
Examples
Creating a consumer which listens on a queue, processes valid messages and publishes as part of processing
from talus.consumer DurableConsumer
from talus.producer DurableProducer
from talus.models.retryer import ConnectionRetryerFactory
from talus.models.connection_parameters import ConsumerConnectionParameterFactory, ProducerConnectionParameterFactory
from talus.models.processor import MessageProcessorBase
from talus.models.message import ConsumeMessageBase, PublishMessageBase, MessageBodyBase
from talus.models.queue import Queue
from talus.models.exchange import Exchange
from talus.models.binding import Binding
from typing import Type
############
# Consumer #
############
# Configure messages that will be consumed
class ObjectMessageBody(MessageBodyBase):
objectName: str
bucket: str
class ConsumeMessage(ConsumeMessageBase):
message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody
# Configure the queue the messages should be consumed from
inbound_queue = Queue(name="inbound.q")
# Configure a message processor to handle the consumed messages
class MessageProcessor(MessageProcessorBase):
def process_message(self, message: ConsumeMessage):
print(f"Received message: {message}")
outbound_message = PublishMessage(
body=ObjectMessageBody(objectName=message.body.objectName, bucket="newBucket"),
) # change the bucket name for some reason
self.producer.publish(outbound_message)
############
# Producer #
############
# Configure messages that will be produced
class PublishMessage(PublishMessageBase):
message_body_cls: Type[ObjectMessageBody] = ObjectMessageBody # using the same schema for simplicity
default_routing_key: str = "outbound.message.m"
# Configure the queue the messages should be routed to
outbound_queue_one = Queue(name="outbound.one.q")
outbound_queue_two = Queue(name="outbound.two.q")
# Configure the exchange and queue bindings for publishing
publish_exchange = Exchange(name="outbound.exchange") # Direct exchange by default
bindings = [Binding(queue=outbound_queue_one, message=PublishMessage),
Binding(queue=outbound_queue_two, message=PublishMessage)] # publishing PublishMessage will route to both queues.
# Actually Connect and run the consumer
def main():
with DurableProducer(
queue_bindings=bindings,
publish_exchange=publish_exchange,
connection_parameters=ProducerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as producer:
with DurableConsumer(
consume_queue=inbound_queue,
connection_parameters=ConsumerConnectionParameterFactory(),
connection_retryer=ConnectionRetryerFactory(),
) as consumer:
message_processor = MessageProcessor(producer=producer)
consumer.listen(message_processor)
if __name__ == "__main__":
main()
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file talus-1.0.0rc11.tar.gz
.
File metadata
- Download URL: talus-1.0.0rc11.tar.gz
- Upload date:
- Size: 21.6 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.11.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | d856fed8d867708d91cb0c48bdef53c72ebcf6dc914c0770760e4839faa06a5c |
|
MD5 | 14037e8a5eb84f60539dd3b161fe7088 |
|
BLAKE2b-256 | ea8e10ad780fad8a6e3d4ef2716eb2db734b8e426ce24ea440e211907ea3c674 |
Provenance
File details
Details for the file talus-1.0.0rc11-py3-none-any.whl
.
File metadata
- Download URL: talus-1.0.0rc11-py3-none-any.whl
- Upload date:
- Size: 25.2 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/5.1.0 CPython/3.11.9
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | cce97b5552ff4816da9dcd63f065275f38fdca15072e841365163d2160a4d8b3 |
|
MD5 | ba13a9123578caa439fde91ea12eb1b0 |
|
BLAKE2b-256 | 44cd492453c9ea2acea359c90d19207978ed1b1ae73e5ecf11cd961115fed513 |