The cmd_queue module for a DAG of bash commands

Command Queue - cmd_queue

Read the docs

https://cmd-queue.readthedocs.io

Gitlab

https://gitlab.kitware.com/computer-vision/cmd_queue

Pypi

https://pypi.org/project/cmd_queue

Slides

https://docs.google.com/presentation/d/1BjJkjMx6bxu1uek-hAGpwj760u9rraVn7st8J5OsZME

This is a simple module for “generating” a bash script that schedules multiple jobs (in parallel when possible) on a single machine. There are 3 backends with increasing levels of complexity: serial, tmux, and slurm.

In serial mode, a single bash script is written that executes your jobs in sequence. There are no external dependencies.

In tmux mode, multiple tmux sessions are opened, and each executes an independent part of your jobs. Dependencies are handled.

In slurm mode, a real heavyweight scheduler is used. In this mode we simply convert your jobs to slurm commands and submit them.
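Conceptually, each submitted job becomes an sbatch --wrap invocation, with dependencies expressed via --dependency=afterok (this mirrors the generated script shown later; the helper below is an illustrative sketch of that translation, not the library's internal code):

```python
# Illustrative sketch (not cmd_queue internals): how a job with
# dependencies might translate into an sbatch invocation.
def to_sbatch(name, command, depend_job_ids=()):
    parts = [f'sbatch --job-name="{name}"', f"--wrap '{command}'"]
    if depend_job_ids:
        # Chain multiple dependencies with ':' as sbatch expects.
        deps = ':'.join(str(j) for j in depend_job_ids)
        parts.append(f'"--dependency=afterok:{deps}"')
    parts.append('--parsable')  # make sbatch print the job id for capture
    return ' '.join(parts)

print(to_sbatch('job-1', 'echo world', depend_job_ids=['${JOB_000}']))
```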

Under the hood we build a DAG based on your specified dependencies and use this to appropriately order jobs.
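The ordering itself is just a topological sort of the dependency graph. A minimal pure-Python sketch of the idea (independent of cmd_queue's actual internals; the job names are hypothetical):

```python
from graphlib import TopologicalSorter  # Python 3.9+ standard library

# Each key depends on the jobs in its value set (hypothetical job names).
deps = {
    'job1': set(),
    'job2': {'job1'},
    'job3': set(),
    'job4': {'job2', 'job3'},
}
order = list(TopologicalSorter(deps).static_order())
print(order)  # dependencies always appear before their dependents
```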

By default, the bash scripts that would execute your jobs are printed to the console. This gives the user fine-grained control, e.g. to run only a subset of a pipeline manually. But when asked to run, cmd_queue will execute the bash jobs itself.

Features

  • Bash command scheduling

  • Execution is optional, can just print commands instead

  • No-parallelism always-available serial backend

  • Tmux based lightweight backend

  • Slurm based heavyweight backend

  • Python and Bash interface

  • Rich monitoring / live-control

Installation

The cmd_queue package is available on pypi.

pip install cmd_queue

The serial queue backend will always work. To gain access to other backends you must install their associated dependencies. The tmux backend is the easiest and simply requires that tmux is installed (e.g. sudo apt install tmux on Debian systems).

Other backends require more complex setups. The slurm backend will require that slurm is installed and the daemon is running. The slurm backend is functional and tested, but improvements can still be made (help wanted). The airflow backend similarly requires a configured airflow server, but is not fully functional or tested (contributions to make airflow work / easier are wanted!).
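One simple way to pick a usable backend at runtime is to probe for the required executables (a hedged sketch: cmd_queue itself exposes availability checks such as is_available(), but this shutil-based probe is our own assumption, not the library's logic):

```python
import shutil

def pick_backend():
    # Prefer slurm if sbatch is on the PATH, then tmux,
    # falling back to the always-available serial backend.
    if shutil.which('sbatch'):
        return 'slurm'
    if shutil.which('tmux'):
        return 'tmux'
    return 'serial'

print(pick_backend())
```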

Tmux Queue Demo

After installing, the following command runs a demo of the tmux queue:

# Run the interactive tmux queue demo
INTERACTIVE_TEST=1 xdoctest -m cmd_queue.tmux_queue TMUXMultiQueue.monitor:1

This executes the following code, which creates two parallel tmux workers and submits several bash jobs with non-trivial dependencies.

 # xdoctest: +REQUIRES(env:INTERACTIVE_TEST)
 from cmd_queue.tmux_queue import *  # NOQA
 # Set up a lot of longer-running jobs
 n = 2
 self = TMUXMultiQueue(size=n, name='demo_cmd_queue')
 first_job = None
 for i in range(n):
     prev_job = None
     for j in range(4):
         command = f'sleep 1 && echo "This is job {i}.{j}"'
         job = self.submit(command, depends=prev_job)
         prev_job = job
         first_job = first_job or job
 command = 'sleep 1 && echo "this is the last job"'
 job = self.submit(command, depends=[prev_job, first_job])
 self.print_commands(style='rich')
 self.print_graph()
 if self.is_available():
     self.run(block=True, other_session_handler='kill')

Running print_commands first displays all of the submitted commands, which will be distributed across multiple new tmux sessions. These are the commands that will be executed. This is useful for spot-checking that your bash command templating is correct before the queue is executed with run.

https://i.imgur.com/rVbyHzM.png

The print_graph command renders the DAG to be executed using network text. Finally, run is called with block=True, which starts executing the DAG and displays progress and job status in a rich or textual monitor.

https://i.imgur.com/4mxFIMk.gif

While this is running, it is possible to attach to a tmux session (e.g. tmux a) and inspect a specific queue as it runs. (We recommend using <ctrl-b>s inside a tmux session to view and navigate between sessions.) Unlike the slurm backend, the entire execution of the DAG is transparent to the developer! The following screenshot shows the tmux sessions spawned while running this demo.

https://i.imgur.com/46LRK8M.png

By default, if there are no errors, these sessions will exit after execution completes, but this is configurable. Likewise, if there are errors, the tmux sessions will persist to allow for debugging.

Motivation

Recently, I needed to run 4 jobs across 2 GPUs and then execute a script after all of them were done. What I should have done was use slurm or some other proper queuing system to schedule the jobs, but instead I wrote my own hacky scheduler using tmux. I opened N tmux sessions (one per parallel worker) and ran independent jobs in each session.

This worked unreasonably well for my use cases, and it was nice to be able to effectively schedule jobs without heavyweight software like slurm on my machine.

Eventually I did get slurm on my machine, and I abstracted the API of my tmux queue into a general “command queue” that can use 1 of 3 backends: serial, tmux, or slurm.

Niche

There are many DAG schedulers out there:

  • airflow

  • luigi

  • submitit

  • rq_scheduler

The niche for this tool is when you have large pipelines of bash commands that depend on each other, and you want to template out those commands' parameters with logic that you define in Python.

We plan to fully support an airflow backend.

Usage

There are two ways to use cmd_queue:

  1. In Python, create a Queue object, then call its .submit method to pass it a shell invocation. It returns an object that you can use to specify dependencies of any further calls to .submit. This simply organizes all of your CLI invocations into a bash script, which can be inspected and then run. Different backends enable parallel execution of jobs when dependencies allow.

  2. Via the CLI, with details shown in cmd_queue --help. Usage is basically the same: you create a queue, submit jobs to it, inspect it, and run it.

Example usage in Python:

import cmd_queue

# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')

# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello')
job2 = self.submit('echo world', depends=[job1])
job3 = self.submit('echo foo')
job4 = self.submit('echo bar', depends=[job2, job3])
job5 = self.submit('echo spam', depends=[job1])

# Print a graph of job dependencies
self.print_graph()

# Display the simplified bash script to be executed.
self.print_commands()

# Execute the jobs
self.run()

Example usage in the CLI:

# Create a Queue
cmd_queue new "demo_cli_queue"

# Submit bash invocations that you want to run, and mark dependencies.
cmd_queue submit --jobname job1 "demo_cli_queue" -- echo hello
cmd_queue submit --jobname job2 --depends job1 "demo_cli_queue" -- echo world
cmd_queue submit --jobname job3 "demo_cli_queue" -- echo foo
cmd_queue submit --jobname job4 --depends job1,job2 "demo_cli_queue" -- echo bar
cmd_queue submit --jobname job5 --depends job1  "demo_cli_queue" -- echo spam

# Display the simplified bash script to be executed.
cmd_queue show "demo_cli_queue" --backend=serial

# Execute the jobs
cmd_queue run "demo_cli_queue" --backend=serial

Examples

All of the dependency checking and bookkeeping logic is handled in bash itself. Write (or better yet, template) your bash scripts in Python, and then use cmd_queue to “transpile” these sequences of commands to pure bash.
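For instance, a parameter grid can be templated into bash commands with ordinary Python string formatting before submission (the train.py script and its flags below are hypothetical):

```python
import itertools

# Hypothetical parameter grid templated into bash invocations.
commands = [
    f'python train.py --lr={lr} --batch_size={bs}'
    for lr, bs in itertools.product([0.001, 0.01], [16, 32])
]
for cmd in commands:
    print(cmd)
# Each command string would then be passed to queue.submit(cmd, depends=...)
```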

import cmd_queue

# Create a Queue object
self = cmd_queue.Queue.create(name='demo_queue', backend='serial')

# Submit bash invocations that you want to run, and mark dependencies.
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the simplified bash script to be executed.
self.print_commands()

# Execute the jobs
self.run()

This prints the bash commands in an appropriate order to resolve dependencies.

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_cc9d551e/demo_queue_2022-04-08_cc9d551e.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 10 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 10 - demo_queue-job-1
echo world && sleep 0.5
#
### Command 3 / 10 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 4 / 10 - demo_queue-job-3
echo bar && sleep 0.5
#
### Command 5 / 10 - demo_queue-job-4
echo spam && sleep 0.5
#
### Command 6 / 10 - demo_queue-job-5
echo spam && sleep 0.5
#
### Command 7 / 10 - demo_queue-job-6
echo err && false
#
### Command 8 / 10 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 9 / 10 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 10 / 10 - demo_queue-job-9
echo bazbiz && sleep 0.5

The same code can be run in parallel by choosing a more powerful backend. The tmux backend is the lightest-weight parallel backend.

# Need to tell the tmux queue how many processes can run at the same time
import cmd_queue
self = cmd_queue.Queue.create(size=4, name='demo_queue', backend='tmux')
job1 = self.submit('echo hello && sleep 0.5')
job2 = self.submit('echo world && sleep 0.5', depends=[job1])
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.print_commands()

# Display the real bash that gets executed under the hood, which is
# independently executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

This prints the sequence of bash commands that will be executed in each tmux session.

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_0_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 3 - demo_queue-job-7
echo spam && sleep 0.5
#
### Command 2 / 3 - demo_queue-job-8
echo eggs && sleep 0.5
#
### Command 3 / 3 - demo_queue-job-9
echo bazbiz && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_1_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-2
echo foo && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-6
echo err && false

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_2_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 2 - demo_queue-job-0
echo hello && sleep 0.5
#
### Command 2 / 2 - demo_queue-job-5
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_3_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-3
echo bar && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_4_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-4
echo spam && sleep 0.5

# --- /home/joncrall/.cache/base_queue/demo_queue_2022-04-08_a1ef7600/queue_demo_queue_5_2022-04-08_a1ef7600.sh

#!/bin/bash
#
# Jobs
#
### Command 1 / 1 - demo_queue-job-1
echo world && sleep 0.5

Slurm mode is the real deal, but you need slurm installed on your machine to use it. (The tmux backend is a much lighter-weight alternative.) Slurm options can be specified per job:

import cmd_queue
self = cmd_queue.Queue.create(name='demo_queue', backend='slurm')
job1 = self.submit('echo hello && sleep 0.5', cpus=4, mem='8GB')
job2 = self.submit('echo world && sleep 0.5', depends=[job1], partition='default')
job3 = self.submit('echo foo && sleep 0.5')
job4 = self.submit('echo bar && sleep 0.5')
job5 = self.submit('echo spam && sleep 0.5', depends=[job1])
job6 = self.submit('echo spam && sleep 0.5')
job7 = self.submit('echo err && false')
job8 = self.submit('echo spam && sleep 0.5')
job9 = self.submit('echo eggs && sleep 0.5', depends=[job8])
job10 = self.submit('echo bazbiz && sleep 0.5', depends=[job9])

# Display the "user-friendly" pure bash
self.print_commands()

# Display the real bash that gets executed under the hood, which is
# independently executable, tracks the success / failure of each job,
# and manages dependencies.
self.print_commands(1, 1)

# Blocking will display a job monitor while it waits for everything to
# complete
self.run(block=True)

This prints the very simple slurm submission script:

# --- /home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/demo_queue-20220408T170615-a9e238b5.sh

mkdir -p "$HOME/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs"
JOB_000=$(sbatch --job-name="J0000-demo_queue-20220408T170615-a9e238b5" --cpus-per-task=4 --mem=8000 --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0000-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo hello && sleep 0.5' --parsable)
JOB_001=$(sbatch --job-name="J0002-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0002-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo foo && sleep 0.5' --parsable)
JOB_002=$(sbatch --job-name="J0003-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0003-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bar && sleep 0.5' --parsable)
JOB_003=$(sbatch --job-name="J0005-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0005-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_004=$(sbatch --job-name="J0006-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0006-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo err && false' --parsable)
JOB_005=$(sbatch --job-name="J0007-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0007-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' --parsable)
JOB_006=$(sbatch --job-name="J0001-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0001-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo world && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_007=$(sbatch --job-name="J0004-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0004-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo spam && sleep 0.5' "--dependency=afterok:${JOB_000}" --parsable)
JOB_008=$(sbatch --job-name="J0008-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0008-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo eggs && sleep 0.5' "--dependency=afterok:${JOB_005}" --parsable)
JOB_009=$(sbatch --job-name="J0009-demo_queue-20220408T170615-a9e238b5" --output="/home/joncrall/.cache/slurm_queue/demo_queue-20220408T170615-a9e238b5/logs/J0009-demo_queue-20220408T170615-a9e238b5.sh" --wrap 'echo bazbiz && sleep 0.5' "--dependency=afterok:${JOB_008}" --parsable)
