Skip to main content

Job scheduler for managing background tasks (asyncio)

Project description

https://travis-ci.org/aio-libs/aiojobs.svg?branch=master https://codecov.io/gh/aio-libs/aiojobs/branch/master/graph/badge.svg https://img.shields.io/pypi/v/aiojobs.svg Documentation Status Chat on Gitter

Job scheduler for managing background tasks (asyncio)

The library gives a controlled way for scheduling background tasks for asyncio applications.

Installation

$ pip3 install aiojobs

Usage example

import asyncio
import aiojobs

async def coro(timeout):
    await asyncio.sleep(timeout)

async def main():
    async with aiojobs.Scheduler() as scheduler:
        for i in range(100):
            # spawn jobs
            await scheduler.spawn(coro(i/10))

        await asyncio.sleep(5.0)
        # not all scheduled jobs are finished at the moment
    # Exit from context will gracefully wait on tasks before closing
    # any remaining spawned jobs

asyncio.run(main())

Shielding tasks with a scheduler

It is typically recommended to use asyncio.shield to protect tasks from cancellation. However, the inner shielded tasks can’t be tracked and are therefore at risk of being cancelled during application shutdown.

To resolve this issue aiojobs includes a aiojobs.Scheduler.shield method to shield tasks while also keeping track of them in the scheduler. In combination with the aiojobs.Scheduler.wait_and_close method, this allows shielded tasks the required time to complete successfully during application shutdown.

For example:

import asyncio
import aiojobs
from contextlib import suppress

async def important():
    print("START")
    await asyncio.sleep(5)
    print("DONE")

async def run_something(scheduler):
    # If we use asyncio.shield() here, then the task doesn't complete and DONE is never printed.
    await scheduler.shield(important())

async def main():
    async with aiojobs.Scheduler() as scheduler:
        t = asyncio.create_task(run_something(scheduler))
        await asyncio.sleep(0.1)
        t.cancel()
        with suppress(asyncio.CancelledError):
            await t

asyncio.run(main())

Integration with aiohttp.web

from aiohttp import web
from aiojobs.aiohttp import setup, spawn

async def handler(request):
    await spawn(request, coro())
    return web.Response()

app = web.Application()
app.router.add_get('/', handler)
setup(app)

or just

from aiojobs.aiohttp import atomic

@atomic
async def handler(request):
    return web.Response()

For more information read documentation: https://aiojobs.readthedocs.io

Communication channels

aio-libs google group: https://groups.google.com/forum/#!forum/aio-libs

Feel free to post your questions and ideas here.

Gitter Chat https://gitter.im/aio-libs/Lobby

We support Stack Overflow. Please add python-asyncio or aiohttp tag to your question there.

Author and License

The aiojobs package is written by Andrew Svetlov.

It’s Apache 2 licensed and freely available.

Project details


Download files

Download the file for your platform. If you're not sure which to choose, learn more about installing packages.

Source Distribution

aiojobs-1.3.0.tar.gz (138.9 kB view hashes)

Uploaded Source

Built Distribution

aiojobs-1.3.0-py3-none-any.whl (9.5 kB view hashes)

Uploaded Python 3

Supported by

AWS AWS Cloud computing and Security Sponsor Datadog Datadog Monitoring Fastly Fastly CDN Google Google Download Analytics Microsoft Microsoft PSF Sponsor Pingdom Pingdom Monitoring Sentry Sentry Error logging StatusPage StatusPage Status page