asyncio framework for task-based execution

Python asyncio framework for task-based execution.

Overview

Tasky provides a framework for using Python’s asyncio module to encapsulate execution of your program or service as a set of distinct “tasks” with a variety of execution styles, including “periodic” tasks, timers, and more.

Tasks are defined by subclassing the appropriate type and implementing a run() method. Tasky runs tasks on the asyncio event loop, but also keeps track of which tasks are running, and can terminate automatically when all tasks are completed or after a predetermined amount of time.

Usage

The simplest type of task executes its run() method once and then completes. Hello World in Tasky can be accomplished with the following code:

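# Assumes Task and Tasky are exported at the top level of the tasky package.
from tasky import Task, Tasky

class HelloWorld(Task):
    async def run(self):
        print('Hello world!')

# Start the task and return once it has completed.
Tasky([HelloWorld]).run_until_complete()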

Note the use of async def. All tasks are coroutines, meaning they have full access to the asyncio event loop.
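
To illustrate what "tasks are coroutines" means in practice, here is a minimal sketch that uses only the standard library. It is not Tasky's implementation: SketchTask, Hello, Goodbye and run_all are hypothetical names invented for this example, and asyncio.run() requires Python 3.7 or newer. Each run() awaits asyncio.sleep(), which hands control back to the event loop so the other task can make progress.

```python
import asyncio


class SketchTask:
    """Hypothetical stand-in for a one-shot task; not Tasky's Task class."""

    async def run(self):
        raise NotImplementedError


class Hello(SketchTask):
    async def run(self):
        # Awaiting yields control to the event loop while this task waits.
        await asyncio.sleep(0.02)
        return 'Hello world!'


class Goodbye(SketchTask):
    async def run(self):
        await asyncio.sleep(0.01)
        return 'Goodbye!'


async def run_all(task_classes):
    # Instantiate every class and run the coroutines concurrently.
    # gather() returns results in argument order, not completion order.
    return await asyncio.gather(*(cls().run() for cls in task_classes))


results = asyncio.run(run_all([Hello, Goodbye]))
print(results)
```

Both tasks wait at the same time, so the total running time is close to the longest single sleep rather than the sum of the two.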

Another common pattern is to execute code every X number of seconds, a “periodic” task similar to a cron job. In Tasky, this is possible by subclassing PeriodicTask and defining your job INTERVAL:

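# Assumes PeriodicTask and Tasky are exported at the top level of the tasky package.
from tasky import PeriodicTask, Tasky

class Counter(PeriodicTask):
    INTERVAL = 1.0  # seconds between calls to run()

    value = 0

    async def run(self):
        # Use self.value: a bare "value += 1" raises UnboundLocalError.
        self.value += 1
        print(self.value)

Tasky([Counter]).run_for_time(10)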

Note the use of run_for_time(). This gracefully stops the Tasky event loop after the given number of seconds has passed. The periodic task stops running automatically, giving us the expected output of counting to ten.
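
The pattern of "run a job at a fixed interval, then stop after a time limit" can be sketched in plain asyncio as follows. This is an illustration of the idea only, not how Tasky implements PeriodicTask or run_for_time(); run_periodically is a hypothetical helper, and the short interval and duration are chosen just to keep the example fast.

```python
import asyncio


async def run_periodically(job, interval, duration):
    """Await job() every `interval` seconds until `duration` seconds elapse."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + duration
    while loop.time() < deadline:
        await job()
        await asyncio.sleep(interval)


class Counter:
    value = 0  # class-level default, shadowed per instance on first increment

    async def run(self):
        self.value += 1


counter = Counter()
asyncio.run(run_periodically(counter.run, interval=0.01, duration=0.1))
print(counter.value)
```

The exact count depends on timer precision and on how long each job takes, so this sketch makes no promise about the final number.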

The third common type of task is a timer. Its run() method is executed once, after a defined delay. If the timer is reset after execution completes, run() will be executed again after another delay. If it is reset before execution, the countdown simply restarts from the originally defined delay:

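# Assumes TimerTask and Tasky are exported at the top level of the tasky package.
# Counter is the periodic task defined in the previous example.
from tasky import Tasky, TimerTask

class Celebrate(TimerTask):
    DELAY = 10  # seconds to wait before run() is called

    async def run(self):
        print('Surprise!')

Tasky([Counter, Celebrate]).run_for_time(10)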

Note that we’re now starting multiple tasks at once. The counter output from the previous example is accompanied by the message “Surprise!” at the end.
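
The reset behaviour described for timers can be sketched with the standard library's loop.call_later(). ResettableTimer is a hypothetical helper written for this example and is not Tasky's TimerTask; it also uses a plain callback instead of a coroutine to keep the sketch short.

```python
import asyncio


class ResettableTimer:
    """Hypothetical helper: calls `callback` once, `delay` seconds after reset()."""

    def __init__(self, delay, callback):
        self.delay = delay
        self.callback = callback
        self._handle = None

    def reset(self):
        # Cancel any pending call, then restart the countdown from `delay`.
        # Cancelling a handle that has already fired is harmless.
        if self._handle is not None:
            self._handle.cancel()
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.delay, self.callback)


async def main():
    loop = asyncio.get_running_loop()
    start = loop.time()
    fired = []  # elapsed time at each firing
    timer = ResettableTimer(0.2, lambda: fired.append(loop.time() - start))

    timer.reset()             # start the countdown
    await asyncio.sleep(0.05)
    timer.reset()             # reset before firing: the deadline moves back
    await asyncio.sleep(0.4)  # the timer fires once during this wait
    timer.reset()             # reset after firing: schedules a second run
    await asyncio.sleep(0.4)
    return fired


fired = asyncio.run(main())
print(len(fired))
```

A reset before the deadline never produces an extra call; it only postpones the pending one. A reset after the callback has run schedules a fresh call.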

The last major type of task is a queue consumer. A shared work queue is created for one or more worker tasks of the same class, and the run() method is then called for every work item popped from the queue. Any task can insert work items through the consumer class itself (via its QUEUE attribute), or call QueueTask.close() to signal that workers should stop once the shared work queue becomes empty:

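import random

# Assumes these names are exported at the top level of the tasky package.
from tasky import QueueTask, Task, Tasky

class QueueConsumer(QueueTask):
    WORKERS = 2  # number of worker tasks sharing the queue
    MAXSIZE = 5  # maximum number of items waiting in the queue

    async def run(self, item):
        print('consumer got {}'.format(item))
        await self.sleep(0.1)

class QueueProducer(Task):
    async def run(self):
        for i in range(10):
            item = random.randint(0, 100)
            # put() waits while the queue already holds MAXSIZE items.
            await QueueConsumer.QUEUE.put(item)
            print('producer put {}'.format(item))
        # Signal workers to stop once the queue is empty.
        QueueConsumer.close()

Tasky([QueueConsumer, QueueProducer]).run_until_complete()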

Note that if work items need to be reprocessed, they should be manually inserted back into the shared queue by the worker.
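
As an illustration of manual re-insertion, the sketch below uses asyncio.Queue directly rather than Tasky's QueueTask. The worker function, the (value, attempts) item format and the "odd values fail once" rule are all assumptions made up for this example. The queue is left unbounded here so that a worker putting an item back can never block on a full queue.

```python
import asyncio


async def worker(queue, results):
    while True:
        value, attempts = await queue.get()
        try:
            if value % 2 == 1 and attempts == 0:
                # Pretend odd values fail on the first attempt:
                # put them back on the shared queue for another try.
                queue.put_nowait((value, attempts + 1))
            else:
                results.append(value)
        finally:
            # Mark this item as handled, whether it succeeded or was requeued.
            queue.task_done()


async def main():
    queue = asyncio.Queue()  # unbounded, so re-inserting never blocks
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]

    for value in range(6):
        queue.put_nowait((value, 0))

    # join() returns once every item, including requeued ones, is done.
    await queue.join()
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(main())
print(sorted(results))
```

Because the item is put back before task_done() is called, the queue's count of unfinished work never drops to zero while a retry is still pending.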

Install

Tasky requires Python 3.5 or newer, because it relies on the async/await syntax introduced in that version. You can install it from PyPI with the following command:

$ pip install tasky

License

Copyright 2016 John Reese. Licensed under the MIT license. See the LICENSE file for details.
