Bounded-memory serverless distributed N-dimensional array processing

Cubed

Codename: Barry

Note: this is a proof-of-concept, and many things are incomplete or don't work.

Cubed is a distributed N-dimensional array library implemented in Python using bounded-memory serverless processing and Zarr for storage.

Example

>>> import cubed.array_api as xp
>>> import cubed.random
>>> spec = cubed.Spec(work_dir="tmp", max_mem=100_000)
>>> a = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> b = cubed.random.random((4, 4), chunks=(2, 2), spec=spec)
>>> c = xp.matmul(a, b)
>>> c.compute()
array([[1.22171031, 0.93644194, 1.83459119, 1.8087655 ],
       [1.3540541 , 1.13054495, 2.24504742, 2.05022751],
       [0.98211893, 0.62740696, 1.21686602, 1.26402294],
       [1.58566331, 1.33010476, 2.3994953 , 2.29258764]])

See more in the demo notebook.

See the examples README for more about running on cloud services.

Motivation

Managing memory is one of the major challenges in designing and running distributed systems. Computation frameworks like Apache Hadoop's MapReduce, Apache Spark, Beam, and Dask all provide a general-purpose processing model, which has led to their widespread adoption. Successful use at scale, however, requires the user to carefully configure memory for worker nodes, and to understand how work is allocated to workers, which breaks the high-level programming abstraction. A disproportionate amount of time is often spent tuning the memory configuration of a large computation.

A common theme here is that most interesting computations are not embarrassingly parallel, but involve shuffling data between nodes. A lot of engineering effort has been put into optimizing the shuffle in Hadoop, Spark, Beam (Google Dataflow), and to a lesser extent Dask. This has undoubtedly improved performance, but has not made the memory problems go away.

Another approach has started gaining traction in the last few years. Lithops (formerly Pywren) and Rechunker eschew centralized systems like the shuffle, and do everything via serverless cloud services and cloud storage.

Rechunker is interesting, since it implements a very targeted use case (rechunking persistent N-dimensional arrays), using only stateless (serverless) operations, with guaranteed memory usage. Even though it can run on systems like Beam and Dask, it deliberately avoids passing array chunks between worker nodes using the shuffle. Instead, all bulk data operations are reads from, or writes to, cloud storage (Zarr in this case). Since chunks are always of known size it is possible to tightly control memory usage, thereby avoiding unpredictable memory use at runtime.

This project is an attempt to go further: implement all distributed array operations using a bounded-memory serverless model.

Design

Cubed is composed of five layers: from the storage layer at the bottom, to the Array API layer at the top:

[Figure: five-layer diagram]

Blue blocks are implemented in Cubed, green in Rechunker, and red in other projects like Zarr and Beam.

Let's go through the layers from the bottom:

Storage

Every array in Cubed is backed by a Zarr array. This means that the array type inherits Zarr attributes including the underlying store (which may be on local disk, or a cloud store, for example), as well as the shape, dtype, and chunks. Chunks are the unit of storage and computation in this system.
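
To make the role of chunks concrete, here is a minimal pure-Python sketch of how a shape and a chunk shape together determine the grid of chunks. It does not use Cubed or Zarr, and the helper names chunk_grid and chunk_slices are hypothetical, chosen only for illustration.

```python
import itertools
import math


def chunk_grid(shape, chunks):
    """Number of chunks along each axis (the last chunk may be partial)."""
    return tuple(math.ceil(s / c) for s, c in zip(shape, chunks))


def chunk_slices(shape, chunks):
    """Yield (block_index, slices) for every chunk in the array."""
    grid = chunk_grid(shape, chunks)
    for block_index in itertools.product(*(range(n) for n in grid)):
        slices = tuple(
            slice(i * c, min((i + 1) * c, s))
            for i, c, s in zip(block_index, chunks, shape)
        )
        yield block_index, slices


# A 4x4 array with 2x2 chunks forms a 2x2 grid of four chunks.
print(chunk_grid((4, 4), (2, 2)))  # (2, 2)
for block_index, slices in chunk_slices((4, 4), (2, 2)):
    print(block_index, slices)
```

Each (block_index, slices) pair identifies one unit of storage and, in this model, one unit of work.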

Runtime

Cubed uses external runtimes for computation. It follows the Rechunker model (and uses its API) to delegate tasks to stateless executors, which include Python (in-process), Beam, Dask, Lithops, and Prefect.

Primitive operations

There are two primitive operations on arrays:

blockwise
Applies a function to multiple blocks from multiple inputs, expressed using concise indexing rules.
rechunk
Changes the chunking of an array, without changing its shape or dtype.
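
The two primitives can be illustrated with a deliberately simplified sketch on one-dimensional arrays held as plain Python lists of chunks. This is not Cubed's implementation: the blockwise here only handles inputs with aligned blocks (the real operation supports more general indexing rules), and the rechunk flattens everything in memory, which a bounded-memory implementation would avoid. The function bodies are assumptions made for illustration.

```python
def blockwise(func, *chunked_arrays):
    """Apply func to corresponding blocks of identically chunked 1-D inputs."""
    return [func(*blocks) for blocks in zip(*chunked_arrays)]


def rechunk(chunked_array, new_chunk_size):
    """Regroup the elements into chunks of new_chunk_size; values are unchanged."""
    flat = [x for block in chunked_array for x in block]
    return [
        flat[i : i + new_chunk_size] for i in range(0, len(flat), new_chunk_size)
    ]


a = [[1, 2], [3, 4], [5, 6]]  # six elements in chunks of two
b = [[10, 20], [30, 40], [50, 60]]

# Add the arrays block by block.
added = blockwise(lambda x, y: [p + q for p, q in zip(x, y)], a, b)
print(added)  # [[11, 22], [33, 44], [55, 66]]

# Change the chunking from two elements per chunk to three.
print(rechunk(added, 3))  # [[11, 22, 33], [44, 55, 66]]
```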

Core operations

These are built on top of the primitive operations, and provide functions that are needed to implement all array operations.

elemwise
Applies a function elementwise to its arguments, respecting broadcasting.
index
Subsets an array, along one or more axes.
map_blocks
Applies a function to corresponding blocks from multiple inputs.
map_direct
Applies a function across blocks of a new array, reading directly from side inputs (not necessarily in a blockwise fashion).
reduction
Applies a function to reduce an array along one or more axes.
arg_reduction
A reduction that returns the array indexes, not the values.
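
As a rough illustration of how reduction and arg_reduction can work chunk by chunk, the sketch below reduces each block independently and then combines the partial results. It uses plain Python lists of chunks rather than Cubed arrays, and the function signatures are assumptions, not Cubed's API.

```python
def reduction(chunked_array, block_func, combine_func):
    """Reduce each block independently, then combine the per-block results."""
    partials = [block_func(block) for block in chunked_array]
    result = partials[0]
    for partial in partials[1:]:
        result = combine_func(result, partial)
    return result


def argmax(chunked_array):
    """An arg reduction: carry (value, global_index) pairs and return the index."""
    partials = []
    offset = 0
    for block in chunked_array:
        local_index = max(range(len(block)), key=block.__getitem__)
        partials.append((block[local_index], offset + local_index))
        offset += len(block)
    # Highest value wins; on ties prefer the lowest index.
    best = max(partials, key=lambda pair: (pair[0], -pair[1]))
    return best[1]


a = [[3, 1], [4, 1], [5, 9]]
print(reduction(a, sum, lambda x, y: x + y))  # 23
print(reduction(a, max, max))  # 9
print(argmax(a))  # 5
```

The arg reduction has to carry the value alongside the index, because the index alone is not enough to decide which block wins when partial results are combined.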

Array API

The new Python Array API was chosen for the public API as it provides a useful, well-defined subset of the NumPy API. There are a few extensions, including Zarr IO, random number generation, and operations like map_blocks which are heavily used in Dask applications.

Features

Task failure handling
If a task fails - with an IO exception when reading or writing to Zarr, for example - it will be retried (up to a total of three attempts).
Resume a computation from a checkpoint
Since intermediate arrays are persisted to Zarr, it is possible to resume a computation without starting from scratch. To do this, the Cubed Array object should be stored persistently (using pickle), so it can be reloaded in a new process and then compute() called on it to finish the computation.
Straggler mitigation
A few slow-running tasks (called stragglers) can disproportionately slow down the whole computation. To mitigate this, speculative duplicate tasks are launched in certain circumstances, acting as backups that may complete more quickly than the straggler, hence bringing down the overall time taken. [Not yet implemented]
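
The retry behaviour described under task failure handling can be sketched as follows. This is a minimal illustration assuming tasks are idempotent (writing the same chunk twice is harmless); run_with_retries and FlakyWrite are hypothetical names, not part of Cubed.

```python
def run_with_retries(task, max_attempts=3):
    """Run task(), retrying on failure up to max_attempts total attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error


class FlakyWrite:
    """Hypothetical task that fails with an IO error on its first two calls."""

    def __init__(self):
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls < 3:
            raise IOError("simulated storage failure")
        return "chunk written"


task = FlakyWrite()
print(run_with_retries(task))  # chunk written
print(task.calls)  # 3
```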

Implementation

Cubed has a lazy computation model. As array functions are invoked, a computation plan is built up, and it is only executed when explicitly triggered with a call to compute, or when implicitly triggered by converting an array to an in-memory (NumPy) or on-disk (Zarr) representation.

A Plan object is a directed acyclic graph (DAG), where the nodes are arrays and the edges express primitive operations. For example, one array may be rechunked to another using a rechunk operation. Or a pair of arrays may be added together using a blockwise operation.
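
A toy version of such a plan is sketched below, using the rechunk and blockwise examples just mentioned. The dictionary layout and the array names are assumptions for illustration; Cubed's actual Plan class is not shown here.

```python
# Each entry maps a target array to (primitive operation, source arrays).
# Array names here are hypothetical placeholders.
plan = {
    "a_rechunked": ("rechunk", ["a"]),
    "c": ("blockwise", ["a_rechunked", "b"]),
}


def edges(plan):
    """List the DAG edges as (source, target, operation) triples."""
    return [
        (source, target, op)
        for target, (op, sources) in plan.items()
        for source in sources
    ]


def input_arrays(plan):
    """Arrays that no operation produces, i.e. the inputs to the plan."""
    used = {source for _, sources in plan.values() for source in sources}
    return sorted(used - set(plan))


for source, target, op in edges(plan):
    print(f"{source} --{op}--> {target}")
print(input_arrays(plan))  # ['a', 'b']
```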

The blockwise, rechunk, and indexing operations all have memory requirements that are known ahead of time. Each operation runs a task to compute each chunk of the output. The memory needed for each task is a function of chunk size, dtype, and the precise nature of the operation, but it can be computed before the whole operation is run, while building the plan. The user is required to specify the maximum amount of memory that tasks can use, and if the computation would exceed that amount, an exception is raised during the planning phase. This means that the user can have high confidence that the operation will run reliably.
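
The plan-time check can be sketched with simple arithmetic. The model below assumes a task holds every input chunk plus one output chunk in memory at once; that is an illustrative assumption, not Cubed's actual accounting, and check_task_memory is a hypothetical helper.

```python
import math


def chunk_nbytes(chunks, itemsize):
    """Bytes needed to hold one chunk in memory."""
    return math.prod(chunks) * itemsize


def check_task_memory(chunks, itemsize, num_inputs, max_mem):
    """Estimate task memory at plan time and fail early if it exceeds max_mem.

    Assumes a task holds every input chunk plus one output chunk at once.
    """
    required = (num_inputs + 1) * chunk_nbytes(chunks, itemsize)
    if required > max_mem:
        raise ValueError(
            f"task needs {required} bytes, but max_mem is {max_mem} bytes"
        )
    return required


# Two inputs with 2x2 chunks, assuming 8-byte (float64) elements.
print(check_task_memory((2, 2), 8, num_inputs=2, max_mem=100_000))  # 96

# Larger chunks are rejected while planning, before any task runs.
try:
    check_task_memory((1000, 1000), 8, num_inputs=2, max_mem=100_000)
except ValueError as e:
    print(e)
```

Because the estimate depends only on chunk shape, element size, and the operation, it needs no data and can run entirely during planning.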

A plan is executed by traversing the DAG and materializing arrays by writing them to Zarr storage. Details of how a plan is executed depend on the runtime. Distributed runtimes, for example, may choose to materialize arrays that don't depend on one another in parallel for efficiency.
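
The traversal can be sketched as grouping arrays into batches whose inputs are already available; arrays within a batch do not depend on one another, so a distributed runtime could materialize them in parallel. In this sketch a dictionary stands in for Zarr storage, and the array names, the operations table, and execution_batches are all hypothetical.

```python
def execution_batches(dependencies, available):
    """Group arrays into batches that only depend on earlier batches."""
    done = set(available)
    remaining = dict(dependencies)
    batches = []
    while remaining:
        ready = sorted(n for n, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError("plan has a cycle or a missing input")
        batches.append(ready)
        for name in ready:
            del remaining[name]
        done.update(ready)
    return batches


# Map each array to the set of arrays it is computed from.
dependencies = {"a2": {"a"}, "b2": {"b"}, "c": {"a2", "b2"}}

# How to compute each array from arrays already in storage.
operations = {
    "a2": lambda s: [x * 2 for x in s["a"]],
    "b2": lambda s: [x * 2 for x in s["b"]],
    "c": lambda s: [x + y for x, y in zip(s["a2"], s["b2"])],
}

storage = {"a": [1, 2], "b": [10, 20]}  # stand-in for arrays already in Zarr

batches = execution_batches(dependencies, storage)
for batch in batches:
    for name in batch:  # independent arrays; could run in parallel
        storage[name] = operations[name](storage)

print(batches)  # [['a2', 'b2'], ['c']]
print(storage["c"])  # [22, 44]
```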

This processing model has advantages and disadvantages. One advantage is that since there is no shuffle involved, it is a straightforward model that can scale up with very high levels of parallelism - for example in a serverless environment. This also makes it straightforward to run on multiple execution engines - the implementation here uses an adaptation of Rechunker which supports local Python, Beam, Dask, and Prefect out of the box.

The main disadvantage of this model is that every intermediate array is written to storage, which can be slow. However, there are opportunities to optimize the DAG before running it (such as map fusion).
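
Map fusion can be illustrated by composing two per-chunk functions so that each chunk is processed in a single task and the intermediate array is never stored. This is a conceptual sketch on plain Python lists; fuse, negate, and add_one are hypothetical names, and it does not show how Cubed would rewrite its DAG.

```python
def fuse(*funcs):
    """Compose per-chunk functions so they run in a single pass over each chunk."""

    def fused(chunk):
        for func in funcs:
            chunk = func(chunk)
        return chunk

    return fused


def negate(chunk):
    return [-x for x in chunk]


def add_one(chunk):
    return [x + 1 for x in chunk]


chunks = [[1, 2], [3, 4]]

# Unfused: the intermediate array would be written to and read back from storage.
intermediate = [negate(c) for c in chunks]
unfused = [add_one(c) for c in intermediate]

# Fused: one task per chunk, and no intermediate array is stored.
fused = [fuse(negate, add_one)(c) for c in chunks]

print(fused)  # [[0, -1], [-2, -3]]
print(fused == unfused)  # True
```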

Comparison with Dask

Dask is a "flexible library for parallel computing in Python". It has several components: Dask Array, Dask DataFrame, Dask Bag, and Dask Delayed. Cubed supports only distributed arrays, corresponding to Dask Array.

Dask converts high-level operations into a task graph, where tasks correspond to chunk-level operations. In Cubed, the computational graph (or plan) is defined at the level of array operations and is decomposed into fine-grained tasks by the primitive operators for the runtime. Higher-level graphs are more useful for users, since they are easier to visualize and reason about. (Dask's newer High Level Graph abstraction is similar.)

Dask only has a single distributed runtime, Dask Distributed, whereas Cubed has the advantage of running on a variety of distributed runtimes, including more mature ones like Google Cloud Dataflow (a Beam runner).

The core operations and array API layers in Cubed are heavily influenced by Dask Array, in both naming and implementation (Cubed has a dependency on Dask Array for some chunking utilities).

Previous work

This project is a continuation of a similar project I worked on, called Zappy. What's changed in the intervening three years? Rechunker was created (I wasn't so concerned with memory when working on Zappy). The Python Array API standard has been created, which makes implementing a new array API less daunting than implementing the NumPy API. And I have a better understanding of Dask, and the fundamental nature of the blockwise operation.

Development

Create an environment with

conda create --name cubed python=3.8
conda activate cubed
pip install -r requirements.txt
