
Cubed

Codename: Barry

Note: this is a proof-of-concept, and many things are incomplete or don't work.

Fixed-memory serverless distributed N-dimensional array processing

Cubed is a distributed N-dimensional array library implemented in Python using fixed-memory serverless processing and Zarr for storage.

Example

>>> import cubed
>>> import cubed.random
>>> spec = cubed.Spec(work_dir="tmp", max_mem=100_000)
>>> a = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> b = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> c = cubed.matmul(a, b)
>>> c.compute()
array([[1.22171031, 0.93644194, 1.83459119, 1.8087655 ],
       [1.3540541 , 1.13054495, 2.24504742, 2.05022751],
       [0.98211893, 0.62740696, 1.21686602, 1.26402294],
       [1.58566331, 1.33010476, 2.3994953 , 2.29258764]])

See more in the demo notebook.

Motivation

Managing memory is one of the major challenges in designing and running distributed systems. Computation frameworks like Apache Hadoop's MapReduce, Apache Spark, Beam, and Dask all provide a general-purpose processing model, which has led to their widespread adoption. Successful use at scale, however, requires the user to carefully configure memory for worker nodes and to understand how work is allocated to workers, which breaks the high-level programming abstraction. A disproportionate amount of time is often spent tuning the memory configuration of a large computation.

A common theme here is that most interesting computations are not embarrassingly parallel, but involve shuffling data between nodes. A lot of engineering effort has been put into optimizing the shuffle in Hadoop, Spark, Beam (Google Dataflow), and to a lesser extent Dask. This has undoubtedly improved performance, but has not made the memory problems go away.

Another approach has started gaining traction in the last few years. Lithops (formerly Pywren) and Rechunker eschew centralized systems like the shuffle, and instead do everything via serverless cloud services and cloud storage.

Rechunker is interesting since it implements a very targeted use case (rechunking persistent N-dimensional arrays) using only stateless (serverless) operations, with guaranteed memory usage. Even though it can run on systems like Beam and Dask, it deliberately avoids passing array chunks between worker nodes using the shuffle. Instead, all bulk data operations are reads from, or writes to, cloud storage (Zarr in this case). Since chunks are always of a known size, it is possible to tightly control memory usage, thereby avoiding unpredictable memory use at runtime.

This project is an attempt to go further: implement all distributed array operations using a fixed-memory serverless model.

Design

Cubed is composed of five layers, from the storage layer at the bottom to the Array API layer at the top:

Five layer diagram

Blue blocks are implemented in Cubed, green in Rechunker, and red in other projects like Zarr and Beam.

Let's go through the layers from the bottom:

Storage

Every array in Cubed is backed by a Zarr array. This means that the array type inherits Zarr attributes including the underlying store (which may be on local disk, or a cloud store, for example), as well as the shape, dtype, and chunks. Chunks are the unit of storage and computation in this system.
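Because chunk shapes and dtypes are known up front, the memory footprint of each chunk can be computed exactly. Here is a minimal sketch using plain Zarr (not Cubed's own classes) showing the attributes a Cubed array inherits from its backing store:

import zarr

# Create a Zarr array on local disk; the store could equally be a cloud store.
z = zarr.open("tmp/example.zarr", mode="w", shape=(4, 4), chunks=(2, 2), dtype="f8")

print(z.shape)   # (4, 4)
print(z.chunks)  # (2, 2) -- the unit of storage and computation
print(z.dtype)   # float64

# Each chunk has a known size in memory when loaded:
chunk_bytes = z.chunks[0] * z.chunks[1] * z.dtype.itemsize
print(chunk_bytes)  # 32 bytes for a 2x2 float64 chunk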

Runtime

Cubed uses external runtimes for computation. It follows the Rechunker model (and uses its API) to delegate tasks to stateless executors, which include Python (in-process), Beam, Dask, Lithops, and Prefect.
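To illustrate the stateless-executor model, here is a hedged sketch of what a single task might look like: it is fully described by storage paths, a chunk key, and a function, so any serverless worker can run it without holding state between invocations. The task layout is illustrative only, not Rechunker's or Cubed's actual task format.

import numpy as np
import zarr

def run_task(in_path, out_path, key, func):
    # Read one chunk from storage, apply func, and write the result back.
    src = zarr.open(in_path, mode="r")
    dst = zarr.open(out_path, mode="r+")
    i, j = key
    ci, cj = src.chunks
    region = (slice(i * ci, (i + 1) * ci), slice(j * cj, (j + 1) * cj))
    dst[region] = func(src[region])

# Example: negate every chunk of a 4x4 array, one task per chunk.
src = zarr.open("tmp/in.zarr", mode="w", shape=(4, 4), chunks=(2, 2), dtype="f8")
src[:] = np.arange(16).reshape(4, 4)
zarr.open("tmp/out.zarr", mode="w", shape=(4, 4), chunks=(2, 2), dtype="f8")

for key in [(0, 0), (0, 1), (1, 0), (1, 1)]:
    run_task("tmp/in.zarr", "tmp/out.zarr", key, np.negative)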

Primitive operations

There are a small number of primitive operations on arrays:

blockwise
Applies a function to multiple blocks from multiple inputs, expressed using concise indexing rules.
rechunk
Changes the chunking of an array, without changing its shape or dtype (a sketch of this operation follows the list).
broadcast_to
Expands array dimensions to a given shape by repeating values, but without physically copying data.
indexing
Subsets an array, along one or more axes.

(This list may need to be expanded to support structural operations, like concatenation.)
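To make the rechunk primitive concrete, here is a hedged sketch of changing an array's chunking by copying it to a new Zarr array one target chunk at a time. This is not Rechunker's actual algorithm, which uses an intermediate array to bound memory more tightly; it only illustrates the idea of doing the work through storage rather than a shuffle.

import zarr

def naive_rechunk(in_path, out_path, new_chunks):
    # Copy a Zarr array to a new chunking, one target chunk at a time.
    src = zarr.open(in_path, mode="r")
    dst = zarr.open(out_path, mode="w", shape=src.shape,
                    chunks=new_chunks, dtype=src.dtype)
    rows, cols = src.shape
    cr, cc = new_chunks
    for i in range(0, rows, cr):
        for j in range(0, cols, cc):
            region = (slice(i, min(i + cr, rows)), slice(j, min(j + cc, cols)))
            # Peak memory here: one target chunk, plus the source chunks it overlaps.
            dst[region] = src[region]
    return dst

Each iteration reads only the region needed for one target chunk, so peak memory is bounded by chunk sizes rather than by the size of the whole array.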

Core operations

These are built on top of the primitive operations, and provide functions that are needed to implement all array operations.

map_blocks
Applies a function to corresponding blocks from multiple inputs.
reduction
Applies a function to reduce an array along one or more axes (a sketch of a tree-style reduction is shown below).
arg_reduction
A reduction that returns the array indexes, not the values.

Note that these operations are all included in Dask, and Dask uses them as building blocks to implement a large subset of the NumPy API.
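As an illustration of how a reduction can be built with bounded memory, here is a hedged NumPy sketch of a tree-style reduction: each chunk is reduced independently, then partial results are combined in rounds, so no single task ever holds the whole array. This is only a sketch of the idea, not Cubed's implementation.

import numpy as np

def tree_sum(chunks, fan_in=2):
    # Reduce each chunk independently, then combine partial results in rounds.
    partials = [np.sum(c, axis=0, keepdims=True) for c in chunks]
    while len(partials) > 1:
        next_round = []
        for i in range(0, len(partials), fan_in):
            group = np.concatenate(partials[i:i + fan_in], axis=0)
            next_round.append(np.sum(group, axis=0, keepdims=True))
        partials = next_round
    return partials[0].squeeze(axis=0)

# An (8, 4) array of ones, split into four (2, 4) chunks.
chunks = [np.ones((2, 4)) for _ in range(4)]
print(tree_sum(chunks))   # [8. 8. 8. 8.]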

Array API

The new Python Array API was chosen for the public API as it provides a useful, well-defined subset of the NumPy API. There are a few extensions, including Zarr IO, random number generation, and operations like map_blocks, which are heavily used in Dask applications.
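For illustration, here is a hedged example of what this looks like in use. The names ones and add are assumed to follow the array API standard spelling and to accept the same chunks and spec arguments as the example above; the exact surface of this early release may differ.

>>> import cubed
>>> spec = cubed.Spec(work_dir="tmp", max_mem=100_000)
>>> a = cubed.ones((4, 4), chunks=(2, 2), spec=spec)  # assumed signature
>>> b = cubed.add(a, a)                               # element-wise, runs as blockwise
>>> b.compute()
array([[2., 2., 2., 2.],
       [2., 2., 2., 2.],
       [2., 2., 2., 2.],
       [2., 2., 2., 2.]])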

Implementation

Cubed has a lazy computation model. As array functions are invoked, a computation plan is built up, and it is only executed when explicitly triggered with a call to compute, or when implicitly triggered by converting an array to an in-memory (NumPy) or on-disk (Zarr) representation.

A Plan object is a directed acyclic graph (DAG), where the nodes are arrays and the edges express primitive operations. For example, one array may be rechunked to another using a rechunk operation. Or a pair of arrays may be added together using a blockwise operation.

Of the primitive operations, blockwise, rechunk, and indexing operations all have memory requirements that are known ahead of time. Each operation runs a task to compute each chunk of the output. The memory needed for each task is a function of chunk size, dtype, and the precise nature of the operation, but it can be computed before the whole operation is run, while building the plan. The user is required to specify the maximum amount of memory that tasks can use, and if the computation would exceed that amount, an exception is raised during the planning phase. This means that the user can have high confidence that the operation will run reliably.
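For example, the projected memory of a simple element-wise task can be estimated from the chunk shape and dtype alone and checked against max_mem while the plan is being built. This is a hedged sketch of the idea, not Cubed's actual accounting:

import numpy as np

def projected_mem(chunks, dtype, n_inputs=1):
    # Rough per-task memory: one output chunk plus one chunk per input.
    chunk_bytes = int(np.prod(chunks)) * np.dtype(dtype).itemsize
    return chunk_bytes * (n_inputs + 1)

max_mem = 100_000
needed = projected_mem(chunks=(1000, 100), dtype="f8", n_inputs=2)  # 2,400,000 bytes
if needed > max_mem:
    raise ValueError(f"projected memory {needed} exceeds max_mem {max_mem}")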

The broadcast_to operation is different from the others, since it is effectively a view on a Zarr array, and therefore does not require any computation.
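The same zero-copy behaviour can be seen with NumPy's broadcast_to, which returns a strided view rather than new data; this is only an analogy for how Cubed treats broadcast_to as a view on the Zarr array:

import numpy as np

a = np.arange(4, dtype="f8")        # shape (4,)
b = np.broadcast_to(a, (3, 4))      # shape (3, 4), but no data is copied
print(np.shares_memory(a, b))       # True: b is a view onto a's buffer
print(b.strides)                    # (0, 8): the repeated axis has stride 0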

A plan is executed by traversing the DAG and materializing arrays by writing them to Zarr storage. The details of how a plan is executed depend on the runtime. Distributed runtimes, for example, may choose to materialize arrays that don't depend on one another in parallel for efficiency.
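Here is a hedged sketch of this execution step: walk the plan in dependency order and materialize each array by writing it to Zarr. The graph structure is illustrative, not Cubed's Plan class:

# Nodes are array names; each maps to the arrays it depends on.
plan = {
    "a": [], "b": [],
    "c": ["a", "b"],   # e.g. c = blockwise(add, a, b)
    "d": ["c"],        # e.g. d = rechunk(c)
}

def execute(plan):
    # Materialize every array in dependency order by writing it to storage.
    done = set()

    def materialize(name):
        if name in done:
            return
        for dep in plan[name]:      # ensure inputs are materialized first
            materialize(dep)
        print(f"writing array {name!r} to Zarr storage")
        done.add(name)

    for name in plan:
        materialize(name)           # independent arrays could run in parallel

execute(plan)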

This processing model has advantages and disadvantages. One advantage is that since there is no shuffle involved, it is a straightforward model that can scale up to very high levels of parallelism - for example in a serverless environment. This also makes it straightforward to run on multiple execution engines - the implementation here uses an adaptation of Rechunker, which supports local Python, Beam, Dask, and Prefect out of the box.

The main disadvantage of this model is that every intermediate array is written to storage, which can be slow. However, there are opportunities to optimize the DAG before running it (such as map fusion).
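As an example of such an optimization, here is a hedged sketch of map fusion: two consecutive chunk-level map functions are composed into one, so the intermediate array is never written to storage. This is illustrative only, not Cubed's optimizer:

def fuse(f, g):
    # Fuse two chunk-level map functions so the intermediate chunk
    # stays in memory instead of being written to Zarr.
    def fused(chunk):
        return g(f(chunk))
    return fused

# Before: x --f--> tmp --g--> y   (tmp is materialized to storage)
# After:  x --fuse(f, g)--> y     (no intermediate array)
square_then_negate = fuse(lambda x: x * x, lambda x: -x)
print(square_then_negate(3))   # -9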

Comparison with Dask

Dask is a "flexible library for parallel computing in Python". It has several components: Dask Array, Dask DataFrame, Dask Bag, and Dask Delayed. Cubed supports only distributed arrays, corresponding to Dask Array.

Dask converts high-level operations into a task graph, where tasks correspond to chunk-level operations. In Cubed, the computational graph (or plan) is defined at the level of array operations and is decomposed into fine-grained tasks by the primitive operators for the runtime. Higher-level graphs are more useful for users, since they are easier to visualize and reason about. (Dask's newer High Level Graph abstraction is similar.)

Dask only has a single distributed runtime, Dask Distributed, whereas Cubed has the advantage of running on a variety of distributed runtimes, including more mature ones like Google Cloud Dataflow (a Beam runner).

The core operations and array API layers in Cubed are heavily influenced by Dask Array - in both naming, as well as implementation (Cubed has a dependency on Dask Array for some chunking utilities).

Previous work

This project is a continuation of a similar project I worked on, called Zappy. What's changed in the intervening three years? Rechunker was created (I wasn't so concerned with memory when working on Zappy). The Python Array API standard has been created, which makes implementing a new array API less daunting than implementing the NumPy API. And I have a better understanding of Dask, and the fundamental nature of the blockwise operation.

Development

Create an environment with

conda create --name cubed python=3.8
conda activate cubed
pip install -r requirements.txt
