A microservices nanoframework.

🐎 Rampante

A fancy and opinionated nanoframework for microservices.

Installation

pip install rampante

How to use subscribe_on

import logging

from rampante import subscribe_on

log = logging.getLogger(__name__)

# Each handler must accept three parameters:
# queue_name: the queue name, for example "user.subscribed"
# data: a dictionary, decoded from the msgpack-encoded message sent to Kafka
# producer: an AIOKafkaProducer instance, in case you want to send new events

@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")

@subscribe_on("user.subscribed", "user.created")
async def send_another_message(queue_name, data, producer):
    log.info("Event received!")
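To make the registration pattern above more concrete, here is a minimal, standard-library-only sketch of how a decorator of this kind could map queue names to handlers. It is an illustration, not Rampante's actual implementation: `subscribe_on_sketch`, `_HANDLERS`, `dispatch` and `greet` are hypothetical names, and no Kafka connection is involved.

```python
import asyncio
from collections import defaultdict

# Hypothetical registry (not Rampante's internals): queue name -> handlers.
_HANDLERS = defaultdict(list)


def subscribe_on_sketch(*queue_names):
    """Register the decorated coroutine for every queue name given."""
    def decorator(func):
        for name in queue_names:
            _HANDLERS[name].append(func)
        return func  # the coroutine function itself is left unchanged
    return decorator


async def dispatch(queue_name, data, producer=None):
    """Await every handler registered for queue_name and collect the results."""
    handlers = _HANDLERS.get(queue_name, [])
    return [await handler(queue_name, data, producer) for handler in handlers]


@subscribe_on_sketch("user.subscribed", "user.created")
async def greet(queue_name, data, producer):
    return f"{queue_name}: {data['name']}"


results = asyncio.run(dispatch("user.created", {"name": "Ada"}))
```

Passing several queue names registers the same coroutine once per name, which mirrors the two-argument `subscribe_on("user.subscribed", "user.created")` usage shown above.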

Example

import asyncio
import logging

from rampante import scheduler, subscribe_on

log = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
    '[%(asctime)s %(name)s %(levelname)s] %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

KAFKA_URI = 'localhost:9092'

@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(scheduler(kafka_uri=KAFKA_URI, loop=loop, queue_size=10))
    except KeyboardInterrupt:
        log.warning("Shutting down!")

Example with aiohttp

import asyncio
import logging

import msgpack
from aiohttp import web
from aiokafka import AIOKafkaProducer
from rampante import scheduler, subscribe_on

log = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
    '[%(asctime)s %(name)s %(levelname)s] %(message)s')
handler.setFormatter(formatter)
log.addHandler(handler)
log.setLevel(logging.INFO)

KAFKA_URI = 'localhost:9092'


@subscribe_on("user.subscribed")
async def send_a_message(queue_name, data, producer):
    log.info("Event received!")


async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    body = msgpack.packb({"message": "Hello", "priority": 3})
    await request.app['events_queue'].send_and_wait("user.subscribed", body)
    return web.Response(text=text)


async def start_event_connection(app):
    """Connect to Kafka."""
    connection = AIOKafkaProducer(loop=app.loop, bootstrap_servers=KAFKA_URI)
    await connection.start()
    app['events_queue'] = connection


async def stop_event_connection(app):
    """Close connection with Kafka."""
    if 'events_queue' in app:
        await app['events_queue'].stop()


async def start_task_manager(app):
    """Load task manager."""
    app['task_manager'] = asyncio.ensure_future(
        scheduler(kafka_uri=KAFKA_URI, loop=app.loop, queue_size=10))


async def stop_task_manager(app):
    """Cancel task manager."""
    if 'task_manager' in app:
        app['task_manager'].cancel()
        try:
            await app['task_manager']
        except asyncio.CancelledError:
            # Expected: the task was cancelled on purpose.
            pass

if __name__ == '__main__':
    app = web.Application()
    app.router.add_get('/{name}', handle)
    # On-startup tasks
    app.on_startup.append(start_event_connection)
    app.on_startup.append(start_task_manager)
    # Clean-up tasks
    app.on_cleanup.append(stop_task_manager)
    app.on_cleanup.append(stop_event_connection)
    web.run_app(app)
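The clean-up step in the example cancels a background task and then awaits it. Awaiting a cancelled task raises `asyncio.CancelledError`, so the await usually needs to be guarded. The sketch below shows that pattern in isolation, using only the standard library; `worker`, `cancel_and_wait` and `state` are hypothetical stand-ins, not part of Rampante or aiohttp.

```python
import asyncio
import contextlib


async def worker(state):
    """Stand-in for a long-running consumer loop (hypothetical)."""
    try:
        while True:
            await asyncio.sleep(3600)
    finally:
        state["cleaned_up"] = True  # runs when the task is cancelled


async def cancel_and_wait(task):
    """Cancel the task and wait for it to finish, ignoring CancelledError."""
    task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await task


async def main():
    state = {"cleaned_up": False}
    task = asyncio.ensure_future(worker(state))
    await asyncio.sleep(0)  # give the worker a chance to start
    await cancel_and_wait(task)
    return task.cancelled(), state["cleaned_up"]


cancelled, cleaned_up = asyncio.run(main())
```

Waiting for the task after cancelling it gives its `finally` blocks time to run before the application moves on to closing the Kafka connection.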

The name

Rampante means “rampant” in Italian.

Why Kafka?

I like aiokafka, but I plan to switch to Redis as soon as Redis Streams are officially available.

To Do

  • add circuit breaker

  • add retry

  • add logic when tasks fail

  • add consumer position

Pull requests are encouraged!

License

Apache 2.0
