BarterDude
Message exchange engine to build pipelines using brokers like RabbitMQ. This project is built on top of the great async-worker.
Install
Using Python 3.6+
pip install barterdude
If you want Prometheus integration, run:
pip install barterdude[prometheus] # or pip install barterdude[all]
Usage
Build your consumer with this complete example:
from barterdude import BarterDude
from barterdude.monitor import Monitor
from barterdude.hooks.healthcheck import Healthcheck
from barterdude.hooks.logging import Logging
from barterdude.hooks.metrics.prometheus import Prometheus
from asyncworker.rabbitmq.message import RabbitMQMessage
# from my_project import MyHook # (you can build your own hooks)
# configure RabbitMQ
barterdude = BarterDude(
    hostname="localhost",
    username="guest",
    password="guest",
    prefetch=256,
)
# Prometheus labels for automatic metrics
labels = dict(
    app_name="my_app",
    team_name="my_team"
)
healthcheck = Healthcheck(barterdude) # automatic and customizable healthcheck
prometheus = Prometheus(barterdude, labels) # automatic and customizable Prometheus integration
monitor = Monitor(
    healthcheck,
    prometheus,
    # MyHook(barterdude, "/path") # (will make localhost:8080/path url)
    Logging() # automatic and customizable logging
)
my_metric = prometheus.metrics.counter(name="fail", description="fail again") # It's the same as https://github.com/prometheus/client_python
@barterdude.consume_amqp(
    ["queue1", "queue2"],
    monitor,
    coroutines=10, # number of coroutines spawned to consume messages (1 per message)
    bulk_flush_interval=60.0, # max time to wait for messages before starting to process them
    requeue_on_fail=True # whether to requeue (retry) a failed message
)
async def your_consumer(msg: RabbitMQMessage): # you receive only one message and we parallelize processing for you
    await barterdude.publish_amqp(
        exchange="my_exchange",
        data=msg.body
    )
    if msg.body == "fail":
        my_metric.inc() # you can use prometheus metrics
        healthcheck.force_fail() # you can use your hooks inside the consumer too
        msg.reject(requeue=False) # you can force-reject a message, exactly as in https://b2wdigital.github.io/async-worker/src/asyncworker/asyncworker.rabbitmq.html#asyncworker.rabbitmq.message.RabbitMQMessage
    if msg.body == "exception":
        raise Exception() # this will reject the message and requeue it
    # if everything is fine, then the message is automatically accepted
barterdude.run() # starts consuming and starts a server on http://localhost:8080
# Change host and port with ASYNCWORKER_HTTP_HOST and ASYNCWORKER_HTTP_PORT env vars
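The host and port override mentioned in the last comment can also be applied from Python, provided it happens before `barterdude` is imported. This is a minimal sketch, assuming async-worker reads its `ASYNCWORKER_*` settings once at import time (worth confirming against the async-worker version you use); the values `0.0.0.0` and `9000` are arbitrary examples.

```python
import os

# Assumption: async-worker reads ASYNCWORKER_* settings when it is first
# imported, so set them before importing barterdude or asyncworker.
os.environ.setdefault("ASYNCWORKER_HTTP_HOST", "0.0.0.0")  # example value
os.environ.setdefault("ASYNCWORKER_HTTP_PORT", "9000")     # example value

# from barterdude import BarterDude  # import only after the variables are set
```

`setdefault` keeps any value already exported in the shell, so deployment configuration still takes precedence over these fallbacks.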
Build your own Hook
Base Hook (Simple One)
These hooks are called when a message is retrieved, when it is processed successfully, and when it fails.
from barterdude.hooks import BaseHook
from asyncworker.rabbitmq.message import RabbitMQMessage
class MyCounterHook(BaseHook):
    _consume = _fail = _success = 0

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1
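The three callbacks can also carry state from one to the next. The sketch below measures how long each message takes by recording a timestamp in `before_consume` and reading it back in `on_success` or `on_fail`. It is a duck-typed stand-in that runs without BarterDude installed; in a real project the class would subclass `BaseHook`. The name `TimingHook`, the `durations` list, and the assumption that the same message object reaches both callbacks are illustrative, not part of the library.

```python
import asyncio
import time


class TimingHook:
    """Duck-typed stand-in; in a real project, subclass barterdude.hooks.BaseHook."""

    def __init__(self):
        self._started = {}   # id(message) -> start timestamp
        self.durations = []  # (outcome, seconds) pairs

    async def before_consume(self, message):
        self._started[id(message)] = time.monotonic()

    async def on_success(self, message):
        self._record("success", message)

    async def on_fail(self, message, error):
        self._record("fail", message)

    def _record(self, outcome, message):
        # Assumption: the same message object is passed to both callbacks.
        started = self._started.pop(id(message), None)
        if started is not None:
            self.durations.append((outcome, time.monotonic() - started))


async def demo():
    hook = TimingHook()
    ok, bad = object(), object()  # placeholders for RabbitMQMessage instances
    await hook.before_consume(ok)
    await hook.on_success(ok)
    await hook.before_consume(bad)
    await hook.on_fail(bad, ValueError("boom"))
    return hook


hook = asyncio.run(demo())
print([outcome for outcome, _ in hook.durations])
```

Keying the timestamps by `id(message)` keeps the hook safe when several coroutines process different messages at the same time.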
Http Hook (Open Route)
These hooks can do everything a simple hook does, and they also respond to an HTTP route.
from aiohttp import web
from barterdude.hooks import HttpHook
from asyncworker.rabbitmq.message import RabbitMQMessage
class MyCounterHttpHook(HttpHook):
    _consume = _fail = _success = 0

    async def __call__(self, req: web.Request):
        return web.json_response(dict(
            consumed=self._consume,
            success=self._success,
            fail=self._fail
        ))

    async def on_success(self, message: RabbitMQMessage):
        self._success += 1

    async def on_fail(self, message: RabbitMQMessage, error: Exception):
        self._fail += 1

    async def before_consume(self, message: RabbitMQMessage):
        self._consume += 1
Testing
To test async consumers we recommend the asynctest lib:
from asynctest import TestCase


class TestMain(TestCase):
    def test_should_pass(self):
        self.assertTrue(True)
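The test above does not exercise a coroutine, so here is a slightly fuller sketch. It swaps asynctest for the standard library's `unittest.IsolatedAsyncioTestCase` and `unittest.mock.AsyncMock`, which offer similar features on Python 3.8+. The `forward` function is hypothetical: it mirrors the consumer from the Usage section, with the publish call passed in as an argument so it can be mocked without a broker.

```python
import unittest
from unittest.mock import AsyncMock, MagicMock


async def forward(msg, publish):
    """Hypothetical consumer body: republish the payload, reject on 'fail'."""
    await publish(exchange="my_exchange", data=msg.body)
    if msg.body == "fail":
        msg.reject(requeue=False)


class TestForward(unittest.IsolatedAsyncioTestCase):
    async def test_publishes_body(self):
        msg = MagicMock(body="hello")  # stand-in for RabbitMQMessage
        publish = AsyncMock()          # stand-in for barterdude.publish_amqp
        await forward(msg, publish)
        publish.assert_awaited_once_with(exchange="my_exchange", data="hello")
        msg.reject.assert_not_called()

    async def test_rejects_failed_message(self):
        msg = MagicMock(body="fail")
        await forward(msg, AsyncMock())
        msg.reject.assert_called_once_with(requeue=False)
```

Run it with `python -m unittest`. Passing the publisher in as a parameter is only one way to make the consumer testable; patching `barterdude.publish_amqp` would work as well.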
We hope you enjoy! :wink:
Contribute
For development and contributing, please follow the Contributing Guide and ALWAYS respect the Code of Conduct.