
fix distributed writes #1793

Merged · 40 commits · Mar 10, 2018
Changes shown below are from 9 commits.
63abe7f
distributed tests that write dask arrays
Dec 17, 2017
1952173
Change zarr test to synchronous API
mrocklin Dec 19, 2017
dd4bfcf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Dec 21, 2017
5c7f94c
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 11, 2018
9e70a3a
initial go at __setitem__ on array wrappers
Jan 11, 2018
ec67a54
fixes for scipy
Jan 12, 2018
2a4faa4
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 24, 2018
5497ad1
cleanup after merging with upstream/master
Jan 25, 2018
c2f5bb8
needless duplication of tests to work around pytest bug
Jan 25, 2018
5344fe8
use netcdf_variable instead of get_array()
Jan 25, 2018
7cbd2e5
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 28, 2018
49366bf
use synchronous dask.distributed test harness
mrocklin Feb 2, 2018
323f17b
Merge branch 'master' into feature/distributed_writes
Feb 2, 2018
81f7610
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 2, 2018
199538e
cleanup tests
Feb 2, 2018
d2050e7
per scheduler locks and autoclose behavior for writes
Feb 3, 2018
76675de
HDF5_LOCK and CombinedLock
Feb 6, 2018
9ac0327
integration test for distributed locks
Feb 6, 2018
05e7d54
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 18, 2018
1672968
more tests and set isopen to false when pickling
Feb 18, 2018
a667615
Fixing style errors.
stickler-ci Feb 18, 2018
2c0a7e8
ds property on DataStorePickleMixin
Feb 19, 2018
6bcadfe
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 19, 2018
aba6bdc
stickler-ci
Feb 19, 2018
5702c67
compat fixes for other backends
Feb 19, 2018
a06b683
HDF5_USE_FILE_LOCKING = False in test_distributed
Feb 21, 2018
efe8308
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 21, 2018
6ef31aa
style fix
Feb 21, 2018
00156c3
update tests to only expect netcdf4 to work, docstrings, and some cle…
Feb 22, 2018
3dcfac5
Fixing style errors.
stickler-ci Feb 22, 2018
29edaa7
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 27, 2018
61ee5a8
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 27, 2018
91f3c6a
fix imports
Feb 27, 2018
5cb91ba
fix more import bugs
Feb 27, 2018
2b97d4f
update docs
Feb 27, 2018
2dc514f
fix for pynio
Feb 28, 2018
eff0161
cleanup locks and use pytest monkeypatch for environment variable
Feb 28, 2018
5290484
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 6, 2018
c855284
fix failing test using combined lock
Mar 8, 2018
3c2ffbf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 8, 2018
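Two of the commits above ("per scheduler locks and autoclose behavior for writes" and "HDF5_LOCK and CombinedLock") introduce a lock that bundles several locks into one: HDF5 serializes access globally, while each file being written also needs its own lock. A minimal sketch of that combined-lock idea, assuming the shape of xarray's class but not reproducing it:

```python
import threading

class CombinedLock:
    """Acquire several locks as one: entering the combined lock
    enters every component lock, in order."""

    def __init__(self, locks):
        self.locks = tuple(locks)

    def acquire(self, *args):
        return all(lock.acquire(*args) for lock in self.locks)

    def release(self):
        for lock in self.locks:
            lock.release()

    def __enter__(self):
        for lock in self.locks:
            lock.__enter__()
        return self

    def __exit__(self, *exc):
        for lock in self.locks:
            lock.__exit__(*exc)

# usage: guard a write with both a global HDF5 lock and a per-file lock
HDF5_LOCK = threading.Lock()
file_lock = threading.Lock()
with CombinedLock([HDF5_LOCK, file_lock]):
    assert HDF5_LOCK.locked() and file_lock.locked()
```

Acquiring in a fixed order matters: if every writer takes the component locks in the same order, two combined locks over overlapping components cannot deadlock each other.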
2 changes: 2 additions & 0 deletions doc/whats-new.rst
@@ -112,6 +112,8 @@ Bug fixes
- Compatibility fixes to plotting module for Numpy 1.14 and Pandas 0.22
(:issue:`1813`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Fixed ``to_netcdf`` when using dask distributed (:issue:`1464`).
By `Joe Hamman <https://github.com/jhamman>`_.

.. _whats-new.0.10.0:

6 changes: 1 addition & 5 deletions xarray/backends/common.py
@@ -178,11 +178,7 @@ def add(self, source, target):
self.sources.append(source)
self.targets.append(target)
else:
try:
target[...] = source
except TypeError:
# workaround for GH: scipy/scipy#6880
target[:] = source
target[...] = source

def sync(self):
if self.sources:
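The `add`/`sync` pair above accumulates (source, target) pairs so that dask-backed sources can be written in one deferred step, while plain arrays are written immediately. A toy sketch of that accumulate-then-store pattern (not xarray's actual class; the real `sync` hands the lists to `dask.array.store`):

```python
import numpy as np

class ToyWriter:
    """Collects deferred writes and flushes them on sync()."""

    def __init__(self):
        self.sources, self.targets = [], []

    def add(self, source, target, lazy=False):
        if lazy:
            # defer dask-like sources until sync()
            self.sources.append(source)
            self.targets.append(target)
        else:
            # eager sources are written immediately
            target[...] = source

    def sync(self):
        for source, target in zip(self.sources, self.targets):
            target[...] = source  # real code: da.store(self.sources, self.targets)
        self.sources, self.targets = [], []

w = ToyWriter()
out = np.zeros(3)
w.add(np.ones(3), out, lazy=True)  # nothing written yet
w.sync()                           # now the write happens
```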
5 changes: 4 additions & 1 deletion xarray/backends/h5netcdf_.py
@@ -166,7 +166,10 @@ def prepare_variable(self, name, variable, check_encoding=False,

for k, v in iteritems(attrs):
nc4_var.setncattr(k, v)
return nc4_var, variable.data

target = H5NetCDFArrayWrapper(name, self)

return target, variable.data

def sync(self):
with self.ensure_open(autoclose=True):
9 changes: 8 additions & 1 deletion xarray/backends/netCDF4_.py
@@ -43,6 +43,11 @@ def __init__(self, variable_name, datastore):
dtype = np.dtype('O')
self.dtype = dtype

def __setitem__(self, key, value):
with self.datastore.ensure_open(autoclose=True):
data = self.get_array()
data[key] = value

Member Author:
@shoyer, is this what you were describing in #1464 (comment)?

Member:
Yes, this looks right to me.
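The pattern under discussion: rather than handing dask the backend's raw `netCDF4.Variable` (which cannot travel to distributed workers), `prepare_variable` returns a lightweight wrapper that re-resolves the underlying variable at write time. A self-contained sketch with a toy in-memory datastore (`ToyDataStore` and the class names here are illustrative, not xarray's API):

```python
import numpy as np

class ToyDataStore:
    """Stand-in for a backend datastore holding named variables."""
    def __init__(self):
        self.variables = {}

class ArrayWrapper:
    """Pickle-friendly write target: holds only a variable name and a
    datastore reference, and looks the variable up on every write."""
    def __init__(self, variable_name, datastore):
        self.variable_name = variable_name
        self.datastore = datastore

    def get_array(self):
        return self.datastore.variables[self.variable_name]

    def __setitem__(self, key, value):
        # a real backend would reopen the file here (ensure_open)
        self.get_array()[key] = value

store = ToyDataStore()
store.variables['var1'] = np.zeros(4)
target = ArrayWrapper('var1', store)
target[...] = np.arange(4)  # writes through to the store
```

Because the wrapper carries no open file handle, it can be pickled and shipped to a worker, which reopens the file on demand before writing.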

def get_array(self):
self.datastore.assert_open()
return self.datastore.ds.variables[self.variable_name]
@@ -398,7 +403,9 @@ def prepare_variable(self, name, variable, check_encoding=False,
# OrderedDict as the input to setncatts
nc4_var.setncattr(k, v)

return nc4_var, variable.data
target = NetCDF4ArrayWrapper(name, self)

return target, variable.data
Member:
@jhamman, this is a bit late, but I think nc4_var is never used. Is that correct?

Member Author:
@fujiisoup - it is used on line 405 (nc4_var.setncattr(k, v)).


def sync(self):
with self.ensure_open(autoclose=True):
17 changes: 16 additions & 1 deletion xarray/backends/scipy_.py
@@ -55,6 +55,18 @@ def __getitem__(self, key):
copy = self.datastore.ds.use_mmap
return np.array(data, dtype=self.dtype, copy=copy)

def __setitem__(self, key, value):
with self.datastore.ensure_open(autoclose=True):
data = self.get_array()
Member:
This needs to be self.datastore.ds.variables[self.variable_name] (a netcdf_variable object), not self.get_array(), which returns the .data (a numpy array). You can't enlarge a numpy array, but can enlarge a scipy netcdf variable.

(This manifests itself in writing a netcdf file with a time dimension of length 0. Xarray then crashes when attempting to decode a length-0 time variable, which is an unrelated bug.)

Member Author:
Thanks @shoyer - this appears to fix the scipy issue.

try:
data[key] = value
except TypeError:
if key is Ellipsis:
# workaround for GH: scipy/scipy#6880
data[:] = value
else:
raise
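The distinction @shoyer draws above, that a numpy array cannot be enlarged in place while an unlimited-dimension netCDF variable grows on assignment, is easy to see on the numpy side (the scipy side needs a file on disk, so it is omitted here):

```python
import numpy as np

arr = np.zeros(3)
try:
    arr[5] = 1.0  # out-of-range assignment: numpy will not grow the array
except IndexError as err:
    print('cannot enlarge a numpy array:', err)
```

This is why the wrapper must write through `self.datastore.ds.variables[...]` (the `netcdf_variable` object) rather than through the numpy array that `get_array()` previously returned.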


def _open_scipy_netcdf(filename, mode, mmap, version):
import scipy.io
@@ -201,7 +213,10 @@ def prepare_variable(self, name, variable, check_encoding=False,
for k, v in iteritems(variable.attrs):
self._validate_attr_key(k)
setattr(scipy_var, k, v)
return scipy_var, data

target = ScipyArrayWrapper(name, self)

return target, data

def sync(self):
with self.ensure_open(autoclose=True):
69 changes: 43 additions & 26 deletions xarray/tests/test_distributed.py
@@ -6,6 +6,7 @@
distributed = pytest.importorskip('distributed')
da = pytest.importorskip('dask.array')
import dask
from dask.distributed import Client
from distributed.utils_test import cluster, gen_cluster
from distributed.utils_test import loop # flake8: noqa
from distributed.client import futures_of
@@ -17,36 +18,52 @@
requires_zarr)


ENGINES = []
if has_scipy:
ENGINES.append('scipy')
if has_netCDF4:
ENGINES.append('netcdf4')
if has_h5netcdf:
ENGINES.append('h5netcdf')


@pytest.mark.xfail(sys.platform == 'win32',
reason='https://github.com/pydata/xarray/issues/1738')
@pytest.mark.parametrize('engine', ENGINES)
def test_dask_distributed_netcdf_integration_test(loop, engine):
with cluster() as (s, _):
with distributed.Client(s['address'], loop=loop):
original = create_test_data()
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename:
original.to_netcdf(filename, engine=engine)
with xr.open_dataset(filename, chunks=3, engine=engine) as restored:
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_allclose(original, computed)
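The ENGINES list above only tests the optional backends that are actually importable (the `has_scipy` / `has_netCDF4` / `has_h5netcdf` flags). The same feature-detection pattern can be sketched with the stdlib alone; this is illustrative, not xarray's helper:

```python
from importlib.util import find_spec

# probe optional backends and keep only the installed ones;
# the left name is the engine string, the right one the module to probe
ENGINES = [engine for engine, module in
           [('scipy', 'scipy'),
            ('netcdf4', 'netCDF4'),
            ('h5netcdf', 'h5netcdf')]
           if find_spec(module) is not None]
```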
@gen_cluster(client=True, timeout=None)
def test_dask_distributed_netcdf_integration_test_scipy(c, s, a, b):
chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6}
original = create_test_data().chunk(chunks)
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename:
original.to_netcdf(filename, engine='scipy')
with xr.open_dataset(filename, chunks=chunks,
engine='scipy') as restored:
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_allclose(original, computed)

@gen_cluster(client=True, timeout=None)
def test_dask_distributed_netcdf_integration_test_netcdf4(c, s, a, b):
chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6}
original = create_test_data().chunk(chunks)
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename:
original.to_netcdf(filename, engine='netcdf4')
with xr.open_dataset(filename, chunks=chunks,
engine='netcdf4') as restored:
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_allclose(original, computed)


@gen_cluster(client=True, timeout=None)
def test_dask_distributed_netcdf_integration_test_h5netcdf(c, s, a, b):
chunks = {'dim1': 4, 'dim2': 3, 'dim3': 6}
original = create_test_data().chunk(chunks)
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename:
original.to_netcdf(filename, engine='h5netcdf')
with xr.open_dataset(filename, chunks=chunks,
engine='h5netcdf') as restored:
assert isinstance(restored.var1.data, da.Array)
computed = restored.compute()
assert_allclose(original, computed)


@requires_zarr
def test_dask_distributed_zarr_integration_test(loop):
with cluster() as (s, _):
with distributed.Client(s['address'], loop=loop):
original = create_test_data()
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS) as filename:
chunks = {'dim1': 4, 'dim2': 3, 'dim3': 5}
with cluster() as (s, [a, b]):
with Client(s['address'], loop=loop) as c:
original = create_test_data().chunk(chunks)
with create_tmp_file(allow_cleanup_failure=ON_WINDOWS,
suffix='.zarr') as filename:
original.to_zarr(filename)
with xr.open_zarr(filename) as restored:
assert isinstance(restored.var1.data, da.Array)