Abstraction layer that is used by the DKIST Science Data Processing pipelines to process DKIST data using Apache Airflow.

Overview

The dkist-processing-core package provides an abstraction layer between the dkist data processing code, the workflow engine that supports it (airflow), and the logging infrastructure. Because the layer sits specifically in front of airflow, it is also where a versioning system for workflows is implemented.

Core, Common, and Instrument Brick Diagram

There are 3 main entities which implement the abstraction:

Task : The Task defines the interface of a dkist data processing task. It also implements methods that should be common to all dkist processing tasks and provides an API to the application performance monitoring infrastructure.

Workflow : The Workflow defines an API that is independent of the workflow engine it abstracts. It also implements the translation to engine-specific workflow definitions. In the case of airflow this is a DAG.

Node : The Node is used by the Workflow to translate a Task into the engine-specific implementation of that Task, which runs inside a python virtual environment. The virtual environment allows only that task's dependencies to be loaded.

Additional support functions are provided in build_utils.
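
The relationship between the three entities can be illustrated with a small, engine-free sketch. It does not import dkist-processing-core or airflow: `SketchTask`, `SketchNode`, and `SketchWorkflow` are hypothetical stand-ins, and the "engine-specific" form of a task is assumed to be nothing more than a plain callable run in dependency order.

```python
from graphlib import TopologicalSorter


class SketchTask:
    """Hypothetical stand-in for a processing task: subclasses implement run()."""

    def run(self):
        raise NotImplementedError


class SketchNode:
    """Hypothetical stand-in for a node: wraps a task class for the 'engine'."""

    def __init__(self, task, upstreams):
        self.task = task
        self.upstreams = list(upstreams)

    def to_engine_callable(self):
        # A real engine would receive an operator; here it is a plain callable.
        return lambda: self.task().run()


class SketchWorkflow:
    """Hypothetical stand-in for a workflow: an engine-independent task graph."""

    def __init__(self, name):
        self.name = name
        self.nodes = {}

    def add_node(self, task, upstreams=None):
        # Accept None, a single task class, or a list of task classes.
        if upstreams is None:
            upstreams = []
        elif isinstance(upstreams, type):
            upstreams = [upstreams]
        self.nodes[task] = SketchNode(task, upstreams)

    def execution_order(self):
        # Map each task to its predecessors and sort so upstreams come first.
        graph = {task: node.upstreams for task, node in self.nodes.items()}
        return list(TopologicalSorter(graph).static_order())

    def run(self):
        return [self.nodes[task].to_engine_callable()() for task in self.execution_order()]


class Extract(SketchTask):
    def run(self):
        return "extract"


class Calibrate(SketchTask):
    def run(self):
        return "calibrate"


workflow = SketchWorkflow(name="sketch")
workflow.add_node(Extract, upstreams=None)
workflow.add_node(Calibrate, upstreams=Extract)
results = workflow.run()
```

Keeping the graph definition separate from the node translation is what would let the same workflow description target a different engine; the sketch only swaps in a local callable to show the idea.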

Usage

The Workflow and Task are the primary objects used by client libraries. The Task is used as a base class and the subclass must at a minimum implement run. A Workflow is used to give the tasks an order of execution and a name for the flow.

from dkist_processing_core import TaskBase
from dkist_processing_core import Workflow

# Task definitions
class MyTask1(TaskBase):
    def run(self):
        print("Running MyTask1")


class MyTask2(TaskBase):
    def run(self):
        print("Running MyTask2")

# Workflow definition
# MyTask1 -> MyTask2
w = Workflow(process_category="My", process_name="Workflow", workflow_package=__package__, workflow_version="dev")
w.add_node(MyTask1, upstreams=None)
w.add_node(MyTask2, upstreams=MyTask1)

Using dkist-processing-core in a data processing library or repo with airflow involves a project structure and build process that produce code artifacts deployed to [PyPI](https://pypi-hypernode.com/project/dkist/) and a zip of workflow artifacts deployed to artifactory.

Build Artifacts Diagram

The client dkist data processing libraries should implement a structure and build pipeline using [dkist-processing-test](https://bitbucket.org/dkistdc/dkist-processing-test/src/main/) as an example. The build pipelines for a client repo can leverage the [build_utils](dkist_processing_core/build_utils.py) for test and export.

Specifically for airflow, the resulting deployment makes all of the versioned workflow artifacts available to the scheduler and the versioned code artifacts available to the workers for task execution.
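
One way to picture why versioned artifacts can coexist on the scheduler is that each version yields a distinct workflow name. The sketch below is an illustration only: `versioned_workflow_name` and its underscore-joined naming scheme are assumptions, not the naming used by dkist-processing-core.

```python
def versioned_workflow_name(process_category, process_name, workflow_version):
    """Hypothetical naming scheme: join category, name, and version."""
    return f"{process_category}_{process_name}_{workflow_version}"


# A plain dict stands in for the set of workflows visible to the scheduler.
scheduler_registry = {}
for version in ("1.0.0", "1.1.0"):
    name = versioned_workflow_name("My", "Workflow", version)
    scheduler_registry[name] = {"version": version}
```

Because the version is part of the key, deploying a new version adds an entry rather than overwriting the previous one.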

Airflow Deployment Diagram

Build

dkist-processing-core is built using [bitbucket-pipelines](bitbucket-pipelines.yml).

Deployment

dkist-processing-core is deployed to [PyPI](https://pypi-hypernode.com/project/dkist-processing-core/).

Environment Variables

| Variable | Description | Type | Default |
|---|---|---|---|
| BUILD_VERSION | Build/Export pipelines only. This is the value that will be appended to all artifacts and represents their unique version. | STR | dev |
| MESH_CONFIG | Provides the dkistdc cloud mesh configuration, specifically the location of the message broker. | JSON | |
| ISB_USERNAME | Message broker user name. | STR | |
| ISB_PASSWORD | Message broker password. | STR | |
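
As a minimal sketch of how a client might read these variables, the function below uses only the standard library. `load_settings` is a hypothetical helper, not part of dkist-processing-core, and the JSON passed in the example is a placeholder: the actual structure of MESH_CONFIG is not described here.

```python
import json
import os


def load_settings(environ=None):
    """Hypothetical helper: read the documented variables from a mapping."""
    environ = os.environ if environ is None else environ
    raw_mesh = environ.get("MESH_CONFIG")
    return {
        # BUILD_VERSION falls back to the documented default of "dev".
        "build_version": environ.get("BUILD_VERSION", "dev"),
        # MESH_CONFIG is documented as JSON; its schema is not assumed here.
        "mesh_config": json.loads(raw_mesh) if raw_mesh else None,
        "isb_username": environ.get("ISB_USERNAME"),
        "isb_password": environ.get("ISB_PASSWORD"),
    }


# Placeholder values for illustration only.
settings = load_settings({"MESH_CONFIG": '{"example-service": {"port": 5672}}'})
```

Passing a mapping explicitly keeps the helper easy to test; calling `load_settings()` with no argument reads from the process environment instead.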

Development

git clone git@bitbucket.org:dkistdc/dkist-processing-core.git
cd dkist-processing-core
pre-commit install
pip install -e .[test]
pytest -v --cov dkist_processing_core


Source Distribution

dkist-processing-core-0.2.7.tar.gz (369.9 kB)

File hashes

Hashes for dkist-processing-core-0.2.7.tar.gz
Algorithm Hash digest
SHA256 0df1ee5a4dab764ac139edf9d3dc3255380c257dc99dbc84ab3deda004b52723
MD5 106d426d16718671ea90e132bb16f152
BLAKE2b-256 a8983c1ab4232c0634a829cd74fe762dbf1b60cdb98eae3d062a71331b1144a4
