Skip to content

Commit

Permalink
Impl. compressor config overwrite
Browse files Browse the repository at this point in the history
  • Loading branch information
madsbk committed Aug 23, 2023
1 parent 910fb3a commit 5a24541
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 6 deletions.
6 changes: 3 additions & 3 deletions python/kvikio/nvcomp_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from numcodecs.abc import Codec
from numcodecs.compat import ensure_contiguous_ndarray_like

import kvikio._lib.libnvcomp_ll as _ll
from kvikio._lib.libnvcomp_ll import SUPPORTED_ALGORITHMS


class NvCompBatchCodec(Codec):
Expand All @@ -34,11 +34,11 @@ def __init__(
stream: Optional[cp.cuda.Stream] = None,
) -> None:
algo_id = algorithm.lower()
algo_t = _ll.SUPPORTED_ALGORITHMS.get(algo_id, None)
algo_t = SUPPORTED_ALGORITHMS.get(algo_id, None)
if algo_t is None:
raise ValueError(
f"{algorithm} is not supported. "
f"Must be one of: {list(_ll.SUPPORTED_ALGORITHMS.keys())}"
f"Must be one of: {list(SUPPORTED_ALGORITHMS.keys())}"
)

self.algorithm = algo_id
Expand Down
134 changes: 132 additions & 2 deletions python/kvikio/zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import os
import os.path
from abc import abstractmethod
from typing import Any, Mapping, Sequence
from typing import Any, Literal, Mapping, Optional, Sequence

import cupy
import numcodecs
import numpy
import numpy as np
import zarr
Expand All @@ -20,6 +21,9 @@

import kvikio
import kvikio.nvcomp
import kvikio.nvcomp_codec
import kvikio.zarr
from kvikio.nvcomp_codec import NvCompBatchCodec

MINIMUM_ZARR_VERSION = "2.15"

Expand Down Expand Up @@ -48,6 +52,17 @@ class GDSStore(zarr.storage.DirectoryStore):
case-insensitive file system. Default value is False.
dimension_separator : {'.', '/'}, optional
Separator placed between the dimensions of a chunk.
compressor_config_overwrite
If not None, use this `Mapping` to specify what is written to the Zarr metadata
file on disk (`.zarray`). Normally, Zarr writes the configuration[1] given by
the `compressor` argument to the `.zarray` file. Use this argument to overwrite
the normal configuration and use the specified `Mapping` instead.
decompressor_config_overwrite
If not None, use this `Mapping` to specify what compressor configuration[1] is
used for decompressing no matter the configuration found in the Zarr metadata
on disk (the `.zarray` file).
[1] https://github.com/zarr-developers/numcodecs/blob/cb155432/numcodecs/abc.py#L79
Notes
-----
Expand All @@ -62,14 +77,24 @@ class GDSStore(zarr.storage.DirectoryStore):
# The default output array type used by getitems().
default_meta_array = numpy.empty(())

def __init__(self, path, normalize_keys=False, dimension_separator=None) -> None:
def __init__(
self,
path,
normalize_keys=False,
dimension_separator=None,
*,
compressor_config_overwrite: Optional[Mapping] = None,
decompressor_config_overwrite: Optional[Mapping] = None,
) -> None:
if not kvikio.zarr.supported:
raise RuntimeError(
f"GDSStore requires Zarr >={kvikio.zarr.MINIMUM_ZARR_VERSION}"
)
super().__init__(
path, normalize_keys=normalize_keys, dimension_separator=dimension_separator
)
self.compressor_config_overwrite = compressor_config_overwrite
self.decompressor_config_overwrite = decompressor_config_overwrite

def __eq__(self, other):
return isinstance(other, GDSStore) and self.path == other.path
Expand All @@ -79,6 +104,23 @@ def _tofile(self, a, fn):
written = f.write(a)
assert written == a.nbytes

def __getitem__(self, key):
ret = super().__getitem__(key)
if self.decompressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(ret)
if meta["compressor"]:
meta["compressor"] = self.decompressor_config_overwrite
ret = self._metadata_class.encode_array_metadata(meta)
return ret

def __setitem__(self, key, value):
if self.compressor_config_overwrite and key == ".zarray":
meta = self._metadata_class.decode_array_metadata(value)
if meta["compressor"]:
meta["compressor"] = self.compressor_config_overwrite
value = self._metadata_class.encode_array_metadata(meta)
super().__setitem__(key, value)

def getitems(
self,
keys: Sequence[str],
Expand Down Expand Up @@ -128,6 +170,94 @@ def getitems(
return ret


lz4_cpu_compressor = numcodecs.LZ4()
lz4_gpu_compressor = NvCompBatchCodec("lz4")


def open_cupy_array(
store: os.PathLike | str,
mode: Literal["r", "r+", "a", "w", "w-"] = "a",
compressor: Codec = lz4_gpu_compressor,
meta_array=cupy.empty(()),
**kwargs,
) -> zarr.Array:
"""Open an Zarr array as a CuPy-like array using file-mode-like semantics.
This function is a CUDA friendly version of `zarr.open_array` that reads
and writes to CuPy arrays. Beside the arguments listed below, the arguments
have the same semantic as in `zarr.open_array`.
Parameters
----------
store
Path to directory in file system. As opposed to `zarr.open_array`,
Store and path to zip files isn't supported.
mode
Persistence mode: 'r' means read only (must exist); 'r+' means
read/write (must exist); 'a' means read/write (create if doesn't
exist); 'w' means create (overwrite if exists); 'w-' means create
(fail if exists).
compressor : Codec, optional
The compressor use when create a Zarr file or None if no compressor
is to be used. This is ignored in "r" and "r+" mode. By default the
LZ4 compressor by nvCOMP is used.
meta_array : array-like, optional
An CuPy-like array instance to use for determining arrays to create and
return to users. It must implement `__cuda_array_interface__`.
**kwargs
The rest of the arguments are forwarded to `zarr.open_array` as-is.
Returns
-------
Zarr array backed by a GDS file store, nvCOMP compression, and CuPy arrays.
"""

if not isinstance(store, (str, os.PathLike)):
raise ValueError("store must be a path")
store = str(os.fspath(store))
if not hasattr(meta_array, "__cuda_array_interface__"):
raise ValueError("meta_array must implement __cuda_array_interface__")

if mode in ("r", "r+"):
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(path=store),
mode=mode,
meta_array=meta_array,
**kwargs,
)
if ret.compressor == lz4_cpu_compressor:
ret = zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=ret.compressor.get_config(),
decompressor_config_overwrite=lz4_gpu_compressor.get_config(),
),
mode=mode,
meta_array=meta_array,
**kwargs,
)
return ret

if compressor == lz4_gpu_compressor:
compressor_config_overwrite = lz4_cpu_compressor.get_config()
decompressor_config_overwrite = compressor.get_config()
else:
compressor_config_overwrite = None
decompressor_config_overwrite = None

return zarr.open_array(
store=kvikio.zarr.GDSStore(
path=store,
compressor_config_overwrite=compressor_config_overwrite,
decompressor_config_overwrite=decompressor_config_overwrite,
),
mode=mode,
meta_array=meta_array,
compressor=compressor,
**kwargs,
)


class NVCompCompressor(Codec):
"""Abstract base class for nvCOMP compressors
Expand Down
90 changes: 89 additions & 1 deletion python/tests/test_zarr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

import math

import numpy
import pytest

cupy = pytest.importorskip("cupy")
zarr = pytest.importorskip("zarr")
kvikio_zarr = pytest.importorskip("kvikio.zarr")

kvikio_nvcomp_codec = pytest.importorskip("kvikio.nvcomp_codec")
numcodecs = pytest.importorskip("numcodecs")

if not kvikio_zarr.supported:
pytest.skip(
Expand Down Expand Up @@ -156,3 +158,89 @@ def test_compressor(store, xp_write, xp_read, compressor):
b = z[:]
assert isinstance(b, xp_read.ndarray)
cupy.testing.assert_array_equal(b, a)


@pytest.mark.parametrize("algo", ["lz4", "zstd"])
def test_decompressor_config_overwrite(tmp_path, xp, algo):
cpu_codec = numcodecs.registry.get_codec({"id": algo})
gpu_codec = kvikio_nvcomp_codec.NvCompBatchCodec(algo)

# Write using Zarr's default file store and the `cpu_codec` compressor
z = zarr.open_array(tmp_path, mode="w", shape=(10,), compressor=cpu_codec)
z[:] = range(10)
assert z.compressor == cpu_codec

# Open file using GDSStore and use `gpu_codec` as decompressor.
z = zarr.open_array(
kvikio_zarr.GDSStore(
tmp_path,
decompressor_config_overwrite=gpu_codec.get_config(),
),
mode="r",
meta_array=xp.empty(()),
)
assert z.compressor == gpu_codec
assert isinstance(z[:], xp.ndarray)
xp.testing.assert_array_equal(z[:], range(10))


@pytest.mark.parametrize("algo", ["lz4"])
def test_compressor_config_overwrite(tmp_path, xp, algo):
cpu_codec = numcodecs.registry.get_codec({"id": algo})
gpu_codec = kvikio_nvcomp_codec.NvCompBatchCodec(algo)

# Write file using GDSStore and the `gpu_codec` compressor. In order
# to make the file compatible with Zarr's builtin CPU decompressor,
# we set `cpu_codec` as the compressor in the meta file on disk.
z = zarr.open_array(
kvikio_zarr.GDSStore(
tmp_path,
compressor_config_overwrite=cpu_codec.get_config(),
decompressor_config_overwrite=gpu_codec.get_config(),
),
mode="w",
shape=10,
compressor=gpu_codec,
meta_array=xp.empty(()),
)
assert z.compressor == gpu_codec
z[:] = xp.arange(10)

# We can now open the file using Zarr's builtin CPU decompressor
z = zarr.open_array(tmp_path, mode="r")
assert isinstance(z[:], numpy.ndarray)
numpy.testing.assert_array_equal(z[:], range(10))


def test_open_cupy_array(tmp_path):
a = cupy.arange(10)
z = kvikio_zarr.open_cupy_array(
tmp_path,
mode="w",
shape=a.shape,
dtype=a.dtype,
chunks=(2,),
)
z[:] = a
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(z[:], type(a))
assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4")
cupy.testing.assert_array_equal(a, z[:])

z = kvikio_zarr.open_cupy_array(
tmp_path,
mode="r",
)
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(z[:], type(a))
assert z.compressor == kvikio_nvcomp_codec.NvCompBatchCodec("lz4")
cupy.testing.assert_array_equal(a, z[:])

z = zarr.open_array(tmp_path, mode="r")
assert a.shape == z.shape
assert a.dtype == z.dtype
assert isinstance(z[:], numpy.ndarray)
assert z.compressor == kvikio_zarr.lz4_cpu_compressor
numpy.testing.assert_array_equal(a.get(), z[:])

0 comments on commit 5a24541

Please sign in to comment.