Send and receive domain events via RabbitMQ
Project description
Domain event broker
This library provides a shallow layer on top of RabbitMQ topic exchanges for publishing and receiving domain events. Publisher and subscriber need not know about each other and can be started and stopped in any order. Each subscriber controls their own retry policy, whether they need a durable queue for the time they are down, or a dead-letter queue in case there is an error in the subscriber.
Configuration
This library needs to connect to RabbitMQ. By default, a local instance of
RabbitMQ is used. This can be changed by passing an amqp
URL
to publish_domain_event
or when instantiating Publisher
or Subscriber
:
from domain_event_broker import Subscriber
subscriber = Subscriber('amqp://user:password@rabbitmq-host/domain-events')
Integrations
Django
This library can be configured via your Django settings. Add
domain_event_broker.django to your INSTALLED_APPS
and set the
DOMAIN_EVENT_BROKER
in your settings:
INSTALLED_APPS = (
'domain_event_broker.django',
)
DOMAIN_EVENT_BROKER = 'amqp://user:password@rabbitmq-host/domain-events'
More information can be found in the documentation.
Sending events
Events can be sent by calling publish_domain_event
:
from domain_event_broker import publish_domain_event
publish_domain_event('user.registered', {'user_id': user.id})
Domain events are sent immediately. When emitting domain events from within a database transaction, it's recommended to defer publishing until the transaction is committed. Using a commit hook avoids spurious domain events if a transaction is rolled back after an error.
Receiving events
Subscribers can listen to one or more domain events - controlled via the binding keys. Binding keys may contain wildcards. A queue will be created for each subscriber. RabbitMQ takes care of routing only the relevant events to this queue.
This script will receive all events that are sent in the user domain:
from domain_event_broker import Subscriber
def log_user_event(event):
print(event)
subscriber = Subscriber()
subscriber.register(log_user_event, 'printer', ['user.*'])
subscriber.start_consuming()
Retry policy
If there is a problem consuming a message - for example a web service is down - the subscriber can raise an error to retry handling the event after the given delay:
from domain_event_broker import Subscriber
def sync_user_data(event):
try:
publish_to_service(event)
except ServiceIsDown:
raise Retry(5.0 ** event.retries) # 1s, 5s, 25s
subscriber = Subscriber()
subscriber.register(sync_user_data, 'sync_data', ['user.*'], max_retries=3)
subscriber.start_consuming()
The delayed retries are bound to the consumer, not the event. If max_retries
is exceeded, the event will be dropped or dead-lettered.
Development
Make sure you have RabbitMQ installed locally for testing.
- Create virtualenv and activate it
- Install dependencies with
pip install -r requirements.txt -r dev_requirements.txt -e .
- Run tests with
pytest
Project details
Release history Release notifications | RSS feed
Download files
Download the file for your platform. If you're not sure which to choose, learn more about installing packages.
Source Distribution
Built Distribution
File details
Details for the file domain-event-broker-3.0.2.tar.gz
.
File metadata
- Download URL: domain-event-broker-3.0.2.tar.gz
- Upload date:
- Size: 32.2 kB
- Tags: Source
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/3.7.3 pkginfo/1.7.0 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.8.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | c1a0894fb1038d0590901115bb5a5ce72ce1cf71ba9ed598552388f1fc87de64 |
|
MD5 | 77961f7d896f68ea86d9cf851516850f |
|
BLAKE2b-256 | a73f7ecb9c82d2efe793856d631413b75f0ef140b8bdcd0c8eaa7ecc2b26b5c5 |
File details
Details for the file domain_event_broker-3.0.2-py3-none-any.whl
.
File metadata
- Download URL: domain_event_broker-3.0.2-py3-none-any.whl
- Upload date:
- Size: 21.5 kB
- Tags: Python 3
- Uploaded using Trusted Publishing? No
- Uploaded via: twine/3.4.1 importlib_metadata/3.7.3 pkginfo/1.7.0 requests/2.22.0 requests-toolbelt/0.9.1 tqdm/4.59.0 CPython/3.8.6
File hashes
Algorithm | Hash digest | |
---|---|---|
SHA256 | fb804658dab811d417d3b802450659b9b52343fd4b9b35e7578b532bf7f55de5 |
|
MD5 | 31d0ae09f2a0a4a02ff65cb760aa55f8 |
|
BLAKE2b-256 | 104820198c2a6aaa284bbf8430b2bcbe6f8d3cf040eaaa0f298797d670f1363a |