
Alternative parallel execution frameworks in xarray #6807

Closed
TomNicholas opened this issue Jul 18, 2022 · 12 comments · Fixed by #7019

@TomNicholas
Member

Is your feature request related to a problem?

Since early on in the project, xarray has supported wrapping dask.array objects in a first-class manner. However, recent work on flexible array wrapping has made it possible to wrap all sorts of array types (and with #6804 we should support wrapping any array that conforms to the array API standard).

Currently, though, the only way to parallelize array operations with xarray "automatically" is to use dask. (You could use xarray-beam or other options too, but they don't "automatically" generate the computation for you the way dask does.)

When dask was the only parallel framework exposing an array-like API there was no need for flexibility, but now we have nascent projects like cubed (cc @tomwhite) to consider too.

Describe the solution you'd like

Refactor the internals so that dask is one option among many, and so that any newer options can plug in in an extensible way.

In particular cubed deliberately uses the same API as dask.array, exposing:

  1. the methods needed to conform to the array API standard
  2. a .chunk and .compute method, which we could dispatch to
  3. dask-like functions to create computation graphs including blockwise, map_blocks, and rechunk

I would like to see xarray able to wrap any array-like object which offers this set of methods / functions, and call the corresponding version of that method for the correct library (i.e. dask vs cubed) automatically.

That way users could try different parallel execution frameworks simply via a switch like

ds.chunk(**chunk_pattern, manager="dask")

and see which one works best for their particular problem.
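
For illustration only, here is a rough sketch of the kind of dispatch layer such a switch implies, assuming a hypothetical registry of "chunk manager" objects keyed by the name passed to manager=. None of these names (ChunkManager, register_chunkmanager, guess_chunkmanager) exist in xarray today; they only show the shape of the idea:

```python
# Hypothetical registry mapping user-facing names ("dask", "cubed", ...) to
# objects that know how to chunk, compute, and build graphs for one library.
_CHUNK_MANAGERS: dict = {}


def register_chunkmanager(name):
    """Register a ChunkManager implementation under a user-facing name."""

    def decorator(cls):
        _CHUNK_MANAGERS[name] = cls()
        return cls

    return decorator


class ChunkManager:
    """Interface each parallel framework would implement."""

    array_cls: type  # e.g. dask.array.Array or cubed.Array

    def from_array(self, data, chunks):
        raise NotImplementedError

    def compute(self, *arrays, **kwargs):
        raise NotImplementedError


@register_chunkmanager("dask")
class DaskManager(ChunkManager):
    def __init__(self):
        import dask.array as da

        self.array_cls = da.Array

    def from_array(self, data, chunks):
        import dask.array as da

        return da.from_array(data, chunks=chunks)

    def compute(self, *arrays, **kwargs):
        import dask

        return dask.compute(*arrays, **kwargs)


def guess_chunkmanager(name):
    """What ds.chunk(..., manager=name) could look up internally."""
    return _CHUNK_MANAGERS[name]
```

A hypothetical CubedManager registered under "cubed" would then only need to supply the equivalent cubed calls, and .chunk() / .compute() (plus graph-building functions like blockwise, map_blocks, and rechunk) could route through whichever entry the user selected.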

Describe alternatives you've considered

If we leave it the way it is now then xarray will not be truly flexible in this respect.

Any library can wrap (or subclass, if they are really brave) xarray objects to provide parallelism, but that's not the same level of flexibility.

Additional context

cubed repo

PR about making xarray able to wrap objects conforming to the new array API standard

cc @shoyer @rabernat @dcherian @keewis

@dcherian
Contributor

This sounds great! We should finish up #4972 to make it easier to test.

@dcherian
Contributor

Another parallel framework would be Ramba

cc @DrTodd13

@shoyer
Member

shoyer commented Jul 19, 2022

Sounds good to me. The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.

@andersy005
Member

At SciPy I learned of fugue, which tries to provide a unified API for distributed DataFrames on top of Spark and Dask. It could be a great source of inspiration.

@tomwhite
Contributor

Thanks for opening this @TomNicholas

> The challenge will be defining a parallel computing API that works across all these projects, with their slightly different models.

Agreed. I feel like there's already an implicit set of "chunked array" methods that xarray expects from Dask that could be formalised a bit and exposed as an integration point.
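
As a rough illustration (not an existing xarray interface), that implicit contract might be written down as something like a typing.Protocol capturing the attribute and methods xarray currently assumes on a dask-backed variable; the names here are hypothetical:

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class DuckChunkedArray(Protocol):
    """Hypothetical spelling of the duck-typed 'chunked array' contract."""

    @property
    def chunks(self) -> tuple[tuple[int, ...], ...]: ...

    def rechunk(self, chunks: Any, **kwargs: Any) -> "DuckChunkedArray": ...

    def compute(self, **kwargs: Any) -> Any: ...


def is_chunked_array(x: Any) -> bool:
    # What an integration point might check instead of an isinstance test
    # against dask.array.Array specifically.
    return isinstance(x, DuckChunkedArray)
```

Module-level operations such as blockwise, map_blocks, and rechunk would still need a separate dispatch mechanism, since they are functions of the library rather than methods on the array.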

@sdbachman

Might I propose Arkouda?

https://github.com/Bears-R-Us/arkouda
https://chapel-lang.org/presentations/Arkouda_SIAM_PP-22.pdf

@DrTodd13

> Might I propose Arkouda?
>
> https://github.com/Bears-R-Us/arkouda
> https://chapel-lang.org/presentations/Arkouda_SIAM_PP-22.pdf

Have they improved recently to support more than 1D arrays?

@benbovy
Member

benbovy commented Oct 13, 2022

Not really a generic parallel execution back-end, but Open-EO looks like an interesting use case too (it is a framework for managing remote execution of processing tasks on multiple big Earth-observation cloud back-ends via a common API). I've suggested the idea of reusing the Xarray API here: Open-EO/openeo-python-client#334.

@TomNicholas
Member Author

@rabernat just pointed out to me that in order for this to work well we might also need lazy concatenation of arrays.

Xarray currently has its own internal wrappers that allow lazy indexing, but they don't yet allow lazy concatenation. Instead, dask is what does lazy concatenation under the hood right now.

This is a problem: it means that concatenating two cubed-backed DataArrays will trigger loading both into memory, whereas concatenating two dask-backed DataArrays will not. If #4628 were implemented then xarray would never load the underlying array into memory, regardless of the backend.
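
A toy illustration of that gap (hypothetical class, not xarray's actual wrappers): a lazily-loading array can defer reads through indexing, but np.concatenate has no lazy path, so it coerces each operand and forces the load:

```python
import numpy as np


class LazyWrapper:
    """Toy stand-in for a lazy-indexing wrapper around an on-disk array."""

    def __init__(self, load):
        self._load = load  # callable that would read from disk

    def __array__(self, dtype=None, copy=None):
        print("loading!")  # the disk read happens here
        return np.asarray(self._load(), dtype=dtype)


a = LazyWrapper(lambda: np.arange(4))
b = LazyWrapper(lambda: np.arange(4))

# np.concatenate coerces each operand to a numpy array, so both wrappers
# are loaded eagerly -- this prints "loading!" twice.
result = np.concatenate([a, b])
```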

@shoyer
Member

shoyer commented Oct 21, 2022

Cubed should define a concatenate function, so that should be OK

@tomwhite
Contributor

Cubed implements concat, but perhaps xarray needs richer concat functionality than that?

@dcherian
Contributor

dcherian commented Oct 21, 2022

IIUC the issue Ryan & Tom are talking about is tied to reading from files.

For example, we read from a zarr store using zarr, then wrap that zarr.Array (or h5py Dataset) with a large number of ExplicitlyIndexed classes that enable more complicated indexing, lazy decoding, etc.

IIUC #4628 is about concatenating such arrays: neither zarr.Array nor ExplicitlyIndexed supports concatenation, so we end up calling np.array and forcing a disk read.

With dask or cubed we would have dask(ExplicitlyIndexed(zarr)) or cubed(ExplicitlyIndexed(zarr)), so as long as dask and cubed define concat and we dispatch to them, everything is 👍🏾

PS: This is what I was attempting to explain (not very clearly) in the distributed arrays meeting. We don't ever use dask.array.from_zarr (for example). We use zarr to read, then wrap in ExplicitlyIndexed, and then pass to dask.array.from_array.
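
A simplified sketch of that wrapping order (hypothetical names; the real logic lives in xarray's backend and lazy-indexing layers), using an in-memory zarr array as a stand-in for a real store:

```python
import dask.array as da
import zarr


class LazilyIndexedArray:
    """Toy stand-in for xarray's ExplicitlyIndexed wrappers: defers reads
    until indexed, exposing just enough for dask.array.from_array to wrap it."""

    def __init__(self, array):
        self._array = array
        self.shape = array.shape
        self.dtype = array.dtype
        self.ndim = array.ndim

    def __getitem__(self, key):
        # Only here does data actually get read from the store.
        return self._array[key]


# In-memory zarr array standing in for a variable read from a real store.
store = zarr.zeros((6, 4), chunks=(3, 4), dtype="f8")

lazy = LazilyIndexedArray(store)

# Wrap the lazy layer with the chunked layer. Concatenation is now handled
# by dask (or, equivalently, cubed), which builds a graph instead of
# reading from the store.
chunked = da.from_array(lazy, chunks=store.chunks)
combined = da.concatenate([chunked, chunked], axis=0)
```

The key point is that concat happens at the outermost (chunked) layer, so nothing below it is ever materialised.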
