Merge pull request #350 from pfafflabatuiuc/file_locking_fixes
The FileOpener now creates a lock file.
wpfff authored Nov 22, 2022
2 parents 74a26ce + 2ce6e5a commit 1d04137
Showing 2 changed files with 86 additions and 35 deletions.
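
The gist of the change: FileOpener now guards every open with a sibling lock file named "~<stem>.lock", created on entry and removed again on exit. A minimal usage sketch (the file name and the 5-second timeout are illustrative, not part of the commit):

    from plottr.data.datadict_storage import FileOpener

    # Entering the context creates '~data.lock' next to 'data.ddh5';
    # it is removed again on exit, even if the body raises.
    with FileOpener('data.ddh5', mode='r', timeout=5.) as f:
        print(list(f.keys()))  # f is an ordinary h5py.File
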
98 changes: 65 additions & 33 deletions plottr/data/datadict_storage.py
@@ -124,22 +124,20 @@ def _data_file_path(file: Union[str, Path], init_directory: bool = False) -> Path:
     """Get the full filepath of the data file.
     If `init_directory` is True, then create the parent directory."""
 
-    if isinstance(file, str):
-        path = Path(file)
-    else:
-        path = file
+    path = Path(file)
 
     if path.suffix != f'.{DATAFILEXT}':
         path = Path(path.parent, path.stem + f'.{DATAFILEXT}')
     if init_directory:
         path.parent.mkdir(parents=True, exist_ok=True)
     return path
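
For illustration, the simplified helper accepts both str and Path and normalizes the suffix (a sketch using the private helper; assuming DATAFILEXT == 'ddh5', plottr's data-file extension):

    from pathlib import Path
    from plottr.data.datadict_storage import _data_file_path

    # Any other suffix is replaced by '.ddh5'.
    print(_data_file_path('runs/measurement'))     # runs/measurement.ddh5
    print(_data_file_path(Path('runs/data.txt')))  # runs/data.ddh5
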


+# TODO: Check if the linking of class in the docstring is working.
 def datadict_to_hdf5(datadict: DataDict,
-                     path: str,
+                     path: Union[str, Path],
                      groupname: str = 'data',
-                     append_mode: AppendMode = AppendMode.new) -> None:
+                     append_mode: AppendMode = AppendMode.new,
+                     file_timeout: Optional[float] = None) -> None:
     """Write a DataDict to DDH5
 
     Note: Meta data is only written during initial writing of the dataset.
@@ -156,13 +154,16 @@ def datadict_to_hdf5(datadict: DataDict,
         Note: we're not checking for content, only length!
         - `AppendMode.all` : Append all data in datadict to file data sets.
+    :param file_timeout: How long the function will wait for the ddh5 file to unlock. Only relevant if you are
+        writing to a file that already exists while another program is reading it at the same time.
+        If None, the default value from :class:`FileOpener` is used.
     """
     filepath = _data_file_path(path, True)
     if not filepath.exists():
         append_mode = AppendMode.none
 
-    with FileOpener(filepath, 'a') as f:
+    with FileOpener(filepath, 'a', file_timeout) as f:
         if append_mode is AppendMode.none:
             init_file(f, groupname)
         assert groupname in f
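
A usage sketch for the new writing parameter (the DataDict layout follows plottr's usual pattern; the names and the 5-second timeout are illustrative):

    import numpy as np
    from plottr.data.datadict import DataDict
    from plottr.data.datadict_storage import datadict_to_hdf5

    dd = DataDict(x=dict(unit='s'), y=dict(unit='V', axes=['x']))
    dd.add_data(x=np.arange(3), y=np.arange(3) ** 2)

    # Wait up to 5 s for a concurrent reader to release the lock,
    # instead of the FileOpener default of 30 s.
    datadict_to_hdf5(dd, 'experiment_data', file_timeout=5.)
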
@@ -226,12 +227,13 @@ def init_file(f: h5py.File,
     f.flush()
 
 
-def datadict_from_hdf5(path: str,
+def datadict_from_hdf5(path: Union[str, Path],
                        groupname: str = 'data',
                        startidx: Union[int, None] = None,
                        stopidx: Union[int, None] = None,
                        structure_only: bool = False,
-                       ignore_unequal_lengths: bool = True) -> DataDict:
+                       ignore_unequal_lengths: bool = True,
+                       file_timeout: Optional[float] = None) -> DataDict:
     """Load a DataDict from file.
 
     :param path: Full filepath without the file extension.
@@ -241,6 +243,8 @@ def datadict_from_hdf5(path: str,
     :param structure_only: If `True`, don't load the data values.
     :param ignore_unequal_lengths: If `True`, don't fail when the rows have
         unequal length; will return the longest consistent DataDict possible.
+    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, the default
+        value from :class:`FileOpener` is used.
     :return: Validated DataDict.
     """
     filepath = _data_file_path(path)
@@ -251,7 +255,7 @@ def datadict_from_hdf5(path: str,
         startidx = 0
 
     res = {}
-    with FileOpener(filepath, 'r') as f:
+    with FileOpener(filepath, 'r', file_timeout) as f:
         if groupname not in f:
             raise ValueError('Group does not exist.')
 
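The reading side mirrors this (a sketch, assuming the file written above):

    from plottr.data.datadict_storage import datadict_from_hdf5

    # Load only the structure (no data values), waiting at most 5 s
    # if a writer currently holds the lock file.
    structure = datadict_from_hdf5('experiment_data', structure_only=True,
                                   file_timeout=5.)
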
@@ -303,23 +307,25 @@ def datadict_from_hdf5(path: str,
     return dd
 
 
-def all_datadicts_from_hdf5(path: str, **kwargs: Any) -> Dict[str, Any]:
+def all_datadicts_from_hdf5(path: Union[str, Path], file_timeout: Optional[float] = None, **kwargs: Any) -> Dict[str, Any]:
     """
     Loads all the DataDicts contained in a single HDF5 file. Returns a dictionary with the group names as keys and
     the DataDicts as the values of those keys.
 
     :param path: The path of the HDF5 file.
+    :param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, the default
+        value from :class:`FileOpener` is used.
     :return: Dictionary with group names as keys, and the DataDicts inside them as values.
     """
     filepath = _data_file_path(path)
     if not os.path.exists(filepath):
         raise ValueError("Specified file does not exist.")
 
     ret = {}
-    with FileOpener(filepath, 'r') as f:
+    with FileOpener(filepath, 'r', file_timeout) as f:
         keys = [k for k in f.keys()]
         for k in keys:
-            ret[k] = datadict_from_hdf5(path=path, groupname=k, **kwargs)
+            ret[k] = datadict_from_hdf5(path=path, groupname=k, file_timeout=file_timeout, **kwargs)
     return ret


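Here `file_timeout` is also forwarded to each per-group `datadict_from_hdf5` call, so one value covers the whole read (the file name is illustrative):

    from plottr.data.datadict_storage import all_datadicts_from_hdf5

    datadicts = all_datadicts_from_hdf5('experiment_data', file_timeout=5.)
    for groupname, dd in datadicts.items():
        print(groupname, [name for name, _ in dd.data_items()])
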
@@ -328,16 +334,20 @@ def all_datadicts_from_hdf5(path: str, **kwargs: Any) -> Dict[str, Any]:
 class FileOpener:
     """Context manager for opening files while respecting file system locks."""
 
-    def __init__(self, path: Path,
+    def __init__(self, path: Union[Path, str],
                  mode: str = 'r',
-                 timeout: float = 10.,
+                 timeout: Optional[float] = None,
                  test_delay: float = 0.1):
 
-        self.path = path
+        self.path = Path(path)
+        self.lock_path = self.path.parent.joinpath("~" + str(self.path.stem) + '.lock')
         if mode not in ['r', 'w', 'w-', 'a']:
             raise ValueError("Only 'r', 'w', 'w-', 'a' modes are supported.")
         self.mode = mode
-        self.timeout = timeout
+        self.default_timeout = 30.
+        if timeout is None:
+            self.timeout = self.default_timeout
+        else:
+            self.timeout = timeout
         self.test_delay = test_delay
 
         self.file: Optional[h5py.File] = None
@@ -350,20 +360,36 @@ def __exit__(self,
                  exc_type: Optional[Type[BaseException]],
                  exc_value: Optional[BaseException],
                  exc_traceback: Optional[TracebackType]) -> None:
-        assert self.file is not None
-        self.file.close()
+        try:
+            assert self.file is not None
+            self.file.close()
+        finally:
+            if self.lock_path.is_file():
+                self.lock_path.unlink()
 
     def open_when_unlocked(self) -> h5py.File:
         t0 = time.time()
         while True:
-            try:
-                f = h5py.File(str(self.path), self.mode)
-                return f
-            except (OSError, PermissionError, RuntimeError):
-                pass
+            if not self.lock_path.is_file():
+                try:
+                    self.lock_path.touch(exist_ok=False)
+                # This happens if another process beat this one to creating the lock file.
+                except FileExistsError:
+                    continue
+
+                while True:
+                    try:
+                        f = h5py.File(str(self.path), self.mode)
+                        return f
+                    except (OSError, PermissionError, RuntimeError):
+                        pass
+                    time.sleep(self.test_delay)  # don't overwhelm the FS with very fast repeated calls.
+                    if time.time() - t0 > self.timeout:
+                        raise RuntimeError('Waiting for file unlock timeout')
+
             time.sleep(self.test_delay)  # don't overwhelm the FS with very fast repeated calls.
             if time.time() - t0 > self.timeout:
-                raise RuntimeError('Waiting for file unlock timeout')
+                raise RuntimeError('Lock file remained for longer than the timeout.')


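The effect of the two loops: an opener that finds someone else's lock file never touches the HDF5 file and gives up after `timeout`, while the inner loop handles the case where this opener owns the lock but h5py still cannot open the file. A sketch of the failure mode (the nested openers stand in for two competing processes; the 2-second timeout is arbitrary):

    from plottr.data.datadict_storage import FileOpener

    with FileOpener('data.ddh5', 'a') as f:   # holds '~data.lock'
        try:
            # Sees the existing lock, polls every test_delay seconds,
            # and times out without ever opening the file.
            with FileOpener('data.ddh5', 'r', timeout=2.) as g:
                pass
        except RuntimeError as exc:
            print(exc)  # Lock file remained for longer than the timeout.
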
# Node for monitoring #
@@ -524,18 +550,21 @@ class DDH5Writer(object):
         group of that name will be deleted.
     :param name: Name of this dataset. Used in path/file creation and added as meta data.
     :param filename: Filename to use. Defaults to 'data.ddh5'.
+    :param file_timeout: How long the writer will wait for the ddh5 file to unlock. If None, the default
+        value from :class:`FileOpener` is used.
     """
 
     # TODO: need an operation mode for not keeping data in memory.
     # TODO: a mode for working with pre-allocated data
 
     def __init__(self,
                  datadict: DataDict,
-                 basedir: str = '.',
+                 basedir: Union[str, Path] = '.',
                  groupname: str = 'data',
                  name: Optional[str] = None,
                  filename: str = 'data',
-                 filepath: Optional[str] = None):
+                 filepath: Optional[Union[str, Path]] = None,
+                 file_timeout: Optional[float] = None):
         """Constructor for :class:`.DDH5Writer`"""
 
         self.basedir = Path(basedir)
@@ -554,6 +583,7 @@ def __init__(self,
         self.filepath = Path(filepath)
 
         self.datadict.add_meta('dataset.name', name)
+        self.file_timeout = file_timeout
 
     def __enter__(self) -> "DDH5Writer":
         if self.filepath is None:
@@ -565,7 +595,8 @@ def __enter__(self) -> "DDH5Writer":
         datadict_to_hdf5(self.datadict,
                          str(self.filepath),
                          groupname=self.groupname,
-                         append_mode=AppendMode.none)
+                         append_mode=AppendMode.none,
+                         file_timeout=self.file_timeout)
         self.inserted_rows = nrecords
         return self

@@ -574,7 +605,7 @@ def __exit__(self,
                  exc_value: Optional[BaseException],
                  exc_traceback: Optional[TracebackType]) -> None:
         assert self.filepath is not None
-        with FileOpener(self.filepath, 'a') as f:
+        with FileOpener(self.filepath, 'a', timeout=self.file_timeout) as f:
             add_cur_time_attr(f[self.groupname], name='close')
 
     def data_folder(self) -> Path:
@@ -631,9 +662,10 @@ def add_data(self, **kwargs: Any) -> None:
         if nrecords is not None and nrecords > 0:
             datadict_to_hdf5(self.datadict, str(self.filepath),
                              groupname=self.groupname,
-                             append_mode=mode)
+                             append_mode=mode,
+                             file_timeout=self.file_timeout)
 
             assert self.filepath is not None
-            with FileOpener(self.filepath, 'a') as f:
+            with FileOpener(self.filepath, 'a', timeout=self.file_timeout) as f:
                 add_cur_time_attr(f, name='last_change')
                 add_cur_time_attr(f[self.groupname], name='last_change')
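
On the writer side the parameter is simply threaded through; a sketch following plottr's documented DDH5Writer pattern (basedir, name, and the timeout value are illustrative):

    from plottr.data.datadict import DataDict
    from plottr.data.datadict_storage import DDH5Writer

    dd = DataDict(x=dict(unit='s'), y=dict(unit='V', axes=['x']))

    # file_timeout is stored on the writer and passed along to every
    # datadict_to_hdf5 / FileOpener call in __enter__, add_data and __exit__.
    with DDH5Writer(dd, basedir='./TESTDATA', name='test_run',
                    file_timeout=5.) as writer:
        for x in range(3):
            writer.add_data(x=x, y=x ** 2)
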
23 changes: 21 additions & 2 deletions test/pytest/test_ddh5.py
@@ -39,8 +39,27 @@ def _clean_from_file(datafromfile):
         except KeyError:
             pass
 
+    for axis, data in datafromfile.data_items():
+        if "label" in data:
+            datafromfile[axis].pop("label")
+
     return datafromfile
 
+
+# Test the FileOpener.
+
+def test_file_lock_creation_and_deletion():
+    lock_path = FILEPATH.parent.joinpath("~" + str(FILEPATH.stem) + '.lock')
+    try:
+        with dds.FileOpener(FILEPATH, 'a') as f:
+            assert lock_path.is_file()
+            raise RuntimeError('crashing on purpose')
+    except RuntimeError:
+        pass
+    assert not lock_path.is_file()
+
+    FILEPATH.unlink()
+
 
 def test_basic_storage_and_retrieval():
     x = np.arange(3)
@@ -216,7 +235,7 @@ def test_writer_with_large_data():
     dataset_from_file = dds.datadict_from_hdf5(writer.filepath)
     assert(_clean_from_file(dataset_from_file) == ref_dataset)
 
-    rmtree('./TESTDATA')
+    rmtree(str(Path(writer.filepath).parent))
 
 
 def test_concurrent_write_and_read():
@@ -238,4 +257,4 @@ def test_concurrent_write_and_read():
     dataset_from_file = dds.datadict_from_hdf5(writer.filepath)
     assert(_clean_from_file(dataset_from_file) == ref_dataset)
 
-    rmtree('./TESTDATA')
+    rmtree(str(Path(writer.filepath).parent))
