Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The FileOpener now creates a lock file. #350

Merged
merged 4 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 65 additions & 33 deletions plottr/data/datadict_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,20 @@ def _data_file_path(file: Union[str, Path], init_directory: bool = False) -> Pat
"""Get the full filepath of the data file.
If `init_directory` is True, then create the parent directory."""

if isinstance(file, str):
path = Path(file)
else:
path = file
path = Path(file)

if path.suffix != f'.{DATAFILEXT}':
path = Path(path.parent, path.stem + f'.{DATAFILEXT}')
if init_directory:
path.parent.mkdir(parents=True, exist_ok=True)
return path


# TODO: Check if the linking of class in the docstring is working.
def datadict_to_hdf5(datadict: DataDict,
path: str,
path: Union[str, Path],
groupname: str = 'data',
append_mode: AppendMode = AppendMode.new) -> None:
append_mode: AppendMode = AppendMode.new,
file_timeout: Optional[float] = None) -> None:
"""Write a DataDict to DDH5
Note: Meta data is only written during initial writing of the dataset.
Expand All @@ -156,13 +154,16 @@ def datadict_to_hdf5(datadict: DataDict,
Note: we're not checking for content, only length!
- `AppendMode.all` : Append all data in datadict to file data sets.
:param file_timeout: How long the function will wait for the ddh5 file to unlock. Only relevant if you are
writing to a file that already exists and some other program is trying to read it at the same time.
If None, uses the default value from the :class:`FileOpener`.
"""
filepath = _data_file_path(path, True)
if not filepath.exists():
append_mode = AppendMode.none

with FileOpener(filepath, 'a') as f:
with FileOpener(filepath, 'a', file_timeout) as f:
if append_mode is AppendMode.none:
init_file(f, groupname)
assert groupname in f
Expand Down Expand Up @@ -226,12 +227,13 @@ def init_file(f: h5py.File,
f.flush()


def datadict_from_hdf5(path: str,
def datadict_from_hdf5(path: Union[str, Path],
groupname: str = 'data',
startidx: Union[int, None] = None,
stopidx: Union[int, None] = None,
structure_only: bool = False,
ignore_unequal_lengths: bool = True) -> DataDict:
ignore_unequal_lengths: bool = True,
file_timeout: Optional[float] = None) -> DataDict:
"""Load a DataDict from file.
:param path: Full filepath without the file extension.
Expand All @@ -241,6 +243,8 @@ def datadict_from_hdf5(path: str,
:param structure_only: If `True`, don't load the data values.
:param ignore_unequal_lengths: If `True`, don't fail when the rows have
unequal length; will return the longest consistent DataDict possible.
:param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, uses the default
value from the :class:`FileOpener`.
:return: Validated DataDict.
"""
filepath = _data_file_path(path)
Expand All @@ -251,7 +255,7 @@ def datadict_from_hdf5(path: str,
startidx = 0

res = {}
with FileOpener(filepath, 'r') as f:
with FileOpener(filepath, 'r', file_timeout) as f:
if groupname not in f:
raise ValueError('Group does not exist.')

Expand Down Expand Up @@ -303,23 +307,25 @@ def datadict_from_hdf5(path: str,
return dd


def all_datadicts_from_hdf5(path: Union[str, Path], file_timeout: Optional[float] = None, **kwargs: Any) -> Dict[str, Any]:
    """Load all the DataDicts contained in a single HDF5 file.

    :param path: The path of the HDF5 file.
    :param file_timeout: How long (in seconds) to wait for the ddh5 file to unlock.
        If None, the default value from :class:`FileOpener` is used.
    :param kwargs: Forwarded to :func:`datadict_from_hdf5` for every group.
    :return: Dictionary with group names as keys, and the DataDicts inside them as values.
    :raises ValueError: If the specified file does not exist.
    """
    filepath = _data_file_path(path)
    # pathlib check instead of os.path.exists -- filepath is already a Path.
    if not filepath.exists():
        raise ValueError("Specified file does not exist.")

    ret = {}
    with FileOpener(filepath, 'r', file_timeout) as f:
        # Snapshot the group names first; each group is then re-read through
        # datadict_from_hdf5 so the same locking logic applies per group.
        keys = list(f.keys())
        for k in keys:
            ret[k] = datadict_from_hdf5(path=path, groupname=k, file_timeout=file_timeout, **kwargs)
    return ret


Expand All @@ -328,16 +334,20 @@ def all_datadicts_from_hdf5(path: str, **kwargs: Any) -> Dict[str, Any]:
class FileOpener:
"""Context manager for opening files while respecting file system locks."""

def __init__(self, path: Path,
def __init__(self, path: Union[Path, str],
mode: str = 'r',
timeout: float = 10.,
timeout: Optional[float] = None,
test_delay: float = 0.1):
self.path = path

self.path = Path(path)
self.lock_path = self.path.parent.joinpath("~" + str(self.path.stem) + '.lock')
if mode not in ['r', 'w', 'w-', 'a']:
raise ValueError("Only 'r', 'w', 'w-', 'a' modes are supported.")
self.mode = mode
self.timeout = timeout
self.default_timeout = 30.
if timeout is None:
self.timeout = self.default_timeout
else:
self.timeout = timeout
self.test_delay = test_delay

self.file: Optional[h5py.File] = None
Expand All @@ -350,20 +360,36 @@ def __exit__(self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType]) -> None:
assert self.file is not None
self.file.close()
try:
assert self.file is not None
self.file.close()
finally:
if self.lock_path.is_file():
self.lock_path.unlink()

def open_when_unlocked(self) -> h5py.File:
t0 = time.time()
while True:
try:
f = h5py.File(str(self.path), self.mode)
return f
except (OSError, PermissionError, RuntimeError):
pass
if not self.lock_path.is_file():
try:
self.lock_path.touch(exist_ok=False)
# This happens if some other process beat this one and created the file beforehand
except FileExistsError:
continue

while True:
try:
f = h5py.File(str(self.path), self.mode)
return f
except (OSError, PermissionError, RuntimeError):
pass
time.sleep(self.test_delay) # don't overwhelm the FS by very fast repeated calls.
if time.time() - t0 > self.timeout:
raise RuntimeError('Waiting or file unlock timeout')

time.sleep(self.test_delay) # don't overwhelm the FS by very fast repeated calls.
if time.time() - t0 > self.timeout:
raise RuntimeError('Waiting for file unlock timeout')
raise RuntimeError('Lock file remained for longer than timeout time')


# Node for monitoring #
Expand Down Expand Up @@ -524,18 +550,21 @@ class DDH5Writer(object):
group of that name will be deleted.
:param name: Name of this dataset. Used in path/file creation and added as meta data.
:param filename: Filename to use. Defaults to 'data.ddh5'.
:param file_timeout: How long the function will wait for the ddh5 file to unlock. If None, uses the default
value from the :class:`FileOpener`.
"""

# TODO: need an operation mode for not keeping data in memory.
# TODO: a mode for working with pre-allocated data

def __init__(self,
datadict: DataDict,
basedir: str = '.',
basedir: Union[str, Path] = '.',
groupname: str = 'data',
name: Optional[str] = None,
filename: str = 'data',
filepath: Optional[str] = None):
filepath: Optional[Union[str, Path]] = None,
file_timeout: Optional[float] = None):
"""Constructor for :class:`.DDH5Writer`"""

self.basedir = Path(basedir)
Expand All @@ -554,6 +583,7 @@ def __init__(self,
self.filepath = Path(filepath)

self.datadict.add_meta('dataset.name', name)
self.file_timeout = file_timeout

def __enter__(self) -> "DDH5Writer":
if self.filepath is None:
Expand All @@ -565,7 +595,8 @@ def __enter__(self) -> "DDH5Writer":
datadict_to_hdf5(self.datadict,
str(self.filepath),
groupname=self.groupname,
append_mode=AppendMode.none)
append_mode=AppendMode.none,
file_timeout=self.file_timeout)
self.inserted_rows = nrecords
return self

Expand All @@ -574,7 +605,7 @@ def __exit__(self,
exc_value: Optional[BaseException],
exc_traceback: Optional[TracebackType]) -> None:
assert self.filepath is not None
with FileOpener(self.filepath, 'a') as f:
with FileOpener(self.filepath, 'a', timeout=self.file_timeout) as f:
add_cur_time_attr(f[self.groupname], name='close')

def data_folder(self) -> Path:
Expand Down Expand Up @@ -631,9 +662,10 @@ def add_data(self, **kwargs: Any) -> None:
if nrecords is not None and nrecords > 0:
datadict_to_hdf5(self.datadict, str(self.filepath),
groupname=self.groupname,
append_mode=mode)
append_mode=mode,
file_timeout=self.file_timeout)

assert self.filepath is not None
with FileOpener(self.filepath, 'a') as f:
with FileOpener(self.filepath, 'a', timeout=self.file_timeout) as f:
add_cur_time_attr(f, name='last_change')
add_cur_time_attr(f[self.groupname], name='last_change')
23 changes: 21 additions & 2 deletions test/pytest/test_ddh5.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,27 @@ def _clean_from_file(datafromfile):
except KeyError:
pass

for axis, data in datafromfile.data_items():
if "label" in data:
datafromfile[axis].pop("label")

return datafromfile

# Test the FileOpener.


def test_file_lock_creation_and_deletion():
    """The FileOpener must create its lock file on entry and remove it again
    on exit, even when the managed block terminates via an exception."""
    lock_file = FILEPATH.parent / f"~{FILEPATH.stem}.lock"
    raised = False
    try:
        with dds.FileOpener(FILEPATH, 'a'):
            assert lock_file.is_file()
            raise RuntimeError('crashing on purpose')
    except RuntimeError:
        raised = True
    # The deliberate crash must have propagated out of the with-block...
    assert raised
    # ...and the lock file must be cleaned up regardless.
    assert not lock_file.is_file()

    FILEPATH.unlink()


def test_basic_storage_and_retrieval():
x = np.arange(3)
Expand Down Expand Up @@ -216,7 +235,7 @@ def test_writer_with_large_data():
dataset_from_file = dds.datadict_from_hdf5(writer.filepath)
assert(_clean_from_file(dataset_from_file) == ref_dataset)

rmtree('./TESTDATA')
rmtree(str(Path(writer.filepath).parent))


def test_concurrent_write_and_read():
Expand All @@ -238,4 +257,4 @@ def test_concurrent_write_and_read():
dataset_from_file = dds.datadict_from_hdf5(writer.filepath)
assert(_clean_from_file(dataset_from_file) == ref_dataset)

rmtree('./TESTDATA')
rmtree(str(Path(writer.filepath).parent))