Provides an embarrassingly parallel tool with an SQL backend.

BluePyParallel: Bluebrain Python Embarrassingly Parallel library

Introduction

Provides an embarrassingly parallel tool with an SQL backend, inspired by BluePyMM.

Installation

This package should be installed using pip:

pip install bluepyparallel

Usage

General computation

from bluepyparallel import init_parallel_factory

factory_name = "multiprocessing"  # Can also be None, dask or ipyparallel
batch_size = 10  # This value is used to split the data into batches before processing them
chunk_size = 1000  # This value is used to gather the elements to process before sending them to the workers

# Setup the parallel factory
parallel_factory = init_parallel_factory(
    factory_name,
    batch_size=batch_size,
    chunk_size=chunk_size,
    processes=4,  # This parameter is specific to the multiprocessing factory
)

# Get the mapper from the factory
mapper = parallel_factory.get_mapper()

# Use the mapper to map the given function to each element of mapped_data and gather the results
result = sorted(mapper(function, mapped_data, *function_args, **function_kwargs))
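
To make the role of batch_size more concrete, here is a minimal standard-library sketch of the general idea: the input is split into batches and the function is applied batch by batch. This is not BluePyParallel's implementation; the names iter_batches and batched_map are hypothetical, and the batches are processed sequentially here instead of being sent to workers.

```python
from itertools import islice


def iter_batches(items, batch_size):
    """Yield successive lists holding at most batch_size items."""
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


def batched_map(func, items, batch_size):
    """Apply func to every item, one batch at a time, and gather the results.

    Hypothetical helper: a real parallel factory would hand each batch
    to its workers instead of looping in the current process.
    """
    results = []
    for batch in iter_batches(items, batch_size):
        results.extend(func(item) for item in batch)
    return results


squares = batched_map(lambda x: x * x, range(5), batch_size=2)
```

Smaller batches keep each unit of work short, at the cost of more scheduling overhead.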

Working with Pandas and SQL backend

This library provides a dedicated function for working with large pandas.DataFrame objects: bluepyparallel.evaluator.evaluate. This function converts the DataFrame into a list of dicts (one per row), maps a given function to each element, and finally gathers the results. Because it is aimed at time-consuming functions, it also provides a checkpoint and resume mechanism using an SQL backend. The SQL backend uses the SQLAlchemy library, so it can work with a large variety of database types (such as SQLite, PostgreSQL or MySQL). To activate this feature, pass a URL that SQLAlchemy can process to the db_url parameter of bluepyparallel.evaluator.evaluate.

Note: A specific driver might have to be installed to access the database (for example, psycopg2 (https://www.psycopg.org/docs/) for PostgreSQL).

Example:

from bluepyparallel import evaluate

# Map the given function to each row of the DataFrame and gather the results
result_df = evaluate(
    input_df,  # This is the DataFrame to process
    evaluation_function,  # This is the function that should be applied to each row of the DataFrame
    parallel_factory="multiprocessing",  # This could also be a Factory previously defined
    db_url="sqlite:///db.sql",  # This could also just be "db.sql", which would automatically be turned into an SQLite URL
)

If the computation crashes for any reason, the partial results remain stored in the db.sql file. If the crash was due to an external cause (so that executing the code again should work), the computation can be resumed from the last computed element. Only the missing elements are then computed, which can save a lot of time.
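
The checkpoint and resume idea can be illustrated with a small standalone sketch. It is not how BluePyParallel implements it: it uses the standard-library sqlite3 module directly instead of SQLAlchemy, and the function name evaluate_with_checkpoint, the results table and its columns are all hypothetical. Each result is committed as soon as it is computed, so a later run skips the rows that are already stored.

```python
import json
import sqlite3


def evaluate_with_checkpoint(rows, func, conn):
    """Apply func to each row dict, skipping rows already stored in conn.

    Hypothetical helper: results are stored as JSON text keyed by the
    row position, and committed one by one so that a crash keeps them.
    """
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results (idx INTEGER PRIMARY KEY, value TEXT)"
    )
    # Positions of the rows computed by a previous (possibly crashed) run
    done = {idx for (idx,) in conn.execute("SELECT idx FROM results")}
    for idx, row in enumerate(rows):
        if idx in done:
            continue
        value = func(row)
        conn.execute(
            "INSERT INTO results (idx, value) VALUES (?, ?)",
            (idx, json.dumps(value)),
        )
        conn.commit()
    stored = conn.execute("SELECT value FROM results ORDER BY idx")
    return [json.loads(value) for (value,) in stored]
```

Calling this function a second time with the same connection (or the same database file) recomputes only the rows that have no stored result.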

Running using Dask

This is an example of an sbatch script that can be adapted to execute the script using multiple nodes and workers. In this example, the code called by <command> should be parallelized using BluePyParallel.

The Dask variables are not strictly required, but they are highly recommended and can be fine-tuned.

#!/bin/bash -l

# Dask configuration
export DASK_DISTRIBUTED__LOGGING__DISTRIBUTED="info"
export DASK_DISTRIBUTED__WORKER__USE_FILE_LOCKING=False
export DASK_DISTRIBUTED__WORKER__MEMORY__TARGET=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__SPILL=False  # don't spill to disk
export DASK_DISTRIBUTED__WORKER__MEMORY__PAUSE=0.80  # pause execution at 80% memory use
export DASK_DISTRIBUTED__WORKER__MEMORY__TERMINATE=0.95  # restart the worker at 95% use
export DASK_DISTRIBUTED__WORKER__MULTIPROCESSING_METHOD=spawn
export DASK_DISTRIBUTED__WORKER__DAEMON=True
# Reduce dask profile memory usage/leak (see https://github.com/dask/distributed/issues/4091)
export DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL=10000ms  # Time between statistical profiling queries
export DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE=1000000ms  # Time between starting new profile

# Split tasks to avoid some dask errors (e.g. Event loop was unresponsive in Worker)
export PARALLEL_BATCH_SIZE=1000

srun -v <command>
