Python SDK for Argo Workflows

If you're new to Argo, we recommend checking out the examples in pure YAML. The language is descriptive and the Argo examples provide an exhaustive explanation.

For a more experienced audience, this SDK grants you the ability to programmatically define Argo Workflows in Python, which are then translated to the Argo YAML specification.

The SDK makes use of the Argo models defined in the Argo Python client repository. Combining the two approaches gives us full low-level control over Argo Workflows.

Getting started

Hello World

This example demonstrates the simplest functionality: a Workflow is defined by subclassing the Workflow class, and a single template is defined with the @template decorator.

The entrypoint of the workflow is defined via the entrypoint class property.

Argo YAML:

# @file: hello-world.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: hello-world
  generateName: hello-world-
spec:
  entrypoint: whalesay
  templates:
  - name: whalesay
    container:
      name: whalesay
      image: docker/whalesay:latest
      command: [cowsay]
      args: ["hello world"]

Argo Python:

from argo.workflows.sdk import Workflow
from argo.workflows.sdk import template

from argo.workflows.sdk.templates import V1Container


class HelloWorld(Workflow):

    entrypoint = "whalesay"

    @template
    def whalesay(self) -> V1Container:
        container = V1Container(
            image="docker/whalesay:latest",
            name="whalesay",
            command=["cowsay"],
            args=["hello world"]
        )

        return container
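
To get from the Python class to the YAML above, instantiate the workflow and serialize it. The snippet below is a minimal sketch: the to_yaml() method name is an assumption here, so check the SDK for the exact serialization API.

wf = HelloWorld()

# Compile the workflow to the Argo YAML specification.
# to_yaml() is assumed; consult the SDK for the exact method name.
print(wf.to_yaml())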

DAG: Tasks

This example demonstrates tasks defined via dependencies, forming a diamond structure. Tasks are defined using the @task decorator and must return a valid template.

The entrypoint is automatically created as main for the top-level tasks of the Workflow.

Argo YAML:

# @file: dag-diamond.yaml
# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]

Argo Python:

from argo.workflows.sdk import Workflow

from argo.workflows.sdk.tasks import *
from argo.workflows.sdk.templates import *


class DagDiamond(Workflow):

    @task
    @parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @task
    @parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        container = V1Container(
            image="alpine:3.7",
            name="echo",
            command=["echo", "{{inputs.parameters.message}}"],
        )

        return container

Artifacts

Artifacts can be passed similarly to parameters, in three forms: arguments, inputs and outputs, with arguments being the default (simply @artifact or @parameter). For the other two forms, use the explicit prefix, e.g. @inputs.artifact(...).

Both artifacts and parameters are passed one at a time, which means that for multiple artifacts (or parameters), one should call:

@inputs.artifact(name="artifact", ...)
@inputs.parameter(name="parameter_a", ...)
@inputs.parameter(...)
def foo(self, artifact: V1alpha1Artifact, parameter_b: V1alpha1Parameter, ...): pass

A complete example:

Argo YAML:

# @file: artifacts.yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: artifact-passing
  generateName: artifact-passing-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: generate-artifact
        template: whalesay
      - name: consume-artifact
        template: print-message
        arguments:
          artifacts:
          # bind message to the hello-art artifact
          # generated by the generate-artifact step
          - name: message
            from: "{{tasks.generate-artifact.outputs.artifacts.hello-art}}"

  - name: whalesay
    container:
      name: "whalesay"
      image: docker/whalesay:latest
      command: [sh, -c]
      args: ["cowsay hello world | tee /tmp/hello_world.txt"]
    outputs:
      artifacts:
      # generate hello-art artifact from /tmp/hello_world.txt
      # artifacts can be directories as well as files
      - name: hello-art
        path: /tmp/hello_world.txt

  - name: print-message
    inputs:
      artifacts:
      # unpack the message input artifact
      # and put it at /tmp/message
      - name: message
        path: /tmp/message
    container:
      name: "print-message"
      image: alpine:latest
      command: [sh, -c]
      args: ["cat", "/tmp/message"]

Argo Python:

from argo.workflows.sdk import Workflow

from argo.workflows.sdk.tasks import *
from argo.workflows.sdk.templates import *

class ArtifactPassing(Workflow):

    @task
    def generate_artifact(self) -> V1alpha1Template:
        return self.whalesay()

    @task
    @artifact(
        name="message",
        _from="{{tasks.generate-artifact.outputs.artifacts.hello-art}}"
    )
    def consume_artifact(self, message: V1alpha1Artifact) -> V1alpha1Template:
        return self.print_message(message=message)

    @template
    @outputs.artifact(name="hello-art", path="/tmp/hello_world.txt")
    def whalesay(self) -> V1Container:
        container = V1Container(
            name="whalesay",
            image="docker/whalesay:latest",
            command=["sh", "-c"],
            args=["cowsay hello world | tee /tmp/hello_world.txt"]
        )

        return container

    @template
    @inputs.artifact(name="message", path="/tmp/message")
    def print_message(self, message: V1alpha1Artifact) -> V1Container:
        container = V1Container(
            name="print-message",
            image="alpine:latest",
            command=["sh", "-c"],
            args=["cat", "/tmp/message"],
        )

        return container


Going further: closure and scope

This is where it gets quite interesting. So far, we've only scratched the surface of the benefits that the Python implementation provides.

What if we want to write native Python code and execute it as a step in the Workflow? What are our options?

Option A is to reuse the existing mindset: dump the code in a string, pass it as the source to the V1alpha1ScriptTemplate model and wrap it with the @template decorator. This is illustrated in the following code block:

import textwrap

from argo.workflows.sdk import Workflow, template
from argo.workflows.sdk.templates import V1alpha1ScriptTemplate


class ScriptsPython(Workflow):

    ...

    @template
    def gen_random_int(self) -> V1alpha1ScriptTemplate:
        source = textwrap.dedent("""\
          import random
          i = random.randint(1, 100)
          print(i)
        """)

        template = V1alpha1ScriptTemplate(
            image="python:alpine3.6",
            name="gen-random-int",
            command=["python"],
            source=source
        )

        return template

Which results in:

api_version: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generate_name: scripts-python-
  name: scripts-python
spec:
  entrypoint: main

  ...

  templates:
  - name: gen-random-int
    script:
      command:
      - python
      image: python:alpine3.6
      name: gen-random-int
      source: 'import random\ni = random.randint(1, 100)\nprint(i)\n'

Not bad, but also not living up to the full potential. Since we're already writing Python, why would we wrap the code in a string? This is where we introduce closures.

closures

The logic of closures is quite simple: just wrap the function you want to execute in a container in the @closure decorator. The closure takes care of the rest and returns a template (just as the @template decorator does).

The only thing we need to take care of is providing an image which has the necessary Python dependencies installed and is available in the cluster.

There is a plan to eliminate even this step in the future, but currently it is unavoidable.

Following the previous example:

class ScriptsPython(Workflow):

    ...

    @closure(
      image="python:alpine3.6"
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import random

        i = random.randint(1, 100)
        print(i)

The closure implements V1alpha1ScriptTemplate, which means that you can pass in things like resources, env, etc.
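
For example, a closure that injects an environment variable into the script might look like the sketch below. Whether @closure forwards extra keyword arguments such as env to the underlying V1alpha1ScriptTemplate is an assumption here, so check the SDK for the supported arguments:

from kubernetes.client.models import V1EnvVar

class ScriptsPython(Workflow):

    ...

    @closure(
      image="python:alpine3.6",
      # assumed: extra keyword arguments are passed through to V1alpha1ScriptTemplate
      env=[V1EnvVar(name="SEED", value="42")]
    )
    def gen_random_int() -> V1alpha1ScriptTemplate:
        import os
        import random

        # Seed the generator from the injected environment variable.
        random.seed(int(os.environ["SEED"]))
        print(random.randint(1, 100))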

Also, make sure that you import whatever libraries you use inside the function body; the surrounding context is not preserved, since a closure behaves as a staticmethod and is sandboxed from the module scope.

scopes

Now, what if we have a function (or a whole script) which is quite big? Wrapping it in a single Python function is not very Pythonic and gets tedious. This is where we can make use of scopes.

Say, for example, that we wanted to initialize logging before running our gen_random_int function.

    ...

    @closure(
      scope="main",
      image="python:alpine3.6"
    )
    def gen_random_int(main) -> V1alpha1ScriptTemplate:
        import random

        main.init_logging()

        i = random.randint(1, 100)
        print(i)

    @scope(name="main")
    def init_logging(level="DEBUG"):
        import logging

        logging_level = getattr(logging, level, "INFO")
        logging.getLogger("__main__").setLevel(logging_level)

Notice the three changes that we've made:

    @closure(
      scope="main",  # <--- provide the closure a scope
      image="python:alpine3.6"
    )
    def gen_random_int(main):  # <--- use the scope name
    @scope(name="main")  # <--- add function to a scope
    def init_logging(level="DEBUG"):

Each function in the given scope is then namespaced by the scope name and injected into the closure.

That is, the resulting YAML looks like this:

...
spec:
  ...
  templates:
    - name: gen-random-int
      script:
        command:
        - python
        image: python:alpine3.6
        name: gen-random-int
        source: |-
          import logging
          import random

          class main:
            """Scoped objects injected from scope 'main'."""

            @staticmethod
            def init_logging(level="DEBUG"):
              logging_level = getattr(logging, level, "INFO")
              logging.getLogger("__main__").setLevel(logging_level)


          main.init_logging()

          i = random.randint(1, 100)
          print(i)

The compilation also moves all imports to the front and removes duplicates, for convenience and a more natural look, so that you don't feel like poking your eyes out when you look at the resulting YAML.


For more examples see the examples folder.



Authors:

@AICoE, Red Hat
