Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix distributed writes #1793

Merged
merged 40 commits into from
Mar 10, 2018
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
63abe7f
distributed tests that write dask arrays
Dec 17, 2017
1952173
Change zarr test to synchronous API
mrocklin Dec 19, 2017
dd4bfcf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Dec 21, 2017
5c7f94c
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 11, 2018
9e70a3a
initial go at __setitem__ on array wrappers
Jan 11, 2018
ec67a54
fixes for scipy
Jan 12, 2018
2a4faa4
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 24, 2018
5497ad1
cleanup after merging with upstream/master
Jan 25, 2018
c2f5bb8
needless duplication of tests to work around pytest bug
Jan 25, 2018
5344fe8
use netcdf_variable instead of get_array()
Jan 25, 2018
7cbd2e5
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Jan 28, 2018
49366bf
use synchronous dask.distributed test harness
mrocklin Feb 2, 2018
323f17b
Merge branch 'master' into feature/distributed_writes
Feb 2, 2018
81f7610
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 2, 2018
199538e
cleanup tests
Feb 2, 2018
d2050e7
per scheduler locks and autoclose behavior for writes
Feb 3, 2018
76675de
HDF5_LOCK and CombinedLock
Feb 6, 2018
9ac0327
integration test for distributed locks
Feb 6, 2018
05e7d54
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 18, 2018
1672968
more tests and set isopen to false when pickling
Feb 18, 2018
a667615
Fixing style errors.
stickler-ci Feb 18, 2018
2c0a7e8
ds property on DataStorePickleMixin
Feb 19, 2018
6bcadfe
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 19, 2018
aba6bdc
stickler-ci
Feb 19, 2018
5702c67
compat fixes for other backends
Feb 19, 2018
a06b683
HDF5_USE_FILE_LOCKING = False in test_distributed
Feb 21, 2018
efe8308
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 21, 2018
6ef31aa
style fix
Feb 21, 2018
00156c3
update tests to only expect netcdf4 to work, docstrings, and some cle…
Feb 22, 2018
3dcfac5
Fixing style errors.
stickler-ci Feb 22, 2018
29edaa7
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Feb 27, 2018
61ee5a8
Merge branch 'feature/distributed_writes' of github.com:jhamman/xarra…
Feb 27, 2018
91f3c6a
fix imports
Feb 27, 2018
5cb91ba
fix more import bugs
Feb 27, 2018
2b97d4f
update docs
Feb 27, 2018
2dc514f
fix for pynio
Feb 28, 2018
eff0161
cleanup locks and use pytest monkeypatch for environment variable
Feb 28, 2018
5290484
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 6, 2018
c855284
fix failing test using combined lock
Mar 8, 2018
3c2ffbf
Merge branch 'master' of github.com:pydata/xarray into feature/distri…
Mar 8, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions doc/whats-new.rst
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ Bug fixes
- Compatibility fixes to plotting module for Numpy 1.14 and Pandas 0.22
(:issue:`1813`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Fixed to_netcdf when using dask distributed (:issue:`1464`).
By `Joe Hamman <https://github.com/jhamman>`_.
- Bug fix in encoding coordinates with ``{'_FillValue': None}`` in netCDF
metadata (:issue:`1865`).
By `Chris Roth <https://github.com/czr137>`_.
Expand Down
47 changes: 43 additions & 4 deletions xarray/backends/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import numpy as np

from .. import backends, conventions, Dataset
from .common import ArrayWriter, GLOBAL_LOCK
from .common import (ArrayWriter, get_scheduler, get_scheduler_lock,
HDF5_LOCK, CombinedLock)
from ..core import indexing
from ..core.combine import auto_combine
from ..core.utils import close_on_error, is_remote_uri
Expand Down Expand Up @@ -66,9 +67,9 @@ def _default_lock(filename, engine):
else:
# TODO: identify netcdf3 files and don't use the global lock
# for them
lock = GLOBAL_LOCK
lock = HDF5_LOCK
elif engine in {'h5netcdf', 'pynio'}:
lock = GLOBAL_LOCK
lock = HDF5_LOCK
else:
lock = False
return lock
Expand Down Expand Up @@ -131,6 +132,32 @@ def _protect_dataset_variables_inplace(dataset, cache):
variable.data = data


def _get_lock(engine, scheduler, format, path_or_file):
    """Get the lock(s) that apply to a particular scheduler/engine/format.

    Parameters
    ----------
    engine : str or None
        Backend engine name (e.g. 'netcdf4', 'h5netcdf').
    scheduler : str or None
        Active dask scheduler as returned by ``get_scheduler``.
    format : str or None
        netCDF file format (e.g. 'NETCDF4').
    path_or_file : str or file-like
        Target of the write; used to name per-file dask locks.

    Returns
    -------
    A single lock, or a ``CombinedLock`` wrapping several locks.
    """
    locks = []
    scheduler_lock_cls = get_scheduler_lock(scheduler)

    # HDF5-based writes need the global HDF5 lock in addition to any
    # scheduler-specific lock.
    if format in ['NETCDF4', None] and engine in ['h5netcdf', 'netcdf4']:
        locks.append(HDF5_LOCK)

    try:
        # per file lock
        # Dask locks take a name argument (e.g. filename)
        locks.append(scheduler_lock_cls(path_or_file))
    except TypeError:
        # threading/multiprocessing locks take no constructor arguments.
        # NOTE(review): this except can also mask unrelated TypeErrors; it
        # would be less error prone to pass the name to get_scheduler_lock
        # and have it return a lock instance.
        locks.append(scheduler_lock_cls())

    # When we have more than one lock, use the CombinedLock wrapper class.
    # Question: Should we be dropping one of these two locks when they are
    # basically the same? For instance, when using netcdf4 and dask is not
    # installed, locks will be [threading.Lock(), threading.Lock()].
    return CombinedLock(locks) if len(locks) > 1 else locks[0]


def open_dataset(filename_or_obj, group=None, decode_cf=True,
mask_and_scale=True, decode_times=True, autoclose=False,
concat_characters=True, decode_coords=True, engine=None,
Expand Down Expand Up @@ -622,8 +649,20 @@ def to_netcdf(dataset, path_or_file=None, mode='w', format=None, group=None,
# if a writer is provided, store asynchronously
sync = writer is None

# handle scheduler specific logic
scheduler = get_scheduler()
if (dataset.chunks and scheduler in ['distributed', 'multiprocessing'] and
engine != 'netcdf4'):
raise NotImplementedError("Writing netCDF files with the %s backend "
"is not currently supported with dask's %s "
"scheduler" % (engine, scheduler))
lock = _get_lock(engine, scheduler, format, path_or_file)
autoclose = (dataset.chunks and
scheduler in ['distributed', 'multiprocessing'])

target = path_or_file if path_or_file is not None else BytesIO()
store = store_open(target, mode, format, group, writer)
store = store_open(target, mode, format, group, writer,
autoclose=autoclose, lock=lock)

if unlimited_dims is None:
unlimited_dims = dataset.encoding.get('unlimited_dims', None)
Expand Down
135 changes: 116 additions & 19 deletions xarray/backends/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,20 @@
import contextlib
from collections import Mapping, OrderedDict
import warnings
import multiprocessing
import threading

from ..conventions import cf_encoder
from ..core import indexing
from ..core.utils import FrozenOrderedDict, NdimSizeLenMixin
from ..core.pycompat import iteritems, dask_array_type

# Default lock guarding HDF5 access.  Prefer dask's SerializableLock when
# dask is installed so the lock can travel with pickled task graphs;
# otherwise fall back to a plain (non-picklable) thread lock.
try:
    from dask.utils import SerializableLock
    HDF5_LOCK = SerializableLock()
except ImportError:
    HDF5_LOCK = threading.Lock()

# Create a logger object, but don't add any handlers. Leave that to user code.
logger = logging.getLogger(__name__)
Expand All @@ -27,8 +30,54 @@
NONE_VAR_NAME = '__values__'


# dask.utils.SerializableLock if available, otherwise just a threading.Lock
GLOBAL_LOCK = Lock()
def get_scheduler(get=None, collection=None):
    """Determine the dask scheduler that is being used.

    Returns 'distributed', 'multiprocessing' or 'threaded'; None is
    returned if no dask scheduler is active (i.e. dask is not installed).

    See also
    --------
    dask.utils.effective_get
    """
    try:
        from dask.utils import effective_get
        actual_get = effective_get(get, collection)
        try:
            from dask.distributed import Client
            if isinstance(actual_get.__self__, Client):
                return 'distributed'
        except (ImportError, AttributeError):
            # Either dask.distributed is not installed, or ``actual_get`` is
            # a plain function (no ``__self__``): distinguish between the
            # multiprocessing and threaded schedulers instead.
            try:
                import dask.multiprocessing
                if actual_get == dask.multiprocessing.get:
                    return 'multiprocessing'
                else:
                    return 'threaded'
            except ImportError:
                return 'threaded'
        # NOTE(review): if ``actual_get`` is bound to something other than a
        # distributed Client, no branch above returns and this falls through
        # to an implicit None — confirm that is intended.
    except ImportError:
        # dask itself is not installed.
        return None


def get_scheduler_lock(scheduler):
    """Return the lock class appropriate for the given dask scheduler.

    See Also
    --------
    dask.utils.get_scheduler_lock
    """
    if scheduler == 'distributed':
        from dask.distributed import Lock
        return Lock
    if scheduler == 'multiprocessing':
        return multiprocessing.Lock
    if scheduler == 'threaded':
        from dask.utils import SerializableLock
        return SerializableLock
    # No recognized dask scheduler: a plain thread lock suffices.
    return threading.Lock


def _encode_variable_name(name):
Expand Down Expand Up @@ -77,6 +126,39 @@ def robust_getitem(array, key, catch=Exception, max_retries=6,
time.sleep(1e-3 * next_delay)


class CombinedLock(object):
    """A combination of multiple locks.

    Like a locked door, a CombinedLock is locked if any of its constituent
    locks are locked.
    """

    def __init__(self, locks):
        # Deduplicate: acquiring the same non-reentrant lock twice would
        # deadlock, so CombinedLock([lock, lock]) could never be satisfied.
        self.locks = tuple(set(locks))

    def acquire(self, *args):
        # Note: all() short-circuits, so a failed non-blocking acquire may
        # leave earlier locks held (same as the pre-existing behavior).
        return all(lock.acquire(*args) for lock in self.locks)

    def release(self, *args):
        for lock in self.locks:
            lock.release(*args)

    def __enter__(self):
        for lock in self.locks:
            lock.__enter__()

    def __exit__(self, *args):
        for lock in self.locks:
            lock.__exit__(*args)

    @property
    def locked(self):
        # Call locked(): referencing the bound method (``lock.locked``) is
        # always truthy, which made this property unconditionally True.
        return any(lock.locked() for lock in self.locks)

    def __repr__(self):
        return "CombinedLock(%r)" % list(self.locks)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think you could equivalently substitute "CombinedLock(%r)" % list(self.locks) here.



class BackendArray(NdimSizeLenMixin, indexing.ExplicitlyIndexed):

def __array__(self, dtype=None):
Expand All @@ -85,7 +167,9 @@ def __array__(self, dtype=None):


class AbstractDataStore(Mapping):
_autoclose = False
_autoclose = None
_ds = None
_isopen = False

    def __iter__(self):
        # Mapping interface: iterate over this store's variable names.
        return iter(self.variables)
Expand Down Expand Up @@ -168,7 +252,7 @@ def __exit__(self, exception_type, exception_value, traceback):


class ArrayWriter(object):
def __init__(self, lock=GLOBAL_LOCK):
def __init__(self, lock=HDF5_LOCK):
self.sources = []
self.targets = []
self.lock = lock
Expand All @@ -178,11 +262,7 @@ def add(self, source, target):
self.sources.append(source)
self.targets.append(target)
else:
try:
target[...] = source
except TypeError:
# workaround for GH: scipy/scipy#6880
target[:] = source
target[...] = source

def sync(self):
if self.sources:
Expand All @@ -193,9 +273,9 @@ def sync(self):


class AbstractWritableDataStore(AbstractDataStore):
def __init__(self, writer=None):
def __init__(self, writer=None, lock=HDF5_LOCK):
if writer is None:
writer = ArrayWriter()
writer = ArrayWriter(lock=lock)
self.writer = writer

def encode(self, variables, attributes):
Expand Down Expand Up @@ -239,6 +319,9 @@ def set_variable(self, k, v): # pragma: no cover
raise NotImplementedError

def sync(self):
if self._isopen and self._autoclose:
# datastore will be reopened during write
self.close()
self.writer.sync()

def store_dataset(self, dataset):
Expand Down Expand Up @@ -373,27 +456,41 @@ class DataStorePickleMixin(object):

def __getstate__(self):
state = self.__dict__.copy()
del state['ds']
del state['_ds']
del state['_isopen']
if self._mode == 'w':
# file has already been created, don't override when restoring
state['_mode'] = 'a'
return state

def __setstate__(self, state):
self.__dict__.update(state)
self.ds = self._opener(mode=self._mode)
self._ds = None
self._isopen = False

    @property
    def ds(self):
        # Return the cached open dataset when available, otherwise reopen.
        if self._ds is not None and self._isopen:
            return self._ds
        ds = self._opener(mode=self._mode)
        self._isopen = True
        # NOTE(review): the reopened handle is not stored back on ``self._ds``,
        # so every access on this path reopens the file — confirm intended.
        return ds

@contextlib.contextmanager
def ensure_open(self, autoclose):
def ensure_open(self, autoclose=None):
"""
Helper function to make sure datasets are closed and opened
at appropriate times to avoid too many open file errors.

Use requires `autoclose=True` argument to `open_mfdataset`.
"""
if self._autoclose and not self._isopen:

if autoclose is None:
autoclose = self._autoclose
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably use some additional thinking.


if not self._isopen:
try:
self.ds = self._opener()
self._ds = self._opener()
self._isopen = True
yield
finally:
Expand Down
14 changes: 9 additions & 5 deletions xarray/backends/h5netcdf_.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from ..core.utils import FrozenOrderedDict, close_on_error
from ..core.pycompat import iteritems, bytes_type, unicode_type, OrderedDict

from .common import WritableCFDataStore, DataStorePickleMixin, find_root
from .common import (WritableCFDataStore, DataStorePickleMixin, find_root,
HDF5_LOCK)
from .netCDF4_ import (_nc4_group, _encode_nc4_variable, _get_datatype,
_extract_nc4_variable_encoding, BaseNetCDF4Array)

Expand Down Expand Up @@ -64,12 +65,12 @@ class H5NetCDFStore(WritableCFDataStore, DataStorePickleMixin):
"""

def __init__(self, filename, mode='r', format=None, group=None,
writer=None, autoclose=False):
writer=None, autoclose=False, lock=HDF5_LOCK):
if format not in [None, 'NETCDF4']:
raise ValueError('invalid format for h5netcdf backend')
opener = functools.partial(_open_h5netcdf_group, filename, mode=mode,
group=group)
self.ds = opener()
self._ds = opener()
if autoclose:
raise NotImplementedError('autoclose=True is not implemented '
'for the h5netcdf backend pending '
Expand All @@ -81,7 +82,7 @@ def __init__(self, filename, mode='r', format=None, group=None,
self._opener = opener
self._filename = filename
self._mode = mode
super(H5NetCDFStore, self).__init__(writer)
super(H5NetCDFStore, self).__init__(writer, lock=lock)

def open_store_variable(self, name, var):
with self.ensure_open(autoclose=False):
Expand Down Expand Up @@ -173,7 +174,10 @@ def prepare_variable(self, name, variable, check_encoding=False,

for k, v in iteritems(attrs):
nc4_var.setncattr(k, v)
return nc4_var, variable.data

target = H5NetCDFArrayWrapper(name, self)

return target, variable.data

def sync(self):
with self.ensure_open(autoclose=True):
Expand Down
Loading