diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 8b5747ca..5bb23fa6 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -28,7 +28,7 @@ jobs: fetch-depth: 0 # checkout tags (which is not done by default) - name: 🔁 Setup Python id: setup-python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip @@ -58,7 +58,7 @@ jobs: if: | github.event_name == 'push' || github.event_name == 'pull_request' - uses: codecov/codecov-action@v3.1.4 + uses: codecov/codecov-action@v4.0.2 with: file: ./coverage.xml env_vars: OS,PYTHON diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 8d054bbc..40ff0022 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -11,7 +11,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Set up Python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: '3.x' - name: Install dependencies diff --git a/.github/workflows/test-integration.yaml b/.github/workflows/test-integration.yaml index ef10be91..791687f7 100644 --- a/.github/workflows/test-integration.yaml +++ b/.github/workflows/test-integration.yaml @@ -31,19 +31,32 @@ jobs: # integration testing for 3.10 and 3.11 (for runner versions that follow that PR). python-version: ["3.9"] # , "3.10", "3.11"] runner-version: [ - "pangeo-forge-runner==0.8.0", - "pangeo-forge-runner==0.9.0", + "pangeo-forge-runner==0.9.1", + "pangeo-forge-runner==0.9.2", + "pangeo-forge-runner==0.9.3", ] steps: - uses: actions/checkout@v4 - name: 🔁 Setup Python id: setup-python - uses: actions/setup-python@v4 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} cache: pip cache-dependency-path: pyproject.toml + + - name: Install pangeo-forge recipes and runner + shell: bash -l {0} + run: | + python -m pip install ${{ matrix.runner-version }} + python -m pip install -e ".[test,minio]" + + - name: Install optional grib deps + shell: bash -l {0} + run: | + python -m pip install ecmwflibs eccodes cfgrib + - name: 'Setup minio' run: | wget --quiet https://dl.min.io/server/minio/release/linux-amd64/minio @@ -54,11 +67,6 @@ jobs: - name: 🎯 Check cache hit run: echo '${{ steps.setup-python.outputs.cache-hit }}' - - name: 🌈 Install pangeo-forge-recipes & pangeo-forge-runner - shell: bash -l {0} - run: | - python -m pip install ${{ matrix.runner-version }} - python -m pip install -e ".[test,minio]" # order reversed to fix https://github.com/pangeo-forge/pangeo-forge-recipes/pull/595#issuecomment-1811630921 # this should however be fixed in the runner itself diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index cec164d5..aace4669 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,7 +1,7 @@ repos: - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v4.5.0 hooks: - id: trailing-whitespace - id: end-of-file-fixer @@ -13,13 +13,13 @@ repos: exclude: "docs/" - repo: https://github.com/psf/black - rev: 22.12.0 + rev: 24.2.0 hooks: - id: black args: ["--line-length", "100"] - repo: https://github.com/PyCQA/flake8 - rev: 6.0.0 + rev: 7.0.0 hooks: - id: flake8 exclude: pangeo_forge_recipes/recipes @@ -30,18 +30,18 @@ repos: - id: seed-isort-config - repo: https://github.com/pre-commit/mirrors-mypy - rev: 'v0.991' + rev: 'v1.8.0' hooks: - id: mypy exclude: tests,pangeo_forge_recipes/recipes - repo: https://github.com/pycqa/isort - rev: 5.12.0 + rev: 5.13.2 
hooks: - id: isort args: ["--profile", "black"] - repo: https://github.com/rstcheck/rstcheck - rev: v6.1.1 + rev: v6.2.0 hooks: - id: rstcheck diff --git a/docs/composition/file_patterns.md b/docs/composition/file_patterns.md index 55b31aed..c38f5ddf 100644 --- a/docs/composition/file_patterns.md +++ b/docs/composition/file_patterns.md @@ -207,8 +207,8 @@ pattern[index] ## From file pattern to `PCollection` As covered in {doc}`index`, a recipe is composed of a sequence of Apache Beam transforms. -The data collection that Apache Beam transforms operates on is a -[`PCollection`](https://beam.apache.org/documentation/programming-guide/#pcollections). +The data Apache Beam transforms operate on are +[`PCollections`](https://beam.apache.org/documentation/programming-guide/#pcollections). Therefore, we bring the contents of a `FilePattern` into a recipe, we pass the index:url pairs generated by the file pattern's ``items()`` method into Beam's `Create` constructor as follows: diff --git a/docs/composition/index.md b/docs/composition/index.md index a766a41b..6a250d12 100644 --- a/docs/composition/index.md +++ b/docs/composition/index.md @@ -4,7 +4,7 @@ A recipe describes the steps to transform archival source data in one format / location into analysis-ready, cloud-optimized (ARCO) data in another format / -location. Technically, a recipe is as a set of composite +location. Technically, a recipe is a composite of [Apache Beam transforms](https://beam.apache.org/documentation/programming-guide/#transforms) applied to the data collection associated with a {doc}`file pattern `. To write a recipe: diff --git a/docs/composition/styles.md b/docs/composition/styles.md index a4e54675..4379e27a 100644 --- a/docs/composition/styles.md +++ b/docs/composition/styles.md @@ -24,6 +24,14 @@ the recipe pipeline will contain at a minimum the following transforms applied t * {class}`pangeo_forge_recipes.transforms.OpenURLWithFSSpec`: retrieves each pattern file using the specified URLs. * {class}`pangeo_forge_recipes.transforms.OpenWithXarray`: load each pattern file into an [`xarray.Dataset`](https://docs.xarray.dev/en/stable/generated/xarray.Dataset.html). * {class}`pangeo_forge_recipes.transforms.StoreToZarr`: generate a Zarr store by combining the datasets. +* {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates`: consolidate the Dimension Coordinates for dataset read performance. +* {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata`: calls Zarr's convinience function to consolidate metadata. + +```{tip} +If using the {class}`pangeo_forge_recipes.transforms.ConsolidateDimensionCoordinates` transform, make sure to chain on the {class}`pangeo_forge_recipes.transforms.ConsolidateMetadata` transform to your recipe. + +``` + ## Open with Kerchunk, write to virtual Zarr diff --git a/docs/contributing.md b/docs/contributing.md index e0ce9665..f6b21eee 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -163,10 +163,8 @@ pytest tests -v ## Releasing -To make a new release, first add [](./release_notes.md) for the release to the docs. +Navigate to and click "Draft a new release". -Then just go to -and click "Draft a new release". +![How to release gif](https://github.com/pangeo-forge/pangeo-forge-recipes/assets/15016780/c6132967-4f6d-49d9-96eb-48a687130f97) -The [release.yaml](https://github.com/pangeo-forge/pangeo-forge-recipes/blob/main/.github/workflows/release.yaml) -workflow should take care of the rest. 
+The [release.yaml](https://github.com/pangeo-forge/pangeo-forge-recipes/blob/main/.github/workflows/release.yaml) will be trigged and will publish the new version of `pangeo-forge-recipes` to pypi. diff --git a/docs/release_notes.md b/docs/release_notes.md deleted file mode 100644 index 83066d47..00000000 --- a/docs/release_notes.md +++ /dev/null @@ -1,150 +0,0 @@ -# Release Notes - -## v0.10.4 - 2023-11-15 - -- Add `dynamic_chunking_fn`/`dynamic_chunking_fn_kwargs` keywords to StoreToZarr. This allows users to pass a function that will be called at runtime to determine the target chunks for the resulting datasets based on the in memory representation/size of the recipe dataset. {pull}`595` - -## v0.10.3 - 2023-10-03 - -- Assign injection spec values for command line interface {pull}`566` -- Docs rewrite {pull}`610` -- Simplify reference recipe composition by nesting transforms {pull}`635` -- Reference transforms dependency and bugfixes {pull}`631` {pull}`632` - -## v0.10.2 - 2023-09-19 - -- Fix bug preventing use of multiple merge dims {pull}`591` -- Add parquet storage target for reference recipes {pull}`620` -- Support addition of dataset-level attrs for zarr recipes {pull}`622` -- Integration testing upgrades {pull}`590` {pull}`605` {pull}`607` {pull}`611` -- Add missing `py.typed` marker for mypy compatibility {pull}`613` - -## v0.10.1 - 2023-08-31 - -- Add sentinel as default for transform keyword arguments that are required at runtime and which -recipe developers may not want to set in recipe modules. This allows recipe modules to be importable -(i.e., unit-testable) and type-checkable during development. {pull}`588` -- `StoreToZarr` now emits a `zarr.storage.FSStore` which can be consumed by downstream transforms. -This is useful for opening and testing the completed zarr store, adding it to a catalog, etc. {pull}`574` -- Concurrency limiting transform added. This base transform can be used to limit concurrency for -calls to external services. It is now used internally to allow `OpenURLWithFSSpec` to be limited -to a specified maximum concurrency. {pull}`557` -- Various packaging, testing, and maintenance upgrades {pull}`565` {pull}`567` {pull}`576` -- Patched deserialization bug that affected rechunking on GCP Dataflow {pull}`548` - -## v0.10.0 - 2023-06-30 - -- **Major breaking change:** This release represents a nearly complete rewrite of -the package, removing the custom recipe constructor classes and executors, and -replacing them with a set of modular, domain-specific Apache Beam `PTransform`s, -which can be flexibly composed and executed on any Apache Beam runner. The documention has -been updated to reflect this change. As the first release following this major -rewrite, we expect bugs and documentation gaps to exist, and we look forward to -community support in finding and triaging those issues. A blog post and further -documentaion of the motivations for and opportunities created by this major -change is forthcoming. - -## v0.9 - 2022-05-11 - -- **Breaking changes:** Deprecated `XarrayZarrRecipe` manual stage methods. -Also deprecated `FilePattern(..., is_opendap=True)` kwarg, which is superseded by -`FilePattern(..., file_type="opendap")`. {pull}`362` -- Added `serialization` module along with `BaseRecipe.sha256` and `FilePattern.sha256` methods. -Collectively, this provides for generation of deterministic hashes for both recipe and file -pattern instances. 
Checking these hashes against those from a prior version of the recipe can be -used to determine whether or not a particular recipe instance in a Python module (which may -contain arbitrary numbers of recipe instances) has changed since the last time the instances in -that module were executed. The file pattern hashes are based on a merkle tree built cumulatively -from all of the index:filepath pairs yielded by the pattern's `self.items()` method. As such, in -cases where a new pattern is intended to append to an existing dataset which was built from a -prior version of that pattern, the pattern hash can be used to determine the index from which to -begin appending. This is demonstrated in the tests. {pull}`349` -- Created new Prefect executor which wraps the Dask executor in a single Task. -This should mitigate problems related to large numbers of Prefect Tasks ({issue}`347`). -- Implemented feature to cap cached filename lengths at 255 bytes on local filesystems, to -accomodate the POSIX filename length limit. Cached filename lengths are not truncated on any other -filesystem. {pull}`353` - -## v0.8.3 - 2022-04-19 - -- Added `.file_type` attribute to {class}`pangeo_forge_recipes.patterns.FilePattern`. This attribute will eventually supercede -`.is_opendap`, which will be deprecated in `0.9.0`. Until then, `FilePattern(..., is_opendap=True)` is supported as equivalent -to `FilePattern(..., file_type="opendap")`. {pull}`322` - -## v0.8.2 - 2022-02-23 - -- Removed click from dependencies and removed cli entrypoint. - -## v0.8.1 - 2022-02-23 - -- Fixed dependency issue with pip installation. -- Fixed bug where recipes would fail if the target chunks exceeded the full - array length. {issue}`279` - -## v0.8.0 - 2022-02-17 - -- **Breaking change:** Replace recipe classes' storage attibutes with `.storage_config` of type {class}`pangeo_forge_recipes.storage.StorageConfig`. {pull}`288` -- Add `setup_logging` convenience function. {pull}`287` - -## v0.7.0 - 2022-02-14 ❤️ - -- Apache Beam executor added. {issue}`169`. By [Alex Merose](https://github.com/alxmrs). -- Dask executor updates. {pull}`260` {pull}`261` -- Index type update. {pull}`257` -- Fix incompatibility with `fsspec>=2021.11.1`. {pull}`247` - -## v0.6.1 - 2021-10-25 - -- Major internal refactor of executors. {pull}`219`. - Began deprecation cycle for recipe methods (e.g. `recipe.prepare_target()`) in - favor of module functions. -- Addition of `open_input_with_fsspec_reference` option on {class}`pangeo_forge_recipes.recipes.XarrayZarrRecipe`, - permitting the bypassing of h5py when opening inputs. {pull}`218` - -## v0.6.0 - 2021-09-02 - -- Added {class}`pangeo_forge_recipes.recipes.HDFReferenceRecipe` class to create virtual Zarrs from collections of - NetCDF / HDF5 files. {pull}`174` -- Limit output from logging. {pull}`175` -- Change documentation structure. {pull}`178` -- Move `fsspec_open_kwargs` and `is_opendap` parameters - out of {class}`pangeo_forge_recipes.recipes.XarrayZarrRecipe` and into - {class}`pangeo_forge_recipes.patterns.FilePattern`. Add `query_string_secrets` - as attribute of {class}`pangeo_forge_recipes.patterns.FilePattern`. {pull}`167` - -## v0.5.0 - 2021-07-11 - -- Added `subset_inputs` option to {class}`pangeo_forge_recipes.recipes.XarrayZarrRecipe`. {issue}`93`, {pull}`166` -- Fixed file opening to eliminate HDF errors related to closed files. {issue}`170`, {pull}`171` -- Changed default behavior of executors so that the `cache_input` loop is always - run, regardless of the value of `cache_inputs`. 
{pull}`168` - -## v0.4.0 - 2021-06-25 - -- Fixed issue with recipe serialilzation causing high memory usage of Dask schedulers and workers when - executing recipes with Prefect or Dask {pull}`160`. -- Added new methods `.to_dask()`, `to_prefect()`, and `.to_function()` for converting a recipe - to one of the Dask, Prefect, or Python execution plans. The previous method, `recpie.to_pipelines()` - is now deprecated. - -## v0.3.4 - 2021-05-25 - -- Added `copy_pruned` method to {class}`pangeo_forge_recipes.recipes.XarrayZarrRecipe` to facilitate testing. -- Internal refactor of storage module. - -## v0.3.3 - 2021-05-10 - -Many feature enhancements. -Non-backwards compatible changes to core API. -Package renamed from `pangeo_forge` to `pangeo_forge_recipes`. - -There were problems with packaging for the 0.3.0-0.3.2 releases. - -## v0.2.0 - 2021-04-26 - -First release since major Spring 2021 overhaul. -This release depends on Xarray v0.17.1, which has not yet been released as of the date of this release. - -## v0.1.0 - 2020-10-22 - -First release. diff --git a/docs/requirements.txt b/docs/requirements.txt index a86360f1..735c4a70 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,9 +1,9 @@ -sphinx==6.2.1 +sphinx==7.2.6 pangeo-sphinx-book-theme==0.2 myst-nb==1.0.0 sphinx-copybutton==0.5.2 sphinx-togglebutton==0.3.2 -sphinx-autodoc-typehints==1.23.0 -sphinxext-opengraph==0.9.0 +sphinx-autodoc-typehints==2.0.0 +sphinxext-opengraph==0.9.1 sphinx-design==0.5.0 -e . diff --git a/examples/feedstock/gpcp_from_gcs.py b/examples/feedstock/gpcp_from_gcs.py index 9b13d59d..4a9d59ae 100644 --- a/examples/feedstock/gpcp_from_gcs.py +++ b/examples/feedstock/gpcp_from_gcs.py @@ -3,7 +3,13 @@ import zarr from pangeo_forge_recipes.patterns import ConcatDim, FilePattern -from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr +from pangeo_forge_recipes.transforms import ( + ConsolidateDimensionCoordinates, + ConsolidateMetadata, + OpenURLWithFSSpec, + OpenWithXarray, + StoreToZarr, +) dates = [ d.to_pydatetime().strftime("%Y%m%d") @@ -43,5 +49,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: store_name="gpcp.zarr", combine_dims=pattern.combine_dim_keys, ) + | ConsolidateDimensionCoordinates() + | ConsolidateMetadata() | "Test dataset" >> beam.Map(test_ds) ) diff --git a/examples/feedstock/hrrr_kerchunk_concat_step.py b/examples/feedstock/hrrr_kerchunk_concat_step.py index 22ae8a9e..53ab0f45 100644 --- a/examples/feedstock/hrrr_kerchunk_concat_step.py +++ b/examples/feedstock/hrrr_kerchunk_concat_step.py @@ -30,8 +30,9 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: ds = xr.open_dataset(store, engine="zarr", chunks={}) ds = ds.set_coords(("latitude", "longitude")) - assert ds.attrs["centre"] == "kwbc" - assert len(ds["step"]) == 4 + ds = ds.expand_dims(dim="time") + assert ds.attrs["GRIB_centre"] == "kwbc" + assert len(ds["step"]) == 2 assert len(ds["time"]) == 1 assert "t" in ds.data_vars for coord in ["time", "surface", "latitude", "longitude"]: @@ -51,7 +52,6 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: store_name="hrrr-concat-step", concat_dims=pattern.concat_dims, identical_dims=identical_dims, - precombine_inputs=True, ) | "Test dataset" >> beam.Map(test_ds) ) diff --git a/examples/feedstock/hrrr_kerchunk_concat_valid_time.py b/examples/feedstock/hrrr_kerchunk_concat_valid_time.py index bd4b0e6b..ad9176e9 100644 --- a/examples/feedstock/hrrr_kerchunk_concat_valid_time.py +++ 
b/examples/feedstock/hrrr_kerchunk_concat_valid_time.py @@ -2,6 +2,7 @@ https://projectpythia.org/kerchunk-cookbook/notebooks/case_studies/HRRR.html """ + from typing import Any import apache_beam as beam @@ -64,8 +65,9 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: store_name="hrrr-concat-valid-time", concat_dims=concat_dims, identical_dims=identical_dims, + # fails due to: _pickle.PicklingError: Can't pickle : attribute lookup drop_unknown on __main__ failed mzz_kwargs=dict(preprocess=drop_unknown), - precombine_inputs=True, ) | "Test dataset" >> beam.Map(test_ds) ) diff --git a/examples/feedstock/meta.yaml b/examples/feedstock/meta.yaml index 29fc5db0..e93e8a9c 100644 --- a/examples/feedstock/meta.yaml +++ b/examples/feedstock/meta.yaml @@ -7,3 +7,5 @@ recipes: object: "noaa_oisst:recipe" - id: "terraclimate" object: "terraclimate:recipe" + - id: "hrrr-kerchunk-concat-step" + object: "hrrr_kerchunk_concat_step:recipe" diff --git a/examples/feedstock/narr_opendap.py b/examples/feedstock/narr_opendap.py index 7d0a499c..773965c8 100644 --- a/examples/feedstock/narr_opendap.py +++ b/examples/feedstock/narr_opendap.py @@ -18,6 +18,7 @@ The data we will use are catalogged here (3D data on pressure levels): https://psl.noaa.gov/thredds/catalog/Datasets/NARR/pressure/catalog.html """ + import apache_beam as beam import xarray as xr import zarr diff --git a/examples/feedstock/noaa_oisst.py b/examples/feedstock/noaa_oisst.py index 03e2d986..52775d98 100644 --- a/examples/feedstock/noaa_oisst.py +++ b/examples/feedstock/noaa_oisst.py @@ -3,7 +3,13 @@ import zarr from pangeo_forge_recipes.patterns import ConcatDim, FilePattern -from pangeo_forge_recipes.transforms import OpenURLWithFSSpec, OpenWithXarray, StoreToZarr +from pangeo_forge_recipes.transforms import ( + ConsolidateDimensionCoordinates, + ConsolidateMetadata, + OpenURLWithFSSpec, + OpenWithXarray, + StoreToZarr, +) dates = pd.date_range("1981-09-01", "2022-02-01", freq="D") @@ -26,7 +32,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: # TODO: see if --setup-file option for runner fixes this import xarray as xr - ds = xr.open_dataset(store, engine="zarr", chunks={}) + ds = xr.open_dataset(store, engine="zarr", consolidated=True, chunks={}) for var in ["anom", "err", "ice", "sst"]: assert var in ds.data_vars return store @@ -40,5 +46,7 @@ def test_ds(store: zarr.storage.FSStore) -> zarr.storage.FSStore: store_name="noaa-oisst.zarr", combine_dims=pattern.combine_dim_keys, ) + | ConsolidateDimensionCoordinates() + | ConsolidateMetadata() | beam.Map(test_ds) ) diff --git a/examples/feedstock/terraclimate.py b/examples/feedstock/terraclimate.py index 197ef1a6..e70d87ea 100644 --- a/examples/feedstock/terraclimate.py +++ b/examples/feedstock/terraclimate.py @@ -10,6 +10,7 @@ - **Multiple variables in different files**: One file per year for a dozen different variables. - **Complex preprocessing**: We want to apply different preprocessing depending on the variable. 
""" + import apache_beam as beam import xarray as xr import zarr diff --git a/pangeo_forge_recipes/aggregation.py b/pangeo_forge_recipes/aggregation.py index 55086f7c..08e04806 100644 --- a/pangeo_forge_recipes/aggregation.py +++ b/pangeo_forge_recipes/aggregation.py @@ -278,9 +278,17 @@ def schema_to_zarr( target_store: zarr.storage.FSStore, target_chunks: Optional[Dict[str, int]] = None, attrs: Optional[Dict[str, str]] = None, + consolidated_metadata: Optional[bool] = True, + encoding: Optional[Dict] = None, ) -> zarr.storage.FSStore: """Initialize a zarr group based on a schema.""" ds = schema_to_template_ds(schema, specified_chunks=target_chunks, attrs=attrs) # using mode="w" makes this function idempotent - ds.to_zarr(target_store, mode="w", compute=False) + ds.to_zarr( + target_store, + mode="w", + compute=False, + consolidated=consolidated_metadata, + encoding=encoding, + ) return target_store diff --git a/pangeo_forge_recipes/chunk_grid.py b/pangeo_forge_recipes/chunk_grid.py index 927ac5f0..32eb1c6b 100644 --- a/pangeo_forge_recipes/chunk_grid.py +++ b/pangeo_forge_recipes/chunk_grid.py @@ -1,6 +1,7 @@ """ Abstract representation of ND chunked arrays """ + from __future__ import annotations import warnings diff --git a/pangeo_forge_recipes/combiners.py b/pangeo_forge_recipes/combiners.py index b100c839..c7f28ad7 100644 --- a/pangeo_forge_recipes/combiners.py +++ b/pangeo_forge_recipes/combiners.py @@ -1,13 +1,13 @@ import operator -from dataclasses import dataclass, field +import sys +from dataclasses import dataclass from functools import reduce -from typing import List, Sequence, Tuple +from typing import Callable, Sequence, TypeVar import apache_beam as beam -from kerchunk.combine import MultiZarrToZarr from .aggregation import XarrayCombineAccumulator, XarraySchema -from .types import CombineOp, Dimension, Index +from .types import CombineOp, Dimension, Index, Indexed @dataclass @@ -27,7 +27,7 @@ def create_accumulator(self) -> XarrayCombineAccumulator: concat_dim = self.dimension.name if self.dimension.operation == CombineOp.CONCAT else None return XarrayCombineAccumulator(concat_dim=concat_dim) - def add_input(self, accumulator: XarrayCombineAccumulator, item: Tuple[Index, XarraySchema]): + def add_input(self, accumulator: XarrayCombineAccumulator, item: Indexed[XarraySchema]): index, schema = item position = self.get_position(index) accumulator.add_input(schema, position) @@ -45,51 +45,50 @@ def extract_output(self, accumulator) -> dict: return accumulator.schema -@dataclass -class CombineMultiZarrToZarr(beam.CombineFn): - """A beam ``CombineFn`` for combining Kerchunk ``MultiZarrToZarr`` objects. - - :param concat_dims: Dimensions along which to concatenate inputs. - :param identical_dims: Dimensions shared among all inputs. - :mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. - :precombine_inputs: If ``True``, precombine each input with itself, using - ``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator. - Used for multi-message GRIB2 inputs, which produce > 1 reference when opened - with kerchunk's ``scan_grib`` function, and therefore need to be consolidated - into a single reference before adding to the accumulator. Also used for inputs - consisting of single reference, for cases where the output dataset concatenates - along a dimension that does not exist in the individual inputs. 
In this latter - case, precombining adds the additional dimension to the input so that its - dimensionality will match that of the accumulator. - """ +Element = TypeVar("Element") +Accumulator = TypeVar("Accumulator") - concat_dims: List[str] - identical_dims: List[str] - mzz_kwargs: dict = field(default_factory=dict) - precombine_inputs: bool = False - - def to_mzz(self, references): - return MultiZarrToZarr( - references, - concat_dims=self.concat_dims, - identical_dims=self.identical_dims, - **self.mzz_kwargs, - ) - - def create_accumulator(self): - return None - - def add_input(self, accumulator: MultiZarrToZarr, item: list[dict]) -> MultiZarrToZarr: - item = item if not self.precombine_inputs else [self.to_mzz(item).translate()] - if not accumulator: - references = item - else: - references = [accumulator.translate()] + item - return self.to_mzz(references) - - def merge_accumulators(self, accumulators: Sequence[MultiZarrToZarr]) -> MultiZarrToZarr: - references = [a.translate() for a in accumulators] - return self.to_mzz(references) - - def extract_output(self, accumulator: MultiZarrToZarr) -> MultiZarrToZarr: - return accumulator + +def build_reduce_fn( + accumulate_op: Callable[[Element, Element], Accumulator], + merge_op: Callable[[Accumulator, Accumulator], Accumulator], + initializer: Accumulator, +) -> beam.CombineFn: + """Factory to construct reducers without so much ceremony""" + + class AnonymousCombineFn(beam.CombineFn): + def create_accumulator(self): + return initializer + + def add_input(self, accumulator, input): + return accumulate_op(accumulator, input) + + def merge_accumulators(self, accumulators): + acc = accumulators[0] + for accumulator in accumulators[1:]: + acc = merge_op(acc, accumulator) + return acc + + def extract_output(self, accumulator): + return accumulator + + return AnonymousCombineFn + + +# Find minimum/maximum/count values. +# The count is done as a slight optimization to avoid multiple passes across the distribution. +# Note: MyPy struggles with type inference here due to the high degree of genericity. + +MinMaxCountCombineFn = build_reduce_fn( + accumulate_op=lambda acc, input: ( + min(acc[0], input), # type: ignore + max(acc[1], input), # type: ignore + acc[2] + 1, # type: ignore + ), + merge_op=lambda accLeft, accRight: ( + min(accLeft[0], accRight[0]), # type: ignore + max(accLeft[1], accRight[1]), # type: ignore + accLeft[2] + accRight[2], # type: ignore + ), + initializer=(sys.maxsize, -sys.maxsize, 0), +) diff --git a/pangeo_forge_recipes/injections.py b/pangeo_forge_recipes/injections.py index 9d2ac148..8352dbfe 100644 --- a/pangeo_forge_recipes/injections.py +++ b/pangeo_forge_recipes/injections.py @@ -3,6 +3,9 @@ def get_injection_specs(): "StoreToZarr": { "target_root": "TARGET_STORAGE", }, + "WriteReference": { + "target_root": "TARGET_STORAGE", + }, "WriteCombinedReference": { "target_root": "TARGET_STORAGE", }, diff --git a/pangeo_forge_recipes/patterns.py b/pangeo_forge_recipes/patterns.py index 13d1e592..dbe7d224 100644 --- a/pangeo_forge_recipes/patterns.py +++ b/pangeo_forge_recipes/patterns.py @@ -1,6 +1,7 @@ """ Filename / URL patterns. """ + # This allows us to type annotate a method on a class as returning # that class, which is otherwise impossible as that class has not # been fully defined yet! 
https://peps.python.org/pep-0563/ diff --git a/pangeo_forge_recipes/rechunking.py b/pangeo_forge_recipes/rechunking.py index 57bbd542..bab9fcc9 100644 --- a/pangeo_forge_recipes/rechunking.py +++ b/pangeo_forge_recipes/rechunking.py @@ -6,6 +6,7 @@ import numpy as np import xarray as xr +import zarr from .aggregation import XarraySchema, determine_target_chunks from .chunk_grid import ChunkGrid @@ -238,3 +239,43 @@ def _sort_by_speed_of_varying(item): ds_combined = xr.combine_nested(dsets_to_concat, concat_dim=concat_dims_sorted) return first_index, ds_combined + + +def _gather_coordinate_dimensions(group: zarr.Group) -> List[str]: + return list( + set(itertools.chain(*(group[var].attrs.get("_ARRAY_DIMENSIONS", []) for var in group))) + ) + + +def consolidate_dimension_coordinates( + singleton_target_store: zarr.storage.FSStore, +) -> zarr.storage.FSStore: + """Consolidate dimension coordinates chunking""" + group = zarr.open_group(singleton_target_store) + + dims = (dim for dim in _gather_coordinate_dimensions(group) if dim in group) + for dim in dims: + arr = group[dim] + attrs = dict(arr.attrs) + data = arr[:] + + # This will generally use bulk-delete API calls + # config.storage_config.target.rm(dim, recursive=True) + + singleton_target_store.fs.rm(singleton_target_store.path + "/" + dim, recursive=True) + + new = group.array( + dim, + data, + chunks=arr.shape, + dtype=arr.dtype, + compressor=arr.compressor, + fill_value=arr.fill_value, + order=arr.order, + filters=arr.filters, + overwrite=True, + ) + + new.attrs.update(attrs) + + return singleton_target_store diff --git a/pangeo_forge_recipes/serialization.py b/pangeo_forge_recipes/serialization.py index edeb56ff..49b0b4df 100644 --- a/pangeo_forge_recipes/serialization.py +++ b/pangeo_forge_recipes/serialization.py @@ -1,5 +1,5 @@ from collections.abc import Collection -from dataclasses import asdict +from dataclasses import asdict, is_dataclass from enum import Enum from hashlib import sha256 from json import dumps @@ -53,7 +53,7 @@ def dict_drop_empty(pairs: Sequence[Sequence]) -> dict: return dict((k, v) for k, v in pairs if not (v is None or not v and isinstance(v, Collection))) -def dataclass_sha256(dclass: type, ignore_keys: List[str]) -> bytes: +def dataclass_sha256(dclass: Any, ignore_keys: List[str]) -> bytes: """Generate a deterministic sha256 hash from a Python ``dataclass``. Fields for which the value is either ``None`` or an empty collection are excluded from the hash calculation automatically. To manually exclude other fields from the calculation, pass their names via ``igonore_keys``. @@ -63,6 +63,8 @@ def dataclass_sha256(dclass: type, ignore_keys: List[str]) -> bytes: :param dclass: The dataclass for which to calculate a hash. :param ignore_keys: A list of field names to exclude from the hash calculation. 
""" + if not is_dataclass(dclass): + raise ValueError("dclass must be an instance of a dataclass") d = asdict(dclass, dict_factory=dict_drop_empty) for k in ignore_keys: diff --git a/pangeo_forge_recipes/storage.py b/pangeo_forge_recipes/storage.py index 42e888f6..7f190b54 100644 --- a/pangeo_forge_recipes/storage.py +++ b/pangeo_forge_recipes/storage.py @@ -9,8 +9,8 @@ import unicodedata from abc import ABC, abstractmethod from contextlib import contextmanager -from dataclasses import dataclass, replace -from typing import Iterator, Optional, Union +from dataclasses import dataclass, field, replace +from typing import Any, Dict, Iterator, Optional, Union from urllib.parse import parse_qs, urlencode, urlparse, urlunparse import fsspec @@ -83,14 +83,18 @@ def _hash_path(path: str) -> str: @dataclass class FSSpecTarget(AbstractTarget): - """Representation of a storage target for Pangeo Forge. + """ + Representation of a storage target for Pangeo Forge. :param fs: The filesystem object we are writing to. :param root_path: The path under which the target data will be stored. + :param fsspec_kwargs: The fsspec kwargs that can be reused as + `target_options` and `remote_options` for fsspec class instantiation """ fs: fsspec.AbstractFileSystem root_path: str = "" + fsspec_kwargs: Dict[Any, Any] = field(default_factory=dict) def __truediv__(self, suffix: str) -> FSSpecTarget: """ @@ -106,6 +110,20 @@ def from_url(cls, url: str): assert len(root_paths) == 1 return cls(fs, root_paths[0]) + def get_fsspec_remote_protocol(self): + """fsspec implementation-specific remote protocal""" + fsspec_protocol = self.fs.protocol + if isinstance(fsspec_protocol, str): + return fsspec_protocol + elif isinstance(fsspec_protocol, tuple): + return fsspec_protocol[0] + elif isinstance(fsspec_protocol, list): + return fsspec_protocol[0] + else: + raise ValueError( + f"could not resolve fsspec protocol '{fsspec_protocol}' from underlying filesystem" + ) + def get_mapper(self) -> fsspec.mapping.FSMap: """Get a mutable mapping object suitable for storing Zarr data.""" return FSStore(self.root_path, fs=self.fs) diff --git a/pangeo_forge_recipes/transforms.py b/pangeo_forge_recipes/transforms.py index 0e121684..adc9881d 100644 --- a/pangeo_forge_recipes/transforms.py +++ b/pangeo_forge_recipes/transforms.py @@ -1,29 +1,38 @@ from __future__ import annotations import logging +import math import random import sys from dataclasses import dataclass, field -from typing import Callable, Dict, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union +from typing import Callable, Dict, List, Optional, Tuple, TypeVar, Union # PEP612 Concatenate & ParamSpec are useful for annotating decorators, but their import # differs between Python versions 3.9 & 3.10. 
See: https://stackoverflow.com/a/71990006 if sys.version_info < (3, 10): - from typing_extensions import Concatenate, ParamSpec + from typing_extensions import ParamSpec else: - from typing import Concatenate, ParamSpec + from typing import ParamSpec import apache_beam as beam +import fsspec import xarray as xr import zarr +from kerchunk.combine import MultiZarrToZarr from .aggregation import XarraySchema, dataset_to_schema, schema_to_template_ds, schema_to_zarr -from .combiners import CombineMultiZarrToZarr, CombineXarraySchemas +from .combiners import CombineXarraySchemas, MinMaxCountCombineFn from .openers import open_url, open_with_kerchunk, open_with_xarray from .patterns import CombineOp, Dimension, FileType, Index, augment_index_with_start_stop -from .rechunking import combine_fragments, split_fragment +from .rechunking import combine_fragments, consolidate_dimension_coordinates, split_fragment from .storage import CacheFSSpecTarget, FSSpecTarget -from .writers import ZarrWriterMixin, store_dataset_fragment, write_combined_reference +from .types import Indexed +from .writers import ( + ZarrWriterMixin, + consolidate_metadata, + store_dataset_fragment, + write_combined_reference, +) logger = logging.getLogger(__name__) @@ -59,12 +68,11 @@ # Ideally each PTransform should be a simple Map or DoFn calling out to function # from other modules - T = TypeVar("T") -Indexed = Tuple[Index, T] +IndexedArg = Indexed[T] R = TypeVar("R") -IndexedReturn = Tuple[Index, R] +IndexedReturn = Indexed[R] P = ParamSpec("P") @@ -82,49 +90,6 @@ class RequiredAtRuntimeDefault: pass -# TODO: replace with beam.MapTuple? -def _add_keys( - func: Callable[Concatenate[T, P], R], -) -> Callable[Concatenate[Indexed, P], IndexedReturn]: - """Convenience decorator to remove and re-add keys to items in a Map""" - annotations = func.__annotations__.copy() - arg_name, annotation = next(iter(annotations.items())) - annotations[arg_name] = Tuple[Index, annotation] - return_annotation = annotations["return"] - annotations["return"] = Tuple[Index, return_annotation] - - # @wraps(func) # doesn't work for some reason - def wrapper(arg, *args: P.args, **kwargs: P.kwargs): - key, item = arg - result = func(item, *args, **kwargs) - return key, result - - wrapper.__annotations__ = annotations - return wrapper - - -def _add_keys_iter( - func: Callable[Concatenate[T, P], R], -) -> Callable[Concatenate[Iterable[Indexed], P], Iterator[IndexedReturn]]: - """Convenience decorator to iteratively remove and re-add keys to items in a FlatMap""" - annotations = func.__annotations__.copy() - arg_name, annotation = next(iter(annotations.items())) - return_annotation = annotations["return"] - - # mypy doesn't view `annotation` and `return_annotation` as valid types, so ignore - annotations[arg_name] = Iterable[Tuple[Index, annotation]] # type: ignore - annotations["return"] = Iterator[Tuple[Index, return_annotation]] # type: ignore - - def iterable_wrapper(arg, *args: P.args, **kwargs: P.kwargs): - for inner_item in arg: - key, item = inner_item - result = func(item, *args, **kwargs) - yield key, result - - iterable_wrapper.__annotations__ = annotations - return iterable_wrapper - - def _assign_concurrency_group(elem, max_concurrency: int): return (random.randint(0, max_concurrency - 1), elem) @@ -151,15 +116,22 @@ class MapWithConcurrencyLimit(beam.PTransform): def expand(self, pcoll): return ( - pcoll | self.fn.__name__ >> beam.Map(_add_keys(self.fn), *self.args, **self.kwargs) + pcoll + | self.fn.__name__ + >> beam.MapTuple(lambda k, v: 
(k, self.fn(v, *self.args, **self.kwargs))) if not self.max_concurrency else ( pcoll - | beam.Map(_assign_concurrency_group, self.max_concurrency) - | beam.GroupByKey() - | beam.Values() + | "Assign concurrency key" + >> beam.Map(_assign_concurrency_group, self.max_concurrency) + | "Group together by concurrency key" >> beam.GroupByKey() + | "Drop concurrency key" >> beam.Values() | f"{self.fn.__name__} (max_concurrency={self.max_concurrency})" - >> beam.FlatMap(_add_keys_iter(self.fn), *self.args, **self.kwargs) + >> beam.FlatMap( + lambda kvlist: [ + (kv[0], self.fn(kv[1], *self.args, **self.kwargs)) for kv in kvlist + ] + ) ) ) @@ -211,36 +183,30 @@ class OpenWithKerchunk(beam.PTransform): :param kerchunk_open_kwargs: Additional kwargs to pass to kerchunk opener. Any kwargs which are specific to a particular input file type should be passed here; e.g., ``{"filter": ...}`` for GRIB; ``{"max_chunk_size": ...}`` for NetCDF3, etc. - :param drop_keys: If True, remove Pangeo Forge's FilePattern keys from the output PCollection - before returning. This is the default behavior, which is used for cases where the output - PCollection of references is passed to the ``CombineReferences`` transform for creation of a - Kerchunk reference dataset as the target dataset of the pipeline. If this transform is used - for other use cases (e.g., opening inputs for creation of another target dataset type), you - may want to set this option to False to preserve the keys on the output PCollection. """ # passed directly to `open_with_kerchunk` file_type: FileType = FileType.unknown inline_threshold: Optional[int] = 300 - storage_options: Optional[Dict] = None + storage_options: Optional[Dict] = field(default_factory=dict) remote_protocol: Optional[str] = None kerchunk_open_kwargs: Optional[dict] = field(default_factory=dict) - # not passed to `open_with_kerchunk` - drop_keys: bool = True - def expand(self, pcoll): - refs = pcoll | "Open with Kerchunk" >> beam.Map( - _add_keys(open_with_kerchunk), - file_type=self.file_type, - inline_threshold=self.inline_threshold, - storage_options=self.storage_options, - remote_protocol=self.remote_protocol, - kerchunk_open_kwargs=self.kerchunk_open_kwargs, + return pcoll | "Open with Kerchunk" >> beam.MapTuple( + lambda k, v: ( + k, + open_with_kerchunk( + v, + file_type=self.file_type, + inline_threshold=self.inline_threshold, + storage_options=self.storage_options, + remote_protocol=self.remote_protocol, + kerchunk_open_kwargs=self.kerchunk_open_kwargs, + ), + ) ) - return refs if not self.drop_keys else refs | beam.Values() - @dataclass class OpenWithXarray(beam.PTransform): @@ -261,12 +227,17 @@ class OpenWithXarray(beam.PTransform): xarray_open_kwargs: Optional[dict] = field(default_factory=dict) def expand(self, pcoll): - return pcoll | "Open with Xarray" >> beam.Map( - _add_keys(open_with_xarray), - file_type=self.file_type, - load=self.load, - copy_to_local=self.copy_to_local, - xarray_open_kwargs=self.xarray_open_kwargs, + return pcoll | "Open with Xarray" >> beam.MapTuple( + lambda k, v: ( + k, + open_with_xarray( + v, + file_type=self.file_type, + load=self.load, + copy_to_local=self.copy_to_local, + xarray_open_kwargs=self.xarray_open_kwargs, + ), + ) ) @@ -294,7 +265,7 @@ def expand(self, pcoll): @dataclass class DatasetToSchema(beam.PTransform): def expand(self, pcoll: beam.PCollection) -> beam.PCollection: - return pcoll | beam.Map(_add_keys(dataset_to_schema)) + return pcoll | beam.MapTuple(lambda k, v: (k, dataset_to_schema(v))) @dataclass @@ -308,7 +279,7 @@ 
class DetermineSchema(beam.PTransform): combine_dims: List[Dimension] def expand(self, pcoll: beam.PCollection) -> beam.PCollection: - schemas = pcoll | beam.Map(_add_keys(dataset_to_schema)) + schemas = pcoll | beam.MapTuple(lambda k, v: (k, dataset_to_schema(v))) cdims = self.combine_dims.copy() while len(cdims) > 0: last_dim = cdims.pop() @@ -349,7 +320,8 @@ def expand(self, pcoll: beam.PCollection): @dataclass class PrepareZarrTarget(beam.PTransform): - """From a singleton PCollection containing a dataset schema, initialize a + """ + From a singleton PCollection containing a dataset schema, initialize a Zarr store with the correct variables, dimensions, attributes and chunking. Note that the dimension coordinates will be initialized with dummy values. @@ -359,11 +331,23 @@ class PrepareZarrTarget(beam.PTransform): If chunking is present in the schema for a given dimension, the length of the first fragment will be used. Otherwise, the dimension will not be chunked. :param attrs: Extra group-level attributes to inject into the dataset. + :param encoding: Dictionary describing encoding for xarray.to_zarr() + :param consolidated_metadata: Bool controlling if xarray.to_zarr() + writes consolidated metadata. Default's to False. In StoreToZarr, + always default to unconsolidated. This leaves it up to the + user whether or not they want to consolidate with + ConsolidateMetadata(). Also, it prevents a broken/inconsistent + state that could arise from metadata being consolidated here, and + then falling out of sync with coordinates if + ConsolidateDimensionCoordinates() is applied to the output of + StoreToZarr(). """ target: str | FSSpecTarget target_chunks: Dict[str, int] = field(default_factory=dict) attrs: Dict[str, str] = field(default_factory=dict) + consolidated_metadata: Optional[bool] = True + encoding: Optional[dict] = field(default_factory=dict) def expand(self, pcoll: beam.PCollection) -> beam.PCollection: if isinstance(self.target, str): @@ -372,14 +356,18 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: target = self.target store = target.get_mapper() initialized_target = pcoll | beam.Map( - schema_to_zarr, target_store=store, target_chunks=self.target_chunks, attrs=self.attrs + schema_to_zarr, + target_store=store, + target_chunks=self.target_chunks, + attrs=self.attrs, + encoding=self.encoding, + consolidated_metadata=False, ) return initialized_target @dataclass class StoreDatasetFragments(beam.PTransform): - target_store: beam.PCollection # side input def expand(self, pcoll: beam.PCollection) -> beam.PCollection: @@ -388,9 +376,13 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: ) -# TODO -# - consolidate coords -# - consolidate metadata +@dataclass +class ConsolidateMetadata(beam.PTransform): + """Calls Zarr Python consolidate_metadata on an existing Zarr store + (https://zarr.readthedocs.io/en/stable/_modules/zarr/convenience.html#consolidate_metadata)""" + + def expand(self, pcoll: beam.PCollection) -> beam.PCollection: + return pcoll | beam.Map(consolidate_metadata) @dataclass @@ -412,37 +404,172 @@ def expand(self, pcoll: beam.PCollection) -> beam.PCollection: return new_fragments +class ConsolidateDimensionCoordinates(beam.PTransform): + def expand( + self, pcoll: beam.PCollection[zarr.storage.FSStore] + ) -> beam.PCollection[zarr.storage.FSStore]: + return pcoll | beam.Map(consolidate_dimension_coordinates) + + @dataclass class CombineReferences(beam.PTransform): """Combines Kerchunk references into a single reference dataset. 
:param concat_dims: Dimensions along which to concatenate inputs. :param identical_dims: Dimensions shared among all inputs. - :mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. - :precombine_inputs: If ``True``, precombine each input with itself, using - ``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator. - Used for multi-message GRIB2 inputs, which produce > 1 reference when opened - with kerchunk's ``scan_grib`` function, and therefore need to be consolidated - into a single reference before adding to the accumulator. Also used for inputs - consisting of single reference, for cases where the output dataset concatenates - along a dimension that does not exist in the individual inputs. In this latter - case, precombining adds the additional dimension to the input so that its - dimensionality will match that of the accumulator. + :param target_options: Storage options for opening target files + :param remote_options: Storage options for opening remote files + :param remote_protocol: If files are accessed over the network, provide the remote protocol + over which they are accessed. e.g.: "s3", "gcp", "https", etc. + :param max_refs_per_merge: Maximum number of references to combine in a single merge operation. + :param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. """ concat_dims: List[str] identical_dims: List[str] + target_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) + remote_options: Optional[Dict] = field(default_factory=lambda: {"anon": True}) + remote_protocol: Optional[str] = None + max_refs_per_merge: int = 5 + mzz_kwargs: dict = field(default_factory=dict) + + def __post_init__(self): + """Store chosen sort dimension to keep things DRY""" + # last element chosen here to follow the xarray `pop` choice above + self.sort_dimension = self.concat_dims[-1] + + def to_mzz(self, references): + """Converts references into a MultiZarrToZarr object with configured parameters.""" + return MultiZarrToZarr( + references, + concat_dims=self.concat_dims, + identical_dims=self.identical_dims, + target_options=self.target_options, + remote_options=self.remote_options, + remote_protocol=self.remote_protocol, + **self.mzz_kwargs, + ) + + def handle_gribs(self, indexed_references: Tuple[Index, list[dict]]) -> Tuple[Index, dict]: + """Handles the special case of GRIB format files by combining multiple references.""" + + references = indexed_references[1] + idx = indexed_references[0] + if len(references) > 1: + ref = self.to_mzz(references).translate() + return (idx, ref) + elif len(references) == 1: + return (idx, references[0]) + else: + raise ValueError("No references produced for {idx}. Expected at least 1.") + + def bucket_by_position( + self, + indexed_references: Tuple[Index, dict], + global_position_min_max_count: Tuple[int, int, int], + ) -> Tuple[int, dict]: + """ + Assigns a bucket based on the index position to order data during GroupByKey. + + :param indexed_references: A tuple containing the index and the reference dictionary. The + index is used to determine the reference's position within the global data order. + :param global_position_min_max_count: A tuple containing the global minimum and maximum + positions and the total count of references. These values are used to determine the + range and distribution of buckets. 
+ :returns: A tuple where the first element is the bucket number (an integer) assigned to the + reference, and the second element is the original reference dictionary. + """ + idx = indexed_references[0] + ref = indexed_references[1] + global_min, global_max, global_count = global_position_min_max_count + + position = idx.find_position(self.sort_dimension) + + # Calculate the total range size based on global minimum and maximum positions + # And asserts the distribution is contiguous/uniform or dump warning + expected_range_size = global_max - global_min + 1 # +1 to include both ends + if expected_range_size != global_count: + logger.warning("The distribution of indexes is not contiguous/uniform") + + # Determine the number of buckets needed, based on the maximum references allowed per merge + num_buckets = math.ceil(global_count / self.max_refs_per_merge) + + # Calculate the total range size based on global minimum and maximum positions. + range_size = global_max - global_min + + # Calculate the size of each bucket by dividing the total range by the number of buckets + bucket_size = range_size / num_buckets + + # Assign the current reference to a bucket based on its position. + # The bucket number is determined by how far the position is from the global minimum, + # divided by the size of each bucket. + bucket = int((position - global_min) / bucket_size) + + return bucket, ref + + def global_combine_refs(self, refs) -> fsspec.FSMap: + """Performs a global combination of references to produce the final dataset.""" + return fsspec.filesystem( + "reference", + fo=self.to_mzz(refs).translate(), + storage_options={ + "remote_protocol": self.remote_protocol, + "skip_instance_cache": True, + }, + ).get_mapper() + + def expand(self, reference_lists: beam.PCollection) -> beam.PCollection: + min_max_count_positions = ( + reference_lists + | "Get just the positions" + >> beam.MapTuple(lambda k, v: k.find_position(self.sort_dimension)) + | "Get minimum/maximum positions" >> beam.CombineGlobally(MinMaxCountCombineFn()) + ) + return ( + reference_lists + | "Handle special case of gribs" >> beam.Map(self.handle_gribs) + | "Bucket to preserve order" + >> beam.Map( + self.bucket_by_position, + global_position_min_max_count=beam.pvalue.AsSingleton(min_max_count_positions), + ) + | "Group by buckets for ordering" >> beam.GroupByKey() + | "Distributed reduce" >> beam.MapTuple(lambda k, refs: self.to_mzz(refs).translate()) + | "Assign global key for collecting to executor" >> beam.Map(lambda ref: (None, ref)) + | "Group globally" >> beam.GroupByKey() + | "Global reduce" >> beam.MapTuple(lambda k, refs: self.global_combine_refs(refs)) + ) + + +@dataclass +class WriteReference(beam.PTransform, ZarrWriterMixin): + """ + Store a singleton PCollection consisting of a ``kerchunk.combine.MultiZarrToZarr`` object. + + :param store_name: Zarr store will be created with this name under ``target_root``. + :param concat_dims: Dimensions along which to concatenate inputs. + :param target_root: Root path the Zarr store will be created inside; ``store_name`` + will be appended to this prefix to create a full path. + :param output_file_name: Name to give the output references file (``.json`` or ``.parquet`` + suffix) over which they are accessed. e.g.: "s3", "gcp", "https", etc. + :param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. 
+ """ + + store_name: str + concat_dims: List[str] + target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field( + default_factory=RequiredAtRuntimeDefault + ) + output_file_name: str = "reference.json" mzz_kwargs: dict = field(default_factory=dict) - precombine_inputs: bool = False def expand(self, references: beam.PCollection) -> beam.PCollection: - return references | beam.CombineGlobally( - CombineMultiZarrToZarr( - concat_dims=self.concat_dims, - identical_dims=self.identical_dims, - mzz_kwargs=self.mzz_kwargs, - precombine_inputs=self.precombine_inputs, - ), + return references | beam.Map( + write_combined_reference, + full_target=self.get_full_target(), + concat_dims=self.concat_dims, + output_file_name=self.output_file_name, + mzz_kwargs=self.mzz_kwargs, ) @@ -454,16 +581,11 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin): :param concat_dims: Dimensions along which to concatenate inputs. :param identical_dims: Dimensions shared among all inputs. :param mzz_kwargs: Additional kwargs to pass to ``kerchunk.combine.MultiZarrToZarr``. - :param precombine_inputs: If ``True``, precombine each input with itself, using - ``kerchunk.combine.MultiZarrToZarr``, before adding it to the accumulator. - Used for multi-message GRIB2 inputs, which produce > 1 reference when opened - with kerchunk's ``scan_grib`` function, and therefore need to be consolidated - into a single reference before adding to the accumulator. Also used for inputs - consisting of single reference, for cases where the output dataset concatenates - along a dimension that does not exist in the individual inputs. In this latter - case, precombining adds the additional dimension to the input so that its - dimensionality will match that of the accumulator. - :param target_root: Root path the Zarr store will be created inside; ``store_name`` + :param remote_options: options to pass to ``kerchunk.combine.MultiZarrToZarr`` + to read reference inputs (can include credentials). + :param remote_protocol: If files are accessed over the network, provide the remote protocol + over which they are accessed. e.g.: "s3", "https", etc. + :param target_root: Output root path the store will be created inside; ``store_name`` will be appended to this prefix to create a full path. :param output_file_name: Name to give the output references file (``.json`` or ``.parquet`` suffix). 
@@ -473,25 +595,30 @@ class WriteCombinedReference(beam.PTransform, ZarrWriterMixin): concat_dims: List[str] identical_dims: List[str] mzz_kwargs: dict = field(default_factory=dict) - precombine_inputs: bool = False + remote_options: Optional[Dict] = field(default_factory=dict) + remote_protocol: Optional[str] = None target_root: Union[str, FSSpecTarget, RequiredAtRuntimeDefault] = field( default_factory=RequiredAtRuntimeDefault ) output_file_name: str = "reference.json" - def expand(self, references: beam.PCollection) -> beam.PCollection: - reference = references | CombineReferences( - concat_dims=self.concat_dims, - identical_dims=self.identical_dims, - mzz_kwargs=self.mzz_kwargs, - precombine_inputs=self.precombine_inputs, - ) - - return reference | beam.Map( - write_combined_reference, - full_target=self.get_full_target(), - concat_dims=self.concat_dims, - output_file_name=self.output_file_name, + def expand(self, references: beam.PCollection) -> beam.PCollection[zarr.storage.FSStore]: + return ( + references + | CombineReferences( + concat_dims=self.concat_dims, + identical_dims=self.identical_dims, + target_options=self.remote_options, + remote_options=self.remote_options, + remote_protocol=self.remote_protocol, + mzz_kwargs=self.mzz_kwargs, + ) + | WriteReference( + store_name=self.store_name, + concat_dims=self.concat_dims, + target_root=self.target_root, + output_file_name=self.output_file_name, + ) ) @@ -506,6 +633,9 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): `store_name` will be appended to this prefix to create a full path. :param target_chunks: Dictionary mapping dimension names to chunks sizes. If a dimension is a not named, the chunks will be inferred from the data. + :param consolidate_dimension_coordinates: Whether to rewrite coordinate variables as a + single chunk. We recommend consolidating coordinate variables to avoid + many small read requests to get the coordinates in xarray. Defaults to ``True``. :param dynamic_chunking_fn: Optionally provide a function that takes an ``xarray.Dataset`` template dataset as its first argument and returns a dynamically generated chunking dict. If provided, ``target_chunks`` cannot also be passed. You can use this to determine chunking @@ -514,6 +644,8 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): out https://github.com/jbusecke/dynamic_chunks :param dynamic_chunking_fn_kwargs: Optional keyword arguments for ``dynamic_chunking_fn``. :param attrs: Extra group-level attributes to inject into the dataset. + + :param encoding: Dictionary encoding for xarray.to_zarr(). 
""" # TODO: make it so we don't have to explicitly specify combine_dims @@ -527,6 +659,7 @@ class StoreToZarr(beam.PTransform, ZarrWriterMixin): dynamic_chunking_fn: Optional[Callable[[xr.Dataset], dict]] = None dynamic_chunking_fn_kwargs: Optional[dict] = field(default_factory=dict) attrs: Dict[str, str] = field(default_factory=dict) + encoding: Optional[dict] = field(default_factory=dict) def __post_init__(self): if self.target_chunks and self.dynamic_chunking_fn: @@ -547,11 +680,13 @@ def expand( | beam.Map(self.dynamic_chunking_fn, **self.dynamic_chunking_fn_kwargs) ) ) + logger.info(f"Storing Zarr with {target_chunks =} to {self.get_full_target()}") rechunked_datasets = indexed_datasets | Rechunk(target_chunks=target_chunks, schema=schema) target_store = schema | PrepareZarrTarget( target=self.get_full_target(), target_chunks=target_chunks, attrs=self.attrs, + encoding=self.encoding, ) n_target_stores = rechunked_datasets | StoreDatasetFragments(target_store=target_store) singleton_target_store = ( @@ -559,7 +694,5 @@ def expand( | beam.combiners.Sample.FixedSizeGlobally(1) | beam.FlatMap(lambda x: x) # https://stackoverflow.com/a/47146582 ) - # TODO: optionally use `singleton_target_store` to - # consolidate metadata and/or coordinate dims here return singleton_target_store diff --git a/pangeo_forge_recipes/types.py b/pangeo_forge_recipes/types.py index 5e514bef..6dd5d975 100644 --- a/pangeo_forge_recipes/types.py +++ b/pangeo_forge_recipes/types.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from enum import Enum -from typing import Dict, Optional +from typing import Dict, Optional, Tuple, TypeVar class CombineOp(Enum): @@ -72,3 +72,15 @@ def find_concat_dim(self, dim_name: str) -> Optional[Dimension]: return None else: return possible_concat_dims[0] + + def find_position(self, dim_name: str) -> int: + dimension = self.find_concat_dim(dim_name) + if dimension: + return self[dimension].value + else: + raise ValueError(f"No dimension found with name {dim_name}") + + +# A convenience type to represent an indexed value +T = TypeVar("T") +Indexed = Tuple[Index, T] diff --git a/pangeo_forge_recipes/writers.py b/pangeo_forge_recipes/writers.py index 457495fe..fecd4d0f 100644 --- a/pangeo_forge_recipes/writers.py +++ b/pangeo_forge_recipes/writers.py @@ -1,10 +1,11 @@ import os -from typing import List, Protocol, Tuple, Union +from typing import Dict, List, MutableMapping, Optional, Protocol, Tuple, Union +import fsspec import numpy as np import xarray as xr import zarr -from fsspec.implementations.reference import LazyReferenceMapper +from fsspec.implementations.reference import LazyReferenceMapper, ReferenceFileSystem from kerchunk.combine import MultiZarrToZarr from .patterns import CombineOp, Index @@ -66,6 +67,28 @@ def _is_first_in_merge_dim(index): return True +def consolidate_metadata(store: MutableMapping) -> MutableMapping: + """Consolidate metadata for a Zarr store + + :param store: Input Store for Zarr + :type store: MutableMapping + :return: Output Store + :rtype: zarr.storage.FSStore + """ + + import zarr + + if isinstance(store, fsspec.FSMap) and isinstance(store.fs, ReferenceFileSystem): + raise ValueError( + """Creating consolidated metadata for Kerchunk references should not + yield a performance benefit so consolidating metadata is not supported.""" + ) + if isinstance(store, zarr.storage.FSStore): + zarr.convenience.consolidate_metadata(store) + + return store + + def store_dataset_fragment( item: Tuple[Index, xr.Dataset], target_store: zarr.storage.FSStore ) -> 
zarr.storage.FSStore: @@ -77,7 +100,6 @@ def store_dataset_fragment( index, ds = item zgroup = zarr.open_group(target_store) - # TODO: check that the dataset and the index are compatible # only store coords if this is the first item in a merge dim @@ -93,62 +115,70 @@ def store_dataset_fragment( return target_store -def _select_single_protocol(full_target: FSSpecTarget) -> str: - # Grabs first protocol if there are multiple options: Based off of logic in fsspec: - # https://github.com/fsspec/filesystem_spec/blob/b8aeb13361e89f22f323bbc93c8308ff2ffede19/fsspec/spec.py#L1410-L1414 - return ( - full_target.fs.protocol[0] - if isinstance(full_target.fs.protocol, (tuple, list)) - else full_target.fs.protocol - ) - - def write_combined_reference( - reference: MultiZarrToZarr, + reference: MutableMapping, full_target: FSSpecTarget, concat_dims: List[str], output_file_name: str, - refs_per_component: int = 1000, -) -> FSSpecTarget: + refs_per_component: int = 10000, + mzz_kwargs: Optional[Dict] = None, +) -> zarr.storage.FSStore: """Write a kerchunk combined references object to file.""" - - import ujson # type: ignore - file_ext = os.path.splitext(output_file_name)[-1] - outpath = full_target._full_path(output_file_name) - if file_ext == ".json": - multi_kerchunk = reference.translate() - with full_target.fs.open(outpath, "wb") as f: - f.write(ujson.dumps(multi_kerchunk).encode()) + import ujson # type: ignore - elif file_ext == ".parquet": + # unpack fsspec options that will be used below for call sites without dep injection + storage_options = full_target.fsspec_kwargs # type: ignore[union-attr] + remote_protocol = full_target.get_fsspec_remote_protocol() # type: ignore[union-attr] + if file_ext == ".parquet": # Creates empty parquet store to be written to if full_target.exists(output_file_name): full_target.rm(output_file_name, recursive=True) full_target.makedir(output_file_name) - remote_protocol = _select_single_protocol(full_target) - - out = LazyReferenceMapper.create(refs_per_component, outpath, full_target.fs) + out = LazyReferenceMapper.create( + root=outpath, fs=full_target.fs, record_size=refs_per_component + ) # Calls MultiZarrToZarr on a MultiZarrToZarr object and adds kwargs to write to parquet. MultiZarrToZarr( - [reference.translate()], + [reference], concat_dims=concat_dims, + target_options=storage_options, + remote_options=storage_options, remote_protocol=remote_protocol, out=out, + **mzz_kwargs, ).translate() # call to write reference to empty parquet store out.flush() + # If reference is a ReferenceFileSystem, write to json + elif isinstance(reference, fsspec.FSMap) and isinstance(reference.fs, ReferenceFileSystem): + # context manager reuses dep injected auth credentials without passing storage options + with full_target.fs.open(outpath, "wb") as f: + f.write(ujson.dumps(reference.fs.references).encode()) + else: raise NotImplementedError(f"{file_ext = } not supported.") - - return full_target + return ReferenceFileSystem( + outpath, + target_options=storage_options, + # NOTE: `target_protocol` is required here b/c + # fsspec classes are inconsistent about deriving + # protocols if they are not passed. 
In this case ReferenceFileSystem + # decides how to read a reference based on `target_protocol` before + # it is automagically derived unfortunately + # https://github.com/fsspec/filesystem_spec/blob/master/fsspec/implementations/reference.py#L650-L663 + target_protocol=remote_protocol, + remote_options=storage_options, + remote_protocol=remote_protocol, + lazy=True, + ).get_mapper() class ZarrWriterProtocol(Protocol): diff --git a/pyproject.toml b/pyproject.toml index bd9895e4..f4108dff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,15 +42,17 @@ dependencies = [ [project.optional-dependencies] test = [ "click", - "pytest", + "pytest<8.0.0", "pytest-cov", "pytest-xdist", "pytest-lazy-fixture", "pytest-sugar", "pytest-timeout", "s3fs", + "gcsfs", "scipy", ] + minio = [ "docker", ] @@ -75,7 +77,7 @@ line-length = 100 [tool.isort] known_first_party = "pangeo_forge_recipes" -known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "kerchunk", "numpy", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "xarray", "zarr"] +known_third_party = ["aiohttp", "apache_beam", "cftime", "click", "dask", "fsspec", "gcsfs", "kerchunk", "numpy", "packaging", "pandas", "pytest", "pytest_lazyfixture", "s3fs", "xarray", "zarr"] multi_line_output = 3 include_trailing_comma = true force_grid_wrap = 0 diff --git a/tests/conftest.py b/tests/conftest.py index bf0238df..2d02739d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,7 @@ Note: Recipe fixtures are defined in their respective test modules, e.g. `test_recipes.py` """ + import os import socket import subprocess @@ -235,6 +236,18 @@ def pipeline(scope="session"): yield p +@pytest.fixture +def pipeline_parallel(scope="session"): + options = PipelineOptions( + runtime_type_check=False, + direct_num_workers=4, + direct_running_mode="multi_processing", + runner="DirectRunner", + ) + with TestPipeline(options=options) as p: + yield p + + @pytest.fixture( params=[True, False], ids=["concurrency_limit", "no_concurrency_limit"], @@ -488,12 +501,6 @@ def tmp_target(tmpdir_factory): return FSSpecTarget(fs, path) -@pytest.fixture() -def tmp_target_url(tmpdir_factory): - path = str(tmpdir_factory.mktemp("target.zarr")) - return path - - @pytest.fixture() def tmp_cache(tmpdir_factory): path = str(tmpdir_factory.mktemp("cache")) diff --git a/tests/test_combiners.py b/tests/test_combiners.py index 350e5c35..6586f858 100644 --- a/tests/test_combiners.py +++ b/tests/test_combiners.py @@ -1,19 +1,23 @@ +import logging + import apache_beam as beam -import fsspec import pytest import xarray as xr from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.testing.util import assert_that -from kerchunk.combine import MultiZarrToZarr -from kerchunk.hdf import SingleHdf5ToZarr from pytest_lazyfixture import lazy_fixture from pangeo_forge_recipes.aggregation import dataset_to_schema -from pangeo_forge_recipes.combiners import CombineMultiZarrToZarr, CombineXarraySchemas +from pangeo_forge_recipes.combiners import CombineXarraySchemas from pangeo_forge_recipes.patterns import FilePattern -from pangeo_forge_recipes.transforms import DatasetToSchema, DetermineSchema, _NestDim -from pangeo_forge_recipes.types import CombineOp, Dimension, Index +from pangeo_forge_recipes.transforms import ( + CombineReferences, + DatasetToSchema, + DetermineSchema, + _NestDim, +) +from pangeo_forge_recipes.types import CombineOp, Dimension, Index, Position 
@pytest.fixture @@ -124,12 +128,12 @@ def test_NestDim(schema_pcoll_concat_merge, pipeline): pattern, _, pcoll = schema_pcoll_concat_merge pattern_merge_only = FilePattern( pattern.format_function, - *[cdim for cdim in pattern.combine_dims if cdim.operation == CombineOp.MERGE] + *[cdim for cdim in pattern.combine_dims if cdim.operation == CombineOp.MERGE], ) merge_only_indexes = list(pattern_merge_only) pattern_concat_only = FilePattern( pattern.format_function, - *[cdim for cdim in pattern.combine_dims if cdim.operation == CombineOp.CONCAT] + *[cdim for cdim in pattern.combine_dims if cdim.operation == CombineOp.CONCAT], ) concat_only_indexes = list(pattern_concat_only) @@ -190,32 +194,85 @@ def test_DetermineSchema_concat_merge(dimensions, dsets_pcoll_concat_merge, pipe assert_that(output, has_correct_schema(expected_schema)) -def is_expected_mzz(expected_mzz): - def _is_expected_mzz(actual): - assert expected_mzz.translate() == actual[0].translate() - - return _is_expected_mzz +def _is_expected_dataset(expected_ds): + def _impl(actual): + actual_ds = xr.open_dataset(actual[0], engine="zarr") + assert expected_ds == actual_ds -def test_CombineReferences(netcdf_public_http_paths_sequential_1d, pipeline): - urls = netcdf_public_http_paths_sequential_1d[0] + return _impl - def generate_refs(urls): - for url in urls: - with fsspec.open(url) as inf: - h5chunks = SingleHdf5ToZarr(inf, url, inline_threshold=100) - yield [h5chunks.translate()] - refs = [ref[0] for ref in generate_refs(urls)] +@pytest.fixture +def combine_references_fixture(): + return CombineReferences( + concat_dims=["time"], + identical_dims=["x", "y"], + ) - concat_dims = ["time"] - identical_dims = ["lat", "lon"] - expected_mzz = MultiZarrToZarr(refs, concat_dims=concat_dims, identical_dims=identical_dims) - with pipeline as p: - input = p | beam.Create(generate_refs(urls)) - output = input | beam.CombineGlobally( - CombineMultiZarrToZarr(concat_dims=concat_dims, identical_dims=identical_dims) +@pytest.mark.parametrize( + "indexed_reference, global_position_min_max_count, expected", + [ + # assume contiguous data but show examples offsets + # across the array and assume default max_refs_per_merge==5 + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(0)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (0, {"url": "s3://blah.hdf5"}), + ), + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(4)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (0, {"url": "s3://blah.hdf5"}), + ), + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(5)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (1, {"url": "s3://blah.hdf5"}), + ), + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(10)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (2, {"url": "s3://blah.hdf5"}), + ), + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(25)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (5, {"url": "s3://blah.hdf5"}), + ), + ( + (Index({Dimension("time", CombineOp.CONCAT): Position(50)}), {"url": "s3://blah.hdf5"}), + (0, 100, 101), + (10, {"url": "s3://blah.hdf5"}), + ), + ( + ( + Index({Dimension("time", CombineOp.CONCAT): Position(100)}), + {"url": "s3://blah.hdf5"}, + ), + (0, 100, 101), + (21, {"url": "s3://blah.hdf5"}), + ), + ( + ( + Index({Dimension("time", CombineOp.CONCAT): Position(80)}), + {"url": "s3://blah.hdf5"}, + ), + (0, 80, 101), + False, + ), + ], +) +def test_bucket_by_position_contiguous_offsets( + combine_references_fixture, indexed_reference, global_position_min_max_count, expected, 
caplog +): + with caplog.at_level(logging.WARNING): + result = combine_references_fixture.bucket_by_position( + indexed_reference, global_position_min_max_count ) - assert_that(output, is_expected_mzz(expected_mzz)) + if not expected: + assert "The distribution of indexes is not contiguous/uniform" in caplog.text + else: + assert result == expected diff --git a/tests/test_end_to_end.py b/tests/test_end_to_end.py index bdcce00a..0503eb46 100644 --- a/tests/test_end_to_end.py +++ b/tests/test_end_to_end.py @@ -14,6 +14,8 @@ from pangeo_forge_recipes.patterns import FilePattern, pattern_from_file_sequence from pangeo_forge_recipes.transforms import ( + ConsolidateDimensionCoordinates, + ConsolidateMetadata, OpenWithKerchunk, OpenWithXarray, StoreToZarr, @@ -36,7 +38,7 @@ def test_xarray_zarr( daily_xarray_dataset, netcdf_local_file_pattern, pipeline, - tmp_target_url, + tmp_target, target_chunks, ): pattern = netcdf_local_file_pattern @@ -46,14 +48,14 @@ def test_xarray_zarr( | beam.Create(pattern.items()) | OpenWithXarray(file_type=pattern.file_type) | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="store", target_chunks=target_chunks, combine_dims=pattern.combine_dim_keys, ) ) - ds = xr.open_dataset(os.path.join(tmp_target_url, "store"), engine="zarr") + ds = xr.open_dataset(os.path.join(tmp_target.root_path, "store"), engine="zarr") assert ds.time.encoding["chunks"] == (target_chunks["time"],) xr.testing.assert_equal(ds.load(), daily_xarray_dataset) @@ -62,7 +64,7 @@ def test_xarray_zarr_subpath( daily_xarray_dataset, netcdf_local_file_pattern_sequential, pipeline, - tmp_target_url, + tmp_target, ): pattern = netcdf_local_file_pattern_sequential with pipeline as p: @@ -71,13 +73,13 @@ def test_xarray_zarr_subpath( | beam.Create(pattern.items()) | OpenWithXarray(file_type=pattern.file_type) | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="subpath", combine_dims=pattern.combine_dim_keys, ) ) - ds = xr.open_dataset(os.path.join(tmp_target_url, "subpath"), engine="zarr") + ds = xr.open_dataset(os.path.join(tmp_target.root_path, "subpath"), engine="zarr") xr.testing.assert_equal(ds.load(), daily_xarray_dataset) @@ -86,7 +88,7 @@ def test_reference_netcdf( daily_xarray_dataset, netcdf_local_file_pattern_sequential, pipeline, - tmp_target_url, + tmp_target, output_file_name, ): pattern = netcdf_local_file_pattern_sequential @@ -98,13 +100,13 @@ def test_reference_netcdf( | OpenWithKerchunk(file_type=pattern.file_type) | WriteCombinedReference( identical_dims=["lat", "lon"], - target_root=tmp_target_url, + target_root=tmp_target, store_name=store_name, concat_dims=["time"], output_file_name=output_file_name, ) ) - full_path = os.path.join(tmp_target_url, store_name, output_file_name) + full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) file_ext = os.path.splitext(output_file_name)[-1] if file_ext == ".json": @@ -120,6 +122,38 @@ def test_reference_netcdf( xr.testing.assert_equal(ds.load(), daily_xarray_dataset) +def test_reference_netcdf_parallel( + daily_xarray_dataset, + netcdf_local_file_pattern_sequential_multivariable, + pipeline_parallel, + tmp_target, + output_file_name="reference.json", +): + pattern = netcdf_local_file_pattern_sequential_multivariable + store_name = "daily-xarray-dataset" + with pipeline_parallel as p: + ( + p + | beam.Create(pattern.items()) + | OpenWithKerchunk(file_type=pattern.file_type) + | WriteCombinedReference( + identical_dims=["lat", "lon"], + target_root=tmp_target, + 
store_name=store_name, + concat_dims=["time"], + output_file_name=output_file_name, + ) + ) + full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) + + file_ext = os.path.splitext(output_file_name)[-1] + + if file_ext == ".json": + mapper = fsspec.get_mapper("reference://", fo=full_path) + ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False}) + xr.testing.assert_equal(ds.load(), daily_xarray_dataset) + + @pytest.mark.xfail( importlib.util.find_spec("cfgrib") is None, reason=( @@ -131,7 +165,7 @@ def test_reference_netcdf( ) def test_reference_grib( pipeline, - tmp_target_url, + tmp_target, ): # This test adapted from: # https://github.com/fsspec/kerchunk/blob/33b00d60d02b0da3f05ccee70d6ebc42d8e09932/kerchunk/tests/test_grib.py#L14-L31 @@ -149,14 +183,14 @@ def test_reference_grib( | WriteCombinedReference( concat_dims=[pattern.concat_dims[0]], identical_dims=["latitude", "longitude"], - target_root=tmp_target_url, + target_root=tmp_target, store_name=store_name, ) ) - full_path = os.path.join(tmp_target_url, store_name, "reference.json") + full_path = os.path.join(tmp_target.root_path, store_name, "reference.json") mapper = fsspec.get_mapper("reference://", fo=full_path) ds = xr.open_dataset(mapper, engine="zarr", backend_kwargs={"consolidated": False}) - assert ds.attrs["centre"] == "cwao" + assert ds.attrs["GRIB_centre"] == "cwao" # ds2 is the original dataset as stored on disk; # keeping `ds2` name for consistency with kerchunk test on which this is based @@ -173,3 +207,29 @@ def test_reference_grib( # various inconsistencies (of dtype casting int to float, etc.). With the right combination of # options passed to the pipeline, seems like these should pass? # xr.testing.assert_equal(ds.load(), ds2) + + +def test_xarray_zarr_consolidate_dimension_coordinates( + netcdf_local_file_pattern_sequential, + pipeline, + tmp_target, +): + pattern = netcdf_local_file_pattern_sequential + with pipeline as p: + ( + p + | beam.Create(pattern.items()) + | OpenWithXarray(file_type=pattern.file_type) + | StoreToZarr( + target_root=tmp_target, + store_name="subpath", + combine_dims=pattern.combine_dim_keys, + ) + | ConsolidateDimensionCoordinates() + | ConsolidateMetadata() + ) + + path = os.path.join(tmp_target.root_path, "subpath") + ds = xr.open_dataset(path, engine="zarr", consolidated=True, chunks={}) + + assert ds.time.encoding["chunks"][0] == ds.time.shape[0] diff --git a/tests/test_integration.py b/tests/test_integration.py index 9206ecba..0c00afa8 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -3,9 +3,11 @@ import secrets import subprocess import time +from importlib.metadata import version from pathlib import Path import pytest +from packaging.version import parse as parse_version # Run only when the `--run-integration` option is passed. # See also `pytest_addoption` in conftest. Reference: @@ -111,9 +113,7 @@ def minio_confpath(minio, tmp_path_factory: pytest.TempPathFactory): @pytest.mark.parametrize("confpath_option", ["local_confpath", "minio_confpath"]) def test_integration(confpath_option: str, recipe_id: str, request): """Run the example recipes in the ``examples/feedstock`` directory.""" - # pytest tests/test_integration.py -k 'test_integration' --run-integration xfails = { - "hrrr-kerchunk-concat-step": "WriteCombineReference doesn't return zarr.storage.FSStore", "hrrr-kerchunk-concat-valid-time": "Can't serialize drop_unknown callback function.", "narr-opendap": "Hangs for unkown reason. 
Requires further debugging.", "terraclimate": "Hangs for unkown reason. Requires further debugging.", @@ -121,6 +121,10 @@ def test_integration(confpath_option: str, recipe_id: str, request): if recipe_id in xfails: pytest.xfail(xfails[recipe_id]) + runner_version = parse_version(version("pangeo-forge-runner")) + if recipe_id == "hrrr-kerchunk-concat-step" and runner_version <= parse_version("0.9.2"): + pytest.xfail("pg-runner version <= 0.9.2 didn't pass storage options") + confpath = request.getfixturevalue(confpath_option) bake_script = (EXAMPLES / "runner-commands" / "bake.sh").absolute().as_posix() diff --git a/tests/test_openers.py b/tests/test_openers.py index a4f0435f..cb6aebaa 100644 --- a/tests/test_openers.py +++ b/tests/test_openers.py @@ -9,6 +9,7 @@ from pangeo_forge_recipes.openers import open_url, open_with_xarray from pangeo_forge_recipes.patterns import FileType from pangeo_forge_recipes.transforms import OpenWithKerchunk +from pangeo_forge_recipes.types import Index @pytest.fixture( @@ -160,9 +161,9 @@ def test_direct_open_with_xarray(public_url_and_type, load, xarray_open_kwargs): def is_valid_inline_threshold(): - def _is_valid_inline_threshold(references): - - assert isinstance(references[0][0]["refs"]["lat/0"], list) + def _is_valid_inline_threshold(indexed_references): + assert isinstance(indexed_references[0][0], Index) + assert isinstance(indexed_references[0][1][0]["refs"]["lat/0"], list) return _is_valid_inline_threshold diff --git a/tests/test_rechunking.py b/tests/test_rechunking.py index 06401c12..d4f644b6 100644 --- a/tests/test_rechunking.py +++ b/tests/test_rechunking.py @@ -1,11 +1,20 @@ import itertools +import os import random from collections import namedtuple +from tempfile import TemporaryDirectory +import numpy as np import pytest import xarray as xr +import zarr -from pangeo_forge_recipes.rechunking import GroupKey, combine_fragments, split_fragment +from pangeo_forge_recipes.rechunking import ( + GroupKey, + combine_fragments, + consolidate_dimension_coordinates, + split_fragment, +) from pangeo_forge_recipes.types import CombineOp, Dimension, Index, IndexedPosition, Position from .conftest import split_up_files_by_variable_and_day @@ -259,3 +268,24 @@ def test_combine_fragments_errors(): index1 = Index({Dimension("time", CombineOp.CONCAT): IndexedPosition(2)}) with pytest.raises(ValueError, match="are not consistent"): _ = combine_fragments(group, [(index0, ds), (index1, ds)]) + + +def test_consolidate_dimension_coordinates(): + td = TemporaryDirectory() + store_path = os.path.join(td.name + "tmp.zarr") + group = zarr.group(store=store_path, overwrite=True) + group.create(name="data", shape=100, chunks=10, dtype="i4") + group.create(name="time", shape=100, chunks=10, dtype="i4") + group.data[:] = np.random.randn(*group.data.shape) + group.time[:] = np.arange(100) + + # If you don't provide these attrs, + # consolidate_dimension_coordinates does not + # raise an error, while Xarray does + group.data.attrs["_ARRAY_DIMENSIONS"] = ["time"] + group.time.attrs["_ARRAY_DIMENSIONS"] = ["time"] + + consolidated_zarr = consolidate_dimension_coordinates(zarr.storage.FSStore(store_path)) + store = zarr.open(consolidated_zarr) + assert store.time.chunks[0] == 100 + assert store.data.chunks[0] == 10 diff --git a/tests/test_storage.py b/tests/test_storage.py index 1a1e2da7..b621d8f8 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -4,6 +4,8 @@ import pytest from fsspec.implementations.http import HTTPFileSystem from fsspec.implementations.local 
import LocalFileSystem +from gcsfs import GCSFileSystem +from s3fs import S3FileSystem from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget @@ -95,3 +97,20 @@ def test_suffix(tmp_path): assert str((FSSpecTarget(LocalFileSystem(), tmp_path) / "test").root_path) == str( tmp_path / "test" ) + + +@pytest.mark.parametrize("fs_cls", [LocalFileSystem, HTTPFileSystem, S3FileSystem, GCSFileSystem]) +def test_target_storage_get_remote_protocol(fs_cls, monkeypatch): + # we need to use patch here for s3fs and gcsfs b/c they try to do so much on __init__ + monkeypatch.setattr("s3fs.S3FileSystem.__init__", lambda x: None) + monkeypatch.setattr("gcsfs.GCSFileSystem.__init__", lambda x: None) + monkeypatch.setattr("pangeo_forge_recipes.storage.FSSpecTarget.__post_init__", lambda x: None) + target_root = FSSpecTarget(fs_cls()) + if isinstance(target_root, LocalFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "local" + elif isinstance(target_root, HTTPFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "http" + elif isinstance(target_root, S3FileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "s3" + elif isinstance(target_root, GCSFileSystem): + assert target_root.fs.get_fsspec_remote_protocol() == "gcs" diff --git a/tests/test_transforms.py b/tests/test_transforms.py index fb698e3c..8fdaf2e1 100644 --- a/tests/test_transforms.py +++ b/tests/test_transforms.py @@ -8,7 +8,7 @@ from pangeo_forge_recipes.aggregation import dataset_to_schema from pangeo_forge_recipes.patterns import FilePattern, FileType -from pangeo_forge_recipes.storage import CacheFSSpecTarget +from pangeo_forge_recipes.storage import CacheFSSpecTarget, FSSpecTarget from pangeo_forge_recipes.transforms import ( DetermineSchema, IndexItems, @@ -18,7 +18,7 @@ Rechunk, StoreToZarr, ) -from pangeo_forge_recipes.types import CombineOp +from pangeo_forge_recipes.types import CombineOp, Index from .data_generation import make_ds @@ -121,20 +121,24 @@ def manually_load(item): assert_that(loaded_dsets, is_xr_dataset(in_memory=True)) -def is_list_of_refs_dicts(): - def _is_list_of_refs_dicts(refs): - for r in refs[0]: - assert isinstance(r, dict) - assert "refs" in r +def is_list_of_idx_refs_dicts(): + def _is_list_of_idx_refs_dicts(results): + for result in results: + idx = result[0] + references = result[1] + test_ref = references[0] + assert isinstance(idx, Index) + assert isinstance(test_ref, dict) + assert "refs" in test_ref - return _is_list_of_refs_dicts + return _is_list_of_idx_refs_dicts def test_OpenWithKerchunk_via_fsspec(pcoll_opened_files, pipeline): input, pattern, cache_url = pcoll_opened_files with pipeline as p: output = p | input | OpenWithKerchunk(pattern.file_type) - assert_that(output, is_list_of_refs_dicts()) + assert_that(output, is_list_of_idx_refs_dicts()) def test_OpenWithKerchunk_direct(pattern_direct, pipeline): @@ -147,11 +151,11 @@ def test_OpenWithKerchunk_direct(pattern_direct, pipeline): | beam.Create(pattern_direct.items()) | OpenWithKerchunk(file_type=pattern_direct.file_type) ) - assert_that(output, is_list_of_refs_dicts()) + assert_that(output, is_list_of_idx_refs_dicts()) @pytest.mark.parametrize("target_chunks", [{}, {"time": 1}, {"time": 2}, {"time": 2, "lon": 9}]) -def test_PrepareZarrTarget(pipeline, tmp_target_url, target_chunks): +def test_PrepareZarrTarget(pipeline, tmp_target, target_chunks): ds = make_ds() schema = dataset_to_schema(ds) @@ -181,7 +185,7 @@ def _check_target(actual): with pipeline as p: input = p | 
beam.Create([schema]) - target = input | PrepareZarrTarget(target=tmp_target_url, target_chunks=target_chunks) + target = input | PrepareZarrTarget(target=tmp_target, target_chunks=target_chunks) assert_that(target, correct_target()) @@ -246,7 +250,7 @@ def expand(self, pcoll): def test_StoreToZarr_emits_openable_fsstore( pipeline, netcdf_local_file_pattern_sequential, - tmp_target_url, + tmp_target, ): def is_xrdataset(): def _is_xr_dataset(actual): @@ -260,7 +264,7 @@ def _is_xr_dataset(actual): with pipeline as p: datasets = p | beam.Create(pattern.items()) | OpenWithXarray() target_store = datasets | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="test.zarr", combine_dims=pattern.combine_dim_keys, ) @@ -272,7 +276,7 @@ def _is_xr_dataset(actual): def test_StoreToZarr_dynamic_chunking_interface( pipeline: beam.Pipeline, netcdf_local_file_pattern_sequential: FilePattern, - tmp_target_url: str, + tmp_target: FSSpecTarget, daily_xarray_dataset: xr.Dataset, with_kws: bool, ): @@ -305,7 +309,7 @@ def dynamic_chunking_fn(template_ds: xr.Dataset, divisor: int = 1): with pipeline as p: datasets = p | beam.Create(pattern.items()) | OpenWithXarray() target_store = datasets | StoreToZarr( - target_root=tmp_target_url, + target_root=tmp_target, store_name="test.zarr", combine_dims=pattern.combine_dim_keys, attrs={}, diff --git a/tests/test_writers.py b/tests/test_writers.py index f5c68649..20dd1047 100644 --- a/tests/test_writers.py +++ b/tests/test_writers.py @@ -1,11 +1,21 @@ +import os + +import apache_beam as beam import fsspec import pytest import xarray as xr import zarr from pangeo_forge_recipes.aggregation import schema_to_zarr +from pangeo_forge_recipes.transforms import ( + ConsolidateMetadata, + OpenWithKerchunk, + OpenWithXarray, + StoreToZarr, + WriteCombinedReference, +) from pangeo_forge_recipes.types import CombineOp, Dimension, Index, IndexedPosition, Position -from pangeo_forge_recipes.writers import _select_single_protocol, store_dataset_fragment +from pangeo_forge_recipes.writers import store_dataset_fragment from .data_generation import make_ds @@ -144,8 +154,86 @@ def test_store_dataset_fragment(temp_store): assert ds.time.encoding.get("units") == ds_target.time.encoding.get("units") -@pytest.mark.parametrize("protocol", ["s3", "https"]) -def test_select_single_protocol(protocol): - assert isinstance( - _select_single_protocol(fsspec.filesystem(protocol, anon=True).get_mapper()), str +def test_zarr_consolidate_metadata( + netcdf_local_file_pattern, + pipeline, + tmp_target, +): + pattern = netcdf_local_file_pattern + with pipeline as p: + ( + p + | beam.Create(pattern.items()) + | OpenWithXarray(file_type=pattern.file_type) + | StoreToZarr( + target_root=tmp_target, + store_name="store", + combine_dims=pattern.combine_dim_keys, + ) + | ConsolidateMetadata() + ) + + path = os.path.join(tmp_target.root_path, "store") + zc = zarr.storage.FSStore(path) + assert zc[".zmetadata"] is not None + + assert xr.open_zarr(path, consolidated=True) + + +def test_zarr_encoding( + netcdf_local_file_pattern, + pipeline, + tmp_target, +): + pattern = netcdf_local_file_pattern + compressor = zarr.Blosc("zstd", clevel=3) + with pipeline as p: + ( + p + | beam.Create(pattern.items()) + | OpenWithXarray(file_type=pattern.file_type) + | StoreToZarr( + target_root=tmp_target, + store_name="store", + combine_dims=pattern.combine_dim_keys, + encoding={"foo": {"compressor": compressor}}, + ) + | ConsolidateMetadata() + ) + zc = 
zarr.storage.FSStore(os.path.join(tmp_target.root_path, "store")) + z = zarr.open(zc) + assert z.foo.compressor == compressor + + +@pytest.mark.parametrize("output_file_name", ["reference.json", "reference.parquet"]) +def test_reference_netcdf( + netcdf_local_file_pattern_sequential, + pipeline, + tmp_target, + output_file_name, +): + pattern = netcdf_local_file_pattern_sequential + store_name = "daily-xarray-dataset" + with pipeline as p: + ( + p + | beam.Create(pattern.items()) + | OpenWithKerchunk(file_type=pattern.file_type) + | WriteCombinedReference( + identical_dims=["lat", "lon"], + target_root=tmp_target, + store_name=store_name, + concat_dims=["time"], + output_file_name=output_file_name, + ) + ) + + full_path = os.path.join(tmp_target.root_path, store_name, output_file_name) + mapper = fsspec.get_mapper( + "reference://", + target_protocol=tmp_target.get_fsspec_remote_protocol(), + remote_protocol=tmp_target.get_fsspec_remote_protocol(), + fo=full_path, ) + + assert xr.open_zarr(mapper, consolidated=False)
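
As a usage reference for the changes above, here is a minimal sketch of a recipe exercising the new encoding argument of StoreToZarr together with the ConsolidateDimensionCoordinates and ConsolidateMetadata transforms, mirroring the new tests. The input paths, output location, and the "foo" variable/compressor choice are hypothetical placeholders and not part of this diff.

# Illustrative sketch only -- not part of this diff.
# Paths, URLs, and the "foo" variable/compressor are hypothetical placeholders.
import apache_beam as beam
import zarr

from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    ConsolidateDimensionCoordinates,
    ConsolidateMetadata,
    OpenWithXarray,
    StoreToZarr,
)

# Hypothetical source files forming a time series along "time".
pattern = pattern_from_file_sequence(
    ["/tmp/input/day-1.nc", "/tmp/input/day-2.nc"],
    concat_dim="time",
)

with beam.Pipeline() as p:
    (
        p
        | beam.Create(pattern.items())
        | OpenWithXarray(file_type=pattern.file_type)
        | StoreToZarr(
            target_root="/tmp/output",  # hypothetical target; an FSSpecTarget also works
            store_name="store.zarr",
            combine_dims=pattern.combine_dim_keys,
            # per-variable encoding handed through to the Zarr target (new in this diff)
            encoding={"foo": {"compressor": zarr.Blosc("zstd", clevel=3)}},
        )
        # rewrite dimension coordinates as single chunks, then consolidate metadata,
        # chained in the same order as the new end-to-end tests
        | ConsolidateDimensionCoordinates()
        | ConsolidateMetadata()
    )

On the kerchunk path, WriteCombinedReference now accepts remote_options and remote_protocol, and write_combined_reference returns a mapper over a lazy ReferenceFileSystem, which is how the updated writer and end-to-end tests open the combined reference store.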