GroupBy operations for dask.array
flox
This project explores strategies for fast GroupBy reductions with dask.array. It used to be called dask_groupby.
It was motivated by
(See a presentation about this package, from the Pangeo Showcase).
Acknowledgements
This work was funded in part by NASA-ACCESS 80NSSC18M0156 "Community tools for analysis of NASA Earth Observing System Data in the Cloud" (PI J. Hamman), and NCAR's Earth System Data Science Initiative. It was motivated by many discussions in the Pangeo community.
API
There are two main functions:
- flox.groupby_reduce(dask_array, by_dask_array, "mean"): "pure" dask array interface
- flox.xarray.xarray_reduce(xarray_object, by_dataarray, "mean"): "pure" xarray interface; though work is ongoing to integrate this package in xarray.
Xarray's current GroupBy strategy
Xarray's current strategy is to find all unique group labels, index out each group, and then apply the reduction operation. Note that this only works if we know the group labels (i.e. you cannot use this strategy to group by a dask array).
Schematically, this looks like (colors indicate group labels; separated groups of colors indicate different blocks of an array):
The first step is to extract all members of a group, which involves a lot of communication and is quite expensive (in dataframe terminology, this is a "shuffle"). This is fundamentally why many groupby reductions don't work well right now with big datasets.
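The strategy described above can be sketched in plain Python. This is only an illustration of the three steps (find unique labels, index out each group, reduce), not Xarray's actual code; the helper name `groupby_mean_by_indexing` is hypothetical.

```python
def groupby_mean_by_indexing(values, labels):
    """Hypothetical helper: mean of `values` per label, one group at a time."""
    # Step 1: find all unique group labels (they must be known up front).
    unique_labels = sorted(set(labels))
    result = {}
    for label in unique_labels:
        # Step 2: index out every member of this group. With a chunked array
        # this gathers elements from many blocks, which is the expensive part.
        members = [v for v, l in zip(values, labels) if l == label]
        # Step 3: apply the reduction to the extracted group.
        result[label] = sum(members) / len(members)
    return result


values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
labels = ["a", "b", "a", "b", "a", "b"]
means = groupby_mean_by_indexing(values, labels)
```

Each group is gathered in full before anything is reduced, so for a chunked array the cost is dominated by moving data rather than by the reduction itself.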
Implementation
flox outsources the core GroupBy operation to the vectorized implementations in numpy_groupies. Constructing an efficient groupby reduction with dask is hard, and depends on how the groups are distributed amongst the blocks of an array. flox implements 3 strategies for effective grouped reductions, each appropriate for a particular distribution of groups among the blocks of a dask array.
Switch between the various strategies by passing method to either groupby_reduce or xarray_reduce.
method="mapreduce"
The first idea is to use the "map-reduce" strategy (inspired by dask.dataframe). The GroupBy reduction is first applied blockwise. Those intermediate results are combined by concatenating to form a new array which is then reduced again. The combining of intermediate results uses dask's _tree_reduce until all group results are in one block. At that point the result is "finalized" and returned to the user.
Tradeoffs:
- Allows grouping by a dask array so group labels need not be known at graph construction time.
- Works well when either the initial blockwise reduction or the reduction at the first combine step is effective. Here "effective" means we actually reduce values and release some memory.
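As a rough illustration of the map-reduce idea for a grouped mean, here is a pure-Python sketch that uses lists in place of dask blocks. All function names are hypothetical, the pairwise `tree_reduce` only mimics the spirit of dask's tree reduction, and the input is assumed to be non-empty.

```python
from collections import defaultdict


def blockwise_sum_count(values, labels):
    """Map step: per-group [sum, count] for a single block."""
    out = defaultdict(lambda: [0.0, 0])
    for v, l in zip(values, labels):
        out[l][0] += v
        out[l][1] += 1
    return dict(out)


def combine(a, b):
    """Combine step: sum the sums and sum the counts of two intermediates."""
    merged = {k: list(v) for k, v in a.items()}
    for k, (s, c) in b.items():
        if k in merged:
            merged[k][0] += s
            merged[k][1] += c
        else:
            merged[k] = [s, c]
    return merged


def tree_reduce(parts):
    """Combine intermediates pairwise until a single one remains."""
    while len(parts) > 1:
        parts = [
            combine(parts[i], parts[i + 1]) if i + 1 < len(parts) else parts[i]
            for i in range(0, len(parts), 2)
        ]
    return parts[0]


def groupby_mean_mapreduce(values, labels, chunksize):
    # Reduce each block independently; labels are only inspected per block.
    blocks = [
        blockwise_sum_count(values[i:i + chunksize], labels[i:i + chunksize])
        for i in range(0, len(values), chunksize)
    ]
    total = tree_reduce(blocks)
    # Finalize: mean = sum / count.
    return {k: s / c for k, (s, c) in total.items()}


values = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0]
labels = [0, 1, 0, 1, 0, 1, 0]
result = groupby_mean_mapreduce(values, labels, chunksize=3)
```

Each block shrinks to one `[sum, count]` pair per group it contains, which is what makes the blockwise step "effective" when a block holds many members of few groups.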
method="blockwise"
One case where "mapreduce" doesn't work well is the case of "resampling" reductions. An example here is resampling from daily frequency to monthly frequency data: da.resample(time="M").mean()
For resampling type reductions,
- Group members occur sequentially (all days in January 2001 occur one after the other)
- All groups are roughly equal length (31 days in January but 28 in most Februaries)
- All members in a group are next to each other (if the time series is sorted, which it usually is).
In this case, it makes sense to use dask.dataframe's resample strategy, which is to rechunk so that all members of a group are in a single block. Then, the groupby operation can be applied blockwise.
Tradeoffs
- Only works for certain groupings.
- Group labels must be known at graph construction time, so this only works for numpy arrays
- Currently the rechunking is only implemented for 1D arrays (being motivated by time resampling), but a nD generalization seems possible.
- Works better when multiple groups are already in a single block, so that the initial rechunking only involves a small amount of communication.
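The rechunking idea can be sketched in pure Python: pick chunk boundaries that coincide with group boundaries, then reduce each chunk on its own. Both helpers below are hypothetical and are not flox's rechunking code; they assume all members of a group are adjacent in `labels`.

```python
def chunks_aligned_to_groups(labels, target_chunksize):
    """Hypothetical helper: chunk sizes whose boundaries never split a group."""
    # Index just past the end of every group (a group ends where the label changes).
    boundaries = [i for i in range(1, len(labels)) if labels[i] != labels[i - 1]]
    boundaries.append(len(labels))
    chunks = []
    chunk_start = 0
    for end in boundaries:
        # Close the chunk at a group boundary once it is large enough.
        if end - chunk_start >= target_chunksize:
            chunks.append(end - chunk_start)
            chunk_start = end
    if chunk_start < len(labels):
        chunks.append(len(labels) - chunk_start)
    return tuple(chunks)


def groupby_mean_blockwise(values, labels, chunks):
    """Reduce each chunk independently; no combine step is needed."""
    result = {}
    start = 0
    for size in chunks:
        block_values = values[start:start + size]
        block_labels = labels[start:start + size]
        for label in dict.fromkeys(block_labels):  # unique labels, in order
            members = [v for v, l in zip(block_values, block_labels) if l == label]
            result[label] = sum(members) / len(members)
        start += size
    return result


# Daily data for January (31 days), February (28) and March (31).
labels = ["2001-01"] * 31 + ["2001-02"] * 28 + ["2001-03"] * 31
values = [float(i) for i in range(len(labels))]
chunks = chunks_aligned_to_groups(labels, target_chunksize=40)
monthly_means = groupby_mean_blockwise(values, labels, chunks)
```

Because every group lives entirely inside one chunk, the per-chunk results are already final and only need to be concatenated.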
method="cohorts"
We can combine all of the above ideas for cases where members from different groups tend to occur close to each other.
One example is the construction of "climatologies", which is a climate science term for something like groupby("time.month") ("monthly climatology") or groupby("time.dayofyear") ("daily climatology"). In these cases,
- Groups occur sequentially (day 2 is always after day 1; and February is always after January)
- Groups are approximately periodic (some years have 365 days and others have 366)
The idea here is to copy xarray's subsetting strategy but instead index out "cohorts" or group labels that tend to occur next to each other.
Consider this example of monthly average data, where 4 months are present in a single block (i.e. chunksize=4).
Because a chunksize of 4 evenly divides the number of groups (12), all we need to do is index out blocks 0, 3, 6 and then apply the "mapreduce" strategy to form the final result for months Jan-Apr. Repeat for the remaining groups of months (May-Aug; Sep-Dec) and then concatenate.
flox can find these cohorts; below it identifies the cohorts with labels 1,2,3,4; 5,6,7,8; and 9,10,11,12.
>>> flox.core.find_group_cohorts(labels, array.chunks[-1])
[[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]  # 3 cohorts
For each cohort, it counts the number of blocks that need to be reduced. If there is only 1, it applies the reduction blockwise. If there is more than 1, it uses "mapreduce".
One annoyance is that if the chunksize doesn't evenly divide the number of groups, we still end up splitting a number of chunks.
For example, when chunksize=5:
>>> flox.core.find_group_cohorts(labels, array.chunks[-1])
[[1], [2, 3], [4, 5], [6], [7, 8], [9, 10], [11], [12]]  # 8 cohorts
We find 8 cohorts (note the original xarray strategy, which indexes out each of the 12 groups separately, is equivalent to constructing 12 cohorts).
It's possible that some initial rechunking makes the situation better (just rechunk from 5 to 4), but it isn't an obvious improvement. If you have ideas for improving this case, please open an issue.
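One way to picture cohort detection is to record which blocks each label occurs in and then group labels that share exactly the same set of blocks. The sketch below does that in pure Python. It is a hypothetical stand-in, not the implementation of flox.core.find_group_cohorts, and it assumes a single uniform chunksize rather than a tuple of chunk sizes.

```python
from collections import defaultdict


def find_cohorts_sketch(labels, chunksize):
    """Hypothetical stand-in for cohort detection (not flox's algorithm)."""
    # Which blocks does each label occur in?
    blocks_for_label = defaultdict(set)
    for i, label in enumerate(labels):
        blocks_for_label[label].add(i // chunksize)
    # Labels occupying exactly the same set of blocks form one cohort.
    cohorts = defaultdict(list)
    for label in sorted(blocks_for_label):
        cohorts[frozenset(blocks_for_label[label])].append(label)
    return list(cohorts.values())


# Three years of monthly data: labels 1..12 repeated three times.
labels = list(range(1, 13)) * 3
cohorts_4 = find_cohorts_sketch(labels, chunksize=4)
cohorts_5 = find_cohorts_sketch(labels, chunksize=5)
```

For this three-year input, the sketch yields 3 cohorts with chunksize=4 and 8 cohorts with chunksize=5, the same groupings as the two outputs shown above.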
Tradeoffs
- Generalizes well; when there's exactly one group per chunk, this replicates Xarray's strategy which is optimal. For resampling type reductions, as long as the array is chunked appropriately (flox.core.rechunk_for_blockwise, flox.xarray.rechunk_for_blockwise), method="cohorts" is equivalent to method="blockwise"!
- Group labels must be known at graph construction time, so this only works for numpy arrays
- Currently implemented for grouping by 1D arrays. An nD generalization seems possible, but hard?
Custom reductions
flox implements all common reductions provided by numpy_groupies in aggregations.py.
It also allows you to specify a custom Aggregation (again inspired by dask.dataframe), though this might not be fully functional at the moment. See aggregations.py for examples.
import numpy as np
from flox.aggregations import Aggregation

mean = Aggregation(
    # name used for dask tasks
    name="mean",
    # operation to use for pure-numpy inputs
    numpy="mean",
    # blockwise reduction
    chunk=("sum", "count"),
    # combine intermediate results: sum the sums, sum the counts
    combine=("sum", "sum"),
    # generate final result as sum / count
    finalize=lambda sum_, count: sum_ / count,
    # Used when "reindexing" at combine-time
    fill_value=0,
    # Used when any member of `expected_groups` is not found
    final_fill_value=np.nan,
)
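To make the roles of fill_value and final_fill_value more concrete, here is a small pure-Python sketch of what "reindexing" at combine-time could look like for the mean aggregation. The `reindex` helper and the dict-based block intermediates are assumptions made for illustration; they are not flox's internal data structures.

```python
import math


def reindex(block_result, expected_groups, fill_value):
    """Align one block's intermediate to `expected_groups`, filling gaps."""
    return [block_result.get(g, fill_value) for g in expected_groups]


expected_groups = ["a", "b", "c"]

# Intermediates from the chunk step ("sum", "count") for two blocks.
# The second block has no "a", and no block contains "c".
block_sums = [{"a": 3.0, "b": 4.0}, {"b": 6.0}]
block_counts = [{"a": 2, "b": 1}, {"b": 2}]

# fill_value=0 is neutral for the combine step ("sum", "sum").
sums = [sum(col) for col in zip(*(reindex(b, expected_groups, 0) for b in block_sums))]
counts = [sum(col) for col in zip(*(reindex(b, expected_groups, 0) for b in block_counts))]

# Finalize as sum / count; groups that were never found get final_fill_value.
final_fill_value = math.nan
means = [s / c if c > 0 else final_fill_value for s, c in zip(sums, counts)]
```

Here fill_value only pads intermediates so that blocks can be combined elementwise, while final_fill_value is what the user sees for an expected group with no members.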