diff --git a/pangeo_forge_recipes/dynamic_target_chunks.py b/pangeo_forge_recipes/dynamic_target_chunks.py index 7d6f3b97..bd11615d 100644 --- a/pangeo_forge_recipes/dynamic_target_chunks.py +++ b/pangeo_forge_recipes/dynamic_target_chunks.py @@ -54,8 +54,8 @@ def dynamic_target_chunks_from_schema( ratio = [target_chunk_ratio[dim] for dim in dims] # The target ratio is defined for total chunks along a certain axis # This means we need to scale the ratio by the shape - ratio_scaled = np.array(ratio)/np.array(shape) - # the input ratio targets the total number of + ratio_scaled = np.array(ratio) / np.array(shape) + # the input ratio targets the total number of ratio_normalized = normalize(ratio_scaled) possible_chunks = [] diff --git a/tests/test_dynamic_target_chunks.py b/tests/test_dynamic_target_chunks.py index 45733229..fcf4a63f 100644 --- a/tests/test_dynamic_target_chunks.py +++ b/tests/test_dynamic_target_chunks.py @@ -1,26 +1,41 @@ +from typing import Dict + import dask.array as dsa import pytest import xarray as xr -from typing import Dict from pangeo_forge_recipes.aggregation import dataset_to_schema from pangeo_forge_recipes.dynamic_target_chunks import dynamic_target_chunks_from_schema -def _create_ds(dims_shape:Dict[str, int]) -> xr.Dataset: + +def _create_ds(dims_shape: Dict[str, int]) -> xr.Dataset: return xr.DataArray( dsa.random.random(list(dims_shape.values())), dims=list(dims_shape.keys()), - ).to_dataset(name="data") + ).to_dataset(name="data") + class TestDynamicTargetChunks: @pytest.mark.parametrize( ("dims_shape", "target_chunk_ratio", "expected_target_chunks"), [ # make sure that for the same dataset we get smaller chunksize along a dimension if the ratio is larger - ({'x': 200, 'y': 200, 'z': 200}, {"x": 1, "y": 1, "z": 10}, {"x": 50, "y": 100, "z": 25}), - ({'x': 200, 'y': 200, 'z': 200}, {"x": 10, "y": 1, "z": 1}, {"x": 25, "y": 100, "z": 50}), + ( + {"x": 200, "y": 200, "z": 200}, + {"x": 1, "y": 1, "z": 10}, + {"x": 50, "y": 100, "z": 25}, + ), + ( + {"x": 200, "y": 200, "z": 200}, + {"x": 10, "y": 1, "z": 1}, + {"x": 25, "y": 100, "z": 50}, + ), # test the special case where we want to just chunk along a single dimension - ({'x': 100, 'y': 300, 'z': 400}, {"x": -1, "y": -1, "z": 1}, {"x": 100, "y": 300, "z": 4}), + ( + {"x": 100, "y": 300, "z": 400}, + {"x": -1, "y": -1, "z": 1}, + {"x": 100, "y": 300, "z": 4}, + ), ], ) def test_dynamic_rechunking(self, dims_shape, target_chunk_ratio, expected_target_chunks): @@ -34,27 +49,26 @@ def test_dynamic_rechunking(self, dims_shape, target_chunk_ratio, expected_targe assert target_chunks[dim] == chunks def test_dynamic_rechunking_maintain_ratio(self): - """Confirm that for a given ratio with two differently sized datasets we maintain a constant ratio + """Confirm that for a given ratio with two differently sized datasets we maintain a constant ratio between total number of chunks""" - ds_equal = _create_ds({'x': 64, 'y': 64}) - ds_long = _create_ds({'x': 64, 'y': 256}) + ds_equal = _create_ds({"x": 64, "y": 64}) + ds_long = _create_ds({"x": 64, "y": 256}) for ds in [ds_equal, ds_long]: print(ds) schema = dataset_to_schema(ds) target_chunks = dynamic_target_chunks_from_schema( - schema, 1e4, target_chunk_ratio={'x':1, 'y':4} - ) + schema, 1e4, target_chunk_ratio={"x": 1, "y": 4} + ) ds_rechunked = ds.chunk(target_chunks) - assert len(ds_rechunked.chunks['y'])/len(ds_rechunked.chunks['x']) == 4 - + assert len(ds_rechunked.chunks["y"]) / len(ds_rechunked.chunks["x"]) == 4 @pytest.mark.parametrize( "target_chunk_ratio", [{"x": 1, "y": -1, "z": 10}, {"x": 6, "y": -1, "z": 2}] ) # always keep y unchunked, and vary the others @pytest.mark.parametrize("target_chunk_nbytes", [1e6, 1e7]) def test_dynamic_skip_dimension(self, target_chunk_ratio, target_chunk_nbytes): - ds = _create_ds({'x':100, 'y': 200, 'z': 300}) + ds = _create_ds({"x": 100, "y": 200, "z": 300}) # Mark dimension as 'not-to-chunk' with -1 schema = dataset_to_schema(ds) target_chunks = dynamic_target_chunks_from_schema( @@ -64,7 +78,7 @@ def test_dynamic_skip_dimension(self, target_chunk_ratio, target_chunk_nbytes): def test_dynamic_rechunking_error_dimension_missing(self): # make sure that an error is raised if some dimension is not specified - ds = _create_ds({'x': 100, 'y': 200, 'z': 300}) + ds = _create_ds({"x": 100, "y": 200, "z": 300}) schema = dataset_to_schema(ds) with pytest.raises( @@ -73,7 +87,7 @@ def test_dynamic_rechunking_error_dimension_missing(self): dynamic_target_chunks_from_schema(schema, 1e6, target_chunk_ratio={"x": 1, "z": 10}) def test_dynamic_rechunking_error_dimension_wrong(self): - ds = _create_ds({'x': 100, 'y': 200, 'z': 300}) + ds = _create_ds({"x": 100, "y": 200, "z": 300}) schema = dataset_to_schema(ds) with pytest.raises( ValueError, match="target_chunk_ratio must contain all dimensions in dataset."