
Remove caching logic from xarray.Variable #1128

Merged: 14 commits into pydata:master on Nov 30, 2016

Conversation

@shoyer (Member) commented Nov 16, 2016

This is a follow-up to generalize the changes from #1024:

  • Caching and copy-on-write behavior has been moved to separate array classes
    that are explicitly used in open_dataset to wrap arrays loaded from disk (if
    cache=True).
  • Dask-specific logic has been removed from the caching/loading logic on
    xarray.Variable.
  • Pickle no longer caches automatically under any circumstances.
  • DataStore objects are now pickleable. This makes it feasible to use xarray's
    IO with dask.distributed or multiprocessing.

Still needs tests for the cache argument to open_dataset, but everything
else seems to be working.
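For orientation, the copy-on-write / caching wrapping described above looks roughly like this simplified sketch (made-up class names, not the code added by this PR; the real wrappers referenced in the diff are indexing.CopyOnWriteArray and indexing.MemoryCachedArray):

import numpy as np

class CopyOnWriteArraySketch:
    """Wraps a lazily loaded array and only copies it into memory when written to."""

    def __init__(self, array):
        self.array = array
        self._copied = False

    def __array__(self, dtype=None):
        return np.asarray(self.array, dtype=dtype)

    def __setitem__(self, key, value):
        if not self._copied:
            self.array = np.array(self.array)  # take a private in-memory copy first
            self._copied = True
        self.array[key] = value

class MemoryCachedArraySketch:
    """Caches the wrapped array in memory after the first full load (cache=True)."""

    def __init__(self, array):
        self.array = array

    def __array__(self, dtype=None):
        self.array = np.asarray(self.array)  # replace the lazy array with loaded values
        return np.asarray(self.array, dtype=dtype)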

@crusaderky @kynan please review

@shoyer changed the title from "Disable all caching on xarray.Variable" to "Move caching from xarray.Variable to separate objects" on Nov 16, 2016
@shoyer changed the title from "Move caching from xarray.Variable to separate objects" to "Remove caching logic from xarray.Variable" on Nov 16, 2016
@kynan left a comment:

Other than the case mentioned below, this seems to preserve lazy data in the few cases I tried.

In the long run I think it would be more robust to check for attributes (duck-type style) rather than types in the various places.

One option could be to equip all array wrappers with a method like to_ndarray (a no-op for lazy wrappers) that would be called instead of explicitly casting to np.ndarray. That would also open the door to using lazy arrays other than dask.

Does that sound feasible, or am I missing something here?

Regardless, that shouldn't prevent this PR from going in.
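A rough illustration of that duck-typed accessor (hypothetical names, not part of this PR):

import numpy as np

class LazyWrapperSketch:
    """A lazy wrapper: to_ndarray() is a no-op passthrough, so laziness is preserved."""

    def __init__(self, lazy_array):
        self.array = lazy_array

    def to_ndarray(self):
        return self.array  # nothing is computed here

class EagerWrapperSketch:
    """An eager wrapper: to_ndarray() materializes the values."""

    def __init__(self, array):
        self.array = array

    def to_ndarray(self):
        return np.asarray(self.array)

def as_array(wrapper):
    # call sites would use this instead of casting to np.ndarray directly
    return wrapper.to_ndarray()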

for name, variable in dataset.variables.items():
if name not in variable.dims:
# no need to protect IndexVariable objects
data = indexing.CopyOnWriteArray(variable._data)
Review comment:

I ran a test where I create a Dataset from a custom data store which initializes Variables using dask arrays for data. In this case the dask array is still converted to an ndarray when accessing the Variable's data property: data checks for a dask array type, but here the array is wrapped in a CopyOnWriteArray, so Variable.values is called instead, which loads eagerly.

@shoyer (Member, Author):

Indeed, you would need to use cache=False in such a case.

Xarray's decoding logic in conventions.py uses its own array objects instead of dask arrays (for reasons I could get into), which unfortunately makes using dask.array objects to produce variables in a custom data store non-ideal. The problem is that the graphs from such dask arrays don't get linked up into xarray, which means that even if you rechunk the arrays in the xarray Dataset, they still get executed separately by dask. Duck typing for dask objects would probably help here (dask/dask#1068).
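For example, something like the following keeps already-dask-backed variables lazy ('example.nc' is a placeholder; in the scenario above you would pass your custom data store object to open_dataset instead of a filename):

import xarray as xr

# cache=False tells open_dataset not to cache loaded values in memory on access,
# so variables that are already dask-backed are not pulled into memory eagerly.
ds = xr.open_dataset('example.nc', cache=False)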

if not isinstance(v.data, dask_array_type):
v.load()
"""Get this object's state for pickling"""
# we need a custom method to avoid
Review comment:

Incomplete

@shoyer (Member, Author) commented Nov 18, 2016

> In the long run I think it would be more robust to check for attributes (duck-type style) rather than types in the various places.

Indeed, in particular I'm not very happy with the isinstance check for indexing.MemoryCachedArray in Variable.copy() -- it's rather poor separation of concerns.

It exists so that variable.compute() does not cache data in memory on variable but only on the computed variable. Otherwise, there's basically no point to the separate compute method: if you use cache=True, you are stuck with caching on the original object. Likewise, it ensures that .copy() creates an array with a new cache, which is consistent with the current behavior of .copy().

As for type checking for dask arrays in .data: yes, it would be nice to have a well-defined array interface layer that other array types could plug into. That would entail a significant amount of further work, however.
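In other words, the intended contract looks like this (an illustrative sketch; 'example.nc' is a placeholder):

import xarray as xr

ds = xr.open_dataset('example.nc', cache=True)  # loaded values get cached in memory

computed = ds.compute()  # a new, fully loaded object; ds's own cache stays untouched
copied = ds.copy()       # gets its own cache rather than sharing ds's
ds.load()                # loads and caches in place, on ds itself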

@crusaderky (Contributor) left a comment:

Hi Stephan, I think that NoPickleMixin is conceptually a bad idea -- see my comments on the code.

# self.__dict__ is the default pickle object, we don't need to
# implement our own __setstate__ method to make pickle work
return self.__dict__

Contributor:

I can't understand the purpose of this; the comment where you explain it is truncated.

# throw away any references to datastores in the pickle
state['_file_obj'] = None
return state
return self.__dict__
Contributor:

Why don't you remove this method entirely?

'cannot pickle objects of type %r: call .compute() or .load() '
'to load data into memory first.' % type(self))


Contributor:

I'm really not a fan of this design.
A better approach to pickling file-based objects is to save the file path on getstate and open a new file descriptor on setstate. Relative vs. absolute paths should be preserved.
Of course there's no guarantee that the file will be there when you unpickle, or that it will be identical to the one that you pickled - but this is a caveat that's easily explained in the documentation.
The benefits of this are huge when you work interactively in Jupyter with plugins that pickle all session variables at once.
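A minimal sketch of that suggestion (illustrative only, not what this PR implements; the class name is made up):

import netCDF4

class PathPicklingStoreSketch:
    """Pickles the file path and mode rather than the open file handle."""

    def __init__(self, filename, mode='r'):
        self.filename = filename  # keep relative vs. absolute form as given
        self.mode = mode
        self.ds = netCDF4.Dataset(filename, mode=mode)

    def __getstate__(self):
        state = self.__dict__.copy()
        del state['ds']  # the open file handle cannot be pickled
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)
        # reopen from the stored path; the file must still exist at unpickle time
        self.ds = netCDF4.Dataset(self.filename, mode=self.mode)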

Contributor:

This actually hurts my xarray-based financial simulation project at my company. I start with a DataArray loaded from disk through dask, and then heavily manipulate it. I end up with ~3000 dask-based xarrays, each with up to 80,000 key:value pairs. At that point, since just generating the final objects took 5-10 minutes (no actual calculations yet), I pickle them, save them to disk and send them over the network.

So far I've had to load bespoke data from disk (numpy.fromfile manually wrapped with dask). In the future, however, I'd love to move to NetCDF for my on-disk storage... in other words, I would have a huge dask tree with, at the very bottom of it, a dataset created by open_dataset(). And I would need to pickle the whole thing, with the obvious caveat that all your NetCDF files have to be in the same place, or pickle.load will fail with FileNotFoundError.

@shoyer (Member, Author):

I agree -- adding pickle support to xarray's data stores would be better (it would also enable dask.distributed). I added this only because it's better to give a sensible error message than for it to be silently broken, which is the existing state of affairs. For example, until Unidata/netcdf4-python#604, netCDF4 objects wouldn't even error when pickled, even though they cannot be restored.

I will take a look at enabling pickle, but I think it may be tricky to get right (lots of related discussion in #798).

Can you clarify how this hurts your simulation project compared to the present state of affairs? I thought it would be possible to restore the current behavior by simply calling .compute() or .load() prior to pickle.dump.

Contributor:

Sorry I wasn't too clear - this change isn't a regression as far as my project is concerned; it just keeps the netcdf backend unusable. I agree that proper pickling support for file-based stores can be added later on.

@shoyer (Member, Author) commented Nov 20, 2016

I removed the custom pickle override on Dataset/DataArray -- the issue I was working around was actually an indirect manifestation of a bug in IndexVariable.load() (introduced in this PR).

@shoyer (Member, Author) commented Nov 21, 2016

I added pickle support to DataStores. This should solve the basic serialization issue for dask.distributed (#798), but does not yet resolve the "too many open files" issue.

@mrocklin this could use your review.
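For illustration, the kind of round trip this enables ('example.nc' and the chunk size are placeholders):

import pickle
import xarray as xr

ds = xr.open_dataset('example.nc', chunks={'x': 1000})  # lazy, dask-backed
restored = pickle.loads(pickle.dumps(ds))               # no need to .load() first
result = restored.sum().compute()                       # data is read on demand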

@shoyer (Member, Author) commented Nov 21, 2016

This isn't yet working with dask multiprocessing for reading a netCDF4 file with in-memory compression. I'm not quite sure why:

In [5]: from multiprocessing.pool import Pool

In [7]: ds = xr.open_dataset('big-random.nc', lock=False, chunks={'x': 2500})

In [8]: dask.set_options(pool=Pool(4))
Out[8]: <dask.context.set_options at 0x1087c3898>

In [9]: %time ds.sum().compute()
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-4c43356c48db> in <module>()
----> 1 get_ipython().magic('time ds.sum().compute()')

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/IPython/core/interactiveshell.py in magic(self, arg_s)
   2156         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2157         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2158         return self.run_line_magic(magic_name, magic_arg_s)
   2159
   2160     #-------------------------------------------------------------------------

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/IPython/core/interactiveshell.py in run_line_magic(self, magic_name, line)
   2077                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2078             with self.builtin_trap:
-> 2079                 result = fn(*args,**kwargs)
   2080             return result
   2081

<decorator-gen-59> in time(self, line, cell, local_ns)

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/IPython/core/magic.py in <lambda>(f, *a, **k)
    186     # but it's overkill for just that one bit of state.
    187     def magic_deco(arg):
--> 188         call = lambda f, *a, **k: f(*a, **k)
    189
    190         if callable(arg):

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/IPython/core/magics/execution.py in time(self, line, cell, local_ns)
   1174         if mode=='eval':
   1175             st = clock2()
-> 1176             out = eval(code, glob, local_ns)
   1177             end = clock2()
   1178         else:

<timed eval> in <module>()

/Users/shoyer/dev/xarray/xarray/core/dataset.py in compute(self)
    348         """
    349         new = self.copy(deep=False)
--> 350         return new.load()
    351
    352     @classmethod

/Users/shoyer/dev/xarray/xarray/core/dataset.py in load(self)
    325
    326             # evaluate all the dask arrays simultaneously
--> 327             evaluated_data = da.compute(*lazy_data.values())
    328
    329             for k, data in zip(lazy_data, evaluated_data):

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/base.py in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179
    180     results_iter = iter(results)

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/threaded.py in get(dsk, result, cache, num_workers, **kwargs)
     67     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     68                         cache=cache, get_id=_thread_get_id,
---> 69                         **kwargs)
     70
     71     # Cleanup pools associated to dead threads

/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
    500                     _execute_task(task, data)  # Re-execute locally
    501                 else:
--> 502                     raise(remote_exception(res, tb))
    503             state['cache'][key] = res
    504             finish_task(dsk, key, state, results, keyorder.get)

RuntimeError: NetCDF: HDF error

Traceback
---------
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 268, in execute_task
    result = _execute_task(task, data)
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 248, in _execute_task
    args2 = [_execute_task(a, cache) for a in args]
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 248, in <listcomp>
    args2 = [_execute_task(a, cache) for a in args]
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 245, in _execute_task
    return [_execute_task(a, cache) for a in arg]
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 245, in <listcomp>
    return [_execute_task(a, cache) for a in arg]
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/Users/shoyer/dev/xarray/xarray/core/indexing.py", line 417, in __array__
    return np.asarray(self.array, dtype=dtype)
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/Users/shoyer/dev/xarray/xarray/core/indexing.py", line 392, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/Users/shoyer/conda/envs/xarray-dev/lib/python3.5/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/Users/shoyer/dev/xarray/xarray/core/indexing.py", line 392, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/Users/shoyer/dev/xarray/xarray/backends/netCDF4_.py", line 56, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37914)
  File "netCDF4/_netCDF4.pyx", line 4376, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:47134)

with close_on_error(ds):
self.ds = _nc4_group(ds, group, mode)
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
Contributor:

Might be worth noting that this is only picklable with cloudpickle, not with the standard-library pickle.

@shoyer (Member, Author):

Can you clarify why this won't work with stdlib pickle? Is the issue doing the h5netcdf import inside the function definition? My understanding was that functools.partial is picklable.

Contributor:

Ah, you're right. I'm surprised:

In [1]: from operator import add

In [2]: from functools import partial

In [3]: from pickle import dumps, loads

In [4]: loads(dumps(partial(add, 1)))
Out[4]: functools.partial(<built-in function add>, 1)

In [5]: loads(dumps(partial(add, 1)))(2)
Out[5]: 3

engine=engine)
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_dataset_allclose(original, computed)
Contributor:

This test looks good to me.

@mrocklin (Contributor):

Does your failing case work with the following spawning pool in Python 3?

In [1]: import multiprocessing

In [2]: ctx = multiprocessing.get_context('spawn')

In [3]: ctx.Pool(4)
Out[3]: <multiprocessing.pool.Pool at 0x7fec70afca20>

@shoyer (Member, Author) commented Nov 21, 2016

> Does your failing case work with the following spawning pool in Python 3?

Why, yes it does -- and it shows a nice speedup, as well! What was I missing here?

@mrocklin (Contributor):

> Why, yes it does -- and it shows a nice speedup, as well! What was I missing here?

Spawn is only available in Python 3, so it's not a full solution. Something isn't fork-safe, possibly something within the HDF5 library?

You might also want to try forkserver and look at this semi-related PR dask/distributed#687
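For reference, the forkserver variant of that suggestion would look something like this (sketch):

import multiprocessing
import dask

ctx = multiprocessing.get_context('forkserver')  # or 'spawn'; Python 3 only
dask.set_options(pool=ctx.Pool(4))               # workers no longer inherit forked HDF5 state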

roundtripped.close()
unpickled_ds = pickle.loads(raw_pickle)
self.assertDatasetIdentical(expected, unpickled_ds)
unpickled_ds.close()
@shoyer (Member, Author):

This test is only failing on Windows in NetCDF4ViaDaskDataTest now.

I'm not quite sure how we are missing cleanup there. In that case, the file/DataStore object ends up in the dask graph of the arrays in roundtripped. We do close the file with .close(), but maybe those lingering references to closed files are causing trouble?

@shoyer (Member, Author) commented Nov 28, 2016

OK, I'm ready to give up on the remaining test failures and merge this anyway (marking them as expected failures). They are specific to our test suite and Windows only, due to the inability to delete files that are not closed.

If these manifest as issues for real users, I am happy to revisit, especially if someone who uses Windows can help debug. The 5-minute feedback cycle of pushing a commit and then seeing what AppVeyor says is too painful.

@mrocklin (Contributor):

@shoyer (Member, Author) commented Nov 28, 2016

@mrocklin OK, so one option is to just ignore the permission errors and not remove the files on Windows. But is it really better to make the test suite leak temp files?

@mrocklin (Contributor):

I agree that it's not great. This was more a show of solidarity: we've also run into this same issue and had to resort to similar hacks.

@shoyer (Member, Author) commented Nov 30, 2016

I decided that between the choices of not running these tests on Windows and leaking a few temp files, I would rather leak some temp files. So that's exactly what I've done in the latest commit, for explicitly whitelisted tests.

@shoyer (Member, Author) commented Nov 30, 2016

@kynan @crusaderky Do you have concerns about merging this in the current state?

@crusaderky (Contributor) commented Nov 30, 2016 via email

@kynan commented Nov 30, 2016

No objections, go ahead!

@shoyer merged commit cc10cc5 into pydata:master on Nov 30, 2016
@shoyer (Member, Author) commented Nov 30, 2016

OK, in it goes!

@mangecoeur (Contributor):

I'm trying out the latest code to subset a set of netCDF4 files with dask.multiprocessing, using set_options(get=dask.multiprocessing.get), but I'm still getting TypeError: can't pickle _thread.lock objects. Is this expected, or is there something specific I need to do to make it work?

@shoyer (Member, Author) commented Dec 8, 2016

@mangecoeur You still need to use lock=False (or lock=dask.utils.SerializableLock() with the dev version of dask) and use a spawning process pool (#1128 (comment)).

The former should be updated internally, and the latter should be a documentation note.
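Putting that together, a minimal working pattern looks like this ('example.nc' and the chunk size are placeholders):

import multiprocessing

import dask
import xarray as xr

ctx = multiprocessing.get_context('spawn')  # spawning pool; Python 3 only
dask.set_options(pool=ctx.Pool(4))

ds = xr.open_dataset('example.nc', lock=False, chunks={'x': 2500})
result = ds.sum().compute()  # executed across the process pool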

@mangecoeur (Contributor):

@shoyer thanks, with a little testing it seems lock=False is fine (so you don't necessarily need the dask dev version for lock=dask.utils.SerializableLock()). Using a spawning pool is necessary; it just doesn't work without one. It also looks like the dask.distributed IPython backend works fine (it works similarly to a spawn pool, in that the worker engines aren't forked but live in their own little world). This is really nice, because IPython in turn has good support for HPC systems (SGE batch scheduling + MPI for process handling).
