Challenges running xarray wrapped netcdf files #629

Closed
pwolfram opened this issue Nov 6, 2016 · 48 comments

@pwolfram

pwolfram commented Nov 6, 2016

This is a traceback from calling compute on an xarray computation on dask.distributed.

We're able to use dask.array on a NetCDF4 object without locks if our workers have single threads. However, when computing on the .data attribute backed by a NetCDF object wrapped by a few xarray containers, we run into the following error. It appears to come from computing the shape, which is odd. Traceback below:

cc @mrocklin @shoyer

In [168]: ds = xr.open_mfdataset(fname, lock=False)

In [169]: ds.yParticle.data.sum().compute()
/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/conventions.pyc in shape()
    447     @property
    448     def shape(self):
--> 449         return self.array.shape[:-1]
    450 
    451     def __str__(self):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
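The traceback hints at why an indexing call ends up in `shape`: `LazilyIndexedArray.__getitem__` calls `_updated_key`, which needs `self.ndim`, which computes `self.shape` from each inner wrapper's `shape`, all the way down to the `netCDF4.Variable`. The toy classes below are a hypothetical, simplified stand-in for that chain (`FakeVariable` and `LazyIndexed` are not xarray's real classes): once the innermost object can no longer reach its file, the error surfaces from an indexing call before any data is read.

```python
class FakeVariable:
    """Hypothetical stand-in for netCDF4.Variable: reading .shape needs a valid file ID."""

    def __init__(self, shape):
        self._shape = shape
        self.is_open = True

    @property
    def shape(self):
        if not self.is_open:
            # Mimics the error seen above when the file ID is stale
            raise RuntimeError("NetCDF: Not a valid ID")
        return self._shape


class LazyIndexed:
    """Simplified imitation of xarray's LazilyIndexedArray (not the real class)."""

    def __init__(self, array, key=None):
        self.array = array
        # Default key: a full slice along every dimension of the wrapped array
        if key is None:
            key = tuple(slice(None) for _ in array.shape)
        self.key = key

    @property
    def shape(self):
        # Mirrors the loop at indexing.py lines 386-388 in the traceback;
        # integer keys drop their dimension, so they add nothing here
        shape = []
        for size, k in zip(self.array.shape, self.key):
            if isinstance(k, slice):
                shape.append(len(range(*k.indices(size))))
        return tuple(shape)

    @property
    def ndim(self):
        return len(self.shape)

    def __getitem__(self, key):
        # Like _updated_key: validating the new key needs ndim -> shape -> inner shape
        if len(key) != self.ndim:
            raise IndexError("expected one indexer per dimension")
        # Simplification: replaces rather than composes the existing key
        return LazyIndexed(self.array, key)


var = FakeVariable((31, 1012000))
wrapped = LazyIndexed(LazyIndexed(var))
print(wrapped.shape)  # (31, 1012000)

var.is_open = False  # e.g. the underlying file is not open where this code runs
try:
    wrapped[(slice(0, 31), slice(0, 1012000))]
except RuntimeError as err:
    print(err)  # NetCDF: Not a valid ID
```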
@mrocklin
Member

mrocklin commented Nov 6, 2016

I worked with @pwolfram to produce that traceback. We were able to get things working if we dove into the .dask graph to get the NetCDF file and then wrap it with a dask.array, something like the following:

import dask.array as da

xr_var = ds.yParticle.data.dask['some-key']
var = xr_var.array.array.array  # unpack down to the netCDF4.Variable

x = da.from_array(var, chunks=(...))
x.sum().compute()  # works ok
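The manual `xr_var.array.array.array` unpacking can also be written as a loop that follows `.array` attributes until it reaches an object without one. `unwrap_chain` is a hypothetical helper, not part of xarray or dask; it only assumes, as seen in this thread, that each xarray wrapper stores the object it wraps in `.array`. The toy `Wrapper` and `Leaf` classes stand in for the real wrappers and the `netCDF4.Variable`.

```python
def unwrap_chain(obj):
    """Return [obj, obj.array, obj.array.array, ...], stopping at an object without `.array`."""
    chain = [obj]
    while True:
        # getattr with a default also absorbs an AttributeError raised by __getattr__,
        # such as netCDF4.Variable's "NetCDF: Attribute not found"
        inner = getattr(chain[-1], "array", None)
        if inner is None or any(inner is seen for seen in chain):
            return chain
        chain.append(inner)


class Wrapper:
    """Toy stand-in for LazilyIndexedArray / NetCDF4ArrayWrapper."""

    def __init__(self, array):
        self.array = array


class Leaf:
    """Toy stand-in for netCDF4.Variable, which has no `.array` attribute."""


levels = unwrap_chain(Wrapper(Wrapper(Leaf())))
print([type(level).__name__ for level in levels])  # ['Wrapper', 'Wrapper', 'Leaf']
innermost = levels[-1]  # the object reached by hand with .array.array.array
```

With the real objects, `unwrap_chain(xr_var)` should list the same types that are printed by hand further down in this thread.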

@shoyer
Member

shoyer commented Nov 6, 2016

This makes sense now that I'm looking at the full traceback.

This is making use of conventions.CharToStringArray, another array-like type that xarray uses for decoding string data, for which we are relying on default serialization (pickle). The netCDF4.Variable gets unpickled and no longer points to an open file.

Let's try this again, opening the dataset with decode_cf=False.
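A toy model of the failure described above, under a stated assumption: the netCDF C library identifies each open file by an integer ID that only means something inside the process that opened it. The sketch mimics that with a hypothetical per-process registry (`OPEN_FILES`, `open_file` and `Handle` are illustrative names; nothing here uses netCDF4 or reproduces how `netCDF4.Variable` is actually pickled). A handle that pickles just its ID unpickles without complaint, but the ID is invalid wherever the registry does not contain it.

```python
import pickle

# Hypothetical per-process registry of open files, keyed by integer ID
OPEN_FILES = {}


def open_file(path):
    """Register a 'file' in this process and return its integer ID."""
    file_id = len(OPEN_FILES) + 1
    OPEN_FILES[file_id] = path
    return file_id


class Handle:
    """Default pickling copies only the integer ID, not the open file behind it."""

    def __init__(self, file_id):
        self.file_id = file_id

    @property
    def shape(self):
        if self.file_id not in OPEN_FILES:
            raise RuntimeError("NetCDF: Not a valid ID")
        return (31, 1012000)  # placeholder shape for illustration


handle = Handle(open_file("example.nc"))
payload = pickle.dumps(handle)    # what would be shipped to a worker
OPEN_FILES.clear()                # simulate a worker that never opened the file
restored = pickle.loads(payload)  # unpickling itself succeeds
try:
    restored.shape
except RuntimeError as err:
    print(err)  # NetCDF: Not a valid ID
```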

@pwolfram
Author

pwolfram commented Nov 6, 2016

It looks the same.

@shoyer
Member

shoyer commented Nov 6, 2016

Somehow we seem to be using a CharToStringArray....


@shoyer
Member

shoyer commented Nov 6, 2016

What are the types of each level of xr_var.array.array.array?

Also try concat_characters=False.

@shoyer
Member

shoyer commented Nov 6, 2016

Also: xr.open_dataset(fname, lock=False, chunks={}) should make a dask array without the multi-file logic.

@pwolfram
Author

pwolfram commented Nov 7, 2016

concat_characters=False is the same error.

@pwolfram
Author

pwolfram commented Nov 7, 2016

Levels of xr_var.array.array.array:

In [204]: type(xr_var.array)
Out[204]: xarray.core.indexing.LazilyIndexedArray

In [205]: type(xr_var.array.array)
Out[205]: xarray.backends.netCDF4_.NetCDF4ArrayWrapper

In [206]: type(xr_var.array.array.array)
Out[206]: netCDF4._netCDF4.Variable

@pwolfram
Author

pwolfram commented Nov 7, 2016

xr.open_dataset(fname, lock=False, chunks={}) is the same error.

@shoyer
Member

shoyer commented Nov 7, 2016

Following your stacktrace down, I see five array types:

LazilyIndexedArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
--> 386         for size, k in zip(self.array.shape, self.key):

CharToStringArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/conventions.pyc in shape()
--> 449         return self.array.shape[:-1]

LazilyIndexedArray:

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
--> 386         for size, k in zip(self.array.shape, self.key):

NetCDF4ArrayWrapper (most likely; definitely an NDArrayMixin subclass):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
--> 409         return self.array.shape

netCDF4.Variable:

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

So it really looks like dask-distributed is choking on CharToStringArray.

In particular, it appears that the original error is from a different array than xr_var. Are you sure you're pulling out the top level key from the dask dict?

To be doubly sure, what is the dtype of ds.yParticle? It is also conceivable (though I think unlikely) that some other array with string data has been pulled into its dask graph (ds.yParticle.data.dask). Some use of dask.optimize.cull could test that hypothesis.

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?
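The `dask.optimize.cull` check suggested above keeps only the graph entries that the requested output actually depends on. The sketch below illustrates the idea in plain Python; it is a simplified re-implementation for illustration, not dask's code, and the graph is hand-written with placeholder key names that only mirror the structure of the `ds.yParticle.data.dask` dictionary printed later in this thread. In that structure a task is a tuple whose first element is callable, and any argument equal to another key is a dependency.

```python
def _is_key(item, graph):
    """True if item is hashable and names an entry of the graph."""
    try:
        return item in graph
    except TypeError:  # unhashable values, e.g. lists, or slices before Python 3.12
        return False


def task_dependencies(value, graph):
    """Collect graph keys referenced anywhere inside a task or value (simplified)."""
    deps = set()
    stack = [value]
    while stack:
        item = stack.pop()
        if _is_key(item, graph):  # strings or tuples such as ('x', 0, 0)
            deps.add(item)
        elif isinstance(item, (tuple, list)):
            stack.extend(item)  # look inside tasks and argument lists
    return deps


def cull(graph, keys):
    """Return the sub-graph needed to compute `keys`, dropping everything else."""
    needed = {}
    stack = list(keys)
    while stack:
        key = stack.pop()
        if key not in needed:
            needed[key] = graph[key]
            stack.extend(task_dependencies(graph[key], graph))
    return needed


def getarray(a, b):  # stand-in for dask.array.core.getarray
    return a[b]


# Hand-written graph with placeholder names, mirroring the structure printed below
graph = {
    ("yParticle-abc", 0, 0): (getarray, "yParticle-abc-original",
                              (slice(0, 31), slice(0, 1012000))),
    "yParticle-abc-original": [[0.0]],     # stands in for the wrapped netCDF variable
    "unrelated-string-array": ["a", "b"],  # hypothetical entry that should be dropped
}
culled = cull(graph, [("yParticle-abc", 0, 0)])
print("unrelated-string-array" in culled)  # False
```

If a string-typed array had leaked into the real graph, it would survive culling from the output keys only if the result actually depended on it.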

@pwolfram
Author

pwolfram commented Nov 7, 2016

In particular, it appears that the original error is from a different array than xr_var. Are you sure you're pulling out the top level key from the dask dict?

In [214]: ds.yParticle.data.dask
Out[214]: 
{(u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eb',
  0,
  0): (<function dask.array.core.getarray>,
  u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a',
  (slice(0, 31, None), slice(0, 1012000, None))),
 u'fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a': LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
 float64 yParticle(Time, nParticles)
 unlimited dimensions: Time
 current shape = (31, 1012000)
 filling off
 ), key=(slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None)))}
In [215]: x = ds.yParticle.data.dask['fname:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a']

So I think this is the top-level key from the dask dict, but note that this may not be the case:

In [219]: ds.yParticle.data.dask.keys()
Out[219]: 
[(u'filename:/yParticle-846a0722e86ecac24903e03f48aa35eb',
  0,
  0),
 u'filename:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-f52ad42f43568bf502049f452c4b394a']


In [221]: x = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eb']
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-221-94db2c484015> in <module>()
----> 1 x = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eb']

KeyError: 'filename:/yParticle-846a0722e86ecac24903e03f48aa35eb'
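One interpretation of the `KeyError`, based on the graph printed above rather than anything confirmed in the thread: the bare `'...-846a0722e86ecac24903e03f48aa35eb'` string is never a key by itself. It only appears as the first element of the chunk key `(name, 0, 0)`, while the wrapped array is stored under the separate `...original-...` string key used in `In [215]`. The toy dictionary below uses placeholder names (`<hash>`) and a stand-in `getarray` to show which lookups succeed; `data_entries` is a hypothetical helper that finds the non-task entries without copying the long key by hand.

```python
def getarray(a, b):  # stand-in for dask.array.core.getarray
    return a[b]


def data_entries(graph):
    """Keys whose values are plain data rather than tasks (tuples starting with a callable)."""
    return [k for k, v in graph.items()
            if not (isinstance(v, tuple) and v and callable(v[0]))]


# Placeholder names; the real keys are the long hashed strings printed above
name = "filename:/yParticle-<hash>"
original = name + "original-<hash>"
dsk = {
    (name, 0, 0): (getarray, original, (slice(0, 31), slice(0, 1012000))),
    original: "wrapped netCDF variable",  # stands in for the LazilyIndexedArray
}

print(name in dsk)                      # False: the bare name is only part of a chunk key
print((name, 0, 0) in dsk)              # True: chunk keys are (name, i, j) tuples
print(data_entries(dsk) == [original])  # True: the wrapped array sits under 'original'
```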

@pwolfram
Author

pwolfram commented Nov 7, 2016

In [223]: type(ds.yParticle)
Out[223]: xarray.core.dataarray.DataArray

@pwolfram
Author

pwolfram commented Nov 7, 2016

@shoyer, can you please clarify:

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?

I don't think this is entirely what you mean:

In [244]: x.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-244-f4c69c9c1276> in <module>()
----> 1 x.compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'compute'

In [245]: x.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-245-d906770c195f> in <module>()
----> 1 x.array.compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'compute'

In [246]: x.array.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-246-a1bb7b5fa51a> in <module>()
----> 1 x.array.array.compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'compute'

In [247]: x.array.array.array.compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-247-975c3b146207> in <module>()
----> 1 x.array.array.array.compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

or

In [249]: x.array.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-249-63306814fa5b> in <module>()
----> 1 x.array.array.array.sum().compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

In [250]: x.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-250-77cf68e730e3> in <module>()
----> 1 x.array.array.sum().compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'sum'

In [251]: x.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-251-2714bd5c5439> in <module>()
----> 1 x.array.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [252]: x.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-252-7ce0149d8b09> in <module>()
----> 1 x.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

But hopefully this is helpful.

@shoyer
Member

shoyer commented Nov 7, 2016

One other thing to try: at what level of xr_var[.array[.array[.array]]], if any, does the dask array fail to compute?
I don't think this is entirely what you mean:

I mean, e.g.,

import dask.array as da

chunks = int(1e7)  # use an integer chunk size
# Wrap successively deeper levels of the wrapper stack; from_array reads each level's dtype
da.from_array(xr_var, chunks=chunks).sum().compute()
da.from_array(xr_var.array, chunks=chunks).sum().compute()
da.from_array(xr_var.array.array, chunks=chunks).sum().compute()
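The same probe can be run as a loop that records the outcome at every level. `probe_levels` is a hypothetical helper; `attempt` is whatever computation you want to try at each level, for example `lambda level: da.from_array(level, chunks=chunks).sum().compute()` as in the calls above. Toy classes replace xarray and dask here so the sketch runs on its own.

```python
def probe_levels(obj, attempt, max_depth=10):
    """Run attempt() on obj, obj.array, obj.array.array, ... and record each outcome."""
    results = []
    level = obj
    for depth in range(max_depth):
        try:
            attempt(level)
            outcome = "ok"
        except Exception as exc:  # record whatever the computation raised
            outcome = "{}: {}".format(type(exc).__name__, exc)
        results.append((depth, type(level).__name__, outcome))
        level = getattr(level, "array", None)
        if level is None:
            break
    return results


class Wrapper:
    """Toy stand-in for an xarray wrapper; `broken` marks a level that cannot be read."""

    def __init__(self, array, broken=False):
        self.array = array
        self.broken = broken


def toy_attempt(level):
    # Toy rule: computing a level fails if it, or anything it wraps, is broken
    while level is not None:
        if getattr(level, "broken", False):
            raise RuntimeError("NetCDF: Not a valid ID")
        level = getattr(level, "array", None)


stack = Wrapper(Wrapper(Wrapper([1, 2, 3], broken=True)))
for depth, name, outcome in probe_levels(stack, toy_attempt):
    print(depth, name, outcome)
```

Every level above the broken one fails as well, so the deepest failing level (depth 2 here) is the one that points at the culprit.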

I can also make CharToStringArray serializable with dask-distributed on my xarray branch (though, as I discussed with @mrocklin today, we will want a slightly different solution later).

@shoyer
Member

shoyer commented Nov 7, 2016

Try the latest version of my xarray branch which implements CharToStringArray serialization. If you're still getting the same error, I will be surprised!
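One common way to make a file-backed object safe to send to workers is to pickle what is needed to reopen it (here, its path) instead of the live handle, and to reopen lazily on first use. The sketch below illustrates that pattern with a hypothetical `ReopeningFile` class and an ordinary text file; it is not the code on the branch mentioned above, and a netCDF version would reopen the dataset with the netCDF library rather than `open()`.

```python
import os
import pickle
import tempfile


class ReopeningFile:
    """Hypothetical file-backed wrapper that pickles its path, not its open handle."""

    def __init__(self, path):
        self.path = path
        self._handle = None  # opened lazily on first use

    def _file(self):
        if self._handle is None:
            self._handle = open(self.path)
        return self._handle

    def read_lines(self):
        handle = self._file()
        handle.seek(0)
        return handle.read().splitlines()

    def close(self):
        if self._handle is not None:
            self._handle.close()
            self._handle = None

    def __getstate__(self):
        # The live handle is only meaningful in this process, so leave it out
        return {"path": self.path}

    def __setstate__(self, state):
        self.path = state["path"]
        self._handle = None  # the receiving process reopens on demand


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "example.txt")
    with open(path, "w") as f:
        f.write("31\n1012000\n")

    original = ReopeningFile(path)
    lines_here = original.read_lines()            # opens the file locally
    shipped = pickle.loads(pickle.dumps(original))  # as if sent to a worker
    lines_there = shipped.read_lines()            # reopens from the stored path
    original.close()
    shipped.close()

print(lines_here, lines_there)
```

Without the `__getstate__`/`__setstate__` pair, `pickle.dumps(original)` would raise `TypeError` once the handle is open, because Python file objects cannot be pickled.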

@pwolfram
Author

pwolfram commented Nov 7, 2016

@shoyer, this is what I get now after updating xarray. As you expected, it is a different error message:

ds = xr.open_mfdataset(fname, lock=False)
type(ds.yParticle.data)
x = ds.yParticle.data
x.sum().compute()

with output of

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID

I'm also getting this:

In [31]: xr_var = ds.yParticle.data.dask['filename:/yParticle-846a0722e86ecac24903e03f48aa35eboriginal-02b72739da348136ce68ab1de5142905']

In [32]: xr_var.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-32-96eb284a6133> in <module>()
----> 1 xr_var.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [33]: xr_var.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-33-4dcf0a832ed4> in <module>()
----> 1 xr_var.array.sum().compute()

AttributeError: 'LazilyIndexedArray' object has no attribute 'sum'

In [34]: xr_var.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-34-e041c5367803> in <module>()
----> 1 xr_var.array.array.sum().compute()

AttributeError: 'NetCDF4ArrayWrapper' object has no attribute 'sum'

In [35]: xr_var.array.array.array.sum().compute()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-35-a7bbf15b7a38> in <module>()
----> 1 xr_var.array.array.array.sum().compute()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getattr__ (netCDF4/_netCDF4.c:36798)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.getncattr (netCDF4/_netCDF4.c:34035)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4._get_att (netCDF4/_netCDF4.c:4265)()

AttributeError: NetCDF: Attribute not found

Also,

In [36]: da.from_array(xr_var, chunks=chunks).sum().compute()
/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [42]: da.from_array(xr_var.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 398, in __getitem__
    return type(self)(self.array, self._updated_key(key))
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 374, in _updated_key
    new_key = iter(canonicalize_indexer(new_key, self.ndim))
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.py", line 382, in ndim
    return len(self.shape)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 386, in shape
    for size, k in zip(self.array.shape, self.key):
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.py", line 409, in shape
    return self.array.shape
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-42-2b115a3e59e3> in <module>()
----> 1 da.from_array(xr_var.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in __getitem__()
    396 
    397     def __getitem__(self, key):
--> 398         return type(self)(self.array, self._updated_key(key))
    399 
    400     def __setitem__(self, key, value):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in _updated_key()
    372 
    373     def _updated_key(self, new_key):
--> 374         new_key = iter(canonicalize_indexer(new_key, self.ndim))
    375         key = []
    376         for size, k in zip(self.array.shape, self.key):

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in ndim()
    380     @property
    381     def ndim(self):
--> 382         return len(self.shape)
    383 
    384     @property

/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.pyc in shape()
    384     def shape(self):
    385         shape = []
--> 386         for size, k in zip(self.array.shape, self.key):
    387             if isinstance(k, slice):
    388                 shape.append(len(range(*k.indices(size))))

/users/pwolfram/lib/python2.7/site-packages/xarray/core/utils.pyc in shape()
    407     @property
    408     def shape(self):
--> 409         return self.array.shape
    410 
    411     def __array__(self, dtype=None):

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [43]: da.from_array(xr_var.array.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 59, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3671, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-43-8841ee6791ee> in <module>()
----> 1 da.from_array(xr_var.array.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.pyc in __getitem__()
     57 
     58         try:
---> 59             data = getitem(self.array, key)
     60         except IndexError:
     61             # Catch IndexError in netCDF4 and return a more informative error

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID
In [44]: da.from_array(xr_var.array.array.array, chunks=chunks).sum().compute()
distributed.utils - ERROR - NetCDF: Not a valid ID
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 49, in getarray
    c = a[b]
  File "netCDF4/_netCDF4.pyx", line 3671, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)
  File "netCDF4/_netCDF4.pyx", line 3378, in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)
  File "netCDF4/_netCDF4.pyx", line 3323, in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)
RuntimeError: NetCDF: Not a valid ID
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-44-1cc685a6c727> in <module>()
----> 1 da.from_array(xr_var.array.array.array, chunks=chunks).sum().compute()

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(self, **kwargs)
     76             Extra keywords to forward to the scheduler ``get`` function.
     77         """
---> 78         return compute(self, **kwargs)[0]
     79 
     80     @classmethod

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    176         dsk = merge(var.dask for var in variables)
    177     keys = [var._keys() for var in variables]
--> 178     results = get(dsk, keys, **kwargs)
    179 
    180     results_iter = iter(results)

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in get(self, dsk, keys, restrictions, loose_restrictions, **kwargs)
   1290 
   1291         try:
-> 1292             results = self.gather(futures)
   1293         except (KeyboardInterrupt, Exception) as e:
   1294             for f in futures.values():

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in gather(self, futures, errors, maxsize)
    881             return (self.gather(f, errors=errors) for f in futures)
    882         else:
--> 883             return sync(self.loop, self._gather, futures, errors=errors)
    884 
    885     @gen.coroutine

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in sync(loop, func, *args, **kwargs)
    132         e.wait(1000000)
    133     if error[0]:
--> 134         six.reraise(type(error[0]), error[0], traceback[0])
    135     else:
    136         return result[0]

/users/pwolfram/lib/python2.7/site-packages/distributed/utils.pyc in f()
    118     def f():
    119         try:
--> 120             result[0] = yield gen.maybe_future(func(*args, **kwargs))
    121         except Exception as exc:
    122             logger.exception(exc)

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1013 
   1014                     try:
-> 1015                         value = future.result()
   1016                     except Exception:
   1017                         self.had_exception = True

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.pyc in result(self, timeout)
    235             return self._result
    236         if self._exc_info is not None:
--> 237             raise_exc_info(self._exc_info)
    238         self._check_done()
    239         return self._result

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.pyc in run(self)
   1019 
   1020                     if exc_info is not None:
-> 1021                         yielded = self.gen.throw(*exc_info)
   1022                         exc_info = None
   1023                     else:

/users/pwolfram/lib/python2.7/site-packages/distributed/client.pyc in _gather(self, futures, errors)
    800                             six.reraise(type(d['exception']),
    801                                         d['exception'],
--> 802                                         d['traceback'])
    803                         except KeyError:
    804                             six.reraise(CancelledError,

/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.pyc in getarray()
     47         lock.acquire()
     48     try:
---> 49         c = a[b]
     50         if type(c) != np.ndarray:
     51             c = np.asarray(c)

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37111)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable.shape.__get__ (netCDF4/_netCDF4.c:32778)()

netCDF4/_netCDF4.pyx in netCDF4._netCDF4.Variable._getdims (netCDF4/_netCDF4.c:31870)()

RuntimeError: NetCDF: Not a valid ID

@pwolfram
Author

pwolfram commented Nov 7, 2016

@shoyer, I may be more useful here if you can provide some additional guidance on a debugging strategy, but I am of course happy to keep trying things.

@shoyer
Member

shoyer commented Nov 8, 2016

I don't understand what is going on here. It seems that dask may not be (de)serializing the netCDF4 variables correctly.

I put up an alternative xarray hack in pydata/xarray#1095 that passes a more extensive integration test, so that might be worth a try.

@pwolfram
Author

pwolfram commented Nov 8, 2016

@shoyer and @mrocklin, this looks like it is working now using pydata/xarray#1095:

In [1]: from dask.distributed import Client

In [2]: client = Client('wf609:8786')

In [3]: client
Out[3]: <Client: scheduler="wf609:8786" processes=2 cores=32>

In [5]: import dask.array as da

In [6]: import xarray as xr

In [7]: ds = xr.open_mfdataset('fname', lock=False)

In [8]: x = ds.yParticle.data

In [9]: x.sum().compute()
Out[9]: 31347046718055.527

In [10]: ds = xr.open_mfdataset('./lagrPartTrack.*.nc', lock=False)

In [11]: x = ds.yParticle.data

In [12]: x.sum().compute()
Out[12]: 525875176622133.69

Would this naturally suggest that xarray-distributed is now a reality? If so, I should try something more complex when I get the time tomorrow.

@pwolfram
Author

pwolfram commented Nov 8, 2016

@shoyer, note that it is possible I'm not testing this correctly, so if you want me to run a particular scenario again, please let me know so that we can double-check that it is working.

@mrocklin
Member

mrocklin commented Nov 8, 2016

This looks pretty exciting to me :)

@shoyer
Member

shoyer commented Nov 8, 2016

Glad that worked! I was getting worried there.

Does ds.yParticle.sum() or ds.sum() work? That would be closer to the usual xarray workflow.

@pwolfram
Author

pwolfram commented Nov 9, 2016

HPC allocations are being transitioned; sorry about the delay. Hopefully I'll be able to verify this later today or tomorrow.

@pwolfram
Author

@shoyer, HPC is back up and both appear to work, although we get a memory error for the dataset-based sum.

In [5]: ds = xr.open_mfdataset(filenames, lock=False)

In [6]: client
Out[6]: <Client: scheduler="wf332:8786" processes=2 cores=32>

In [7]: ds.yParticle.sum()
Out[7]: 
<xarray.DataArray 'yParticle' ()>
dask.array<sum-agg..., shape=(), dtype=float64, chunksize=()>

In [8]: ds.sum()
Out[8]: distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError

@mrocklin
Member

It's odd to get a memory error on sum regardless. Any feedback from the diagnostic page? You might also want to check out localhost:8787/workers to get per-worker information.

@pwolfram
Author

Diagnostics page: do you mean http://localhost:8787/status? I didn't see anything, but the internet here is really bad, so it is possible I've missed something.

@mrocklin
Member

Check out http://localhost:8787/workers


@pwolfram
Author

It was failing on uVertexVelocity:

distributed.worker - WARNING -  Compute Failed
Function: execute_task
args:     ((<built-in function apply>, <functools.partial object at 0x2b5975418368>, [(<function getarray at 0x2b59d213ab18>, LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
float64 uVertexVelocity(Time, nVertices, nVertLevels)
unlimited dimensions: Time
current shape = (30, 184400, 100)
filling off
), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(0, 30, None), slice(0, 184400, None), slice(0, 100, None)))], {'keepdims': True, 'axis': (0, 1, 2)}))
kwargs:   {}
None
distributed.worker - WARNING -  Compute Failed
Function: execute_task
args:     ((<built-in function apply>, <functools.partial object at 0x2b59b042c4c8>, [(<function getarray at 0x2b59d213ab18>, LazilyIndexedArray(array=LazilyIndexedArray(array=NetCDF4ArrayWrapper(array=<type 'netCDF4._netCDF4.Variable'>
float64 uVertexVelocity(Time, nVertices, nVertLevels)
unlimited dimensions: Time
current shape = (31, 184400, 100)
filling off
), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), key=(slice(None, None, None), slice(None, None, None), slice(None, None, None))), (slice(0, 31, None), slice(0, 184400, None), slice(0, 100, None)))], {'keepdims': True, 'axis': (0, 1, 2)}))
kwargs:   {}
None
distributed.worker - INFO - Deleted 15 keys

but works for just that field:

In [9]: ds.uVertexVelocity.sum()
Out[9]: 
<xarray.DataArray 'uVertexVelocity' ()>
dask.array<sum-agg..., shape=(), dtype=float64, chunksize=()>

@mrocklin
Member

If you upgrade to distributed master, the workers will be a bit more pragmatic about defending memory by using disk. You probably shouldn't be running into these issues if you're just computing sums, but it's something to try.

pip install git+https://github.com/dask/distributed.git --upgrade
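As a toy model of the "defending memory by using disk" idea only (not distributed's implementation), the sketch below keeps the most recently used values in memory and pickles the least recently used ones to disk once a budget is exceeded. The `SpillDict` class is hypothetical, and its budget counts values rather than bytes, which is a simplification.

```python
import os
import pickle
import tempfile
from collections import OrderedDict


class SpillDict:
    """Toy store keeping at most `max_in_memory` values in RAM (hypothetical).

    When the budget is exceeded, the least recently used value is pickled to
    a file in `directory` and dropped from memory; reading it back reloads it.
    Keys are assumed to be strings that are valid file names.
    """

    def __init__(self, max_in_memory: int, directory: str):
        self.max_in_memory = max_in_memory
        self.directory = directory
        self.fast = OrderedDict()  # key -> value held in memory, oldest first
        self.slow = {}             # key -> path of the pickled value on disk

    def _evict_if_needed(self) -> None:
        while len(self.fast) > self.max_in_memory:
            key, value = self.fast.popitem(last=False)  # least recently used
            path = os.path.join(self.directory, f"{key}.pkl")
            with open(path, "wb") as f:
                pickle.dump(value, f)
            self.slow[key] = path

    def __setitem__(self, key, value) -> None:
        old_path = self.slow.pop(key, None)
        if old_path is not None:
            os.remove(old_path)  # the new value supersedes the spilled one
        self.fast[key] = value
        self.fast.move_to_end(key)
        self._evict_if_needed()

    def __getitem__(self, key):
        if key in self.fast:
            self.fast.move_to_end(key)  # mark as recently used
            return self.fast[key]
        path = self.slow.pop(key)  # raises KeyError for unknown keys
        with open(path, "rb") as f:
            value = pickle.load(f)
        os.remove(path)
        self[key] = value  # back into memory; may spill something else
        return value


# Usage: with a budget of two values, the third insert spills "a" to disk.
with tempfile.TemporaryDirectory() as tmp:
    store = SpillDict(max_in_memory=2, directory=tmp)
    store["a"] = [1, 1, 1]
    store["b"] = [2, 2, 2]
    store["c"] = [3, 3, 3]
    print(list(store.fast), list(store.slow))  # ['b', 'c'] ['a']
    print(store["a"])                           # [1, 1, 1]
```

Spilling trades memory for disk I/O; it only helps if the data a task needs at any one moment still fits in memory.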


@pwolfram
Author

The http://localhost:8787/workers page didn't reveal anything unusual, but this could be because of the high latency on my internet connection.


@pwolfram
Author

Additional thoughts on this issue?

@pwolfram
Author

This field is 75.1 GB, so it is possible we are overshooting the 2 x 64 GB of RAM on the distributed cluster. This is probably what is happening. I'll try again with another 2 nodes.

@mrocklin
Member

That page should show the memory consumption by each process.

Another thing you could do is try to replicate the computation using only dask.array. This would mean something like

import dask
# ds.Var, ds.Var2, ... stand in for the dataset's actual variables
variables = [ds.Var.data, ds.Var2.data, ...]
dask.compute(*[v.sum() for v in variables])

This would help to isolate the issue between dask.array/distributed and xarray.


@mrocklin
Member

The normal dask tricks of streaming computations through memory should still be working here. Also, we should be spilling to disk to avoid MemoryErrors.
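The "streaming" here is a blocked reduction: each chunk is reduced to a small partial result, and the partials are then aggregated. This matches the per-chunk `keepdims=True` sums in the worker log above and the `sum-agg` step in the array repr. A sequential, pure-Python sketch of that pattern follows; the function names are hypothetical, and `fake_load` stands in for a disk read.

```python
from typing import Callable, Iterator, List


def iter_chunks(n_items: int, chunk_size: int,
                load: Callable[[int, int], List[float]]) -> Iterator[List[float]]:
    """Yield one chunk at a time; `load(start, stop)` stands in for a disk read."""
    for start in range(0, n_items, chunk_size):
        yield load(start, min(start + chunk_size, n_items))


def streaming_sum(chunks: Iterator[List[float]]) -> float:
    """Reduce each chunk to a partial sum, then aggregate the partials.

    Only the current chunk and the small partial results are alive at once,
    so peak memory scales with the chunk size, not the total array size.
    """
    partials = [sum(chunk) for chunk in chunks]  # per-chunk step
    return sum(partials)                          # aggregate step


def fake_load(start: int, stop: int) -> List[float]:
    return [float(i) for i in range(start, stop)]


# Usage: a fake "array" of 10 values read 3 at a time.
print(streaming_sum(iter_chunks(10, 3, fake_load)))  # 45.0
```

A parallel scheduler runs several per-chunk steps at once, so its peak memory is roughly the chunk size times the number of chunks in flight, not just one chunk.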


@pwolfram
Author

It worked with 4 nodes, so we were running out of memory on-node.

@pwolfram
Author

I'm upgrading distributed and will try again on 2 nodes.

@mrocklin
Member

That is a little bit surprising.


@pwolfram
Author

pwolfram commented Nov 10, 2016

Interesting, I'm getting a new error. It turns out I just needed to reimport distributed.

@pwolfram
Author

Forget the last comment; restarting distributed fixed it.

@shoyer
Member

shoyer commented Nov 10, 2016

Dataset.sum() only sums variables individually; it's a pretty shallow wrapper around da.sum.
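A toy model of this point, with hypothetical names and plain Python lists standing in for variables (this is not xarray's code): a dataset-level reduction is just a per-variable reduction collected into a mapping. One consequence is that a failure in `ds.sum()` that shows up on `uVertexVelocity` should also reproduce with that variable's own reduction, which is what the `ds.uVertexVelocity.data.sum()` test later in the thread checks.

```python
from typing import Dict, List


def dataset_sum(variables: Dict[str, List[float]]) -> Dict[str, float]:
    """Reduce every variable independently and collect the results by name."""
    return {name: sum(values) for name, values in variables.items()}


toy = {"yParticle": [1.0, 2.0, 3.0], "uVertexVelocity": [0.5, -0.5]}
print(dataset_sum(toy))  # {'yParticle': 6.0, 'uVertexVelocity': 0.0}
```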

@pwolfram
Author

I'm still getting the memory error on two nodes, however:

In [1]: from dask.distributed import Client

In [2]: client = Client('host:8786')

In [3]: import xarray as xr

In [4]: ds = xr.open_mfdataset('/net/scratch3/pwolfram/ZISO_5km/realizations/realization_24-01/analysis_members/lagrPartTrack.*.nc', lock=False)
In [5]: ds.sum()
Out[5]: distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError
---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)

@pwolfram
Author

The issue appears to be on the dask side, assuming I've set this up right:

In [28]: ds = xr.open_mfdataset(fnames, lock=False)

In [29]: x = ds.uVertexVelocity.data.sum()

In [30]: x.compute()
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/utils.py", line 120, in f
    result[0] = yield gen.maybe_future(func(*args, **kwargs))
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/users/pwolfram/lib/python2.7/site-packages/distributed/client.py", line 802, in _gather
    d['traceback'])
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/dask/array/core.py", line 51, in getarray
    c = np.asarray(c)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/net/scratch3/pwolfram/miniconda2/lib/python2.7/site-packages/numpy/core/numeric.py", line 482, in asarray
    return array(a, dtype, copy=False, order=order)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/core/indexing.py", line 389, in __array__
    return np.asarray(array[self.key], dtype=None)
  File "/users/pwolfram/lib/python2.7/site-packages/xarray/backends/netCDF4_.py", line 73, in __getitem__
    data = getitem(self.array, key)
  File "netCDF4/_netCDF4.pyx", line 3695, in netCDF4._netCDF4.Variable.__getitem__ (netCDF4/_netCDF4.c:37923)
  File "netCDF4/_netCDF4.pyx", line 4363, in netCDF4._netCDF4.Variable._get (netCDF4/_netCDF4.c:46946)
MemoryError

@pwolfram
Author

Note,

In [31]: ds.uVertexVelocity.nbytes*2**-30
Out[31]: 75.15162229537964

and this is on 2 nodes with 64 GB each.
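A back-of-the-envelope check of these numbers, as a sketch. The dimensions (Time, nVertices, nVertLevels) = (n, 184400, 100) and the 30- and 31-step chunks come from the worker log above, and a float64 is 8 bytes. The thread does not establish how many chunks a worker holds at once, or whether the wrapper layers in the traceback make extra copies.

```python
# Sizes from the worker log: uVertexVelocity is float64 with dimensions
# (Time, nVertices, nVertLevels) = (n_time, 184400, 100).
N_VERTICES = 184_400
N_VERT_LEVELS = 100
BYTES_PER_FLOAT64 = 8
GIB = 2**30


def chunk_bytes(n_time: int) -> int:
    """Bytes needed to hold n_time time steps of uVertexVelocity in memory."""
    return n_time * N_VERTICES * N_VERT_LEVELS * BYTES_PER_FLOAT64


step_bytes = chunk_bytes(1)
total_bytes = 75.15162229537964 * GIB  # ds.uVertexVelocity.nbytes from In [31]

print(step_bytes)                       # 147520000 bytes per time step
print(round(total_bytes / step_bytes))  # 547 time steps in total
print(round(chunk_bytes(30) / GIB, 2))  # 4.12 GiB for one 30-step chunk
print(round(chunk_bytes(31) / GIB, 2))  # 4.26 GiB for one 31-step chunk
```

Each task therefore materializes roughly 4.1 to 4.3 GiB. A worker running many such tasks concurrently could plausibly approach a 64 GB node, but that is an inference from these sizes, not a measurement.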

@shoyer
Member

shoyer commented Dec 1, 2016

There is still the issue of too many open files, but as of pydata/xarray#1128 xarray data stores are pickleable, which will enable their use with dask-distributed.
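One plausible reading of the earlier `NetCDF: Not a valid ID` errors is that an open netCDF4 variable carries a process-local file handle, and that handle means nothing once a task lands in another worker process. The sketch below is a stdlib-only toy of the "pickleable data store" idea, not xarray's code. The hypothetical `ReopeningFile` class pickles only the file path and reopens the file in the receiving process, and a plain binary file stands in for a netCDF file.

```python
import os
import pickle
import tempfile


class ReopeningFile:
    """Hypothetical wrapper that pickles a file's path, not its open handle."""

    def __init__(self, path: str):
        self.path = path
        self._handle = open(path, "rb")  # valid only in this process

    def read(self, start: int, stop: int) -> bytes:
        self._handle.seek(start)
        return self._handle.read(stop - start)

    def close(self) -> None:
        self._handle.close()

    def __getstate__(self):
        # Only picklable metadata crosses the process boundary.
        return {"path": self.path}

    def __setstate__(self, state):
        # Runs on the receiving side: open a fresh, process-local handle.
        self.path = state["path"]
        self._handle = open(self.path, "rb")


# Usage: round-trip through pickle, as a scheduler would when shipping a task.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"0123456789")
original = ReopeningFile(tmp.name)
shipped = pickle.loads(pickle.dumps(original))
print(shipped.read(2, 5))  # b'234'
original.close()
shipped.close()
os.remove(tmp.name)
```

Reopening on every unpickle is also why the "too many open files" concern above remains: each shipped copy holds its own handle until it is closed.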

@pwolfram
Author

@shoyer and @mrocklin, is this an issue we can close after pydata/xarray#1198 is merged? The primary reason we opened this issue was to be a placeholder to work through issues related to dask.distributed and xarray integration.

@pwolfram
Author

@mrocklin, can we close this issue?

@mrocklin
Member

Sure.

@edougherty32

Hi, I'm having the same issue, receiving the error message:

RuntimeError: NetCDF: Not a valid ID

when trying to get values from a dask array after performing a computation. Although I see this issue was resolved using pydata/xarray#1095, I don't see the explicit solution.

Could you please point me to this solution? Thanks!
