Filesystem Task Queue

A task queue that uses the filesystem as the message queue. This project was motivated by use cases where it is hard or nearly impossible to run a persistent service such as Redis, RabbitMQ, or a database. If you are able to run a persistent service, you should prefer that approach. The initial motivation for this package was a way to submit tasks to an HPC cluster and process them on HPC worker nodes without running a service on the login node.

This project uses filelock. With this library it is safe to say that if the underlying filesystem honors flock calls, each task is guaranteed to be executed exactly once. If any of the workers are on a non-conforming filesystem, execution is guaranteed at least once.
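
To illustrate the technique, the sketch below shows how a lock file can guard claiming a task. This is only an illustration of how filelock provides the guarantee, not the package's actual internals; the on-disk layout and file names are hypothetical.

import os
from filelock import FileLock

# Hypothetical task layout for illustration; the real package may differ.
task_dir = "path/to/queue/tasks"
os.makedirs(task_dir, exist_ok=True)
task_path = os.path.join(task_dir, "task-0001")

# Whichever worker acquires the lock first claims the task; on a filesystem
# that honors flock, a second worker blocks here and then finds the task gone.
with FileLock(task_path + ".lock"):
    if os.path.exists(task_path):
        with open(task_path) as f:
            payload = f.read()  # execute the task described by the payload
        os.remove(task_path)  # remove the task so it cannot be claimed twice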

Keep in mind that NFS v2 and v3 rely on an external file-locking service, rpc.lockd, which is not enabled everywhere since it is a separate service. NFS v4 has built-in support for file locks, but many HPC centers still run v3. Otherwise it is generally safe these days to assume your filesystem supports locks.

Keep in mind that task state is managed on the filesystem, so do not use this if you have an enormous number of tasks. Consider chunking them (see the sketch below) or using a plugin like fs_task_queue.plugins.dask.DaskWorker to send tasks to Dask (then breaking the work into many small tasks there). Each task state modification results in 2-4 IOPS on the filesystem.
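
As a rough illustration of chunking, the sketch below (using the Queue API introduced in the API section) enqueues one task per batch of items instead of one task per item, assuming enqueue accepts any importable callable just as it accepts operator.add; process_chunk and do_work are hypothetical stand-ins.

from fs_task_queue import Queue

queue = Queue("path/to/queue")

def do_work(item):
    return item * 2  # hypothetical per-item computation

def process_chunk(items):
    return [do_work(item) for item in items]

# 100 tasks of 1,000 items each, instead of 100,000 tasks, keeps the
# per-task filesystem overhead (2-4 IOPS each) manageable.
items = list(range(100_000))
chunk_size = 1_000
for start in range(0, len(items), chunk_size):
    queue.enqueue(process_chunk, items[start:start + chunk_size])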

Install

  • pip install fs-task-queue

API

Creating a queue is as simple as supplying a directory where the queue will reside.

from fs_task_queue import Queue

queue = Queue("path/to/queue")

Submitting jobs and monitoring them over SSH is also supported via the same interface. Workers currently cannot connect over SSH.

from fs_task_queue.plugins import SSHQueue

queue = SSHQueue("ssh://<username>:<password>@<hostname>:<port>/<path>")
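
For example, with entirely hypothetical credentials, host, and path:

queue = SSHQueue("ssh://alice:secret@cluster.example.edu:22/home/alice/queue")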

Next we can submit/enqueue jobs to the queue.

import operator

job = queue.enqueue(operator.add, 1, 2)

You can immediately try to fetch the result of the job or get its status.

print(job.get_status())
print(job.result)

You can wait for the job to finish.

result = job.wait()
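
Putting the pieces together, a minimal round trip looks like this, assuming a worker (see the next section) is processing the same queue directory:

import operator

from fs_task_queue import Queue

queue = Queue("path/to/queue")
job = queue.enqueue(operator.add, 1, 2)
print(job.wait())  # prints 3 once a worker has picked up and run the task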

Worker

Starting a worker is as simple as pointing it at the filesystem directory where the queue resides.

fs-task-queue-worker --path ./path/to/queue

A Dask worker is supported via fs_task_queue.plugins.dask.DaskWorker for sending jobs to a Dask cluster instead of executing them locally.

A worker runs a continuous loop gathering tasks from the task queue. The worker creates a file path/to/queue/workers/<worker-id> and, as sketched below, will:

  • touch the file every 30 seconds as a heartbeat
  • check that the file still exists and, if it does not, stop

This also means you can shut a worker down remotely by deleting its heartbeat file.
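
The following is a minimal sketch of that heartbeat protocol, not the package's actual implementation; the worker-id naming is hypothetical, while the directory layout follows the description above.

import time
import uuid
from pathlib import Path

# Hypothetical worker id; the real worker's naming scheme may differ.
heartbeat = Path("path/to/queue/workers") / f"worker-{uuid.uuid4().hex}"
heartbeat.parent.mkdir(parents=True, exist_ok=True)
heartbeat.touch()

while True:
    if not heartbeat.exists():
        break  # the file was deleted: stop the worker
    heartbeat.touch()  # refresh the heartbeat timestamp
    # ... gather and execute tasks from the queue here ...
    time.sleep(30)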

License

BSD-3
