
Cubed

Codename: Barry

Note: this is a proof-of-concept, and many things are incomplete or don't work.

Bounded-memory serverless distributed N-dimensional array processing

Cubed is a distributed N-dimensional array library implemented in Python using bounded-memory serverless processing and Zarr for storage.

Example

>>> import cubed.array_api as xp
>>> import cubed.random
>>> spec = cubed.Spec(work_dir="tmp", max_mem=100_000)
>>> a = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> b = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> c = xp.matmul(a, b)
>>> c.compute()
array([[1.22171031, 0.93644194, 1.83459119, 1.8087655 ],
       [1.3540541 , 1.13054495, 2.24504742, 2.05022751],
       [0.98211893, 0.62740696, 1.21686602, 1.26402294],
       [1.58566331, 1.33010476, 2.3994953 , 2.29258764]])

See more in the demo notebook.

See the examples README for more about running on cloud services.

Motivation

Managing memory is one of the major challenges in designing and running distributed systems. Computation frameworks like Apache Hadoop's MapReduce, Apache Spark, Beam, and Dask all provide a general-purpose processing model, which has led to their widespread adoption. Successful use at scale, however, requires the user to carefully configure memory for worker nodes and to understand how work is allocated to workers, which breaks the high-level programming abstraction. A disproportionate amount of time is often spent tuning the memory configuration of a large computation.

A common theme here is that most interesting computations are not embarrassingly parallel, but involve shuffling data between nodes. A lot of engineering effort has been put into optimizing the shuffle in Hadoop, Spark, Beam (Google Dataflow), and to a lesser extent Dask. This has undoubtedly improved performance, but has not made the memory problems go away.

Another approach has started gaining traction in the last few years. Lithops (formerly Pywren) and Rechunker eschew centralized systems like the shuffle, and instead do everything via serverless cloud services and cloud storage.

Rechunker is interesting, since it implements a very targeted use case (rechunking persistent N-dimensional arrays) using only stateless (serverless) operations, with guaranteed memory usage. Even though it can run on systems like Beam and Dask, it deliberately avoids passing array chunks between worker nodes using the shuffle. Instead, all bulk data operations are reads from, or writes to, cloud storage (Zarr in this case). Since chunks are always of known size, it is possible to tightly control memory usage, thereby avoiding unpredictable memory use at runtime.

This project is an attempt to go further: implement all distributed array operations using a bounded-memory serverless model.

Design

Cubed is composed of five layers, from the storage layer at the bottom to the Array API layer at the top:

[Five layer diagram]

Blue blocks are implemented in Cubed, green in Rechunker, and red in other projects like Zarr and Beam.

Let's go through the layers from the bottom:

Storage

Every array in Cubed is backed by a Zarr array. This means that the array type inherits Zarr attributes including the underlying store (which may be on local disk, or a cloud store, for example), as well as the shape, dtype, and chunks. Chunks are the unit of storage and computation in this system.
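
To make this concrete, here is a minimal sketch using the zarr library directly; it only illustrates the Zarr attributes that Cubed arrays inherit, not how Cubed wraps the array internally.

import zarr

# Create a 4x4 float64 array stored as 2x2 chunks in a local directory store.
z = zarr.open("tmp/example.zarr", mode="w", shape=(4, 4), chunks=(2, 2), dtype="f8")
z[:] = 1.0

# These are the attributes a Cubed array inherits from its backing Zarr array.
print(z.shape)   # (4, 4)
print(z.chunks)  # (2, 2)
print(z.dtype)   # float64

# A chunk is read and written as a unit - the unit of storage and computation.
print(z[0:2, 0:2])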

Runtime

Cubed uses external runtimes for computation. It follows the Rechunker model (and uses its API) to delegate tasks to stateless executors, which include Python (in-process), Beam, Lithops, and other Rechunker executors like Dask and Prefect.
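
As a rough sketch of what this looks like from user code, an executor can be passed when a computation is triggered. The import path and class name below are assumptions for illustration only (they differ between Cubed versions and runtimes); consult the documentation for the runtime you are using.

# Hypothetical import path - the actual location depends on the Cubed version.
from cubed.runtime.executors.python import PythonDagExecutor

# Delegate execution of the plan to a stateless, in-process executor.
c.compute(executor=PythonDagExecutor())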

Primitive operations

There are two primitive operations on arrays:

blockwise
Applies a function to multiple blocks from multiple inputs, expressed using concise indexing rules (see the conceptual sketch after this list).
rechunk
Changes the chunking of an array, without changing its shape or dtype.
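
The following pure-NumPy sketch (not Cubed code) illustrates what the two primitives do at the chunk level: each blockwise task touches only a handful of blocks, and a rechunk reorganizes blocks without changing the array's shape or dtype.

import numpy as np

# A 4x4 array represented as 2x2 blocks, keyed by block index.
blocks = {
    (0, 0): np.ones((2, 2)), (0, 1): np.ones((2, 2)),
    (1, 0): np.ones((2, 2)), (1, 1): np.ones((2, 2)),
}

# blockwise (conceptually): apply a function to corresponding blocks of the
# inputs, producing one output block per task.
def blockwise(func, *inputs):
    return {idx: func(*(inp[idx] for inp in inputs)) for idx in inputs[0]}

doubled = blockwise(lambda x: 2 * x, blocks)

# rechunk (conceptually): reorganize the same data into different chunk sizes.
# Here four 2x2 blocks become one 4x4 block; shape and dtype are unchanged.
def rechunk_to_one_block(blocks, shape, chunk=2):
    out = np.empty(shape)
    for (i, j), block in blocks.items():
        out[i * chunk:(i + 1) * chunk, j * chunk:(j + 1) * chunk] = block
    return {(0, 0): out}

rechunked = rechunk_to_one_block(doubled, (4, 4))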

Core operations

These are built on top of the primitive operations, and provide functions that are needed to implement all array operations.

elemwise
Applies a function elementwise to its arguments, respecting broadcasting.
index
Subsets an array, along one or more axes.
map_blocks
Applies a function to corresponding blocks from multiple inputs.
map_direct
Applies a function across blocks of a new array, reading directly from side inputs (not necessarily in a blockwise fashion).
reduction
Applies a function to reduce an array along one or more axes (see the sketch after this list).
arg_reduction
A reduction that returns the array indexes, not the values.
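
As a pure-NumPy sketch of the reduction pattern (again, not Cubed code): each block is reduced by its own task, and the partial results are then combined in rounds, so no task ever holds the whole array.

import numpy as np

# Four 2x2 blocks of a larger array.
blocks = [np.full((2, 2), 1.0) for _ in range(4)]

# Round 1: reduce each block independently (one task per block).
partials = [b.sum() for b in blocks]

# Later rounds: combine partial results pairwise until one value remains.
while len(partials) > 1:
    partials = [sum(partials[i:i + 2]) for i in range(0, len(partials), 2)]

total = partials[0]  # 16.0 for this example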

Array API

The new Python Array API was chosen for the public API as it provides a useful, well-defined subset of the NumPy API. There are a few extensions, including Zarr IO, random number generation, and operations like map_blocks, which are heavily used in Dask applications.
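
For example (the extension names to_zarr and map_blocks below are assumptions based on the Dask conventions referred to above, and exact signatures may differ between versions):

>>> import cubed
>>> import cubed.array_api as xp
>>> spec = cubed.Spec(work_dir="tmp", max_mem=100_000)
>>> a = xp.asarray([[1, 2], [3, 4]], chunks=(2, 2), spec=spec)  # standard Array API
>>> b = xp.add(a, a)
>>> cubed.to_zarr(b, store="tmp/b.zarr")                        # Zarr IO extension
>>> c = cubed.map_blocks(lambda x: x + 1, b, dtype=b.dtype)     # Dask-style extension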

Features

Task failure handling
If a task fails - with an IO exception when reading or writing to Zarr, for example - it will be retried (up to a total of three attempts).
Resume a computation from a checkpoint
Since intermediate arrays are persisted to Zarr, it is possible to resume a computation without starting from scratch. To do this, the Cubed Array object should be stored persistently (using pickle) so that it can be reloaded in a new process and compute() called on it to finish the computation (see the sketch after this list).
Straggler mitigation
A few slow running tasks (called stragglers) can disproportionately slow down the whole computation. To mitigate this, speculative duplicate tasks are launched in certain circumstances, acting as backups that complete more quickly than the straggler, hence bringing down the overall time taken.
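
A minimal sketch of the resume pattern, assuming c is a Cubed array built as in the example above (the file name is purely illustrative):

import pickle

# In the original process: persist the lazy array before (or while) computing.
with open("checkpoint.pkl", "wb") as f:
    pickle.dump(c, f)

# Later, possibly in a new process: reload it and finish the computation.
# Intermediate arrays already written to Zarr are reused rather than recomputed.
with open("checkpoint.pkl", "rb") as f:
    c = pickle.load(f)
result = c.compute()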

Implementation

Cubed has a lazy computation model. As array functions are invoked, a computation plan is built up, and it is only executed when explicitly triggered with a call to compute, or when implicitly triggered by converting an array to an in-memory (NumPy) or on-disk (Zarr) representation.

A Plan object is a directed acyclic graph (DAG), where the nodes are arrays and the edges express primitive operations. For example, one array may be rechunked to another using a rechunk operation. Or a pair of arrays may be added together using a blockwise operation.

The primitive operations, blockwise and rechunk, both have memory requirements that are known ahead of time. Each operation runs a task to compute each chunk of the output. The memory needed for each task is a function of chunk size, dtype, and the nature of the operation, which can be computed while building the plan.

While it is not possible in general to compute the precise amount of memory that will be used (since it is not known ahead of time how well a Zarr chunk will compress), it is possible to put a conservative upper bound on the memory usage. This upper bound is the required_mem for a task. The user must specify the maximum amount of memory that tasks can use, and if it is less than required_mem, an exception is raised during the planning phase. This means that the user can have high confidence that the operation will run reliably.
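
The bound can be thought of as a simple calculation over chunk shape and dtype, along the lines of the sketch below; the factor of two for input and output buffers is illustrative, not Cubed's exact accounting.

import numpy as np

def chunk_mem(chunks, dtype):
    """Upper bound on the bytes needed to hold one uncompressed chunk."""
    return int(np.prod(chunks)) * np.dtype(dtype).itemsize

# Illustrative planning-time check: assume a task reads one input chunk and
# writes one output chunk, so it needs roughly twice the chunk size.
max_mem = 100_000
required_mem = 2 * chunk_mem((2, 2), "float64")
if required_mem > max_mem:
    raise ValueError(f"Task requires {required_mem} bytes but max_mem is {max_mem}")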

A plan is executed by traversing the DAG and materializing arrays by writing them to Zarr storage. The details of how a plan is executed depend on the runtime. Distributed runtimes, for example, may choose to materialize arrays that don't depend on one another in parallel for efficiency.

This processing model has advantages and disadvantages. One advantage is that since there is no shuffle involved, it is a straightforward model that can scale to very high levels of parallelism - for example, in a serverless environment. This also makes it straightforward to run Cubed on multiple execution engines.

The main disadvantage of this model is that every intermediate array is written to storage, which can be slow. However, there are opportunities to optimize the DAG before running it (such as map fusion).

Comparison with Dask

Dask is a "flexible library for parallel computing in Python". It has several components: Dask Array, Dask DataFrame, Dask Bag, and Dask Delayed. Cubed supports only distributed arrays, corresponding to Dask Array.

Dask converts high-level operations into a task graph, where tasks correspond to chunk-level operations. In Cubed, the computational graph (or plan) is defined at the level of array operations and is decomposed into fine-grained tasks by the primitive operators for the runtime. Higher-level graphs are more useful for users, since they are easier to visualize and reason about. (Dask's newer High Level Graph abstraction is similar.)

Dask only has a single distributed runtime, Dask Distributed, whereas Cubed has the advantage of running on a variety of distributed runtimes, including more mature ones like Google Cloud Dataflow (a Beam runner).

The core operations and array API layers in Cubed are heavily influenced by Dask Array - in both naming, as well as implementation (Cubed has a dependency on Dask Array for some chunking utilities).

Previous work

This project is a continuation of a similar project I worked on, called Zappy. What's changed in the intervening three years? Rechunker was created (I wasn't so concerned with memory when working on Zappy). The Python Array API standard has been created, which makes implementing a new array API less daunting than implementing the NumPy API. And I have a better understanding of Dask, and the fundamental nature of the blockwise operation.

Development

Create an environment with

conda create --name cubed python=3.8
conda activate cubed
pip install -r requirements.txt
pip install -e .
