
fix distributed writes #1793

Merged: jhamman merged 40 commits into pydata:master from feature/distributed_writes on Mar 10, 2018

Conversation

@jhamman (Member) commented Dec 19, 2017

Right now, I've just modified the dask distributed integration tests so we can all see the failing tests.

I'm happy to push this further, but I thought I'd first see if either @shoyer or @mrocklin has an idea of where to start.

@shoyer (Member) commented Dec 19, 2017

yes, see #1464 (comment)

@mrocklin (Contributor) commented:

The zarr test seems a bit different. I think your issue here is that you are trying to use synchronous API with the async test harness. I've changed your test and pushed to your branch (hope you don't mind). Relevant docs are here: http://distributed.readthedocs.io/en/latest/develop.html#writing-tests

Async testing is nicer in many ways, but does require you to be a bit familiar with the async/tornado API. I also suspect that operations like to_zarr really aren't yet async friendly.
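For reference, a minimal sketch of the async test style those docs describe, assuming a current version of distributed; the body here is a generic placeholder rather than the actual zarr test:

from distributed.utils_test import gen_cluster

@gen_cluster(client=True)
async def test_submit_roundtrip(c, s, a, b):
    # c is an asynchronous Client, s the scheduler, a and b two workers.
    # Inside this harness, client operations return awaitables instead of
    # blocking, so results are retrieved with `await` rather than `.result()`.
    future = c.submit(lambda x: x + 1, 41)
    result = await future
    assert result == 42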

@jhamman added this to the 0.10.1 milestone on Jan 2, 2018
@jhamman mentioned this pull request on Jan 11, 2018
with self.datastore.ensure_open(autoclose=True):
    data = self.get_array()
    data[key] = value

Member Author:

@shoyer, is this what you were describing in #1464 (comment)?

Member:

Yes, this looks right to me.

@jhamman (Member Author) commented Jan 11, 2018

@mrocklin -

I have a test failing here with a familiar message.

E       TypeError: 'Future' object is not iterable

We saw this last week when debugging some pangeo things. Can you remind me what our solution was?

@mrocklin (Contributor) commented:

I don't know; I would want to look at the failing case locally. I can try to do this in the near term, no promises though :/

@jhamman (Member Author) commented Jan 25, 2018

I've just taken another swing at this and come up empty. I'm open to ideas in the following areas:

  1. scipy backend is failing to roundtrip a length 1 datetime array: https://travis-ci.org/pydata/xarray/jobs/333068098#L4504
  2. scipy, netcdf4, and h5netcdf backends are all failing inside dask-distributed: https://travis-ci.org/pydata/xarray/jobs/333068098#L4919

The good news here is that only 8 tests are failing after applying the array wrapper so I suspect we're quite close. I'm hoping @shoyer may have some ideas on (1) since I think he had implemented some scipy workarounds in the past. @mrocklin, I'm hoping you can point me in the right direction.

All of these tests are reproducible locally.

(BTW, I have a use case that is going to need this functionality so I'm personally motivated to see it across the finish line)

@@ -55,6 +55,18 @@ def __getitem__(self, key):
            copy = self.datastore.ds.use_mmap
            return np.array(data, dtype=self.dtype, copy=copy)

    def __setitem__(self, key, value):
        with self.datastore.ensure_open(autoclose=True):
            data = self.get_array()
Member:

This needs to be self.datastore.ds.variables[self.variable_name] (a netcdf_variable object), not self.get_array(), which returns the .data (a numpy array). You can't enlarge a numpy array, but you can enlarge a scipy netcdf variable.

(This manifests itself when writing a netcdf file with a time dimension of length 0. Xarray then crashes when attempting to decode a length-0 time variable, which is an unrelated bug.)
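A minimal sketch of the change being suggested, based on the snippet above; attribute names such as variable_name are taken from the surrounding code and may not match the final PR exactly:

def __setitem__(self, key, value):
    with self.datastore.ensure_open(autoclose=True):
        # Index the scipy netcdf_variable itself rather than its .data:
        # the variable can grow along an unlimited dimension, while the
        # underlying numpy array cannot be enlarged in place.
        data = self.datastore.ds.variables[self.variable_name]
        data[key] = value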

Member Author:

Thanks @shoyer - this appears to fix the scipy issue.

@rabernat (Contributor) commented:

Kudos for pushing this forward. I don't have much help to offer, but I wanted to recognize your effort... this is hard stuff!

@shoyer (Member) commented Jan 25, 2018

Has anyone successfully used dask.array.store() with the distributed scheduler?

@mrocklin (Contributor) commented:

I can take a look at the future not iterable issue sometime tomorrow.

Has anyone successfully used dask.array.store() with the distributed scheduler?

My guess is that this would be easy with a friendly storage target. I'm not sure though. cc @jakirkham who has been active on this topic recently.

@jakirkham commented:

Yep, I'm using dask.array.store regularly with the distributed scheduler, both on our cluster and in a local Docker image for testing. I'm using Zarr Arrays as the targets for store to write to. Basically, rechunk the data to match the chunking selected for the Zarr Array and then write out in parallel, lock-free.

Our cluster uses NFS for things like home directories, so these are accessible across nodes. There are also other types of storage available that are a bit faster and still remain accessible across nodes, so those work pretty well.
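A hedged sketch of the pattern @jakirkham describes; the store path, shape, and chunk sizes are made up for illustration:

import dask.array as da
import zarr

x = da.random.random((10000, 10000), chunks=(1000, 1000))

# Create a Zarr array whose chunking matches the dask chunks.
z = zarr.open('example.zarr', mode='w', shape=x.shape,
              chunks=(1000, 1000), dtype=x.dtype)

# With matching chunks each task writes its own Zarr chunk, so the write
# can run lock-free on the distributed scheduler.
da.store(x, z, lock=False)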

@jhamman (Member Author) commented Jan 26, 2018

Yes, the zarr backend here in xarray is also using dask.array.store and seems to work with distributed just fine.
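For context, a rough sketch of that code path from the user side, assuming distributed is installed; the dataset contents and store name are invented:

import numpy as np
import xarray as xr
from dask.distributed import Client

client = Client()  # the distributed scheduler becomes the default

ds = xr.Dataset({'t': (('x',), np.arange(10))}).chunk({'x': 5})
ds.to_zarr('example_store.zarr')  # writes chunks via dask.array.store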

@rabernat (Contributor) commented Jan 26, 2018

I have definitely used the distributed scheduler with dask.array.store both via Zarr and via a custom store class I wrote: https://gist.github.com/rabernat/e54755e7de4eb5a93cc4e7f9f903e3cc

But I cannot recall if I ever got it to work with netCDF.

@jhamman (Member Author) commented Jan 28, 2018

xref: #798 and dask/dask#2488, which both seem to be relevant to this discussion.

I also remember that @pwolfram was quite involved with the original distributed integration, so I'm pinging him to see if he is interested in this.

@shoyer (Member) commented Feb 2, 2018

Looking into this a little bit, this looks like a dask-distributed bug to me. Somehow Client.get() is returning a tornado.concurrent.Future object, even though sync=True.
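For reference, the expected behavior being described, as a self-contained snippet; the task graph here is a toy example:

from dask.distributed import Client

client = Client(processes=False)
dsk = {'x': 1, 'y': (lambda v: v + 1, 'x')}

# With sync=True (the default for the dask get interface), Client.get
# should return concrete values, not tornado Future objects.
result = client.get(dsk, 'y')
assert result == 2
client.close()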

if self._autoclose and not self._isopen:

if autoclose is None:
    autoclose = self._autoclose
Member Author:

This could probably use some additional thinking.

@jhamman (Member Author) commented Feb 28, 2018

I've added some additional tests and cleaned up the implementation a bit. I'd like to get reviews from a few folks and hopefully get this merged later this week.


# Question: Should we be dropping one of these two locks when they are
# basically the same? For instance, when using netcdf4 and dask is not
# installed, locks will be [threading.Lock(), threading.Lock()]
Member:

I think this is harmless, as long as these are different lock instances.

On the other hand, something like CombinedLock([lock, lock]) would never be satisfied, because the same non-reentrant lock can't be acquired twice.
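A tiny illustration of why, assuming CombinedLock simply acquires each of its member locks in turn:

import threading

lock = threading.Lock()
lock.acquire()

# A second acquire of the same non-reentrant lock can never succeed while
# the first is held, so CombinedLock([lock, lock]) could never be entered.
assert lock.acquire(timeout=1) is False

lock.release()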

# per file lock
# Dask locks take a name argument (e.g. filename)
locks.append(SchedulerLock(path_or_file))
except TypeError:
Member:

It would be less error prone to pass the name to get_scheduler_lock and have it return a lock instance.
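Something along these lines, as a hypothetical sketch; the helper name get_scheduler_lock comes from the suggestion above, and the actual signature in the PR may differ:

import threading

def get_scheduler_lock(scheduler, path_or_file=None):
    # Return an instantiated lock appropriate for the active scheduler,
    # so callers never need to know whether the lock type accepts a name.
    if scheduler == 'distributed':
        from dask.distributed import Lock as DistributedLock
        return DistributedLock(path_or_file)
    else:
        return threading.Lock()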

    return any(lock.locked for lock in self.locks)

def __repr__(self):
    return "CombinedLock(%s)" % [repr(lock) for lock in self.locks]
Member:

Nit: I think you could equivalently substitute "CombinedLock(%r)" % list(self.locks) here.



# Does this belong elsewhere?
os.environ["HDF5_USE_FILE_LOCKING"] = "FALSE"
Member:

It would be better to use a context manager or decorator on the test, something along the lines of https://stackoverflow.com/questions/2059482/python-temporarily-modify-the-current-processs-environment

Member Author:

Good idea. I think we can actually do this with pytest's monkeypatch.
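A minimal sketch of the pytest/monkeypatch approach; the test name and assertion are placeholders:

import os

def test_hdf5_file_locking_disabled(monkeypatch):
    # monkeypatch restores the environment when the test finishes, so the
    # setting does not leak into other tests or the parent process.
    monkeypatch.setenv('HDF5_USE_FILE_LOCKING', 'FALSE')
    assert os.environ['HDF5_USE_FILE_LOCKING'] == 'FALSE'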

@rabernat (Contributor) left a comment:

This was pretty heavy duty! Nice work Joe!

@@ -38,6 +38,13 @@ Documentation
Enhancements
~~~~~~~~~~~~

- Support for writing netCDF files from xarray datastores (scipy and netcdf4 only)
Contributor:

Should this be "to xarray datastores"?

Contributor:

Nevermind, I think it makes sense as is.

Maybe "Support for writing xarray datasets to netCDF files..."

Member:

It's nice to see you were able to get this to work with SciPy!

@@ -356,6 +356,8 @@ def prepare_variable(self, name, variable, check_encoding=False,

        fill_value = _ensure_valid_fill_value(attrs.pop('_FillValue', None),
                                              dtype)
        if variable.encoding == {'_FillValue': None} and fill_value is None:
            variable.encoding = {}
Contributor:

Does this fix a specific issue?

Member Author:

IIRC, this crept in from #1869.

Contributor:

Could this fix #1955?


import pytest

dask = pytest.importorskip('dask') # isort:skip
distributed = pytest.importorskip('distributed') # isort:skip

from dask import array
from dask.distributed import Client, Lock
Member Author:

@mrocklin - would you mind looking at the test implementation I have here and letting us know if you see anything that would be causing the default (global) dask scheduler to be permanently overridden? In #1971, I pointed to some test failures that appear to be coming from an unexpected scheduler.

Contributor:

Everything seems fine to me. What is dask._config.globals['get']? Is it the get method of a client? To debug, you might consider giving each of your clients a name

with Client(s['address'], name='test-foo') as client:

and then seeing which one isn't getting cleaned up. You can also try distributed.client.default_client(). We clean things up in the __exit__() call though, so as long as you're using context managers or @gen_cluster everything should work fine.
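Expanding that advice into a runnable snippet, with an in-process client used purely for illustration:

import distributed
from dask.distributed import Client

with Client(processes=False, name='test-foo') as client:
    pass  # run the test body here

# After __exit__, no client should remain registered as the default.
try:
    distributed.client.default_client()
except ValueError:
    print('no default client registered, as expected')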

"""

def __init__(self, locks):
self.locks = tuple(set(locks)) # remove duplicates
Member Author:

Previous test failures were having trouble in __enter__ when iterating over a set of locks. Casting to a list/tuple seems to have resolved that.

Member:

Huh. I wonder if non-deterministic ordering of set iteration (e.g., after serialization/deserialization) contributed to that.

@jhamman (Member Author) commented Mar 8, 2018

All the tests are passing here. I would appreciate another round of reviews.

@shoyer - all of your previous comments have been addressed.

@shoyer (Member) left a comment:

This all looks good to me now.

Nice work tracking down everything that could go wrong here!

"""

def __init__(self, locks):
self.locks = tuple(set(locks)) # remove duplicates
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Huh. I would if non-deterministic ordering of set iteration (e.g., after serialization/unserialization) contributed to that.

@jhamman (Member Author) commented Mar 9, 2018

Any final comments on this? If not, I'll probably merge this in the next day or two.

@shoyer mentioned this pull request on Mar 9, 2018
@jhamman mentioned this pull request on Mar 10, 2018
@jhamman merged commit 2f590f7 into pydata:master on Mar 10, 2018
@jhamman deleted the feature/distributed_writes branch on March 10, 2018
return nc4_var, variable.data
target = NetCDF4ArrayWrapper(name, self)

return target, variable.data
Member:

@jhamman
This is too late, but I think nc4_var is never used. Is that correct?

Member Author:

@fujiisoup - it is used in line 405 (nc4_var.setncattr(k, v)).
